Browse Source

Unpack cells in worker threads

Steven Engler 4 years ago
parent
commit
568c9ff0a3

+ 10 - 5
src/core/mainloop/connection.c

@@ -387,7 +387,8 @@ or_connection_new(int type, int socket_family)
   tor_assert(safe_or_conn_link_handshaking_ev != EVENT_LABEL_UNSET);
   tor_assert(safe_or_conn_open_ev != EVENT_LABEL_UNSET);
   tor_assert(safe_or_conn_closed_ev != EVENT_LABEL_UNSET);
-  tor_assert(safe_or_conn_has_buffered_data_ev != EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_fixed_cell_ev != EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_var_cell_ev != EVENT_LABEL_UNSET);
 
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
                               safe_or_conn_tcp_connecting_ev,
@@ -405,9 +406,11 @@ or_connection_new(int type, int socket_family)
                               safe_or_conn_closed_ev,
                               NULL, connection_or_process_event);
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
-                              safe_or_conn_has_buffered_data_ev,
-                              safe_or_conn_buf_data_event_update,
-                              connection_or_process_event);
+                              safe_or_conn_fixed_cell_ev,
+                              NULL, connection_or_process_event);
+  event_listener_set_callback(TO_CONN(or_conn)->event_listener,
+                              safe_or_conn_var_cell_ev,
+                              NULL, connection_or_process_event);
 
   connection_or_set_canonical(or_conn, 0);
 
@@ -1902,7 +1905,9 @@ connection_handle_listener_read(connection_t *conn, int new_type)
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
                                 safe_or_conn_closed_ev);
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
-                                safe_or_conn_has_buffered_data_ev);
+                                safe_or_conn_fixed_cell_ev);
+      safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
+                                safe_or_conn_var_cell_ev);
 
       safe_connection_set_socket(newconn->safe_conn, news);
     }

+ 4 - 0
src/core/or/channeltls.c

@@ -1533,6 +1533,10 @@ channel_tls_process_versions_cell(var_cell_t *cell, channel_tls_t *chan)
   chan->conn->link_proto = highest_supported_version;
   chan->conn->handshake_state->received_versions = 1;
 
