Browse Source

Uncoupled the thread pool from the reply queue

Rather than giving a single reply queue to a thread pool, a reply
queue is given for each work entry added to the thread pool. This
allows each reply to be sent to a different reply queue, possibly
on a different event loop.
Steven Engler 4 years ago
parent
commit
8a3598b799
4 changed files with 95 additions and 76 deletions
  1. 10 5
      src/core/mainloop/cpuworker.c
  2. 57 49
      src/lib/evloop/workqueue.c
  3. 10 5
      src/lib/evloop/workqueue.h
  4. 18 17
      src/test/test_workqueue.c

+ 10 - 5
src/core/mainloop/cpuworker.c

@@ -33,6 +33,7 @@
 #include "lib/evloop/workqueue.h"
 #include "core/crypto/onion_crypto.h"
 #include "app/main/tor_threads.h"
+#include "lib/evloop/compat_libevent.h"
 
 #include "core/or/or_circuit_st.h"
 
@@ -83,9 +84,6 @@ static int max_pending_tasks = 128;
 void
 cpu_init(void)
 {
-  if (!replyqueue) {
-    replyqueue = replyqueue_new(0);
-  }
   if (!threadpool) {
     /*
       In our threadpool implementation, half the threads are permissive and
@@ -95,13 +93,16 @@ cpu_init(void)
     */
     const int n_threads = get_num_cpus(get_options()) + 1;
     threadpool = threadpool_new(n_threads,
-                                replyqueue,
                                 worker_state_new,
                                 worker_state_free_void,
                                 NULL,
                                 start_tor_thread);
+  }
+  if (!replyqueue) {
+    replyqueue = replyqueue_new(0, threadpool);
 
-    int r = threadpool_register_reply_event(threadpool, NULL);
+    struct event_base *base = tor_libevent_get_base();
+    int r = replyqueue_register_reply_event(replyqueue, base);
 
     tor_assert(r == 0);
   }
@@ -498,11 +499,13 @@ cpuworker_queue_work,(workqueue_priority_t priority,
                       void *arg))
 {
   tor_assert(threadpool);
+  tor_assert(replyqueue);
 
   return threadpool_queue_work_priority(threadpool,
                                         priority,
                                         fn,
                                         reply_fn,
+                                        replyqueue,
                                         arg);
 }
 
