Просмотр исходного кода

r12763@Kushana: nickm | 2007-04-20 18:42:58 -0400
Initial version of code to stop using socket pairs for linked connections. Superficially, it seems to work, but it probably needs a lot more testing and attention.


svn:r9995

Nick Mathewson 19 лет назад
Родитель
Сommit
648065fcb4
9 измененных файлов с 420 добавлено и 123 удалено
  1. 5 0
      ChangeLog
  2. 39 11
      doc/TODO
  3. 26 0
      src/or/buffers.c
  4. 117 11
      src/or/connection.c
  5. 17 47
      src/or/connection_edge.c
  6. 8 3
      src/or/directory.c
  7. 157 44
      src/or/main.c
  8. 18 4
      src/or/or.h
  9. 33 3
      src/or/test.c

+ 5 - 0
ChangeLog

@@ -21,6 +21,11 @@ Changes in version 0.2.0.1-alpha - 2007-??-??
     - Count the number of open sockets separately from the number of active
     - Count the number of open sockets separately from the number of active
       connection_t objects.  This will let us avoid underusing our
       connection_t objects.  This will let us avoid underusing our
       allocated connection limit.
       allocated connection limit.
+    - We no longer use socket pairs to link an edge connection to an
+      anonymous directory connection.  Instead, we track the link
+      internally and transfer the data in-process.  This saves two
+      sockets per anonymous directory connection (at the client and at
+      the server), and avoids the nasty Windows socketpair() workaround.
 
 
   o Minor features (build):
   o Minor features (build):
     - Make autoconf search for libevent, openssl, and zlib consistently.
     - Make autoconf search for libevent, openssl, and zlib consistently.

+ 39 - 11
doc/TODO

@@ -54,6 +54,19 @@ N   . Document transport and natdport
 Things we'd like to do in 0.2.0.x:
 Things we'd like to do in 0.2.0.x:
   - Proposals:
   - Proposals:
     - 101: Voting on the Tor Directory System
     - 101: Voting on the Tor Directory System
+      - Prepare ASAP for new voting formats
+        - Don't flip out with warnings when voting-related URLs are
+          uploaded/downloaded.
+      - Finalize proposal
+      - Get authorities voting
+        - Implement parsing for new document formats
+        - Code to generate votes
+        - Code to generate consensus from a list of votes
+        - Add a signature to a consensus.
+        - Code to check signatures on a consensus
+        - Push/pull documents as appropriate.
+      - Start caching consensus documents once authorities make them
+      - Start downloading and using consensus documents once caches serve them
     . 104: Long and Short Router Descriptors (by Jun 1)
     . 104: Long and Short Router Descriptors (by Jun 1)
       . Finalize proposal
       . Finalize proposal
       o Implement parsing for extra-info documents
       o Implement parsing for extra-info documents
@@ -109,14 +122,24 @@ Things we'd like to do in 0.2.0.x:
           to make sure that we call the event base dispatch function enough.)
           to make sure that we call the event base dispatch function enough.)
       . Implement
       . Implement
         o Count connections and sockets separately
         o Count connections and sockets separately
-        - Allow connections with s == -1
-        - Add a linked_conn field; it should get marked when we're marked.
-        - Add a function to move bytes from buffer to buffer.
-        - Have handle_read dtrt for linked connections
-        - Have an activate/deactivate_linked_connection function.
-        - Have activated functions added to a list on first activation, and
+        . Allow connections with s == -1
+        o Add a linked_conn field; it should get marked when we're marked.
+        o Add a function to move bytes from buffer to buffer.
+        o Have read_to_buf dtrt for linked connections
+        o Have handle_read dtrt for linked connections
+        o Have an activate/deactivate_linked_connection function.
+        o Have activated connections added to a list on first activation, and
           that list made active before calls to event_loop.
           that list made active before calls to event_loop.
-    - Generate torrc.{complete|sample}.in, tor.1.in, the HTML manual, and the
+        o Have connections get deactivated when no more data to write on
+          linked conn outbuf.
+        o Handle closing connections properly.
+        o Actually create and use linked connections.
+        - Handle rate-limiting on directory writes to linked directory
+          connections in a more sensible manner.
+        - Rename want_to_read and want_to_write; they're actually about
+          being blocked, not about wanting to read/write.
+        - Find more ways to test this.
+    D Generate torrc.{complete|sample}.in, tor.1.in, the HTML manual, and the
       online config documentation from a single source.
       online config documentation from a single source.
     - Have clients do TLS connection rotation less often than "every 10
     - Have clients do TLS connection rotation less often than "every 10
       minutes" in the thrashy case, and more often than "once a week" in the
       minutes" in the thrashy case, and more often than "once a week" in the
@@ -172,6 +195,8 @@ Things we'd like to do in 0.2.0.x:
     - Blocking-resistance.
     - Blocking-resistance.
     - It would be potentially helpful to https requests on the OR port by
     - It would be potentially helpful to https requests on the OR port by
       acting like an HTTPS server.
       acting like an HTTPS server.
+    - Audit how much RAM we're using for buffers and cell pools; try to
+      trim down a lot.
   o Deprecations:
   o Deprecations:
     o Remove v0 control protocol.
     o Remove v0 control protocol.
 P  - Packaging:
 P  - Packaging:
@@ -280,7 +305,7 @@ M   - rewrite how libevent does select() on win32 so it's not so very slow.
 
 
 Minor items for 0.1.2.x as time permits:
 Minor items for 0.1.2.x as time permits:
   - include bandwidth breakdown by conn->type in BW events.
   - include bandwidth breakdown by conn->type in BW events.
-  - Unify autoconf search code for libevent and openssl.  Make code
+  o Unify autoconf search code for libevent and openssl.  Make code
     suggest platform-appropriate "devel" / "dev" / whatever packages
     suggest platform-appropriate "devel" / "dev" / whatever packages
     if we can link but we can't find the headers.
     if we can link but we can't find the headers.
   - Recommend polipo? Please?
   - Recommend polipo? Please?
@@ -318,7 +343,7 @@ R - add d64 and fp64 along-side d and fp so people can paste status
     the solution is to have a separate 'extend-data' cell type
     the solution is to have a separate 'extend-data' cell type
     which is used for the first N data cells, and only
     which is used for the first N data cells, and only
     extend-data cells can be extend requests.
     extend-data cells can be extend requests.
-    - Specify, including thought about anonymity implications.
+    . Specify, including thought about anonymity implications. [proposal 110]
   - Display the reasons in 'destroy' and 'truncated' cells under some
   - Display the reasons in 'destroy' and 'truncated' cells under some
     circumstances?
     circumstances?
   - If the server is spewing complaints about raising your ulimit -n,
   - If the server is spewing complaints about raising your ulimit -n,
@@ -373,7 +398,7 @@ Future version:
     I can say "banana" as my bandwidthcapacity, and it won't even squeak.
     I can say "banana" as my bandwidthcapacity, and it won't even squeak.
   o Include the output of svn info in the binary, so it's trivial to see what
   o Include the output of svn info in the binary, so it's trivial to see what
     version a binary was built from.
     version a binary was built from.
-    - Do the same for svk info.
+    o Do the same for svk info.
   - Add a doxygen style checker to make check-spaces so nick doesn't drift
   - Add a doxygen style checker to make check-spaces so nick doesn't drift
     too far from arma's undocumented styleguide.  Also, document that
     too far from arma's undocumented styleguide.  Also, document that
     styleguide in HACKING.  (See r9634 for example.)
     styleguide in HACKING.  (See r9634 for example.)
