Browse Source

Added the ability to shutdown the threadpool

The threadpool can now be shutdown. This stops and joins the
threadpool's worker threads. When shutting down, the worker
threads will quickly reply to the remaing work entries without
running them, setting their reply status to 'WQ_RPL_SHUTDOWN'.

This change is not perfect, but it improves on the existing code.
Additional changes should be made to make sure all memory is
being freed (in the previous version, most memory was not being
freed).
Steven Engler 4 years ago
parent
commit
547376f189

+ 3 - 0
src/app/main/shutdown.c

@@ -19,6 +19,7 @@
 #include "app/main/subsysmgr.h"
 #include "core/mainloop/connection.h"
 #include "core/mainloop/mainloop_pubsub.h"
+#include "core/mainloop/cpuworker.h"
 #include "core/or/channeltls.h"
 #include "core/or/circuitlist.h"
 #include "core/or/circuitmux_ewma.h"
@@ -151,6 +152,8 @@ tor_free_all(int postfork)
   }
   /* stuff in main.c */
 
+  cpu_shutdown();
+
   tor_mainloop_disconnect_pubsub();
 
   if (!postfork) {

+ 18 - 3
src/core/mainloop/cpuworker.c

@@ -30,7 +30,6 @@
 #include "feature/relay/onion_queue.h"
 #include "feature/stats/rephist.h"
 #include "feature/relay/router.h"
-#include "lib/evloop/workqueue.h"
 #include "core/crypto/onion_crypto.h"
 #include "app/main/tor_threads.h"
 #include "lib/evloop/compat_libevent.h"
@@ -111,6 +110,18 @@ cpu_init(void)
   max_pending_tasks = get_num_cpus(get_options()) * 64;
 }
 
+/** Shutdown the cpuworker subsystem, and wait for any threads to join. */
+void
+cpu_shutdown(void)
+{
+  if (threadpool != NULL) {
+    threadpool_shutdown(threadpool);
+    threadpool = NULL;
+  }
+
+  // TODO: clean up the replyqueue
+}
+
 /** Magic numbers to make sure our cpuworker_requests don't grow any
  * mis-framing bugs. */
 #define CPUWORKER_REQUEST_MAGIC 0xda4afeed
@@ -312,7 +323,7 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
 
 /** Handle a reply from the worker threads. */
 static void
