Browse Source

Added another set of connection event sources/listeners

An OR connection can now broadcast events when the link protocol
version is decided, and when the link handshake completes. The
safe OR connection can listen for these events.
Steven Engler 5 years ago
parent
commit
a92a16d5ab

+ 4 - 1
src/core/mainloop/connection.c

@@ -567,6 +567,7 @@ connection_init(time_t now, connection_t *conn, int type, int socket_family)
   conn->global_identifier = n_connections_allocated++;
 
   conn->event_listener = event_listener_new(conn);
+  conn->event_source = event_source_new();
 
   conn->type = type;
   conn->socket_family = socket_family;
@@ -676,6 +677,7 @@ connection_free_minimal(connection_t *conn)
   }
 
   event_listener_free(conn->event_listener);
+  event_source_free(conn->event_source);
 
   if (connection_is_listener(conn)) {
     if (conn->socket_family == AF_UNIX) {
@@ -1885,7 +1887,8 @@ connection_handle_listener_read(connection_t *conn, int new_type)
       safe_connection_t *safe_conn = NULL;
       if (new_type == CONN_TYPE_OR) {
         safe_or_connection_t *safe_or_conn;
-        safe_or_conn = safe_or_connection_new(true, false, newconn->address);
+        safe_or_conn = safe_or_connection_new(true, false, newconn->address,
+                                              newconn->event_source);
         safe_conn = TO_SAFE_CONN(safe_or_conn);
       } else {
         log_warn(LD_NET, "Safe conn not yet implemented for type %d", new_type);

+ 1 - 0
src/core/mainloop/mainloop.c

@@ -2391,6 +2391,7 @@ init_event_registry(void)
   event_registry = event_registry_new();
 
   safe_or_conn_register_events(event_registry);
+  or_conn_register_events(event_registry);
 }
 
 static void

+ 22 - 1
src/core/or/connection_or.c

@@ -82,6 +82,9 @@
 
 #include "core/or/orconn_event.h"
 
+event_label_t or_conn_link_protocol_version_ev = EVENT_LABEL_UNSET;
+event_label_t or_conn_open_ev = EVENT_LABEL_UNSET;
+
 //static int connection_tls_finish_handshake(or_connection_t *conn);
 static int connection_or_launch_v3_or_handshake(or_connection_t *conn);
 static int connection_or_new_process_cells_from_inbuf(or_connection_t *conn,
@@ -118,6 +121,18 @@ TO_OR_CONN(connection_t *c)
   return DOWNCAST(or_connection_t, c);
 }
 
+void
+or_conn_register_events(event_registry_t *registry)
+{
+  tor_assert(or_conn_link_protocol_version_ev == EVENT_LABEL_UNSET);
+  tor_assert(or_conn_open_ev == EVENT_LABEL_UNSET);
+
+  or_conn_link_protocol_version_ev = \
+    event_registry_register_event(registry, "Decided protocol version");
+  or_conn_open_ev = \
+    event_registry_register_event(registry, "Connection is open");
+}
+
 /** Global map between Extended ORPort identifiers and OR
  *  connections. */
 static digestmap_t *orconn_ext_or_id_map = NULL;
@@ -541,6 +556,7 @@ connection_or_process_event(event_label_t label, event_data_t data,
     channel_tls_handle_var_cell(var_cell, or_conn);
   } else {
     log_warn(LD_OR, "Received an OR event that we don't recognize");
+    tor_assert_nonfatal_unreached_once();
   }
 }
 
@@ -1577,7 +1593,8 @@ connection_or_connect, (const tor_addr_t *_addr, uint16_t port,
   conn = or_connection_new(CONN_TYPE_OR, tor_addr_family(&addr));
 
   safe_or_connection_t *safe_or_conn;
-  safe_or_conn = safe_or_connection_new(true, true, TO_CONN(conn)->address);
+  safe_or_conn = safe_or_connection_new(true, true, TO_CONN(conn)->address,
+                                        TO_CONN(conn)->event_source);
   TO_CONN(conn)->safe_conn = TO_SAFE_CONN(safe_or_conn);
 
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
@@ -2374,6 +2391,10 @@ connection_or_set_state_open(or_connection_t *conn)
   connection_or_change_state(conn, OR_CONN_STATE_OPEN);
   connection_or_event_status(conn, OR_CONN_EVENT_CONNECTED, 0);
 
+  event_data_t null_data = { .ptr = NULL };
+  event_source_publish(TO_CONN(conn)->event_source,
+                       or_conn_open_ev, null_data, NULL);
+
   /* Link protocol 3 appeared in Tor 0.2.3.6-alpha, so any connection
    * that uses an earlier link protocol should not be treated as a relay. */
   if (conn->link_proto < 3) {

+ 7 - 2
src/core/or/connection_or.h

@@ -12,13 +12,18 @@
 #ifndef TOR_CONNECTION_OR_H
 #define TOR_CONNECTION_OR_H
 
+#include "core/or/orconn_event.h"
+#include "lib/evloop/events.h"
+
+extern event_label_t or_conn_link_protocol_version_ev;
+extern event_label_t or_conn_open_ev;
+
 struct ed25519_public_key_t;
 struct ed25519_keypair_t;
 
 or_connection_t *TO_OR_CONN(connection_t *);
 
-#include "lib/evloop/events.h"
-#include "core/or/orconn_event.h"
+void or_conn_register_events(event_registry_t *registry);
 
 void connection_or_clear_identity(or_connection_t *conn);
 void connection_or_clear_identity_map(void);

+ 1 - 0
src/core/or/connection_st.h

@@ -45,6 +45,7 @@ struct connection_t {
 
   safe_connection_t *safe_conn;
   event_listener_t *event_listener;
+  event_source_t *event_source;
 
   uint8_t state; /**< Current state of this connection. */
   unsigned int type:5; /**< What kind of connection is this? */

+ 69 - 12
src/core/or/safe_connection.c

@@ -23,6 +23,13 @@ safe_connection_refresh_events(safe_connection_t *safe_conn);
 static void
 safe_or_connection_refresh_bucket_rw_states(safe_or_connection_t *safe_or_conn);
 
+static void
+safe_or_conn_link_protocol_version_cb(event_label_t label, event_data_t data,
+                                      void *context);
+
+static void
+safe_or_conn_open_cb(event_label_t label, event_data_t data, void *context);
+
 static tor_error_t
 safe_or_connection_update_state(safe_or_connection_t *safe_or_conn,
                                 or_conn_state_t new_state);
@@ -131,6 +138,7 @@ socket_rw_state_set(socket_rw_state_t *rw_state,
 
 void
 safe_connection_init(safe_connection_t *safe_conn, uint32_t type_magic,
+                     event_source_t *conn_event_source,
                      bool (*is_read_wanted)(safe_connection_t *),
                      bool (*is_write_wanted)(safe_connection_t *),
                      void (*read_cb)(safe_connection_t *),
@@ -140,6 +148,8 @@ safe_connection_init(safe_connection_t *safe_conn, uint32_t type_magic,
                      void (*outbuf_modified_cb)(safe_connection_t *),
                      bool requires_buffers, bool linked)
 {
+  (void)conn_event_source;
+
   tor_assert(safe_conn != NULL);
   tor_assert(is_read_wanted != NULL);
   tor_assert(is_write_wanted != NULL);
@@ -153,6 +163,7 @@ safe_connection_init(safe_connection_t *safe_conn, uint32_t type_magic,
   safe_conn->linked = linked;
 
   safe_conn->event_source = event_source_new();
+  safe_conn->event_listener = event_listener_new(safe_conn);
 
   socket_rw_state_init(&safe_conn->read_allowed, true);
   socket_rw_state_init(&safe_conn->write_allowed, true);
@@ -293,6 +304,7 @@ safe_connection_unregister_events(safe_connection_t *safe_conn)
   if (safe_conn->write_event != NULL) {
     tor_event_free(safe_conn->write_event);
   }
+  event_listener_detach(safe_conn->event_listener);
 
   tor_mutex_release(&safe_conn->lock);
 }
@@ -324,6 +336,8 @@ safe_connection_register_events(safe_connection_t *safe_conn,
     return E_ERROR;
   }
 
+  event_listener_attach(safe_conn->event_listener, event_base);
+
   safe_connection_refresh_events(safe_conn);
 
   tor_mutex_release(&safe_conn->lock);
@@ -448,13 +462,15 @@ safe_connection_outbuf_modified(safe_connection_t *safe_conn)
 
 safe_or_connection_t *
 safe_or_connection_new(bool requires_buffers, bool is_outgoing,
-                       const char *remote_address_str)
+                       const char *remote_address_str,
+                       event_source_t *conn_event_source)
 {
   safe_or_connection_t *safe_or_conn = \
     tor_malloc_zero(sizeof(safe_or_connection_t));
 
   safe_connection_init(TO_SAFE_CONN(safe_or_conn),
                        SAFE_OR_CONN_MAGIC,
+                       conn_event_source,
                        safe_or_connection_is_read_wanted,
                        safe_or_connection_is_write_wanted,
                        safe_or_connection_read_cb,
@@ -475,9 +491,25 @@ safe_or_connection_new(bool requires_buffers, bool is_outgoing,
     log_warn(LD_OR, "No remote address string was provided");
   }
 
+  event_listener_set_callback(TO_SAFE_CONN(safe_or_conn)->event_listener,
+                              or_conn_link_protocol_version_ev,
+                              NULL, safe_or_conn_link_protocol_version_cb);
+  event_listener_set_callback(TO_SAFE_CONN(safe_or_conn)->event_listener,
+                              or_conn_open_ev,
+                              NULL, safe_or_conn_open_cb);
+
+  if (conn_event_source) {
+    event_source_subscribe(conn_event_source,
+                           TO_SAFE_CONN(safe_or_conn)->event_listener,
+                           or_conn_link_protocol_version_ev);
+    event_source_subscribe(conn_event_source,
+                           TO_SAFE_CONN(safe_or_conn)->event_listener,
+                           or_conn_open_ev);
+  }
+
   safe_or_conn->link_protocol = 0; // unknown protocol
   safe_or_conn->wide_circ_ids = false;
-  safe_or_conn->allowed_to_process_cells = true;
+  safe_or_conn->waiting_for_link_protocol = false;
 
   // these states should be set by 'safe_or_connection_update_state()'
   socket_rw_state_init(&safe_or_conn->tor_read_wanted,  false);
@@ -550,19 +582,42 @@ safe_or_connection_refresh_bucket_rw_states(safe_or_connection_t *safe_or_conn)
   }
 }
 
-void
-safe_or_connection_set_link_protocol(safe_or_connection_t *safe_or_conn,
-                                     uint16_t link_protocol)
+static void
+safe_or_conn_link_protocol_version_cb(event_label_t label, event_data_t data,
+                                      void *context)
 {
+  safe_or_connection_t *safe_or_conn = context;
+  tor_assert(label == or_conn_link_protocol_version_ev);
   tor_assert(safe_or_conn != NULL);
-  tor_assert(link_protocol >= 3);
-  tor_assert(safe_or_conn->allowed_to_process_cells == false);
   tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+  tor_assert(safe_or_conn->state == SAFE_OR_CONN_STATE_LINK_HANDSHAKING);
+  tor_assert(safe_or_conn->waiting_for_link_protocol);
+
+  uint16_t link_protocol = data.u16;
+  tor_assert(link_protocol >= 3);
 
   safe_or_conn->link_protocol = link_protocol;
   safe_or_conn->wide_circ_ids = (link_protocol >= 3);
-  safe_or_conn->allowed_to_process_cells = true;
+  safe_or_conn->waiting_for_link_protocol = false;
   event_active(TO_SAFE_CONN(safe_or_conn)->read_event, 0, 0);
+  // we need to process incoming cells on the buffer, even if there's
+  // no data waiting on the incoming socket
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+}
+
+static void
+safe_or_conn_open_cb(event_label_t label, event_data_t data, void *context)
+{
+  (void)data;
+
+  safe_or_connection_t *safe_or_conn = context;
+  tor_assert(label == or_conn_open_ev);
+  tor_assert(safe_or_conn != NULL);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+  tor_assert(safe_or_conn->state == SAFE_OR_CONN_STATE_LINK_HANDSHAKING);
+
+  safe_or_connection_update_state(safe_or_conn, SAFE_OR_CONN_STATE_OPEN);
 
   tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
 }
@@ -633,7 +688,7 @@ safe_or_connection_refill_buckets(safe_or_connection_t *safe_or_conn,
 
 // TODO: this might be better implemented as a message so that we don't need
 //       to wait for the lock (but would require us to add a listener to the
-//       safe conn
+//       safe conn)
 void
 safe_or_connection_adjust_buckets(safe_or_connection_t *safe_or_conn,
                                   uint32_t rate, uint32_t burst,
@@ -1335,7 +1390,9 @@ safe_or_connection_read_cb(safe_connection_t *safe_conn)
     // TODO: we may not actually want to read here now that the states are
     // updated, should we re-check?
 
-    bool use_conn_buckets = (safe_or_conn->state == SAFE_OR_CONN_STATE_OPEN);
+    //bool use_conn_buckets = (safe_or_conn->state == SAFE_OR_CONN_STATE_OPEN);
+    bool use_conn_buckets = false;
+    // TODO: still need to implement a timer event to refresh the token buckets
 
     tor_error_t rv = safe_or_connection_read_encrypted(safe_or_conn,
                                                        use_conn_buckets);
@@ -1344,7 +1401,7 @@ safe_or_connection_read_cb(safe_connection_t *safe_conn)
         SAFE_OR_CONN_STATE_CLOSED) == E_SUCCESS);
     }
 
-    if (safe_or_conn->allowed_to_process_cells) {
+    if (!safe_or_conn->waiting_for_link_protocol) {
       process_cells_from_inbuf(safe_or_conn);
     }
 
@@ -1516,7 +1573,7 @@ process_cells_from_inbuf(safe_or_connection_t *safe_or_conn)
         // this is the first VERSIONS cell we've received;
         // in order to process future cells, we need to be told our
         // protocol version
-        safe_or_conn->allowed_to_process_cells = false;
+        safe_or_conn->waiting_for_link_protocol = true;
         return;
       }
     } else {

+ 6 - 6
src/core/or/safe_connection.h

@@ -73,6 +73,7 @@ typedef struct safe_connection_t {
   struct buf_t *outbuf;
 
   event_source_t *event_source;
+  event_listener_t *event_listener;
 } safe_connection_t;
 
 typedef struct safe_or_connection_t {
@@ -82,9 +83,10 @@ typedef struct safe_or_connection_t {
   or_conn_state_t state;
   bool is_outgoing;
   char *remote_address_str;
+
   uint16_t link_protocol;
   bool wide_circ_ids;
-  bool allowed_to_process_cells;
+  bool waiting_for_link_protocol;
 
   socket_rw_state_t tor_read_wanted;
   socket_rw_state_t tor_write_wanted;
@@ -111,6 +113,7 @@ void safe_or_conn_buf_data_event_update(event_label_t label,
 
 void
 safe_connection_init(safe_connection_t *safe_conn, uint32_t type_magic,
+                     event_source_t *conn_event_source,
                      bool (*is_read_wanted)(safe_connection_t *),
                      bool (*is_write_wanted)(safe_connection_t *),
                      void (*read_cb)(safe_connection_t *),
@@ -162,11 +165,8 @@ safe_connection_outbuf_modified(safe_connection_t *safe_conn);
 
 safe_or_connection_t *
 safe_or_connection_new(bool requires_buffers, bool is_outgoing,
-                       const char *remote_address_str);
-
-void
-safe_or_connection_set_link_protocol(safe_or_connection_t *safe_or_conn,
-                                     uint16_t link_protocol);
+                       const char *remote_address_str,
+                       event_source_t *conn_event_source);
 
 void
 safe_or_connection_get_tls_desc(safe_or_connection_t *safe_or_conn,

+ 1 - 0
src/lib/evloop/events.h

@@ -18,6 +18,7 @@ typedef int64_t event_label_t;
 typedef union event_data_t {
   void *ptr;
   uint64_t u64;
+  uint16_t u16;
 } event_data_t;
 
 /* Object to hold an individual event and associated data. */