Bladeren bron

Merge branch 'isolate_libevent_2_squashed'

Nick Mathewson 6 jaren geleden
bovenliggende
commit
98b694bfd5

+ 5 - 0
changes/isolate_libevent

@@ -0,0 +1,5 @@
+  o Code simplification and refactoring:
+    - Initial work to isolate Libevent usage to a handful of modules in our
+      codebase, to simplify our call structure, and so that we can more
+      easily change event loops in the future if needed. Closes ticket
+      23750.

+ 147 - 0
src/common/compat_libevent.c

@@ -221,6 +221,121 @@ periodic_timer_free_(periodic_timer_t *timer)
   tor_free(timer);
 }
 
+/**
+ * Type used to represent events that run directly from the main loop,
+ * either because they are activated from elsewhere in the code, or
+ * because they have a simple timeout.
+ *
+ * We use this type to avoid exposing Libevent's API throughout the rest
+ * of the codebase.
+ *
+ * This type can't be used for all events: it doesn't handle events that
+ * are triggered by signals or by sockets.
+ */
+struct mainloop_event_t {
+  struct event *ev;
+  void (*cb)(mainloop_event_t *, void *);
+  void *userdata;
+};
+
+/**
+ * Internal: Implements mainloop event using a libevent event.
+ */
+static void
+mainloop_event_cb(evutil_socket_t fd, short what, void *arg)
+{
+  (void)fd;
+  (void)what;
+  mainloop_event_t *mev = arg;
+  mev->cb(mev, mev->userdata);
+}
+
+/**
+ * Create and return a new mainloop_event_t to run the function <b>cb</b>.
+ *
+ * When run, the callback function will be passed the mainloop_event_t
+ * and <b>userdata</b> as its arguments.  The <b>userdata</b> pointer
+ * must remain valid for as long as the mainloop_event_t event exists:
+ * it is your responsibility to free it.
+ *
+ * The event is not scheduled by default: Use mainloop_event_activate()
+ * or mainloop_event_schedule() to make it run.
+ */
+mainloop_event_t *
+mainloop_event_new(void (*cb)(mainloop_event_t *, void *),
+                   void *userdata)
+{
+  tor_assert(cb);
+
+  struct event_base *base = tor_libevent_get_base();
+  mainloop_event_t *mev = tor_malloc_zero(sizeof(mainloop_event_t));
+  mev->ev = tor_event_new(base, -1, 0, mainloop_event_cb, mev);
+  tor_assert(mev->ev);
+  mev->cb = cb;
+  mev->userdata = userdata;
+  return mev;
+}
+
+/**
+ * Schedule <b>event</b> to run in the main loop, immediately.  If it is
+ * not scheduled, it will run anyway. If it is already scheduled to run
+ * later, it will run now instead.  This function will have no effect if
+ * the event is already scheduled to run.
+ *
+ * This function may only be called from the main thread.
+ */
+void
+mainloop_event_activate(mainloop_event_t *event)
+{
+  tor_assert(event);
+  event_active(event->ev, EV_READ, 1);
+}
+
+/** Schedule <b>event</b> to run in the main loop, after a delay of <b>tv</b>.
+ *
+ * If the event is scheduled for a different time, cancel it and run
+ * after this delay instead.  If the event is currently pending to run
+ * <em>now</b>, has no effect.
+ *
+ * Do not call this function with <b>tv</b> == NULL -- use
+ * mainloop_event_activate() instead.
+ *
+ * This function may only be called from the main thread.
+ */
+int
+mainloop_event_schedule(mainloop_event_t *event, const struct timeval *tv)
+{
+  tor_assert(event);
+  if (BUG(tv == NULL)) {
+    // LCOV_EXCL_START
+    mainloop_event_activate(event);
+    return 0;
+    // LCOV_EXCL_STOP
+  }
+  return event_add(event->ev, tv);
+}
+
+/** Cancel <b>event</b> if it is currently active or pending. (Do nothing if
+ * the event is not currently active or pending.) */
+void
+mainloop_event_cancel(mainloop_event_t *event)
+{
+  if (!event)
+    return;
+  event_del(event->ev);
+}
+
+/** Cancel <b>event</b> and release all storage associated with it. */
+void
+mainloop_event_free_(mainloop_event_t *event)
+{
+  if (!event)
+    return;
+  tor_event_free(event->ev);
+  memset(event, 0xb8, sizeof(*event));
+  tor_free(event);
+}
+
 int
 tor_init_libevent_rng(void)
 {
@@ -248,6 +363,38 @@ tor_libevent_free_all(void)
   the_event_base = NULL;
 }
 
+/**
+ * Run the event loop for the provided event_base, handling events until
+ * something stops it.  If <b>once</b> is set, then just poll-and-run
+ * once, then exit.  Return 0 on success, -1 if an error occurred, or 1
+ * if we exited because no events were pending or active.
+ *
+ * This isn't reentrant or multithreaded.
+ */
+int
+tor_libevent_run_event_loop(struct event_base *base, int once)
+{
+  const int flags = once ? EVLOOP_ONCE : 0;
+  return event_base_loop(base, flags);
+}
+
+/** Tell the event loop to exit after <b>delay</b>.  If <b>delay</b> is NULL,
+ * instead exit after we're done running the currently active events. */
+void
+tor_libevent_exit_loop_after_delay(struct event_base *base,
+                                   const struct timeval *delay)
+{
+  event_base_loopexit(base, delay);
+}
+
+/** Tell the event loop to exit after running whichever callback is currently
+ * active. */
+void
+tor_libevent_exit_loop_after_callback(struct event_base *base)
+{
+  event_base_loopbreak(base);
+}
+
 #if defined(LIBEVENT_VERSION_NUMBER) &&         \
   LIBEVENT_VERSION_NUMBER >= V(2,1,1) &&        \
   !defined(TOR_UNIT_TESTS)