@@ -388,7 +413,10 @@ Future version:
   - Should TrackHostExits expire TrackHostExitsExpire seconds after their
   - Should TrackHostExits expire TrackHostExitsExpire seconds after their
     *last* use, not their *first* use?
     *last* use, not their *first* use?
   X Configuration format really wants sections.
   X Configuration format really wants sections.
-  - Good RBL substitute.
+  . Good RBL substitute.
+    - Play with the implementations; link them from somewhere; add a
+      round-robin link from torel.torproject.org; describe how to
+      use them in the FAQ.
   - Authorities should try using exits for http to connect to some URLS
   - Authorities should try using exits for http to connect to some URLS
     (specified in a configuration file, so as not to make the List Of Things
     (specified in a configuration file, so as not to make the List Of Things
     Not To Censor completely obvious) and ask them for results.  Exits that
     Not To Censor completely obvious) and ask them for results.  Exits that

+ 26 - 0
src/or/buffers.c

@@ -817,6 +817,32 @@ fetch_from_buf(char *string, size_t string_len, buf_t *buf)
   return buf->datalen;
   return buf->datalen;
 }
 }
 
 
+/** Move up to <b>buf_flushlen</b> bytes from <b>buf_in</b> to <b>buf_out</b>.
+ * Return the number of bytes actually copied.
+ */
+int
+move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen)
+{
+  char b[4096];
+  size_t cp, len;
+  len = *buf_flushlen;
+  if (len > buf_in->datalen)
+    len = buf_in->datalen;
+
+  cp = len; /* Remember the number of bytes we intend to copy. */
+  while (len) {
+    /* This isn't the most efficient implementation one could imagine, since
+     * it does two copies instead of 1, but I kinda doubt that this will be
+     * critical path. */
+    size_t n = len > sizeof(b) ? sizeof(b) : len;
+    fetch_from_buf(b, n, buf_in);
+    write_to_buf(b, n, buf_out);
+    len -= n;
+  }
+  *buf_flushlen -= cp;
+  return cp;
+}
+
 /** There is a (possibly incomplete) http statement on <b>buf</b>, of the
 /** There is a (possibly incomplete) http statement on <b>buf</b>, of the
  * form "\%s\\r\\n\\r\\n\%s", headers, body. (body may contain nuls.)
  * form "\%s\\r\\n\\r\\n\%s", headers, body. (body may contain nuls.)
  * If a) the headers include a Content-Length field and all bytes in
  * If a) the headers include a Content-Length field and all bytes in

+ 117 - 11
src/or/connection.c

@@ -113,6 +113,7 @@ conn_state_to_string(int type, int state)
         case DIR_CONN_STATE_CONNECTING: return "connecting";
         case DIR_CONN_STATE_CONNECTING: return "connecting";
         case DIR_CONN_STATE_CLIENT_SENDING: return "client sending";
         case DIR_CONN_STATE_CLIENT_SENDING: return "client sending";
         case DIR_CONN_STATE_CLIENT_READING: return "client reading";
         case DIR_CONN_STATE_CLIENT_READING: return "client reading";
+        case DIR_CONN_STATE_CLIENT_FINISHED: return "client finished";
         case DIR_CONN_STATE_SERVER_COMMAND_WAIT: return "waiting for command";
         case DIR_CONN_STATE_SERVER_COMMAND_WAIT: return "waiting for command";
         case DIR_CONN_STATE_SERVER_WRITING: return "writing";
         case DIR_CONN_STATE_SERVER_WRITING: return "writing";
       }
       }
@@ -212,9 +213,22 @@ connection_new(int type)
   return conn;
   return conn;
 }
 }
 
 
+/** Create a link between <b>conn_a</b> and <b>conn_b</b> */
+void
+connection_link_connections(connection_t *conn_a, connection_t *conn_b)
+{
+  tor_assert(conn_a->s < 0);
+  tor_assert(conn_b->s < 0);
+
+  conn_a->linked = 1;
+  conn_b->linked = 1;
+  conn_a->linked_conn = conn_b;
+  conn_b->linked_conn = conn_a;
+}
+
 /** Tell libevent that we don't care about <b>conn</b> any more. */
 /** Tell libevent that we don't care about <b>conn</b> any more. */
 void
 void
