Browse Source

Added support for multiple eventloops

Each eventloop runs in a different thread. By default no additional
eventloops/threads are started, but this can be changed by modifying
'num_additional_eventloops' in 'config.c'. At the moment, these
additional eventloops don't do anything (functionality will be
added later).
Steven Engler 2 years ago
parent
commit
818176870a

+ 5 - 1
configure.ac

@@ -776,7 +776,7 @@ if test "$enable_static_libevent" = "yes"; then
     fi
 fi
 
-TOR_SEARCH_LIBRARY(libevent, $trylibeventdir, [-levent $STATIC_LIBEVENT_FLAGS $TOR_LIB_WS32], [
+TOR_SEARCH_LIBRARY(libevent, $trylibeventdir, [-levent -levent_pthreads $STATIC_LIBEVENT_FLAGS $TOR_LIB_WS32], [
 #ifdef _WIN32
 #include <winsock2.h>
 #endif
@@ -817,6 +817,7 @@ else
      if test "x$ac_cv_header_event2_event_h" = "xyes"; then
        AC_SEARCH_LIBS(event_new, [event event_core], , AC_MSG_ERROR("libevent2 is installed but linking it failed while searching for event_new"))
        AC_SEARCH_LIBS(evdns_base_new, [event event_extra], , AC_MSG_ERROR("libevent2 is installed but linking it failed while searching for evdns_base_new"))
+       AC_SEARCH_LIBS(evthread_use_pthreads, [event event_pthreads], , AC_MSG_ERROR("libevent2 is installed but linking it failed while searching for evthread_use_pthreads"))
 
        if test "$ac_cv_search_event_new" != "none required"; then
          TOR_LIBEVENT_LIBS="$ac_cv_search_event_new"
@@ -824,6 +825,9 @@ else
        if test "$ac_cv_search_evdns_base_new" != "none required"; then
          TOR_LIBEVENT_LIBS="$ac_cv_search_evdns_base_new $TOR_LIBEVENT_LIBS"
        fi
+       if test "$ac_cv_search_evthread_use_pthreads" != "none required"; then
+         TOR_LIBEVENT_LIBS="$ac_cv_search_evthread_use_pthreads $TOR_LIBEVENT_LIBS"
+       fi
      else
        AC_MSG_ERROR("libevent2 is required but the headers could not be found")
      fi

+ 6 - 1
src/app/config/config.c

@@ -7991,7 +7991,12 @@ init_libevent(const or_options_t *options)
   cfg.num_cpus = get_num_cpus(options);
   cfg.msec_per_tick = options->TokenBucketRefillInterval;
 
-  tor_libevent_initialize(&cfg);
+  /* Don't use any additional eventloops (only use the mainloop). */
+  int num_additional_eventloops = 0;
+
+  tor_libevent_initialize(&cfg, num_additional_eventloops);
+  log_info(LD_CONFIG, "Initializing libevent with %d additional eventloops.",
+           num_additional_eventloops);
 
   suppress_libevent_log_msg(NULL);
 }

+ 34 - 1
src/core/mainloop/mainloop.c

@@ -52,6 +52,7 @@
 #include "app/config/config.h"
 #include "app/config/statefile.h"
 #include "app/main/ntmain.h"
+#include "app/main/tor_threads.h"
 #include "core/mainloop/connection.h"
 #include "core/mainloop/cpuworker.h"
 #include "core/mainloop/mainloop.h"
@@ -180,6 +181,10 @@ static int main_loop_should_exit = 0;
  * main_loop_should_exit is true.
  */
 static int main_loop_exit_value = 0;
+/** Flag: if true, it's time to shut down, so the other eventloops should
+ * exit as soon as possible.
+ */
+static int other_eventloops_should_exit = 0;
 
 /** We set this to 1 when we've opened a circuit, so we can print a log
  * entry to inform the user that Tor is working.  We set it to 0 when
@@ -2327,6 +2332,23 @@ initialize_mainloop_events(void)
   }
 }
 
+static void
+tor_eventloop_thread(void)
+{
+  log_debug(LD_GENERAL, "Starting eventloop thread.");
+  int loop_result = 0;
+  while (loop_result == 0 && other_eventloops_should_exit == 0) {
+    loop_result = event_base_loop(tor_libevent_get_base(), EVLOOP_NO_EXIT_ON_EMPTY);
+  }
+  log_debug(LD_GENERAL, "Done eventloop thread.");
+
+  if (other_eventloops_should_exit == 0) {
+    log_err(LD_GENERAL, "Eventloop thread stopped unexpectedly. (loop_result:%d, \
+                         other_eventloops_should_exit:%d)",
+            loop_result, other_eventloops_should_exit);
+  }
+}
+
 /** Tor main loop. */
 int
 do_main_loop(void)
@@ -2391,7 +2413,17 @@ do_main_loop(void)
   }
 #endif /* defined(ENABLE_RESTART_DEBUGGING) */
 
-  return run_main_loop_until_done();
+  /* Start our eventloop threads, then start the main eventloop. */
+  other_eventloops_should_exit = 0;
+  start_eventloop_threads(tor_eventloop_thread, start_tor_thread);
+  int rv = run_main_loop_until_done();
+
+  /* Stop our eventloops in other threads. */
+  other_eventloops_should_exit = 1;
+  rescan_eventloops();
+  join_eventloop_threads();
+
+  return rv;
 }
 
 #ifndef _WIN32
@@ -2554,6 +2586,7 @@ tor_mainloop_free_all(void)
   called_loop_once = 0;
   main_loop_should_exit = 0;
   main_loop_exit_value = 0;
+  other_eventloops_should_exit = 0;
   can_complete_circuits = 0;
   quiet_level = 0;
   should_init_bridge_stats = 1;

+ 193 - 29
src/lib/evloop/compat_libevent.c

@@ -76,8 +76,11 @@ tor_event_free_(struct event *ev)
   event_free(ev);
 }
 