-cpuworker_onion_handshake_replyfn(void *work_)
+cpuworker_onion_handshake_replyfn(void *work_, workqueue_reply_t reply_status)
 {
   cpuworker_job_t *job = work_;
   cpuworker_reply_t rpl;
@@ -321,6 +332,10 @@ cpuworker_onion_handshake_replyfn(void *work_)
   tor_assert(total_pending_tasks > 0);
   --total_pending_tasks;
 
+  if (reply_status != WQ_RPL_REPLY) {
+    goto done_processing;
+  }
+
   /* Could avoid this, but doesn't matter. */
   memcpy(&rpl, &job->u.reply, sizeof(rpl));
 
@@ -495,7 +510,7 @@ queue_pending_tasks(void)
 MOCK_IMPL(workqueue_entry_t *,
 cpuworker_queue_work,(workqueue_priority_t priority,
                       workqueue_reply_t (*fn)(void *, void *),
-                      void (*reply_fn)(void *),
+                      void (*reply_fn)(void *, workqueue_reply_t),
                       void *arg))
 {
   tor_assert(threadpool);

+ 4 - 1
src/core/mainloop/cpuworker.h

@@ -12,7 +12,10 @@
 #ifndef TOR_CPUWORKER_H
 #define TOR_CPUWORKER_H
 
+#include "lib/evloop/workqueue.h"
+
 void cpu_init(void);
+void cpu_shutdown(void);
 void cpuworkers_rotate_keyinfo(void);
 struct workqueue_entry_s;
 enum workqueue_reply_t;
@@ -20,7 +23,7 @@ enum workqueue_priority_t;
 MOCK_DECL(struct workqueue_entry_s *, cpuworker_queue_work, (
                     enum workqueue_priority_t priority,
                     enum workqueue_reply_t (*fn)(void *, void *),
-                    void (*reply_fn)(void *),
+                    void (*reply_fn)(void *, workqueue_reply_t),
                     void *arg));
 
 struct create_cell_t;

+ 14 - 4
src/feature/dircache/consdiffmgr.c

@@ -1611,13 +1611,18 @@ consensus_diff_worker_job_free_(consensus_diff_worker_job_t *job)
  * processed.
  */
 static void
-consensus_diff_worker_replyfn(void *work_)
+consensus_diff_worker_replyfn(void *work_, workqueue_reply_t reply_status)
 {
   tor_assert(in_main_thread());
   tor_assert(work_);
 
   consensus_diff_worker_job_t *job = work_;
 
+  if (reply_status != WQ_RPL_REPLY) {
+    consensus_diff_worker_job_free(job);
+    return;
+  }
+
   const char *lv_from_digest =
     consensus_cache_entry_get_value(job->diff_from,
                                     LABEL_SHA3_DIGEST_AS_SIGNED);
@@ -1806,10 +1811,15 @@ consensus_compress_worker_threadfn(void *state_, void *work_)
  * processed.
  */
 static void
-consensus_compress_worker_replyfn(void *work_)
+consensus_compress_worker_replyfn(void *work_, workqueue_reply_t reply_status)
 {
   consensus_compress_worker_job_t *job = work_;
 
+  if (reply_status != WQ_RPL_REPLY) {
+    consensus_compress_worker_job_free(job);
+    return;
+  }
+
   consensus_cache_entry_handle_t *handles[
                                ARRAY_LENGTH(compress_consensus_with)];
   memset(handles, 0, sizeof(handles));
@@ -1895,8 +1905,8 @@ consensus_queue_compression_work(const char *consensus,
 
     return 0;
   } else {
-    consensus_compress_worker_threadfn(NULL, job);
-    consensus_compress_worker_replyfn(job);
+    workqueue_reply_t reply_status = consensus_compress_worker_threadfn(NULL, job);
+    consensus_compress_worker_replyfn(job, reply_status);
     return 0;
   }
 }

+ 94 - 24
src/lib/evloop/workqueue.c

@@ -34,7 +34,6 @@
 #include "lib/log/util_bug.h"
 #include "lib/net/alertsock.h"
 #include "lib/net/socket.h"
-#include "lib/thread/threads.h"
 
 #include "ext/tor_queue.h"
 #include <event2/event.h>
@@ -63,6 +62,9 @@ struct threadpool_s {
    * at an earlier generation needs to run the update function. */
   unsigned generation;
 
+  /** Flag to tell the worker threads to stop. */
+  int shutdown;
+
   /** Function that should be run for updates on each thread. */
   workqueue_reply_t (*update_fn)(void *, void *);
   /** Function to free update arguments if they can't be run. */
@@ -83,7 +85,7 @@ struct threadpool_s {
   void *new_thread_state_arg;
 
   /** Function to start a thread. Should return a negative number on error. */
-  int (*thread_spawn_fn)(void (*func)(void *), void *data);
+  tor_thread_t *(*thread_spawn_fn)(void (*func)(void *), void *data);
 };
 
 /** Used to put a workqueue_priority_t value into a bitfield. */
@@ -106,11 +108,13 @@ struct workqueue_entry_s {
   /** Function to run in the worker thread. */
   workqueue_reply_t (*fn)(void *state, void *arg);
   /** Function to run while processing the reply queue. */
-  void (*reply_fn)(void *arg);
+  void (*reply_fn)(void *arg, workqueue_reply_t reply_status);
   /** Linked reply queue */
   replyqueue_t *reply_queue;
   /** Argument for the above functions. */
   void *arg;
+  /** Reply status of the worker thread function after it has returned. */
+  workqueue_reply_t reply_status;
 };
 
 struct replyqueue_s {
@@ -132,6 +136,8 @@ struct replyqueue_s {
 typedef struct workerthread_s {
   /** Which thread it this?  In range 0..in_pool->n_threads-1 */
   int index;
+  /** The tor thread object. */
+  tor_thread_t* thread;
   /** The pool this thread is a part of. */
   struct threadpool_s *in_pool;
   /** User-supplied state field that we pass to the worker functions of each
@@ -150,7 +156,7 @@ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
  * thread. See threadpool_queue_work() for full documentation. */
 static workqueue_entry_t *
 workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
-                    void (*reply_fn)(void*),
+                    void (*reply_fn)(void*, workqueue_reply_t),
                     replyqueue_t *reply_queue,
                     void *arg)
 {
@@ -295,26 +301,42 @@ worker_thread_main(void *thread_)
         continue;
       }
       work = worker_thread_extract_next_work(thread);
-      if (BUG(work == NULL))
+      if (BUG(work == NULL)) {
         break;
-      tor_mutex_release(&pool->lock);
+      }
+      if (pool->shutdown) {
+        /* If the pool wants to shutdown, we still need to reply so
+           that the reply functions have a chance to free memory. */
+        tor_mutex_release(&pool->lock);
+        work->reply_status = WQ_RPL_SHUTDOWN;
+        queue_reply(work->reply_queue, work);
+        tor_mutex_acquire(&pool->lock);
+      } else {
+        tor_mutex_release(&pool->lock);
 
-      /* We run the work function without holding the thread lock. This
-       * is the main thread's first opportunity to give us more work. */
-      result = work->fn(thread->state, work->arg);
+        /* We run the work function without holding the thread lock. This
+         * is the main thread's first opportunity to give us more work. */
+        result = work->fn(thread->state, work->arg);
 
-      /* Queue the reply for the main thread. */
-      queue_reply(work->reply_queue, work);
+        /* Queue the reply for the main thread. */
+        work->reply_status = result;
+        queue_reply(work->reply_queue, work);
 
-      /* We may need to exit the thread. */
-      if (result != WQ_RPL_REPLY) {
-        return;
+        /* We may need to exit the thread. */
+        if (result != WQ_RPL_REPLY) {
+          return;
+        }
+        tor_mutex_acquire(&pool->lock);
       }
-      tor_mutex_acquire(&pool->lock);
     }
     /* At this point the lock is held, and there is no work in this thread's
      * queue. */
 
+    if (pool->shutdown) {
+      tor_mutex_release(&pool->lock);
+      return;
+    }
+
     /* TODO: support an idle-function */
 
     /* Okay. Now, wait till somebody has work for us. */
@@ -353,7 +375,8 @@ workerthread_new(int32_t lower_priority_chance,
   thr->lower_priority_chance = lower_priority_chance;
 
   tor_assert(pool->thread_spawn_fn != NULL);
-  if (pool->thread_spawn_fn(worker_thread_main, thr) < 0) {
+  tor_thread_t* thread = pool->thread_spawn_fn(worker_thread_main, thr);
+  if (thread == NULL) {
     //LCOV_EXCL_START
     tor_assert_nonfatal_unreached();
     log_err(LD_GENERAL, "Can't launch worker thread.");
@@ -362,9 +385,25 @@ workerthread_new(int32_t lower_priority_chance,
     //LCOV_EXCL_STOP
   }
 
+  thr->thread = thread;
+
   return thr;
 }
 
+static void
+workerthread_join(workerthread_t* thr)
+{
+  if (join_thread(thr->thread) != 0) {
+    log_err(LD_GENERAL, "Could not join workerthread.");
+  }
+}
+
+static void
+workerthread_free(workerthread_t* thr)
+{
+  free_thread(thr->thread);
+}
+
 /**
  * Queue an item of work for a thread in a thread pool.  The function
  * <b>fn</b> will be run in a worker thread, and will receive as arguments the
@@ -377,8 +416,7 @@ workerthread_new(int32_t lower_priority_chance,
  * function's responsibility to free the work object.
  *
  * On success, return a workqueue_entry_t object that can be passed to
- * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not
- * currently possible, but callers should check anyway.)
+ * workqueue_entry_cancel(). On failure, return NULL.
  *
  * Items are executed in a loose priority order -- each thread will usually
  * take from the queued work with the highest prioirity, but will occasionally
@@ -391,20 +429,24 @@ workqueue_entry_t *
 threadpool_queue_work_priority(threadpool_t *pool,
                                workqueue_priority_t prio,
                                workqueue_reply_t (*fn)(void *, void *),
-                               void (*reply_fn)(void *),
+                               void (*reply_fn)(void *, workqueue_reply_t),
                                replyqueue_t *reply_queue,
                                void *arg)
 {
   tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
              ((int)prio) <= WORKQUEUE_PRIORITY_LAST);
 
+  tor_mutex_acquire(&pool->lock);
+
+  if (pool->shutdown) {
+    return NULL;
+  }
+
   workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, reply_queue, arg);
   ent->on_pool = pool;
   ent->pending = 1;
   ent->priority = prio;
 
-  tor_mutex_acquire(&pool->lock);
-
   TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
 
   tor_cond_signal_one(&pool->condition);
@@ -418,7 +460,7 @@ threadpool_queue_work_priority(threadpool_t *pool,
 workqueue_entry_t *
 threadpool_queue_work(threadpool_t *pool,
                       workqueue_reply_t (*fn)(void *, void *),
-                      void (*reply_fn)(void *),
+                      void (*reply_fn)(void *, workqueue_reply_t),
                       replyqueue_t *reply_queue,
                       void *arg)
 {
@@ -454,6 +496,11 @@ threadpool_queue_update(threadpool_t *pool,
   void **new_args;
 
   tor_mutex_acquire(&pool->lock);
+
+  if (pool->shutdown) {
+    return -1;
+  }
+
   n_threads = pool->n_threads;
   old_args = pool->update_args;
   old_args_free_fn = pool->free_update_arg_fn;
@@ -549,7 +596,7 @@ threadpool_new(int n_threads,
                void *(*new_thread_state_fn)(void*),
                void (*free_thread_state_fn)(void*),
                void *arg,
-               int (*thread_spawn_fn)(void (*func)(void *), void *data))
+               tor_thread_t *(*thread_spawn_fn)(void (*func)(void *), void *data))
 {
   threadpool_t *pool;
   pool = tor_malloc_zero(sizeof(threadpool_t));
@@ -578,6 +625,29 @@ threadpool_new(int n_threads,
   return pool;
 }
 
+void
+threadpool_shutdown(threadpool_t* pool)
+{
+  tor_assert(pool != NULL);
+  tor_mutex_acquire(&pool->lock);
+  pool->shutdown = 1;
+  tor_cond_signal_all(&pool->condition);
+
+  for (int i=0; i<pool->n_threads; i++) {
+    workerthread_t *thread = pool->threads[i];
+    tor_mutex_release(&pool->lock);
+    workerthread_join(thread);
+    tor_mutex_acquire(&pool->lock);
+  }
+
+  for (int i=0; i<pool->n_threads; i++) {
+    workerthread_free(pool->threads[i]);
+    pool->free_thread_state_fn(pool->threads[i]->state);
+  }
+
+  tor_mutex_release(&pool->lock);
+}
+
 /** Return the thread pool associated with a given reply queue. */
 threadpool_t *
 replyqueue_get_threadpool(replyqueue_t *rq)
@@ -678,7 +748,7 @@ replyqueue_process(replyqueue_t *queue)
     tor_mutex_release(&queue->lock);
     work->on_pool = NULL;
 
-    work->reply_fn(work->arg);
+    work->reply_fn(work->arg, work->reply_status);
     workqueue_entry_free(work);
 
     tor_mutex_acquire(&queue->lock);

+ 7 - 4
src/lib/evloop/workqueue.h

@@ -12,6 +12,7 @@
 #include <event.h>
 
 #include "lib/cc/torint.h"
+#include "lib/thread/threads.h"
 
 /** A replyqueue is used to tell the main thread about the outcome of
  * work that we queued for the workers. */
@@ -40,14 +41,14 @@ workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool,
                                     workqueue_priority_t prio,
                                     workqueue_reply_t (*fn)(void *,
                                                             void *),
-                                    void (*reply_fn)(void *),
+                                    void (*reply_fn)(void *, workqueue_reply_t),
                                     replyqueue_t *reply_queue,
                                     void *arg);
 
 workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
                                          workqueue_reply_t (*fn)(void *,
                                                                  void *),
-                                         void (*reply_fn)(void *),
+                                         void (*reply_fn)(void *, workqueue_reply_t),
                                          replyqueue_t *reply_queue,
                                          void *arg);
 
@@ -61,8 +62,10 @@ threadpool_t *threadpool_new(int n_threads,
                              void *(*new_thread_state_fn)(void*),
                              void (*free_thread_state_fn)(void*),
                              void *arg,
-                             int (*thread_spawn_fn)
-                                 (void (*func)(void *), void *data));
+                             tor_thread_t *(*thread_spawn_fn)
+                                           (void (*func)(void *),
+                                            void *data));
+void threadpool_shutdown(threadpool_t* pool);
 threadpool_t *replyqueue_get_threadpool(replyqueue_t *rq);
 
 replyqueue_t *replyqueue_new(uint32_t alertsocks_flags, threadpool_t *pool);

+ 5 - 3
src/test/test_consdiffmgr.c

@@ -116,13 +116,14 @@ fake_ns_body_new(consensus_flavor_t flav, time_t valid_after)
 static smartlist_t *fake_cpuworker_queue = NULL;
 typedef struct fake_work_queue_ent_t {
   enum workqueue_reply_t (*fn)(void *, void *);
-  void (*reply_fn)(void *);
+  void (*reply_fn)(void *, workqueue_reply_t);
   void *arg;
+  workqueue_reply_t reply_status;
 } fake_work_queue_ent_t;
 static struct workqueue_entry_s *
 mock_cpuworker_queue_work(workqueue_priority_t prio,
                           enum workqueue_reply_t (*fn)(void *, void *),
-                          void (*reply_fn)(void *),
+                          void (*reply_fn)(void *, workqueue_reply_t),
                           void *arg)
 {
   (void) prio;
@@ -144,6 +145,7 @@ mock_cpuworker_run_work(void)
     return 0;
   SMARTLIST_FOREACH(fake_cpuworker_queue, fake_work_queue_ent_t *, ent, {
       enum workqueue_reply_t r = ent->fn(NULL, ent->arg);
+      ent->reply_status = r;
       if (r != WQ_RPL_REPLY)
         return -1;
   });
@@ -155,7 +157,7 @@ mock_cpuworker_handle_replies(void)
   if (! fake_cpuworker_queue)
     return;
   SMARTLIST_FOREACH(fake_cpuworker_queue, fake_work_queue_ent_t *, ent, {
-      ent->reply_fn(ent->arg);
+      ent->reply_fn(ent->arg, ent->reply_status);
       tor_free(ent);
   });
   smartlist_free(fake_cpuworker_queue);

+ 7 - 2
src/test/test_workqueue.c

@@ -170,8 +170,10 @@ bitarray_t *received;
 #endif
 
 static void
-handle_reply(void *arg)
+handle_reply(void *arg, workqueue_reply_t reply_status)
 {
+  (void)reply_status;
+
 #ifdef TRACK_RESPONSES
   rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
   tor_assert(! bitarray_is_set(received, rw->serial));
@@ -184,9 +186,10 @@ handle_reply(void *arg)
 
 /* This should never get called. */
 static void
-handle_reply_shutdown(void *arg)
+handle_reply_shutdown(void *arg, workqueue_reply_t reply_status)
 {
   (void)arg;
+  (void)reply_status;
   no_shutdown = 1;
 }
 
@@ -441,6 +444,8 @@ main(int argc, char **argv)
 
   tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
 
+  threadpool_shutdown(tp);
+
   if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
     printf("%d vs %d\n", n_sent, opt_n_items);
     printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);