Browse Source

Merge branch 'maint-0.3.1'

Nick Mathewson 6 years ago
parent
commit
15ed1c0c83

+ 8 - 0
changes/bug22883-priority

@@ -0,0 +1,8 @@
+  o Major bugfixes (relay, performance):
+
+    - Perform circuit handshake operations at a higher priority than we use
+      for consensus diff creation and compression. This should prevent
+      circuits from starving when a relay or bridge receive a new consensus,
+      especially on lower-powered machines. Fixes bug 22883; bugfix on
+      0.3.1.1-alpha.
+

+ 3 - 0
changes/more-threads

@@ -0,0 +1,3 @@
+  o Minor features (relay, performance):
+    - Always start relays with at least two worker threads, to prevent
+      priority inversion on slow tasks.  Part of the fix for bug 22883.

+ 5 - 0
changes/multi-priority

@@ -0,0 +1,5 @@
+  o Minor features (relay, thread pool):
+    - Allow background work to be queued with different priorities, so
+      that a big pile of slow low-priority jobs will not starve out
+      higher priority jobs. This lays the groundwork for a fix for bug
+      22883.

+ 124 - 21
src/common/workqueue.c

@@ -25,11 +25,19 @@
 #include "orconfig.h"
 #include "compat.h"
 #include "compat_threads.h"
+#include "crypto.h"
 #include "util.h"
 #include "workqueue.h"
 #include "tor_queue.h"
 #include "torlog.h"
 