-/** Global event base for use by the main thread. */
-static struct event_base *the_event_base = NULL;
+/** Global event bases for use by the eventloop threads. */
+static tor_thread_t **threads = NULL;
+static struct event_base **eventloops = NULL;
+static tor_threadlocal_t eventloop_index;
+static int num_eventloops = -1;
 
 /**
  * @defgroup postloop post-loop event helpers
@@ -95,11 +98,11 @@ static struct event_base *the_event_base = NULL;
  * @{ */
 
 /**
- * An event that stops Libevent from running any more events on the current
+ * Events that stop Libevent from running any more events on the current
  * iteration of its loop, until it has re-checked for socket events, signal
  * events, timeouts, etc.
  */
-static struct event *rescan_mainloop_ev = NULL;
+struct event **rescan_eventloop_events = NULL;
 
 /**
  * Callback to implement rescan_mainloop_ev: it simply exits the mainloop,
@@ -116,6 +119,40 @@ rescan_mainloop_cb(evutil_socket_t fd, short events, void *arg)
 
 /** @} */
 
+struct event_base *
+get_eventloop(int index)
+{
+  tor_assert(index >= 0 && index < num_eventloops);
+  tor_assert(eventloops != NULL);
+  tor_assert(eventloops[index] != NULL);
+  return eventloops[index];
+}
+
+int
+get_local_eventloop_index(void) {
+  int *index = tor_threadlocal_get(&eventloop_index);
+  tor_assert(index != NULL);
+  return *index;
+}
+
+int
+get_num_eventloops(void)
+{
+  tor_assert(num_eventloops != -1);
+  return num_eventloops;
+}
+
+/* Tell the eventloops to stop soon. */
+void
+rescan_eventloops(void)
+{
+  tor_assert(rescan_eventloop_events != NULL);
+  for (int i=0; i<num_eventloops; i++) {
+    tor_assert(rescan_eventloop_events[i] != NULL);
+    event_active(rescan_eventloop_events[i], EV_READ, 1);
+  }
+}
+
 /* This is what passes for version detection on OSX.  We set
  * MACOSX_KQUEUE_IS_BROKEN to true iff we're on a version of OSX before
  * 10.4.0 (aka 1040). */
@@ -128,26 +165,123 @@ rescan_mainloop_cb(evutil_socket_t fd, short events, void *arg)
 #endif /* defined(__ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__) */
 #endif /* defined(__APPLE__) */
 
