Browse Source

Each eventloop thread now has its own replyqueue

The cpuworker is now a subsystem so that threadlocals can be
initialized and destroyed properly. This allows each eventloop
thread to have its own replyqueue. Now when a thread invokes the
cpuworker, replies should go to the thread which asked for the work
to be done.
Steven Engler 4 years ago
parent
commit
f3f251535e

+ 2 - 0
src/app/main/subsystem_list.c

@@ -27,6 +27,7 @@
 #include "lib/wallclock/wallclock_sys.h"
 #include "lib/evloop/evloop_sys.h"
 #include "core/or/scheduler_sys.h"
+#include "core/mainloop/cpuworker_sys.h"
 
 #include "feature/dirauth/dirauth_sys.h"
 
@@ -60,6 +61,7 @@ const subsys_fns_t *tor_subsystems[] = {
   &sys_evloop,
 
   &sys_mainloop,
+  &sys_cpuworker,
   &sys_scheduler,
   &sys_or,
 

+ 56 - 12
src/core/mainloop/cpuworker.c

@@ -33,6 +33,7 @@
 #include "core/crypto/onion_crypto.h"
 #include "app/main/tor_threads.h"
 #include "lib/evloop/compat_libevent.h"
+#include "core/mainloop/cpuworker_sys.h"
 
 #include "core/or/or_circuit_st.h"
 
@@ -71,12 +72,36 @@ worker_state_free_void(void *arg)
   worker_state_free_(arg);
 }
 
-static replyqueue_t *replyqueue = NULL;
 static threadpool_t *threadpool = NULL;
+static tor_threadlocal_t replyqueue;
 
 static int total_pending_tasks = 0;
 static int max_pending_tasks = 128;
 
+static void
+cpu_init_threadlocals(void)
+{
+  tor_threadlocal_init(&replyqueue);
+}
+
+static void
+cpu_destroy_threadlocals(void)
+{
+  tor_threadlocal_destroy(&replyqueue);
+}
+
+void
+local_replyqueue_init(struct event_base *base)
+{
+  tor_assert(tor_threadlocal_get(&replyqueue) == NULL);
+
+  replyqueue_t *rq = replyqueue_new(0, threadpool);
+  int result = replyqueue_register_reply_event(rq, base);
+  tor_assert(result == 0);
+
+  tor_threadlocal_set(&replyqueue, (void *)rq);
+}
+
 /** Initialize the cpuworker subsystem. It is OK to call this more than once
  * during Tor's lifetime.
  */
@@ -97,13 +122,9 @@ cpu_init(void)
                                 NULL,
                                 start_tor_thread);
   }
-  if (!replyqueue) {
-    replyqueue = replyqueue_new(0, threadpool);
-
+  if (!tor_threadlocal_get(&replyqueue)) {
     struct event_base *base = tor_libevent_get_base();
-    int r = replyqueue_register_reply_event(replyqueue, base);
-
-    tor_assert(r == 0);
+    local_replyqueue_init(base);
   }
 
   /* Total voodoo. Can we make this more sensible? */
@@ -119,7 +140,7 @@ cpu_shutdown(void)
     threadpool = NULL;
   }
 
-  // TODO: clean up the replyqueue
+  // TODO: clean up all replyqueues
 }
 
 /** Magic numbers to make sure our cpuworker_requests don't grow any
@@ -514,13 +535,14 @@ cpuworker_queue_work,(workqueue_priority_t priority,
                       void *arg))
 {
   tor_assert(threadpool);
-  tor_assert(replyqueue);
+  replyqueue_t *local_replyqueue = tor_threadlocal_get(&replyqueue);
+  tor_assert(local_replyqueue);
 
   return threadpool_queue_work_priority(threadpool,
                                         priority,
                                         fn,
                                         reply_fn,
-                                        replyqueue,
+                                        local_replyqueue,
                                         arg);
 }
 
@@ -539,7 +561,8 @@ assign_onionskin_to_cpuworker(or_circuit_t *circ,
   int should_time;
 
   tor_assert(threadpool);
-  tor_assert(replyqueue);
+  replyqueue_t *local_replyqueue = tor_threadlocal_get(&replyqueue);
+  tor_assert(local_replyqueue);
 
   if (!circ->p_chan) {
     log_info(LD_OR,"circ->p_chan gone. Failing circ.");
@@ -581,7 +604,7 @@ assign_onionskin_to_cpuworker(or_circuit_t *circ,
                                       WQ_PRI_HIGH,
                                       cpuworker_onion_handshake_threadfn,
                                       cpuworker_onion_handshake_replyfn,
-                                      replyqueue,
+                                      local_replyqueue,
                                       job);
   if (!queue_entry) {
     log_warn(LD_BUG, "Couldn't queue work on threadpool");
@@ -617,3 +640,24 @@ cpuworker_cancel_circ_handshake(or_circuit_t *circ)
     circ->workqueue_entry = NULL;
   }
 }
+
+static int
+subsys_cpuworker_initialize(void)
+{
+  cpu_init_threadlocals();
+  return 0;
+}
+
+static void
+subsys_cpuworker_shutdown(void)
+{
+  cpu_destroy_threadlocals();
+}
+
+const struct subsys_fns_t sys_cpuworker = {
+  .name = "cpuworker",
+  .supported = true,
+  .level = 7,
+  .initialize = subsys_cpuworker_initialize,
+  .shutdown = subsys_cpuworker_shutdown,
+};

+ 1 - 0
src/core/mainloop/cpuworker.h

@@ -14,6 +14,7 @@
 
 #include "lib/evloop/workqueue.h"
 
+void local_replyqueue_init(struct event_base *base);
 void cpu_init(void);
 void cpu_shutdown(void);
 void cpuworkers_rotate_keyinfo(void);

+ 11 - 0
src/core/mainloop/cpuworker_sys.h

@@ -0,0 +1,11 @@
+/* Copyright (c) 2007-2019, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef CPUWORKER_SYS_H
+#define CPUWORKER_SYS_H
+
+#include "lib/subsys/subsys.h"
+
+extern const struct subsys_fns_t sys_cpuworker;
+
+#endif /* !defined(CPUWORKER_SYS_H) */

+ 5 - 1
src/core/mainloop/mainloop.c

@@ -2336,9 +2336,13 @@ static void
 tor_eventloop_thread(void)
 {
   log_debug(LD_GENERAL, "Starting eventloop thread.");
+
+  struct event_base *base = tor_libevent_get_base();
+  local_replyqueue_init(base);
+
   int loop_result = 0;
   while (loop_result == 0 && other_eventloops_should_exit == 0) {
-    loop_result = event_base_loop(tor_libevent_get_base(), EVLOOP_NO_EXIT_ON_EMPTY);
+    loop_result = event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
   }
   log_debug(LD_GENERAL, "Done eventloop thread.");