+ 18 - 4
src/common/compat_libevent.h

@@ -7,8 +7,6 @@
 #include "orconfig.h"
 #include "testsupport.h"
 
-#include <event2/event.h>
-
 void configure_libevent_logging(void);
 void suppress_libevent_log_msg(const char *msg);
 
@@ -19,6 +17,9 @@ void suppress_libevent_log_msg(const char *msg);
   evdns_add_server_port_with_base(tor_libevent_get_base(), \
   (sock),(tcp),(cb),(data));
 
+struct event;
+struct event_base;
+
 void tor_event_free_(struct event *ev);
 #define tor_event_free(ev) \
   FREE_AND_NULL(struct event, tor_event_free_, (ev))
@@ -33,8 +34,16 @@ void periodic_timer_free_(periodic_timer_t *);
 #define periodic_timer_free(t) \
   FREE_AND_NULL(periodic_timer_t, periodic_timer_free_, (t))
 
-#define tor_event_base_loopexit event_base_loopexit
-#define tor_event_base_loopbreak event_base_loopbreak
+typedef struct mainloop_event_t mainloop_event_t;
+mainloop_event_t *mainloop_event_new(void (*cb)(mainloop_event_t *, void *),
+                                     void *userdata);
+void mainloop_event_activate(mainloop_event_t *event);
+int mainloop_event_schedule(mainloop_event_t *event,
+                            const struct timeval *delay);
+void mainloop_event_cancel(mainloop_event_t *event);
+void mainloop_event_free_(mainloop_event_t *event);
+#define mainloop_event_free(event) \
+  FREE_AND_NULL(mainloop_event_t, mainloop_event_free_, (event))
 
 /** Defines a configuration for using libevent with Tor: passed as an argument
  * to tor_libevent_initialize() to describe how we want to set up. */
@@ -63,6 +72,11 @@ void tor_gettimeofday_cache_set(const struct timeval *tv);
 void tor_libevent_postfork(void);
 #endif
 
+int tor_libevent_run_event_loop(struct event_base *base, int once);
+void tor_libevent_exit_loop_after_delay(struct event_base *base,
+                                        const struct timeval *delay);
+void tor_libevent_exit_loop_after_callback(struct event_base *base);
+
 #ifdef COMPAT_LIBEVENT_PRIVATE
 
 /** Macro: returns the number of a Libevent version as a 4-byte number,

+ 8 - 19
src/common/procmon.c

@@ -10,8 +10,6 @@
 
 #include "util.h"
 
-#include <event2/event.h>
-
 #ifdef HAVE_SIGNAL_H
 #include <signal.h>
 #endif
@@ -44,7 +42,7 @@ typedef int pid_t;
 /* Currently we need to poll in some way on all systems. */
 
 #ifdef PROCMON_POLLS
