Explorar el Código

Merge branch 'decouple_controller_events_squashed'

Nick Mathewson hace 8 años
padre
commit
eafae7f677

+ 8 - 0
changes/decouple_control_events

@@ -0,0 +1,8 @@
+  o Code simplification and refactoring:
+    - When generating an event to send to the controller, we no longer
+      put the event over the network immediately.  Instead, we queue
+      these events, and use a Libevent callback to deliver them.
+      This change simplifies Tor's callgraph by reducing the number
+      of functions from which all other Tor functions are reachable.
+      Closes ticket 16695.
+

+ 27 - 0
src/common/compat_pthreads.c

@@ -275,6 +275,33 @@ tor_cond_signal_all(tor_cond_t *cond)
   pthread_cond_broadcast(&cond->cond);
 }
 
+int
+tor_threadlocal_init(tor_threadlocal_t *threadlocal)
+{
+  int err = pthread_key_create(&threadlocal->key, NULL);
+  return err ? -1 : 0;
+}
+
+void
+tor_threadlocal_destroy(tor_threadlocal_t *threadlocal)
+{
+  pthread_key_delete(threadlocal->key);
+  memset(threadlocal, 0, sizeof(tor_threadlocal_t));
+}
+
+void *
+tor_threadlocal_get(tor_threadlocal_t *threadlocal)
+{
+  return pthread_getspecific(threadlocal->key);
+}
+
+void
+tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value)
+{
+  int err = pthread_setspecific(threadlocal->key, value);
+  tor_assert(err == 0);
+}
+
 /** Set up common structures for use by threading. */
 void
 tor_threads_init(void)

+ 36 - 0
src/common/compat_threads.h

@@ -111,5 +111,41 @@ typedef struct alert_sockets_s {
 int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags);
 void alert_sockets_close(alert_sockets_t *socks);
 
+typedef struct tor_threadlocal_s {
+#ifdef _WIN32
+  DWORD index;
+#else
+  pthread_key_t key;
+#endif
+} tor_threadlocal_t;
+
+/** Initialize a thread-local variable.
+ *
+ * After you call this function on a tor_threadlocal_t, you can call
+ * tor_threadlocal_set to change the current value of this variable for the
+ * current thread, and tor_threadlocal_get to retrieve the current value for
+ * the current thread.  Each thread has its own value.
+ **/
+int tor_threadlocal_init(tor_threadlocal_t *threadlocal);
+/**
+ * Release all resource associated with a thread-local variable.
+ */
+void tor_threadlocal_destroy(tor_threadlocal_t *threadlocal);
+/**
+ * Return the current value of a thread-local variable for this thread.
+ *
+ * It's undefined behavior to use this function if the threadlocal hasn't
+ * been initialized, or has been destroyed.
+ */
+void *tor_threadlocal_get(tor_threadlocal_t *threadlocal);
+/**
+ * Change the current value of a thread-local variable for this thread to
+ * <b>value</b>.
+ *
+ * It's undefined behavior to use this function if the threadlocal hasn't
+ * been initialized, or has been destroyed.
+ */
+void tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value);
+
 #endif
 

+ 43 - 0
src/common/compat_winthreads.c

@@ -124,6 +124,49 @@ tor_cond_signal_all(tor_cond_t *cond)
   tor_cond_signal_impl(cond, 1);
 }
 
+int
+tor_threadlocal_init(tor_threadlocal_t *threadlocal)
+{
+  threadlocal->index = TlsAlloc();
+  return (threadlocal->index == TLS_OUT_OF_INDEXES) ? -1 : 0;
+}
+
+void
+tor_threadlocal_destroy(tor_threadlocal_t *threadlocal)
+{
+  TlsFree(threadlocal->index);
+  memset(threadlocal, 0, sizeof(tor_threadlocal_t));
+}
+
+void *
+tor_threadlocal_get(tor_threadlocal_t *threadlocal)
+{
+  void *value = TlsGetValue(threadlocal->index);
+  if (value == NULL) {
+    DWORD err = GetLastError();
+    if (err != ERROR_SUCCESS) {
+      char *msg = format_win32_error(err);
+      log_err(LD_GENERAL, "Error retrieving thread-local value: %s", msg);
+      tor_free(msg);
+      tor_assert(err == ERROR_SUCCESS);
+    }
+  }
+  return value;
+}
+
+void
+tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value)
+{
+  BOOL ok = TlsSetValue(threadlocal->index, value);
+  if (!ok) {
+    DWORD err = GetLastError();
+    char *msg = format_win32_error(err);
+    log_err(LD_GENERAL, "Error adjusting thread-local value: %s", msg);
+    tor_free(msg);
+    tor_assert(ok);
+  }
+}
+
 int
 tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv)
 {

+ 3 - 0
src/or/config.c

@@ -1106,6 +1106,9 @@ options_act_reversible(const or_options_t *old_options, char **msg)
       init_libevent(options);
       libevent_initialized = 1;
 
+      /* This has to come up after libevent is initialized. */
+      control_initialize_event_queue();
+
       /*
        * Initialize the scheduler - this has to come after
        * options_init_from_torrc() sets up libevent - why yes, that seems

+ 310 - 95
src/or/control.c

@@ -20,6 +20,7 @@
 #include "circuitstats.h"
 #include "circuituse.h"
 #include "command.h"
+#include "compat_libevent.h"
 #include "config.h"
 #include "confparse.h"
 #include "connection.h"
@@ -50,6 +51,12 @@
 #include <sys/resource.h>
 #endif
 
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
+
 #include "crypto_s2k.h"
 #include "procmon.h"
 
@@ -110,17 +117,17 @@ static char last_sent_bootstrap_message[BOOTSTRAP_MSG_LEN];
 static void connection_printf_to_buf(control_connection_t *conn,
                                      const char *format, ...)
   CHECK_PRINTF(2,3);
-static void send_control_event_impl(uint16_t event, event_format_t which,
+static void send_control_event_impl(uint16_t event,
                                     const char *format, va_list ap)
-  CHECK_PRINTF(3,0);
+  CHECK_PRINTF(2,0);
 static int control_event_status(int type, int severity, const char *format,
                                 va_list args)
   CHECK_PRINTF(3,0);
 
 static void send_control_done(control_connection_t *conn);
-static void send_control_event(uint16_t event, event_format_t which,
+static void send_control_event(uint16_t event,
                                const char *format, ...)
-  CHECK_PRINTF(3,4);
+  CHECK_PRINTF(2,3);
 static int handle_control_setconf(control_connection_t *conn, uint32_t len,
                                   char *body);
 static int handle_control_resetconf(control_connection_t *conn, uint32_t len,
@@ -181,6 +188,8 @@ static void orconn_target_get_name(char *buf, size_t len,
 static int get_cached_network_liveness(void);
 static void set_cached_network_liveness(int liveness);
 
+static void flush_queued_events_cb(evutil_socket_t fd, short what, void *arg);
+
 /** Given a control event code for a message event, return the corresponding
  * log severity. */
 static INLINE int
@@ -578,46 +587,217 @@ send_control_done(control_connection_t *conn)
   connection_write_str_to_buf("250 OK\r\n", conn);
 }
 
