Browse Source

Removed edge-triggered events and added updatable events

Since edge-triggered events lose the order of events, they were
replaced with updatable events. If a listener receives two or more
of these events in a row, they will be squashed into a single event.
Steven Engler 4 years ago
parent
commit
a41ce2a9b1

+ 13 - 12
src/core/mainloop/connection.c

@@ -391,22 +391,23 @@ or_connection_new(int type, int socket_family)
 
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
                               safe_or_conn_tcp_connecting_ev,
-                              false, connection_or_process_event);
+                              NULL, connection_or_process_event);
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
                               safe_or_conn_tls_handshaking_ev,
-                              false, connection_or_process_event);
+                              NULL, connection_or_process_event);
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
                               safe_or_conn_link_handshaking_ev,
-                              false, connection_or_process_event);
+                              NULL, connection_or_process_event);
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
                               safe_or_conn_open_ev,
-                              false, connection_or_process_event);
+                              NULL, connection_or_process_event);
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
                               safe_or_conn_closed_ev,
-                              false, connection_or_process_event);
+                              NULL, connection_or_process_event);
   event_listener_set_callback(TO_CONN(or_conn)->event_listener,
                               safe_or_conn_has_buffered_data_ev,
-                              true, connection_or_process_event);
+                              safe_or_conn_buf_data_event_update,
+                              connection_or_process_event);
 
   connection_or_set_canonical(or_conn, 0);
 
@@ -1891,17 +1892,17 @@ connection_handle_listener_read(connection_t *conn, int new_type)
       newconn->safe_conn = safe_conn;
 
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
-                                safe_or_conn_tcp_connecting_ev, true);
+                                safe_or_conn_tcp_connecting_ev);
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
-                                safe_or_conn_tls_handshaking_ev, true);
+                                safe_or_conn_tls_handshaking_ev);
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
-                                safe_or_conn_link_handshaking_ev, true);
+                                safe_or_conn_link_handshaking_ev);
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
-                                safe_or_conn_open_ev, true);
+                                safe_or_conn_open_ev);
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
-                                safe_or_conn_closed_ev, true);
+                                safe_or_conn_closed_ev);
       safe_connection_subscribe(newconn->safe_conn, newconn->event_listener,
-                                safe_or_conn_has_buffered_data_ev, false);
+                                safe_or_conn_has_buffered_data_ev);
 
       safe_connection_set_socket(newconn->safe_conn, news);
     }

+ 17 - 9
src/core/or/connection_or.c

@@ -84,7 +84,8 @@
 
 //static int connection_tls_finish_handshake(or_connection_t *conn);
 static int connection_or_launch_v3_or_handshake(or_connection_t *conn);
-static int connection_or_new_process_cells_from_inbuf(or_connection_t *conn);
+static int connection_or_new_process_cells_from_inbuf(or_connection_t *conn,
+                                                      size_t num_bytes);
 //static int connection_or_process_cells_from_inbuf(or_connection_t *conn);
 //static int connection_or_check_valid_tls_handshake(or_connection_t *conn,
 //                                                   int started_here,
@@ -521,7 +522,8 @@ connection_or_process_event(event_label_t label, event_data_t data,
     connection_close_immediate(TO_CONN(or_conn));
     connection_mark_for_close_internal(TO_CONN(or_conn));
   } else if (label == safe_or_conn_has_buffered_data_ev) {
-    connection_or_new_process_cells_from_inbuf(or_conn);
+    size_t num_bytes = data.u64;
+    connection_or_new_process_cells_from_inbuf(or_conn, num_bytes);
   } else {
     log_warn(LD_OR, "Received an OR event that we don't recognize");
   }
@@ -1565,22 +1567,22 @@ connection_or_connect, (const tor_addr_t *_addr, uint16_t port,
 
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
                             TO_CONN(conn)->event_listener,
-                            safe_or_conn_tcp_connecting_ev, true);
+                            safe_or_conn_tcp_connecting_ev);
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
                             TO_CONN(conn)->event_listener,
-                            safe_or_conn_tls_handshaking_ev, true);
+                            safe_or_conn_tls_handshaking_ev);
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
                             TO_CONN(conn)->event_listener,
-                            safe_or_conn_link_handshaking_ev, true);
+                            safe_or_conn_link_handshaking_ev);
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
                             TO_CONN(conn)->event_listener,
-                            safe_or_conn_open_ev, true);
+                            safe_or_conn_open_ev);
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
                             TO_CONN(conn)->event_listener,
