cpuworker.c

/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 * out to worker threads.
 *
 * Right now, we only use this for processing onionskins.
 **/

#include "or.h"
#include "channel.h"
#include "circuitbuild.h"
#include "circuitlist.h"
#include "connection_or.h"
#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
#include "workqueue.h"

#ifdef HAVE_EVENT2_EVENT_H
#include <event2/event.h>
#else
#include <event.h>
#endif

static void queue_pending_tasks(void);

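/** State object for a worker thread: holds the onion keys that thread should
 * use when processing handshakes, plus a generation count that is bumped each
 * time the keys are replaced. */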
typedef struct worker_state_s {
  int generation;
  server_onion_keys_t *onion_keys;
} worker_state_t;

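/** Allocate and return a fresh worker_state_t. Used as the threadpool's
 * per-thread state constructor. */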
static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}

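/** Release a worker_state_t allocated by worker_state_new(). Used as the
 * threadpool's per-thread state destructor. */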
static void
worker_state_free(void *arg)
{
  worker_state_t *ws = arg;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

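/* Shared state for the cpuworker subsystem: the queue on which worker threads
 * post their replies, the thread pool itself, the libevent event that tells
 * the main thread to drain that queue, a weak RNG used to sample requests for
 * timing, and counters bounding how many onionskin requests may be pending at
 * once. */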
static replyqueue_t *replyqueue = NULL;
static threadpool_t *threadpool = NULL;
static struct event *reply_event = NULL;

static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;

static int total_pending_tasks = 0;
static int max_pending_tasks = 128;

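/** Libevent callback: the reply queue's notification socket is readable, so
 * run the reply functions for all completed work in the main thread. */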
static void
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
  replyqueue_t *rq = arg;
  (void) sock;
  (void) events;
  replyqueue_process(rq);
}

/** Initialize the cpuworker subsystem.
 */
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!reply_event) {
    reply_event = tor_event_new(tor_libevent_get_base(),
                                replyqueue_get_socket(replyqueue),
                                EV_READ|EV_PERSIST,
                                replyqueue_process_cb,
                                replyqueue);
    event_add(reply_event, NULL);
  }
  if (!threadpool) {
    threadpool = threadpool_new(get_num_cpus(get_options()),
                                replyqueue,
                                worker_state_new,
                                worker_state_free,
                                NULL);
  }
  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
  crypto_seed_weak_rng(&request_sample_rng);
}

/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;
  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing.) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;

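/** A work item handed to the threadpool: records which channel and circuit
 * the onionskin came from, and carries the request (and, once processed, the
 * reply) in a union. */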
typedef struct cpuworker_job_u {
  uint64_t chan_id;
  uint32_t circ_id;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;

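/** Worker-thread callback for a key-rotation update: replace this thread's
 * onion keys with the fresh ones carried in <b>work_</b>, and advance the
 * state's generation counter. */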
static int
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  ++state->generation;
  return WQ_RPL_REPLY;
}

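/** Main-thread callback run after a worker has applied a key update: free the
 * (now-emptied) update object. */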
static void
update_state_replyfn(void *work_)
{
  tor_free(work_);
}

/** Called when the onion key has changed: queue a key update so that every
 * worker thread replaces its copy of the onion keys with the new ones.
 */
void
cpuworkers_rotate_keyinfo(void)
{
  if (threadpool_queue_for_all(threadpool,
                               worker_state_new,
                               update_state_threadfn,
                               update_state_replyfn,
                               NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}

/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip them to this
 * time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

/** Return true iff we'd like to measure a handshake of type
 * <b>onionskin_type</b>. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample. */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;
  /* Otherwise, measure with P=1/128. We avoid doing this for every
   * handshake, since the measurement itself can take a little time. */
  return tor_weak_random_one_in_n(&request_sample_rng, 128);
}

/** Return an estimate of how many microseconds we will need for a single
 * cpuworker to process <b>n_requests</b> onionskins of type
 * <b>onionskin_type</b>. */
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskin_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * UINT64_MAX. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}

/** Compute the absolute and relative overhead of using the cpuworker
 * framework for onionskins of type <b>onionskin_type</b>.*/
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = U64_TO_DBL(overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}

/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 * log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r<0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code ",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

/** Handle a reply from the worker threads: record timing statistics for the
 * handshake, find the circuit the reply belongs to, and either send back the
 * created cell or close the circuit if the handshake failed. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  uint64_t chan_id;
  circid_t circ_id;
  channel_t *p_chan = NULL;
  circuit_t *circ = NULL;

  --total_pending_tasks;

  if (1) {
    /* Could avoid this, but doesn't matter. */
    memcpy(&rpl, &job->u.reply, sizeof(rpl));

    tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

    if (rpl.timed && rpl.success &&
        rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
      /* Time how long this request took. The handshake_type check should be
         needless, but let's leave it in to be safe. */
      struct timeval tv_end, tv_diff;
      int64_t usec_roundtrip;
      tor_gettimeofday(&tv_end);
      timersub(&tv_end, &rpl.started_at, &tv_diff);
      usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
      if (usec_roundtrip >= 0 &&
          usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
        ++onionskins_n_processed[rpl.handshake_type];
        onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
        onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
        if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
          /* Scale down every 500000 handshakes. On a busy server, that's
           * less impressive than it sounds. */
          onionskins_n_processed[rpl.handshake_type] /= 2;
          onionskins_usec_internal[rpl.handshake_type] /= 2;
          onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
        }
      }
    }

    /* Find the circ it was talking about */
    chan_id = job->chan_id;
    circ_id = job->circ_id;
    p_chan = channel_find_by_global_id(chan_id);
    if (p_chan)
      circ = circuit_get_by_circid_channel(circ_id, p_chan);

    log_debug(LD_OR,
              "Unpacking cpuworker reply %p, chan_id is " U64_FORMAT
              ", circ_id is %u, p_chan=%p, circ=%p, success=%d",
              job, U64_PRINTF_ARG(chan_id), (unsigned)circ_id,
              p_chan, circ, rpl.success);

    if (rpl.success == 0) {
      log_debug(LD_OR,
                "decoding onionskin failed. "
                "(Old key or bad software.) Closing.");
      if (circ)
        circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL);
      goto done_processing;
    }

    if (!circ) {
      /* This happens because somebody sends us a destroy cell and the
       * circuit goes away, while the cpuworker is working. This is also
       * why our tag doesn't include a pointer to the circ, because we'd
       * never know if it's still valid.
       */
      log_debug(LD_OR,"processed onion for a circ that's gone. Dropping.");
      goto done_processing;
    }

    tor_assert(! CIRCUIT_IS_ORIGIN(circ));

    TO_OR_CIRCUIT(circ)->workqueue_entry = NULL;

    if (onionskin_answer(TO_OR_CIRCUIT(circ),
                         &rpl.created_cell,
                         (const char*)rpl.keys,
                         rpl.rend_auth_material) < 0) {
      log_warn(LD_OR,"onionskin_answer failed. Closing.");
      circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
      goto done_processing;
    }

    log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
  }

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}

/** Implementation function for onion handshake requests. */
static int
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  if (1) {
    const create_cell_t *cc = &req.create_cell;
    created_cell_t *cell_out = &rpl.created_cell;
    struct timeval tv_start = {0,0}, tv_end;
    int n;
    rpl.timed = req.timed;
    rpl.started_at = req.started_at;
    rpl.handshake_type = cc->handshake_type;
    if (req.timed)
      tor_gettimeofday(&tv_start);
    n = onion_skin_server_handshake(cc->handshake_type,
                                    cc->onionskin, cc->handshake_len,
                                    onion_keys,
                                    cell_out->reply,
                                    rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                    rpl.rend_auth_material);
    if (n < 0) {
      /* failure */
      log_debug(LD_OR,"onion_skin_server_handshake failed.");
      memset(&rpl, 0, sizeof(rpl));
      rpl.success = 0;
    } else {
      /* success */
      log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
      cell_out->handshake_len = n;
      switch (cc->cell_type) {
      case CELL_CREATE:
        cell_out->cell_type = CELL_CREATED; break;
      case CELL_CREATE2:
        cell_out->cell_type = CELL_CREATED2; break;
      case CELL_CREATE_FAST:
        cell_out->cell_type = CELL_CREATED_FAST; break;
      default:
        tor_assert(0);
        return WQ_RPL_SHUTDOWN;
      }
      rpl.success = 1;
    }
    rpl.magic = CPUWORKER_REPLY_MAGIC;
    if (req.timed) {
      struct timeval tv_diff;
      int64_t usec;
      tor_gettimeofday(&tv_end);
      timersub(&tv_end, &tv_start, &tv_diff);
      usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
      if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
        rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
      else
        rpl.n_usec = (uint32_t) usec;
    }
  }
  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}

/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin))
      log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
  }
}

/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  if (1) {
    if (!circ->p_chan) {
      log_info(LD_OR,"circ->p_chan gone. Failing circ.");
      tor_free(onionskin);
      return -1;
    }

    if (total_pending_tasks >= max_pending_tasks) {
      log_debug(LD_OR,"No idle cpuworkers. Queuing.");
      if (onion_pending_add(circ, onionskin) < 0) {
        tor_free(onionskin);
        return -1;
      }
      return 0;
    }

    if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
      rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

    should_time = should_time_request(onionskin->handshake_type);
    memset(&req, 0, sizeof(req));
    req.magic = CPUWORKER_REQUEST_MAGIC;
    req.timed = should_time;

    memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

    tor_free(onionskin);

    if (should_time)
      tor_gettimeofday(&req.started_at);

    job = tor_malloc_zero(sizeof(cpuworker_job_t));
    job->chan_id = circ->p_chan->global_identifier;
    job->circ_id = circ->p_circ_id;
    memcpy(&job->u.request, &req, sizeof(req));
    memwipe(&req, 0, sizeof(req));

    ++total_pending_tasks;
    queue_entry = threadpool_queue_work(threadpool,
                                        cpuworker_onion_handshake_threadfn,
                                        cpuworker_onion_handshake_replyfn,
                                        job);
    if (!queue_entry) {
      log_warn(LD_BUG, "Couldn't queue work on threadpool");
      tor_free(job);
      return -1;
    }

    log_debug(LD_OR, "Queued task %p (qe=%p, chanid="U64_FORMAT", circid=%u)",
              job, queue_entry, U64_PRINTF_ARG(job->chan_id), job->circ_id);

    circ->workqueue_entry = queue_entry;
  }

  return 0;
}

/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
  }

  circ->workqueue_entry = NULL;
}