-/** Send an event to all v1 controllers that are listening for code
- * <b>event</b>.  The event's body is given by <b>msg</b>.
+/** Represents an event that's queued to be sent to one or more
+ * controllers. */
+typedef struct queued_event_s {
+  uint16_t event;
+  char *msg;
+} queued_event_t;
+
+/** Pointer to int. If this is greater than 0, we don't allow new events to be
+ * queued. */
+static tor_threadlocal_t block_event_queue;
+
+/** Holds a smartlist of queued_event_t objects that may need to be sent
+ * to one or more controllers */
+static smartlist_t *queued_control_events = NULL;
+
+/** True if the flush_queued_events_event is pending. */
+static int flush_queued_event_pending = 0;
+
+/** Lock to protect the above fields. */
+static tor_mutex_t *queued_control_events_lock = NULL;
+
+/** An event that should fire in order to flush the contents of
+ * queued_control_events. */
+static struct event *flush_queued_events_event = NULL;
+
+void
+control_initialize_event_queue(void)
+{
+  if (queued_control_events == NULL) {
+    queued_control_events = smartlist_new();
+  }
+
+  if (flush_queued_events_event == NULL) {
+    struct event_base *b = tor_libevent_get_base();
+    if (b) {
+      flush_queued_events_event = tor_event_new(b,
+                                              -1, 0, flush_queued_events_cb,
+                                              NULL);
+      tor_assert(flush_queued_events_event);
+    }
+  }
+
+  if (queued_control_events_lock == NULL) {
+    queued_control_events_lock = tor_mutex_new();
+    tor_threadlocal_init(&block_event_queue);
+  }
+}
+
+static int *
+get_block_event_queue(void)
+{
+  int *val = tor_threadlocal_get(&block_event_queue);
+  if (PREDICT_UNLIKELY(val == NULL)) {
+    val = tor_malloc_zero(sizeof(int));
+    tor_threadlocal_set(&block_event_queue, val);
+  }
+  return val;
+}
+
+/** Helper: inserts an event on the list of events queued to be sent to
+ * one or more controllers, and schedules the events to be flushed if needed.
  *
- * If <b>which</b> & SHORT_NAMES, the event contains short-format names: send
- * it to controllers that haven't enabled the VERBOSE_NAMES feature.  If
- * <b>which</b> & LONG_NAMES, the event contains long-format names: send it
- * to controllers that <em>have</em> enabled VERBOSE_NAMES.
+ * This function takes ownership of <b>msg</b>, and may free it.
  *
- * The EXTENDED_FORMAT and NONEXTENDED_FORMAT flags behave similarly with
- * respect to the EXTENDED_EVENTS feature. */
+ * We queue these events rather than send them immediately in order to break
+ * the dependency in our callgraph from code that generates events for the
+ * controller, and the network layer at large.  Otherwise, nearly every
+ * interesting part of Tor would potentially call every other interesting part
+ * of Tor.
+ */
 MOCK_IMPL(STATIC void,
-send_control_event_string,(uint16_t event, event_format_t which,
-                           const char *msg))
+queue_control_event_string,(uint16_t event, char *msg))
 {
-  smartlist_t *conns = get_connection_array();
-  (void)which;
-  tor_assert(event >= EVENT_MIN_ && event <= EVENT_MAX_);
+  /* This is redundant with checks done elsewhere, but it's a last-ditch
+   * attempt to avoid queueing something we shouldn't have to queue. */
+  if (PREDICT_UNLIKELY( ! EVENT_IS_INTERESTING(event) )) {
+    tor_free(msg);
+    return;
+  }
+
+  int *block_event_queue = get_block_event_queue();
+  if (*block_event_queue) {
+    tor_free(msg);
+    return;
+  }
+
+  queued_event_t *ev = tor_malloc(sizeof(*ev));
+  ev->event = event;
+  ev->msg = msg;
+
+  /* No queueing an event while queueing an event */
+  ++*block_event_queue;
+
+  tor_mutex_acquire(queued_control_events_lock);
+  tor_assert(queued_control_events);
+  smartlist_add(queued_control_events, ev);
+
+  int activate_event = 0;
+  if (! flush_queued_event_pending && in_main_thread()) {
+    activate_event = 1;
+    flush_queued_event_pending = 1;
+  }
+
+  tor_mutex_release(queued_control_events_lock);
+
+  --*block_event_queue;
+
+  /* We just put an event on the queue; mark the queue to be
+   * flushed.  We only do this from the main thread for now; otherwise,
+   * we'd need to incur locking overhead in Libevent or use a socket.
+   */
+  if (activate_event) {
+    tor_assert(flush_queued_events_event);
+    event_active(flush_queued_events_event, EV_READ, 1);
+  }
+}
+
+/** Release all storage held by <b>ev</b>. */
+static void
+queued_event_free(queued_event_t *ev)
+{
+  if (ev == NULL)
+    return;
+
+  tor_free(ev->msg);
+  tor_free(ev);
+}
+
+/** Send every queued event to every controller that's interested in it,
+ * and remove the events from the queue.  If <b>force</b> is true,
+ * then make all controllers send their data out immediately, since we
+ * may be about to shut down. */
+static void
+queued_events_flush_all(int force)
+{
+  if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
+    return;
+  }
+  smartlist_t *all_conns = get_connection_array();
+  smartlist_t *controllers = smartlist_new();
+  smartlist_t *queued_events;
+
+  int *block_event_queue = get_block_event_queue();
+  ++*block_event_queue;
+
+  tor_mutex_acquire(queued_control_events_lock);
+  /* No queueing an event while flushing events. */
+  flush_queued_event_pending = 0;
+  queued_events = queued_control_events;
+  queued_control_events = smartlist_new();
+  tor_mutex_release(queued_control_events_lock);
 
