Browse Source

Can set a silent mode for events

If you need to send many event at once, you probably don't want
to alert the listener threads for each event. Instead you can
set the event source to deliver events silently, and then manually
wake up listener threads when you're done.
Steven Engler 4 years ago
parent
commit
e4be10e787
2 changed files with 93 additions and 3 deletions
  1. 87 3
      src/lib/evloop/events.c
  2. 6 0
      src/lib/evloop/events.h

+ 87 - 3
src/lib/evloop/events.c

@@ -287,7 +287,7 @@ event_listener_set_callback(event_listener_t *listener, event_label_t label,
 
 static void
 event_listener_receive(event_listener_t *listener, event_label_t label,
-                       event_wrapper_t *wrapper)
+                       event_wrapper_t *wrapper, bool notify)
 {
   tor_assert(listener != NULL);
   tor_assert(label != EVENT_LABEL_UNSET);
@@ -330,7 +330,24 @@ event_listener_receive(event_listener_t *listener, event_label_t label,
     TOR_TAILQ_INSERT_TAIL(&listener->pending_events, wrapper, next_event);
   }
 
-  if (!listener->is_pending) {
+  if (!listener->is_pending && notify) {
+    listener->is_pending = true;
+    if (listener->eventloop_ev != NULL) {
+      event_active(listener->eventloop_ev, EV_READ, 1);
+    }
+  }
+
+  tor_mutex_release(&listener->lock);
+}
+
+static void
+event_listener_wakeup(event_listener_t *listener)
+{
+  tor_assert(listener != NULL);
+  tor_mutex_acquire(&listener->lock);
+
+  if (!listener->is_pending && !TOR_TAILQ_EMPTY(&listener->pending_events)) {
+    // not pending but have waiting events
     listener->is_pending = true;
     if (listener->eventloop_ev != NULL) {
       event_active(listener->eventloop_ev, EV_READ, 1);
@@ -390,6 +407,7 @@ event_source_new(void)
 {
   event_source_t* source = tor_malloc_zero(sizeof(event_source_t));
   tor_mutex_init(&source->lock);
+  source->deliver_silently = smartlist_new();
   source->subscriptions = smartlist_new();
 
   return source;
@@ -408,6 +426,7 @@ event_source_free(event_source_t *source)
     }
   } SMARTLIST_FOREACH_END(sub);
   smartlist_free(source->subscriptions);
+  smartlist_free(source->deliver_silently);
 
   memset(source, 0x00, sizeof(*source));
   tor_free(source);
@@ -532,9 +551,74 @@ event_source_publish(event_source_t *source, event_label_t label,
     return;
   }
 
+  bool deliver_silently;
+  if (index >= smartlist_len(source->deliver_silently)) {
+    // default is to not deliver silently
+    deliver_silently = false;
+  } else {
+    deliver_silently = (smartlist_get(source->deliver_silently, index) != 0);
+  }
+
   event_wrapper_t *wrapper = NULL;
   wrapper = event_wrapper_new(label, data, free_data_fn);
-  event_listener_receive(sub->listener, label, wrapper);
+  event_listener_receive(sub->listener, label, wrapper, !deliver_silently);
+
+  tor_mutex_release(&source->lock);
+}
+
+void
+event_source_deliver_silently(event_source_t *source, event_label_t label,
+                              bool deliver_silently)
+{
+  tor_assert(source != NULL);
+  tor_assert(label != EVENT_LABEL_UNSET);
+
+  int index = (int)label;
+  tor_assert(index >= 0);
+
+  if (index >= 1000) {
+    log_warn(LD_BUG, "An event label was very large (%d), but the event source "
+                     "assumes that event labels are small.", index);
+    /* We're using a smartlist as a lookup table, and assume that the labels are
+       small and therefore the list should not be sparse. If the label is large,
+       then we either have *many* events, or we're choosing our event labels
+       inefficiently. */
+  }
+
+  tor_mutex_acquire(&source->lock);
+
+  smartlist_grow(source->deliver_silently, index+1);
+  // default is to not deliver silently
+  smartlist_set(source->deliver_silently, index, (void *)deliver_silently);
+
+  tor_mutex_release(&source->lock);
+}
+
+void
+event_source_wakeup_listener(event_source_t *source, event_label_t label)
+{
+  tor_assert(source != NULL);
+  tor_assert(label != EVENT_LABEL_UNSET);
+
+  int index = (int)label;
+  tor_assert(index >= 0);
+
+  tor_mutex_acquire(&source->lock);
+
+  if (index >= smartlist_len(source->subscriptions)) {
+    // there are no subscribers for this event
+    tor_mutex_release(&source->lock);
+    return;
+  }
+
+  event_subscription_t *sub = smartlist_get(source->subscriptions, index);
+  if (sub == NULL || sub->listener == NULL) {
+    // there are no subscribers for this event
+    tor_mutex_release(&source->lock);
+    return;
+  }
+
+  event_listener_wakeup(sub->listener);
 
   tor_mutex_release(&source->lock);
 }

+ 6 - 0
src/lib/evloop/events.h

@@ -38,6 +38,7 @@ typedef struct event_registry_t {
 /* An object that publishes events to any subscribed listeners. */
 typedef struct event_source_t {
   tor_mutex_t lock;
+  smartlist_t *deliver_silently;
   smartlist_t *subscriptions;
 } event_source_t;
 
@@ -104,4 +105,9 @@ void event_source_unsubscribe_all(event_source_t *source,
 void event_source_publish(event_source_t *source, event_label_t label,
                           event_data_t data, void (*free_data_fn)(void *));
 
+void event_source_deliver_silently(event_source_t *source, event_label_t label,
+                                   bool deliver_silently);
+
+void event_source_wakeup_listener(event_source_t *source, event_label_t label);
+
 #endif