Prechádzať zdrojové kódy

Implementation of a fix for bug 7912

I added the code to pass a destroy cell to a queueing function rather
than writing it immediately, and the code to remember that we
shouldn't reuse the circuit id until the destroy is actually sent, and
the code to release the circuit id once the destroy has been sent...
and then I finished by hooking destroy_cell_queue into the rest of
Tor.
Nick Mathewson 11 rokov pred
rodič
commit
43d53e6d86
6 zmenil súbory, kde vykonal 141 pridanie a 21 odobranie
  1. 50 9
      src/or/channel.c
  2. 1 1
      src/or/circuitlist.c
  3. 69 4
      src/or/circuitmux.c
  4. 6 1
      src/or/circuitmux.h
  5. 14 5
      src/or/relay.c
  6. 1 1
      src/or/relay.h

+ 50 - 9
src/or/channel.c

@@ -122,6 +122,8 @@ static cell_queue_entry_t *
 cell_queue_entry_new_fixed(cell_t *cell);
 static cell_queue_entry_t *
 cell_queue_entry_new_var(var_cell_t *var_cell);
+static int is_destroy_cell(channel_t *chan,
+                           const cell_queue_entry_t *q, circid_t *circid_out);
 
 /* Functions to maintain the digest map */
 static void channel_add_to_digest_map(channel_t *chan);
@@ -1685,6 +1687,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
     chan->timestamp_last_added_nonpadding = approx_time();
   }
 
+  {
+    circid_t circ_id;
+    if (is_destroy_cell(chan, q, &circ_id)) {
+      channel_note_destroy_not_pending(chan, circ_id);
+    }
+  }
+
   /* Can we send it right out?  If so, try */
   if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
       chan->state == CHANNEL_STATE_OPEN) {
@@ -2607,6 +2616,43 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
   }
 }
 