-  SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
+  /* Gather all the controllers that will care... */
+  SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) {
     if (conn->type == CONN_TYPE_CONTROL &&
         !conn->marked_for_close &&
         conn->state == CONTROL_CONN_STATE_OPEN) {
       control_connection_t *control_conn = TO_CONTROL_CONN(conn);
 
-      if (control_conn->event_mask & (((event_mask_t)1)<<event)) {
-        int is_err = 0;
-        connection_write_to_buf(msg, strlen(msg), TO_CONN(control_conn));
-        if (event == EVENT_ERR_MSG)
-          is_err = 1;
-        else if (event == EVENT_STATUS_GENERAL)
-          is_err = !strcmpstart(msg, "STATUS_GENERAL ERR ");
-        else if (event == EVENT_STATUS_CLIENT)
-          is_err = !strcmpstart(msg, "STATUS_CLIENT ERR ");
-        else if (event == EVENT_STATUS_SERVER)
-          is_err = !strcmpstart(msg, "STATUS_SERVER ERR ");
-        if (is_err)
-          connection_flush(TO_CONN(control_conn));
-      }
+      smartlist_add(controllers, control_conn);
     }
   } SMARTLIST_FOREACH_END(conn);
+
+  SMARTLIST_FOREACH_BEGIN(queued_events, queued_event_t *, ev) {
+    const event_mask_t bit = ((event_mask_t)1) << ev->event;
+    const size_t msg_len = strlen(ev->msg);
+    SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
+                            control_conn) {
+      if (control_conn->event_mask & bit) {
+        connection_write_to_buf(ev->msg, msg_len, TO_CONN(control_conn));
+      }
+    } SMARTLIST_FOREACH_END(control_conn);
+
+    queued_event_free(ev);
+  } SMARTLIST_FOREACH_END(ev);
+
+  if (force) {
+    SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
+                            control_conn) {
+      connection_flush(TO_CONN(control_conn));
+    } SMARTLIST_FOREACH_END(control_conn);
+  }
+
+  smartlist_free(queued_events);
+  smartlist_free(controllers);
+
+  --*block_event_queue;
+}
+
+/** Libevent callback: Flushes pending events to controllers that are
+ * interested in them */
+static void
+flush_queued_events_cb(evutil_socket_t fd, short what, void *arg)
+{
+  (void) fd;
+  (void) what;
+  (void) arg;
+  queued_events_flush_all(0);
+}
+
+/** Send an event to all v1 controllers that are listening for code
+ * <b>event</b>.  The event's body is given by <b>msg</b>.
+ *
+ * The EXTENDED_FORMAT and NONEXTENDED_FORMAT flags behave similarly with
+ * respect to the EXTENDED_EVENTS feature. */
+MOCK_IMPL(STATIC void,
+send_control_event_string,(uint16_t event,
+                           const char *msg))
+{
+  tor_assert(event >= EVENT_MIN_ && event <= EVENT_MAX_);
+  queue_control_event_string(event, tor_strdup(msg));
 }
 
 /** Helper for send_control_event and control_event_status:
@@ -625,8 +805,8 @@ send_control_event_string,(uint16_t event, event_format_t which,
  * <b>event</b>.  The event's body is created by the printf-style format in
  * <b>format</b>, and other arguments as provided. */
 static void
-send_control_event_impl(uint16_t event, event_format_t which,
-                         const char *format, va_list ap)
+send_control_event_impl(uint16_t event,
+                        const char *format, va_list ap)
 {
   char *buf = NULL;
   int len;
@@ -637,21 +817,19 @@ send_control_event_impl(uint16_t event, event_format_t which,
     return;
   }
 
-  send_control_event_string(event, which|ALL_FORMATS, buf);
-
-  tor_free(buf);
+  queue_control_event_string(event, buf);
 }
 
 /** Send an event to all v1 controllers that are listening for code
  * <b>event</b>.  The event's body is created by the printf-style format in
  * <b>format</b>, and other arguments as provided. */
 static void