+#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
+#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
+#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
+
+TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s);
+typedef struct work_tailq_t work_tailq_t;
+
 struct threadpool_s {
   /** An array of pointers to workerthread_t: one for each running worker
    * thread. */
@@ -38,8 +46,12 @@ struct threadpool_s {
   /** Condition variable that we wait on when we have no work, and which
    * gets signaled when our queue becomes nonempty. */
   tor_cond_t condition;
-  /** Queue of pending work that we have to do. */
-  TOR_TAILQ_HEAD(, workqueue_entry_s) work;
+  /** Queues of pending work that we have to do. The queue with priority
+   * <b>p</b> is work[p]. */
+  work_tailq_t work[WORKQUEUE_N_PRIORITIES];
+
+  /** Weak RNG, used to decide when to ignore priority. */
+  tor_weak_rng_t weak_rng;
 
   /** The current 'update generation' of the threadpool.  Any thread that is
    * at an earlier generation needs to run the update function. */
@@ -66,6 +78,11 @@ struct threadpool_s {
   void *new_thread_state_arg;
 };
 
+/** Used to put a workqueue_priority_t value into a bitfield. */
+#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
+/** Number of bits needed to hold all legal values of workqueue_priority_t */
+#define WORKQUEUE_PRIORITY_BITS 2
+
 struct workqueue_entry_s {
   /** The next workqueue_entry_t that's pending on the same thread or
    * reply queue. */
@@ -76,6 +93,8 @@ struct workqueue_entry_s {
   struct threadpool_s *on_pool;
   /** True iff this entry is waiting for a worker to start processing it. */
   uint8_t pending;
+  /** Priority of this entry. */
+  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
   /** Function to run in the worker thread. */
   workqueue_reply_t (*fn)(void *state, void *arg);
   /** Function to run while processing the reply queue. */
@@ -94,9 +113,7 @@ struct replyqueue_s {
   alert_sockets_t alert;
 };
 
-/** A worker thread represents a single thread in a thread pool.  To avoid
- * contention, each gets its own queue. This breaks the guarantee that that
- * queued work will get executed strictly in order. */
+/** A worker thread represents a single thread in a thread pool. */
 typedef struct workerthread_s {
   /** Which thread it this?  In range 0..in_pool->n_threads-1 */
   int index;
@@ -109,6 +126,8 @@ typedef struct workerthread_s {
   replyqueue_t *reply_queue;
   /** The current update generation of this thread */
   unsigned generation;
+  /** One over the probability of taking work from a lower-priority queue. */
+  int32_t lower_priority_chance;
 } workerthread_t;
 
 static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
@@ -125,6 +144,7 @@ workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
   ent->fn = fn;
   ent->reply_fn = reply_fn;
   ent->arg = arg;
+  ent->priority = WQ_PRI_HIGH;
   return ent;
 }
 
@@ -161,8 +181,9 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
   int cancelled = 0;
   void *result = NULL;
   tor_mutex_acquire(&ent->on_pool->lock);
+  workqueue_priority_t prio = ent->priority;
   if (ent->pending) {
-    TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work);
+    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
     cancelled = 1;
     result = ent->arg;
   }
@@ -180,8 +201,46 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
 static int
 worker_thread_has_work(workerthread_t *thread)
 {
-  return !TOR_TAILQ_EMPTY(&thread->in_pool->work) ||
-    thread->generation != thread->in_pool->generation;
+  unsigned i;
+  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
+    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
+        return 1;
+  }
+  return thread->generation != thread->in_pool->generation;
+}
+
+/** Extract the next workqueue_entry_t from the the thread's pool, removing
+ * it from the relevant queues and marking it as non-pending.
+ *
+ * The caller must hold the lock. */
+static workqueue_entry_t *
+worker_thread_extract_next_work(workerthread_t *thread)
+{
+  threadpool_t *pool = thread->in_pool;
+  work_tailq_t *queue = NULL, *this_queue;
+  unsigned i;
+  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
+    this_queue = &pool->work[i];
+    if (!TOR_TAILQ_EMPTY(this_queue)) {
+      queue = this_queue;
+      if (! tor_weak_random_one_in_n(&pool->weak_rng,
+                                     thread->lower_priority_chance)) {
+        /* Usually we'll just break now, so that we can get out of the loop
+         * and use the queue where we found work. But with a small
+         * probability, we'll keep looking for lower priority work, so that
+         * we don't ignore our low-priority queues entirely. */
+        break;
+      }
+    }
+  }
+
+  if (queue == NULL)
+    return NULL;
+
+  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
+  TOR_TAILQ_REMOVE(queue, work, next_work);
+  work->pending = 0;
+  return work;
 }
 
 /**
@@ -217,9 +276,9 @@ worker_thread_main(void *thread_)
         tor_mutex_acquire(&pool->lock);
         continue;
       }
-      work = TOR_TAILQ_FIRST(&pool->work);
-      TOR_TAILQ_REMOVE(&pool->work, work, next_work);
-      work->pending = 0;
+      work = worker_thread_extract_next_work(thread);
+      if (BUG(work == NULL))
+        break;
       tor_mutex_release(&pool->lock);
 
       /* We run the work function without holding the thread lock. This
@@ -268,12 +327,14 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
 /** Allocate and start a new worker thread to use state object <b>state</b>,
  * and send responses to <b>replyqueue</b>. */
 static workerthread_t *
-workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
+workerthread_new(int32_t lower_priority_chance,
+                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
 {
   workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
   thr->state = state;
   thr->reply_queue = replyqueue;
   thr->in_pool = pool;
+  thr->lower_priority_chance = lower_priority_chance;
 
   if (spawn_func(worker_thread_main, thr) < 0) {
     //LCOV_EXCL_START
@@ -299,24 +360,34 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
  * function's responsibility to free the work object.
  *
  * On success, return a workqueue_entry_t object that can be passed to
- * workqueue_entry_cancel(). On failure, return NULL.
+ * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not
+ * currently possible, but callers should check anyway.)
+ *
+ * Items are executed in a loose priority order -- each thread will usually
+ * take from the queued work with the highest prioirity, but will occasionally
+ * visit lower-priority queues to keep them from starving completely.
  *
- * Note that because each thread has its own work queue, work items may not
+ * Note that because of priorities and thread behavior, work items may not
  * be executed strictly in order.
  */
 workqueue_entry_t *
-threadpool_queue_work(threadpool_t *pool,
-                      workqueue_reply_t (*fn)(void *, void *),
-                      void (*reply_fn)(void *),
-                      void *arg)
+threadpool_queue_work_priority(threadpool_t *pool,
+                               workqueue_priority_t prio,
+                               workqueue_reply_t (*fn)(void *, void *),
+                               void (*reply_fn)(void *),
+                               void *arg)
 {
+  tor_assert(prio >= WORKQUEUE_PRIORITY_FIRST &&
+             prio <= WORKQUEUE_PRIORITY_LAST);
+
   workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
   ent->on_pool = pool;
   ent->pending = 1;
+  ent->priority = prio;
 
   tor_mutex_acquire(&pool->lock);
 
-  TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work);
+  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
 
   tor_cond_signal_one(&pool->condition);
 
@@ -325,6 +396,16 @@ threadpool_queue_work(threadpool_t *pool,
   return ent;
 }
 
+/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
+workqueue_entry_t *
+threadpool_queue_work(threadpool_t *pool,
+                      workqueue_reply_t (*fn)(void *, void *),
+                      void (*reply_fn)(void *),
+                      void *arg)
+{
+  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
+}
+
 /**
  * Queue a copy of a work item for every thread in a pool.  This can be used,
  * for example, to tell the threads to update some parameter in their states.
@@ -388,6 +469,14 @@ threadpool_queue_update(threadpool_t *pool,
 /** Don't have more than this many threads per pool. */
 #define MAX_THREADS 1024
 
+/** For half of our threads, choose lower priority queues with probability
+ * 1/N for each of these values. Both are chosen somewhat arbitrarily.  If
+ * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
+ * stalling forever.  If it's too high, we have a risk of low-priority tasks
+ * grabbing half of the threads. */
+#define CHANCE_PERMISSIVE 37
+#define CHANCE_STRICT INT32_MAX
+
 /** Launch threads until we have <b>n</b>. */
 static int
 threadpool_start_threads(threadpool_t *pool, int n)
@@ -404,8 +493,14 @@ threadpool_start_threads(threadpool_t *pool, int n)
                                      sizeof(workerthread_t*), n);
 
   while (pool->n_threads < n) {
+    /* For half of our threads, we'll choose lower priorities permissively;
+     * for the other half, we'll stick more strictly to higher priorities.
+     * This keeps slow low-priority tasks from taking over completely. */
+    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;
+
     void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
-    workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue);
+    workerthread_t *thr = workerthread_new(chance,
+                                           state, pool, pool->reply_queue);
 
     if (!thr) {
       //LCOV_EXCL_START
@@ -441,7 +536,15 @@ threadpool_new(int n_threads,
   pool = tor_malloc_zero(sizeof(threadpool_t));
   tor_mutex_init_nonrecursive(&pool->lock);
   tor_cond_init(&pool->condition);
-  TOR_TAILQ_INIT(&pool->work);
+  unsigned i;
+  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
+    TOR_TAILQ_INIT(&pool->work[i]);
+  }
+  {
+    unsigned seed;
+    crypto_rand((void*)&seed, sizeof(seed));
+    tor_init_weak_random(&pool->weak_rng, seed);
+  }
 
   pool->new_thread_state_fn = new_thread_state_fn;
   pool->new_thread_state_arg = arg;

+ 14 - 0
src/common/workqueue.h

@@ -22,6 +22,20 @@ typedef enum workqueue_reply_t {
   WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */
 } workqueue_reply_t;
 
+/** Possible priorities for work.  Lower numeric values are more important. */
+typedef enum workqueue_priority_t {
+  WQ_PRI_HIGH = 0,
+  WQ_PRI_MED  = 1,
+  WQ_PRI_LOW  = 2,
+} workqueue_priority_t;
+
+workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool,
+                                    workqueue_priority_t prio,
+                                    workqueue_reply_t (*fn)(void *,
+                                                            void *),
+                                    void (*reply_fn)(void *),
+                                    void *arg);
+
 workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
                                          workqueue_reply_t (*fn)(void *,
                                                                  void *),

+ 4 - 2
src/or/consdiffmgr.c

@@ -1616,7 +1616,8 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
     goto err;
 
   workqueue_entry_t *work;
-  work = cpuworker_queue_work(consensus_diff_worker_threadfn,
+  work = cpuworker_queue_work(WQ_PRI_LOW,
+                              consensus_diff_worker_threadfn,
                               consensus_diff_worker_replyfn,
                               job);
   if (!work)
@@ -1779,7 +1780,8 @@ consensus_queue_compression_work(const char *consensus,
 
   if (background_compression) {
     workqueue_entry_t *work;
-    work = cpuworker_queue_work(consensus_compress_worker_threadfn,
+    work = cpuworker_queue_work(WQ_PRI_LOW,
+                                consensus_compress_worker_threadfn,
                                 consensus_compress_worker_replyfn,
                                 job);
     if (!work) {

+ 22 - 9
src/or/cpuworker.c

@@ -11,8 +11,11 @@
  * The multithreading backend for this module is in workqueue.c; this module
  * specializes workqueue.c.
  *
- * Right now, we only use this for processing onionskins, and invoke it mostly
- * from onion.c.
+ * Right now, we use this infrastructure
+ *  <ul><li>for processing onionskins in onion.c
+ *      <li>for compressing consensuses in consdiffmgr.c,
+ *      <li>and for calculating diffs and compressing them in consdiffmgr.c.
+ *  </ul>
  **/
 #include "or.h"
 #include "channel.h"
@@ -89,7 +92,14 @@ cpu_init(void)
     event_add(reply_event, NULL);
   }
   if (!threadpool) {
-    threadpool = threadpool_new(get_num_cpus(get_options()),
+    /*
+      In our threadpool implementation, half the threads are permissive and
+      half are strict (when it comes to running lower-priority tasks). So we
+      always make sure we have at least two threads, so that there will be at
+      least one thread of each kind.
+    */
+    const int n_threads = get_num_cpus(get_options()) + 1;
+    threadpool = threadpool_new(n_threads,
                                 replyqueue,
                                 worker_state_new,
                                 worker_state_free,
@@ -481,16 +491,18 @@ queue_pending_tasks(void)
 
 /** DOCDOC */
 MOCK_IMPL(workqueue_entry_t *,
-cpuworker_queue_work,(workqueue_reply_t (*fn)(void *, void *),
+cpuworker_queue_work,(workqueue_priority_t priority,
+                      workqueue_reply_t (*fn)(void *, void *),
                       void (*reply_fn)(void *),
                       void *arg))
 {
   tor_assert(threadpool);
 
-  return threadpool_queue_work(threadpool,
-                               fn,
-                               reply_fn,
-                               arg);
+  return threadpool_queue_work_priority(threadpool,
+                                        priority,
+                                        fn,
+                                        reply_fn,
+                                        arg);
 }
 
 /** Try to tell a cpuworker to perform the public key operations necessary to
@@ -545,7 +557,8 @@ assign_onionskin_to_cpuworker(or_circuit_t *circ,
   memwipe(&req, 0, sizeof(req));
 
   ++total_pending_tasks;
-  queue_entry = threadpool_queue_work(threadpool,
+  queue_entry = threadpool_queue_work_priority(threadpool,
+                                      WQ_PRI_HIGH,
                                       cpuworker_onion_handshake_threadfn,
                                       cpuworker_onion_handshake_replyfn,
                                       job);

+ 2 - 0
src/or/cpuworker.h

@@ -16,7 +16,9 @@ void cpu_init(void);
 void cpuworkers_rotate_keyinfo(void);
 struct workqueue_entry_s;
 enum workqueue_reply_t;
+enum workqueue_priority_t;
 MOCK_DECL(struct workqueue_entry_s *, cpuworker_queue_work, (
+                    enum workqueue_priority_t priority,
                     enum workqueue_reply_t (*fn)(void *, void *),
                     void (*reply_fn)(void *),
                     void *arg));

+ 4 - 1
src/test/test_consdiffmgr.c

@@ -98,10 +98,13 @@ typedef struct fake_work_queue_ent_t {
   void *arg;
 } fake_work_queue_ent_t;
 static struct workqueue_entry_s *
-mock_cpuworker_queue_work(enum workqueue_reply_t (*fn)(void *, void *),
+mock_cpuworker_queue_work(workqueue_priority_t prio,
+                          enum workqueue_reply_t (*fn)(void *, void *),
                           void (*reply_fn)(void *),
                           void *arg)
 {
+  (void) prio;
+
   if (! fake_cpuworker_queue)
     fake_cpuworker_queue = smartlist_new();
 

+ 3 - 1
src/test/test_workqueue.c

@@ -200,7 +200,9 @@ add_work(threadpool_t *tp)
     crypto_rand((char*)w->msg, 20);
     w->msglen = 20;
     ++rsa_sent;
-    return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
+    return threadpool_queue_work_priority(tp,
+                                          WQ_PRI_MED,
+                                          workqueue_do_rsa, handle_reply, w);
   } else {
     ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
     w->serial = n_sent++;