| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 | 
							- /* Copyright (c) 2003-2004, Roger Dingledine.
 
-  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 
-  * Copyright (c) 2007-2017, The Tor Project, Inc. */
 
- /* See LICENSE for licensing information */
 
- /**
 
-  * \file cpuworker.c
 
-  * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 
-  * out to subprocesses.
 
-  *
 
-  * The multithreading backend for this module is in workqueue.c; this module
 
-  * specializes workqueue.c.
 
-  *
 
-  * Right now, we only use this for processing onionskins, and invoke it mostly
 
-  * from onion.c.
 
-  **/
 
- #include "or.h"
 
- #include "channel.h"
 
- #include "circuitbuild.h"
 
- #include "circuitlist.h"
 
- #include "connection_or.h"
 
- #include "config.h"
 
- #include "cpuworker.h"
 
- #include "main.h"
 
- #include "onion.h"
 
- #include "rephist.h"
 
- #include "router.h"
 
- #include "workqueue.h"
 
- #include <event2/event.h>
 
- static void queue_pending_tasks(void);
 
- typedef struct worker_state_s {
 
-   int generation;
 
-   server_onion_keys_t *onion_keys;
 
- } worker_state_t;
 
- static void *
 
- worker_state_new(void *arg)
 
- {
 
-   worker_state_t *ws;
 
-   (void)arg;
 
-   ws = tor_malloc_zero(sizeof(worker_state_t));
 
-   ws->onion_keys = server_onion_keys_new();
 
-   return ws;
 
- }
 
- static void
 
- worker_state_free(void *arg)
 
- {
 
-   worker_state_t *ws = arg;
 
-   server_onion_keys_free(ws->onion_keys);
 
-   tor_free(ws);
 
- }
 
- static replyqueue_t *replyqueue = NULL;
 
- static threadpool_t *threadpool = NULL;
 
- static struct event *reply_event = NULL;
 
- static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
 
- static int total_pending_tasks = 0;
 
- static int max_pending_tasks = 128;
 
- static void
 
- replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
 
- {
 
-   replyqueue_t *rq = arg;
 
-   (void) sock;
 
-   (void) events;
 
-   replyqueue_process(rq);
 
- }
 
- /** Initialize the cpuworker subsystem. It is OK to call this more than once
 
-  * during Tor's lifetime.
 
-  */
 
- void
 
- cpu_init(void)
 
- {
 
-   if (!replyqueue) {
 
-     replyqueue = replyqueue_new(0);
 
-   }
 
-   if (!reply_event) {
 
-     reply_event = tor_event_new(tor_libevent_get_base(),
 
-                                 replyqueue_get_socket(replyqueue),
 
-                                 EV_READ|EV_PERSIST,
 
-                                 replyqueue_process_cb,
 
-                                 replyqueue);
 
-     event_add(reply_event, NULL);
 
-   }
 
-   if (!threadpool) {
 
-     threadpool = threadpool_new(get_num_cpus(get_options()),
 
-                                 replyqueue,
 
-                                 worker_state_new,
 
-                                 worker_state_free,
 
-                                 NULL);
 
-   }
 
-   /* Total voodoo. Can we make this more sensible? */
 
-   max_pending_tasks = get_num_cpus(get_options()) * 64;
 
-   crypto_seed_weak_rng(&request_sample_rng);
 
- }
 
- /** Magic numbers to make sure our cpuworker_requests don't grow any
 
-  * mis-framing bugs. */
 
- #define CPUWORKER_REQUEST_MAGIC 0xda4afeed
 
- #define CPUWORKER_REPLY_MAGIC 0x5eedf00d
 
- /** A request sent to a cpuworker. */
 
- typedef struct cpuworker_request_t {
 
-   /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
 
-   uint32_t magic;
 
-   /** Flag: Are we timing this request? */
 
-   unsigned timed : 1;
 
-   /** If we're timing this request, when was it sent to the cpuworker? */
 
-   struct timeval started_at;
 
-   /** A create cell for the cpuworker to process. */
 
-   create_cell_t create_cell;
 
-   /* Turn the above into a tagged union if needed. */
 
- } cpuworker_request_t;
 
- /** A reply sent by a cpuworker. */
 