+  tor_assert(TO_CONN(chan->conn)->safe_conn != NULL);
+  safe_or_connection_set_link_protocol(TO_SAFE_OR_CONN(TO_CONN(chan->conn)->safe_conn),
+                                       chan->conn->link_proto);
+
   if (chan->conn->link_proto == 2) {
     log_info(LD_OR,
              "Negotiated version %d with %s:%d; sending NETINFO.",

+ 23 - 101
src/core/or/connection_or.c

@@ -521,9 +521,24 @@ connection_or_process_event(event_label_t label, event_data_t data,
     connection_or_notify_error(or_conn, END_OR_CONN_REASON_CONNRESET, "unknown");
     connection_close_immediate(TO_CONN(or_conn));
     connection_mark_for_close_internal(TO_CONN(or_conn));
-  } else if (label == safe_or_conn_has_buffered_data_ev) {
-    size_t num_bytes = data.u64;
-    connection_or_new_process_cells_from_inbuf(or_conn, num_bytes);
+  } else if (label == safe_or_conn_fixed_cell_ev) {
+    // touch the channel's active timestamp if there is one
+    if (or_conn->chan) {
+      channel_timestamp_active(TLS_CHAN_TO_BASE(or_conn->chan));
+    }
+    circuit_build_times_network_is_live(get_circuit_build_times_mutable());
+
+    cell_t *cell = data.ptr;
+    channel_tls_handle_cell(cell, or_conn);
+  } else if (label == safe_or_conn_var_cell_ev) {
+    // touch the channel's active timestamp if there is one
+    if (or_conn->chan) {
+      channel_timestamp_active(TLS_CHAN_TO_BASE(or_conn->chan));
+    }
+    circuit_build_times_network_is_live(get_circuit_build_times_mutable());
+
+    var_cell_t *var_cell = data.ptr;
+    channel_tls_handle_var_cell(var_cell, or_conn);
   } else {
     log_warn(LD_OR, "Received an OR event that we don't recognize");
   }
@@ -559,7 +574,7 @@ cell_pack(packed_cell_t *dst, const cell_t *src, int wide_circ_ids)
 /** Unpack the network-order buffer <b>src</b> into a host-order
  * cell_t structure <b>dest</b>.
  */
-static void
+void
 cell_unpack(cell_t *dest, const char *src, int wide_circ_ids)
 {
   if (wide_circ_ids) {
@@ -1582,7 +1597,10 @@ connection_or_connect, (const tor_addr_t *_addr, uint16_t port,
                             safe_or_conn_closed_ev);
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
                             TO_CONN(conn)->event_listener,
-                            safe_or_conn_has_buffered_data_ev);
+                            safe_or_conn_fixed_cell_ev);
+  safe_connection_subscribe(TO_CONN(conn)->safe_conn,
+                            TO_CONN(conn)->event_listener,
+                            safe_or_conn_var_cell_ev);
 
   /*
    * Set up conn so it's got all the data we need to remember for channels
@@ -2508,102 +2526,6 @@ connection_or_write_var_cell_to_buf,(const var_cell_t *cell,
 //  }
 //}
 
-static int
-connection_or_fetch_cell(or_connection_t *or_conn, char *cell_buf)
-{
-  // TODO: fix this ugly locking
-  tor_assert(TO_CONN(or_conn)->safe_conn != NULL);
-  tor_mutex_acquire(&(TO_CONN(or_conn)->safe_conn->lock));
-  struct buf_t *inbuf = TO_CONN(or_conn)->safe_conn->inbuf;
-
-  size_t cell_network_size = get_cell_network_size(or_conn->wide_circ_ids);
-
-  if (buf_datalen(inbuf) < cell_network_size) {
-    // don't have a full cell
-    tor_mutex_release(&(TO_CONN(or_conn)->safe_conn->lock));
-    return 0;
-  }
-
-  buf_get_bytes(inbuf, cell_buf, cell_network_size);
-
-  safe_connection_inbuf_modified(TO_CONN(or_conn)->safe_conn);
-  tor_mutex_release(&(TO_CONN(or_conn)->safe_conn->lock));
-  return 1;
-}
-
-static int
-connection_or_fetch_var_cell(or_connection_t *or_conn, var_cell_t **var_cell_ptr)
-{
-  // TODO: fix this ugly locking
-  tor_assert(TO_CONN(or_conn)->safe_conn != NULL);
-  tor_mutex_acquire(&(TO_CONN(or_conn)->safe_conn->lock));
-  struct buf_t *inbuf = TO_CONN(or_conn)->safe_conn->inbuf;
-
-  int link_proto = or_conn->link_proto;
-  *var_cell_ptr = NULL;
-  int found_var_cell = fetch_var_cell_from_buf(inbuf, var_cell_ptr, link_proto);
-
-  safe_connection_inbuf_modified(TO_CONN(or_conn)->safe_conn);
-  tor_mutex_release(&(TO_CONN(or_conn)->safe_conn->lock));
-  return found_var_cell;
-}
-
-static int
-connection_or_new_process_cells_from_inbuf(or_connection_t *conn,
-                                           size_t num_bytes)
-{
-  //log_debug(LD_OR, "Starting OR conn process inbuf");
-  log_warn(LD_OR, "Starting OR conn process inbuf for conn %p", conn);
-
-  // TODO: we should only use 'num_bytes' bytes from the buffer, instead we
-  //       just ignore this
-  //       we would also need to keep track of the number of unused bytes so
-  //       that we can use them again next time we call this function
-
-  while (1) {
-    var_cell_t *var_cell = NULL;
-    int found_var_cell = connection_or_fetch_var_cell(conn, &var_cell);
-
-    if (found_var_cell) {
-      if (var_cell == NULL) {
-        // the next cell is a var cell, but it is not yet complete
-        return 0;
-      }
-
-      // touch the channel's active timestamp if there is one
-      if (conn->chan) {
-        channel_timestamp_active(TLS_CHAN_TO_BASE(conn->chan));
-      }
-      circuit_build_times_network_is_live(get_circuit_build_times_mutable());
-
-      channel_tls_handle_var_cell(var_cell, conn);
-      var_cell_free(var_cell);
-      var_cell = NULL;
-    } else {
-      char buf[CELL_MAX_NETWORK_SIZE];
-      int found_cell = connection_or_fetch_cell(conn, buf);
-
-      if (found_cell) {
-        // touch the channel's active timestamp if there is one
-        if (conn->chan) {
-          channel_timestamp_active(TLS_CHAN_TO_BASE(conn->chan));
-        }
-        circuit_build_times_network_is_live(get_circuit_build_times_mutable());
-
-        // retrieve cell info from buf (create the host-order struct from the
-        // network-order string)
-        cell_t cell;
-        cell_unpack(&cell, buf, conn->wide_circ_ids);
-
-        channel_tls_handle_cell(&cell, conn);
-      } else {
-        // there is not yet a complete cell
-        return 0;
-      }
-    }
-  }
-}
-
 /** Array of recognized link protocol versions. */
 static const uint16_t or_protocol_versions[] = { 1, 2, 3, 4, 5 };
 /** Number of versions in <b>or_protocol_versions</b>. */

+ 1 - 0
src/core/or/connection_or.h

@@ -117,6 +117,7 @@ MOCK_DECL(int,connection_or_send_authenticate_cell,
 int is_or_protocol_version_known(uint16_t version);
 
 void cell_pack(packed_cell_t *dest, const cell_t *src, int wide_circ_ids);
+void cell_unpack(cell_t *dest, const char *src, int wide_circ_ids);
 int var_cell_pack_header(const var_cell_t *cell, char *hdr_out,
                          int wide_circ_ids);
 var_cell_t *var_cell_new(uint16_t payload_len);

+ 135 - 20
src/core/or/safe_connection.c

@@ -3,13 +3,19 @@
 #include "lib/net/buffers_net.h"
 #include "lib/tls/tortls.h"
 #include "lib/tls/buffers_tls.h"
+#include "lib/malloc/malloc.h"
+#include "core/proto/proto_cell.h"
+#include "core/or/connection_or.h"
+#include "core/or/var_cell_st.h"
+#include "core/or/cell_st.h"
 
 event_label_t safe_or_conn_tcp_connecting_ev = EVENT_LABEL_UNSET;
 event_label_t safe_or_conn_tls_handshaking_ev = EVENT_LABEL_UNSET;
 event_label_t safe_or_conn_link_handshaking_ev = EVENT_LABEL_UNSET;
 event_label_t safe_or_conn_open_ev = EVENT_LABEL_UNSET;
 event_label_t safe_or_conn_closed_ev = EVENT_LABEL_UNSET;
-event_label_t safe_or_conn_has_buffered_data_ev = EVENT_LABEL_UNSET;
+event_label_t safe_or_conn_fixed_cell_ev = EVENT_LABEL_UNSET;
+event_label_t safe_or_conn_var_cell_ev = EVENT_LABEL_UNSET;
 
 static void
 safe_connection_refresh_events(safe_connection_t *safe_conn);
@@ -39,6 +45,9 @@ safe_or_connection_socket_added_cb(safe_connection_t *safe_conn);
 static void
 safe_or_connection_outbuf_modified_cb(safe_connection_t *safe_conn);
 
+static void
+process_cells_from_inbuf(safe_or_connection_t *safe_or_conn);
+
 /********************************************************/
 
 safe_or_connection_t *
@@ -57,7 +66,8 @@ safe_or_conn_register_events(event_registry_t *registry)
   tor_assert(safe_or_conn_link_handshaking_ev == EVENT_LABEL_UNSET);
   tor_assert(safe_or_conn_open_ev == EVENT_LABEL_UNSET);
   tor_assert(safe_or_conn_closed_ev == EVENT_LABEL_UNSET);
-  tor_assert(safe_or_conn_has_buffered_data_ev == EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_fixed_cell_ev == EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_var_cell_ev == EVENT_LABEL_UNSET);
 
   safe_or_conn_tcp_connecting_ev = \
     event_registry_register_event(registry, "OR Connection Connecting");
@@ -69,17 +79,10 @@ safe_or_conn_register_events(event_registry_t *registry)
     event_registry_register_event(registry, "OR Connection Open");
   safe_or_conn_closed_ev = \
     event_registry_register_event(registry, "OR Connection Closed");
-  safe_or_conn_has_buffered_data_ev = \
-    event_registry_register_event(registry, "OR Connection Has Data In Buffer");
-}
-
-void
-safe_or_conn_buf_data_event_update(event_label_t label,
-                                   event_data_t *old_data,
-                                   event_data_t *new_data)
-{
-  tor_assert(label == safe_or_conn_has_buffered_data_ev);
-  old_data->u64 += new_data->u64;
+  safe_or_conn_fixed_cell_ev = \
+    event_registry_register_event(registry, "OR Connection New Fixed-Size Cell");
+  safe_or_conn_var_cell_ev = \
+    event_registry_register_event(registry, "OR Connection New Variable-Size Cell");
 }
 
 /********************************************************/
@@ -448,6 +451,10 @@ safe_or_connection_new(bool requires_buffers, bool is_outgoing,
     log_warn(LD_OR, "No remote address string was provided");
   }
 
+  safe_or_conn->link_protocol = 0; // unknown protocol
+  safe_or_conn->wide_circ_ids = false;
+  safe_or_conn->allowed_to_process_cells = true;
+
   // these states should be set by 'safe_or_connection_update_state()'
   socket_rw_state_init(&safe_or_conn->tor_read_wanted,  false);
   socket_rw_state_init(&safe_or_conn->tor_write_wanted, false);
@@ -519,6 +526,23 @@ safe_or_connection_refresh_bucket_rw_states(safe_or_connection_t *safe_or_conn)
   }
 }
 