-connection_unregister(connection_t *conn)
+connection_unregister_events(connection_t *conn)
 {
 {
   if (conn->read_event) {
   if (conn->read_event) {
     if (event_del(conn->read_event))
     if (event_del(conn->read_event))
@@ -260,6 +274,17 @@ _connection_free(connection_t *conn)
       break;
       break;
   }
   }
 
 
+  if (conn->linked) {
+    int severity = buf_datalen(conn->inbuf)+buf_datalen(conn->outbuf)
+      ? LOG_NOTICE : LOG_INFO;
+    log_fn(severity, LD_GENERAL, "Freeing linked %s connection [%s] with %d "
+           "bytes on inbuf, %d on outbuf.",
+           conn_type_to_string(conn->type),
+           conn_state_to_string(conn->type, conn->state),
+           (int)buf_datalen(conn->inbuf), (int)buf_datalen(conn->outbuf));
+    // tor_assert(!buf_datalen(conn->outbuf)); /*XXXX020 remove me.*/
+  }
+
   if (!connection_is_listener(conn)) {
   if (!connection_is_listener(conn)) {
     buf_free(conn->inbuf);
     buf_free(conn->inbuf);
     buf_free(conn->outbuf);
     buf_free(conn->outbuf);
@@ -325,6 +350,15 @@ connection_free(connection_t *conn)
   tor_assert(conn);
   tor_assert(conn);
   tor_assert(!connection_is_on_closeable_list(conn));
   tor_assert(!connection_is_on_closeable_list(conn));
   tor_assert(!connection_in_array(conn));
   tor_assert(!connection_in_array(conn));
+  if (conn->linked_conn) {
+    log_err(LD_BUG, "Called with conn->linked_conn still set.");
+    tor_fragile_assert();
+    conn->linked_conn->linked_conn = NULL;
+    if (! conn->linked_conn->marked_for_close &&
+        conn->linked_conn->reading_from_linked_conn)
+      connection_start_reading(conn->linked_conn);
+    conn->linked_conn = NULL;
+  }
   if (connection_speaks_cells(conn)) {
   if (connection_speaks_cells(conn)) {
     if (conn->state == OR_CONN_STATE_OPEN)
     if (conn->state == OR_CONN_STATE_OPEN)
       directory_set_dirty();
       directory_set_dirty();
@@ -336,7 +370,7 @@ connection_free(connection_t *conn)
     TO_CONTROL_CONN(conn)->event_mask = 0;
     TO_CONTROL_CONN(conn)->event_mask = 0;
     control_update_global_event_mask();
     control_update_global_event_mask();
   }
   }
-  connection_unregister(conn);
+  connection_unregister_events(conn);
   _connection_free(conn);
   _connection_free(conn);
 }
 }
 
 
@@ -486,7 +520,7 @@ void
 connection_close_immediate(connection_t *conn)
 connection_close_immediate(connection_t *conn)
 {
 {
   assert_connection_ok(conn,0);
   assert_connection_ok(conn,0);
-  if (conn->s < 0) {
+  if (conn->s < 0 && !conn->linked) {
     log_err(LD_BUG,"Attempt to close already-closed connection.");
     log_err(LD_BUG,"Attempt to close already-closed connection.");
     tor_fragile_assert();
     tor_fragile_assert();
     return;
     return;
@@ -498,9 +532,10 @@ connection_close_immediate(connection_t *conn)
              (int)conn->outbuf_flushlen);
              (int)conn->outbuf_flushlen);
   }
   }
 
 
-  connection_unregister(conn);
+  connection_unregister_events(conn);
 
 
-  tor_close_socket(conn->s);
+  if (conn->s >= 0)
+    tor_close_socket(conn->s);
   conn->s = -1;
   conn->s = -1;
   if (!connection_is_listener(conn)) {
   if (!connection_is_listener(conn)) {
     buf_clear(conn->outbuf);
     buf_clear(conn->outbuf);
@@ -529,6 +564,12 @@ _connection_mark_for_close(connection_t *conn, int line, const char *file)
   conn->marked_for_close_file = file;
   conn->marked_for_close_file = file;
   add_connection_to_closeable_list(conn);
   add_connection_to_closeable_list(conn);
 
 
+#if 0
+  /* XXXX020 Actually, I don't think this is right. */
+  if (conn->linked_conn && !conn->linked_conn->marked_for_close)
+    _connection_mark_for_close(conn->linked_conn, line, file);
+#endif
+
   /* in case we're going to be held-open-til-flushed, reset
   /* in case we're going to be held-open-til-flushed, reset
    * the number of seconds since last successful write, so
    * the number of seconds since last successful write, so
    * we get our whole 15 seconds */
    * we get our whole 15 seconds */
@@ -1101,11 +1142,14 @@ retry_all_listeners(int force, smartlist_t *replaced_conns,
 
 
 /** Return 1 if we should apply rate limiting to <b>conn</b>,
 /** Return 1 if we should apply rate limiting to <b>conn</b>,
  * and 0 otherwise. Right now this just checks if it's an internal
  * and 0 otherwise. Right now this just checks if it's an internal
- * IP address. */
+ * IP address or an internal connection. */
 static int
 static int
 connection_is_rate_limited(connection_t *conn)
 connection_is_rate_limited(connection_t *conn)
 {
 {
-  return !is_internal_IP(conn->addr, 0);
+  if (conn->linked || is_internal_IP(conn->addr, 0))
+    return 0;
+  else
+    return 1;
 }
 }
 
 
 extern int global_read_bucket, global_write_bucket;
 extern int global_read_bucket, global_write_bucket;
@@ -1483,6 +1527,7 @@ int
 connection_handle_read(connection_t *conn)
 connection_handle_read(connection_t *conn)
 {
 {
   int max_to_read=-1, try_to_read;
   int max_to_read=-1, try_to_read;
+  size_t before, n_read = 0;
 
 
   if (conn->marked_for_close)
   if (conn->marked_for_close)
     return 0; /* do nothing */
     return 0; /* do nothing */
@@ -1505,6 +1550,8 @@ connection_handle_read(connection_t *conn)
 loop_again:
 loop_again:
   try_to_read = max_to_read;
   try_to_read = max_to_read;
   tor_assert(!conn->marked_for_close);
   tor_assert(!conn->marked_for_close);
+
+  before = buf_datalen(conn->inbuf);
   if (connection_read_to_buf(conn, &max_to_read) < 0) {
   if (connection_read_to_buf(conn, &max_to_read) < 0) {
     /* There's a read error; kill the connection.*/
     /* There's a read error; kill the connection.*/
     connection_close_immediate(conn); /* Don't flush; connection is dead. */
     connection_close_immediate(conn); /* Don't flush; connection is dead. */
@@ -1517,6 +1564,7 @@ loop_again:
     connection_mark_for_close(conn);
     connection_mark_for_close(conn);
     return -1;
     return -1;
   }
   }
+  n_read += buf_datalen(conn->inbuf) - before;
   if (CONN_IS_EDGE(conn) && try_to_read != max_to_read) {
   if (CONN_IS_EDGE(conn) && try_to_read != max_to_read) {
     /* instruct it not to try to package partial cells. */
     /* instruct it not to try to package partial cells. */
     if (connection_process_inbuf(conn, 0) < 0) {
     if (connection_process_inbuf(conn, 0) < 0) {
@@ -1533,6 +1581,27 @@ loop_again:
       connection_process_inbuf(conn, 1) < 0) {
       connection_process_inbuf(conn, 1) < 0) {
     return -1;
     return -1;
   }
   }
+  if (conn->linked_conn) {
+    /* The other side's handle_write will never actually get called, so
+     * we need to invoke the appropriate callbacks ourself. */
+    connection_t *linked = conn->linked_conn;
+    /* XXXX020 Do we need to ensure that this stuff is called even if
+     * conn dies in a way that causes us to return -1 earlier? */
+
+    if (n_read) {
+      /* Probably a no-op, but hey. */
+      connection_buckets_decrement(linked, time(NULL), 0, n_read);
+
+      if (connection_flushed_some(linked) < 0)
+        connection_mark_for_close(linked);
+      if (!connection_wants_to_flush(linked))
+        connection_finished_flushing(linked);
+    }
+
+    if (!buf_datalen(linked->outbuf) && conn->active_on_link)
+      connection_stop_reading_from_linked_conn(conn);
+  }
+  /* If we hit the EOF, call connection_reached_eof. */
   if (!conn->marked_for_close &&
   if (!conn->marked_for_close &&
       conn->inbuf_reached_eof &&
       conn->inbuf_reached_eof &&
       connection_reached_eof(conn) < 0) {
       connection_reached_eof(conn) < 0) {
@@ -1541,9 +1610,9 @@ loop_again:
   return 0;
   return 0;
 }
 }
 
 
-/** Pull in new bytes from conn-\>s onto conn-\>inbuf, either
- * directly or via TLS. Reduce the token buckets by the number of
- * bytes read.
+/** Pull in new bytes from conn-\>s or conn-\>linked_conn onto conn-\>inbuf,
+ * either directly or via TLS. Reduce the token buckets by the number of bytes
+ * read.
  *
  *
  * If *max_to_read is -1, then decide it ourselves, else go with the
  * If *max_to_read is -1, then decide it ourselves, else go with the
  * value passed to us. When returning, if it's changed, subtract the
  * value passed to us. When returning, if it's changed, subtract the
@@ -1633,7 +1702,24 @@ connection_read_to_buf(connection_t *conn, int *max_to_read)
     tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written);
     tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written);
     log_debug(LD_GENERAL, "After TLS read of %d: %ld read, %ld written",
     log_debug(LD_GENERAL, "After TLS read of %d: %ld read, %ld written",
               result, (long)n_read, (long)n_written);
               result, (long)n_read, (long)n_written);
+  } else if (conn->linked) {
+    if (conn->linked_conn) {
+      result = move_buf_to_buf(conn->inbuf, conn->linked_conn->outbuf,
+                               &conn->linked_conn->outbuf_flushlen);
+    } else {
+      result = 0;
+    }
+    //log_notice(LD_GENERAL, "Moved %d bytes on an internal link!", result);
+    /* If the other side has disappeared, or if it's been marked for close and
+     * we flushed its outbuf, then we should set our inbuf_reached_eof. */
+    if (!conn->linked_conn ||
+        (conn->linked_conn->marked_for_close &&
+         buf_datalen(conn->linked_conn->outbuf) == 0))
+      conn->inbuf_reached_eof = 1;
+
+    n_read = (size_t) result;
   } else {
   } else {
+    /* !connection_speaks_cells, !conn->linked_conn. */
     int reached_eof = 0;
     int reached_eof = 0;
     CONN_LOG_PROTECT(conn,
     CONN_LOG_PROTECT(conn,
         result = read_to_buf(conn->s, at_most, conn->inbuf, &reached_eof));
         result = read_to_buf(conn->s, at_most, conn->inbuf, &reached_eof));
@@ -1687,7 +1773,7 @@ connection_fetch_from_buf(char *string, size_t len, connection_t *conn)
 int
 int
 connection_wants_to_flush(connection_t *conn)
 connection_wants_to_flush(connection_t *conn)
 {
 {
-  return conn->outbuf_flushlen;
+  return conn->outbuf_flushlen > 0;
 }
 }
 
 
 /** Are there too many bytes on edge connection <b>conn</b>'s outbuf to
 /** Are there too many bytes on edge connection <b>conn</b>'s outbuf to
@@ -2203,6 +2289,19 @@ connection_state_is_connecting(connection_t *conn)
   return 0;
   return 0;
 }
 }
 
 
+/** DOCDOC */
+int
+connection_should_read_from_linked_conn(connection_t *conn)
+{
+  if (conn->linked && conn->reading_from_linked_conn) {
+    if (! conn->linked_conn ||
+        (conn->linked_conn->writing_to_linked_conn &&
+         buf_datalen(conn->linked_conn->outbuf)))
+      return 1;
+  }
+  return 0;
+}
+
 /** Allocates a base64'ed authenticator for use in http or https
 /** Allocates a base64'ed authenticator for use in http or https
  * auth, based on the input string <b>authenticator</b>. Returns it
  * auth, based on the input string <b>authenticator</b>. Returns it
  * if success, else returns NULL. */
  * if success, else returns NULL. */
@@ -2433,6 +2532,13 @@ assert_connection_ok(connection_t *conn, time_t now)
       break;
       break;
   }
   }
 
 
+  if (conn->linked_conn) {
+    tor_assert(conn->linked_conn->linked_conn == conn);
+    tor_assert(conn->linked != 0);
+  }
+  if (conn->linked)
+    tor_assert(conn->s < 0);
+
   if (conn->outbuf_flushlen > 0) {
   if (conn->outbuf_flushlen > 0) {
     tor_assert(connection_is_writing(conn) || conn->wants_to_write ||
     tor_assert(connection_is_writing(conn) || conn->wants_to_write ||
                conn->edge_blocked_on_circ);
                conn->edge_blocked_on_circ);

+ 17 - 47
src/or/connection_edge.c

@@ -1860,32 +1860,21 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn)
  * and call connection_ap_handshake_attach_circuit(conn) on it.
  * and call connection_ap_handshake_attach_circuit(conn) on it.
  *
  *
  * Return the other end of the socketpair, or -1 if error.
  * Return the other end of the socketpair, or -1 if error.
+ *
+ * DOCDOC The above is now wrong; we use links.
+ * DOCDOC start_reading
  */
  */
-int
+edge_connection_t *
 connection_ap_make_bridge(char *address, uint16_t port,
 connection_ap_make_bridge(char *address, uint16_t port,
                           const char *digest, int command)
                           const char *digest, int command)
 {
 {
-  int fd[2];
   edge_connection_t *conn;
   edge_connection_t *conn;
-  int err;
-
-  log_info(LD_APP,"Making AP bridge to %s:%d ...",safe_str(address),port);
-
-  if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) {
-    log_warn(LD_NET,
-             "Couldn't construct socketpair (%s). Network down? Delaying.",
-             tor_socket_strerror(-err));
-    return -1;
-  }
-
-  tor_assert(fd[0] >= 0);
-  tor_assert(fd[1] >= 0);
 
 
-  set_socket_nonblocking(fd[0]);
-  set_socket_nonblocking(fd[1]);
+  log_notice(LD_APP,"Making internal anonymized tunnel to %s:%d ...",
+             safe_str(address),port); /* XXXX020 Downgrade back to info. */
 
 
   conn = TO_EDGE_CONN(connection_new(CONN_TYPE_AP));
   conn = TO_EDGE_CONN(connection_new(CONN_TYPE_AP));
-  conn->_base.s = fd[0];
+  conn->_base.linked = 1; /* so that we can add it safely below. */
 
 
   /* populate conn->socks_request */
   /* populate conn->socks_request */
 
 
@@ -1903,28 +1892,25 @@ connection_ap_make_bridge(char *address, uint16_t port,
                   digest, DIGEST_LEN);
                   digest, DIGEST_LEN);
   }
   }
 
 
-  conn->_base.address = tor_strdup("(local bridge)");
+  conn->_base.address = tor_strdup("(local bridge)"); /*XXXX020 no "bridge"*/
   conn->_base.addr = 0;
   conn->_base.addr = 0;
   conn->_base.port = 0;
   conn->_base.port = 0;
 
 
   if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
   if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
-    connection_free(TO_CONN(conn)); /* this closes fd[0] */
-    tor_close_socket(fd[1]);
-    return -1;
+    connection_free(TO_CONN(conn));
+    return NULL;
   }
   }
 
 
   conn->_base.state = AP_CONN_STATE_CIRCUIT_WAIT;
   conn->_base.state = AP_CONN_STATE_CIRCUIT_WAIT;
-  connection_start_reading(TO_CONN(conn));
 
 
   /* attaching to a dirty circuit is fine */
   /* attaching to a dirty circuit is fine */
   if (connection_ap_handshake_attach_circuit(conn) < 0) {
   if (connection_ap_handshake_attach_circuit(conn) < 0) {
     connection_mark_unattached_ap(conn, END_STREAM_REASON_CANT_ATTACH);
     connection_mark_unattached_ap(conn, END_STREAM_REASON_CANT_ATTACH);
-    tor_close_socket(fd[1]);
-    return -1;
+    return NULL;
   }
   }
 
 
   log_info(LD_APP,"... AP bridge created and connected.");
   log_info(LD_APP,"... AP bridge created and connected.");
-  return fd[1];
+  return conn;
 }
 }
 
 
 /** Send an answer to an AP connection that has requested a DNS lookup
 /** Send an answer to an AP connection that has requested a DNS lookup
@@ -2406,37 +2392,19 @@ connection_exit_connect(edge_connection_t *edge_conn)
  * back an end cell for).  Return -(some circuit end reason) if the circuit
  * back an end cell for).  Return -(some circuit end reason) if the circuit
  * needs to be torn down.  Either connects exit_conn, frees it, or marks it,
  * needs to be torn down.  Either connects exit_conn, frees it, or marks it,
  * as appropriate.
  * as appropriate.
+ *
+ * DOCDOC no longer uses socketpair
  */
  */
 static int
 static int
 connection_exit_connect_dir(edge_connection_t *exit_conn)
 connection_exit_connect_dir(edge_connection_t *exit_conn)
 {
 {
-  int fd[2];
-  int err;
   dir_connection_t *dir_conn = NULL;
   dir_connection_t *dir_conn = NULL;
 
 
-  log_info(LD_EXIT, "Opening dir bridge");
-
-  if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) {
-    log_warn(LD_NET,
-             "Couldn't construct socketpair (%s). "
-             "Network down? Out of sockets?",
-             tor_socket_strerror(-err));
-    connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT);
-    connection_free(TO_CONN(exit_conn));
-    return 0;
-  }
-
-  tor_assert(fd[0] >= 0);
-  tor_assert(fd[1] >= 0);
+  log_info(LD_EXIT, "Opening local connection for anonymized directory exit");
 
 
-  set_socket_nonblocking(fd[0]);
-  set_socket_nonblocking(fd[1]);
-
-  exit_conn->_base.s = fd[0];
   exit_conn->_base.state = EXIT_CONN_STATE_OPEN;
   exit_conn->_base.state = EXIT_CONN_STATE_OPEN;
 
 
   dir_conn = TO_DIR_CONN(connection_new(CONN_TYPE_DIR));
   dir_conn = TO_DIR_CONN(connection_new(CONN_TYPE_DIR));
-  dir_conn->_base.s = fd[1];
 
 
   dir_conn->_base.addr = 0x7f000001;
   dir_conn->_base.addr = 0x7f000001;
   dir_conn->_base.port = 0;
   dir_conn->_base.port = 0;
@@ -2445,6 +2413,8 @@ connection_exit_connect_dir(edge_connection_t *exit_conn)
   dir_conn->_base.purpose = DIR_PURPOSE_SERVER;
   dir_conn->_base.purpose = DIR_PURPOSE_SERVER;
   dir_conn->_base.state = DIR_CONN_STATE_SERVER_COMMAND_WAIT;
   dir_conn->_base.state = DIR_CONN_STATE_SERVER_COMMAND_WAIT;
 
 
+  connection_link_connections(TO_CONN(dir_conn), TO_CONN(exit_conn));
+
   if (connection_add(TO_CONN(exit_conn))<0) {
   if (connection_add(TO_CONN(exit_conn))<0) {
     connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT);
     connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT);
     connection_free(TO_CONN(exit_conn));
     connection_free(TO_CONN(exit_conn));

+ 8 - 3
src/or/directory.c

@@ -451,22 +451,25 @@ directory_initiate_command(const char *address, uint32_t addr,
            error indicates broken link in windowsland. */
            error indicates broken link in windowsland. */
     }
     }
   } else { /* we want to connect via tor */
   } else { /* we want to connect via tor */
+    edge_connection_t *linked_conn;
     /* make an AP connection
     /* make an AP connection
      * populate it and add it at the right state
      * populate it and add it at the right state
      * socketpair and hook up both sides
      * socketpair and hook up both sides
      */
      */
     conn->dirconn_direct = 0;
     conn->dirconn_direct = 0;
-    conn->_base.s =
+    linked_conn =
       connection_ap_make_bridge(conn->_base.address, conn->_base.port,
       connection_ap_make_bridge(conn->_base.address, conn->_base.port,
                                 digest,
                                 digest,
                                 private_connection ?
                                 private_connection ?
                                   SOCKS_COMMAND_CONNECT :
                                   SOCKS_COMMAND_CONNECT :
                                   SOCKS_COMMAND_CONNECT_DIR);
                                   SOCKS_COMMAND_CONNECT_DIR);
-    if (conn->_base.s < 0) {
+    if (!linked_conn) {
       log_warn(LD_NET,"Making AP bridge to dirserver failed.");
       log_warn(LD_NET,"Making AP bridge to dirserver failed.");
       connection_mark_for_close(TO_CONN(conn));
       connection_mark_for_close(TO_CONN(conn));
+      connection_mark_for_close(TO_CONN(linked_conn));
       return;
       return;
     }
     }
+    connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn));
 
 
     if (connection_add(TO_CONN(conn)) < 0) {
     if (connection_add(TO_CONN(conn)) < 0) {
       log_warn(LD_NET,"Unable to add AP bridge to dirserver.");
       log_warn(LD_NET,"Unable to add AP bridge to dirserver.");
@@ -478,6 +481,7 @@ directory_initiate_command(const char *address, uint32_t addr,
     directory_send_command(conn, purpose, 0, resource,
     directory_send_command(conn, purpose, 0, resource,
                            payload, payload_len);
                            payload, payload_len);
     connection_watch_events(TO_CONN(conn), EV_READ | EV_WRITE);
     connection_watch_events(TO_CONN(conn), EV_READ | EV_WRITE);
+    connection_start_reading(TO_CONN(linked_conn));
   }
   }
 }
 }
 
 