- typedef struct cpuworker_reply_t {
 
-   /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
 
-   uint32_t magic;
 
-   /** True iff we got a successful request. */
 
-   uint8_t success;
 
-   /** Are we timing this request? */
 
-   unsigned int timed : 1;
 
-   /** What handshake type was the request? (Used for timing) */
 
-   uint16_t handshake_type;
 
-   /** When did we send the request to the cpuworker? */
 
-   struct timeval started_at;
 
-   /** Once the cpuworker received the request, how many microseconds did it
 
-    * take? (This shouldn't overflow; 4 billion micoseconds is over an hour,
 
-    * and we'll never have an onion handshake that takes so long.) */
 
-   uint32_t n_usec;
 
-   /** Output of processing a create cell
 
-    *
 
-    * @{
 
-    */
 
-   /** The created cell to send back. */
 
-   created_cell_t created_cell;
 
-   /** The keys to use on this circuit. */
 
-   uint8_t keys[CPATH_KEY_MATERIAL_LEN];
 
-   /** Input to use for authenticating introduce1 cells. */
 
-   uint8_t rend_auth_material[DIGEST_LEN];
 
- } cpuworker_reply_t;
 
- typedef struct cpuworker_job_u {
 
-   or_circuit_t *circ;
 
-   union {
 
-     cpuworker_request_t request;
 
-     cpuworker_reply_t reply;
 
-   } u;
 
- } cpuworker_job_t;
 
- static workqueue_reply_t
 
- update_state_threadfn(void *state_, void *work_)
 
- {
 
-   worker_state_t *state = state_;
 
-   worker_state_t *update = work_;
 
-   server_onion_keys_free(state->onion_keys);
 
-   state->onion_keys = update->onion_keys;
 
-   update->onion_keys = NULL;
 
-   worker_state_free(update);
 
-   ++state->generation;
 
-   return WQ_RPL_REPLY;
 
- }
 
- /** Called when the onion key has changed so update all CPU worker(s) with
 
-  * new function pointers with which a new state will be generated.
 
-  */
 
- void
 
- cpuworkers_rotate_keyinfo(void)
 
- {
 
-   if (!threadpool) {
 
-     /* If we're a client, then we won't have cpuworkers, and we won't need
 
-      * to tell them to rotate their state.
 
-      */
 
-     return;
 
-   }
 
-   if (threadpool_queue_update(threadpool,
 
-                               worker_state_new,
 
-                               update_state_threadfn,
 
-                               worker_state_free,
 
-                               NULL)) {
 
-     log_warn(LD_OR, "Failed to queue key update for worker threads.");
 
-   }
 
- }
 
- /** Indexed by handshake type: how many onionskins have we processed and
 
-  * counted of that type? */
 
- static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
 
- /** Indexed by handshake type, corresponding to the onionskins counted in
 
-  * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 
-  * processing that kind of onionskin? */
 
- static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
 
- /** Indexed by handshake type, corresponding to onionskins counted in
 
-  * onionskins_n_processed: how many microseconds have we spent waiting for
 
-  * cpuworkers to give us answers for that kind of onionskin?
 
-  */
 
- static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
 
- /** If any onionskin takes longer than this, we clip them to this
 
-  * time. (microseconds) */
 
- #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
 
- /** Return true iff we'd like to measure a handshake of type
 
-  * <b>onionskin_type</b>. Call only from the main thread. */
 
- static int
 
- should_time_request(uint16_t onionskin_type)
 
- {
 
-   /* If we've never heard of this type, we shouldn't even be here. */
 
-   if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
 
-     return 0;
 
-   /* Measure the first N handshakes of each type, to ensure we have a
 
-    * sample */
 
-   if (onionskins_n_processed[onionskin_type] < 4096)
 
-     return 1;
 
-   /** Otherwise, measure with P=1/128.  We avoid doing this for every
 
-    * handshake, since the measurement itself can take a little time. */
 
-   return tor_weak_random_one_in_n(&request_sample_rng, 128);
 
- }
 
- /** Return an estimate of how many microseconds we will need for a single
 
-  * cpuworker to to process <b>n_requests</b> onionskins of type
 
-  * <b>onionskin_type</b>. */
 
- uint64_t
 
- estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
 
- {
 
-   if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
 
-     return 1000 * (uint64_t)n_requests;
 
-   if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
 
-     /* Until we have 100 data points, just asssume everything takes 1 msec. */
 
-     return 1000 * (uint64_t)n_requests;
 
-   } else {
 
-     /* This can't overflow: we'll never have more than 500000 onionskins
 
-      * measured in onionskin_usec_internal, and they won't take anything near
 
-      * 1 sec each, and we won't have anything like 1 million queued
 
-      * onionskins.  But that's 5e5 * 1e6 * 1e6, which is still less than
 
-      * UINT64_MAX. */
 
-     return (onionskins_usec_internal[onionskin_type] * n_requests) /
 
-       onionskins_n_processed[onionskin_type];
 
-   }
 
- }
 
- /** Compute the absolute and relative overhead of using the cpuworker
 
-  * framework for onionskins of type <b>onionskin_type</b>.*/
 
- static int
 
- get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
 
-                             uint16_t onionskin_type)
 
- {
 
-   uint64_t overhead;
 
-   *usec_out = 0;
 
-   *frac_out = 0.0;
 
-   if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
 
-     return -1;
 
-   if (onionskins_n_processed[onionskin_type] == 0 ||
 
-       onionskins_usec_internal[onionskin_type] == 0 ||
 
-       onionskins_usec_roundtrip[onionskin_type] == 0)
 
-     return -1;
 
-   overhead = onionskins_usec_roundtrip[onionskin_type] -
 
-     onionskins_usec_internal[onionskin_type];
 
-   *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
 
-   *frac_out = U64_TO_DBL(overhead) / onionskins_usec_internal[onionskin_type];
 
-   return 0;
 
- }
 
- /** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
 
-  * log it. */
 
- void
 
- cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
 
-                                  const char *onionskin_type_name)
 
- {
 
-   uint32_t overhead;
 
-   double relative_overhead;
 
-   int r;
 
-   r = get_overhead_for_onionskins(&overhead,  &relative_overhead,
 
-                                   onionskin_type);
 
-   if (!overhead || r<0)
 
-     return;
 
-   log_fn(severity, LD_OR,
 
-          "%s onionskins have averaged %u usec overhead (%.2f%%) in "
 
-          "cpuworker code ",
 
-          onionskin_type_name, (unsigned)overhead, relative_overhead*100);
 
- }
 
- /** Handle a reply from the worker threads. */
 
- static void
 
- cpuworker_onion_handshake_replyfn(void *work_)
 
- {
 
-   cpuworker_job_t *job = work_;
 
-   cpuworker_reply_t rpl;
 
-   or_circuit_t *circ = NULL;
 
-   tor_assert(total_pending_tasks > 0);
 
-   --total_pending_tasks;
 
-   /* Could avoid this, but doesn't matter. */
 
-   memcpy(&rpl, &job->u.reply, sizeof(rpl));
 
-   tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
 
-   if (rpl.timed && rpl.success &&
 
-       rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
 
-     /* Time how long this request took. The handshake_type check should be
 
-        needless, but let's leave it in to be safe. */
 
-     struct timeval tv_end, tv_diff;
 
-     int64_t usec_roundtrip;
 
-     tor_gettimeofday(&tv_end);
 
-     timersub(&tv_end, &rpl.started_at, &tv_diff);
 
-     usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
 
-     if (usec_roundtrip >= 0 &&
 
-         usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
 
-       ++onionskins_n_processed[rpl.handshake_type];
 
-       onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
 
-       onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
 
-       if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
 
-         /* Scale down every 500000 handshakes.  On a busy server, that's
 
-          * less impressive than it sounds. */
 
-         onionskins_n_processed[rpl.handshake_type] /= 2;
 
-         onionskins_usec_internal[rpl.handshake_type] /= 2;
 
-         onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
 
-       }
 
-     }
 
-   }
 
-   circ = job->circ;
 
-   log_debug(LD_OR,
 
-             "Unpacking cpuworker reply %p, circ=%p, success=%d",
 
-             job, circ, rpl.success);
 
-   if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
 