+/** Initialize the threadlocals. Must be run before libevent is initialized. */
+void
+tor_evloop_init_threadlocals(void)
+{
+  tor_assert(tor_threadlocal_init(&eventloop_index) == 0);
+}
+
+/** Destroy the threadlocals. Must be run after tor_libevent_free_all. */
+void
+tor_evloop_destroy_threadlocals(void)
+{
+  tor_threadlocal_destroy(&eventloop_index);
+}
+
+/** Initialize the current thread. */
+static void
+init_eventloop_thread(int index)
+{
+  tor_assert(index >= 0 && index < num_eventloops);
+  tor_assert(tor_threadlocal_get(&eventloop_index) == NULL);
+
+  int* this_thread_index = tor_malloc(sizeof(int));
+  *this_thread_index = index;
+  tor_threadlocal_set(&eventloop_index, (void *)this_thread_index);
+}
+
+/* Destory the current thread. */
+static void
+destroy_eventloop_thread(void)
+{
+  int *this_thread_index = (int *)tor_threadlocal_get(&eventloop_index);
+  if (this_thread_index != NULL) {
+    tor_threadlocal_set(&eventloop_index, NULL);
+    tor_free(this_thread_index);
+  }
+}
+
+struct thread_data {
+  void (*func)(void);
+  int index;
+};
+
+static void
+thread_wrapper(void *data_void)
+{
+  tor_assert(data_void != NULL);
+  struct thread_data *data = (struct thread_data *)data_void;
+  void (*func)(void) = data->func;
+  int index = data->index;
+  tor_free(data_void);
+
+  init_eventloop_thread(index);
+  func();
+  destroy_eventloop_thread();
+}
+
+/* Start the eventloop threads and make them run 'func'. Threads will
+   be started using 'spawn_fn'. */
+void
+start_eventloop_threads(void (*func)(void),
+                        tor_thread_t *(*spawn_fn)(void (*func)(void *),
+                                                  void *data))
+{
+  // the first thread (i==0) should already have been initialized
+  for (int i=1; i<num_eventloops; i++) {
+    struct thread_data *data = tor_malloc(sizeof(struct thread_data));
+    data->func = func;
+    data->index = i;
+    tor_thread_t *thread = spawn_fn(thread_wrapper, (void *)data);
+    tor_assert(thread != NULL);
+    threads[i] = thread;
+  }
+}
+
+/* Wait for the eventloop threads to finish. */
+void
+join_eventloop_threads(void)
+{
+  for (int i=1; i<num_eventloops; i++) {
+    if (threads[i] != NULL) {
+      tor_assert(join_thread(threads[i]) == 0);
+      free_thread(threads[i]);
+      threads[i] = NULL;
+    }
+  }
+}
+
 /** Initialize the Libevent library and set up the event base. */
 void