+/** DOCDOC */
+static int
+is_destroy_cell(channel_t *chan,
+                const cell_queue_entry_t *q, circid_t *circid_out)
+{
+  *circid_out = 0;
+  switch (q->type) {
+    case CELL_QUEUE_FIXED:
+      if (q->u.fixed.cell->command == CELL_DESTROY) {
+        *circid_out = q->u.fixed.cell->circ_id;
+        return 1;
+      }
+      break;
+    case CELL_QUEUE_VAR:
+      if (q->u.var.var_cell->command == CELL_DESTROY) {
+        *circid_out = q->u.var.var_cell->circ_id;
+        return 1;
+      }
+      break;
+    case CELL_QUEUE_PACKED:
+      if (chan->wide_circ_ids) {
+        if (q->u.packed.packed_cell->body[4] == CELL_DESTROY) {
+          *circid_out = ntohl(get_uint32(q->u.packed.packed_cell->body));
+          return 1;
+        }
+      } else {
+        if (q->u.packed.packed_cell->body[2] == CELL_DESTROY) {
+          *circid_out = ntohs(get_uint16(q->u.packed.packed_cell->body));
+          return 1;
+        }
+      }
+      break;
+  }
+  return 0;
+}
+
+
 /**
  * Send destroy cell on a channel
  *
@@ -2618,25 +2664,20 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
 int
 channel_send_destroy(circid_t circ_id, channel_t *chan, int reason)
 {
-  cell_t cell;
-
   tor_assert(chan);
 
   /* Check to make sure we can send on this channel first */
   if (!(chan->state == CHANNEL_STATE_CLOSING ||
         chan->state == CHANNEL_STATE_CLOSED ||
-        chan->state == CHANNEL_STATE_ERROR)) {
-    memset(&cell, 0, sizeof(cell_t));
-    cell.circ_id = circ_id;
-    cell.command = CELL_DESTROY;
-    cell.payload[0] = (uint8_t) reason;
+        chan->state == CHANNEL_STATE_ERROR) &&
+      chan->cmux) {
+    channel_note_destroy_pending(chan, circ_id);
+    circuitmux_append_destroy_cell(chan, chan->cmux, circ_id, reason);
     log_debug(LD_OR,
               "Sending destroy (circID %u) on channel %p "
               "(global ID " U64_FORMAT ")",
               (unsigned)circ_id, chan,
               U64_PRINTF_ARG(chan->global_identifier));
-
-    channel_write_cell(chan, &cell);
   } else {
     log_warn(LD_BUG,
              "Someone called channel_send_destroy() for circID %u "

+ 1 - 1
src/or/circuitlist.c

@@ -343,7 +343,7 @@ circuit_set_n_circid_chan(circuit_t *circ, circid_t id,
 
   if (circ->n_delete_pending && old_chan) {
     channel_mark_circid_unusable(old_chan, old_id);
-    circ->n_delete_pending = 1;
+    circ->n_delete_pending = 0;
   }
 }
 

+ 69 - 4
src/or/circuitmux.c

@@ -10,6 +10,7 @@
 #include "channel.h"
 #include "circuitlist.h"
 #include "circuitmux.h"
+#include "relay.h"
 
 /*
  * Private typedefs for circuitmux.c
@@ -115,6 +116,18 @@ struct circuitmux_s {
    */
   struct circuit_t *active_circuits_head, *active_circuits_tail;
 
+  /** List of queued destroy cells */
+  cell_queue_t destroy_cell_queue;
+  /** Boolean: True iff the last cell to circuitmux_get_first_active_circuit
+   * returned the destroy queue. Used to force alternation between
+   * destroy/non-destroy cells.
+   *
+   * XXXX There is no reason to think that alternating is a particularly good
+   * approach -- it's just designed to prevent destroys from starving other
+   * cells completely.
+   */
+  unsigned int last_cell_was_destroy : 1;
+
   /*
    * Circuitmux policy; if this is non-NULL, it can override the built-
    * in round-robin active circuits behavior.  This is how EWMA works in
@@ -508,6 +521,8 @@ circuitmux_free(circuitmux_t *cmux)
     tor_free(cmux->chanid_circid_map);
   }
 
+  cell_queue_clear(&cmux->destroy_cell_queue);
+
   tor_free(cmux);
 }
 
@@ -816,7 +831,7 @@ circuitmux_num_cells(circuitmux_t *cmux)
 {
   tor_assert(cmux);
 
-  return cmux->n_cells;
+  return cmux->n_cells + cmux->destroy_cell_queue.n;
 }
 
 /**
@@ -1368,16 +1383,36 @@ circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
 /**
  * Pick a circuit to send from, using the active circuits list or a
  * circuitmux policy if one is available.  This is called from channel.c.
+ *
+ * If we would rather send a destroy cell, return NULL and set
+ * *<b>destroy_queue_out</b> to the destroy queue.
+ *
+ * If we have nothing to send, set *<b>destroy_queue_out</b> to NULL and
+ * return NULL.
  */
 
 circuit_t *
-circuitmux_get_first_active_circuit(circuitmux_t *cmux)
+circuitmux_get_first_active_circuit(circuitmux_t *cmux,
+                                    cell_queue_t **destroy_queue_out)
 {
   circuit_t *circ = NULL;
 
   tor_assert(cmux);
+  tor_assert(destroy_queue_out);
+
+  *destroy_queue_out = NULL;
+
+  if (cmux->destroy_cell_queue.n &&
+        (!cmux->last_cell_was_destroy || cmux->n_active_circuits == 0)) {
+    /* We have destroy cells to send, and either we just sent a relay cell,
+     * or we have no relay cells to send. */
 
-  if (cmux->n_active_circuits > 0) {
+    /* XXXX We should let the cmux policy have some say in this eventually. */
+    /* XXXX Alternating is not a terribly brilliant approach here. */
+    *destroy_queue_out = &cmux->destroy_cell_queue;
+
+    cmux->last_cell_was_destroy = 1;
+  } else if (cmux->n_active_circuits > 0) {
     /* We also must have a cell available for this to be the case */
     tor_assert(cmux->n_cells > 0);
     /* Do we have a policy-provided circuit selector? */
@@ -1389,7 +1424,11 @@ circuitmux_get_first_active_circuit(circuitmux_t *cmux)
       tor_assert(cmux->active_circuits_head);
       circ = cmux->active_circuits_head;
     }
-  } else tor_assert(cmux->n_cells == 0);
+    cmux->last_cell_was_destroy = 0;
+  } else {
+    tor_assert(cmux->n_cells == 0);
+    tor_assert(cmux->destroy_cell_queue.n == 0);
+  }
 
   return circ;
 }
@@ -1743,3 +1782,29 @@ circuitmux_assert_okay_pass_three(circuitmux_t *cmux)
   }
 }
 