+void
+safe_or_connection_set_link_protocol(safe_or_connection_t *safe_or_conn,
+                                     uint16_t link_protocol)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_assert(link_protocol >= 3);
+  tor_assert(safe_or_conn->allowed_to_process_cells == false);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+
+  safe_or_conn->link_protocol = link_protocol;
+  safe_or_conn->wide_circ_ids = (link_protocol >= 3);
+  safe_or_conn->allowed_to_process_cells = true;
+  event_active(TO_SAFE_CONN(safe_or_conn)->read_event, 0, 0);
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+}
+
 // TODO: we should get rid of this at some point
 void
 safe_or_connection_get_tls_desc(safe_or_connection_t *safe_or_conn,
@@ -1033,13 +1057,6 @@ safe_or_connection_read_encrypted(safe_or_connection_t *safe_or_conn,
              bytes_read);
   }
 
-  // let any listeners know that we have new data in our incoming buffer
-  if (bytes_read > 0) {
-    event_data_t event_data = { .u64 = bytes_read };
-    event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
-                         safe_or_conn_has_buffered_data_ev, event_data, NULL);
-  }
-
   size_t tls_bytes_read = 0;
   size_t tls_bytes_written = 0;
   tor_tls_get_n_raw_bytes(safe_or_conn->tls, &tls_bytes_read,
@@ -1316,6 +1333,11 @@ safe_or_connection_read_cb(safe_connection_t *safe_conn)
       tor_assert(safe_or_connection_update_state(safe_or_conn,
         SAFE_OR_CONN_STATE_CLOSED) == E_SUCCESS);
     }