-send_control_event(uint16_t event, event_format_t which,
+send_control_event(uint16_t event,
                    const char *format, ...)
 {
   va_list ap;
   va_start(ap, format);
-  send_control_event_impl(event, which, format, ap);
+  send_control_event_impl(event, format, ap);
   va_end(ap);
 }
 
@@ -4281,7 +4459,7 @@ control_event_circuit_status(origin_circuit_t *circ, circuit_status_event_t tp,
   {
     char *circdesc = circuit_describe_status_for_controller(circ);
     const char *sp = strlen(circdesc) ? " " : "";
-    send_control_event(EVENT_CIRCUIT_STATUS, ALL_FORMATS,
+    send_control_event(EVENT_CIRCUIT_STATUS,
                                 "650 CIRC %lu %s%s%s%s\r\n",
                                 (unsigned long)circ->global_identifier,
                                 status, sp,
@@ -4352,7 +4530,7 @@ control_event_circuit_status_minor(origin_circuit_t *circ,
   {
     char *circdesc = circuit_describe_status_for_controller(circ);
     const char *sp = strlen(circdesc) ? " " : "";
-    send_control_event(EVENT_CIRCUIT_STATUS_MINOR, ALL_FORMATS,
+    send_control_event(EVENT_CIRCUIT_STATUS_MINOR,
                        "650 CIRC_MINOR %lu %s%s%s%s\r\n",
                        (unsigned long)circ->global_identifier,
                        event_desc, sp,
@@ -4527,7 +4705,7 @@ control_event_stream_status(entry_connection_t *conn, stream_status_event_t tp,
   circ = circuit_get_by_edge_conn(ENTRY_TO_EDGE_CONN(conn));
   if (circ && CIRCUIT_IS_ORIGIN(circ))
     origin_circ = TO_ORIGIN_CIRCUIT(circ);
-  send_control_event(EVENT_STREAM_STATUS, ALL_FORMATS,
+  send_control_event(EVENT_STREAM_STATUS,
                         "650 STREAM "U64_FORMAT" %s %lu %s%s%s%s\r\n",
                      U64_PRINTF_ARG(ENTRY_TO_CONN(conn)->global_identifier),
                      status,
@@ -4599,7 +4777,7 @@ control_event_or_conn_status(or_connection_t *conn, or_conn_status_event_t tp,
   }
 
   orconn_target_get_name(name, sizeof(name), conn);
-  send_control_event(EVENT_OR_CONN_STATUS, ALL_FORMATS,
+  send_control_event(EVENT_OR_CONN_STATUS,
                               "650 ORCONN %s %s%s%s%s ID="U64_FORMAT"\r\n",
                               name, status,
                               reason ? " REASON=" : "",
@@ -4622,7 +4800,7 @@ control_event_stream_bandwidth(edge_connection_t *edge_conn)
     if (!edge_conn->n_read && !edge_conn->n_written)
       return 0;
 
-    send_control_event(EVENT_STREAM_BANDWIDTH_USED, ALL_FORMATS,
+    send_control_event(EVENT_STREAM_BANDWIDTH_USED,
                        "650 STREAM_BW "U64_FORMAT" %lu %lu\r\n",
                        U64_PRINTF_ARG(edge_conn->base_.global_identifier),
                        (unsigned long)edge_conn->n_read,
@@ -4657,7 +4835,7 @@ control_event_stream_bandwidth_used(void)
         if (!edge_conn->n_read && !edge_conn->n_written)
           continue;
 
-        send_control_event(EVENT_STREAM_BANDWIDTH_USED, ALL_FORMATS,
+        send_control_event(EVENT_STREAM_BANDWIDTH_USED,
                            "650 STREAM_BW "U64_FORMAT" %lu %lu\r\n",
                            U64_PRINTF_ARG(edge_conn->base_.global_identifier),
                            (unsigned long)edge_conn->n_read,
@@ -4686,7 +4864,7 @@ control_event_circ_bandwidth_used(void)
     ocirc = TO_ORIGIN_CIRCUIT(circ);
     if (!ocirc->n_read_circ_bw && !ocirc->n_written_circ_bw)
       continue;
-    send_control_event(EVENT_CIRC_BANDWIDTH_USED, ALL_FORMATS,
+    send_control_event(EVENT_CIRC_BANDWIDTH_USED,
                        "650 CIRC_BW ID=%d READ=%lu WRITTEN=%lu\r\n",
                        ocirc->global_identifier,
                        (unsigned long)ocirc->n_read_circ_bw,
@@ -4722,7 +4900,7 @@ control_event_conn_bandwidth(connection_t *conn)
     default:
       return 0;
   }
-  send_control_event(EVENT_CONN_BW, ALL_FORMATS,
+  send_control_event(EVENT_CONN_BW,
                      "650 CONN_BW ID="U64_FORMAT" TYPE=%s "
                      "READ=%lu WRITTEN=%lu\r\n",
                      U64_PRINTF_ARG(conn->global_identifier),
@@ -4869,7 +5047,7 @@ control_event_circuit_cell_stats(void)
       continue;
     sum_up_cell_stats_by_command(circ, cell_stats);
     format_cell_stats(&event_string, circ, cell_stats);
-    send_control_event(EVENT_CELL_STATS, ALL_FORMATS,
+    send_control_event(EVENT_CELL_STATS,
                        "650 CELL_STATS %s\r\n", event_string);
     tor_free(event_string);
   }
@@ -4891,7 +5069,7 @@ control_event_tb_empty(const char *bucket, uint32_t read_empty_time,
   if (get_options()->TestingEnableTbEmptyEvent &&
       EVENT_IS_INTERESTING(EVENT_TB_EMPTY) &&
       (read_empty_time > 0 || write_empty_time > 0)) {
-    send_control_event(EVENT_TB_EMPTY, ALL_FORMATS,
+    send_control_event(EVENT_TB_EMPTY,
                        "650 TB_EMPTY %s READ=%d WRITTEN=%d "
                        "LAST=%d\r\n",
                        bucket, read_empty_time, write_empty_time,
@@ -4924,7 +5102,7 @@ control_event_bandwidth_used(uint32_t n_read, uint32_t n_written)
     ++n_measurements;
 
   if (EVENT_IS_INTERESTING(EVENT_BANDWIDTH_USED)) {
-    send_control_event(EVENT_BANDWIDTH_USED, ALL_FORMATS,
+    send_control_event(EVENT_BANDWIDTH_USED,
                        "650 BW %lu %lu\r\n",
                        (unsigned long)n_read,
                        (unsigned long)n_written);
@@ -5023,7 +5201,11 @@ control_event_logmsg(int severity, uint32_t domain, const char *msg)
       default: s = "UnknownLogSeverity"; break;
     }
     ++disable_log_messages;
-    send_control_event(event, ALL_FORMATS, "650 %s %s\r\n", s, b?b:msg);
+    send_control_event(event,  "650 %s %s\r\n", s, b?b:msg);
+    if (severity == LOG_ERR) {
+      /* Force a flush, since we may be about to die horribly */
+      queued_events_flush_all(1);
+    }
     --disable_log_messages;
     tor_free(b);
   }
@@ -5051,7 +5233,7 @@ control_event_descriptors_changed(smartlist_t *routers)
       });
     ids = smartlist_join_strings(names, " ", 0, NULL);
     tor_asprintf(&msg, "650 NEWDESC %s\r\n", ids);
-    send_control_event_string(EVENT_NEW_DESC, ALL_FORMATS, msg);
+    send_control_event_string(EVENT_NEW_DESC,  msg);
     tor_free(ids);
     tor_free(msg);
     SMARTLIST_FOREACH(names, char *, cp, tor_free(cp));
@@ -5073,7 +5255,7 @@ control_event_address_mapped(const char *from, const char *to, time_t expires,
     return 0;
 
   if (expires < 3 || expires == TIME_MAX)
-    send_control_event(EVENT_ADDRMAP, ALL_FORMATS,
+    send_control_event(EVENT_ADDRMAP,
                                 "650 ADDRMAP %s %s NEVER %s%s"
                                 "CACHED=\"%s\"\r\n",
                                   from, to, error?error:"", error?" ":"",
@@ -5083,7 +5265,7 @@ control_event_address_mapped(const char *from, const char *to, time_t expires,
     char buf2[ISO_TIME_LEN+1];
     format_local_iso_time(buf,expires);
     format_iso_time(buf2,expires);
-    send_control_event(EVENT_ADDRMAP, ALL_FORMATS,
+    send_control_event(EVENT_ADDRMAP,
                                 "650 ADDRMAP %s %s \"%s\""
                                 " %s%sEXPIRES=\"%s\" CACHED=\"%s\"\r\n",
                                 from, to, buf,
@@ -5125,9 +5307,9 @@ control_event_or_authdir_new_descriptor(const char *action,
   buf = tor_malloc(totallen);
   strlcpy(buf, firstline, totallen);
   strlcpy(buf+strlen(firstline), esc, totallen);
-  send_control_event_string(EVENT_AUTHDIR_NEWDESCS, ALL_FORMATS,
+  send_control_event_string(EVENT_AUTHDIR_NEWDESCS,
                             buf);
-  send_control_event_string(EVENT_AUTHDIR_NEWDESCS, ALL_FORMATS,
+  send_control_event_string(EVENT_AUTHDIR_NEWDESCS,
                             "650 OK\r\n");
   tor_free(esc);
   tor_free(buf);
@@ -5163,7 +5345,7 @@ control_event_network_liveness_update(int liveness)
       /* Update cached liveness */
       set_cached_network_liveness(1);
       log_debug(LD_CONTROL, "Sending NETWORK_LIVENESS UP");
-      send_control_event_string(EVENT_NETWORK_LIVENESS, ALL_FORMATS,
+      send_control_event_string(EVENT_NETWORK_LIVENESS,
                                 "650 NETWORK_LIVENESS UP\r\n");
     }
     /* else was already live, no-op */
@@ -5172,7 +5354,7 @@ control_event_network_liveness_update(int liveness)
       /* Update cached liveness */
       set_cached_network_liveness(0);
       log_debug(LD_CONTROL, "Sending NETWORK_LIVENESS DOWN");
-      send_control_event_string(EVENT_NETWORK_LIVENESS, ALL_FORMATS,
+      send_control_event_string(EVENT_NETWORK_LIVENESS,
                                 "650 NETWORK_LIVENESS DOWN\r\n");
     }
     /* else was already dead, no-op */
@@ -5211,8 +5393,8 @@ control_event_networkstatus_changed_helper(smartlist_t *statuses,
   SMARTLIST_FOREACH(strs, char *, cp, tor_free(cp));
   smartlist_free(strs);
   tor_free(s);
-  send_control_event_string(event, ALL_FORMATS, esc);
-  send_control_event_string(event, ALL_FORMATS,
+  send_control_event_string(event,  esc);
+  send_control_event_string(event,
                             "650 OK\r\n");
 
   tor_free(esc);
@@ -5269,7 +5451,7 @@ control_event_buildtimeout_set(buildtimeout_set_event_t type,
       break;
   }
 
-  send_control_event(EVENT_BUILDTIMEOUT_SET, ALL_FORMATS,
+  send_control_event(EVENT_BUILDTIMEOUT_SET,
                      "650 BUILDTIMEOUT_SET %s %s\r\n",
                      type_string, args);
 
@@ -5310,7 +5492,7 @@ control_event_signal(uintptr_t signal)
       return -1;
   }
 
-  send_control_event(EVENT_SIGNAL, ALL_FORMATS, "650 SIGNAL %s\r\n",
+  send_control_event(EVENT_SIGNAL,  "650 SIGNAL %s\r\n",
                      signal_string);
   return 0;
 }
@@ -5338,7 +5520,7 @@ control_event_networkstatus_changed_single(const routerstatus_t *rs)
 int
 control_event_my_descriptor_changed(void)
 {
-  send_control_event(EVENT_DESCCHANGED, ALL_FORMATS, "650 DESCCHANGED\r\n");
+  send_control_event(EVENT_DESCCHANGED,  "650 DESCCHANGED\r\n");
   return 0;
 }
 
@@ -5388,24 +5570,40 @@ control_event_status(int type, int severity, const char *format, va_list args)
   }
   tor_vasprintf(&user_buf, format, args);
 
-  send_control_event(type, ALL_FORMATS, "%s %s\r\n", format_buf, user_buf);
+  send_control_event(type,  "%s %s\r\n", format_buf, user_buf);
   tor_free(user_buf);
   return 0;
 }
 
+#define CONTROL_EVENT_STATUS_BODY(event, sev)                   \
+  int r;                                                        \
+  do {                                                          \
+    va_list ap;                                                 \
+    if (!EVENT_IS_INTERESTING(event))                           \
+      return 0;                                                 \
+                                                                \
+    va_start(ap, format);                                       \
+    r = control_event_status((event), (sev), format, ap);       \
+    va_end(ap);                                                 \
+  } while (0)
+
 /** Format and send an EVENT_STATUS_GENERAL event whose main text is obtained
  * by formatting the arguments using the printf-style <b>format</b>. */
 int
 control_event_general_status(int severity, const char *format, ...)
 {
-  va_list ap;
-  int r;
-  if (!EVENT_IS_INTERESTING(EVENT_STATUS_GENERAL))
-    return 0;
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, severity);
+  return r;
+}
 
-  va_start(ap, format);
-  r = control_event_status(EVENT_STATUS_GENERAL, severity, format, ap);
-  va_end(ap);
+/** Format and send an EVENT_STATUS_GENERAL LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_general_error(const char *format, ...)
+{
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, LOG_ERR);
+  /* Force a flush, since we may be about to die horribly */
+  queued_events_flush_all(1);
   return r;
 }
 
@@ -5414,14 +5612,18 @@ control_event_general_status(int severity, const char *format, ...)
 int
 control_event_client_status(int severity, const char *format, ...)
 {
-  va_list ap;
-  int r;
-  if (!EVENT_IS_INTERESTING(EVENT_STATUS_CLIENT))
-    return 0;
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, severity);
+  return r;
+}
 
-  va_start(ap, format);
-  r = control_event_status(EVENT_STATUS_CLIENT, severity, format, ap);
-  va_end(ap);
+/** Format and send an EVENT_STATUS_CLIENT LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_client_error(const char *format, ...)
+{
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, LOG_ERR);
+  /* Force a flush, since we may be about to die horribly */
+  queued_events_flush_all(1);
   return r;
 }
 
@@ -5430,14 +5632,18 @@ control_event_client_status(int severity, const char *format, ...)
 int
 control_event_server_status(int severity, const char *format, ...)
 {
-  va_list ap;
-  int r;
-  if (!EVENT_IS_INTERESTING(EVENT_STATUS_SERVER))
-    return 0;
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, severity);
+  return r;
+}
 
-  va_start(ap, format);
-  r = control_event_status(EVENT_STATUS_SERVER, severity, format, ap);
-  va_end(ap);
+/** Format and send an EVENT_STATUS_SERVER LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_server_error(const char *format, ...)
+{
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, LOG_ERR);
+  /* Force a flush, since we may be about to die horribly */
+  queued_events_flush_all(1);
   return r;
 }
 
@@ -5461,7 +5667,7 @@ control_event_guard(const char *nickname, const char *digest,
     } else {
       tor_snprintf(buf, sizeof(buf), "$%s~%s", hbuf, nickname);
     }
-    send_control_event(EVENT_GUARD, ALL_FORMATS,
+    send_control_event(EVENT_GUARD,
                        "650 GUARD ENTRY %s %s\r\n", buf, status);
   }
   return 0;
@@ -5492,7 +5698,7 @@ control_event_conf_changed(const smartlist_t *elements)
     }
   }
   result = smartlist_join_strings(lines, "\r\n", 0, NULL);
-  send_control_event(EVENT_CONF_CHANGED, 0,
+  send_control_event(EVENT_CONF_CHANGED,
     "650-CONF_CHANGED\r\n%s\r\n650 OK\r\n", result);
   tor_free(result);
   SMARTLIST_FOREACH(lines, char *, cp, tor_free(cp));
@@ -5882,7 +6088,7 @@ MOCK_IMPL(void,
 void
 control_event_clients_seen(const char *controller_str)
 {
-  send_control_event(EVENT_CLIENTS_SEEN, 0,
+  send_control_event(EVENT_CLIENTS_SEEN,
     "650 CLIENTS_SEEN %s\r\n", controller_str);
 }
 
@@ -5896,7 +6102,7 @@ void
 control_event_transport_launched(const char *mode, const char *transport_name,
                                  tor_addr_t *addr, uint16_t port)
 {
-  send_control_event(EVENT_TRANSPORT_LAUNCHED, ALL_FORMATS,
+  send_control_event(EVENT_TRANSPORT_LAUNCHED,
                      "650 TRANSPORT_LAUNCHED %s %s %s %u\r\n",
                      mode, transport_name, fmt_addr(addr), port);
 }
@@ -5981,7 +6187,7 @@ control_event_hs_descriptor_requested(const rend_data_t *rend_query,
     return;
   }
 
-  send_control_event(EVENT_HS_DESC, ALL_FORMATS,
+  send_control_event(EVENT_HS_DESC,
                      "650 HS_DESC REQUESTED %s %s %s %s\r\n",
                      rend_hsaddress_str_or_unknown(rend_query->onion_address),
                      rend_auth_type_to_string(rend_query->auth_type),
@@ -6045,7 +6251,7 @@ control_event_hs_descriptor_upload(const char *service_id,
     return;
   }
 
-  send_control_event(EVENT_HS_DESC, ALL_FORMATS,
+  send_control_event(EVENT_HS_DESC,
                      "650 HS_DESC UPLOAD %s UNKNOWN %s %s\r\n",
                      service_id,
                      node_describe_longname_by_id(id_digest),
@@ -6092,7 +6298,7 @@ control_event_hs_descriptor_receive_end(const char *action,
     tor_asprintf(&reason_field, " REASON=%s", reason);
   }
 
-  send_control_event(EVENT_HS_DESC, ALL_FORMATS,
+  send_control_event(EVENT_HS_DESC,
                      "650 HS_DESC %s %s %s %s%s%s\r\n",
                      action,
                      rend_hsaddress_str_or_unknown(onion_address),
@@ -6130,7 +6336,7 @@ control_event_hs_descriptor_upload_end(const char *action,
     tor_asprintf(&reason_field, " REASON=%s", reason);
   }
 
-  send_control_event(EVENT_HS_DESC, ALL_FORMATS,
+  send_control_event(EVENT_HS_DESC,
                      "650 HS_DESC %s UNKNOWN UNKNOWN %s%s\r\n",
                      action,
                      node_describe_longname_by_id(id_digest),
@@ -6215,7 +6421,7 @@ control_event_hs_descriptor_content(const char *onion_address,
   }
   write_escaped_data(content, strlen(content), &esc_content);
 
-  send_control_event(EVENT_HS_DESC_CONTENT, ALL_FORMATS,
+  send_control_event(EVENT_HS_DESC_CONTENT,
                      "650+%s %s %s %s\r\n%s650 OK\r\n",
                      event_name,
                      rend_hsaddress_str_or_unknown(onion_address),
@@ -6252,6 +6458,16 @@ control_free_all(void)
     SMARTLIST_FOREACH(detached_onion_services, char *, cp, tor_free(cp));
     smartlist_free(detached_onion_services);
   }
+  if (queued_control_events) {
+    SMARTLIST_FOREACH(queued_control_events, queued_event_t *, ev,
+                      queued_event_free(ev));
+    smartlist_free(queued_control_events);
+    queued_control_events = NULL;
+  }
+  if (flush_queued_events_event) {
+    tor_event_free(flush_queued_events_event);
+    flush_queued_events_event = NULL;
+  }
 }
 
 #ifdef TOR_UNIT_TESTS
@@ -6262,4 +6478,3 @@ control_testing_set_global_event_mask(uint64_t mask)
   global_event_mask = mask;
 }
 #endif
-

+ 14 - 9
src/or/control.h

@@ -12,6 +12,8 @@
 #ifndef TOR_CONTROL_H
 #define TOR_CONTROL_H
 
+void control_initialize_event_queue(void);
+
 void control_update_global_event_mask(void);
 void control_adjust_event_log_severity(void);
 
@@ -78,6 +80,14 @@ int control_event_client_status(int severity, const char *format, ...)
   CHECK_PRINTF(2,3);
 int control_event_server_status(int severity, const char *format, ...)
   CHECK_PRINTF(2,3);
+
+int control_event_general_error(const char *format, ...)
+  CHECK_PRINTF(1,2);
+int control_event_client_error(const char *format, ...)
+  CHECK_PRINTF(1,2);
+int control_event_server_error(const char *format, ...)
+  CHECK_PRINTF(1,2);
+
 int control_event_guard(const char *nickname, const char *digest,
                         const char *status);
 int control_event_conf_changed(const smartlist_t *elements);
@@ -203,18 +213,13 @@ void control_free_all(void);
 /* Used only by control.c and test.c */
 STATIC size_t write_escaped_data(const char *data, size_t len, char **out);
 STATIC size_t read_escaped_data(const char *data, size_t len, char **out);
-/** Flag for event_format_t.  Indicates that we should use the one standard
-    format.  (Other formats previous existed, and are now deprecated)
- */
-#define ALL_FORMATS 1
-/** Bit field of flags to select how to format a controller event.  Recognized
- * flag is ALL_FORMATS. */
-typedef int event_format_t;
 
 #ifdef TOR_UNIT_TESTS
 MOCK_DECL(STATIC void,
-send_control_event_string,(uint16_t event, event_format_t which,
-                           const char *msg));
+          send_control_event_string,(uint16_t event, const char *msg));
+
+MOCK_DECL(STATIC void,
+          queue_control_event_string,(uint16_t event, char *msg));
 
 void control_testing_set_global_event_mask(uint64_t mask);
 #endif

+ 1 - 1
src/or/main.c

@@ -1007,7 +1007,7 @@ directory_all_unreachable_cb(evutil_socket_t fd, short event, void *arg)
     connection_mark_unattached_ap(entry_conn,
                                   END_STREAM_REASON_NET_UNREACHABLE);
   }
-  control_event_general_status(LOG_ERR, "DIR_ALL_UNREACHABLE");
+  control_event_general_error("DIR_ALL_UNREACHABLE");
 }
 
 static struct event *directory_all_unreachable_cb_event = NULL;

+ 6 - 6
src/test/test_controller_events.c

@@ -395,12 +395,12 @@ test_cntev_event_mask(void *arg)
   { #name, test_cntev_ ## name, flags, 0, NULL }
 
 struct testcase_t controller_event_tests[] = {
-  TEST(bucket_note_empty, 0),
-  TEST(bucket_millis_empty, 0),
-  TEST(sum_up_cell_stats, 0),
-  TEST(append_cell_stats, 0),
-  TEST(format_cell_stats, 0),
-  TEST(event_mask, 0),
+  TEST(bucket_note_empty, TT_FORK),
+  TEST(bucket_millis_empty, TT_FORK),
+  TEST(sum_up_cell_stats, TT_FORK),
+  TEST(append_cell_stats, TT_FORK),
+  TEST(format_cell_stats, TT_FORK),
+  TEST(event_mask, TT_FORK),
   END_OF_TESTCASES
 };
 

+ 5 - 7
src/test/test_hs.c

@@ -102,13 +102,11 @@ static char *received_msg = NULL;
 /** Mock function for send_control_event_string
  */
 static void
-send_control_event_string_replacement(uint16_t event, event_format_t which,
-                                      const char *msg)
+queue_control_event_string_replacement(uint16_t event, char *msg)
 {
   (void) event;
-  (void) which;
   tor_free(received_msg);
-  received_msg = tor_strdup(msg);
+  received_msg = msg;
 }
 
 /** Mock function for node_describe_longname_by_id, it returns either
@@ -141,8 +139,8 @@ test_hs_desc_event(void *arg)
   char desc_id_base32[REND_DESC_ID_V2_LEN_BASE32 + 1];
 
   (void) arg;
-  MOCK(send_control_event_string,
-       send_control_event_string_replacement);
+  MOCK(queue_control_event_string,
+       queue_control_event_string_replacement);
   MOCK(node_describe_longname_by_id,
        node_describe_longname_by_id_replacement);
 
@@ -225,7 +223,7 @@ test_hs_desc_event(void *arg)
   smartlist_free(rend_query.hsdirs_fp);
 
  done:
-  UNMOCK(send_control_event_string);
+  UNMOCK(queue_control_event_string);
   UNMOCK(node_describe_longname_by_id);
   tor_free(received_msg);
 }

+ 5 - 7
src/test/test_pt.c

@@ -333,15 +333,13 @@ static uint16_t controlevent_event = 0;
 static smartlist_t *controlevent_msgs = NULL;
 
 static void
-send_control_event_string_replacement(uint16_t event, event_format_t which,
-                                      const char *msg)
+queue_control_event_string_replacement(uint16_t event, char *msg)
 {
-  (void) which;
   ++controlevent_n;
   controlevent_event = event;
   if (!controlevent_msgs)
     controlevent_msgs = smartlist_new();
-  smartlist_add(controlevent_msgs, tor_strdup(msg));
+  smartlist_add(controlevent_msgs, msg);
 }
 
 /* Test the configure_proxy() function. */
@@ -360,8 +358,8 @@ test_pt_configure_proxy(void *arg)
        tor_process_handle_destroy_replacement);
   MOCK(get_or_state,
        get_or_state_replacement);
-  MOCK(send_control_event_string,
-       send_control_event_string_replacement);
+  MOCK(queue_control_event_string,
+       queue_control_event_string_replacement);
 
   control_testing_set_global_event_mask(EVENT_TRANSPORT_LAUNCHED);
 
@@ -435,7 +433,7 @@ test_pt_configure_proxy(void *arg)
   UNMOCK(tor_get_lines_from_handle);
   UNMOCK(tor_process_handle_destroy);
   UNMOCK(get_or_state);
-  UNMOCK(send_control_event_string);
+  UNMOCK(queue_control_event_string);
   if (controlevent_msgs) {
     SMARTLIST_FOREACH(controlevent_msgs, char *, cp, tor_free(cp));
     smartlist_free(controlevent_msgs);

+ 8 - 6
src/test/test_threads.c

@@ -28,7 +28,7 @@ static unsigned long thread_fn_tid1, thread_fn_tid2;
 static void thread_test_func_(void* _s) ATTR_NORETURN;
 
 /** How many iterations have the threads in the unit test run? */
-static int t1_count = 0, t2_count = 0;
+static tor_threadlocal_t count;
 
 /** Helper function for threading unit tests: This function runs in a
  * subthread. It grabs its own mutex (start1 or start2) to make sure that it
@@ -38,19 +38,19 @@ static void
 thread_test_func_(void* _s)
 {
   char *s = _s;
-  int i, *count;
+  int i;
   tor_mutex_t *m;
   char buf[64];
   char **cp;
+  int *mycount = tor_malloc_zero(sizeof(int));
+  tor_threadlocal_set(&count, mycount);
   if (!strcmp(s, "thread 1")) {
     m = thread_test_start1_;
     cp = &thread1_name_;
-    count = &t1_count;
     thread_fn_tid1 = tor_get_thread_id();
   } else {
     m = thread_test_start2_;
     cp = &thread2_name_;
-    count = &t2_count;
     thread_fn_tid2 = tor_get_thread_id();
   }
 
@@ -62,8 +62,10 @@ thread_test_func_(void* _s)
   for (i=0; i<10000; ++i) {
     tor_mutex_acquire(thread_test_mutex_);
     strmap_set(thread_test_strmap_, "last to run", *cp);
-    ++*count;
     tor_mutex_release(thread_test_mutex_);
+    int *tls_count = tor_threadlocal_get(&count);
+    tor_assert(tls_count == mycount);
+    ++*tls_count;
   }
   tor_mutex_acquire(thread_test_mutex_);
   strmap_set(thread_test_strmap_, s, *cp);
@@ -89,6 +91,7 @@ test_threads_basic(void *arg)
   tv.tv_usec=100*1000;
 #endif
   (void) arg;
+  tt_int_op(tor_threadlocal_init(&count), OP_EQ, 0);
 
   set_main_thread();
 
@@ -128,7 +131,6 @@ test_threads_basic(void *arg)
   tor_mutex_free(thread_test_mutex_);
 
   if (timedout) {
-    printf("\nTimed out: %d %d", t1_count, t2_count);
     tt_assert(strmap_get(thread_test_strmap_, "thread 1"));
     tt_assert(strmap_get(thread_test_strmap_, "thread 2"));
     tt_assert(!timedout);

+ 2 - 0
src/test/testing_common.c

@@ -14,6 +14,7 @@ const char tor_git_revision[] = "";
 
 #include "orconfig.h"
 #include "or.h"
+#include "control.h"
 #include "config.h"
 #include "rephist.h"
 #include "backtrace.h"
@@ -237,6 +238,7 @@ main(int c, const char **v)
   update_approx_time(time(NULL));
   options = options_new();
   tor_threads_init();
+  control_initialize_event_queue();
   init_logging(1);
   configure_backtrace_handler(get_version());