|
@@ -201,14 +201,6 @@ HT_PROTOTYPE(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash,
|
|
|
HT_GENERATE2(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash,
|
|
|
channel_idmap_eq, 0.5, tor_reallocarray_, tor_free_)
|
|
|
|
|
|
-static cell_queue_entry_t * cell_queue_entry_dup(cell_queue_entry_t *q);
|
|
|
-#if 0
|
|
|
-static int cell_queue_entry_is_padding(cell_queue_entry_t *q);
|
|
|
-#endif
|
|
|
-static cell_queue_entry_t *
|
|
|
-cell_queue_entry_new_fixed(cell_t *cell);
|
|
|
-static cell_queue_entry_t *
|
|
|
-cell_queue_entry_new_var(var_cell_t *var_cell);
|
|
|
static int is_destroy_cell(channel_t *chan,
|
|
|
const cell_queue_entry_t *q, circid_t *circid_out);
|
|
|
|
|
@@ -218,13 +210,6 @@ static void channel_assert_counter_consistency(void);
|
|
|
static void channel_add_to_digest_map(channel_t *chan);
|
|
|
static void channel_remove_from_digest_map(channel_t *chan);
|
|
|
|
|
|
-/*
|
|
|
- * Flush cells from just the outgoing queue without trying to get them
|
|
|
- * from circuits; used internall by channel_flush_some_cells().
|
|
|
- */
|
|
|
-static ssize_t
|
|
|
-channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
- ssize_t num_cells);
|
|
|
static void channel_force_free(channel_t *chan);
|
|
|
static void
|
|
|
channel_free_list(smartlist_t *channels, int mark_for_close);
|
|
@@ -936,10 +921,6 @@ channel_init(channel_t *chan)
|
|
|
/* Warn about exhausted circuit IDs no more than hourly. */
|
|
|
chan->last_warned_circ_ids_exhausted.rate = 3600;
|
|
|
|
|
|
- /* Initialize queues. */
|
|
|
- TOR_SIMPLEQ_INIT(&chan->incoming_queue);
|
|
|
- TOR_SIMPLEQ_INIT(&chan->outgoing_queue);
|
|
|
-
|
|
|
/* Initialize list entries. */
|
|
|
memset(&chan->next_with_same_id, 0, sizeof(chan->next_with_same_id));
|
|
|
|
|
@@ -1069,7 +1050,6 @@ channel_listener_free(channel_listener_t *chan_l)
|
|
|
static void
|
|
|
channel_force_free(channel_t *chan)
|
|
|
{
|
|
|
- cell_queue_entry_t *cell, *cell_tmp;
|
|
|
tor_assert(chan);
|
|
|
|
|
|
log_debug(LD_CHANNEL,
|
|
@@ -1103,18 +1083,6 @@ channel_force_free(channel_t *chan)
|
|
|
chan->cmux = NULL;
|
|
|
}
|
|
|
|
|
|
- /* We might still have a cell queue; kill it */
|
|
|
- TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) {
|
|
|
- cell_queue_entry_free(cell, 0);
|
|
|
- }
|
|
|
- TOR_SIMPLEQ_INIT(&chan->incoming_queue);
|
|
|
-
|
|
|
- /* Outgoing cell queue is similar, but we can have to free packed cells */
|
|
|
- TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) {
|
|
|
- cell_queue_entry_free(cell, 0);
|
|
|
- }
|
|
|
- TOR_SIMPLEQ_INIT(&chan->outgoing_queue);
|
|
|
-
|
|
|
tor_free(chan);
|
|
|
}
|
|
|
|
|
@@ -1247,8 +1215,6 @@ channel_set_cell_handlers(channel_t *chan,
|
|
|
channel_var_cell_handler_fn_ptr
|
|
|
var_cell_handler)
|
|
|
{
|
|
|
- int try_again = 0;
|
|
|
-
|
|
|
tor_assert(chan);
|
|
|
tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan));
|
|
|
|
|
@@ -1259,21 +1225,9 @@ channel_set_cell_handlers(channel_t *chan,
|
|
|
"Setting var_cell_handler callback for channel %p to %p",
|
|
|
chan, var_cell_handler);
|
|
|
|
|
|
- /* Should we try the queue? */
|
|
|
- if (cell_handler &&
|
|
|
- cell_handler != chan->cell_handler) try_again = 1;
|
|
|
- if (var_cell_handler &&
|
|
|
- var_cell_handler != chan->var_cell_handler) try_again = 1;
|
|
|
-
|
|
|
/* Change them */
|
|
|
chan->cell_handler = cell_handler;
|
|
|
chan->var_cell_handler = var_cell_handler;
|
|
|
-
|
|
|
- /* Re-run the queue if we have one and there's any reason to */
|
|
|
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue) &&
|
|
|
- try_again &&
|
|
|
- (chan->cell_handler ||
|
|
|
- chan->var_cell_handler)) channel_process_cells(chan);
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -1729,147 +1683,6 @@ channel_set_remote_end(channel_t *chan,
|
|
|
channel_add_to_digest_map(chan);
|
|
|
}
|
|
|
|
|
|
-/**
|
|
|
- * Duplicate a cell queue entry; this is a shallow copy intended for use
|
|
|
- * in channel_write_cell_queue_entry().
|
|
|
- */
|
|
|
-
|
|
|
-static cell_queue_entry_t *
|
|
|
-cell_queue_entry_dup(cell_queue_entry_t *q)
|
|
|
-{
|
|
|
- cell_queue_entry_t *rv = NULL;
|
|
|
-
|
|
|
- tor_assert(q);
|
|
|
-
|
|
|
- rv = tor_malloc(sizeof(*rv));
|
|
|
- memcpy(rv, q, sizeof(*rv));
|
|
|
-
|
|
|
- return rv;
|
|
|
-}
|
|
|
-
|
|
|
-/**
|
|
|
- * Free a cell_queue_entry_t; the handed_off parameter indicates whether
|
|
|
- * the contents were passed to the lower layer (it is responsible for
|
|
|
- * them) or not (we should free).
|
|
|
- */
|
|
|
-
|
|
|
-STATIC void
|
|
|
-cell_queue_entry_free(cell_queue_entry_t *q, int handed_off)
|
|
|
-{
|
|
|
- if (!q) return;
|
|
|
-
|
|
|
- if (!handed_off) {
|
|
|
- /*
|
|
|
- * If we handed it off, the recipient becomes responsible (or
|
|
|
- * with packed cells the channel_t subclass calls packed_cell
|
|
|
- * free after writing out its contents; see, e.g.,
|
|
|
- * channel_tls_write_packed_cell_method(). Otherwise, we have
|
|
|
- * to take care of it here if possible.
|
|
|
- */
|
|
|
- switch (q->type) {
|
|
|
- case CELL_QUEUE_FIXED:
|
|
|
- if (q->u.fixed.cell) {
|
|
|
- /*
|
|
|
- * There doesn't seem to be a cell_free() function anywhere in the
|
|
|
- * pre-channel code; just use tor_free()
|
|
|
- */
|
|
|
- tor_free(q->u.fixed.cell);
|
|
|
- }
|
|
|
- break;
|
|
|
- case CELL_QUEUE_PACKED:
|
|
|
- if (q->u.packed.packed_cell) {
|
|
|
- packed_cell_free(q->u.packed.packed_cell);
|
|
|
- }
|
|
|
- break;
|
|
|
- case CELL_QUEUE_VAR:
|
|
|
- if (q->u.var.var_cell) {
|
|
|
- /*
|
|
|
- * This one's in connection_or.c; it'd be nice to figure out the
|
|
|
- * whole flow of cells from one end to the other and factor the
|
|
|
- * cell memory management functions like this out of the specific
|
|
|
- * TLS lower layer.
|
|
|
- */
|
|
|
- var_cell_free(q->u.var.var_cell);
|
|
|
- }
|
|
|
- break;
|
|
|
- default:
|
|
|
- /*
|
|
|
- * Nothing we can do if we don't know the type; this will
|
|
|
- * have been warned about elsewhere.
|
|
|
- */
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- tor_free(q);
|
|
|
-}
|
|
|
-
|
|
|
-#if 0
|
|
|
-/**
|
|
|
- * Check whether a cell queue entry is padding; this is a helper function
|
|
|
- * for channel_write_cell_queue_entry()
|
|
|
- */
|
|
|
-
|
|
|
-static int
|
|
|
-cell_queue_entry_is_padding(cell_queue_entry_t *q)
|
|
|
-{
|
|
|
- tor_assert(q);
|
|
|
-
|
|
|
- if (q->type == CELL_QUEUE_FIXED) {
|
|
|
- if (q->u.fixed.cell) {
|
|
|
- if (q->u.fixed.cell->command == CELL_PADDING ||
|
|
|
- q->u.fixed.cell->command == CELL_VPADDING) {
|
|
|
- return 1;
|
|
|
- }
|
|
|
- }
|
|
|
- } else if (q->type == CELL_QUEUE_VAR) {
|
|
|
- if (q->u.var.var_cell) {
|
|
|
- if (q->u.var.var_cell->command == CELL_PADDING ||
|
|
|
- q->u.var.var_cell->command == CELL_VPADDING) {
|
|
|
- return 1;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return 0;
|
|
|
-}
|
|
|
-#endif /* 0 */
|
|
|
-
|
|
|
-/**
|
|
|
- * Allocate a new cell queue entry for a fixed-size cell
|
|
|
- */
|
|
|
-
|
|
|
-static cell_queue_entry_t *
|
|
|
-cell_queue_entry_new_fixed(cell_t *cell)
|
|
|
-{
|
|
|
- cell_queue_entry_t *q = NULL;
|
|
|
-
|
|
|
- tor_assert(cell);
|
|
|
-
|
|
|
- q = tor_malloc(sizeof(*q));
|
|
|
- q->type = CELL_QUEUE_FIXED;
|
|
|
- q->u.fixed.cell = cell;
|
|
|
-
|
|
|
- return q;
|
|
|
-}
|
|
|
-
|
|
|
-/**
|
|
|
- * Allocate a new cell queue entry for a variable-size cell
|
|
|
- */
|
|
|
-
|
|
|
-static cell_queue_entry_t *
|
|
|
-cell_queue_entry_new_var(var_cell_t *var_cell)
|
|
|
-{
|
|
|
- cell_queue_entry_t *q = NULL;
|
|
|
-
|
|
|
- tor_assert(var_cell);
|
|
|
-
|
|
|
- q = tor_malloc(sizeof(*q));
|
|
|
- q->type = CELL_QUEUE_VAR;
|
|
|
- q->u.var.var_cell = var_cell;
|
|
|
-
|
|
|
- return q;
|
|
|
-}
|
|
|
-
|
|
|
/**
|
|
|
* Ask how big the cell contained in a cell_queue_entry_t is
|
|
|
*/
|
|
@@ -1910,8 +1723,7 @@ channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q)
|
|
|
static void
|
|
|
channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
{
|
|
|
- int result = 0, sent = 0;
|
|
|
- cell_queue_entry_t *tmp = NULL;
|
|
|
+ int result = 0;
|
|
|
size_t cell_bytes;
|
|
|
|
|
|
tor_assert(chan);
|
|
@@ -1931,8 +1743,7 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
cell_bytes = channel_get_cell_queue_entry_size(chan, q);
|
|
|
|
|
|
/* Can we send it right out? If so, try */
|
|
|
- if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
|
|
|
- CHANNEL_IS_OPEN(chan)) {
|
|
|
+ if (CHANNEL_IS_OPEN(chan)) {
|
|
|
/* Pick the right write function for this cell type and save the result */
|
|
|
switch (q->type) {
|
|
|
case CELL_QUEUE_FIXED:
|
|
@@ -1956,7 +1767,6 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
|
|
|
/* Check if we got it out */
|
|
|
if (result > 0) {
|
|
|
- sent = 1;
|
|
|
/* Timestamp for transmission */
|
|
|
channel_timestamp_xmit(chan);
|
|
|
/* If we're here the queue is empty, so it's drained too */
|
|
@@ -1973,25 +1783,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (!sent) {
|
|
|
- /* Not sent, queue it */
|
|
|
- /*
|
|
|
- * We have to copy the queue entry passed in, since the caller probably
|
|
|
- * used the stack.
|
|
|
- */
|
|
|
- tmp = cell_queue_entry_dup(q);
|
|
|
- TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next);
|
|
|
- /* Update global counters */
|
|
|
- ++n_channel_cells_queued;
|
|
|
- ++n_channel_cells_in_queues;
|
|
|
- n_channel_bytes_queued += cell_bytes;
|
|
|
- n_channel_bytes_in_queues += cell_bytes;
|
|
|
- channel_assert_counter_consistency();
|
|
|
- /* Update channel queue size */
|
|
|
- chan->bytes_in_queue += cell_bytes;
|
|
|
- /* Try to process the queue? */
|
|
|
- if (CHANNEL_IS_OPEN(chan)) channel_flush_cells(chan);
|
|
|
- }
|
|
|
+ /* XXX: If the cell wasn't sent, we need to propagate the error back so we
|
|
|
+ * can put it back in the circuit queue. */
|
|
|
}
|
|
|
|
|
|
/** Write a generic cell type to a channel
|
|
@@ -2207,13 +2000,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state)
|
|
|
from_state == CHANNEL_STATE_MAINT)) {
|
|
|
estimated_total_queue_size += chan->bytes_in_queue;
|
|
|
}
|
|
|
-
|
|
|
- if (to_state == CHANNEL_STATE_CLOSED ||
|
|
|
- to_state == CHANNEL_STATE_ERROR) {
|
|
|
- /* Assert that all queues are empty */
|
|
|
- tor_assert(TOR_SIMPLEQ_EMPTY(&chan->incoming_queue));
|
|
|
- tor_assert(TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue));
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2237,12 +2023,6 @@ channel_change_state_open(channel_t *chan)
|
|
|
/* Tell circuits if we opened and stuff */
|
|
|
channel_do_open_actions(chan);
|
|
|
chan->has_been_open = 1;
|
|
|
-
|
|
|
- /* Check for queued cells to process */
|
|
|
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
|
|
|
- channel_process_cells(chan);
|
|
|
- if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue))
|
|
|
- channel_flush_cells(chan);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2347,8 +2127,7 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
|
|
|
{
|
|
|
unsigned int unlimited = 0;
|
|
|
ssize_t flushed = 0;
|
|
|
- int num_cells_from_circs, clamped_num_cells;
|
|
|
- int q_len_before, q_len_after;
|
|
|
+ int clamped_num_cells;
|
|
|
|
|
|
tor_assert(chan);
|
|
|
|
|
@@ -2357,11 +2136,6 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
|
|
|
|
|
|
/* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */
|
|
|
if (CHANNEL_IS_OPEN(chan)) {
|
|
|
- /* Try to flush as much as we can that's already queued */
|
|
|
- flushed += channel_flush_some_cells_from_outgoing_queue(chan,
|
|
|
- (unlimited ? -1 : num_cells - flushed));
|
|
|
- if (!unlimited && num_cells <= flushed) goto done;
|
|
|
-
|
|
|
if (circuitmux_num_cells(chan->cmux) > 0) {
|
|
|
/* Calculate number of cells, including clamp */
|
|
|
if (unlimited) {
|
|
@@ -2375,45 +2149,9 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Keep track of the change in queue size; we have to count cells
|
|
|
- * channel_flush_from_first_active_circuit() writes out directly,
|
|
|
- * but not double-count ones we might get later in
|
|
|
- * channel_flush_some_cells_from_outgoing_queue()
|
|
|
- */
|
|
|
- q_len_before = chan_cell_queue_len(&(chan->outgoing_queue));
|
|
|
-
|
|
|
/* Try to get more cells from any active circuits */
|
|
|
- num_cells_from_circs = channel_flush_from_first_active_circuit(
|
|
|
+ flushed = channel_flush_from_first_active_circuit(
|
|
|
chan, clamped_num_cells);
|
|
|
-
|
|
|
- q_len_after = chan_cell_queue_len(&(chan->outgoing_queue));
|
|
|
-
|
|
|
- /*
|
|
|
- * If it claims we got some, adjust the flushed counter and consider
|
|
|
- * processing the queue again
|
|
|
- */
|
|
|
- if (num_cells_from_circs > 0) {
|
|
|
- /*
|
|
|
- * Adjust flushed by the number of cells counted in
|
|
|
- * num_cells_from_circs that didn't go to the cell queue.
|
|
|
- */
|
|
|
-
|
|
|
- if (q_len_after > q_len_before) {
|
|
|
- num_cells_from_circs -= (q_len_after - q_len_before);
|
|
|
- if (num_cells_from_circs < 0) num_cells_from_circs = 0;
|
|
|
- }
|
|
|
-
|
|
|
- flushed += num_cells_from_circs;
|
|
|
-
|
|
|
- /* Now process the queue if necessary */
|
|
|
-
|
|
|
- if ((q_len_after > q_len_before) &&
|
|
|
- (unlimited || (flushed < num_cells))) {
|
|
|
- flushed += channel_flush_some_cells_from_outgoing_queue(chan,
|
|
|
- (unlimited ? -1 : num_cells - flushed));
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2421,181 +2159,6 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
|
|
|
return flushed;
|
|
|
}
|
|
|
|
|
|
-/**
|
|
|
- * Flush cells from just the channel's outgoing cell queue
|
|
|
- *
|
|
|
- * This gets called from channel_flush_some_cells() above to flush cells
|
|
|
- * just from the queue without trying for active_circuits.
|
|
|
- */
|
|
|
-
|
|
|
-static ssize_t
|
|
|
-channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
|
|
|
- ssize_t num_cells)
|
|
|
-{
|
|
|
- unsigned int unlimited = 0;
|
|
|
- ssize_t flushed = 0;
|
|
|
- cell_queue_entry_t *q = NULL;
|
|
|
- size_t cell_size;
|
|
|
- int free_q = 0, handed_off = 0;
|
|
|
-
|
|
|
- tor_assert(chan);
|
|
|
- tor_assert(chan->write_cell);
|
|
|
- tor_assert(chan->write_packed_cell);
|
|
|
- tor_assert(chan->write_var_cell);
|
|
|
-
|
|
|
- if (num_cells < 0) unlimited = 1;
|
|
|
- if (!unlimited && num_cells <= flushed) return 0;
|
|
|
-
|
|
|
- /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */
|
|
|
- if (CHANNEL_IS_OPEN(chan)) {
|
|
|
- while ((unlimited || num_cells > flushed) &&
|
|
|
- NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) {
|
|
|
- free_q = 0;
|
|
|
- handed_off = 0;
|
|
|
-
|
|
|
- /* Figure out how big it is for statistical purposes */
|
|
|
- cell_size = channel_get_cell_queue_entry_size(chan, q);
|
|
|
- /*
|
|
|
- * Okay, we have a good queue entry, try to give it to the lower
|
|
|
- * layer.
|
|
|
- */
|
|
|
- switch (q->type) {
|
|
|
- case CELL_QUEUE_FIXED:
|
|
|
- if (q->u.fixed.cell) {
|
|
|
- if (chan->write_cell(chan,
|
|
|
- q->u.fixed.cell)) {
|
|
|
- ++flushed;
|
|
|
- channel_timestamp_xmit(chan);
|
|
|
- ++(chan->n_cells_xmitted);
|
|
|
- chan->n_bytes_xmitted += cell_size;
|
|
|
- free_q = 1;
|
|
|
- handed_off = 1;
|
|
|
- }
|
|
|
- /* Else couldn't write it; leave it on the queue */
|
|
|
- } else {
|
|
|
- /* This shouldn't happen */
|
|
|
- log_info(LD_CHANNEL,
|
|
|
- "Saw broken cell queue entry of type CELL_QUEUE_FIXED "
|
|
|
- "with no cell on channel %p "
|
|
|
- "(global ID " U64_FORMAT ").",
|
|
|
- chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- /* Throw it away */
|
|
|
- free_q = 1;
|
|
|
- handed_off = 0;
|
|
|
- }
|
|
|
- break;
|
|
|
- case CELL_QUEUE_PACKED:
|
|
|
- if (q->u.packed.packed_cell) {
|
|
|
- if (chan->write_packed_cell(chan,
|
|
|
- q->u.packed.packed_cell)) {
|
|
|
- ++flushed;
|
|
|
- channel_timestamp_xmit(chan);
|
|
|
- ++(chan->n_cells_xmitted);
|
|
|
- chan->n_bytes_xmitted += cell_size;
|
|
|
- free_q = 1;
|
|
|
- handed_off = 1;
|
|
|
- }
|
|
|
- /* Else couldn't write it; leave it on the queue */
|
|
|
- } else {
|
|
|
- /* This shouldn't happen */
|
|
|
- log_info(LD_CHANNEL,
|
|
|
- "Saw broken cell queue entry of type CELL_QUEUE_PACKED "
|
|
|
- "with no cell on channel %p "
|
|
|
- "(global ID " U64_FORMAT ").",
|
|
|
- chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- /* Throw it away */
|
|
|
- free_q = 1;
|
|
|
- handed_off = 0;
|
|
|
- }
|
|
|
- break;
|
|
|
- case CELL_QUEUE_VAR:
|
|
|
- if (q->u.var.var_cell) {
|
|
|
- if (chan->write_var_cell(chan,
|
|
|
- q->u.var.var_cell)) {
|
|
|
- ++flushed;
|
|
|
- channel_timestamp_xmit(chan);
|
|
|
- ++(chan->n_cells_xmitted);
|
|
|
- chan->n_bytes_xmitted += cell_size;
|
|
|
- free_q = 1;
|
|
|
- handed_off = 1;
|
|
|
- }
|
|
|
- /* Else couldn't write it; leave it on the queue */
|
|
|
- } else {
|
|
|
- /* This shouldn't happen */
|
|
|
- log_info(LD_CHANNEL,
|
|
|
- "Saw broken cell queue entry of type CELL_QUEUE_VAR "
|
|
|
- "with no cell on channel %p "
|
|
|
- "(global ID " U64_FORMAT ").",
|
|
|
- chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- /* Throw it away */
|
|
|
- free_q = 1;
|
|
|
- handed_off = 0;
|
|
|
- }
|
|
|
- break;
|
|
|
- default:
|
|
|
- /* Unknown type, log and free it */
|
|
|
- log_info(LD_CHANNEL,
|
|
|
- "Saw an unknown cell queue entry type %d on channel %p "
|
|
|
- "(global ID " U64_FORMAT "; ignoring it."
|
|
|
- " Someone should fix this.",
|
|
|
- q->type, chan, U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- free_q = 1;
|
|
|
- handed_off = 0;
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * if free_q is set, we used it and should remove the queue entry;
|
|
|
- * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't
|
|
|
- * accessing freed memory
|
|
|
- */
|
|
|
- if (free_q) {
|
|
|
- TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
|
|
|
- /*
|
|
|
- * ...and we handed a cell off to the lower layer, so we should
|
|
|
- * update the counters.
|
|
|
- */
|
|
|
- ++n_channel_cells_passed_to_lower_layer;
|
|
|
- --n_channel_cells_in_queues;
|
|
|
- n_channel_bytes_passed_to_lower_layer += cell_size;
|
|
|
- n_channel_bytes_in_queues -= cell_size;
|
|
|
- channel_assert_counter_consistency();
|
|
|
- /* Update the channel's queue size too */
|
|
|
- chan->bytes_in_queue -= cell_size;
|
|
|
- /* Finally, free q */
|
|
|
- cell_queue_entry_free(q, handed_off);
|
|
|
- q = NULL;
|
|
|
- } else {
|
|
|
- /* No cell removed from list, so we can't go on any further */
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Did we drain the queue? */
|
|
|
- if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
|
|
|
- channel_timestamp_drained(chan);
|
|
|
- }
|
|
|
-
|
|
|
- /* Update the estimate queue size */
|
|
|
- channel_update_xmit_queue_size(chan);
|
|
|
-
|
|
|
- return flushed;
|
|
|
-}
|
|
|
-
|
|
|
-/**
|
|
|
- * Flush as many cells as we possibly can from the queue
|
|
|
- *
|
|
|
- * This tries to flush as many cells from the queue as the lower layer
|
|
|
- * will take. It just calls channel_flush_some_cells_from_outgoing_queue()
|
|
|
- * in unlimited mode.
|
|
|
- */
|
|
|
-
|
|
|
-void
|
|
|
-channel_flush_cells(channel_t *chan)
|
|
|
-{
|
|
|
- channel_flush_some_cells_from_outgoing_queue(chan, -1);
|
|
|
-}
|
|
|
-
|
|
|
/**
|
|
|
* Check if any cells are available
|
|
|
*
|
|
@@ -2608,10 +2171,6 @@ channel_more_to_flush, (channel_t *chan))
|
|
|
{
|
|
|
tor_assert(chan);
|
|
|
|
|
|
- /* Check if we have any queued */
|
|
|
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
|
|
|
- return 1;
|
|
|
-
|
|
|
/* Check if any circuits would like to queue some */
|
|
|
if (circuitmux_num_cells(chan->cmux) > 0) return 1;
|
|
|
|
|
@@ -2823,200 +2382,37 @@ channel_listener_queue_incoming(channel_listener_t *listener,
|
|
|
*/
|
|
|
|
|
|
void
|
|
|
-channel_process_cells(channel_t *chan)
|
|
|
+channel_process_cell(channel_t *chan, cell_t *cell)
|
|
|
{
|
|
|
- cell_queue_entry_t *q;
|
|
|
tor_assert(chan);
|
|
|
tor_assert(CHANNEL_IS_CLOSING(chan) || CHANNEL_IS_MAINT(chan) ||
|
|
|
CHANNEL_IS_OPEN(chan));
|
|
|
-
|
|
|
- log_debug(LD_CHANNEL,
|
|
|
- "Processing as many incoming cells as we can for channel %p",
|
|
|
- chan);
|
|
|
+ tor_assert(cell);
|
|
|
|
|
|
/* Nothing we can do if we have no registered cell handlers */
|
|
|
- if (!(chan->cell_handler ||
|
|
|
- chan->var_cell_handler)) return;
|
|
|
- /* Nothing we can do if we have no cells */
|
|
|
- if (TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) return;
|
|
|
+ if (!chan->cell_handler)
|
|
|
+ return;
|
|
|
|
|
|
/*
|
|
|
- * Process cells until we're done or find one we have no current handler
|
|
|
- * for.
|
|
|
+ * Process the given cell
|
|
|
*
|
|
|
* We must free the cells here after calling the handler, since custody
|
|
|
* of the buffer was given to the channel layer when they were queued;
|
|
|
* see comments on memory management in channel_queue_cell() and in
|
|
|
* channel_queue_var_cell() below.
|
|
|
*/
|
|
|
- while (NULL != (q = TOR_SIMPLEQ_FIRST(&chan->incoming_queue))) {
|
|
|
- tor_assert(q);
|
|
|
- tor_assert(q->type == CELL_QUEUE_FIXED ||
|
|
|
- q->type == CELL_QUEUE_VAR);
|
|
|
-
|
|
|
- if (q->type == CELL_QUEUE_FIXED &&
|
|
|
- chan->cell_handler) {
|
|
|
- /* Handle a fixed-length cell */
|
|
|
- TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
|
|
|
- tor_assert(q->u.fixed.cell);
|
|
|
- log_debug(LD_CHANNEL,
|
|
|
- "Processing incoming cell_t %p for channel %p (global ID "
|
|
|
- U64_FORMAT ")",
|
|
|
- q->u.fixed.cell, chan,
|
|
|
- U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- chan->cell_handler(chan, q->u.fixed.cell);
|
|
|
- tor_free(q->u.fixed.cell);
|
|
|
- tor_free(q);
|
|
|
- } else if (q->type == CELL_QUEUE_VAR &&
|
|
|
- chan->var_cell_handler) {
|
|
|
- /* Handle a variable-length cell */
|
|
|
- TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
|
|
|
- tor_assert(q->u.var.var_cell);
|
|
|
- log_debug(LD_CHANNEL,
|
|
|
- "Processing incoming var_cell_t %p for channel %p (global ID "
|
|
|
- U64_FORMAT ")",
|
|
|
- q->u.var.var_cell, chan,
|
|
|
- U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- chan->var_cell_handler(chan, q->u.var.var_cell);
|
|
|
- tor_free(q->u.var.var_cell);
|
|
|
- tor_free(q);
|
|
|
- } else {
|
|
|
- /* Can't handle this one */
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-/**
|
|
|
- * Queue incoming cell
|
|
|
- *
|
|
|
- * This should be called by a channel_t subclass to queue an incoming fixed-
|
|
|
- * length cell for processing, and process it if possible.
|
|
|
- */
|
|
|
-
|
|
|
-void
|
|
|
-channel_queue_cell(channel_t *chan, cell_t *cell)
|
|
|
-{
|
|
|
- int need_to_queue = 0;
|
|
|
- cell_queue_entry_t *q;
|
|
|
- cell_t *cell_copy = NULL;
|
|
|
-
|
|
|
- tor_assert(chan);
|
|
|
- tor_assert(cell);
|
|
|
- tor_assert(CHANNEL_IS_OPEN(chan));
|
|
|
-
|
|
|
- /* Do we need to queue it, or can we just call the handler right away? */
|
|
|
- if (!(chan->cell_handler)) need_to_queue = 1;
|
|
|
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
|
|
|
- need_to_queue = 1;
|
|
|
|
|
|
/* Timestamp for receiving */
|
|
|
channel_timestamp_recv(chan);
|
|
|
-
|
|
|
- /* Update the counters */
|
|
|
+ /* Update received counter. */
|
|
|
++(chan->n_cells_recved);
|
|
|
chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids);
|
|
|
|
|
|
- /* If we don't need to queue we can just call cell_handler */
|
|
|
- if (!need_to_queue) {
|
|
|
- tor_assert(chan->cell_handler);
|
|
|
- log_debug(LD_CHANNEL,
|
|
|
- "Directly handling incoming cell_t %p for channel %p "
|
|
|
- "(global ID " U64_FORMAT ")",
|
|
|
- cell, chan,
|
|
|
- U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- chan->cell_handler(chan, cell);
|
|
|
- } else {
|
|
|
- /*
|
|
|
- * Otherwise queue it and then process the queue if possible.
|
|
|
- *
|
|
|
- * We queue a copy, not the original pointer - it might have been on the
|
|
|
- * stack in connection_or_process_cells_from_inbuf() (or another caller
|
|
|
- * if we ever have a subclass other than channel_tls_t), or be freed
|
|
|
- * there after we return. This is the uncommon case; the non-copying
|
|
|
- * fast path occurs in the if (!need_to_queue) case above when the
|
|
|
- * upper layer has installed cell handlers.
|
|
|
- */
|
|
|
- cell_copy = tor_malloc_zero(sizeof(cell_t));
|
|
|
- memcpy(cell_copy, cell, sizeof(cell_t));
|
|
|
- q = cell_queue_entry_new_fixed(cell_copy);
|
|
|
- log_debug(LD_CHANNEL,
|
|
|
- "Queueing incoming cell_t %p for channel %p "
|
|
|
- "(global ID " U64_FORMAT ")",
|
|
|
- cell, chan,
|
|
|
- U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
|
|
|
- if (chan->cell_handler ||
|
|
|
- chan->var_cell_handler) {
|
|
|
- channel_process_cells(chan);
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-/**
|
|
|
- * Queue incoming variable-length cell
|
|
|
- *
|
|
|
- * This should be called by a channel_t subclass to queue an incoming
|
|
|
- * variable-length cell for processing, and process it if possible.
|
|
|
- */
|
|
|
-
|
|
|
-void
|
|
|
-channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
|
|
|
-{
|
|
|
- int need_to_queue = 0;
|
|
|
- cell_queue_entry_t *q;
|
|
|
- var_cell_t *cell_copy = NULL;
|
|
|
-
|
|
|
- tor_assert(chan);
|
|
|
- tor_assert(var_cell);
|
|
|
- tor_assert(CHANNEL_IS_OPEN(chan));
|
|
|
-
|
|
|
- /* Do we need to queue it, or can we just call the handler right away? */
|
|
|
- if (!(chan->var_cell_handler)) need_to_queue = 1;
|
|
|
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
|
|
|
- need_to_queue = 1;
|
|
|
-
|
|
|
- /* Timestamp for receiving */
|
|
|
- channel_timestamp_recv(chan);
|
|
|
-
|
|
|
- /* Update the counter */
|
|
|
- ++(chan->n_cells_recved);
|
|
|
- chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) +
|
|
|
- var_cell->payload_len;
|
|
|
-
|
|
|
- /* If we don't need to queue we can just call cell_handler */
|
|
|
- if (!need_to_queue) {
|
|
|
- tor_assert(chan->var_cell_handler);
|
|
|
- log_debug(LD_CHANNEL,
|
|
|
- "Directly handling incoming var_cell_t %p for channel %p "
|
|
|
- "(global ID " U64_FORMAT ")",
|
|
|
- var_cell, chan,
|
|
|
- U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- chan->var_cell_handler(chan, var_cell);
|
|
|
- } else {
|
|
|
- /*
|
|
|
- * Otherwise queue it and then process the queue if possible.
|
|
|
- *
|
|
|
- * We queue a copy, not the original pointer - it might have been on the
|
|
|
- * stack in connection_or_process_cells_from_inbuf() (or another caller
|
|
|
- * if we ever have a subclass other than channel_tls_t), or be freed
|
|
|
- * there after we return. This is the uncommon case; the non-copying
|
|
|
- * fast path occurs in the if (!need_to_queue) case above when the
|
|
|
- * upper layer has installed cell handlers.
|
|
|
- */
|
|
|
- cell_copy = var_cell_copy(var_cell);
|
|
|
- q = cell_queue_entry_new_var(cell_copy);
|
|
|
- log_debug(LD_CHANNEL,
|
|
|
- "Queueing incoming var_cell_t %p for channel %p "
|
|
|
- "(global ID " U64_FORMAT ")",
|
|
|
- var_cell, chan,
|
|
|
- U64_PRINTF_ARG(chan->global_identifier));
|
|
|
- TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
|
|
|
- if (chan->cell_handler ||
|
|
|
- chan->var_cell_handler) {
|
|
|
- channel_process_cells(chan);
|
|
|
- }
|
|
|
- }
|
|
|
+ log_debug(LD_CHANNEL,
|
|
|
+ "Processing incoming cell_t %p for channel %p (global ID "
|
|
|
+ U64_FORMAT ")", cell, chan,
|
|
|
+ U64_PRINTF_ARG(chan->global_identifier));
|
|
|
+ chan->cell_handler(chan, cell);
|
|
|
}
|
|
|
|
|
|
/** If <b>packed_cell</b> on <b>chan</b> is a destroy cell, then set
|
|
@@ -3628,19 +3024,6 @@ channel_listener_describe_transport(channel_listener_t *chan_l)
|
|
|
return chan_l->describe_transport(chan_l);
|
|
|
}
|
|
|
|
|
|
-/**
|
|
|
- * Return the number of entries in <b>queue</b>
|
|
|
- */
|
|
|
-STATIC int
|
|
|
-chan_cell_queue_len(const chan_cell_queue_t *queue)
|
|
|
-{
|
|
|
- int r = 0;
|
|
|
- cell_queue_entry_t *cell;
|
|
|
- TOR_SIMPLEQ_FOREACH(cell, queue, next)
|
|
|
- ++r;
|
|
|
- return r;
|
|
|
-}
|
|
|
-
|
|
|
/**
|
|
|
* Dump channel statistics
|
|
|
*
|
|
@@ -3753,14 +3136,6 @@ channel_dump_statistics, (channel_t *chan, int severity))
|
|
|
channel_is_incoming(chan) ?
|
|
|
"incoming" : "outgoing");
|
|
|
|
|
|
- /* Describe queues */
|
|
|
- tor_log(severity, LD_GENERAL,
|
|
|
- " * Channel " U64_FORMAT " has %d queued incoming cells"
|
|
|
- " and %d queued outgoing cells",
|
|
|
- U64_PRINTF_ARG(chan->global_identifier),
|
|
|
- chan_cell_queue_len(&chan->incoming_queue),
|
|
|
- chan_cell_queue_len(&chan->outgoing_queue));
|
|
|
-
|
|
|
/* Describe circuits */
|
|
|
tor_log(severity, LD_GENERAL,
|
|
|
" * Channel " U64_FORMAT " has %d active circuits out of"
|
|
@@ -4037,19 +3412,11 @@ channel_get_addr_if_possible(channel_t *chan, tor_addr_t *addr_out)
|
|
|
int
|
|
|
channel_has_queued_writes(channel_t *chan)
|
|
|
{
|
|
|
- int has_writes = 0;
|
|
|
-
|
|
|
tor_assert(chan);
|
|
|
tor_assert(chan->has_queued_writes);
|
|
|
|
|
|
- if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
|
|
|
- has_writes = 1;
|
|
|
- } else {
|
|
|
- /* Check with the lower layer */
|
|
|
- has_writes = chan->has_queued_writes(chan);
|
|
|
- }
|
|
|
-
|
|
|
- return has_writes;
|
|
|
+ /* Check with the lower layer */
|
|
|
+ return chan->has_queued_writes(chan);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -4286,11 +3653,8 @@ channel_get_global_queue_estimate(void)
|
|
|
/*
|
|
|
* Estimate the number of writeable cells
|
|
|
*
|
|
|
- * Ask the lower layer for an estimate of how many cells it can accept, and
|
|
|
- * then subtract the length of our outgoing_queue, if any, to produce an
|
|
|
- * estimate of the number of cells this channel can accept for writes.
|
|
|
+ * Ask the lower layer for an estimate of how many cells it can accept.
|
|
|
*/
|
|
|
-
|
|
|
int
|
|
|
channel_num_cells_writeable(channel_t *chan)
|
|
|
{
|
|
@@ -4302,8 +3666,6 @@ channel_num_cells_writeable(channel_t *chan)
|
|
|
if (chan->state == CHANNEL_STATE_OPEN) {
|
|
|
/* Query lower layer */
|
|
|
result = chan->num_cells_writeable(chan);
|
|
|
- /* Subtract cell queue length, if any */
|
|
|
- result -= chan_cell_queue_len(&chan->outgoing_queue);
|
|
|
if (result < 0) result = 0;
|
|
|
} else {
|
|
|
/* No cells are writeable in any other state */
|