|
@@ -57,6 +57,13 @@ static int circuit_queue_streams_are_blocked(circuit_t *circ);
|
|
|
|
|
|
static struct timeval cached_time_hires = {0, 0};
|
|
|
|
|
|
+/** Stop reading on edge connections when we have this many cells
|
|
|
+ * waiting on the appropriate queue. */
|
|
|
+#define CELL_QUEUE_HIGHWATER_SIZE 256
|
|
|
+/** Start reading from edge connections again when we get down to this many
|
|
|
+ * cells. */
|
|
|
+#define CELL_QUEUE_LOWWATER_SIZE 64
|
|
|
+
|
|
|
static void
|
|
|
tor_gettimeofday_cached(struct timeval *tv)
|
|
|
{
|
|
@@ -1471,31 +1478,101 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
|
|
|
* of a linked list of edge streams that should each be considered.
|
|
|
*/
|
|
|
static int
|
|
|
-circuit_resume_edge_reading_helper(edge_connection_t *conn,
|
|
|
+circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
|
|
|
circuit_t *circ,
|
|
|
crypt_path_t *layer_hint)
|
|
|
{
|
|
|
- for ( ; conn; conn=conn->next_stream) {
|
|
|
- if (conn->_base.marked_for_close)
|
|
|
+ edge_connection_t *conn;
|
|
|
+ int n_streams, n_streams_left;
|
|
|
+ int packaged_this_round;
|
|
|
+ int cells_on_queue;
|
|
|
+ int cells_per_conn;
|
|
|
+
|
|
|
+ /* How many cells do we have space for? It will be the minimum of
|
|
|
+ * the number needed to exhaust the package window, and the minimum
|
|
|
+ * needed to fill the cell queue. */
|
|
|
+ int max_to_package = circ->package_window;
|
|
|
+ if (CIRCUIT_IS_ORIGIN(circ)) {
|
|
|
+ cells_on_queue = circ->n_conn_cells.n;
|
|
|
+ } else {
|
|
|
+ or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
|
|
|
+ cells_on_queue = or_circ->p_conn_cells.n;
|
|
|
+ }
|
|
|
+ if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
|
|
|
+ max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
|
|
|
+
|
|
|
+ /* Count how many non-marked streams there are that have anything on
|
|
|
+ * their inbuf, and enable reading on all of the connections. */
|
|
|
+ n_streams = 0;
|
|
|
+ for (conn=first_conn; conn; conn=conn->next_stream) {
|
|
|
+ if (conn->_base.marked_for_close || conn->package_window <= 0)
|
|
|
continue;
|
|
|
- if ((!layer_hint && conn->package_window > 0) ||
|
|
|
- (layer_hint && conn->package_window > 0 &&
|
|
|
- conn->cpath_layer == layer_hint)) {
|
|
|
+ if (!layer_hint || conn->cpath_layer == layer_hint) {
|
|
|
connection_start_reading(TO_CONN(conn));
|
|
|
+
|
|
|
+ if (buf_datalen(conn->_base.inbuf) > 0)
|
|
|
+ ++n_streams;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (n_streams == 0) /* avoid divide-by-zero */
|
|
|
+ return 0;
|
|
|
+
|
|
|
+ again:
|
|
|
+
|
|
|
+ /* ??? turn this into a ceildiv function? */
|
|
|
+ cells_per_conn = (max_to_package + n_streams - 1 ) / n_streams;
|
|
|
+
|
|
|
+ packaged_this_round = 0;
|
|
|
+ n_streams_left = 0;
|
|
|
+
|
|
|
+ /* Iterate over all connections. Package up to cells_per_conn cells on
|
|
|
+ * each. Update packaged_this_round with the total number of cells
|
|
|
+ * packaged, and n_streams_left with the number that still have data to
|
|
|
+ * package.
|
|
|
+ */
|
|
|
+ for (conn=first_conn; conn; conn=conn->next_stream) {
|
|
|
+ if (conn->_base.marked_for_close || conn->package_window <= 0)
|
|
|
+ continue;
|
|
|
+ if (!layer_hint || conn->cpath_layer == layer_hint) {
|
|
|
+ int n = cells_per_conn, r;
|
|
|
/* handle whatever might still be on the inbuf */
|
|
|
- if (connection_edge_package_raw_inbuf(conn, 1, NULL)<0) {
|
|
|
- /* (We already sent an end cell if possible) */
|
|
|
+ r = connection_edge_package_raw_inbuf(conn, 1, &n);
|
|
|
+
|
|
|
+ /* Note how many we packaged */
|
|
|
+ packaged_this_round += (cells_per_conn-n);
|
|
|
+
|
|
|
+ if (r<0) {
|
|
|
+ /* Problem while packaging. (We already sent an end cell if
|
|
|
+ * possible) */
|
|
|
connection_mark_for_close(TO_CONN(conn));
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
+ /* If there's still data to read, we'll be coming back to this stream. */
|
|
|
+ if (buf_datalen(conn->_base.inbuf))
|
|
|
+ ++n_streams_left;
|
|
|
+
|
|
|
/* If the circuit won't accept any more data, return without looking
|
|
|
* at any more of the streams. Any connections that should be stopped
|
|
|
* have already been stopped by connection_edge_package_raw_inbuf. */
|
|
|
if (circuit_consider_stop_edge_reading(circ, layer_hint))
|
|
|
return -1;
|
|
|
+ /* XXXX should we also stop immediately if we fill up the cell queue?
|
|
|
+ * Probably. */
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /* If we made progress, and we are willing to package more, and there are
|
|
|
+ * any streams left that want to package stuff... try again!
|
|
|
+ */
|
|
|
+ if (packaged_this_round && packaged_this_round < max_to_package &&
|
|
|
+ n_streams_left) {
|
|
|
+ max_to_package -= packaged_this_round;
|
|
|
+ n_streams = n_streams_left;
|
|
|
+ goto again;
|
|
|
+ }
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -1564,13 +1641,6 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/** Stop reading on edge connections when we have this many cells
|
|
|
- * waiting on the appropriate queue. */
|
|
|
-#define CELL_QUEUE_HIGHWATER_SIZE 256
|
|
|
-/** Start reading from edge connections again when we get down to this many
|
|
|
- * cells. */
|
|
|
-#define CELL_QUEUE_LOWWATER_SIZE 64
|
|
|
-
|
|
|
#ifdef ACTIVE_CIRCUITS_PARANOIA
|
|
|
#define assert_active_circuits_ok_paranoid(conn) \
|
|
|
assert_active_circuits_ok(conn)
|