-static void tor_process_monitor_poll_cb(evutil_socket_t unused1, short unused2,
+static void tor_process_monitor_poll_cb(periodic_timer_t *ev,
                                         void *procmon_);
 #endif
 
@@ -136,7 +134,7 @@ struct tor_process_monitor_t {
 
   /** A Libevent event structure, to either poll for the process's
    * existence or receive a notification when the process ends. */
-  struct event *e;
+  periodic_timer_t *e;
 
   /** A callback to be called when the process ends. */
   tor_procmon_callback_t cb;
@@ -159,9 +157,6 @@ tor_validate_process_specifier(const char *process_spec,
   return parse_process_specifier(process_spec, &ppspec, msg);
 }
 
-/* XXXX we should use periodic_timer_new() for this stuff */
-#define PERIODIC_TIMER_FLAGS EV_PERSIST
-
 /* DOCDOC poll_interval_tv */
 static const struct timeval poll_interval_tv = {15, 0};
 
@@ -225,13 +220,9 @@ tor_process_monitor_new(struct event_base *base,
   procmon->cb_arg = cb_arg;
 
 #ifdef PROCMON_POLLS
-  procmon->e = tor_event_new(base, -1 /* no FD */, PERIODIC_TIMER_FLAGS,
-                             tor_process_monitor_poll_cb, procmon);
-  /* Note: If you port this file to plain Libevent 2, check that
-   * procmon->e is non-NULL.  We don't need to here because
-   * tor_evtimer_new never returns NULL. */
-
-  evtimer_add(procmon->e, &poll_interval_tv);
+  procmon->e = periodic_timer_new(base,
+                                  &poll_interval_tv,
+                                  tor_process_monitor_poll_cb, procmon);
 #else /* !(defined(PROCMON_POLLS)) */
 #error OOPS?
 #endif /* defined(PROCMON_POLLS) */
@@ -246,14 +237,12 @@ tor_process_monitor_new(struct event_base *base,
 /** Libevent callback to poll for the existence of the process
  * monitored by <b>procmon_</b>. */
 static void
-tor_process_monitor_poll_cb(evutil_socket_t unused1, short unused2,
-                            void *procmon_)
+tor_process_monitor_poll_cb(periodic_timer_t *event, void *procmon_)
 {
+  (void)event;
   tor_process_monitor_t *procmon = (tor_process_monitor_t *)(procmon_);
   int its_dead_jim;
 
-  (void)unused1; (void)unused2;
-
   tor_assert(procmon != NULL);
 
 #ifdef _WIN32
@@ -336,7 +325,7 @@ tor_process_monitor_free_(tor_process_monitor_t *procmon)
 #endif
 
   if (procmon->e != NULL)
-    tor_event_free(procmon->e);
+    periodic_timer_free(procmon->e);
 
   tor_free(procmon);
 }

+ 7 - 11
src/common/timers.c

@@ -37,8 +37,6 @@
 #include "torlog.h"
 #include "util.h"
 
-#include <event2/event.h>
-
 struct timeout_cb {
   timer_cb_fn_t cb;
   void *arg;
@@ -69,7 +67,7 @@ struct timeout_cb {
 #include "src/ext/timeouts/timeout.c"
 
 static struct timeouts *global_timeouts = NULL;
-static struct event *global_timer_event = NULL;
+static struct mainloop_event_t *global_timer_event = NULL;
 
 static monotime_t start_of_time;
 
@@ -147,7 +145,7 @@ libevent_timer_reschedule(void)
   if (delay > MIN_CHECK_TICKS)
     delay = MIN_CHECK_TICKS;
   timeout_to_tv(delay, &d);
-  event_add(global_timer_event, &d);
+  mainloop_event_schedule(global_timer_event, &d);
 }
 
 /** Run the callback of every timer that has expired, based on the current
@@ -170,10 +168,9 @@ timers_run_pending(void)
  * have fired, activate their callbacks, and reschedule the libevent timer.
  */
 static void
-libevent_timer_callback(evutil_socket_t fd, short what, void *arg)
+libevent_timer_callback(mainloop_event_t *ev, void *arg)
 {
-  (void)fd;
-  (void)what;
+  (void)ev;
   (void)arg;
 
   timers_run_pending();
@@ -203,9 +200,8 @@ timers_initialize(void)
   monotime_init();
   monotime_get(&start_of_time);
 
-  struct event *timer_event;
-  timer_event = tor_event_new(tor_libevent_get_base(),
-                              -1, 0, libevent_timer_callback, NULL);
+  mainloop_event_t *timer_event;
+  timer_event = mainloop_event_new(libevent_timer_callback, NULL);
   tor_assert(timer_event);
   global_timer_event = timer_event;
 
@@ -219,7 +215,7 @@ void
 timers_shutdown(void)
 {
   if (global_timer_event) {
-    tor_event_free(global_timer_event);
+    mainloop_event_free(global_timer_event);
     global_timer_event = NULL;
   }
   if (global_timeouts) {

+ 40 - 7
src/common/workqueue.c

@@ -1,3 +1,4 @@
+
 /* copyright (c) 2013-2015, The Tor Project, Inc. */
 /* See LICENSE for licensing information */
 
@@ -24,6 +25,7 @@
 
 #include "orconfig.h"
 #include "compat.h"
+#include "compat_libevent.h"
 #include "compat_threads.h"
 #include "crypto.h"
 #include "util.h"
@@ -31,6 +33,8 @@
 #include "tor_queue.h"
 #include "torlog.h"
 
+#include <event2/event.h>
+
 #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
 #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
 #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
@@ -63,6 +67,9 @@ struct threadpool_s {
   void (*free_update_arg_fn)(void *);
   /** Array of n_threads update arguments. */
   void **update_args;
+  /** Event to notice when another thread has sent a reply. */
+  struct event *reply_event;
+  void (*reply_cb)(threadpool_t *);
 
   /** Number of elements in threads. */
   int n_threads;
@@ -597,15 +604,41 @@ replyqueue_new(uint32_t alertsocks_flags)
   return rq;
 }
 
-/**
- * Return the "read socket" for a given reply queue.  The main thread should
- * listen for read events on this socket, and call replyqueue_process() every
- * time it triggers.
+/** Internal: Run from the libevent mainloop when there is work to handle in
+ * the reply queue handler. */
+static void
+reply_event_cb(evutil_socket_t sock, short events, void *arg)
+{
+  threadpool_t *tp = arg;
+  (void) sock;
+  (void) events;
+  replyqueue_process(tp->reply_queue);
+  if (tp->reply_cb)
+    tp->reply_cb(tp);
+}
+
+/** Register the threadpool <b>tp</b>'s reply queue with the libevent
+ * mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after
+ * each time there is work to process from the reply queue. Return 0 on
+ * success, -1 on failure.
  */
-tor_socket_t
-replyqueue_get_socket(replyqueue_t *rq)
+int
+threadpool_register_reply_event(threadpool_t *tp,
+                                void (*cb)(threadpool_t *tp))
 {
-  return rq->alert.read_fd;
+  struct event_base *base = tor_libevent_get_base();
+
+  if (tp->reply_event) {
+    tor_event_free(tp->reply_event);
+  }
+  tp->reply_event = tor_event_new(base,
+                                  tp->reply_queue->alert.read_fd,
+                                  EV_READ|EV_PERSIST,
+                                  reply_event_cb,
+                                  tp);
+  tor_assert(tp->reply_event);
+  tp->reply_cb = cb;
+  return event_add(tp->reply_event, NULL);
 }
 
 /**

+ 4 - 1
src/common/workqueue.h

@@ -56,8 +56,11 @@ threadpool_t *threadpool_new(int n_threads,
 replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
 
 replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
-tor_socket_t replyqueue_get_socket(replyqueue_t *rq);
 void replyqueue_process(replyqueue_t *queue);
 
+struct event_base;
+int threadpool_register_reply_event(threadpool_t *tp,
+                                    void (*cb)(threadpool_t *tp));
+
 #endif /* !defined(TOR_WORKQUEUE_H) */
 

+ 0 - 1
src/or/channelpadding.c

@@ -20,7 +20,6 @@
 #include "rephist.h"
 #include "router.h"
 #include "compat_time.h"
-#include <event2/event.h>
 #include "rendservice.h"
 
 STATIC int32_t channelpadding_get_netflow_inactive_timeout_ms(

+ 8 - 12
src/or/control.c

@@ -83,8 +83,6 @@
 #include <sys/resource.h>
 #endif
 
-#include <event2/event.h>
-
 #include "crypto_s2k.h"
 #include "procmon.h"
 
@@ -216,7 +214,7 @@ static void orconn_target_get_name(char *buf, size_t len,
 static int get_cached_network_liveness(void);
 static void set_cached_network_liveness(int liveness);
 
-static void flush_queued_events_cb(evutil_socket_t fd, short what, void *arg);
+static void flush_queued_events_cb(mainloop_event_t *event, void *arg);
 
 static char * download_status_to_string(const download_status_t *dl);
 
@@ -691,7 +689,7 @@ static tor_mutex_t *queued_control_events_lock = NULL;
 
 /** An event that should fire in order to flush the contents of
  * queued_control_events. */
-static struct event *flush_queued_events_event = NULL;
+static mainloop_event_t *flush_queued_events_event = NULL;
 
 void
 control_initialize_event_queue(void)
@@ -703,9 +701,8 @@ control_initialize_event_queue(void)
   if (flush_queued_events_event == NULL) {
     struct event_base *b = tor_libevent_get_base();
     if (b) {
-      flush_queued_events_event = tor_event_new(b,
-                                              -1, 0, flush_queued_events_cb,
-                                              NULL);
+      flush_queued_events_event =
+        mainloop_event_new(flush_queued_events_cb, NULL);
       tor_assert(flush_queued_events_event);
     }
   }
@@ -781,7 +778,7 @@ queue_control_event_string,(uint16_t event, char *msg))
    */
   if (activate_event) {
     tor_assert(flush_queued_events_event);
-    event_active(flush_queued_events_event, EV_READ, 1);
+    mainloop_event_activate(flush_queued_events_event);
   }
 }
 
@@ -863,10 +860,9 @@ queued_events_flush_all(int force)
 /** Libevent callback: Flushes pending events to controllers that are
  * interested in them. */
 static void
-flush_queued_events_cb(evutil_socket_t fd, short what, void *arg)
+flush_queued_events_cb(mainloop_event_t *event, void *arg)
 {
-  (void) fd;
-  (void) what;
+  (void) event;
   (void) arg;
   queued_events_flush_all(0);
 }
@@ -7608,7 +7604,7 @@ control_free_all(void)
     smartlist_free(queued_events);
   }
   if (flush_queued_events_event) {
-    tor_event_free(flush_queued_events_event);
+    mainloop_event_free(flush_queued_events_event);
     flush_queued_events_event = NULL;
   }
   bootstrap_percent = BOOTSTRAP_STATUS_UNDEF;

+ 5 - 20
src/or/cpuworker.c

@@ -30,8 +30,6 @@
 #include "router.h"
 #include "workqueue.h"
 
-#include <event2/event.h>
-
 static void queue_pending_tasks(void);
 
 typedef struct worker_state_s {
@@ -69,22 +67,12 @@ worker_state_free_void(void *arg)
 
 static replyqueue_t *replyqueue = NULL;
 static threadpool_t *threadpool = NULL;
-static struct event *reply_event = NULL;
 
 static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
 
 static int total_pending_tasks = 0;
 static int max_pending_tasks = 128;
 
-static void
-replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
-{
-  replyqueue_t *rq = arg;
-  (void) sock;
-  (void) events;
-  replyqueue_process(rq);
-}
-
 /** Initialize the cpuworker subsystem. It is OK to call this more than once
  * during Tor's lifetime.
  */
@@ -94,14 +82,6 @@ cpu_init(void)
   if (!replyqueue) {
     replyqueue = replyqueue_new(0);
   }
-  if (!reply_event) {
-    reply_event = tor_event_new(tor_libevent_get_base(),
-                                replyqueue_get_socket(replyqueue),
-                                EV_READ|EV_PERSIST,
-                                replyqueue_process_cb,
-                                replyqueue);
-    event_add(reply_event, NULL);
-  }
   if (!threadpool) {
     /*
       In our threadpool implementation, half the threads are permissive and
@@ -115,7 +95,12 @@ cpu_init(void)
                                 worker_state_new,
                                 worker_state_free_void,
                                 NULL);
+
+    int r = threadpool_register_reply_event(threadpool, NULL);
+
+    tor_assert(r == 0);
   }
+
   /* Total voodoo. Can we make this more sensible? */
   max_pending_tasks = get_num_cpus(get_options()) * 64;
   crypto_seed_weak_rng(&request_sample_rng);

+ 11 - 11
src/or/main.c

@@ -719,7 +719,7 @@ tell_event_loop_to_run_external_code(void)
 {
   if (!called_loop_once) {
     struct timeval tv = { 0, 0 };
-    tor_event_base_loopexit(tor_libevent_get_base(), &tv);
+    tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &tv);
     called_loop_once = 1; /* hack to avoid adding more exit events */
   }
 }
@@ -779,8 +779,9 @@ tor_shutdown_event_loop_and_exit(int exitcode)
                   shutdown_did_not_work_callback, NULL);
   event_add(shutdown_did_not_work_event, &ten_seconds);
 
-  /* Unlike loopexit, loopbreak prevents other callbacks from running. */
-  tor_event_base_loopbreak(tor_libevent_get_base());
+  /* Unlike exit_loop_after_delay(), exit_loop_after_callback
+   * prevents other callbacks from running. */
+  tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
 }
 
 /** Return true iff tor_shutdown_event_loop_and_exit() has been called. */
@@ -1060,9 +1061,8 @@ conn_close_if_marked(int i)
  * reason.
  */
 static void
-directory_all_unreachable_cb(evutil_socket_t fd, short event, void *arg)
+directory_all_unreachable_cb(mainloop_event_t *event, void *arg)
 {
-  (void)fd;
   (void)event;
   (void)arg;
 
@@ -1082,7 +1082,7 @@ directory_all_unreachable_cb(evutil_socket_t fd, short event, void *arg)
   control_event_general_error("DIR_ALL_UNREACHABLE");
 }
 
-static struct event *directory_all_unreachable_cb_event = NULL;
+static mainloop_event_t *directory_all_unreachable_cb_event = NULL;
 
 /** We've just tried every dirserver we know about, and none of
  * them were reachable. Assume the network is down. Change state
@@ -1099,12 +1099,11 @@ directory_all_unreachable(time_t now)
 
   if (!directory_all_unreachable_cb_event) {
     directory_all_unreachable_cb_event =
-      tor_event_new(tor_libevent_get_base(),
-                    -1, EV_READ, directory_all_unreachable_cb, NULL);
+      mainloop_event_new(directory_all_unreachable_cb, NULL);
     tor_assert(directory_all_unreachable_cb_event);
   }
 
-  event_active(directory_all_unreachable_cb_event, EV_READ, 1);
+  mainloop_event_activate(directory_all_unreachable_cb_event);
 }
 
 /** This function is called whenever we successfully pull down some new
@@ -2833,8 +2832,8 @@ run_main_loop_once(void)
    * an event, or the second ends, or until we have some active linked
    * connections to trigger events for.  Libevent will wait till one
    * of these happens, then run all the appropriate callbacks. */
-  loop_result = event_base_loop(tor_libevent_get_base(),
-                                called_loop_once ? EVLOOP_ONCE : 0);
+  loop_result = tor_libevent_run_event_loop(tor_libevent_get_base(),
+                                            called_loop_once);
 
   if (get_options()->MainloopStats) {
     /* Update our main loop counters. */
@@ -3525,6 +3524,7 @@ tor_free_all(int postfork)
   periodic_timer_free(refill_timer);
   tor_event_free(shutdown_did_not_work_event);
   tor_event_free(initialize_periodic_events_event);
+  mainloop_event_free(directory_all_unreachable_cb_event);
 
 #ifdef HAVE_SYSTEMD_209
   periodic_timer_free(systemd_watchdog_timer);

+ 2 - 3
src/or/ntmain.c

@@ -24,8 +24,6 @@
 #include "main.h"
 #include "ntmain.h"
 
-#include <event2/event.h>
-
 #include <windows.h>
 #define GENSRV_SERVICENAME  "tor"
 #define GENSRV_DISPLAYNAME  "Tor Win32 Service"
@@ -245,7 +243,8 @@ nt_service_control(DWORD request)
           log_notice(LD_GENERAL,
                      "Got stop/shutdown request; shutting down cleanly.");
           service_status.dwCurrentState = SERVICE_STOP_PENDING;
-          event_base_loopexit(tor_libevent_get_base(), &exit_now);
+          tor_libevent_exit_loop_after_delay(tor_libevent_get_base(),
+                                             &exit_now);
           return;
   }
   service_fns.SetServiceStatus_fn(hStatus, &service_status);

+ 8 - 13
src/or/periodic.c

@@ -16,8 +16,6 @@
 #include "config.h"
 #include "periodic.h"
 
-#include <event2/event.h>
-
 /** We disable any interval greater than this number of seconds, on the
  * grounds that it is probably an absolute time mistakenly passed in as a
  * relative time.
@@ -34,17 +32,16 @@ periodic_event_set_interval(periodic_event_item_t *event,
   struct timeval tv;
   tv.tv_sec = next_interval;
   tv.tv_usec = 0;
-  event_add(event->ev, &tv);
+  mainloop_event_schedule(event->ev, &tv);
 }
 
 /** Wraps dispatches for periodic events, <b>data</b> will be a pointer to the
  * event that needs to be called */
 static void
-periodic_event_dispatch(evutil_socket_t fd, short what, void *data)
+periodic_event_dispatch(mainloop_event_t *ev, void *data)
 {
-  (void)fd;
-  (void)what;
   periodic_event_item_t *event = data;
+  tor_assert(ev == event->ev);
 
   time_t now = time(NULL);
   const or_options_t *options = get_options();
@@ -74,7 +71,7 @@ periodic_event_dispatch(evutil_socket_t fd, short what, void *data)
 //  log_debug(LD_GENERAL, "Scheduling %s for %d seconds", event->name,
 //           next_interval);
   struct timeval tv = { next_interval , 0 };
-  event_add(event->ev, &tv);
+  mainloop_event_schedule(ev, &tv);
 }
 
 /** Schedules <b>event</b> to run as soon as possible from now. */
@@ -93,10 +90,8 @@ periodic_event_setup(periodic_event_item_t *event)
     tor_assert(0);
   }
 
-  event->ev = tor_event_new(tor_libevent_get_base(),
-                            -1, 0,
-                            periodic_event_dispatch,
-                            event);
+  event->ev = mainloop_event_new(periodic_event_dispatch,
+                                 event);
   tor_assert(event->ev);
 }
 
@@ -111,7 +106,7 @@ periodic_event_launch(periodic_event_item_t *event)
   }
 
   // Initial dispatch
-  periodic_event_dispatch(-1, EV_TIMEOUT, event);
+  periodic_event_dispatch(event->ev, event);
 }
 
 /** Release all storage associated with <b>event</b> */
@@ -120,7 +115,7 @@ periodic_event_destroy(periodic_event_item_t *event)
 {
   if (!event)
     return;
-  tor_event_free(event->ev);
+  mainloop_event_free(event->ev);
   event->last_action_time = 0;
 }
 

+ 3 - 2
src/or/periodic.h

@@ -14,13 +14,14 @@
 typedef int (*periodic_event_helper_t)(time_t now,
                                       const or_options_t *options);
 
-struct event;
+struct mainloop_event_t;
 
 /** A single item for the periodic-events-function table. */
 typedef struct periodic_event_item_t {
   periodic_event_helper_t fn; /**< The function to run the event */
   time_t last_action_time; /**< The last time the function did something */
-  struct event *ev; /**< Libevent callback we're using to implement this */
+  struct mainloop_event_t *ev; /**< Libevent callback we're using to implement
+                                * this */
   const char *name; /**< Name of the function -- for debug */
 } periodic_event_item_t;
 

+ 9 - 16
src/or/scheduler.c

@@ -13,8 +13,6 @@
 #define TOR_CHANNEL_INTERNAL_
 #include "channeltls.h"
 
-#include <event2/event.h>
-
 /**
  * \file scheduler.c
  * \brief Channel scheduling system: decides which channels should send and
@@ -169,7 +167,7 @@ STATIC smartlist_t *channels_pending = NULL;
  * This event runs the scheduler from its callback, and is manually
  * activated whenever a channel enters open for writes/cells to send.
  */
-STATIC struct event *run_sched_ev = NULL;
+STATIC struct mainloop_event_t *run_sched_ev = NULL;
 
 static int have_logged_kist_suddenly_disabled = 0;
 
@@ -203,10 +201,9 @@ get_scheduler_type_string(scheduler_types_t type)
  * if any scheduling work was created during the event loop.
  */
 static void
-scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
+scheduler_evt_callback(mainloop_event_t *event, void *arg)
 {
-  (void) fd;
-  (void) events;
+  (void) event;
   (void) arg;
 
   log_debug(LD_SCHED, "Scheduler event callback called");
@@ -487,10 +484,7 @@ scheduler_free_all(void)
   log_debug(LD_SCHED, "Shutting down scheduler");
 
   if (run_sched_ev) {
-    if (event_del(run_sched_ev) < 0) {
-      log_warn(LD_BUG, "Problem deleting run_sched_ev");
-    }
-    tor_event_free(run_sched_ev);
+    mainloop_event_free(run_sched_ev);
     run_sched_ev = NULL;
   }
 
@@ -589,7 +583,7 @@ scheduler_ev_add(const struct timeval *next_run)
 {
   tor_assert(run_sched_ev);
   tor_assert(next_run);
-  if (BUG(event_add(run_sched_ev, next_run) < 0)) {
+  if (BUG(mainloop_event_schedule(run_sched_ev, next_run) < 0)) {
     log_warn(LD_SCHED, "Adding to libevent failed. Next run time was set to: "
                        "%ld.%06ld", next_run->tv_sec, (long)next_run->tv_usec);
     return;
@@ -598,10 +592,10 @@ scheduler_ev_add(const struct timeval *next_run)
 
 /** Make the scheduler event active with the given flags. */
 void
-scheduler_ev_active(int flags)
+scheduler_ev_active(void)
 {
   tor_assert(run_sched_ev);
-  event_active(run_sched_ev, flags, 1);
+  mainloop_event_activate(run_sched_ev);
 }
 
 /*
@@ -618,11 +612,10 @@ scheduler_init(void)
   IF_BUG_ONCE(!!run_sched_ev) {
     log_warn(LD_SCHED, "We should not already have a libevent scheduler event."
              "I'll clean the old one up, but this is odd.");
-    tor_event_free(run_sched_ev);
+    mainloop_event_free(run_sched_ev);
     run_sched_ev = NULL;
   }
-  run_sched_ev = tor_event_new(tor_libevent_get_base(), -1,
-                               0, scheduler_evt_callback, NULL);
+  run_sched_ev = mainloop_event_new(scheduler_evt_callback, NULL);
   channels_pending = smartlist_new();
 
   set_scheduler();

+ 2 - 2
src/or/scheduler.h

@@ -155,12 +155,12 @@ void scheduler_bug_occurred(const channel_t *chan);
 smartlist_t *get_channels_pending(void);
 MOCK_DECL(int, scheduler_compare_channels,
           (const void *c1_v, const void *c2_v));
-void scheduler_ev_active(int flags);
+void scheduler_ev_active(void);
 void scheduler_ev_add(const struct timeval *next_run);
 
 #ifdef TOR_UNIT_TESTS
 extern smartlist_t *channels_pending;
-extern struct event *run_sched_ev;
+extern struct mainloop_event_t *run_sched_ev;
 extern const scheduler_t *the_scheduler;
 void scheduler_touch_channel(channel_t *chan);
 #endif /* defined(TOR_UNIT_TESTS) */

+ 1 - 3
src/or/scheduler_kist.c

@@ -3,8 +3,6 @@
 
 #define SCHEDULER_KIST_PRIVATE
 
-#include <event2/event.h>
-
 #include "or.h"
 #include "buffers.h"
 #include "config.h"
@@ -553,7 +551,7 @@ kist_scheduler_schedule(void)
     /* Re-adding an event reschedules it. It does not duplicate it. */
     scheduler_ev_add(&next_run);
   } else {
-    scheduler_ev_active(EV_TIMEOUT);
+    scheduler_ev_active();
   }
 }
 

+ 1 - 3
src/or/scheduler_vanilla.c

@@ -1,8 +1,6 @@
 /* Copyright (c) 2017, The Tor Project, Inc. */
 /* See LICENSE for licensing information */
 
-#include <event2/event.h>
-
 #include "or.h"
 #include "config.h"
 #define TOR_CHANNEL_INTERNAL_
@@ -42,7 +40,7 @@ vanilla_scheduler_schedule(void)
   }
 
   /* Activate our event so it can process channels. */
-  scheduler_ev_active(EV_TIMEOUT);
+  scheduler_ev_active();
 }
 
 static void

+ 2 - 4
src/test/test-timers.c

@@ -7,8 +7,6 @@
 #include <stdio.h>
 #include <string.h>
 
-#include <event2/event.h>
-
 #include "compat.h"
 #include "compat_libevent.h"
 #include "crypto.h"
@@ -50,7 +48,7 @@ timer_cb(tor_timer_t *t, void *arg, const monotime_t *now_mono)
 
   // printf("%d / %d\n",n_fired, N_TIMERS);
   if (n_fired == n_active_timers) {
-    event_base_loopbreak(tor_libevent_get_base());
+    tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
   }
 }
 
@@ -90,7 +88,7 @@ main(int argc, char **argv)
     --n_active_timers;
   }
 