-                            safe_or_conn_closed_ev, true);
+                            safe_or_conn_closed_ev);
   safe_connection_subscribe(TO_CONN(conn)->safe_conn,
                             TO_CONN(conn)->event_listener,
-                            safe_or_conn_has_buffered_data_ev, false);
+                            safe_or_conn_has_buffered_data_ev);
 
   /*
    * Set up conn so it's got all the data we need to remember for channels
@@ -2547,11 +2549,17 @@ connection_or_fetch_var_cell(or_connection_t *or_conn, var_cell_t **var_cell_ptr
 }
 
 static int
-connection_or_new_process_cells_from_inbuf(or_connection_t *conn)
+connection_or_new_process_cells_from_inbuf(or_connection_t *conn,
+                                           size_t num_bytes)
 {
   //log_debug(LD_OR, "Starting OR conn process inbuf");
   log_warn(LD_OR, "Starting OR conn process inbuf for conn %p", conn);
 
+  // TODO: we should only use 'num_bytes' bytes from the buffer, instead we
+  //       just ignore this
+  //       we would also need to keep track of the number of unused bytes so
+  //       that we can use them again next time we call this function
+
   while (1) {
     var_cell_t *var_cell = NULL;
     int found_var_cell = connection_or_fetch_var_cell(conn, &var_cell);

+ 13 - 6
src/core/or/safe_connection.c

@@ -73,6 +73,15 @@ safe_or_conn_register_events(event_registry_t *registry)
     event_registry_register_event(registry, "OR Connection Has Data In Buffer");
 }
 
+void
+safe_or_conn_buf_data_event_update(event_label_t label,
+                                   event_data_t *old_data,
+                                   event_data_t *new_data)
+{
+  tor_assert(label == safe_or_conn_has_buffered_data_ev);
+  old_data->u64 += new_data->u64;
+}
+
 /********************************************************/
 
 void
@@ -245,14 +254,12 @@ safe_connection_write_cb(evutil_socket_t ev_sock, short fd, void *void_safe_conn
 
 void
 safe_connection_subscribe(safe_connection_t *safe_conn,
-                          event_listener_t *listener, event_label_t label,
-                          bool send_full_event)
+                          event_listener_t *listener, event_label_t label)
 {
   tor_assert(safe_conn != NULL);
   tor_mutex_acquire(&safe_conn->lock);
 
-  event_source_subscribe(safe_conn->event_source, listener, label,
-                         send_full_event);
+  event_source_subscribe(safe_conn->event_source, listener, label);
 
   tor_mutex_release(&safe_conn->lock);
 }
@@ -1028,9 +1035,9 @@ safe_or_connection_read_encrypted(safe_or_connection_t *safe_or_conn,
 
   // let any listeners know that we have new data in our incoming buffer
   if (bytes_read > 0) {
-    event_data_t null_data = { .ptr = NULL };
+    event_data_t event_data = { .u64 = bytes_read };
     event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
-                         safe_or_conn_has_buffered_data_ev, null_data, NULL);
+                         safe_or_conn_has_buffered_data_ev, event_data, NULL);
   }
 
   size_t tls_bytes_read = 0;

+ 5 - 2
src/core/or/safe_connection.h

@@ -99,6 +99,10 @@ safe_or_connection_t *TO_SAFE_OR_CONN(safe_connection_t *safe_conn);
 
 void safe_or_conn_register_events(event_registry_t *registry);
 
+void safe_or_conn_buf_data_event_update(event_label_t label,
+                                        event_data_t *old_data,
+                                        event_data_t *new_data);
+
 /********************************************************/
 
 void
