|
@@ -1779,10 +1779,10 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
|
|
|
}
|
|
|
|
|
|
#ifdef ACTIVE_CIRCUITS_PARANOIA
|
|
|
-#define assert_active_circuits_ok_paranoid(conn) \
|
|
|
- assert_active_circuits_ok(conn)
|
|
|
+#define assert_cmux_ok_paranoid(chan) \
|
|
|
+ assert_cmux_okay(chan)
|
|
|
#else
|
|
|
-#define assert_active_circuits_ok_paranoid(conn)
|
|
|
+#define assert_cmux_ok_paranoid(chan)
|
|
|
#endif
|
|
|
|
|
|
/** The total number of cells we have allocated from the memory pool. */
|
|
@@ -2004,6 +2004,7 @@ prev_circ_on_chan_p(circuit_t *circ, channel_t *chan)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+#if 0
|
|
|
/** Helper for sorting cell_ewma_t values in their priority queue. */
|
|
|
static int
|
|
|
compare_cell_ewma_counts(const void *p1, const void *p2)
|
|
@@ -2240,122 +2241,61 @@ pop_first_cell_ewma_from_chan(channel_t *chan)
|
|
|
compare_cell_ewma_counts,
|
|
|
STRUCT_OFFSET(cell_ewma_t, heap_index));
|
|
|
}
|
|
|
+#endif
|
|
|
|
|
|
-/** Add <b>circ</b> to the list of circuits with pending cells on
|
|
|
- * <b>chan</b>. No effect if <b>circ</b> is already linked. */
|
|
|
+/**
|
|
|
+ * Update the number of cells available on the circuit's n_chan or p_chan's
|
|
|
+ * circuit mux.
|
|
|
+ */
|
|
|
void
|
|
|
-make_circuit_active_on_chan(circuit_t *circ, channel_t *chan)
|
|
|
+update_circuit_on_cmux(circuit_t *circ, cell_direction_t direction)
|
|
|
{
|
|
|
- circuit_t **nextp = NULL, **prevp = NULL;
|
|
|
+ channel_t *chan = NULL;
|
|
|
+ or_circuit_t *or_circ = NULL;
|
|
|
+ circuitmux_t *cmux = NULL;
|
|
|
|
|
|
- tor_assert(chan);
|
|
|
tor_assert(circ);
|
|
|
|
|
|
- nextp = next_circ_on_chan_p(circ, chan);
|
|
|
- prevp = prev_circ_on_chan_p(circ, chan);
|
|
|
-
|
|
|
- if (*nextp && *prevp) {
|
|
|
- /* Already active. */
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- assert_active_circuits_ok_paranoid(chan);
|
|
|
-
|
|
|
- if (!(chan->active_circuits)) {
|
|
|
- chan->active_circuits = circ;
|
|
|
- *prevp = *nextp = circ;
|
|
|
- } else {
|
|
|
- circuit_t *head = chan->active_circuits;
|
|
|
- circuit_t *old_tail = *prev_circ_on_chan_p(head, chan);
|
|
|
- *next_circ_on_chan_p(old_tail, chan) = circ;
|
|
|
- *nextp = head;
|
|
|
- *prev_circ_on_chan_p(head, chan) = circ;
|
|
|
- *prevp = old_tail;
|
|
|
- }
|
|
|
-
|
|
|
- if (circ->n_chan == chan) {
|
|
|
- add_cell_ewma_to_chan(chan, &circ->n_cell_ewma);
|
|
|
+ /* Okay, get the channel */
|
|
|
+ if (direction == CELL_DIRECTION_OUT) {
|
|
|
+ chan = circ->n_chan;
|
|
|
} else {
|
|
|
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
|
|
|
- tor_assert(chan == orcirc->p_chan);
|
|
|
- add_cell_ewma_to_chan(chan, &orcirc->p_cell_ewma);
|
|
|
+ or_circ = TO_OR_CIRCUIT(circ);
|
|
|
+ chan = or_circ->p_chan;
|
|
|
}
|
|
|
|
|
|
- assert_active_circuits_ok_paranoid(chan);
|
|
|
-}
|
|
|
-
|
|
|
-/** Remove <b>circ</b> from the list of circuits with pending cells on
|
|
|
- * <b>chan</b>. No effect if <b>circ</b> is already unlinked. */
|
|
|
-void
|
|
|
-make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan)
|
|
|
-{
|
|
|
- circuit_t **nextp = NULL, **prevp = NULL;
|
|
|
- circuit_t *next = NULL, *prev = NULL;
|
|
|
-
|
|
|
tor_assert(chan);
|
|
|
- tor_assert(circ);
|
|
|
+ tor_assert(chan->cmux);
|
|
|
|
|
|
- nextp = next_circ_on_chan_p(circ, chan);
|
|
|
- prevp = prev_circ_on_chan_p(circ, chan);
|
|
|
- next = *nextp;
|
|
|
- prev = *prevp;
|
|
|
+ /* Now get the cmux */
|
|
|
+ cmux = chan->cmux;
|
|
|
|
|
|
- if (!next && !prev) {
|
|
|
- /* Already inactive. */
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- assert_active_circuits_ok_paranoid(chan);
|
|
|
-
|
|
|
- tor_assert(next && prev);
|
|
|
- tor_assert(*prev_circ_on_chan_p(next, chan) == circ);
|
|
|
- tor_assert(*next_circ_on_chan_p(prev, chan) == circ);
|
|
|
+ /* Cmux sanity check */
|
|
|
+ tor_assert(circuitmux_is_circuit_attached(cmux, circ));
|
|
|
+ tor_assert(circuitmux_attached_circuit_direction(cmux, circ) == direction);
|
|
|
|
|
|
- if (next == circ) {
|
|
|
- chan->active_circuits = NULL;
|
|
|
- } else {
|
|
|
- *prev_circ_on_chan_p(next, chan) = prev;
|
|
|
- *next_circ_on_chan_p(prev, chan) = next;
|
|
|
- if (chan->active_circuits == circ)
|
|
|
- chan->active_circuits = next;
|
|
|
- }
|
|
|
- *prevp = *nextp = NULL;
|
|
|
+ assert_cmux_ok_paranoid(chan);
|
|
|
|
|
|
- if (circ->n_chan == chan) {
|
|
|
- remove_cell_ewma_from_chan(chan, &circ->n_cell_ewma);
|
|
|
+ /* Update the number of cells we have for the circuit mux */
|
|
|
+ if (direction == CELL_DIRECTION_OUT) {
|
|
|
+ circuitmux_set_num_cells(cmux, circ, circ->n_chan_cells.n);
|
|
|
} else {
|
|
|
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
|
|
|
- tor_assert(chan == orcirc->p_chan);
|
|
|
- remove_cell_ewma_from_chan(chan, &orcirc->p_cell_ewma);
|
|
|
+ circuitmux_set_num_cells(cmux, circ, or_circ->p_chan_cells.n);
|
|
|
}
|
|
|
|
|
|
- assert_active_circuits_ok_paranoid(chan);
|
|
|
+ assert_cmux_ok_paranoid(chan);
|
|
|
}
|
|
|
|
|
|
-/** Remove all circuits from the list of circuits with pending cells on
|
|
|
- * <b>chan</b>. */
|
|
|
+/** Remove all circuits from the cmux on <b>chan</b>. */
|
|
|
void
|
|
|
-channel_unlink_all_active_circs(channel_t *chan)
|
|
|
+channel_unlink_all_circuits(channel_t *chan)
|
|
|
{
|
|
|
- circuit_t *head = NULL, *cur = NULL;
|
|
|
-
|
|
|
tor_assert(chan);
|
|
|
+ tor_assert(chan->cmux);
|
|
|
|
|
|
- cur = head = chan->active_circuits;
|
|
|
- if (! head)
|
|
|
- return;
|
|
|
- do {
|
|
|
- circuit_t *next = *next_circ_on_chan_p(cur, chan);
|
|
|
- *prev_circ_on_chan_p(cur, chan) = NULL;
|
|
|
- *next_circ_on_chan_p(cur, chan) = NULL;
|
|
|
- cur = next;
|
|
|
- } while (cur != head);
|
|
|
- chan->active_circuits = NULL;
|
|
|
-
|
|
|
- SMARTLIST_FOREACH(chan->active_circuit_pqueue,
|
|
|
- cell_ewma_t *, e,
|
|
|
- e->heap_index = -1);
|
|
|
- smartlist_clear(chan->active_circuit_pqueue);
|
|
|
+ circuitmux_detach_all_circuits(chan->cmux);
|
|
|
+ chan->num_n_circuits = 0;
|
|
|
+ chan->num_p_circuits = 0;
|
|
|
}
|
|
|
|
|
|
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
|
|
@@ -2419,53 +2359,71 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
|
|
|
int
|
|
|
channel_flush_from_first_active_circuit(channel_t *chan, int max)
|
|
|
{
|
|
|
- int n_flushed;
|
|
|
+ circuitmux_t *cmux = NULL;
|
|
|
+ int n_flushed = 0;
|
|
|
cell_queue_t *queue;
|
|
|
circuit_t *circ;
|
|
|
+ or_circuit_t *or_circ;
|
|
|
int streams_blocked;
|
|
|
+ packed_cell_t *cell;
|
|
|
|
|
|
+#if 0
|
|
|
/* The current (hi-res) time */
|
|
|
struct timeval now_hires;
|
|
|
|
|
|
/* The EWMA cell counter for the circuit we're flushing. */
|
|
|
cell_ewma_t *cell_ewma = NULL;
|
|
|
double ewma_increment = -1;
|
|
|
+#endif
|
|
|
|
|
|
+ /* Get the cmux */
|
|
|
tor_assert(chan);
|
|
|
+ tor_assert(chan->cmux);
|
|
|
+ cmux = chan->cmux;
|
|
|
+
|
|
|
+ /* Main loop: pick a circuit, send a cell, update the cmux */
|
|
|
+ while (n_flushed < max) {
|
|
|
+ circ = circuitmux_get_first_active_circuit(cmux);
|
|
|
+ /* If it returns NULL, no cells left to send */
|
|
|
+ if (!circ) break;
|
|
|
+ assert_cmux_ok_paranoid(chan);
|
|
|
+
|
|
|
+#if 0
|
|
|
+ /* This will go in circuitmux_get_first_active_circuit() */
|
|
|
+ /* See if we're doing the ewma circuit selection algorithm. */
|
|
|
+ if (ewma_enabled) {
|
|
|
+ unsigned tick;
|
|
|
+ double fractional_tick;
|
|
|
+ tor_gettimeofday_cached(&now_hires);
|
|
|
+ tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
|
|
|
+
|
|
|
+ if (tick != chan->active_circuit_pqueue_last_recalibrated) {
|
|
|
+ scale_active_circuits(chan, tick);
|
|
|
+ }
|
|
|
|
|
|
- circ = chan->active_circuits;
|
|
|
- if (!circ) return 0;
|
|
|
- assert_active_circuits_ok_paranoid(chan);
|
|
|
-
|
|
|
- /* See if we're doing the ewma circuit selection algorithm. */
|
|
|
- if (ewma_enabled) {
|
|
|
- unsigned tick;
|
|
|
- double fractional_tick;
|
|
|
- tor_gettimeofday_cached(&now_hires);
|
|
|
- tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
|
|
|
+ ewma_increment = pow(ewma_scale_factor, -fractional_tick);
|
|
|
|
|
|
- if (tick != chan->active_circuit_pqueue_last_recalibrated) {
|
|
|
- scale_active_circuits(chan, tick);
|
|
|
+ cell_ewma = smartlist_get(chan->active_circuit_pqueue, 0);
|
|
|
+ circ = cell_ewma_to_circuit(cell_ewma);
|
|
|
}
|
|
|
+#endif
|
|
|
|
|
|
- ewma_increment = pow(ewma_scale_factor, -fractional_tick);
|
|
|
-
|
|
|
- cell_ewma = smartlist_get(chan->active_circuit_pqueue, 0);
|
|
|
- circ = cell_ewma_to_circuit(cell_ewma);
|
|
|
- }
|
|
|
-
|
|
|
- if (circ->n_chan == chan) {
|
|
|
- queue = &circ->n_chan_cells;
|
|
|
- streams_blocked = circ->streams_blocked_on_n_chan;
|
|
|
- } else {
|
|
|
- queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
|
|
|
- streams_blocked = circ->streams_blocked_on_p_chan;
|
|
|
- }
|
|
|
- tor_assert(*next_circ_on_chan_p(circ, chan));
|
|
|
+ if (circ->n_chan == chan) {
|
|
|
+ queue = &circ->n_chan_cells;
|
|
|
+ streams_blocked = circ->streams_blocked_on_n_chan;
|
|
|
+ } else {
|
|
|
+ or_circ = TO_OR_CIRCUIT(circ);
|
|
|
+ tor_assert(or_circ->p_chan == chan);
|
|
|
+ queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
|
|
|
+ streams_blocked = circ->streams_blocked_on_p_chan;
|
|
|
+ }
|
|
|
|
|
|
- for (n_flushed = 0; n_flushed < max && queue->head; ) {
|
|
|
- packed_cell_t *cell = cell_queue_pop(queue);
|
|
|
- tor_assert(*next_circ_on_chan_p(circ, chan));
|
|
|
+ /*
|
|
|
+ * Get just one cell here; once we've sent it, that can change the circuit
|
|
|
+ * selection, so we have to loop around for another even if this circuit
|
|
|
+ * has more than one.
|
|
|
+ */
|
|
|
+ cell = cell_queue_pop(queue);
|
|
|
|
|
|
/* Calculate the exact time that this cell has spent in the queue. */
|
|
|
if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
|
|
@@ -2481,8 +2439,8 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
|
|
|
"Looks like the CellStatistics option was "
|
|
|
"recently enabled.");
|
|
|
} else {
|
|
|
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
|
|
|
insertion_time_elem_t *elem = it_queue->first;
|
|
|
+ or_circ = TO_OR_CIRCUIT(circ);
|
|
|
cell_waiting_time =
|
|
|
(uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L -
|
|
|
elem->insertion_time * 10L) %
|
|
@@ -2495,8 +2453,8 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
|
|
|
it_queue->last = NULL;
|
|
|
mp_pool_release(elem);
|
|
|
}
|
|
|
- orcirc->total_cell_waiting_time += cell_waiting_time;
|
|
|
- orcirc->processed_cells++;
|
|
|
+ or_circ->total_cell_waiting_time += cell_waiting_time;
|
|
|
+ or_circ->processed_cells++;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2507,14 +2465,34 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
|
|
|
DIRREQ_TUNNELED,
|
|
|
DIRREQ_CIRC_QUEUE_FLUSHED);
|
|
|
|
|
|
+ /* Now send the cell */
|
|
|
channel_write_packed_cell(chan, cell);
|
|
|
+ cell = NULL;
|
|
|
+
|
|
|
/*
|
|
|
* Don't packed_cell_free_unchecked(cell) here because the channel will
|
|
|
* do so when it gets out of the channel queue (probably already did, in
|
|
|
* which case that was an immediate double-free bug).
|
|
|
*/
|
|
|
|
|
|
+ /* Update the counter */
|
|
|
++n_flushed;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Now update the cmux; tell it we've just sent a cell, and how many
|
|
|
+ * we have left.
|
|
|
+ */
|
|
|
+ circuitmux_notify_xmit_cells(cmux, circ, 1);
|
|
|
+ circuitmux_set_num_cells(cmux, circ, queue->n);
|
|
|
+ if (queue->n == 0)
|
|
|
+ log_debug(LD_GENERAL, "Made a circuit inactive.");
|
|
|
+
|
|
|
+ /* Is the cell queue low enough to unblock all the streams that are waiting
|
|
|
+ * to write to this circuit? */
|
|
|
+ if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
|
|
|
+ set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
|
|
|
+
|
|
|
+#if 0
|
|
|
if (cell_ewma) {
|
|
|
cell_ewma_t *tmp;
|
|
|
cell_ewma->cell_count += ewma_increment;
|
|
@@ -2534,22 +2512,13 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
|
|
|
assert_active_circuits_ok_paranoid(chan);
|
|
|
goto done;
|
|
|
}
|
|
|
- }
|
|
|
- tor_assert(*next_circ_on_chan_p(circ, chan));
|
|
|
- assert_active_circuits_ok_paranoid(chan);
|
|
|
- chan->active_circuits = *next_circ_on_chan_p(circ, chan);
|
|
|
-
|
|
|
- /* Is the cell queue low enough to unblock all the streams that are waiting
|
|
|
- * to write to this circuit? */
|
|
|
- if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
|
|
|
- set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
|
|
|
+#endif
|
|
|
|
|
|
- /* Did we just run out of cells on this circuit's queue? */
|
|
|
- if (queue->n == 0) {
|
|
|
- log_debug(LD_GENERAL, "Made a circuit inactive.");
|
|
|
- make_circuit_inactive_on_chan(circ, chan);
|
|
|
+ /* If n_flushed < max still, loop around and pick another circuit */
|
|
|
}
|
|
|
- done:
|
|
|
+
|
|
|
+ /* Okay, we're done sending now */
|
|
|
+ assert_cmux_ok_paranoid(chan);
|
|
|
|
|
|
return n_flushed;
|
|
|
}
|
|
@@ -2587,11 +2556,11 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
|
|
|
set_streams_blocked_on_circ(circ, chan, 1, fromstream);
|
|
|
}
|
|
|
|
|
|
+ update_circuit_on_cmux(circ, direction);
|
|
|
if (queue->n == 1) {
|
|
|
- /* This was the first cell added to the queue. We need to make this
|
|
|
+ /* This was the first cell added to the queue. We just made this
|
|
|
* circuit active. */
|
|
|
log_debug(LD_GENERAL, "Made a circuit active.");
|
|
|
- make_circuit_active_on_chan(circ, chan);
|
|
|
}
|
|
|
|
|
|
if (!channel_has_queued_writes(chan)) {
|
|
@@ -2669,20 +2638,37 @@ void
|
|
|
circuit_clear_cell_queue(circuit_t *circ, channel_t *chan)
|
|
|
{
|
|
|
cell_queue_t *queue;
|
|
|
+ cell_direction_t direction;
|
|
|
+
|
|
|
if (circ->n_chan == chan) {
|
|
|
queue = &circ->n_chan_cells;
|
|
|
+ direction = CELL_DIRECTION_OUT;
|
|
|
} else {
|
|
|
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
|
|
|
tor_assert(orcirc->p_chan == chan);
|
|
|
queue = &orcirc->p_chan_cells;
|
|
|
+ direction = CELL_DIRECTION_IN;
|
|
|
}
|
|
|
|
|
|
- if (queue->n)
|
|
|
- make_circuit_inactive_on_chan(circ, chan);
|
|
|
-
|
|
|
+ /* Clear the queue */
|
|
|
cell_queue_clear(queue);
|
|
|
+
|
|
|
+ /* Update the cell counter in the cmux */
|
|
|
+ update_circuit_on_cmux(circ, direction);
|
|
|
+}
|
|
|
+
|
|
|
+/** Fail with an assert if the circuit mux on chan is corrupt
|
|
|
+ */
|
|
|
+void
|
|
|
+assert_circuit_mux_okay(channel_t *chan)
|
|
|
+{
|
|
|
+ tor_assert(chan);
|
|
|
+ tor_assert(chan->cmux);
|
|
|
+
|
|
|
+ circuitmux_assert_okay(chan->cmux);
|
|
|
}
|
|
|
|
|
|
+#if 0
|
|
|
/** Fail with an assert if the active circuits ring on <b>orconn</b> is
|
|
|
* corrupt. */
|
|
|
void
|
|
@@ -2721,6 +2707,7 @@ assert_active_circuits_ok(channel_t *chan)
|
|
|
|
|
|
tor_assert(n == smartlist_len(chan->active_circuit_pqueue));
|
|
|
}
|
|
|
+#endif
|
|
|
|
|
|
/** Return 1 if we shouldn't restart reading on this circuit, even if
|
|
|
* we get a SENDME. Else return 0.
|