|
@@ -13,6 +13,9 @@
|
|
|
|
|
|
#define TOR_CHANNEL_INTERNAL_
|
|
|
|
|
|
+/* This one's for stuff only channel.c and the test suite should see */
|
|
|
+#define CHANNEL_PRIVATE_
|
|
|
+
|
|
|
#include "or.h"
|
|
|
#include "channel.h"
|
|
|
#include "channeltls.h"
|
|
@@ -29,29 +32,7 @@
|
|
|
#include "rephist.h"
|
|
|
#include "router.h"
|
|
|
#include "routerlist.h"
|
|
|
-
|
|
|
-/* Cell queue structure */
|
|
|
-
|
|
|
-typedef struct cell_queue_entry_s cell_queue_entry_t;
|
|
|
-struct cell_queue_entry_s {
|
|
|
- TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next;
|
|
|
- enum {
|
|
|
- CELL_QUEUE_FIXED,
|
|
|
- CELL_QUEUE_VAR,
|
|
|
- CELL_QUEUE_PACKED
|
|
|
- } type;
|
|
|
- union {
|
|
|
- struct {
|
|
|
- cell_t *cell;
|
|
|
- } fixed;
|
|
|
- struct {
|
|
|
- var_cell_t *var_cell;
|
|
|
- } var;
|
|
|
- struct {
|
|
|
- packed_cell_t *packed_cell;
|
|
|
- } packed;
|
|
|
- } u;
|
|
|
-};
|
|
|
+#include "scheduler.h"
|
|
|
|
|
|
/* Global lists of channels */
|
|
|
|
|
@@ -76,6 +57,60 @@ static smartlist_t *finished_listeners = NULL;
|
|
|
/* Counter for ID numbers */
|
|
|
static uint64_t n_channels_allocated = 0;
|
|
|
|
|
|
+/*
|
|
|
+ * Channel global byte/cell counters, for statistics and for scheduler high
|
|
|
+ * /low-water marks.
|
|
|
+ */
|
|
|
+
|
|
|
+/*
|
|
|
+ * Total number of cells ever given to any channel with the
|
|
|
+ * channel_write_*_cell() functions.
|
|
|
+ */
|
|
|
+
|
|
|
+static uint64_t n_channel_cells_queued = 0;
|
|
|
+
|
|
|
+/*
|
|
|
+ * Total number of cells ever passed to a channel lower layer with the
|
|
|
+ * write_*_cell() methods.
|
|
|
+ */
|
|
|
+
|
|
|
+static uint64_t n_channel_cells_passed_to_lower_layer = 0;
|
|
|
+
|
|
|
+/*
|
|
|
+ * Current number of cells in all channel queues; should be
|
|
|
+ * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer.
|
|
|
+ */
|
|
|
+
|
|
|
+static uint64_t n_channel_cells_in_queues = 0;
|
|
|
+
|
|
|
+/*
|
|
|
+ * Total number of bytes for all cells ever queued to a channel and
|
|
|
+ * counted in n_channel_cells_queued.
|
|
|
+ */
|
|
|
+
|
|
|
+static uint64_t n_channel_bytes_queued = 0;
|
|
|
+
|
|
|
+/*
|
|
|
+ * Total number of bytes for all cells ever passed to a channel lower layer
|
|
|
+ * and counted in n_channel_cells_passed_to_lower_layer.
|
|
|
+ */
|
|
|
+
|
|
|
+static uint64_t n_channel_bytes_passed_to_lower_layer = 0;
|
|
|
+
|
|
|
+/*
|
|
|
+ * Current number of bytes in all channel queues; should be
|
|
|
+ * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer.
|
|
|
+ */
|
|
|
+
|
|
|
+static uint64_t n_channel_bytes_in_queues = 0;
|
|
|
+
|
|
|
+/*
|
|
|
+ * Current total estimated queue size *including lower layer queues and
|
|
|
+ * transmit overhead*
|
|
|
+ */
|
|
|
+
|
|
|
+STATIC uint64_t estimated_total_queue_size = 0;
|
|
|
+
|
|
|
/* Digest->channel map
|
|
|
*
|
|
|
* Similar to the one used in connection_or.c, this maps from the identity
|
|
@@ -123,6 +158,8 @@ cell_queue_entry_new_var(var_cell_t *var_cell);
|
|
|
static int is_destroy_cell(channel_t *chan,
|
|
|
const cell_queue_entry_t *q, circid_t *circid_out);
|
|
|
|
|
|
+static void channel_assert_counter_consistency(void);
|
|
|
+
|
|
|
/* Functions to maintain the digest map */
|
|
|
static void channel_add_to_digest_map(channel_t *chan);
|
|
|
static void channel_remove_from_digest_map(channel_t *chan);
|
|
@@ -140,6 +177,8 @@ channel_free_list(smartlist_t *channels, int mark_for_close);
|
|
|
static void
|
|
|
channel_listener_free_list(smartlist_t *channels, int mark_for_close);
|
|
|
static void channel_listener_force_free(channel_listener_t *chan_l);
|
|
|
+static size_t channel_get_cell_queue_entry_size(channel_t *chan,
|
|
|
+ cell_queue_entry_t *q);
|
|
|
static void
|
|
|
channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q);
|
|
|
|
|
@@ -746,6 +785,9 @@ channel_init(channel_t *chan)
|
|
|
|
|
|
/* It hasn't been open yet. */
|
|
|
chan->has_been_open = 0;
|
|
|
+
|
|
|
+ /* Scheduler state is idle */
|
|
|
+ chan->scheduler_state = SCHED_CHAN_IDLE;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -788,6 +830,9 @@ channel_free(channel_t *chan)
|
|
|
"Freeing channel " U64_FORMAT " at %p",
|
|
|
U64_PRINTF_ARG(chan->global_identifier), chan);
|
|
|
|
|
|
+ /* Get this one out of the scheduler */
|
|
|
+ scheduler_release_channel(chan);
|
|
|
+
|
|
|
/*
|
|
|
* Get rid of cmux policy before we do anything, so cmux policies don't
|
|
|
* see channels in weird half-freed states.
|
|
@@ -863,6 +908,9 @@ channel_force_free(channel_t *chan)
|
|
|
"Force-freeing channel " U64_FORMAT " at %p",
|
|
|
U64_PRINTF_ARG(chan->global_identifier), chan);
|
|
|
|
|
|
+ /* Get this one out of the scheduler */
|
|
|
+ scheduler_release_channel(chan);
|
|
|
+
|
|
|
/*
|
|
|
* Get rid of cmux policy before we do anything, so cmux policies don't
|
|
|
* see channels in weird half-freed states.
|
|
@@ -1665,6 +1713,36 @@ cell_queue_entry_new_var(var_cell_t *var_cell)
|
|
|
return q;
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Ask how big the cell contained in a cell_queue_entry_t is
|
|
|
+ */
|
|
|
+
|
|
|
+static size_t
|
|
|
+channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q)
|
|
|
+{
|
|
|
+ size_t rv = 0;
|
|
|
+
|
|
|
+ tor_assert(chan);
|
|
|
+ tor_assert(q);
|
|
|
+
|
|
|
+ switch (q->type) {
|
|
|
+ case CELL_QUEUE_FIXED:
|
|
|
+ rv = get_cell_network_size(chan->wide_circ_ids);
|
|
|
+ break;
|
|
|
+ case CELL_QUEUE_VAR:
|
|
|
+ rv = get_var_cell_header_size(chan->wide_circ_ids) +
|
|
|
+ (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0);
|
|
|
+ break;
|
|
|
+ case CELL_QUEUE_PACKED:
|
|
|
+ rv = get_cell_network_size(chan->wide_circ_ids);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ tor_assert(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ return rv;
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* Write to a channel based on a cell_queue_entry_t
|
|
|
*
|
|
@@ -1677,6 +1755,7 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
{
|
|
|
int result = 0, sent = 0;
|
|
|
cell_queue_entry_t *tmp = NULL;
|
|
|
+ size_t cell_bytes;
|
|
|
|
|
|
tor_assert(chan);
|
|
|
tor_assert(q);
|
|
@@ -1693,6 +1772,9 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /* For statistical purposes, figure out how big this cell is */
|
|
|
+ cell_bytes = channel_get_cell_queue_entry_size(chan, q);
|
|
|
+
|
|
|
/* Can we send it right out? If so, try */
|
|
|
if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
|
|
|
chan->state == CHANNEL_STATE_OPEN) {
|
|
@@ -1726,6 +1808,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
channel_timestamp_drained(chan);
|
|
|
/* Update the counter */
|
|
|
++(chan->n_cells_xmitted);
|
|
|
+ chan->n_bytes_xmitted += cell_bytes;
|
|
|
+ /* Update global counters */
|
|
|
+ ++n_channel_cells_queued;
|
|
|
+ ++n_channel_cells_passed_to_lower_layer;
|
|
|
+ n_channel_bytes_queued += cell_bytes;
|
|
|
+ n_channel_bytes_passed_to_lower_layer += cell_bytes;
|
|
|
+ channel_assert_counter_consistency();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1737,6 +1826,14 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
*/
|
|
|
tmp = cell_queue_entry_dup(q);
|
|
|
TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next);
|
|
|
+ /* Update global counters */
|
|
|
+ ++n_channel_cells_queued;
|
|
|
+ ++n_channel_cells_in_queues;
|
|
|
+ n_channel_bytes_queued += cell_bytes;
|
|
|
+ n_channel_bytes_in_queues += cell_bytes;
|
|
|
+ channel_assert_counter_consistency();
|
|
|
+ /* Update channel queue size */
|
|
|
+ chan->bytes_in_queue += cell_bytes;
|
|
|
/* Try to process the queue? */
|
|
|
if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan);
|
|
|
}
|
|
@@ -1775,6 +1872,9 @@ channel_write_cell(channel_t *chan, cell_t *cell)
|
|
|
q.type = CELL_QUEUE_FIXED;
|
|
|
q.u.fixed.cell = cell;
|
|
|
channel_write_cell_queue_entry(chan, &q);
|
|
|
+
|
|
|
+ /* Update the queue size estimate */
|
|
|
+ channel_update_xmit_queue_size(chan);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1810,6 +1910,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell)
|
|
|
q.type = CELL_QUEUE_PACKED;
|
|
|
q.u.packed.packed_cell = packed_cell;
|
|
|
channel_write_cell_queue_entry(chan, &q);
|
|
|
+
|
|
|
+ /* Update the queue size estimate */
|
|
|
+ channel_update_xmit_queue_size(chan);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1846,6 +1949,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell)
|
|
|
q.type = CELL_QUEUE_VAR;
|
|
|
q.u.var.var_cell = var_cell;
|
|
|
channel_write_cell_queue_entry(chan, &q);
|
|
|
+
|
|
|
+ /* Update the queue size estimate */
|
|
|
+ channel_update_xmit_queue_size(chan);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1941,6 +2047,41 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * If we're going to a closed/closing state, we don't need scheduling any
|
|
|
+ * more; in CHANNEL_STATE_MAINT we can't accept writes.
|
|
|
+ */
|
|
|
+ if (to_state == CHANNEL_STATE_CLOSING ||
|
|
|
+ to_state == CHANNEL_STATE_CLOSED ||
|
|
|
+ to_state == CHANNEL_STATE_ERROR) {
|
|
|
+ scheduler_release_channel(chan);
|
|
|
+ } else if (to_state == CHANNEL_STATE_MAINT) {
|
|
|
+ scheduler_channel_doesnt_want_writes(chan);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If we're closing, this channel no longer counts toward the global
|
|
|
+ * estimated queue size; if we're open, it now does.
|
|
|
+ */
|
|
|
+ if ((to_state == CHANNEL_STATE_CLOSING ||
|
|
|
+ to_state == CHANNEL_STATE_CLOSED ||
|
|
|
+ to_state == CHANNEL_STATE_ERROR) &&
|
|
|
+ (from_state == CHANNEL_STATE_OPEN ||
|
|
|
+ from_state == CHANNEL_STATE_MAINT)) {
|
|
|
+ estimated_total_queue_size -= chan->bytes_in_queue;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If we're opening, this channel now does count toward the global
|
|
|
+ * estimated queue size.
|
|
|
+ */
|
|
|
+ if ((to_state == CHANNEL_STATE_OPEN ||
|
|
|
+ to_state == CHANNEL_STATE_MAINT) &&
|
|
|
+ !(from_state == CHANNEL_STATE_OPEN ||
|
|
|
+ from_state == CHANNEL_STATE_MAINT)) {
|
|
|
+ estimated_total_queue_size += chan->bytes_in_queue;
|
|
|
+ }
|
|
|
+
|
|
|
/* Tell circuits if we opened and stuff */
|
|
|
if (to_state == CHANNEL_STATE_OPEN) {
|
|
|
channel_do_open_actions(chan);
|
|
@@ -2056,12 +2197,13 @@ channel_listener_change_state(channel_listener_t *chan_l,
|
|
|
|
|
|
#define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256
|
|
|
|
|
|
-ssize_t
|
|
|
-channel_flush_some_cells(channel_t *chan, ssize_t num_cells)
|
|
|
+MOCK_IMPL(ssize_t,
|
|
|
+channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
|
|
|
{
|
|
|
unsigned int unlimited = 0;
|
|
|
ssize_t flushed = 0;
|
|
|
int num_cells_from_circs, clamped_num_cells;
|
|
|
+ int q_len_before, q_len_after;
|
|
|
|
|
|
tor_assert(chan);
|
|
|
|
|
@@ -2087,14 +2229,45 @@ channel_flush_some_cells(channel_t *chan, ssize_t num_cells)
|
|
|
clamped_num_cells = (int)(num_cells - flushed);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Keep track of the change in queue size; we have to count cells
|
|
|
+ * channel_flush_from_first_active_circuit() writes out directly,
|
|
|
+ * but not double-count ones we might get later in
|
|
|
+ * channel_flush_some_cells_from_outgoing_queue()
|
|
|
+ */
|
|
|
+ q_len_before = chan_cell_queue_len(&(chan->outgoing_queue));
|
|
|
+
|
|
|
/* Try to get more cells from any active circuits */
|
|
|
num_cells_from_circs = channel_flush_from_first_active_circuit(
|
|
|
chan, clamped_num_cells);
|
|
|
|
|
|
- /* If it claims we got some, process the queue again */
|
|
|
+ q_len_after = chan_cell_queue_len(&(chan->outgoing_queue));
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If it claims we got some, adjust the flushed counter and consider
|
|
|
+ * processing the queue again
|
|
|
+ */
|
|
|
if (num_cells_from_circs > 0) {
|
|
|
- flushed += channel_flush_some_cells_from_outgoing_queue(chan,
|
|
|
- (unlimited ? -1 : num_cells - flushed));
|
|
|
+ /*
|
|
|
+ * Adjust flushed by the number of cells counted in
|
|
|
+ * num_cells_from_circs that didn't go to the cell queue.
|
|
|
+ */
|
|
|
+
|
|
|
+ if (q_len_after > q_len_before) {
|
|
|
+ num_cells_from_circs -= (q_len_after - q_len_before);
|
|
|
+ if (num_cells_from_circs < 0) num_cells_from_circs = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ flushed += num_cells_from_circs;
|
|
|
+
|
|
|
+ /* Now process the queue if necessary */
|
|
|
+
|
|
|
+ if ((q_len_after > q_len_before) &&
|
|
|
+ (unlimited || (flushed < num_cells))) {
|
|
|
+ flushed += channel_flush_some_cells_from_outgoing_queue(chan,
|
|
|
+ (unlimited ? -1 : num_cells - flushed));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2117,6 +2290,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
unsigned int unlimited = 0;
|
|
|
ssize_t flushed = 0;
|
|
|
cell_queue_entry_t *q = NULL;
|
|
|
+ size_t cell_size;
|
|
|
+ int free_q = 0, handed_off = 0;
|
|
|
|
|
|
tor_assert(chan);
|
|
|
tor_assert(chan->write_cell);
|
|
@@ -2130,8 +2305,12 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
if (chan->state == CHANNEL_STATE_OPEN) {
|
|
|
while ((unlimited || num_cells > flushed) &&
|
|
|
NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) {
|
|
|
+ free_q = 0;
|
|
|
+ handed_off = 0;
|
|
|
|
|
|
if (1) {
|
|
|
+ /* Figure out how big it is for statistical purposes */
|
|
|
+ cell_size = channel_get_cell_queue_entry_size(chan, q);
|
|
|
/*
|
|
|
* Okay, we have a good queue entry, try to give it to the lower
|
|
|
* layer.
|
|
@@ -2144,8 +2323,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
++flushed;
|
|
|
channel_timestamp_xmit(chan);
|
|
|
++(chan->n_cells_xmitted);
|
|
|
- cell_queue_entry_free(q, 1);
|
|
|
- q = NULL;
|
|
|
+ chan->n_bytes_xmitted += cell_size;
|
|
|
+ free_q = 1;
|
|
|
+ handed_off = 1;
|
|
|
}
|
|
|
/* Else couldn't write it; leave it on the queue */
|
|
|
} else {
|
|
@@ -2156,8 +2336,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
"(global ID " U64_FORMAT ").",
|
|
|
chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
/* Throw it away */
|
|
|
- cell_queue_entry_free(q, 0);
|
|
|
- q = NULL;
|
|
|
+ free_q = 1;
|
|
|
+ handed_off = 0;
|
|
|
}
|
|
|
break;
|
|
|
case CELL_QUEUE_PACKED:
|
|
@@ -2167,8 +2347,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
++flushed;
|
|
|
channel_timestamp_xmit(chan);
|
|
|
++(chan->n_cells_xmitted);
|
|
|
- cell_queue_entry_free(q, 1);
|
|
|
- q = NULL;
|
|
|
+ chan->n_bytes_xmitted += cell_size;
|
|
|
+ free_q = 1;
|
|
|
+ handed_off = 1;
|
|
|
}
|
|
|
/* Else couldn't write it; leave it on the queue */
|
|
|
} else {
|
|
@@ -2179,8 +2360,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
"(global ID " U64_FORMAT ").",
|
|
|
chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
/* Throw it away */
|
|
|
- cell_queue_entry_free(q, 0);
|
|
|
- q = NULL;
|
|
|
+ free_q = 1;
|
|
|
+ handed_off = 0;
|
|
|
}
|
|
|
break;
|
|
|
case CELL_QUEUE_VAR:
|
|
@@ -2190,8 +2371,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
++flushed;
|
|
|
channel_timestamp_xmit(chan);
|
|
|
++(chan->n_cells_xmitted);
|
|
|
- cell_queue_entry_free(q, 1);
|
|
|
- q = NULL;
|
|
|
+ chan->n_bytes_xmitted += cell_size;
|
|
|
+ free_q = 1;
|
|
|
+ handed_off = 1;
|
|
|
}
|
|
|
/* Else couldn't write it; leave it on the queue */
|
|
|
} else {
|
|
@@ -2202,8 +2384,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
"(global ID " U64_FORMAT ").",
|
|
|
chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
/* Throw it away */
|
|
|
- cell_queue_entry_free(q, 0);
|
|
|
- q = NULL;
|
|
|
+ free_q = 1;
|
|
|
+ handed_off = 0;
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
@@ -2213,12 +2395,32 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
"(global ID " U64_FORMAT "; ignoring it."
|
|
|
" Someone should fix this.",
|
|
|
q->type, chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- cell_queue_entry_free(q, 0);
|
|
|
- q = NULL;
|
|
|
+ free_q = 1;
|
|
|
+ handed_off = 0;
|
|
|
}
|
|
|
|
|
|
- /* if q got NULLed out, we used it and should remove the queue entry */
|
|
|
- if (!q) TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
|
|
|
+ /*
|
|
|
+ * if free_q is set, we used it and should remove the queue entry;
|
|
|
+ * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't
|
|
|
+ * accessing freed memory
|
|
|
+ */
|
|
|
+ if (free_q) {
|
|
|
+ TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
|
|
|
+ /*
|
|
|
+ * ...and we handed a cell off to the lower layer, so we should
|
|
|
+ * update the counters.
|
|
|
+ */
|
|
|
+ ++n_channel_cells_passed_to_lower_layer;
|
|
|
+ --n_channel_cells_in_queues;
|
|
|
+ n_channel_bytes_passed_to_lower_layer += cell_size;
|
|
|
+ n_channel_bytes_in_queues -= cell_size;
|
|
|
+ channel_assert_counter_consistency();
|
|
|
+ /* Update the channel's queue size too */
|
|
|
+ chan->bytes_in_queue -= cell_size;
|
|
|
+ /* Finally, free q */
|
|
|
+ cell_queue_entry_free(q, handed_off);
|
|
|
+ q = NULL;
|
|
|
+ }
|
|
|
/* No cell removed from list, so we can't go on any further */
|
|
|
else break;
|
|
|
}
|
|
@@ -2230,6 +2432,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
channel_timestamp_drained(chan);
|
|
|
}
|
|
|
|
|
|
+ /* Update the estimate queue size */
|
|
|
+ channel_update_xmit_queue_size(chan);
|
|
|
+
|
|
|
return flushed;
|
|
|
}
|
|
|
|
|
@@ -2541,8 +2746,9 @@ channel_queue_cell(channel_t *chan, cell_t *cell)
|
|
|
/* Timestamp for receiving */
|
|
|
channel_timestamp_recv(chan);
|
|
|
|
|
|
- /* Update the counter */
|
|
|
+ /* Update the counters */
|
|
|
++(chan->n_cells_recved);
|
|
|
+ chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids);
|
|
|
|
|
|
/* If we don't need to queue we can just call cell_handler */
|
|
|
if (!need_to_queue) {
|
|
@@ -2596,6 +2802,8 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
|
|
|
|
|
|
/* Update the counter */
|
|
|
++(chan->n_cells_recved);
|
|
|
+ chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) +
|
|
|
+ var_cell->payload_len;
|
|
|
|
|
|
/* If we don't need to queue we can just call cell_handler */
|
|
|
if (!need_to_queue) {
|
|
@@ -2645,6 +2853,19 @@ packed_cell_is_destroy(channel_t *chan,
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Assert that the global channel stats counters are internally consistent
|
|
|
+ */
|
|
|
+
|
|
|
+static void
|
|
|
+channel_assert_counter_consistency(void)
|
|
|
+{
|
|
|
+ tor_assert(n_channel_cells_queued ==
|
|
|
+ (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer));
|
|
|
+ tor_assert(n_channel_bytes_queued ==
|
|
|
+ (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer));
|
|
|
+}
|
|
|
+
|
|
|
/** DOCDOC */
|
|
|
static int
|
|
|
is_destroy_cell(channel_t *chan,
|
|
@@ -2726,6 +2947,19 @@ void
|
|
|
channel_dumpstats(int severity)
|
|
|
{
|
|
|
if (all_channels && smartlist_len(all_channels) > 0) {
|
|
|
+ tor_log(severity, LD_GENERAL,
|
|
|
+ "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, "
|
|
|
+ "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower"
|
|
|
+ " layer.",
|
|
|
+ U64_PRINTF_ARG(n_channel_bytes_queued),
|
|
|
+ U64_PRINTF_ARG(n_channel_cells_queued),
|
|
|
+ U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer),
|
|
|
+ U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer));
|
|
|
+ tor_log(severity, LD_GENERAL,
|
|
|
+ "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells "
|
|
|
+ "in channel queues.",
|
|
|
+ U64_PRINTF_ARG(n_channel_bytes_in_queues),
|
|
|
+ U64_PRINTF_ARG(n_channel_cells_in_queues));
|
|
|
tor_log(severity, LD_GENERAL,
|
|
|
"Dumping statistics about %d channels:",
|
|
|
smartlist_len(all_channels));
|
|
@@ -3200,7 +3434,7 @@ channel_listener_describe_transport(channel_listener_t *chan_l)
|
|
|
/**
|
|
|
* Return the number of entries in <b>queue</b>
|
|
|
*/
|
|
|
-static int
|
|
|
+STATIC int
|
|
|
chan_cell_queue_len(const chan_cell_queue_t *queue)
|
|
|
{
|
|
|
int r = 0;
|
|
@@ -3216,8 +3450,8 @@ chan_cell_queue_len(const chan_cell_queue_t *queue)
|
|
|
* Dump statistics for one channel to the log
|
|
|
*/
|
|
|
|
|
|
-void
|
|
|
-channel_dump_statistics(channel_t *chan, int severity)
|
|
|
+MOCK_IMPL(void,
|
|
|
+channel_dump_statistics, (channel_t *chan, int severity))
|
|
|
{
|
|
|
double avg, interval, age;
|
|
|
time_t now = time(NULL);
|
|
@@ -3369,12 +3603,22 @@ channel_dump_statistics(channel_t *chan, int severity)
|
|
|
/* Describe counters and rates */
|
|
|
tor_log(severity, LD_GENERAL,
|
|
|
" * Channel " U64_FORMAT " has received "
|
|
|
- U64_FORMAT " cells and transmitted " U64_FORMAT,
|
|
|
+ U64_FORMAT " bytes in " U64_FORMAT " cells and transmitted "
|
|
|
+ U64_FORMAT " bytes in " U64_FORMAT " cells",
|
|
|
U64_PRINTF_ARG(chan->global_identifier),
|
|
|
+ U64_PRINTF_ARG(chan->n_bytes_recved),
|
|
|
U64_PRINTF_ARG(chan->n_cells_recved),
|
|
|
+ U64_PRINTF_ARG(chan->n_bytes_xmitted),
|
|
|
U64_PRINTF_ARG(chan->n_cells_xmitted));
|
|
|
if (now > chan->timestamp_created &&
|
|
|
chan->timestamp_created > 0) {
|
|
|
+ if (chan->n_bytes_recved > 0) {
|
|
|
+ avg = (double)(chan->n_bytes_recved) / age;
|
|
|
+ tor_log(severity, LD_GENERAL,
|
|
|
+ " * Channel " U64_FORMAT " has averaged %f "
|
|
|
+ "bytes received per second",
|
|
|
+ U64_PRINTF_ARG(chan->global_identifier), avg);
|
|
|
+ }
|
|
|
if (chan->n_cells_recved > 0) {
|
|
|
avg = (double)(chan->n_cells_recved) / age;
|
|
|
if (avg >= 1.0) {
|
|
@@ -3390,6 +3634,13 @@ channel_dump_statistics(channel_t *chan, int severity)
|
|
|
U64_PRINTF_ARG(chan->global_identifier), interval);
|
|
|
}
|
|
|
}
|
|
|
+ if (chan->n_bytes_xmitted > 0) {
|
|
|
+ avg = (double)(chan->n_bytes_xmitted) / age;
|
|
|
+ tor_log(severity, LD_GENERAL,
|
|
|
+ " * Channel " U64_FORMAT " has averaged %f "
|
|
|
+ "bytes transmitted per second",
|
|
|
+ U64_PRINTF_ARG(chan->global_identifier), avg);
|
|
|
+ }
|
|
|
if (chan->n_cells_xmitted > 0) {
|
|
|
avg = (double)(chan->n_cells_xmitted) / age;
|
|
|
if (avg >= 1.0) {
|
|
@@ -3807,6 +4058,50 @@ channel_mark_outgoing(channel_t *chan)
|
|
|
chan->is_incoming = 0;
|
|
|
}
|
|
|
|
|
|
+/************************
|
|
|
+ * Flow control queries *
|
|
|
+ ***********************/
|
|
|
+
|
|
|
+/*
|
|
|
+ * Get the latest estimate for the total queue size of all open channels
|
|
|
+ */
|
|
|
+
|
|
|
+uint64_t
|
|
|
+channel_get_global_queue_estimate(void)
|
|
|
+{
|
|
|
+ return estimated_total_queue_size;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Estimate the number of writeable cells
|
|
|
+ *
|
|
|
+ * Ask the lower layer for an estimate of how many cells it can accept, and
|
|
|
+ * then subtract the length of our outgoing_queue, if any, to produce an
|
|
|
+ * estimate of the number of cells this channel can accept for writes.
|
|
|
+ */
|
|
|
+
|
|
|
+int
|
|
|
+channel_num_cells_writeable(channel_t *chan)
|
|
|
+{
|
|
|
+ int result;
|
|
|
+
|
|
|
+ tor_assert(chan);
|
|
|
+ tor_assert(chan->num_cells_writeable);
|
|
|
+
|
|
|
+ if (chan->state == CHANNEL_STATE_OPEN) {
|
|
|
+ /* Query lower layer */
|
|
|
+ result = chan->num_cells_writeable(chan);
|
|
|
+ /* Subtract cell queue length, if any */
|
|
|
+ result -= chan_cell_queue_len(&chan->outgoing_queue);
|
|
|
+ if (result < 0) result = 0;
|
|
|
+ } else {
|
|
|
+ /* No cells are writeable in any other state */
|
|
|
+ result = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+}
|
|
|
+
|
|
|
/*********************
|
|
|
* Timestamp updates *
|
|
|
********************/
|
|
@@ -4209,3 +4504,87 @@ channel_set_circid_type(channel_t *chan,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Update the estimated number of bytes queued to transmit for this channel,
|
|
|
+ * and notify the scheduler. The estimate includes both the channel queue and
|
|
|
+ * the queue size reported by the lower layer, and an overhead estimate
|
|
|
+ * optionally provided by the lower layer.
|
|
|
+ */
|
|
|
+
|
|
|
+void
|
|
|
+channel_update_xmit_queue_size(channel_t *chan)
|
|
|
+{
|
|
|
+ uint64_t queued, adj;
|
|
|
+ double overhead;
|
|
|
+
|
|
|
+ tor_assert(chan);
|
|
|
+ tor_assert(chan->num_bytes_queued);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * First, get the number of bytes we have queued without factoring in
|
|
|
+ * lower-layer overhead.
|
|
|
+ */
|
|
|
+ queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue;
|
|
|
+ /* Next, adjust by the overhead factor, if any is available */
|
|
|
+ if (chan->get_overhead_estimate) {
|
|
|
+ overhead = chan->get_overhead_estimate(chan);
|
|
|
+ if (overhead >= 1.0f) {
|
|
|
+ queued *= overhead;
|
|
|
+ } else {
|
|
|
+ /* Ignore silly overhead factors */
|
|
|
+ log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Now, compare to the previous estimate */
|
|
|
+ if (queued > chan->bytes_queued_for_xmit) {
|
|
|
+ adj = queued - chan->bytes_queued_for_xmit;
|
|
|
+ log_debug(LD_CHANNEL,
|
|
|
+ "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT
|
|
|
+ " from " U64_FORMAT " to " U64_FORMAT,
|
|
|
+ U64_PRINTF_ARG(chan->global_identifier),
|
|
|
+ U64_PRINTF_ARG(adj),
|
|
|
+ U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
|
|
|
+ U64_PRINTF_ARG(queued));
|
|
|
+ /* Update the channel's estimate */
|
|
|
+ chan->bytes_queued_for_xmit = queued;
|
|
|
+
|
|
|
+ /* Update the global queue size estimate if appropriate */
|
|
|
+ if (chan->state == CHANNEL_STATE_OPEN ||
|
|
|
+ chan->state == CHANNEL_STATE_MAINT) {
|
|
|
+ estimated_total_queue_size += adj;
|
|
|
+ log_debug(LD_CHANNEL,
|
|
|
+ "Increasing global queue size by " U64_FORMAT " for channel "
|
|
|
+ U64_FORMAT ", new size is " U64_FORMAT,
|
|
|
+ U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
|
|
|
+ U64_PRINTF_ARG(estimated_total_queue_size));
|
|
|
+ /* Tell the scheduler we're increasing the queue size */
|
|
|
+ scheduler_adjust_queue_size(chan, 1, adj);
|
|
|
+ }
|
|
|
+ } else if (queued < chan->bytes_queued_for_xmit) {
|
|
|
+ adj = chan->bytes_queued_for_xmit - queued;
|
|
|
+ log_debug(LD_CHANNEL,
|
|
|
+ "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT
|
|
|
+ " from " U64_FORMAT " to " U64_FORMAT,
|
|
|
+ U64_PRINTF_ARG(chan->global_identifier),
|
|
|
+ U64_PRINTF_ARG(adj),
|
|
|
+ U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
|
|
|
+ U64_PRINTF_ARG(queued));
|
|
|
+ /* Update the channel's estimate */
|
|
|
+ chan->bytes_queued_for_xmit = queued;
|
|
|
+
|
|
|
+ /* Update the global queue size estimate if appropriate */
|
|
|
+ if (chan->state == CHANNEL_STATE_OPEN ||
|
|
|
+ chan->state == CHANNEL_STATE_MAINT) {
|
|
|
+ estimated_total_queue_size -= adj;
|
|
|
+ log_debug(LD_CHANNEL,
|
|
|
+ "Decreasing global queue size by " U64_FORMAT " for channel "
|
|
|
+ U64_FORMAT ", new size is " U64_FORMAT,
|
|
|
+ U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
|
|
|
+ U64_PRINTF_ARG(estimated_total_queue_size));
|
|
|
+ /* Tell the scheduler we're decreasing the queue size */
|
|
|
+ scheduler_adjust_queue_size(chan, -1, adj);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|