-tor_libevent_initialize(tor_libevent_cfg *torcfg)
+tor_libevent_initialize(tor_libevent_cfg *torcfg,
+                        int num_additional_eventloops)
 {
-  tor_assert(the_event_base == NULL);
   /* some paths below don't use torcfg, so avoid unused variable warnings */
   (void)torcfg;
 
+  tor_assert(num_eventloops == -1);
+  num_eventloops = num_additional_eventloops+1;
+  eventloops = tor_malloc_zero(num_eventloops*sizeof(struct event_base *));
+  threads = tor_malloc_zero(num_eventloops*sizeof(tor_thread_t *));
+  rescan_eventloop_events = tor_malloc_zero(num_eventloops*sizeof(tor_thread_t *));
+
+  /* This main thread has no tor_thread_t, so we set it to NULL. Should already
+     be NULL from tor_malloc_zero, but we do this explicitly anyways. */
+  threads[0] = NULL;
+
+  int libevent_started_correctly = 1;
+
   {
     int attempts = 0;
     struct event_config *cfg;
 
+    tor_assert(evthread_use_pthreads() == 0);
+
     ++attempts;
     cfg = event_config_new();
     tor_assert(cfg);
 
-    /* Telling Libevent not to try to turn locking on can avoid a needless
-     * socketpair() attempt. */
-    event_config_set_flag(cfg, EVENT_BASE_FLAG_NOLOCK);
-
     if (torcfg->num_cpus > 0)
       event_config_set_num_cpus_hint(cfg, torcfg->num_cpus);
 
@@ -155,26 +289,28 @@ tor_libevent_initialize(tor_libevent_cfg *torcfg)
      * Libevent any dup'd fds.  This lets us avoid some syscalls. */
     event_config_set_flag(cfg, EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST);
 
-    the_event_base = event_base_new_with_config(cfg);
+    for (int i=0; i<num_eventloops; i++) {
+      eventloops[i] = event_base_new_with_config(cfg);
+      rescan_eventloop_events[i] = event_new(eventloops[i], -1, 0,
+                                             rescan_mainloop_cb,
+                                             eventloops[i]);
+      libevent_started_correctly = (libevent_started_correctly &&
+                                    eventloops[i] != NULL &&
+                                    rescan_eventloop_events[i] != NULL);
+    }
 
     event_config_free(cfg);
   }
 
-  if (!the_event_base) {
+  if (libevent_started_correctly == 0) {
     /* LCOV_EXCL_START */
     log_err(LD_GENERAL, "Unable to initialize Libevent: cannot continue.");
     exit(1); // exit ok: libevent is broken.
     /* LCOV_EXCL_STOP */
   }
 
-  rescan_mainloop_ev = event_new(the_event_base, -1, 0,
-                                 rescan_mainloop_cb, the_event_base);
-  if (!rescan_mainloop_ev) {
-    /* LCOV_EXCL_START */
-    log_err(LD_GENERAL, "Unable to create rescan event: cannot continue.");
-    exit(1); // exit ok: libevent is broken.
-    /* LCOV_EXCL_STOP */
-  }
+  /* Initialize this main thread. */
+  init_eventloop_thread(0);
 
   log_info(LD_GENERAL,
       "Initialized libevent version %s using method %s. Good.",
@@ -188,22 +324,26 @@ tor_libevent_initialize(tor_libevent_cfg *torcfg)
 bool
 tor_libevent_is_initialized(void)
 {
-  return the_event_base != NULL;
+  return num_eventloops != -1;
 }
 
 /** Return the current Libevent event base that we're set up to use. */
 MOCK_IMPL(struct event_base *,
 tor_libevent_get_base, (void))
 {
-  tor_assert(the_event_base != NULL);
-  return the_event_base;
+  int *index = tor_threadlocal_get(&eventloop_index);
+  tor_assert(index != NULL);
+  struct event_base *event_base = eventloops[*index];
+  tor_assert(event_base != NULL);
+  return event_base;
 }
 
 /** Return the name of the Libevent backend we're using. */
 const char *
 tor_libevent_get_method(void)
 {
-  return event_base_get_method(the_event_base);
+  struct event_base *event_base = tor_libevent_get_base();
+  return event_base_get_method(event_base);
 }
 
 /** Return a string representation of the version of the currently running
@@ -354,7 +494,12 @@ mainloop_event_postloop_cb(evutil_socket_t fd, short what, void *arg)
    * callback run after rescan_mainloop_cb is called -- that is, on the
    * next iteration of the loop.
    */
-  event_active(rescan_mainloop_ev, EV_READ, 1);
+
+  int *index = tor_threadlocal_get(&eventloop_index);
+  tor_assert(index != NULL);
+  struct event *rescan_event = rescan_eventloop_events[*index];
+  tor_assert(rescan_event != NULL);
+  event_active(rescan_event, EV_READ, 1);
 
   mainloop_event_t *mev = arg;
   mev->cb(mev, mev->userdata);
@@ -495,10 +640,29 @@ tor_init_libevent_rng(void)
 void
 tor_libevent_free_all(void)
 {
-  tor_event_free(rescan_mainloop_ev);
-  if (the_event_base)
-    event_base_free(the_event_base);
-  the_event_base = NULL;
+  /* Make sure all eventloop threads have stopped. */
+  join_eventloop_threads();
+
+  /* Destroy this main thread. Other eventloop threads should already have
+     called this locally. */
+  destroy_eventloop_thread();
+
+  /* Destroy the rescan events since we own them. */
+  for (int i=0; i<num_eventloops; i++) {
+    event_free(rescan_eventloop_events[i]);
+    rescan_eventloop_events[i] = NULL;
+  }
+
+  /* Destory the event bases since we own them. */
+  for (int i=0; i<num_eventloops; i++) {
+    event_base_free(eventloops[i]);
+    eventloops[i] = NULL;
+  }
+
+  tor_free(eventloops);
+  tor_free(threads);
+  tor_free(rescan_eventloop_events);
+  num_eventloops = -1;
 }
 
 /**

+ 16 - 1
src/lib/evloop/compat_libevent.h

@@ -12,6 +12,7 @@
 #include "orconfig.h"
 #include "lib/testsupport/testsupport.h"
 #include "lib/malloc/malloc.h"
+#include "lib/thread/threads.h"
 
 #include <stdbool.h>
 
@@ -69,7 +70,21 @@ typedef struct tor_libevent_cfg {
   int msec_per_tick;
 } tor_libevent_cfg;
 
-void tor_libevent_initialize(tor_libevent_cfg *cfg);
+struct event_base *get_eventloop(int index);
+int get_local_eventloop_index(void);
+int get_num_eventloops(void);
+void rescan_eventloops(void);
+
+void tor_evloop_init_threadlocals(void);
+void tor_evloop_destroy_threadlocals(void);
+
+void start_eventloop_threads(void (*func)(void),
+                             tor_thread_t *(*spawn_fn)(void (*func)(void *),
+                                                       void *data));
+void join_eventloop_threads(void);
+
+void tor_libevent_initialize(tor_libevent_cfg *cfg,
+                             int num_additional_eventloops);
 bool tor_libevent_is_initialized(void);
 MOCK_DECL(struct event_base *, tor_libevent_get_base, (void));
 const char *tor_libevent_get_method(void);

+ 4 - 0
src/lib/evloop/evloop_sys.c

@@ -18,10 +18,13 @@
 static int
 subsys_evloop_initialize(void)
 {
+  tor_evloop_init_threadlocals();
+
   if (tor_init_libevent_rng() < 0) {
     log_warn(LD_NET, "Problem initializing libevent RNG.");
     return -1;
   }
+
   return 0;
 }
 
@@ -37,6 +40,7 @@ static void
 subsys_evloop_shutdown(void)
 {
   tor_libevent_free_all();
+  tor_evloop_destroy_threadlocals();
 }
 
 const struct subsys_fns_t sys_evloop = {

+ 10 - 1
src/test/test-timers.c

@@ -15,6 +15,9 @@
 #include "lib/time/compat_time.h"
 #include "lib/wallclock/timeval.h"
 
+#include "lib/subsys/subsys.h"
+#include "lib/evloop/evloop_sys.h"
+
 #define N_TIMERS 1000
 #define MAX_DURATION 30
 #define N_DISABLE 5
@@ -59,9 +62,15 @@ main(int argc, char **argv)
 {
   (void)argc;
   (void)argv;
+
+  if (sys_evloop.initialize()) {
+    printf("Couldn't initialize evloop subsystem; exiting.\n");
+    return 1;
+  }
+
   tor_libevent_cfg cfg;
   memset(&cfg, 0, sizeof(cfg));
-  tor_libevent_initialize(&cfg);
+  tor_libevent_initialize(&cfg, 0);
   timers_initialize();
   init_logging(1);
 

+ 18 - 2
src/test/test_workqueue.c

@@ -14,6 +14,11 @@
 #include "lib/intmath/weakrng.h"
 #include "lib/crypt_ops/crypto_init.h"
 
+#include "lib/subsys/subsys.h"
+#include "lib/net/network_sys.h"
+#include "lib/thread/thread_sys.h"
+#include "lib/evloop/evloop_sys.h"
+
 #include <stdio.h>
 
 #define MAX_INFLIGHT (1<<16)
@@ -391,7 +396,18 @@ main(int argc, char **argv)
   }
 
   init_logging(1);
-  network_init();
+  if (sys_network.initialize()) {
+    printf("Couldn't initialize network subsystem; exiting.\n");
+    return 1;
+  }
+  if (sys_threads.initialize()) {
+    printf("Couldn't initialize threads subsystem; exiting.\n");
+    return 1;
+  }
+  if (sys_evloop.initialize()) {
+    printf("Couldn't initialize evloop subsystem; exiting.\n");
+    return 1;
+  }
   if (crypto_global_init(1, NULL, NULL) < 0) {
     printf("Couldn't initialize crypto subsystem; exiting.\n");
     return 1;
@@ -415,7 +431,7 @@ main(int argc, char **argv)
   crypto_seed_weak_rng(&weak_rng);
 
   memset(&evcfg, 0, sizeof(evcfg));
-  tor_libevent_initialize(&evcfg);
+  tor_libevent_initialize(&evcfg, 0);
 
   {
     struct event_base *base = tor_libevent_get_base();

+ 1 - 1
src/test/testing_common.c

@@ -268,7 +268,7 @@ main(int c, const char **v)
 
   struct tor_libevent_cfg cfg;
   memset(&cfg, 0, sizeof(cfg));
-  tor_libevent_initialize(&cfg);
+  tor_libevent_initialize(&cfg, 0);
 
   control_initialize_event_queue();