-     /* The circuit was supposed to get freed while the reply was
 
-      * pending. Instead, it got left for us to free so that we wouldn't freak
 
-      * out when the job->circ field wound up pointing to nothing. */
 
-     log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
 
-     circ->base_.magic = 0;
 
-     tor_free(circ);
 
-     goto done_processing;
 
-   }
 
-   circ->workqueue_entry = NULL;
 
-   if (TO_CIRCUIT(circ)->marked_for_close) {
 
-     /* We already marked this circuit; we can't call it open. */
 
-     log_debug(LD_OR,"circuit is already marked.");
 
-     goto done_processing;
 
-   }
 
-   if (rpl.success == 0) {
 
-     log_debug(LD_OR,
 
-               "decoding onionskin failed. "
 
-               "(Old key or bad software.) Closing.");
 
-     circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
 
-     goto done_processing;
 
-   }
 
-   if (onionskin_answer(circ,
 
-                        &rpl.created_cell,
 
-                        (const char*)rpl.keys,
 
-                        rpl.rend_auth_material) < 0) {
 
-     log_warn(LD_OR,"onionskin_answer failed. Closing.");
 
-     circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
 
-     goto done_processing;
 
-   }
 
-   log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
 
-  done_processing:
 
-   memwipe(&rpl, 0, sizeof(rpl));
 
-   memwipe(job, 0, sizeof(*job));
 
-   tor_free(job);
 
-   queue_pending_tasks();
 
- }
 
- /** Implementation function for onion handshake requests. */
 
- static workqueue_reply_t
 
- cpuworker_onion_handshake_threadfn(void *state_, void *work_)
 
- {
 
-   worker_state_t *state = state_;
 
-   cpuworker_job_t *job = work_;
 
-   /* variables for onion processing */
 
-   server_onion_keys_t *onion_keys = state->onion_keys;
 
-   cpuworker_request_t req;
 
-   cpuworker_reply_t rpl;
 
-   memcpy(&req, &job->u.request, sizeof(req));
 
-   tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
 
-   memset(&rpl, 0, sizeof(rpl));
 
-   const create_cell_t *cc = &req.create_cell;
 
-   created_cell_t *cell_out = &rpl.created_cell;
 
-   struct timeval tv_start = {0,0}, tv_end;
 
-   int n;
 
-   rpl.timed = req.timed;
 
-   rpl.started_at = req.started_at;
 
-   rpl.handshake_type = cc->handshake_type;
 
-   if (req.timed)
 
-     tor_gettimeofday(&tv_start);
 
-   n = onion_skin_server_handshake(cc->handshake_type,
 
-                                   cc->onionskin, cc->handshake_len,
 
-                                   onion_keys,
 
-                                   cell_out->reply,
 
-                                   rpl.keys, CPATH_KEY_MATERIAL_LEN,
 
-                                   rpl.rend_auth_material);
 
-   if (n < 0) {
 
-     /* failure */
 
-     log_debug(LD_OR,"onion_skin_server_handshake failed.");
 
-     memset(&rpl, 0, sizeof(rpl));
 
-     rpl.success = 0;
 
-   } else {
 
-     /* success */
 
-     log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
 
-     cell_out->handshake_len = n;
 
-     switch (cc->cell_type) {
 
-     case CELL_CREATE:
 
-       cell_out->cell_type = CELL_CREATED; break;
 
-     case CELL_CREATE2:
 
-       cell_out->cell_type = CELL_CREATED2; break;
 
-     case CELL_CREATE_FAST:
 
-       cell_out->cell_type = CELL_CREATED_FAST; break;
 
-     default:
 
-       tor_assert(0);
 
-       return WQ_RPL_SHUTDOWN;
 
-     }
 
-     rpl.success = 1;
 
-   }
 
-   rpl.magic = CPUWORKER_REPLY_MAGIC;
 
-   if (req.timed) {
 
-     struct timeval tv_diff;
 
-     int64_t usec;
 
-     tor_gettimeofday(&tv_end);
 
-     timersub(&tv_end, &tv_start, &tv_diff);
 
-     usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
 
-     if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
 
-       rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
 
-     else
 
-       rpl.n_usec = (uint32_t) usec;
 
-   }
 
-   memcpy(&job->u.reply, &rpl, sizeof(rpl));
 
-   memwipe(&req, 0, sizeof(req));
 
-   memwipe(&rpl, 0, sizeof(req));
 
-   return WQ_RPL_REPLY;
 
- }
 
