Browse Source

Threadpools use a configurable thread spawn function

A new argument to threadpool_new specifies a thread spawn function. This
is useful if you want to wrap 'spawn_func' from 'thread.h' in a helper
function.
Steven Engler 4 years ago
parent
commit
ba5d3d9a6e

+ 3 - 1
src/core/mainloop/cpuworker.c

@@ -32,6 +32,7 @@
 #include "feature/relay/router.h"
 #include "lib/evloop/workqueue.h"
 #include "core/crypto/onion_crypto.h"
+#include "lib/thread/threads.h"
 
 #include "core/or/or_circuit_st.h"
 
@@ -97,7 +98,8 @@ cpu_init(void)
                                 replyqueue,
                                 worker_state_new,
                                 worker_state_free_void,
-                                NULL);
+                                NULL,
+                                spawn_func);
 
     int r = threadpool_register_reply_event(threadpool, NULL);
 

+ 8 - 2
src/lib/evloop/workqueue.c

@@ -85,6 +85,9 @@ struct threadpool_s {
   void *(*new_thread_state_fn)(void*);
   void (*free_thread_state_fn)(void*);
   void *new_thread_state_arg;
+
+  /** Function to start a thread. Should return a negative number on error. */
+  int (*thread_spawn_fn)(void (*func)(void *), void *data);
 };
 
 /** Used to put a workqueue_priority_t value into a bitfield. */
@@ -348,7 +351,8 @@ workerthread_new(int32_t lower_priority_chance,
   thr->in_pool = pool;
   thr->lower_priority_chance = lower_priority_chance;
 
-  if (spawn_func(worker_thread_main, thr) < 0) {
+  tor_assert(pool->thread_spawn_fn != NULL);
+  if (pool->thread_spawn_fn(worker_thread_main, thr) < 0) {
     //LCOV_EXCL_START
     tor_assert_nonfatal_unreached();
     log_err(LD_GENERAL, "Can't launch worker thread.");
@@ -542,7 +546,8 @@ threadpool_new(int n_threads,
                replyqueue_t *replyqueue,
                void *(*new_thread_state_fn)(void*),
                void (*free_thread_state_fn)(void*),
-               void *arg)
+               void *arg,
+               int (*thread_spawn_fn)(void (*func)(void *), void *data))
 {
   threadpool_t *pool;
   pool = tor_malloc_zero(sizeof(threadpool_t));
@@ -556,6 +561,7 @@ threadpool_new(int n_threads,
   pool->new_thread_state_fn = new_thread_state_fn;
   pool->new_thread_state_arg = arg;
   pool->free_thread_state_fn = free_thread_state_fn;
+  pool->thread_spawn_fn = thread_spawn_fn;
   pool->reply_queue = replyqueue;
 
   if (threadpool_start_threads(pool, n_threads) < 0) {

+ 3 - 1
src/lib/evloop/workqueue.h

@@ -57,7 +57,9 @@ threadpool_t *threadpool_new(int n_threads,
                              replyqueue_t *replyqueue,
                              void *(*new_thread_state_fn)(void*),
                              void (*free_thread_state_fn)(void*),
-                             void *arg);
+                             void *arg,
+                             int (*thread_spawn_fn)
+                                 (void (*func)(void *), void *data));
 replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
 
 replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);

+ 1 - 1
src/test/test_workqueue.c

@@ -405,7 +405,7 @@ main(int argc, char **argv)
 
   tor_assert(rq);
   tp = threadpool_new(opt_n_threads,
-                      rq, new_state, free_state, NULL);
+                      rq, new_state, free_state, NULL, spawn_func);
   tor_assert(tp);
 
   crypto_seed_weak_rng(&weak_rng);