/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2019, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 * out to worker threads.
 *
 * The multithreading backend for this module is in workqueue.c; this module
 * specializes workqueue.c.
 *
 * Right now, we use this infrastructure
 *  <ul><li>for processing onionskins in onion.c
 *      <li>for compressing consensuses in consdiffmgr.c,
 *      <li>and for calculating diffs and compressing them in consdiffmgr.c.
 *  </ul>
 **/
#include "core/or/or.h"
#include "core/or/channel.h"
#include "core/or/circuitbuild.h"
#include "core/or/circuitlist.h"
#include "core/or/connection_or.h"
#include "app/config/config.h"
#include "core/mainloop/cpuworker.h"
#include "lib/crypt_ops/crypto_rand.h"
#include "lib/crypt_ops/crypto_util.h"
#include "core/or/onion.h"
#include "feature/relay/onion_queue.h"
#include "feature/stats/rephist.h"
#include "feature/relay/router.h"
#include "core/crypto/onion_crypto.h"
#include "app/main/tor_threads.h"
#include "lib/evloop/compat_libevent.h"
#include "core/mainloop/cpuworker_sys.h"

#include "core/or/or_circuit_st.h"

static void queue_pending_tasks(void);

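/** Per-thread state for onion-handshake workers: the server onion keys in
 * use, plus a generation counter that is bumped each time the keys are
 * rotated. */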
typedef struct worker_state_s {
  int generation;
  server_onion_keys_t *onion_keys;
} worker_state_t;

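/** Allocate and return a fresh worker_state_t holding the current server
 * onion keys.  Used as the threadpool's per-thread state constructor. */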
static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}

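/** Release all storage held in a worker_state_t. */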
#define worker_state_free(ws) \
  FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))

static void
worker_state_free_(worker_state_t *ws)
{
  if (!ws)
    return;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

static void
worker_state_free_void(void *arg)
{
  worker_state_free_(arg);
}

/** The threadpool used to run cpuworker jobs. */
static threadpool_t *threadpool = NULL;
/** Thread-local reply queue on which this thread receives answers from the
 * workers. */
static tor_threadlocal_t replyqueue;

/** How many jobs are currently in flight on the threadpool? */
static int total_pending_tasks = 0;
/** How many jobs may be in flight before we start queueing new onionskins? */
static int max_pending_tasks = 128;

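/** Initialize the thread-local storage used to find each thread's reply
 * queue. */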
static void
cpu_init_threadlocals(void)
{
  tor_threadlocal_init(&replyqueue);
}

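/** Release the thread-local storage used to find each thread's reply
 * queue. */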
static void
cpu_destroy_threadlocals(void)
{
  tor_threadlocal_destroy(&replyqueue);
}

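/** Create a reply queue for the current thread, register its reply event on
 * the libevent <b>base</b>, and store it in thread-local storage.  Must be
 * called at most once per thread. */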
void
local_replyqueue_init(struct event_base *base)
{
  tor_assert(tor_threadlocal_get(&replyqueue) == NULL);

  replyqueue_t *rq = replyqueue_new(0, threadpool);
  int result = replyqueue_register_reply_event(rq, base);
  tor_assert(result == 0);
  tor_threadlocal_set(&replyqueue, (void *)rq);
}

/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime.
 */
void
cpu_init(void)
{
  if (!threadpool) {
    /*
      In our threadpool implementation, half the threads are permissive and
      half are strict (when it comes to running lower-priority tasks). So we
      always make sure we have at least two threads, so that there will be at
      least one thread of each kind.
    */
    const int n_threads = get_num_cpus(get_options()) + 1;
    threadpool = threadpool_new(n_threads,
                                worker_state_new,
                                worker_state_free_void,
                                NULL,
                                start_tor_thread);
  }

  if (!tor_threadlocal_get(&replyqueue)) {
    struct event_base *base = tor_libevent_get_base();
    local_replyqueue_init(base);
  }

  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
}

/** Shut down the cpuworker subsystem, and wait for any threads to join. */
void
cpu_shutdown(void)
{
  if (threadpool != NULL) {
    threadpool_shutdown(threadpool);
    threadpool = NULL;
  }

  // TODO: clean up all replyqueues
}

/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;
  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;

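/** A unit of work handed to (and returned from) a cpuworker: the circuit the
 * onionskin arrived on, plus either the request or, once processed, the
 * reply. */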
typedef struct cpuworker_job_u {
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;

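/** Worker-thread callback for cpuworkers_rotate_keyinfo(): replace this
 * thread's onion keys with the freshly loaded ones in <b>work_</b>, and bump
 * the state's generation count. */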
static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}

/** Called when the onion key has changed: queue an update for every worker
 * thread so that each one regenerates its state with the new keys.
 */
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free_void,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}

/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip them to this
 * time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

/** Return true iff we'd like to measure a handshake of type
 * <b>onionskin_type</b>. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;

  /* Otherwise, measure with P=1/128. We avoid doing this for every
   * handshake, since the measurement itself can take a little time. */
  return crypto_fast_rng_one_in_n(get_thread_fast_rng(), 128);
}

/** Return an estimate of how many microseconds we will need for a single
 * cpuworker to process <b>n_requests</b> onionskins of type
 * <b>onionskin_type</b>. */
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskin_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * UINT64_MAX. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}

/** Compute the absolute and relative overhead of using the cpuworker
 * framework for onionskins of type <b>onionskin_type</b>.*/
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}

/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 * log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r<0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code.",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_, workqueue_reply_t reply_status)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  if (reply_status != WQ_RPL_REPLY) {
    goto done_processing;
  }

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR,"circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys, sizeof(rpl.keys),
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR,"onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }

  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}

/** Implementation function for onion handshake requests. */
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  const create_cell_t *cc = &req.create_cell;
  created_cell_t *cell_out = &rpl.created_cell;
  struct timeval tv_start = {0,0}, tv_end;
  int n;
  rpl.timed = req.timed;
  rpl.started_at = req.started_at;
  rpl.handshake_type = cc->handshake_type;
  if (req.timed)
    tor_gettimeofday(&tv_start);
  n = onion_skin_server_handshake(cc->handshake_type,
                                  cc->onionskin, cc->handshake_len,
                                  onion_keys,
                                  cell_out->reply,
                                  rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                  rpl.rend_auth_material);
  if (n < 0) {
    /* failure */
    log_debug(LD_OR,"onion_skin_server_handshake failed.");
    memset(&rpl, 0, sizeof(rpl));
    rpl.success = 0;
  } else {
    /* success */
    log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
    cell_out->handshake_len = n;
    switch (cc->cell_type) {
    case CELL_CREATE:
      cell_out->cell_type = CELL_CREATED; break;
    case CELL_CREATE2:
      cell_out->cell_type = CELL_CREATED2; break;
    case CELL_CREATE_FAST:
      cell_out->cell_type = CELL_CREATED_FAST; break;
    default:
      tor_assert(0);
      return WQ_RPL_SHUTDOWN;
    }
    rpl.success = 1;
  }
  rpl.magic = CPUWORKER_REPLY_MAGIC;
  if (req.timed) {
    struct timeval tv_diff;
    int64_t usec;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &tv_start, &tv_diff);
    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    else
      rpl.n_usec = (uint32_t) usec;
  }
  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}

/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
      log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
  }
}

/** Queue a job on the cpuworker threadpool at <b>priority</b>: run <b>fn</b>
 * in a worker thread, and then <b>reply_fn</b> on this thread's reply queue
 * once the worker is done, both taking <b>arg</b>.  Return the new workqueue
 * entry on success, or NULL on failure. */
MOCK_IMPL(workqueue_entry_t *,
cpuworker_queue_work,(workqueue_priority_t priority,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *, workqueue_reply_t),
                      void *arg))
{
  tor_assert(threadpool);
  replyqueue_t *local_replyqueue = tor_threadlocal_get(&replyqueue);
  tor_assert(local_replyqueue);

  return threadpool_queue_work_priority(threadpool,
                                        priority,
                                        fn,
                                        reply_fn,
                                        local_replyqueue,
                                        arg);
}

/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  tor_assert(threadpool);
  replyqueue_t *local_replyqueue = tor_threadlocal_get(&replyqueue);
  tor_assert(local_replyqueue);

  if (!circ->p_chan) {
    log_info(LD_OR,"circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    log_debug(LD_OR,"No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  if (!channel_is_client(circ->p_chan))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work_priority(threadpool,
                                               WQ_PRI_HIGH,
                                               cpuworker_onion_handshake_threadfn,
                                               cpuworker_onion_handshake_replyfn,
                                               local_replyqueue,
                                               job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  circ->workqueue_entry = queue_entry;

  return 0;
}

/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}

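/** Subsystem callback: set up the cpuworker module's thread-local storage. */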
static int
subsys_cpuworker_initialize(void)
{
  cpu_init_threadlocals();
  return 0;
}

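/** Subsystem callback: tear down the cpuworker module's thread-local
 * storage. */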
static void
subsys_cpuworker_shutdown(void)
{
  cpu_destroy_threadlocals();
}

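/** Subsystem descriptor for the cpuworker module. */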
const struct subsys_fns_t sys_cpuworker = {
  .name = "cpuworker",
  .supported = true,
  .level = 7,
  .initialize = subsys_cpuworker_initialize,
  .shutdown = subsys_cpuworker_shutdown,
};