-  event_base_loop(tor_libevent_get_base(), 0);
+  tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
 
   int64_t total_difference = 0;
   uint64_t total_square_difference = 0;

+ 8 - 8
src/test/test_channelpadding.c

@@ -15,7 +15,6 @@
 #include "channelpadding.h"
 #include "compat_libevent.h"
 #include "config.h"
-#include <event2/event.h>
 #include "compat_time.h"
 #include "main.h"
 #include "networkstatus.h"
@@ -65,7 +64,7 @@ mock_channel_write_cell_relay2(channel_t *chan, cell_t *cell)
   (void)chan;
   tried_to_write_cell++;
   channel_tls_handle_cell(cell, ((channel_tls_t*)relay1_relay2)->conn);
-  event_base_loopbreak(tor_libevent_get_base());
+  tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
   return 0;
 }
 
@@ -75,7 +74,7 @@ mock_channel_write_cell_relay1(channel_t *chan, cell_t *cell)
   (void)chan;
   tried_to_write_cell++;
   channel_tls_handle_cell(cell, ((channel_tls_t*)relay2_relay1)->conn);
-  event_base_loopbreak(tor_libevent_get_base());
+  tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
   return 0;
 }
 
@@ -85,7 +84,7 @@ mock_channel_write_cell_relay3(channel_t *chan, cell_t *cell)
   (void)chan;
   tried_to_write_cell++;
   channel_tls_handle_cell(cell, ((channel_tls_t*)client_relay3)->conn);
