/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2017, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 * out to worker threads.
 *
 * The multithreading backend for this module is in workqueue.c; this module
 * specializes workqueue.c.
 *
 * Right now, we use this infrastructure
 *  <ul><li>for processing onionskins in onion.c
 *      <li>for compressing consensuses in consdiffmgr.c,
 *      <li>and for calculating diffs and compressing them in consdiffmgr.c.
 *  </ul>
 **/
#include "or.h"
#include "channel.h"
#include "circuitbuild.h"
#include "circuitlist.h"
#include "connection_or.h"
#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
#include "workqueue.h"

#include <event2/event.h>

static void queue_pending_tasks(void);

/** State kept by each worker thread. */
typedef struct worker_state_s {
  int generation;
  server_onion_keys_t *onion_keys;
} worker_state_t;

/** Allocate and return a fresh worker_state_t, holding its own copy of the
 * server onion keys. */
static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}

/** Release all storage held in a worker_state_t. */
static void
worker_state_free(void *arg)
{
  worker_state_t *ws = arg;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

/** Queue on which worker threads deliver replies to the main thread. */
static replyqueue_t *replyqueue = NULL;
/** Pool of worker threads for CPU-intensive work. */
static threadpool_t *threadpool = NULL;
/** Libevent event that fires in the main thread when the replyqueue has
 * replies ready to process. */
static struct event *reply_event = NULL;

/** Weak RNG used to decide which onionskin requests to time. */
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;

/** Number of onionskin jobs currently outstanding on the threadpool. */
static int total_pending_tasks = 0;
/** Maximum number of onionskin jobs allowed to be outstanding at once;
 * beyond this, new onionskins wait in onion.c's queue. */
static int max_pending_tasks = 128;

/** Libevent callback: process the replyqueue whenever its socket becomes
 * readable. */
static void
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
  replyqueue_t *rq = arg;
  (void) sock;
  (void) events;
  replyqueue_process(rq);
}

/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime.
 */
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!reply_event) {
    reply_event = tor_event_new(tor_libevent_get_base(),
                                replyqueue_get_socket(replyqueue),
                                EV_READ|EV_PERSIST,
                                replyqueue_process_cb,
                                replyqueue);
    event_add(reply_event, NULL);
  }
  if (!threadpool) {
    /*
      In our threadpool implementation, half the threads are permissive and
      half are strict (when it comes to running lower-priority tasks). So we
      always make sure we have at least two threads, so that there will be at
      least one thread of each kind.
     */
    const int n_threads = get_num_cpus(get_options()) + 1;
    threadpool = threadpool_new(n_threads,
                                replyqueue,
                                worker_state_new,
                                worker_state_free,
                                NULL);
  }
  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
  crypto_seed_weak_rng(&request_sample_rng);
}
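
/* Illustrative arithmetic for the limits chosen above, using a hypothetical
 * 8-CPU relay: cpu_init() would create 8 + 1 = 9 worker threads and allow up
 * to 8 * 64 = 512 onionskin jobs to be outstanding at once before further
 * onionskins wait in onion.c's queue. */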
/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;

/** A job for the worker threads: the circuit it belongs to, plus either the
 * request we send or the reply we get back. */
typedef struct cpuworker_job_u {
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;

static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}

/** Called when the onion key has changed: queue an update for every worker
 * thread so that each one regenerates its state with the new keys.
 */
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}

/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip it to this
 * time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

/** Return true iff we'd like to measure a handshake of type
 * <b>onionskin_type</b>. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;
  /* Otherwise, measure with P=1/128. We avoid doing this for every
   * handshake, since the measurement itself can take a little time. */
  return tor_weak_random_one_in_n(&request_sample_rng, 128);
}
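
/* Illustrative note on the constants above: a busy relay ends up timing
 * every one of its first 4096 handshakes of each type, and thereafter
 * roughly 1 in 128 of them (about 0.8%). */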
/** Return an estimate of how many microseconds we will need for a single
 * cpuworker to process <b>n_requests</b> onionskins of type
 * <b>onionskin_type</b>. */
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskin_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * UINT64_MAX. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}
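
/* Worked example with hypothetical numbers: if we have measured 200
 * onionskins of some type totalling 40,000 usec of in-worker time, the
 * running average is 200 usec per onionskin, so a batch of 50 pending
 * requests is estimated at (40000 * 50) / 200 = 10,000 usec. */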
/** Compute the absolute and relative overhead of using the cpuworker
 * framework for onionskins of type <b>onionskin_type</b>.*/
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = U64_TO_DBL(overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}
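
/* Worked example with hypothetical numbers: with 1,000 onionskins measured,
 * 200,000 usec of in-worker time, and 260,000 usec of round-trip time, the
 * total overhead is 60,000 usec, i.e. 60 usec per onionskin (*usec_out) and
 * 60000/200000 = 0.30, a 30% relative overhead (*frac_out). */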
/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 * log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r < 0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR,"circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys,
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR,"onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }

  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}
/** Implementation function for onion handshake requests. */
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  const create_cell_t *cc = &req.create_cell;
  created_cell_t *cell_out = &rpl.created_cell;
  struct timeval tv_start = {0,0}, tv_end;
  int n;
  rpl.timed = req.timed;
  rpl.started_at = req.started_at;
  rpl.handshake_type = cc->handshake_type;
  if (req.timed)
    tor_gettimeofday(&tv_start);
  n = onion_skin_server_handshake(cc->handshake_type,
                                  cc->onionskin, cc->handshake_len,
                                  onion_keys,
                                  cell_out->reply,
                                  rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                  rpl.rend_auth_material);
  if (n < 0) {
    /* failure */
    log_debug(LD_OR,"onion_skin_server_handshake failed.");
    memset(&rpl, 0, sizeof(rpl));
    rpl.success = 0;
  } else {
    /* success */
    log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
    cell_out->handshake_len = n;
    switch (cc->cell_type) {
    case CELL_CREATE:
      cell_out->cell_type = CELL_CREATED; break;
    case CELL_CREATE2:
      cell_out->cell_type = CELL_CREATED2; break;
    case CELL_CREATE_FAST:
      cell_out->cell_type = CELL_CREATED_FAST; break;
    default:
      tor_assert(0);
      return WQ_RPL_SHUTDOWN;
    }
    rpl.success = 1;
  }
  rpl.magic = CPUWORKER_REPLY_MAGIC;
  if (req.timed) {
    struct timeval tv_diff;
    int64_t usec;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &tv_start, &tv_diff);
    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    else
      rpl.n_usec = (uint32_t) usec;
  }
  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}
/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin))
      log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
  }
}
/** Queue a job on the cpuworker threadpool at the given <b>priority</b>:
 * run <b>fn</b> with argument <b>arg</b> in a worker thread, and once the
 * job has finished, call <b>reply_fn</b> on <b>arg</b> from the main
 * thread. Return the new workqueue entry, or NULL on failure. */
MOCK_IMPL(workqueue_entry_t *,
cpuworker_queue_work,(workqueue_priority_t priority,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg))
{
  tor_assert(threadpool);

  return threadpool_queue_work_priority(threadpool,
                                        priority,
                                        fn,
                                        reply_fn,
                                        arg);
}
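
#if 0
/* Illustrative sketch only, not compiled: how another module might hand a
 * CPU-heavy job to this pool via cpuworker_queue_work(). The job struct,
 * work function, reply function, and helper below are hypothetical, not
 * part of this module. */
typedef struct example_job_t {
  int input;   /* set by the main thread before queueing */
  int result;  /* filled in by the worker thread */
} example_job_t;

/* Runs in a worker thread; <b>state_</b> is that thread's worker_state_t. */
static workqueue_reply_t
example_work_threadfn(void *state_, void *work_)
{
  example_job_t *job = work_;
  (void)state_;
  job->result = job->input * 2;  /* stand-in for expensive computation */
  return WQ_RPL_REPLY;           /* ask for the reply function to be run */
}

/* Runs back on the main thread once the replyqueue is processed. */
static void
example_work_replyfn(void *work_)
{
  example_job_t *job = work_;
  log_info(LD_GENERAL, "Example job finished: %d", job->result);
  tor_free(job);
}

/* Queue one example job at low priority. */
static void
example_queue_job(int input)
{
  example_job_t *job = tor_malloc_zero(sizeof(example_job_t));
  job->input = input;
  if (!cpuworker_queue_work(WQ_PRI_LOW, example_work_threadfn,
                            example_work_replyfn, job)) {
    log_warn(LD_GENERAL, "Couldn't queue example job.");
    tor_free(job);
  }
}
#endif /* 0 */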
/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  tor_assert(threadpool);

  if (!circ->p_chan) {
    log_info(LD_OR,"circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    log_debug(LD_OR,"No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work_priority(threadpool,
                                      WQ_PRI_HIGH,
                                      cpuworker_onion_handshake_threadfn,
                                      cpuworker_onion_handshake_replyfn,
                                      job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    /* The job never made it onto the queue, so it no longer counts as
     * pending. */
    --total_pending_tasks;
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  circ->workqueue_entry = queue_entry;

  return 0;
}
/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}