+/*DOCDOC */
+void
+circuitmux_append_destroy_cell(channel_t *chan,
+                               circuitmux_t *cmux,
+                               circid_t circ_id,
+                               uint8_t reason)
+{
+  cell_t cell;
+  memset(&cell, 0, sizeof(cell_t));
+  cell.circ_id = circ_id;
+  cell.command = CELL_DESTROY;
+  cell.payload[0] = (uint8_t) reason;
+
+  cell_queue_append_packed_copy(&cmux->destroy_cell_queue, &cell,
+                                chan->wide_circ_ids, 0);
+
+  /* XXXX Duplicate code from append_cell_to_circuit_queue */
+  if (!channel_has_queued_writes(chan)) {
+    /* There is no data at all waiting to be sent on the outbuf.  Add a
+     * cell, so that we can notice when it gets flushed, flushed_some can
+     * get called, and we can start putting more data onto the buffer then.
+     */
+    log_debug(LD_GENERAL, "Primed a buffer.");
+    channel_flush_from_first_active_circuit(chan, 1);
+  }
+}

+ 6 - 1
src/or/circuitmux.h

@@ -120,7 +120,8 @@ unsigned int circuitmux_num_circuits(circuitmux_t *cmux);
 unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux);
 
 /* Channel interface */
-circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux);
+circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux,
+                                           cell_queue_t **destroy_queue_out);
 void circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ,
                                   unsigned int n_cells);
 
@@ -132,5 +133,9 @@ void circuitmux_clear_num_cells(circuitmux_t *cmux, circuit_t *circ);
 void circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
                               unsigned int n_cells);
 
+void circuitmux_append_destroy_cell(channel_t *chan,
+                                    circuitmux_t *cmux, circid_t circ_id,
+                                    uint8_t reason);
+
 #endif /* TOR_CIRCUITMUX_H */
 

+ 14 - 5
src/or/relay.c

@@ -2140,11 +2140,11 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
 /** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
 void
 cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
-                              int wide_circ_ids)
+                              int wide_circ_ids, int use_stats)
 {
   packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids);
   /* Remember the time when this cell was put in the queue. */
-  if (get_options()->CellStatistics) {
+  if (get_options()->CellStatistics && use_stats) {
     struct timeval now;
     uint32_t added;
     insertion_time_queue_t *it_queue = queue->insertion_times;
@@ -2339,7 +2339,7 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
 {
   circuitmux_t *cmux = NULL;
   int n_flushed = 0;
-  cell_queue_t *queue;
+  cell_queue_t *queue, *destroy_queue=NULL;
   circuit_t *circ;
   or_circuit_t *or_circ;
   int streams_blocked;
@@ -2352,7 +2352,16 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
 
   /* Main loop: pick a circuit, send a cell, update the cmux */
   while (n_flushed < max) {
-    circ = circuitmux_get_first_active_circuit(cmux);
+    circ = circuitmux_get_first_active_circuit(cmux, &destroy_queue);
+    if (destroy_queue) {
+      /* this code is duplicated from some of the logic below. Ugly! XXXX */
+      tor_assert(destroy_queue->n > 0);
+      cell = cell_queue_pop(destroy_queue);
+      channel_write_packed_cell(chan, cell);
+      cell = NULL;
+      ++n_flushed;
+      continue;
+    }
     /* If it returns NULL, no cells left to send */
     if (!circ) break;
     assert_cmux_ok_paranoid(chan);
@@ -2474,7 +2483,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
     streams_blocked = circ->streams_blocked_on_p_chan;
   }
 
-  cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids);
+  cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids, 1);
 
   /* If we have too many cells on the circuit, we should stop reading from
    * the edge streams for a while. */

+ 1 - 1
src/or/relay.h

@@ -47,7 +47,7 @@ void packed_cell_free(packed_cell_t *cell);
 void cell_queue_clear(cell_queue_t *queue);
 void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
 void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
-                                   int wide_circ_ids);
+                                   int wide_circ_ids, int use_stats);
 
 void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
                                   cell_t *cell, cell_direction_t direction,