|
@@ -52,6 +52,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *conn,
|
|
|
crypt_path_t *layer_hint);
|
|
|
static int
|
|
|
circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint);
|
|
|
+static int circuit_queue_streams_are_blocked(circuit_t *circ);
|
|
|
|
|
|
/** Cache the current hi-res time; the cache gets reset when libevent
|
|
|
* calls us. */
|
|
@@ -268,7 +269,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
|
|
|
* we might kill the circ before we relay
|
|
|
* the cells. */
|
|
|
|
|
|
- append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction);
|
|
|
+ append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -365,7 +366,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
|
|
|
static int
|
|
|
circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
|
|
|
cell_direction_t cell_direction,
|
|
|
- crypt_path_t *layer_hint)
|
|
|
+ crypt_path_t *layer_hint, uint16_t on_stream)
|
|
|
{
|
|
|
or_connection_t *conn; /* where to send the cell */
|
|
|
|
|
@@ -409,7 +410,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
|
|
|
}
|
|
|
++stats_n_relay_cells_relayed;
|
|
|
|
|
|
- append_cell_to_circuit_queue(circ, conn, cell, cell_direction);
|
|
|
+ append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -624,8 +625,8 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer)
|
|
|
- < 0) {
|
|
|
+ if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer,
|
|
|
+ stream_id) < 0) {
|
|
|
log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
|
|
|
circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
|
|
|
return -1;
|
|
@@ -1236,6 +1237,10 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
|
|
|
conn->package_window += STREAMWINDOW_INCREMENT;
|
|
|
log_debug(domain,"stream-level sendme, packagewindow now %d.",
|
|
|
conn->package_window);
|
|
|
+ if (circuit_queue_streams_are_blocked(circ)) {
|
|
|
+ /* Still waiting for queue to flush; don't touch conn */
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
connection_start_reading(TO_CONN(conn));
|
|
|
/* handle whatever might still be on the inbuf */
|
|
|
if (connection_edge_package_raw_inbuf(conn, 1) < 0) {
|
|
@@ -1436,7 +1441,10 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn)
|
|
|
static void
|
|
|
circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
|
|
|
{
|
|
|
-
|
|
|
+ if (circuit_queue_streams_are_blocked(circ)) {
|
|
|
+ log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
|
|
|
+ return;
|
|
|
+ }
|
|
|
log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
|
|
|
|
|
|
if (CIRCUIT_IS_ORIGIN(circ))
|
|
@@ -2092,12 +2100,19 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
|
|
|
|
|
|
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
|
|
|
* every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
|
|
|
- * and start or stop reading as appropriate. */
|
|
|
-static void
|
|
|
+ * and start or stop reading as appropriate.
|
|
|
+ *
|
|
|
+ * If <b>stream_id</b> is nonzero, block only the edge connection whose
|
|
|
+ * stream_id matches it.
|
|
|
+ *
|
|
|
+ * Returns the number of streams whose status we changed.
|
|
|
+ */
|
|
|
+static int
|
|
|
set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
|
|
|
- int block)
|
|
|
+ int block, uint16_t stream_id)
|
|
|
{
|
|
|
edge_connection_t *edge = NULL;
|
|
|
+ int n = 0;
|
|
|
if (circ->n_conn == orconn) {
|
|
|
circ->streams_blocked_on_n_conn = block;
|
|
|
if (CIRCUIT_IS_ORIGIN(circ))
|
|
@@ -2110,7 +2125,13 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
|
|
|
|
|
|
for (; edge; edge = edge->next_stream) {
|
|
|
connection_t *conn = TO_CONN(edge);
|
|
|
- edge->edge_blocked_on_circ = block;
|
|
|
+ if (stream_id && edge->stream_id != stream_id)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ if (edge->edge_blocked_on_circ != block) {
|
|
|
+ ++n;
|
|
|
+ edge->edge_blocked_on_circ = block;
|
|
|
+ }
|
|
|
|
|
|
if (!conn->read_event) {
|
|
|
/* This connection is a placeholder for something; probably a DNS
|
|
@@ -2127,6 +2148,8 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
|
|
|
connection_start_reading(conn);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return n;
|
|
|
}
|
|
|
|
|
|
/** Pull as many cells as possible (but no more than <b>max</b>) from the
|
|
@@ -2252,7 +2275,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
|
|
|
/* Is the cell queue low enough to unblock all the streams that are waiting
|
|
|
* to write to this circuit? */
|
|
|
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
|
|
|
- set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */
|
|
|
+ set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
|
|
|
|
|
|
/* Did we just run out of cells on this circuit's queue? */
|
|
|
if (queue->n == 0) {
|
|
@@ -2269,7 +2292,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
|
|
|
* transmitting in <b>direction</b>. */
|
|
|
void
|
|
|
append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
|
|
|
- cell_t *cell, cell_direction_t direction)
|
|
|
+ cell_t *cell, cell_direction_t direction,
|
|
|
+ uint16_t fromstream)
|
|
|
{
|
|
|
cell_queue_t *queue;
|
|
|
int streams_blocked;
|
|
@@ -2291,7 +2315,12 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
|
|
|
/* If we have too many cells on the circuit, we should stop reading from
|
|
|
* the edge streams for a while. */
|
|
|
if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
|
|
|
- set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */
|
|
|
+ set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
|
|
|
+
|
|
|
+ if (streams_blocked && fromstream) {
|
|
|
+ /* This edge connection is apparently not blocked; block it. */
|
|
|
+ set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
|
|
|
+ }
|
|
|
|
|
|
if (queue->n == 1) {
|
|
|
/* This was the first cell added to the queue. We need to make this
|
|
@@ -2405,3 +2434,15 @@ assert_active_circuits_ok(or_connection_t *orconn)
|
|
|
tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
|
|
|
}
|
|
|
|
|
|
+/** Return 1 if we shouldn't restart reading on this circuit, even if
|
|
|
+ * we get a SENDME. Else return 0.
|
|
|
+*/
|
|
|
+static int
|
|
|
+circuit_queue_streams_are_blocked(circuit_t *circ)
|
|
|
+{
|
|
|
+ if (CIRCUIT_IS_ORIGIN(circ)) {
|
|
|
+ return circ->streams_blocked_on_n_conn;
|
|
|
+ } else {
|
|
|
+ return circ->streams_blocked_on_p_conn;
|
|
|
+ }
|
|
|
+}
|