@@ -117,8 +121,7 @@ safe_connection_set_socket(safe_connection_t *safe_conn, tor_socket_t socket);
 
 void
 safe_connection_subscribe(safe_connection_t *safe_conn,
-                          event_listener_t *listener, event_label_t label,
-                          bool send_full_event);
+                          event_listener_t *listener, event_label_t label);
 
 void
 safe_connection_unsubscribe_all(safe_connection_t *safe_conn,

+ 40 - 91
src/lib/evloop/events.c

@@ -12,25 +12,22 @@
 /* How a subscribed listener wants to receive an event. */
 typedef struct event_subscription_t {
   event_listener_t *listener;
-  bool send_full_event;
 } event_subscription_t;
 
 /* What a listener should do if it receives an event. */
 typedef struct event_callback_t {
-  bool edge_triggered;
-  bool is_edge_pending;
-  void (*process_event_fn)(event_label_t, event_data_t, void *);
+  event_update_fn_t update_fn;
+  process_event_fn_t process_event_fn;
 } event_callback_t;
 
 /**************************/
 
 static event_subscription_t *
-event_subscription_new(event_listener_t *listener, bool send_full_event)
+event_subscription_new(event_listener_t *listener)
 {
   tor_assert(listener != NULL);
   event_subscription_t *sub = tor_malloc_zero(sizeof(event_subscription_t));
   sub->listener = listener;
-  sub->send_full_event = send_full_event;
   return sub;
 }
 
@@ -45,13 +42,12 @@ event_subscription_free(event_subscription_t *sub)
 /**************************/
 
 static event_callback_t *
-event_callback_new(bool edge_triggered,
-                   void (*process_event_fn)(event_label_t, event_data_t, void *))
+event_callback_new(event_update_fn_t update_fn,
+                   process_event_fn_t process_event_fn)
 {
   tor_assert(process_event_fn != NULL);
   event_callback_t *cb = tor_malloc_zero(sizeof(event_callback_t));
-  cb->edge_triggered = edge_triggered;
-  cb->is_edge_pending = false;
+  cb->update_fn = update_fn;
   cb->process_event_fn = process_event_fn;
   return cb;
 }
@@ -253,10 +249,8 @@ event_listener_detach(event_listener_t *listener)
 
 void
 event_listener_set_callback(event_listener_t *listener, event_label_t label,
-                            bool edge_triggered,
-                            void (*process_event_fn)(event_label_t,
-                                                     event_data_t,
-                                                     void *))
+                            event_update_fn_t update_fn,
+                            process_event_fn_t process_event_fn)
 {
   tor_assert(listener != NULL);
   tor_assert(label != EVENT_LABEL_UNSET);
@@ -265,7 +259,7 @@ event_listener_set_callback(event_listener_t *listener, event_label_t label,
   int index = (int)label;
   tor_assert(index >= 0);
 
-  event_callback_t *cb = event_callback_new(edge_triggered, process_event_fn);
+  event_callback_t *cb = event_callback_new(update_fn, process_event_fn);
 
   if (index >= 1000) {
     log_warn(LD_BUG, "An event label was very large (%d), but the event "
@@ -322,10 +316,13 @@ event_listener_receive(event_listener_t *listener, event_label_t label,
     return;
   }
 
-  if (cb->edge_triggered) {
-    cb->is_edge_pending = true;
+  event_wrapper_t *last = TOR_TAILQ_LAST(&listener->pending_events,
+                                         pending_events_head_t);
+  if (cb->update_fn != NULL && last != NULL && last->label == label) {
+    // the last added event was of the same type and we set an update function,
+    // so we should update the last event rather than adding a new one
+    cb->update_fn(label, &last->data, &wrapper->data);
     if (wrapper != NULL) {
-      log_warn(LD_BUG, "An edge-triggered event received a full event");
       event_wrapper_free(wrapper);
     }
   } else {
@@ -351,73 +348,34 @@ event_listener_process(event_listener_t *listener)
   tor_mutex_acquire(&listener->lock);
 
   void *context = listener->context;
-  bool more_events = true;
-
-  while (more_events) {
-    // first process edge-triggered events
-    event_data_t null_data = { .ptr = NULL };
-    SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
-      if (cb != NULL && cb->is_edge_pending) {
-        void (*process_event_fn)(event_label_t, event_data_t, void *) = NULL;
-        process_event_fn = cb->process_event_fn;
-        event_label_t label = (int)cb_sl_idx;
-        cb->is_edge_pending = false;
-
-        tor_mutex_release(&listener->lock);
-
-        if (PREDICT_LIKELY(process_event_fn != NULL)) {
-          process_event_fn(label, null_data, context);
-          // edge-triggered events don't have corresponding event data
-        } else {
-          // no callback available
-          log_warn(LD_BUG, "An edge event was received but had no callback");
-        }
-
-        tor_mutex_acquire(&listener->lock);
-        // while we were unlocked, the list may have changed length,
-        // so we make sure it's correct
-        cb_sl_len = smartlist_len(listener->callbacks);
-      }
-    } SMARTLIST_FOREACH_END(cb);
-
-    // then process regular events
-    while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
-      event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
-      TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
-      tor_assert(wrapper != NULL);
-
-      void (*process_event_fn)(event_label_t, event_data_t, void *) = NULL;
-      int index = (int)wrapper->label;
-
-      // do we have a callback for this event label?
-      if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
-        event_callback_t *cb = smartlist_get(listener->callbacks, index);
-        if (cb != NULL) {
-          tor_assert(!cb->edge_triggered);
-          process_event_fn = cb->process_event_fn;
-        }
-      }
 
-      tor_mutex_release(&listener->lock);
+  while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
+    event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
+    TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
+    tor_assert(wrapper != NULL);
+
+    process_event_fn_t process_event_fn = NULL;
+    int index = (int)wrapper->label;
 
-      if (PREDICT_LIKELY(process_event_fn != NULL)) {
-        process_event_fn(wrapper->label, wrapper->data, context);
-      } else {
-        // no callback available
-        log_warn(LD_BUG, "An event was received but had no callback");
+    // do we have a callback for this event label?
+    if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
+      event_callback_t *cb = smartlist_get(listener->callbacks, index);
+      if (cb != NULL) {
+        process_event_fn = cb->process_event_fn;
       }
+    }
 
-      event_wrapper_free(wrapper);
-      tor_mutex_acquire(&listener->lock);
+    tor_mutex_release(&listener->lock);
+
+    if (PREDICT_LIKELY(process_event_fn != NULL)) {
+      process_event_fn(wrapper->label, wrapper->data, context);
+    } else {
+      // no callback available
+      log_warn(LD_BUG, "An event was received but had no callback");
     }
 
-    // there's a possibility edge events have been added while running callbacks
-    more_events = false;
-    SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
-      if (cb != NULL && cb->is_edge_pending) {
-        more_events = true;
-      }
-    } SMARTLIST_FOREACH_END(cb);
+    event_wrapper_free(wrapper);
+    tor_mutex_acquire(&listener->lock);
   }
 
   listener->is_pending = false;
@@ -457,7 +415,7 @@ event_source_free(event_source_t *source)
 
 void
 event_source_subscribe(event_source_t *source, event_listener_t *listener,
-                       event_label_t label, bool send_full_event)
+                       event_label_t label)
 {
   tor_assert(source != NULL);
   tor_assert(listener != NULL);
@@ -475,7 +433,7 @@ event_source_subscribe(event_source_t *source, event_listener_t *listener,
        inefficiently. */
   }
 
-  event_subscription_t *sub = event_subscription_new(listener, send_full_event);
+  event_subscription_t *sub = event_subscription_new(listener);
 
   tor_mutex_acquire(&source->lock);
 
@@ -575,16 +533,7 @@ event_source_publish(event_source_t *source, event_label_t label,
   }
 
   event_wrapper_t *wrapper = NULL;
-
-  if (sub->send_full_event) {
-    wrapper = event_wrapper_new(label, data, free_data_fn);
-  } else {
-    // we don't need to send an event, so free the data now
-    if (free_data_fn != NULL) {
-      free_data_fn(data.ptr);
-    }
-  }
-
+  wrapper = event_wrapper_new(label, data, free_data_fn);
   event_listener_receive(sub->listener, label, wrapper);
 
   tor_mutex_release(&source->lock);

+ 10 - 6
src/lib/evloop/events.h

@@ -44,12 +44,18 @@ typedef struct event_source_t {
 typedef struct event_listener_t {
   tor_mutex_t lock;
   smartlist_t *callbacks;
-  TOR_TAILQ_HEAD(, event_wrapper_t) pending_events;
+  TOR_TAILQ_HEAD(pending_events_head_t, event_wrapper_t) pending_events;
   bool is_pending;
   struct event *eventloop_ev;
   void *context;
 } event_listener_t;
 
+typedef void (*event_update_fn_t)(event_label_t,
+                                  event_data_t *,
+                                  event_data_t *);
+
+typedef void (*process_event_fn_t)(event_label_t, event_data_t, void *);
+
 /* Create the event registry. */
 event_registry_t *event_registry_new(void);
 
@@ -74,10 +80,8 @@ void event_listener_attach(event_listener_t *listener, struct event_base *base);
 void event_listener_detach(event_listener_t *listener);
 
 void event_listener_set_callback(event_listener_t *listener, event_label_t label,
-                                 bool edge_triggered,
-                                 void (*process_event_fn)(event_label_t,
-                                                          event_data_t,
-                                                          void *));
+                                 event_update_fn_t update_fn,
+                                 process_event_fn_t process_event_fn);
 
 void event_listener_process(event_listener_t *listener);
 
@@ -87,7 +91,7 @@ event_source_t *event_source_new(void);
 void event_source_free(event_source_t *source);
 
 void event_source_subscribe(event_source_t *source, event_listener_t *listener,
-                            event_label_t label, bool send_full_event);
+                            event_label_t label);
 
 void event_source_unsubscribe(event_source_t *source,
                               event_listener_t *listener,