@@ -1297,7 +1301,8 @@ connection_dir_reached_eof(dir_connection_t *conn)
 {
 {
   int retval;
   int retval;
   if (conn->_base.state != DIR_CONN_STATE_CLIENT_READING) {
   if (conn->_base.state != DIR_CONN_STATE_CLIENT_READING) {
-    log_info(LD_HTTP,"conn reached eof, not reading. Closing.");
+    log_info(LD_HTTP,"conn reached eof, not reading. [state=%d] Closing.",
+             conn->_base.state);
     connection_close_immediate(TO_CONN(conn)); /* error: give up on flushing */
     connection_close_immediate(TO_CONN(conn)); /* error: give up on flushing */
     connection_mark_for_close(TO_CONN(conn));
     connection_mark_for_close(TO_CONN(conn));
     return -1;
     return -1;

+ 157 - 44
src/or/main.c

@@ -74,6 +74,10 @@ static connection_t *connection_array[MAXCONNECTIONS+1] =
 /** List of connections that have been marked for close and need to be freed
 /** List of connections that have been marked for close and need to be freed
  * and removed from connection_array. */
  * and removed from connection_array. */
 static smartlist_t *closeable_connection_lst = NULL;
 static smartlist_t *closeable_connection_lst = NULL;
+/** DOCDOC */
+static smartlist_t *active_linked_connection_lst = NULL;
+/** DOCDOC */
+static int called_loop_once = 0;
 
 
 static int n_conns=0; /**< Number of connections currently active. */
 static int n_conns=0; /**< Number of connections currently active. */
 
 
@@ -155,7 +159,7 @@ int
 connection_add(connection_t *conn)
 connection_add(connection_t *conn)
 {
 {
   tor_assert(conn);
   tor_assert(conn);
-  tor_assert(conn->s >= 0);
+  tor_assert(conn->s >= 0 || conn->linked);
 
 
   tor_assert(conn->conn_array_index == -1); /* can only connection_add once */
   tor_assert(conn->conn_array_index == -1); /* can only connection_add once */
   if (n_conns == MAXCONNECTIONS) {
   if (n_conns == MAXCONNECTIONS) {
@@ -198,13 +202,12 @@ connection_remove(connection_t *conn)
 
 
   tor_assert(conn->conn_array_index >= 0);
   tor_assert(conn->conn_array_index >= 0);
   current_index = conn->conn_array_index;
   current_index = conn->conn_array_index;
+  connection_unregister_events(conn); /* This is redundant, but cheap. */
   if (current_index == n_conns-1) { /* this is the end */
   if (current_index == n_conns-1) { /* this is the end */
     n_conns--;
     n_conns--;
     return 0;
     return 0;
   }
   }
 
 
-  connection_unregister(conn);
-
   /* replace this one with the one at the end */
   /* replace this one with the one at the end */
   n_conns--;
   n_conns--;
   connection_array[current_index] = connection_array[n_conns];
   connection_array[current_index] = connection_array[n_conns];
@@ -213,23 +216,31 @@ connection_remove(connection_t *conn)
   return 0;
   return 0;
 }
 }
 
 
-/** If it's an edge conn, remove it from the list
+/** If <b>conn</b> is an edge conn, remove it from the list
  * of conn's on this circuit. If it's not on an edge,
  * of conn's on this circuit. If it's not on an edge,
  * flush and send destroys for all circuits on this conn.
  * flush and send destroys for all circuits on this conn.
  *
  *
- * If <b>remove</b> is non-zero, then remove it from the
- * connection_array and closeable_connection_lst.
+ * Remove it from connection_array (if applicable) and
+ * from closeable_connection_list.
  *
  *
  * Then free it.
  * Then free it.
  */
  */
 static void
 static void
-connection_unlink(connection_t *conn, int remove)
+connection_unlink(connection_t *conn)
 {
 {
   connection_about_to_close_connection(conn);
   connection_about_to_close_connection(conn);
-  if (remove) {
+  if (conn->conn_array_index >= 0) {
     connection_remove(conn);
     connection_remove(conn);
   }
   }
+  if (conn->linked_conn) {
+    conn->linked_conn->linked_conn = NULL;
+    if (! conn->linked_conn->marked_for_close &&
+        conn->linked_conn->reading_from_linked_conn)
+      connection_start_reading(conn->linked_conn);
+    conn->linked_conn = NULL;
+  }
   smartlist_remove(closeable_connection_lst, conn);
   smartlist_remove(closeable_connection_lst, conn);
+  smartlist_remove(active_linked_connection_lst, conn);
   if (conn->type == CONN_TYPE_EXIT) {
   if (conn->type == CONN_TYPE_EXIT) {
     assert_connection_edge_not_dns_pending(TO_EDGE_CONN(conn));
     assert_connection_edge_not_dns_pending(TO_EDGE_CONN(conn));
   }
   }
@@ -286,16 +297,23 @@ get_connection_array(connection_t ***array, int *n)
 void
 void
 connection_watch_events(connection_t *conn, short events)
 connection_watch_events(connection_t *conn, short events)
 {
 {
-  int r;
+  int r = 0;
 
 
   tor_assert(conn);
   tor_assert(conn);
   tor_assert(conn->read_event);
   tor_assert(conn->read_event);
   tor_assert(conn->write_event);
   tor_assert(conn->write_event);
 
 
-  if (events & EV_READ) {
-    r = event_add(conn->read_event, NULL);
+  if (conn->linked) {
+    if (events & EV_READ)
+      connection_start_reading(conn);
+    else
+      connection_stop_reading(conn);
   } else {
   } else {
-    r = event_del(conn->read_event);
+    if (events & EV_READ) {
+      r = event_add(conn->read_event, NULL);
+    } else {
+      r = event_del(conn->read_event);
+    }
   }
   }
 
 
   if (r<0)
   if (r<0)
@@ -305,10 +323,17 @@ connection_watch_events(connection_t *conn, short events)
              conn->s, (events & EV_READ)?"":"un",
              conn->s, (events & EV_READ)?"":"un",
              tor_socket_strerror(tor_socket_errno(conn->s)));
              tor_socket_strerror(tor_socket_errno(conn->s)));
 
 
-  if (events & EV_WRITE) {
-    r = event_add(conn->write_event, NULL);
+  if (conn->linked) {
+    if (events & EV_WRITE)
+      connection_start_writing(conn);
+    else
+      connection_stop_writing(conn);
   } else {
   } else {
-    r = event_del(conn->write_event);
+    if (events & EV_WRITE) {
+      r = event_add(conn->write_event, NULL);
+    } else {
+      r = event_del(conn->write_event);
+    }
   }
   }
 
 
   if (r<0)
   if (r<0)
@@ -325,7 +350,8 @@ connection_is_reading(connection_t *conn)
 {
 {
   tor_assert(conn);
   tor_assert(conn);
 
 
-  return conn->read_event && event_pending(conn->read_event, EV_READ, NULL);
+  return conn->reading_from_linked_conn ||
+    (conn->read_event && event_pending(conn->read_event, EV_READ, NULL));
 }
 }
 
 
 /** Tell the main loop to stop notifying <b>conn</b> of any read events. */
 /** Tell the main loop to stop notifying <b>conn</b> of any read events. */
@@ -335,12 +361,16 @@ connection_stop_reading(connection_t *conn)
   tor_assert(conn);
   tor_assert(conn);
   tor_assert(conn->read_event);
   tor_assert(conn->read_event);
 
 
-  log_debug(LD_NET,"entering.");
-  if (event_del(conn->read_event))
-    log_warn(LD_NET, "Error from libevent setting read event state for %d "
-             "to unwatched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->reading_from_linked_conn = 0;
+    connection_stop_reading_from_linked_conn(conn);
+  } else {
+    if (event_del(conn->read_event))
+      log_warn(LD_NET, "Error from libevent setting read event state for %d "
+               "to unwatched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
 }
 }
 
 
 /** Tell the main loop to start notifying <b>conn</b> of any read events. */
 /** Tell the main loop to start notifying <b>conn</b> of any read events. */
@@ -350,11 +380,17 @@ connection_start_reading(connection_t *conn)
   tor_assert(conn);
   tor_assert(conn);
   tor_assert(conn->read_event);
   tor_assert(conn->read_event);
 
 
-  if (event_add(conn->read_event, NULL))
-    log_warn(LD_NET, "Error from libevent setting read event state for %d "
-             "to watched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->reading_from_linked_conn = 1;
+    if (connection_should_read_from_linked_conn(conn))
+      connection_start_reading_from_linked_conn(conn);
+  } else {
+    if (event_add(conn->read_event, NULL))
+      log_warn(LD_NET, "Error from libevent setting read event state for %d "
+               "to watched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
 }
 }
 
 
 /** Return true iff <b>conn</b> is listening for write events. */
 /** Return true iff <b>conn</b> is listening for write events. */
@@ -363,7 +399,8 @@ connection_is_writing(connection_t *conn)
 {
 {
   tor_assert(conn);
   tor_assert(conn);
 
 
-  return conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL);
+  return conn->writing_to_linked_conn ||
+    (conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL));
 }
 }
 
 
 /** Tell the main loop to stop notifying <b>conn</b> of any write events. */
 /** Tell the main loop to stop notifying <b>conn</b> of any write events. */
@@ -373,11 +410,17 @@ connection_stop_writing(connection_t *conn)
   tor_assert(conn);
   tor_assert(conn);
   tor_assert(conn->write_event);
   tor_assert(conn->write_event);
 
 
-  if (event_del(conn->write_event))
-    log_warn(LD_NET, "Error from libevent setting write event state for %d "
-             "to unwatched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->writing_to_linked_conn = 0;
+    if (conn->linked_conn)
+      connection_stop_reading_from_linked_conn(conn->linked_conn);
+  } else {
+    if (event_del(conn->write_event))
+      log_warn(LD_NET, "Error from libevent setting write event state for %d "
+               "to unwatched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
 }
 }
 
 
 /** Tell the main loop to start notifying <b>conn</b> of any write events. */
 /** Tell the main loop to start notifying <b>conn</b> of any write events. */
@@ -387,11 +430,58 @@ connection_start_writing(connection_t *conn)
   tor_assert(conn);
   tor_assert(conn);
   tor_assert(conn->write_event);
   tor_assert(conn->write_event);
 
 
-  if (event_add(conn->write_event, NULL))
-    log_warn(LD_NET, "Error from libevent setting write event state for %d "
-             "to watched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->writing_to_linked_conn = 1;
+    if (conn->linked_conn &&
+        connection_should_read_from_linked_conn(conn->linked_conn))
+      connection_start_reading_from_linked_conn(conn->linked_conn);
+  } else {
+    if (event_add(conn->write_event, NULL))
+      log_warn(LD_NET, "Error from libevent setting write event state for %d "
+               "to watched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
+}
+
+/** DOCDOC*/
+void
+connection_start_reading_from_linked_conn(connection_t *conn)
+{
+  tor_assert(conn);
+  tor_assert(conn->linked == 1);
+
+  if (!conn->active_on_link) {
+    conn->active_on_link = 1;
+    smartlist_add(active_linked_connection_lst, conn);
+    if (!called_loop_once) {
+      /* This is the first event on the list; we won't be in LOOP_ONCE mode,
+       * so we need to make sure that the event_loop() actually exits at the
+       * end of its run through the current connections and
+       * lets us activate read events for linked connections.  */
+      struct timeval tv = { 0, 0 };
+      event_loopexit(&tv);
+    }
+  } else {
+    tor_assert(smartlist_isin(active_linked_connection_lst, conn));
+  }
+}
+
+/** DOCDOC*/
+void
+connection_stop_reading_from_linked_conn(connection_t *conn)
+{
+  tor_assert(conn);
+  tor_assert(conn->linked == 1);
+
+  if (conn->active_on_link) {
+    conn->active_on_link = 0;
+    /* XXXX020 maybe we should keep an index here so we can smartlist_del
+     * cleanly. */
+    smartlist_remove(active_linked_connection_lst, conn);
+  } else {
+    tor_assert(!smartlist_isin(active_linked_connection_lst, conn));
+  }
 }
 }
 
 
 /** Close all connections that have been scheduled to get closed */
 /** Close all connections that have been scheduled to get closed */
@@ -402,7 +492,7 @@ close_closeable_connections(void)
   for (i = 0; i < smartlist_len(closeable_connection_lst); ) {
   for (i = 0; i < smartlist_len(closeable_connection_lst); ) {
     connection_t *conn = smartlist_get(closeable_connection_lst, i);
     connection_t *conn = smartlist_get(closeable_connection_lst, i);
     if (conn->conn_array_index < 0) {
     if (conn->conn_array_index < 0) {
-      connection_unlink(conn, 0); /* blow it away right now */
+      connection_unlink(conn); /* blow it away right now */
     } else {
     } else {
       if (!conn_close_if_marked(conn->conn_array_index))
       if (!conn_close_if_marked(conn->conn_array_index))
         ++i;
         ++i;
@@ -500,7 +590,7 @@ conn_close_if_marked(int i)
   assert_all_pending_dns_resolves_ok();
   assert_all_pending_dns_resolves_ok();
 
 
   log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
   log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
-  if (conn->s >= 0 && connection_wants_to_flush(conn)) {
+  if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
     /* s == -1 means it's an incomplete edge connection, or that the socket
     /* s == -1 means it's an incomplete edge connection, or that the socket
      * has already been closed as unflushable. */
      * has already been closed as unflushable. */
     int sz = connection_bucket_write_limit(conn);
     int sz = connection_bucket_write_limit(conn);
@@ -512,7 +602,21 @@ conn_close_if_marked(int i)
                conn->s, conn_type_to_string(conn->type), conn->state,
                conn->s, conn_type_to_string(conn->type), conn->state,
                (int)conn->outbuf_flushlen,
                (int)conn->outbuf_flushlen,
                 conn->marked_for_close_file, conn->marked_for_close);
                 conn->marked_for_close_file, conn->marked_for_close);
-    if (connection_speaks_cells(conn)) {
+    if (conn->linked_conn) {
+      retval = move_buf_to_buf(conn->linked_conn->inbuf, conn->outbuf,
+                               &conn->outbuf_flushlen);
+      if (retval >= 0) {
+        /* The linked conn will notice that it has data when it notices that
+         * we're gone. */
+        connection_start_reading_from_linked_conn(conn->linked_conn);
+      }
+      /* XXXX020 Downgrade to debug. */
+      log_info(LD_GENERAL, "Flushed last %d bytes from a linked conn; "
+               "%d left; flushlen %d; wants-to-flush==%d", retval,
+               (int)buf_datalen(conn->outbuf),
+               (int)conn->outbuf_flushlen,
+               connection_wants_to_flush(conn));
+    } else if (connection_speaks_cells(conn)) {
       if (conn->state == OR_CONN_STATE_OPEN) {
       if (conn->state == OR_CONN_STATE_OPEN) {
         retval = flush_buf_tls(TO_OR_CONN(conn)->tls, conn->outbuf, sz,
         retval = flush_buf_tls(TO_OR_CONN(conn)->tls, conn->outbuf, sz,
                                &conn->outbuf_flushlen);
                                &conn->outbuf_flushlen);
@@ -553,7 +657,7 @@ conn_close_if_marked(int i)
              conn->marked_for_close);
              conn->marked_for_close);
     }
     }
   }
   }
-  connection_unlink(conn, 1); /* unlink, remove, free */
+  connection_unlink(conn); /* unlink, remove, free */
   return 1;
   return 1;
 }
 }
 
 
@@ -1270,8 +1374,14 @@ do_main_loop(void)
     /* Make it easier to tell whether libevent failure is our fault or not. */
     /* Make it easier to tell whether libevent failure is our fault or not. */
     errno = 0;
     errno = 0;
 #endif
 #endif
-    /* poll until we have an event, or the second ends */
-    loop_result = event_dispatch();
+    /* All active linked conns should get their read events activated. */
+    SMARTLIST_FOREACH(active_linked_connection_lst, connection_t *, conn,
+                      event_active(conn->read_event, EV_READ, 1));
+    called_loop_once = smartlist_len(active_linked_connection_lst) ? 1 : 0;
+
+    /* poll until we have an event, or the second ends, or until we have
+     * some active linked connections to triggger events for. */
+    loop_result = event_loop(called_loop_once ? EVLOOP_ONCE : 0);
 
 
     /* let catch() handle things like ^c, and otherwise don't worry about it */
     /* let catch() handle things like ^c, and otherwise don't worry about it */
     if (loop_result < 0) {
     if (loop_result < 0) {
@@ -1601,6 +1711,8 @@ tor_init(int argc, char *argv[])
   time_of_process_start = time(NULL);
   time_of_process_start = time(NULL);
   if (!closeable_connection_lst)
   if (!closeable_connection_lst)
     closeable_connection_lst = smartlist_create();
     closeable_connection_lst = smartlist_create();
+  if (!active_linked_connection_lst)
+    active_linked_connection_lst = smartlist_create();
   /* Initialize the history structures. */
   /* Initialize the history structures. */
   rep_hist_init();
   rep_hist_init();
   /* Initialize the service cache. */
   /* Initialize the service cache. */
@@ -1673,6 +1785,7 @@ tor_free_all(int postfork)
   tor_tls_free_all();
   tor_tls_free_all();
   /* stuff in main.c */
   /* stuff in main.c */
   smartlist_free(closeable_connection_lst);
   smartlist_free(closeable_connection_lst);
+  smartlist_free(active_linked_connection_lst);
   tor_free(timeout_event);
   tor_free(timeout_event);
   /* Stuff in util.c */
   /* Stuff in util.c */
   escaped(NULL);
   escaped(NULL);

+ 18 - 4
src/or/or.h

@@ -748,6 +748,7 @@ typedef struct connection_t {
   /* The next fields are all one-bit booleans. Some are only applicable
   /* The next fields are all one-bit booleans. Some are only applicable
    * to connection subtypes, but we hold them here anyway, to save space.
    * to connection subtypes, but we hold them here anyway, to save space.
    * (Currently, they all fit into a single byte.) */
    * (Currently, they all fit into a single byte.) */
+  /*XXXX020 rename wants_to_*; the names are misleading. */
   unsigned wants_to_read:1; /**< Boolean: should we start reading again once
   unsigned wants_to_read:1; /**< Boolean: should we start reading again once
                             * the bandwidth throttler allows it? */
                             * the bandwidth throttler allows it? */
   unsigned wants_to_write:1; /**< Boolean: should we start writing again once
   unsigned wants_to_write:1; /**< Boolean: should we start writing again once
@@ -771,7 +772,7 @@ typedef struct connection_t {
   unsigned int chosen_exit_optional:1;
   unsigned int chosen_exit_optional:1;
 
 
   int s; /**< Our socket; -1 if this connection is closed, or has no
   int s; /**< Our socket; -1 if this connection is closed, or has no
-          * sockets. */
+          * socket. */
   int conn_array_index; /**< Index into the global connection array. */
   int conn_array_index; /**< Index into the global connection array. */
   struct event *read_event; /**< Libevent event structure. */
   struct event *read_event; /**< Libevent event structure. */
   struct event *write_event; /**< Libevent event structure. */
   struct event *write_event; /**< Libevent event structure. */
@@ -797,6 +798,13 @@ typedef struct connection_t {
                                       * we marked for close? */
                                       * we marked for close? */
   char *address; /**< FQDN (or IP) of the guy on the other end.
   char *address; /**< FQDN (or IP) of the guy on the other end.
                   * strdup into this, because free_connection frees it. */
                   * strdup into this, because free_connection frees it. */
+  /** Annother connection that's connected to this one in lieu of a socket. */
+  struct connection_t *linked_conn;
+  /* XXXX020 NM move these up to the other 1-bit flags. */
+  unsigned int linked:1; /**< True if there is, or has been, a linked_conn. */
+  unsigned int reading_from_linked_conn:1; /**DOCDOC*/
+  unsigned int writing_to_linked_conn:1; /**DOCDOC*/
+  unsigned int active_on_link:1; /**DOCDOC*/
 
 
 } connection_t;
 } connection_t;
 
 
@@ -1967,6 +1975,7 @@ int flush_buf_tls(tor_tls_t *tls, buf_t *buf, size_t sz, size_t *buf_flushlen);
 int write_to_buf(const char *string, size_t string_len, buf_t *buf);
 int write_to_buf(const char *string, size_t string_len, buf_t *buf);
 int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
 int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
                       const char *data, size_t data_len, int done);
                       const char *data, size_t data_len, int done);
+int move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen);
 int fetch_from_buf(char *string, size_t string_len, buf_t *buf);
 int fetch_from_buf(char *string, size_t string_len, buf_t *buf);
 int fetch_from_buf_http(buf_t *buf,
 int fetch_from_buf_http(buf_t *buf,
                         char **headers_out, size_t max_headerlen,
                         char **headers_out, size_t max_headerlen,
@@ -2163,7 +2172,8 @@ const char *conn_type_to_string(int type);
 const char *conn_state_to_string(int type, int state);
 const char *conn_state_to_string(int type, int state);
 
 
 connection_t *connection_new(int type);
 connection_t *connection_new(int type);
-void connection_unregister(connection_t *conn);
+void connection_link_connections(connection_t *conn_a, connection_t *conn_b);
+void connection_unregister_events(connection_t *conn);
 void connection_free(connection_t *conn);
 void connection_free(connection_t *conn);
 void connection_free_all(void);
 void connection_free_all(void);
 void connection_about_to_close_connection(connection_t *conn);
 void connection_about_to_close_connection(connection_t *conn);
@@ -2227,6 +2237,7 @@ connection_t *connection_get_by_type_state_rendquery(int type, int state,
 int connection_is_listener(connection_t *conn);
 int connection_is_listener(connection_t *conn);
 int connection_state_is_open(connection_t *conn);
 int connection_state_is_open(connection_t *conn);
 int connection_state_is_connecting(connection_t *conn);
 int connection_state_is_connecting(connection_t *conn);
+int connection_should_read_from_linked_conn(connection_t *conn);
 
 
 char *alloc_http_authenticator(const char *authenticator);
 char *alloc_http_authenticator(const char *authenticator);
 
 
@@ -2252,8 +2263,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn);
 int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
 int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
 int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
 int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
 
 
-int connection_ap_make_bridge(char *address, uint16_t port,
-                              const char *digest, int command);
+edge_connection_t  *connection_ap_make_bridge(char *address, uint16_t port,
+                                              const char *digest, int command);
 void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,
 void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,
                                          size_t replylen,
                                          size_t replylen,
                                          int endreason);
                                          int endreason);
@@ -2580,6 +2591,9 @@ int connection_is_writing(connection_t *conn);
 void connection_stop_writing(connection_t *conn);
 void connection_stop_writing(connection_t *conn);
 void connection_start_writing(connection_t *conn);
 void connection_start_writing(connection_t *conn);
 
 
+void connection_stop_reading_from_linked_conn(connection_t *conn);
+void connection_start_reading_from_linked_conn(connection_t *conn);
+
 void directory_all_unreachable(time_t now);
 void directory_all_unreachable(time_t now);
 void directory_info_has_arrived(time_t now, int from_cache);
 void directory_info_has_arrived(time_t now, int from_cache);
 
 

+ 33 - 3
src/or/test.c

@@ -111,9 +111,10 @@ test_buffers(void)
   char str[256];
   char str[256];
   char str2[256];
   char str2[256];
 
 
-  buf_t *buf;
+  buf_t *buf, *buf2;
 
 
   int j;
   int j;
+  size_t r;
 
 
   /****
   /****
    * buf_new
    * buf_new
@@ -218,6 +219,37 @@ test_buffers(void)
     test_memeq(str2, str, 255);
     test_memeq(str2, str, 255);
   }
   }
 
 
+  /* Move from buf to buf. */
+  buf_free(buf);
+  buf = buf_new_with_capacity(4096);
+  buf2 = buf_new_with_capacity(4096);
+  for (j=0;j<100;++j)
+    write_to_buf(str, 255, buf);
+  test_eq(buf_datalen(buf), 25500);
+  for (j=0;j<100;++j) {
+    r = 10;
+    move_buf_to_buf(buf2, buf, &r);
+    test_eq(r, 0);
+  }
+  test_eq(buf_datalen(buf), 24500);
+  test_eq(buf_datalen(buf2), 1000);
+  for (j=0;j<3;++j) {
+    fetch_from_buf(str2, 255, buf2);
+    test_memeq(str2, str, 255);
+  }
+  r = 8192; /*big move*/
+  move_buf_to_buf(buf2, buf, &r);
+  test_eq(r, 0);
+  r = 30000; /* incomplete move */
+  move_buf_to_buf(buf2, buf, &r);
+  test_eq(r, 13692);
+  for (j=0;j<97;++j) {
+    fetch_from_buf(str2, 255, buf2);
+    test_memeq(str2, str, 255);
+  }
+  buf_free(buf);
+  buf_free(buf2);
+
 #if 0
 #if 0
   {
   {
   int s;
   int s;
@@ -285,8 +317,6 @@ test_buffers(void)
   test_eq(eof, 1);
   test_eq(eof, 1);
   }
   }
 #endif
 #endif
-
-  buf_free(buf);
 }
 }
 
 
 static void
 static void