- /** Take pending tasks from the queue and assign them to cpuworkers. */
 
- static void
 
- queue_pending_tasks(void)
 
- {
 
-   or_circuit_t *circ;
 
-   create_cell_t *onionskin = NULL;
 
-   while (total_pending_tasks < max_pending_tasks) {
 
-     circ = onion_next_task(&onionskin);
 
-     if (!circ)
 
-       return;
 
-     if (assign_onionskin_to_cpuworker(circ, onionskin))
 
-       log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
 
-   }
 
- }
 
- /** DOCDOC */
 
- MOCK_IMPL(workqueue_entry_t *,
 
- cpuworker_queue_work,(workqueue_reply_t (*fn)(void *, void *),
 
-                       void (*reply_fn)(void *),
 
-                       void *arg))
 
- {
 
-   tor_assert(threadpool);
 
-   return threadpool_queue_work(threadpool,
 
-                                fn,
 
-                                reply_fn,
 
-                                arg);
 
- }
 
- /** Try to tell a cpuworker to perform the public key operations necessary to
 
-  * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 
-  *
 
-  * Return 0 if we successfully assign the task, or -1 on failure.
 
-  */
 
- int
 
- assign_onionskin_to_cpuworker(or_circuit_t *circ,
 
-                               create_cell_t *onionskin)
 
- {
 
-   workqueue_entry_t *queue_entry;
 
-   cpuworker_job_t *job;
 
-   cpuworker_request_t req;
 
-   int should_time;
 
-   tor_assert(threadpool);
 
-   if (!circ->p_chan) {
 
-     log_info(LD_OR,"circ->p_chan gone. Failing circ.");
 
-     tor_free(onionskin);
 
-     return -1;
 
-   }
 
-   if (total_pending_tasks >= max_pending_tasks) {
 
-     log_debug(LD_OR,"No idle cpuworkers. Queuing.");
 
-     if (onion_pending_add(circ, onionskin) < 0) {
 
-       tor_free(onionskin);
 
-       return -1;
 
-     }
 
-     return 0;
 
-   }
 
-   if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
 
-     rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
 
-   should_time = should_time_request(onionskin->handshake_type);
 
-   memset(&req, 0, sizeof(req));
 
-   req.magic = CPUWORKER_REQUEST_MAGIC;
 
-   req.timed = should_time;
 
-   memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
 
-   tor_free(onionskin);
 
-   if (should_time)
 
-     tor_gettimeofday(&req.started_at);
 
-   job = tor_malloc_zero(sizeof(cpuworker_job_t));
 
-   job->circ = circ;
 
-   memcpy(&job->u.request, &req, sizeof(req));
 
-   memwipe(&req, 0, sizeof(req));
 
-   ++total_pending_tasks;
 
-   queue_entry = threadpool_queue_work(threadpool,
 
-                                       cpuworker_onion_handshake_threadfn,
 
-                                       cpuworker_onion_handshake_replyfn,
 
-                                       job);
 
-   if (!queue_entry) {
 
-     log_warn(LD_BUG, "Couldn't queue work on threadpool");
 
-     tor_free(job);
 
-     return -1;
 
-   }
 
-   log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
 
-             job, queue_entry, job->circ);
 
-   circ->workqueue_entry = queue_entry;
 
-   return 0;
 
- }
 
- /** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 
-  * remove it from the worker queue. */
 
- void
 
- cpuworker_cancel_circ_handshake(or_circuit_t *circ)
 
- {
 
-   cpuworker_job_t *job;
 
-   if (circ->workqueue_entry == NULL)
 
-     return;
 
-   job = workqueue_entry_cancel(circ->workqueue_entry);
 
-   if (job) {
 
-     /* It successfully cancelled. */
 
-     memwipe(job, 0xe0, sizeof(*job));
 
-     tor_free(job);
 
-     tor_assert(total_pending_tasks > 0);
 
-     --total_pending_tasks;
 
-     /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
 
-     circ->workqueue_entry = NULL;
 
-   }
 
- }
 
 
  |