| 
					
				 | 
			
			
				@@ -25,11 +25,19 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "orconfig.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "compat.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "compat_threads.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#include "crypto.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "util.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "workqueue.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "tor_queue.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "torlog.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+typedef struct work_tailq_t work_tailq_t; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 struct threadpool_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** An array of pointers to workerthread_t: one for each running worker 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    * thread. */ 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -38,8 +46,12 @@ struct threadpool_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** Condition variable that we wait on when we have no work, and which 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    * gets signaled when our queue becomes nonempty. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   tor_cond_t condition; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  /** Queue of pending work that we have to do. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  TOR_TAILQ_HEAD(, workqueue_entry_s) work; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  /** Queues of pending work that we have to do. The queue with priority 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+   * <b>p</b> is work[p]. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  work_tailq_t work[WORKQUEUE_N_PRIORITIES]; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  /** Weak RNG, used to decide when to ignore priority. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  tor_weak_rng_t weak_rng; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** The current 'update generation' of the threadpool.  Any thread that is 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    * at an earlier generation needs to run the update function. */ 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -66,6 +78,11 @@ struct threadpool_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   void *new_thread_state_arg; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/** Used to put a workqueue_priority_t value into a bitfield. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/** Number of bits needed to hold all legal values of workqueue_priority_t */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#define WORKQUEUE_PRIORITY_BITS 2 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 struct workqueue_entry_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** The next workqueue_entry_t that's pending on the same thread or 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    * reply queue. */ 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -76,6 +93,8 @@ struct workqueue_entry_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   struct threadpool_s *on_pool; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** True iff this entry is waiting for a worker to start processing it. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   uint8_t pending; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  /** Priority of this entry. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** Function to run in the worker thread. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   workqueue_reply_t (*fn)(void *state, void *arg); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** Function to run while processing the reply queue. */ 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -94,9 +113,7 @@ struct replyqueue_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   alert_sockets_t alert; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-/** A worker thread represents a single thread in a thread pool.  To avoid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- * contention, each gets its own queue. This breaks the guarantee that that 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- * queued work will get executed strictly in order. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/** A worker thread represents a single thread in a thread pool. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 typedef struct workerthread_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** Which thread it this?  In range 0..in_pool->n_threads-1 */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   int index; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -109,6 +126,8 @@ typedef struct workerthread_s { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   replyqueue_t *reply_queue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   /** The current update generation of this thread */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   unsigned generation; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  /** One over the probability of taking work from a lower-priority queue. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  int32_t lower_priority_chance; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } workerthread_t; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -125,6 +144,7 @@ workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   ent->fn = fn; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   ent->reply_fn = reply_fn; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   ent->arg = arg; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  ent->priority = WQ_PRI_HIGH; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   return ent; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -161,8 +181,9 @@ workqueue_entry_cancel(workqueue_entry_t *ent) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   int cancelled = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   void *result = NULL; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   tor_mutex_acquire(&ent->on_pool->lock); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  workqueue_priority_t prio = ent->priority; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   if (ent->pending) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     cancelled = 1; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     result = ent->arg; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -180,8 +201,46 @@ workqueue_entry_cancel(workqueue_entry_t *ent) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 static int 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 worker_thread_has_work(workerthread_t *thread) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  return !TOR_TAILQ_EMPTY(&thread->in_pool->work) || 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    thread->generation != thread->in_pool->generation; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  unsigned i; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i])) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return 1; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  return thread->generation != thread->in_pool->generation; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/** Extract the next workqueue_entry_t from the the thread's pool, removing 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * it from the relevant queues and marking it as non-pending. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * The caller must hold the lock. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+static workqueue_entry_t * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+worker_thread_extract_next_work(workerthread_t *thread) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+{ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  threadpool_t *pool = thread->in_pool; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  work_tailq_t *queue = NULL, *this_queue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  unsigned i; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    this_queue = &pool->work[i]; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if (!TOR_TAILQ_EMPTY(this_queue)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      queue = this_queue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      if (! tor_weak_random_one_in_n(&pool->weak_rng, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                     thread->lower_priority_chance)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        /* Usually we'll just break now, so that we can get out of the loop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+         * and use the queue where we found work. But with a small 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+         * probability, we'll keep looking for lower priority work, so that 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+         * we don't ignore our low-priority queues entirely. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  if (queue == NULL) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return NULL; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  TOR_TAILQ_REMOVE(queue, work, next_work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  work->pending = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  return work; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 /** 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -217,9 +276,9 @@ worker_thread_main(void *thread_) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         tor_mutex_acquire(&pool->lock); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         continue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      work = TOR_TAILQ_FIRST(&pool->work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      TOR_TAILQ_REMOVE(&pool->work, work, next_work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      work->pending = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      work = worker_thread_extract_next_work(thread); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      if (BUG(work == NULL)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       tor_mutex_release(&pool->lock); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       /* We run the work function without holding the thread lock. This 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -268,12 +327,14 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 /** Allocate and start a new worker thread to use state object <b>state</b>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * and send responses to <b>replyqueue</b>. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 static workerthread_t * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+workerthread_new(int32_t lower_priority_chance, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                 void *state, threadpool_t *pool, replyqueue_t *replyqueue) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   thr->state = state; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   thr->reply_queue = replyqueue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   thr->in_pool = pool; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  thr->lower_priority_chance = lower_priority_chance; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   if (spawn_func(worker_thread_main, thr) < 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     //LCOV_EXCL_START 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -299,24 +360,34 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * function's responsibility to free the work object. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * On success, return a workqueue_entry_t object that can be passed to 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- * workqueue_entry_cancel(). On failure, return NULL. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * currently possible, but callers should check anyway.) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * Items are executed in a loose priority order -- each thread will usually 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * take from the queued work with the highest prioirity, but will occasionally 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * visit lower-priority queues to keep them from starving completely. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- * Note that because each thread has its own work queue, work items may not 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * Note that because of priorities and thread behavior, work items may not 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * be executed strictly in order. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 workqueue_entry_t * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-threadpool_queue_work(threadpool_t *pool, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                      workqueue_reply_t (*fn)(void *, void *), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                      void (*reply_fn)(void *), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                      void *arg) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+threadpool_queue_work_priority(threadpool_t *pool, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               workqueue_priority_t prio, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               workqueue_reply_t (*fn)(void *, void *), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               void (*reply_fn)(void *), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               void *arg) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  tor_assert(prio >= WORKQUEUE_PRIORITY_FIRST && 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+             prio <= WORKQUEUE_PRIORITY_LAST); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   ent->on_pool = pool; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   ent->pending = 1; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  ent->priority = prio; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   tor_mutex_acquire(&pool->lock); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   tor_cond_signal_one(&pool->condition); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -325,6 +396,16 @@ threadpool_queue_work(threadpool_t *pool, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   return ent; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+workqueue_entry_t * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+threadpool_queue_work(threadpool_t *pool, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      workqueue_reply_t (*fn)(void *, void *), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      void (*reply_fn)(void *), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      void *arg) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+{ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * Queue a copy of a work item for every thread in a pool.  This can be used, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  * for example, to tell the threads to update some parameter in their states. 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -388,6 +469,14 @@ threadpool_queue_update(threadpool_t *pool, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 /** Don't have more than this many threads per pool. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #define MAX_THREADS 1024 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/** For half of our threads, choose lower priority queues with probability 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * 1/N for each of these values. Both are chosen somewhat arbitrarily.  If 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * stalling forever.  If it's too high, we have a risk of low-priority tasks 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * grabbing half of the threads. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#define CHANCE_PERMISSIVE 37 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#define CHANCE_STRICT INT32_MAX 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 /** Launch threads until we have <b>n</b>. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 static int 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 threadpool_start_threads(threadpool_t *pool, int n) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -404,8 +493,14 @@ threadpool_start_threads(threadpool_t *pool, int n) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                      sizeof(workerthread_t*), n); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   while (pool->n_threads < n) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /* For half of our threads, we'll choose lower priorities permissively; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * for the other half, we'll stick more strictly to higher priorities. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * This keeps slow low-priority tasks from taking over completely. */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    workerthread_t *thr = workerthread_new(chance, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                           state, pool, pool->reply_queue); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     if (!thr) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       //LCOV_EXCL_START 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -441,7 +536,15 @@ threadpool_new(int n_threads, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   pool = tor_malloc_zero(sizeof(threadpool_t)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   tor_mutex_init_nonrecursive(&pool->lock); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   tor_cond_init(&pool->condition); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  TOR_TAILQ_INIT(&pool->work); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  unsigned i; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    TOR_TAILQ_INIT(&pool->work[i]); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    unsigned seed; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    crypto_rand((void*)&seed, sizeof(seed)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    tor_init_weak_random(&pool->weak_rng, seed); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   pool->new_thread_state_fn = new_thread_state_fn; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   pool->new_thread_state_arg = arg; 
			 |