|
@@ -1482,6 +1482,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
|
|
|
int packaged_this_round;
|
|
|
int cells_on_queue;
|
|
|
int cells_per_conn;
|
|
|
+ int num_streams = 0;
|
|
|
+ edge_connection_t *chosen_stream = NULL;
|
|
|
|
|
|
/* How many cells do we have space for? It will be the minimum of
|
|
|
* the number needed to exhaust the package window, and the minimum
|
|
@@ -1499,7 +1501,38 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
|
|
|
/* Count how many non-marked streams there are that have anything on
|
|
|
* their inbuf, and enable reading on all of the connections. */
|
|
|
n_streams = 0;
|
|
|
- for (conn=first_conn; conn; conn=conn->next_stream) {
|
|
|
+ /* This used to start listening on the streams in the order they
|
|
|
+ * appeared in the linked list. That leads to starvation in the
|
|
|
+ * event that, for example, our circuit window is almost full, and
|
|
|
+ * there are lots of streams. Then the first few streams will have
|
|
|
+ * data read from them, and then the window will close again. When
|
|
|
+ * it reopens, we would enable reading from the beginning of the list
|
|
|
+ * again. Instead, we just pick a random stream on the list, and
|
|
|
+ * enable reading for streams starting at that point (and wrapping
|
|
|
+ * around as if the list were circular). It would probably be better
|
|
|
+ * to actually remember which streams we've serviced in the past, but
|
|
|
+ * this is simple and effective. */
|
|
|
+
|
|
|
+ /* Select a stream uniformly at random from the linked list. We
|
|
|
+ * don't need cryptographic randomness here. */
|
|
|
+ for(conn = first_conn; conn; conn = conn->next_stream) {
|
|
|
+ num_streams++;
|
|
|
+ if((random() % num_streams)==0) chosen_stream = conn;
|
|
|
+ }
|
|
|
+ /* Activate reading starting from the chosen stream */
|
|
|
+ for (conn=chosen_stream; conn; conn = conn->next_stream) {
|
|
|
+ /* Start reading for the streams starting from here */
|
|
|
+ if (conn->_base.marked_for_close || conn->package_window <= 0)
|
|
|
+ continue;
|
|
|
+ if (!layer_hint || conn->cpath_layer == layer_hint) {
|
|
|
+ connection_start_reading(TO_CONN(conn));
|
|
|
+
|
|
|
+ if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
|
|
|
+ ++n_streams;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /* Go back and do the ones we skipped, circular-style */
|
|
|
+ for(conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
|
|
|
if (conn->_base.marked_for_close || conn->package_window <= 0)
|
|
|
continue;
|
|
|
if (!layer_hint || conn->cpath_layer == layer_hint) {
|