-  event_base_loopbreak(tor_libevent_get_base());
+  tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
   return 0;
 }
 
@@ -95,7 +94,7 @@ mock_channel_write_cell_client(channel_t *chan, cell_t *cell)
   (void)chan;
   tried_to_write_cell++;
   channel_tls_handle_cell(cell, ((channel_tls_t*)relay3_client)->conn);
-  event_base_loopbreak(tor_libevent_get_base());
+  tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
   return 0;
 }
 
@@ -105,7 +104,7 @@ mock_channel_write_cell(channel_t *chan, cell_t *cell)
   tried_to_write_cell++;
   channel_tls_handle_cell(cell, ((channel_tls_t*)chan)->conn);
   if (!dont_stop_libevent)
-    event_base_loopbreak(tor_libevent_get_base());
+    tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
   return 0;
 }
 
@@ -246,7 +245,7 @@ static void
 dummy_timer_cb(tor_timer_t *t, void *arg, const monotime_t *now_mono)
 {
   (void)t; (void)arg; (void)now_mono;
-  event_base_loopbreak(tor_libevent_get_base());
+  tor_libevent_exit_loop_after_callback(tor_libevent_get_base());
   return;
 }
 
@@ -264,7 +263,8 @@ dummy_nop_timer(void)
 
   timer_schedule(dummy_timer, &timeout);
 