@@ -521,6 +524,7 @@ assign_onionskin_to_cpuworker(or_circuit_t *circ,
   int should_time;
 
   tor_assert(threadpool);
+  tor_assert(replyqueue);
 
   if (!circ->p_chan) {
     log_info(LD_OR,"circ->p_chan gone. Failing circ.");
@@ -562,6 +566,7 @@ assign_onionskin_to_cpuworker(or_circuit_t *circ,
                                       WQ_PRI_HIGH,
                                       cpuworker_onion_handshake_threadfn,
                                       cpuworker_onion_handshake_replyfn,
+                                      replyqueue,
                                       job);
   if (!queue_entry) {
     log_warn(LD_BUG, "Couldn't queue work on threadpool");

+ 57 - 49
src/lib/evloop/workqueue.c

@@ -69,18 +69,14 @@ struct threadpool_s {
   void (*free_update_arg_fn)(void *);
   /** Array of n_threads update arguments. */
   void **update_args;
-  /** Event to notice when another thread has sent a reply. */
-  struct event *reply_event;
-  void (*reply_cb)(threadpool_t *);
+  /** Callback that is run after a reply queue has processed work. */
+  void (*reply_cb)(threadpool_t *, replyqueue_t *);
 
   /** Number of elements in threads. */
   int n_threads;
   /** Mutex to protect all the above fields. */
   tor_mutex_t lock;
 
-  /** A reply queue to use when constructing new threads. */
-  replyqueue_t *reply_queue;
-
   /** Functions used to allocate and free thread state. */
   void *(*new_thread_state_fn)(void*);
   void (*free_thread_state_fn)(void*);
@@ -111,6 +107,8 @@ struct workqueue_entry_s {
   workqueue_reply_t (*fn)(void *state, void *arg);
   /** Function to run while processing the reply queue. */
   void (*reply_fn)(void *arg);
+  /** Linked reply queue */
+  replyqueue_t *reply_queue;
   /** Argument for the above functions. */
   void *arg;
 };
@@ -123,6 +121,11 @@ struct replyqueue_s {
 
   /** Mechanism to wake up the main thread when it is receiving answers. */
   alert_sockets_t alert;
+  /** Event to notice when another thread has sent a reply. */
+  struct event *reply_event;
+
+  /** The threadpool that uses this reply queue. */
+  struct threadpool_s *pool;
 };
 
 /** A worker thread represents a single thread in a thread pool. */
@@ -134,8 +137,6 @@ typedef struct workerthread_s {
   /** User-supplied state field that we pass to the worker functions of each
    * work item. */
   void *state;
-  /** Reply queue to which we pass our results. */
-  replyqueue_t *reply_queue;
   /** The current update generation of this thread */
   unsigned generation;
   /** One over the probability of taking work from a lower-priority queue. */
@@ -150,11 +151,13 @@ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
 static workqueue_entry_t *
 workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                     void (*reply_fn)(void*),
+                    replyqueue_t *reply_queue,
                     void *arg)
 {
   workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
   ent->fn = fn;
   ent->reply_fn = reply_fn;
+  ent->reply_queue = reply_queue;
   ent->arg = arg;
   ent->priority = WQ_PRI_HIGH;
   return ent;
@@ -301,7 +304,7 @@ worker_thread_main(void *thread_)
       result = work->fn(thread->state, work->arg);
 
       /* Queue the reply for the main thread. */
-      queue_reply(thread->reply_queue, work);
+      queue_reply(work->reply_queue, work);
 
       /* We may need to exit the thread. */
       if (result != WQ_RPL_REPLY) {
@@ -339,15 +342,13 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
   }
 }
 
-/** Allocate and start a new worker thread to use state object <b>state</b>,
- * and send responses to <b>replyqueue</b>. */
+/** Allocate and start a new worker thread to use state object <b>state</b>. */
 static workerthread_t *
 workerthread_new(int32_t lower_priority_chance,
-                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
+                 void *state, threadpool_t *pool)
 {
   workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
   thr->state = state;
-  thr->reply_queue = replyqueue;
   thr->in_pool = pool;
   thr->lower_priority_chance = lower_priority_chance;
 
@@ -391,12 +392,13 @@ threadpool_queue_work_priority(threadpool_t *pool,
                                workqueue_priority_t prio,
                                workqueue_reply_t (*fn)(void *, void *),
                                void (*reply_fn)(void *),
+                               replyqueue_t *reply_queue,
                                void *arg)
 {
   tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
              ((int)prio) <= WORKQUEUE_PRIORITY_LAST);
 
-  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
+  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, reply_queue, arg);
   ent->on_pool = pool;
   ent->pending = 1;
   ent->priority = prio;
@@ -417,9 +419,11 @@ workqueue_entry_t *
 threadpool_queue_work(threadpool_t *pool,
                       workqueue_reply_t (*fn)(void *, void *),
                       void (*reply_fn)(void *),
+                      replyqueue_t *reply_queue,
                       void *arg)
 {
-  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
+  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn,
+                                        reply_fn, reply_queue, arg);
 }
 
 /**
@@ -516,7 +520,7 @@ threadpool_start_threads(threadpool_t *pool, int n)
 
     void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
     workerthread_t *thr = workerthread_new(chance,
-                                           state, pool, pool->reply_queue);
+                                           state, pool);
 
     if (!thr) {
       //LCOV_EXCL_START
@@ -535,15 +539,13 @@ threadpool_start_threads(threadpool_t *pool, int n)
 }
 
 /**
- * Construct a new thread pool with <b>n</b> worker threads, configured to
- * send their output to <b>replyqueue</b>.  The threads' states will be
- * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
- * as its argument.  When the threads close, they will call
- * <b>free_thread_state_fn</b> on their states.
+ * Construct a new thread pool with <b>n</b> worker threads. The threads'
+ * states will be constructed with the <b>new_thread_state_fn</b> call,
+ * receiving <b>arg</b> as its argument.  When the threads close, they
+ * will call <b>free_thread_state_fn</b> on their states.
  */
 threadpool_t *
 threadpool_new(int n_threads,
-               replyqueue_t *replyqueue,
                void *(*new_thread_state_fn)(void*),
                void (*free_thread_state_fn)(void*),
                void *arg,
@@ -562,7 +564,6 @@ threadpool_new(int n_threads,
   pool->new_thread_state_arg = arg;
   pool->free_thread_state_fn = free_thread_state_fn;
   pool->thread_spawn_fn = thread_spawn_fn;
-  pool->reply_queue = replyqueue;
 
   if (threadpool_start_threads(pool, n_threads) < 0) {
     //LCOV_EXCL_START
@@ -577,11 +578,11 @@ threadpool_new(int n_threads,
   return pool;
 }
 
-/** Return the reply queue associated with a given thread pool. */
-replyqueue_t *
-threadpool_get_replyqueue(threadpool_t *tp)
+/** Return the thread pool associated with a given reply queue. */
+threadpool_t *
+replyqueue_get_threadpool(replyqueue_t *rq)
 {
-  return tp->reply_queue;
+  return rq->pool;
 }
 
 /** Allocate a new reply queue.  Reply queues are used to pass results from
@@ -589,7 +590,7 @@ threadpool_get_replyqueue(threadpool_t *tp)
  * IO-centric event loop, it needs to get woken up with means other than a
  * condition variable. */
 replyqueue_t *
-replyqueue_new(uint32_t alertsocks_flags)
+replyqueue_new(uint32_t alertsocks_flags, threadpool_t *pool)
 {
   replyqueue_t *rq;
 
@@ -601,6 +602,8 @@ replyqueue_new(uint32_t alertsocks_flags)
     //LCOV_EXCL_STOP
   }
 
+  rq->pool = pool;
+
   tor_mutex_init(&rq->lock);
   TOR_TAILQ_INIT(&rq->answers);
 
@@ -612,36 +615,41 @@ replyqueue_new(uint32_t alertsocks_flags)
 static void
 reply_event_cb(evutil_socket_t sock, short events, void *arg)
 {
-  threadpool_t *tp = arg;
+  replyqueue_t *reply_queue = arg;
   (void) sock;
   (void) events;
-  replyqueue_process(tp->reply_queue);
-  if (tp->reply_cb)
-    tp->reply_cb(tp);
+  replyqueue_process(reply_queue);
+  if (reply_queue->pool && reply_queue->pool->reply_cb)
+    reply_queue->pool->reply_cb(reply_queue->pool, reply_queue);
 }
 
-/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
- * libevent mainloop. If <b>cb</b> is provided, it is run after
- * each time there is work to process from the reply queue. Return 0 on
- * success, -1 on failure.
+/** Register the reply queue with the given libevent mainloop. Return 0
+ * on success, -1 on failure.
  */
 int
-threadpool_register_reply_event(threadpool_t *tp,
-                                void (*cb)(threadpool_t *tp))
+replyqueue_register_reply_event(replyqueue_t *reply_queue,
+                                struct event_base *base)
 {
-  struct event_base *base = tor_libevent_get_base();
-
-  if (tp->reply_event) {
-    tor_event_free(tp->reply_event);
+  if (reply_queue->reply_event) {
+    tor_event_free(reply_queue->reply_event);
   }
-  tp->reply_event = tor_event_new(base,
-                                  tp->reply_queue->alert.read_fd,
-                                  EV_READ|EV_PERSIST,
-                                  reply_event_cb,
-                                  tp);
-  tor_assert(tp->reply_event);
+  reply_queue->reply_event = tor_event_new(base,
+                                           reply_queue->alert.read_fd,
+                                           EV_READ|EV_PERSIST,
+                                           reply_event_cb,
+                                           reply_queue);
+  tor_assert(reply_queue->reply_event);
+  return event_add(reply_queue->reply_event, NULL);
+}
+
+/** The given callback is run after each time there is work to process
+ * from a reply queue. Return 0 on success, -1 on failure.
+ */
+void
+threadpool_set_reply_cb(threadpool_t *tp,
+                        void (*cb)(threadpool_t *tp, replyqueue_t *rq))
+{
   tp->reply_cb = cb;
-  return event_add(tp->reply_event, NULL);
 }
 
 /**

+ 10 - 5
src/lib/evloop/workqueue.h

@@ -9,6 +9,8 @@
 #ifndef TOR_WORKQUEUE_H
 #define TOR_WORKQUEUE_H
 
+#include <event.h>
+
 #include "lib/cc/torint.h"
 
 /** A replyqueue is used to tell the main thread about the outcome of
@@ -39,12 +41,14 @@ workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool,
                                     workqueue_reply_t (*fn)(void *,
                                                             void *),
                                     void (*reply_fn)(void *),
+                                    replyqueue_t *reply_queue,
                                     void *arg);
 
 workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
                                          workqueue_reply_t (*fn)(void *,
                                                                  void *),
                                          void (*reply_fn)(void *),
+                                         replyqueue_t *reply_queue,
                                          void *arg);
 
 int threadpool_queue_update(threadpool_t *pool,
@@ -54,18 +58,19 @@ int threadpool_queue_update(threadpool_t *pool,
                             void *arg);
 void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
 threadpool_t *threadpool_new(int n_threads,
-                             replyqueue_t *replyqueue,
                              void *(*new_thread_state_fn)(void*),
                              void (*free_thread_state_fn)(void*),
                              void *arg,
                              int (*thread_spawn_fn)
                                  (void (*func)(void *), void *data));
-replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
+threadpool_t *replyqueue_get_threadpool(replyqueue_t *rq);
 
-replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
+replyqueue_t *replyqueue_new(uint32_t alertsocks_flags, threadpool_t *pool);
 void replyqueue_process(replyqueue_t *queue);
 
-int threadpool_register_reply_event(threadpool_t *tp,
-                                    void (*cb)(threadpool_t *tp));
+int replyqueue_register_reply_event(replyqueue_t *reply_queue,
+                                    struct event_base *base);
+void threadpool_set_reply_cb(threadpool_t *tp,
+                             void (*cb)(threadpool_t *, replyqueue_t *));
 
 #endif /* !defined(TOR_WORKQUEUE_H) */

+ 18 - 17
src/test/test_workqueue.c

@@ -191,7 +191,7 @@ handle_reply_shutdown(void *arg)
 }
 
 static workqueue_entry_t *
-add_work(threadpool_t *tp)
+add_work(threadpool_t *tp, replyqueue_t *rq)
 {
   int add_rsa =
     opt_ratio_rsa == 0 ||
@@ -203,16 +203,15 @@ add_work(threadpool_t *tp)
     crypto_rand((char*)w->msg, 20);
     w->msglen = 20;
     ++rsa_sent;
-    return threadpool_queue_work_priority(tp,
-                                          WQ_PRI_MED,
-                                          workqueue_do_rsa, handle_reply, w);
+    return threadpool_queue_work_priority(tp, WQ_PRI_MED, workqueue_do_rsa,
+                                          handle_reply, rq, w);
   } else {
     ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
     w->serial = n_sent++;
     /* Not strictly right, but this is just for benchmarks. */
     crypto_rand((char*)w->u.pk.public_key, 32);
     ++ecdh_sent;
-    return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
+    return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, rq, w);
   }
 }
 
@@ -220,7 +219,7 @@ static int n_failed_cancel = 0;
 static int n_successful_cancel = 0;
 
 static int
-add_n_work_items(threadpool_t *tp, int n)
+add_n_work_items(threadpool_t *tp, replyqueue_t *rq, int n)
 {
   int n_queued = 0;
   int n_try_cancel = 0, i;
@@ -231,7 +230,7 @@ add_n_work_items(threadpool_t *tp, int n)
   to_cancel = tor_calloc(opt_n_cancel, sizeof(workqueue_entry_t*));
 
   while (n_queued++ < n) {
-    ent = add_work(tp);
+    ent = add_work(tp, rq);
     if (! ent) {
       puts("Z");
       tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), NULL);
@@ -265,7 +264,7 @@ add_n_work_items(threadpool_t *tp, int n)
 static int shutting_down = 0;
 
 static void
-replysock_readable_cb(threadpool_t *tp)
+replysock_readable_cb(threadpool_t *tp, replyqueue_t *rq)
 {
   if (n_received_previously == n_received)
     return;
@@ -297,7 +296,7 @@ replysock_readable_cb(threadpool_t *tp)
     int n_to_send = n_received + opt_n_inflight - n_sent;
     if (n_to_send > opt_n_items - n_sent)
       n_to_send = opt_n_items - n_sent;
-    add_n_work_items(tp, n_to_send);
+    add_n_work_items(tp, rq, n_to_send);
   }
 
   if (shutting_down == 0 &&
@@ -308,7 +307,7 @@ replysock_readable_cb(threadpool_t *tp)
                              workqueue_do_shutdown, NULL, NULL);
     // Anything we add after starting the shutdown must not be executed.
     threadpool_queue_work(tp, workqueue_shutdown_error,
-                          handle_reply_shutdown, NULL);
+                          handle_reply_shutdown, rq, NULL);
     {
       struct timeval limit = { 2, 0 };
       tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
@@ -399,14 +398,16 @@ main(int argc, char **argv)
     return 1;
   }
 
-  rq = replyqueue_new(as_flags);
+  tp = threadpool_new(opt_n_threads,
+                      new_state, free_state, NULL, spawn_func);
+  tor_assert(tp);
+  threadpool_set_reply_cb(tp, replysock_readable_cb);
+
+  rq = replyqueue_new(as_flags, tp);
   if (as_flags && rq == NULL)
     return 77; // 77 means "skipped".
 
   tor_assert(rq);
-  tp = threadpool_new(opt_n_threads,
-                      rq, new_state, free_state, NULL, spawn_func);
-  tor_assert(tp);
 
   crypto_seed_weak_rng(&weak_rng);
 
@@ -414,8 +415,8 @@ main(int argc, char **argv)
   tor_libevent_initialize(&evcfg);
 
   {
-    int r = threadpool_register_reply_event(tp,
-                                            replysock_readable_cb);
+    struct event_base *base = tor_libevent_get_base();
+    int r = replyqueue_register_reply_event(rq, base);
     tor_assert(r == 0);
   }
 
@@ -427,7 +428,7 @@ main(int argc, char **argv)
 #endif /* defined(TRACK_RESPONSES) */
 
   for (i = 0; i < opt_n_inflight; ++i) {
-    if (! add_work(tp)) {
+    if (! add_work(tp, rq)) {
       puts("Couldn't add work.");
       return 1;
     }