+
+    if (safe_or_conn->allowed_to_process_cells) {
+      process_cells_from_inbuf(safe_or_conn);
+    }
+
     break;
   }
   case SAFE_OR_CONN_STATE_CLOSED:
@@ -1416,3 +1438,96 @@ safe_or_connection_write_cb(safe_connection_t *safe_conn)
     break;
   }
 }
+
+/********************************************************/
+
+static bool
+fetch_cell(safe_or_connection_t *safe_or_conn, char *cell_buf)
+{
+  safe_connection_t *safe_conn = TO_SAFE_CONN(safe_or_conn);
+
+  size_t cell_network_size = \
+    get_cell_network_size(safe_or_conn->wide_circ_ids?1:0);
+
+  if (buf_datalen(safe_conn->inbuf) < cell_network_size) {
+    // don't have a full cell
+    return false;
+  }
+
+  buf_get_bytes(safe_conn->inbuf, cell_buf, cell_network_size);
+  safe_connection_inbuf_modified(safe_conn);
+
+  return true;
+}
+
+static bool
+fetch_var_cell(safe_or_connection_t *safe_or_conn, var_cell_t **var_cell_ptr)
+{
+  safe_connection_t *safe_conn = TO_SAFE_CONN(safe_or_conn);
+
+  int link_protocol = safe_or_conn->link_protocol;
+  *var_cell_ptr = NULL;
+  int found_var_cell = fetch_var_cell_from_buf(safe_conn->inbuf, var_cell_ptr,
+                                               link_protocol);
+  if (*var_cell_ptr != NULL) {
+    // there was not a *full* cell
+    safe_connection_inbuf_modified(safe_conn);
+  }
+  return (found_var_cell != 0);
+}
+
+static void
+void_var_cell_free(void *void_var_cell)
+{
+  var_cell_free_((var_cell_t *)void_var_cell);
+}
+
+static void
+process_cells_from_inbuf(safe_or_connection_t *safe_or_conn)
+{
+  tor_assert(safe_or_conn != NULL);
+
+  while (true) {
+    var_cell_t *var_cell = NULL;
+    bool found_var_cell = fetch_var_cell(safe_or_conn, &var_cell);
+
+    if (found_var_cell) {
+      if (var_cell == NULL) {
+        // the next cell is a var cell, but it is not yet complete
+        return;
+      }
+
+      event_data_t event_data = { .ptr = var_cell };
+      event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                           safe_or_conn_var_cell_ev, event_data,
+                           void_var_cell_free);
+
+      if (safe_or_conn->link_protocol == 0 &&
+          var_cell->command == CELL_VERSIONS) {
+        // this is the first VERSIONS cell we've received;
+        // in order to process future cells, we need to be told our
+        // protocol version
+        safe_or_conn->allowed_to_process_cells = false;
+        return;
+      }
+    } else {
+      char buf[CELL_MAX_NETWORK_SIZE];
+      bool found_cell = fetch_cell(safe_or_conn, buf);
+
+      if (found_cell) {
+        // retrieve cell info from buf (create the host-order struct from the
+        // network-order string)
+        cell_t *cell = tor_malloc(sizeof(cell_t));
+        cell_unpack(cell, buf, safe_or_conn->wide_circ_ids?1:0);
+
+        event_data_t event_data = { .ptr = cell };
+        event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                             safe_or_conn_fixed_cell_ev, event_data,
+                             tor_free_);
+      } else {
+        // there is not yet a complete cell
+        return;
+      }
+    }
+  }
+}