-  event_base_loop(tor_libevent_get_base(), 0);
+  tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
+
   timer_free(dummy_timer);
 }
 

+ 0 - 1
src/test/test_compat_libevent.c

@@ -10,7 +10,6 @@
 #include "compat_libevent.h"
 
 #include <event2/event.h>
-#include <event2/thread.h>
 
 #include "log_test_helpers.h"
 

+ 1 - 1
src/test/test_helpers.c

@@ -155,7 +155,7 @@ mock_tor_addr_lookup__fail_on_bad_addrs(const char *name,
 
 /* Helper for test_conn_get_connection() */
 static int
-fake_close_socket(evutil_socket_t sock)
+fake_close_socket(tor_socket_t sock)
 {
   (void)sock;
   return 0;

+ 2 - 73
src/test/test_scheduler.c

@@ -4,7 +4,6 @@
 #include "orconfig.h"
 
 #include <math.h>
-#include <event2/event.h>
 
 #define SCHEDULER_KIST_PRIVATE
 #define TOR_CHANNEL_INTERNAL_
@@ -101,62 +100,6 @@ mock_kist_networkstatus_get_param(
   return 12;
 }
 
-/* Event base for scheduelr tests */
-static struct event_base *mock_event_base = NULL;
-/* Setup for mock event stuff */
-static void mock_event_free_all(void);
-static void mock_event_init(void);
-static void
-mock_event_free_all(void)
-{
-  tt_ptr_op(mock_event_base, OP_NE, NULL);
-
-  if (mock_event_base) {
-    event_base_free(mock_event_base);
-    mock_event_base = NULL;
-  }
-
-  tt_ptr_op(mock_event_base, OP_EQ, NULL);
-
- done:
-  return;
-}
-
-static void
-mock_event_init(void)
-{
-  struct event_config *cfg = NULL;
-
-  tt_ptr_op(mock_event_base, OP_EQ, NULL);
-
-  /*
-   * Really cut down from tor_libevent_initialize of
-   * src/common/compat_libevent.c to kill config dependencies
-   */
-
-  if (!mock_event_base) {
-    cfg = event_config_new();
-#if LIBEVENT_VERSION_NUMBER >= V(2,0,9)
-    /* We can enable changelist support with epoll, since we don't give
-     * Libevent any dup'd fds.  This lets us avoid some syscalls. */
-    event_config_set_flag(cfg, EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST);
-#endif
-    mock_event_base = event_base_new_with_config(cfg);
-    event_config_free(cfg);
-  }
-
-  tt_ptr_op(mock_event_base, OP_NE, NULL);
-
- done:
-  return;
-}
-
-static struct event_base *
-tor_libevent_get_base_mock(void)
-{
-  return mock_event_base;
-}
-
 static int
 scheduler_compare_channels_mock(const void *c1_v,
                                 const void *c2_v)
@@ -417,9 +360,7 @@ perform_channel_state_tests(int KISTSchedRunInterval, int sched_type)
   mocked_options.KISTSchedRunInterval = KISTSchedRunInterval;
   set_scheduler_options(sched_type);
 
-  /* Set up libevent and scheduler */
-  mock_event_init();
-  MOCK(tor_libevent_get_base, tor_libevent_get_base_mock);
+  /* Set up scheduler */
   scheduler_init();
   /*
    * Install the compare channels mock so we can test
@@ -523,14 +464,12 @@ perform_channel_state_tests(int KISTSchedRunInterval, int sched_type)
 
   channel_free_all();
   scheduler_free_all();
-  mock_event_free_all();
 
  done:
   tor_free(ch1);
   tor_free(ch2);
 
   UNMOCK(scheduler_compare_channels);
-  UNMOCK(tor_libevent_get_base);
   UNMOCK(get_options);
   cleanup_scheduler_options();
 
@@ -635,10 +574,7 @@ test_scheduler_loop_vanilla(void *arg)
   set_scheduler_options(SCHEDULER_VANILLA);
   mocked_options.KISTSchedRunInterval = 0;
 
-  /* Set up libevent and scheduler */
-
-  mock_event_init();
-  MOCK(tor_libevent_get_base, tor_libevent_get_base_mock);
+  /* Set up scheduler */
   scheduler_init();
   /*
    * Install the compare channels mock so we can test
@@ -786,7 +722,6 @@ test_scheduler_loop_vanilla(void *arg)
   channel_flush_some_cells_mock_free_all();
   channel_free_all();
   scheduler_free_all();
-  mock_event_free_all();
 
  done:
   tor_free(ch1);
@@ -795,7 +730,6 @@ test_scheduler_loop_vanilla(void *arg)
 
   UNMOCK(channel_flush_some_cells);
   UNMOCK(scheduler_compare_channels);
-  UNMOCK(tor_libevent_get_base);
   UNMOCK(get_options);
 }
 
@@ -917,8 +851,6 @@ test_scheduler_initfree(void *arg)
   tt_ptr_op(channels_pending, ==, NULL);
   tt_ptr_op(run_sched_ev, ==, NULL);
 
-  mock_event_init();
-  MOCK(tor_libevent_get_base, tor_libevent_get_base_mock);
   MOCK(get_options, mock_get_options);
   set_scheduler_options(SCHEDULER_KIST);
   set_scheduler_options(SCHEDULER_KIST_LITE);
@@ -935,9 +867,6 @@ test_scheduler_initfree(void *arg)
 
   scheduler_free_all();
 
-  UNMOCK(tor_libevent_get_base);
-  mock_event_free_all();
-
   tt_ptr_op(channels_pending, ==, NULL);
   tt_ptr_op(run_sched_ev, ==, NULL);
 

+ 14 - 21
src/test/test_workqueue.c

@@ -12,7 +12,6 @@
 #include "compat_libevent.h"
 
 #include <stdio.h>
-#include <event2/event.h>
 
 #define MAX_INFLIGHT (1<<16)
 
@@ -159,6 +158,7 @@ static tor_weak_rng_t weak_rng;
 static int n_sent = 0;
 static int rsa_sent = 0;
 static int ecdh_sent = 0;
+static int n_received_previously = 0;
 static int n_received = 0;
 static int no_shutdown = 0;
 
@@ -230,7 +230,7 @@ add_n_work_items(threadpool_t *tp, int n)
     ent = add_work(tp);
     if (! ent) {
       puts("Z");
-      tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+      tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), NULL);
       return -1;
     }
     if (n_try_cancel < opt_n_cancel &&
@@ -256,19 +256,13 @@ add_n_work_items(threadpool_t *tp, int n)
 static int shutting_down = 0;
 
 static void
-replysock_readable_cb(tor_socket_t sock, short what, void *arg)
+replysock_readable_cb(threadpool_t *tp)
 {
-  threadpool_t *tp = arg;
-  replyqueue_t *rq = threadpool_get_replyqueue(tp);
-
-  int old_r = n_received;
-  (void) sock;
-  (void) what;
-
-  replyqueue_process(rq);
-  if (old_r == n_received)
+  if (n_received_previously == n_received)
     return;
 
+  n_received_previously = n_received;
+
   if (opt_verbose) {
     printf("%d / %d", n_received, n_sent);
     if (opt_n_cancel)
@@ -308,7 +302,7 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
                           handle_reply_shutdown, NULL);
     {
       struct timeval limit = { 2, 0 };
-      tor_event_base_loopexit(tor_libevent_get_base(), &limit);
+      tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
     }
   }
 }
@@ -337,7 +331,6 @@ main(int argc, char **argv)
   threadpool_t *tp;
   int i;
   tor_libevent_cfg evcfg;
-  struct event *ev;
   uint32_t as_flags = 0;
 
   for (i = 1; i < argc; ++i) {
@@ -411,11 +404,11 @@ main(int argc, char **argv)
   memset(&evcfg, 0, sizeof(evcfg));
   tor_libevent_initialize(&evcfg);
 
-  ev = tor_event_new(tor_libevent_get_base(),
-                     replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
-                     replysock_readable_cb, tp);
-
-  event_add(ev, NULL);
+  {
+    int r = threadpool_register_reply_event(tp,
+                                            replysock_readable_cb);
+    tor_assert(r == 0);
+  }
 
 #ifdef TRACK_RESPONSES
   handled = bitarray_init_zero(opt_n_items);
@@ -433,10 +426,10 @@ main(int argc, char **argv)
 
   {
     struct timeval limit = { 180, 0 };
-    tor_event_base_loopexit(tor_libevent_get_base(), &limit);
+    tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
   }
 
-  event_base_loop(tor_libevent_get_base(), 0);
+  tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
 
   if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
     printf("%d vs %d\n", n_sent, opt_n_items);