+ 9 - 1
src/core/or/safe_connection.h

@@ -16,7 +16,8 @@ extern event_label_t safe_or_conn_tls_handshaking_ev;
 extern event_label_t safe_or_conn_link_handshaking_ev;
 extern event_label_t safe_or_conn_open_ev;
 extern event_label_t safe_or_conn_closed_ev;
-extern event_label_t safe_or_conn_has_buffered_data_ev;
+extern event_label_t safe_or_conn_fixed_cell_ev;
+extern event_label_t safe_or_conn_var_cell_ev;
 
 typedef struct link_handshaking_ev_data_t {
   tor_x509_cert_t *tls_own_cert; // the ownership is passed in this event
@@ -81,6 +82,9 @@ typedef struct safe_or_connection_t {
   or_conn_state_t state;
   bool is_outgoing;
   char *remote_address_str;
+  uint16_t link_protocol;
+  bool wide_circ_ids;
+  bool allowed_to_process_cells;
 
   socket_rw_state_t tor_read_wanted;
   socket_rw_state_t tor_write_wanted;
@@ -154,6 +158,10 @@ safe_or_connection_t *
 safe_or_connection_new(bool requires_buffers, bool is_outgoing,
                        const char *remote_address_str);
 
+void
+safe_or_connection_set_link_protocol(safe_or_connection_t *safe_or_conn,
+                                     uint16_t link_protocol);
+
 void
 safe_or_connection_get_tls_desc(safe_or_connection_t *safe_or_conn,
                                 char *buf, size_t buf_size);