Browse Source

Merge branch 'ticket25573-034' into ticket25573-master

Mike Perry 5 years ago
parent
commit
93ff8b411a

+ 5 - 0
changes/ticket25573

@@ -0,0 +1,5 @@
+  o Minor features (controller):
+    - For purposes of CIRC_BW-based dropped cell detection, track half-closed
+      stream ids, and allow their ENDs, SENDMEs, DATA and path bias check
+      cells to arrive without counting it as dropped until either the END arrvies,
+      or the windows are empty. Closes ticket 25573.

+ 1 - 2
src/core/or/circuitbuild.c

@@ -1429,13 +1429,12 @@ circuit_finish_handshake(origin_circuit_t *circ,
  * just give up: force circ to close, and return 0.
  */
 int
-circuit_truncated(origin_circuit_t *circ, crypt_path_t *layer, int reason)
+circuit_truncated(origin_circuit_t *circ, int reason)
 {
 //  crypt_path_t *victim;
 //  connection_t *stream;
 
   tor_assert(circ);
-  tor_assert(layer);
 
   /* XXX Since we don't send truncates currently, getting a truncated
    *     means that a connection broke or an extend failed. For now,

+ 1 - 2
src/core/or/circuitbuild.h

@@ -40,8 +40,7 @@ int circuit_init_cpath_crypto(crypt_path_t *cpath,
 struct created_cell_t;
 int circuit_finish_handshake(origin_circuit_t *circ,
                              const struct created_cell_t *created_cell);
-int circuit_truncated(origin_circuit_t *circ, crypt_path_t *layer,
-                      int reason);
+int circuit_truncated(origin_circuit_t *circ, int reason);
 int onionskin_answer(or_circuit_t *circ,
                      const struct created_cell_t *created_cell,
                      const char *keys, size_t keys_len,

+ 9 - 0
src/core/or/circuitlist.c

@@ -99,6 +99,7 @@
 #include "core/or/crypt_path_reference_st.h"
 #include "feature/dircommon/dir_connection_st.h"
 #include "core/or/edge_connection_st.h"
+#include "core/or/half_edge_st.h"
 #include "core/or/extend_info_st.h"
 #include "core/or/or_circuit_st.h"
 #include "core/or/origin_circuit_st.h"
@@ -1078,6 +1079,14 @@ circuit_free_(circuit_t *circ)
 
     circuit_remove_from_origin_circuit_list(ocirc);
 
+    if (ocirc->half_streams) {
+      SMARTLIST_FOREACH_BEGIN(ocirc->half_streams, half_edge_t*,
+                              half_conn) {
+          tor_free(half_conn);
+      } SMARTLIST_FOREACH_END(half_conn);
+      smartlist_free(ocirc->half_streams);
+    }
+
     if (ocirc->build_state) {
         extend_info_free(ocirc->build_state->chosen_exit);
         circuit_free_cpath_node(ocirc->build_state->pending_final_cpath);

+ 226 - 0
src/core/or/connection_edge.c

@@ -105,6 +105,7 @@
 #include "feature/nodelist/node_st.h"
 #include "core/or/or_circuit_st.h"
 #include "core/or/origin_circuit_st.h"
+#include "core/or/half_edge_st.h"
 #include "core/or/socks_request_st.h"
 #include "lib/evloop/compat_libevent.h"
 
@@ -154,6 +155,11 @@ static int connection_ap_process_natd(entry_connection_t *conn);
 static int connection_exit_connect_dir(edge_connection_t *exitconn);
 static int consider_plaintext_ports(entry_connection_t *conn, uint16_t port);
 static int connection_ap_supports_optimistic_data(const entry_connection_t *);
+STATIC void connection_half_edge_add(const edge_connection_t *conn,
+                                     origin_circuit_t *circ);
+STATIC half_edge_t *connection_half_edge_find_stream_id(
+                                    const smartlist_t *half_conns,
+                                    streamid_t stream_id);
 
 /** Convert a connection_t* to an edge_connection_t*; assert if the cast is
  * invalid. */
@@ -472,6 +478,12 @@ connection_edge_end(edge_connection_t *conn, uint8_t reason)
   if (circ && !circ->marked_for_close) {
     log_debug(LD_EDGE,"Sending end on conn (fd "TOR_SOCKET_T_FORMAT").",
               conn->base_.s);
+
+    if (CIRCUIT_IS_ORIGIN(circ)) {
+      origin_circuit_t *origin_circ = TO_ORIGIN_CIRCUIT(circ);
+      connection_half_edge_add(conn, origin_circ);
+    }
+
     connection_edge_send_command(conn, RELAY_COMMAND_END,
                                  payload, payload_len);
     /* We'll log warn if the connection was an hidden service and couldn't be
@@ -488,6 +500,215 @@ connection_edge_end(edge_connection_t *conn, uint8_t reason)
   return 0;
 }
 
+/**
+ * Helper function for bsearch.
+ *
+ * As per smartlist_bsearch, return < 0 if key preceeds member,
+ * > 0 if member preceeds key, and 0 if they are equal.
+ *
+ * This is equivalent to subtraction of the values of key - member
+ * (why does no one ever say that explicitly?).
+ */
+static int
+connection_half_edge_compare_bsearch(const void *key, const void **member)
+{
+  const half_edge_t *e2;
+  tor_assert(key);
+  tor_assert(member && *(half_edge_t**)member);
+  e2 = *(const half_edge_t **)member;
+
+  return *(const streamid_t*)key - e2->stream_id;
+}
+
+/**
+ * Add a half-closed connection to the list, to watch for activity.
+ *
+ * These connections are removed from the list upon receiving an end
+ * cell.
+ */
+STATIC void
+connection_half_edge_add(const edge_connection_t *conn,
+                         origin_circuit_t *circ)
+{
+  half_edge_t *half_conn = NULL;
+  int insert_at = 0;
+  int ignored;
+
+  /* Double-check for re-insertion. This should not happen,
+   * but this check is cheap compared to the sort anyway */
+  if (connection_half_edge_find_stream_id(circ->half_streams,
+                                          conn->stream_id)) {
+    log_warn(LD_BUG, "Duplicate stream close for stream %d on circuit %d",
+             conn->stream_id, circ->global_identifier);
+    return;
+  }
+
+  half_conn = tor_malloc_zero(sizeof(half_edge_t));
+
+  if (!circ->half_streams) {
+    circ->half_streams = smartlist_new();
+  }
+
+  half_conn->stream_id = conn->stream_id;
+
+  // How many sendme's should I expect?
+  half_conn->sendmes_pending =
+   (STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT;
+
+   // Is there a connected cell pending?
+  half_conn->connected_pending = conn->base_.state ==
+      AP_CONN_STATE_CONNECT_WAIT;
+
+  /* Data should only arrive if we're not waiting on a resolved cell.
+   * It can arrive after waiting on connected, because of optimistic
+   * data. */
+  if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
+    // How many more data cells can arrive on this id?
+    half_conn->data_pending = conn->deliver_window;
+  }
+
+  insert_at = smartlist_bsearch_idx(circ->half_streams, &half_conn->stream_id,
+                                    connection_half_edge_compare_bsearch,
+                                    &ignored);
+  smartlist_insert(circ->half_streams, insert_at, half_conn);
+}
+
+/**
+ * Find a stream_id_t in the list in O(lg(n)).
+ *
+ * Returns NULL if the list is empty or element is not found.
+ * Returns a pointer to the element if found.
+ */
+STATIC half_edge_t *
+connection_half_edge_find_stream_id(const smartlist_t *half_conns,
+                                    streamid_t stream_id)
+{
+  if (!half_conns)
+    return NULL;
+
+  return smartlist_bsearch(half_conns, &stream_id,
+                           connection_half_edge_compare_bsearch);
+}
+
+/**
+ * Check if this stream_id is in a half-closed state. If so,
+ * check if it still has data cells pending, and decrement that
+ * window if so.
+ *
+ * Return 1 if the data window was not empty.
+ * Return 0 otherwise.
+ */
+int
+connection_half_edge_is_valid_data(const smartlist_t *half_conns,
+                                   streamid_t stream_id)
+{
+  half_edge_t *half = connection_half_edge_find_stream_id(half_conns,
+                                                          stream_id);
+
+  if (!half)
+    return 0;
+
+  if (half->data_pending > 0) {
+    half->data_pending--;
+    return 1;
+  }
+
+  return 0;
+}
+
+/**
+ * Check if this stream_id is in a half-closed state. If so,
+ * check if it still has a connected cell pending, and decrement
+ * that window if so.
+ *
+ * Return 1 if the connected window was not empty.
+ * Return 0 otherwise.
+ */
+int
+connection_half_edge_is_valid_connected(const smartlist_t *half_conns,
+                                        streamid_t stream_id)
+{
+  half_edge_t *half = connection_half_edge_find_stream_id(half_conns,
+                                                          stream_id);
+
+  if (!half)
+    return 0;
+
+  if (half->connected_pending) {
+    half->connected_pending = 0;
+    return 1;
+  }
+
+  return 0;
+}
+
+/**
+ * Check if this stream_id is in a half-closed state. If so,
+ * check if it still has sendme cells pending, and decrement that
+ * window if so.
+ *
+ * Return 1 if the sendme window was not empty.
+ * Return 0 otherwise.
+ */
+int
+connection_half_edge_is_valid_sendme(const smartlist_t *half_conns,
+                                     streamid_t stream_id)
+{
+  half_edge_t *half = connection_half_edge_find_stream_id(half_conns,
+                                                          stream_id);
+
+  if (!half)
+    return 0;
+
+  if (half->sendmes_pending > 0) {
+    half->sendmes_pending--;
+    return 1;
+  }
+
+  return 0;
+}
+
+/**
+ * Check if this stream_id is in a half-closed state. If so, remove
+ * it from the list. No other data should come after the END cell.
+ *
+ * Return 1 if stream_id was in half-closed state.
+ * Return 0 otherwise.
+ */
+int
+connection_half_edge_is_valid_end(smartlist_t *half_conns,
+                                  streamid_t stream_id)
+{
+  half_edge_t *half;
+  int found, remove_idx;
+
+  if (!half_conns)
+    return 0;
+
+  remove_idx = smartlist_bsearch_idx(half_conns, &stream_id,
+                                    connection_half_edge_compare_bsearch,
+                                    &found);
+  if (!found)
+    return 0;
+
+  half = smartlist_get(half_conns, remove_idx);
+  smartlist_del_keeporder(half_conns, remove_idx);
+  tor_free(half);
+  return 1;
+}
+
+/**
+ * Streams that were used to send a RESOLVE cell are closed
+ * when they get the RESOLVED, without an end. So treat
+ * a RESOLVED just like an end, and remove from the list.
+ */
+int
+connection_half_edge_is_valid_resolved(smartlist_t *half_conns,
+                                       streamid_t stream_id)
+{
+  return connection_half_edge_is_valid_end(half_conns, stream_id);
+}
+
 /** An error has just occurred on an operation on an edge connection
  * <b>conn</b>.  Extract the errno; convert it to an end reason, and send an
  * appropriate relay end cell to the other end of the connection's circuit.
@@ -2623,6 +2844,11 @@ get_unique_stream_id_by_circ(origin_circuit_t *circ)
   for (tmpconn = circ->p_streams; tmpconn; tmpconn=tmpconn->next_stream)
     if (tmpconn->stream_id == test_stream_id)
       goto again;
+
+  if (connection_half_edge_find_stream_id(circ->half_streams,
+                                           test_stream_id))
+    goto again;
+
   return test_stream_id;
 }
 

+ 11 - 0
src/core/or/connection_edge.h

@@ -174,6 +174,17 @@ void connection_ap_warn_and_unmark_if_pending_circ(
                                              entry_connection_t *entry_conn,
                                              const char *where);
 
+int connection_half_edge_is_valid_data(const smartlist_t *half_conns,
+                                       streamid_t stream_id);
+int connection_half_edge_is_valid_sendme(const smartlist_t *half_conns,
+                                         streamid_t stream_id);
+int connection_half_edge_is_valid_connected(const smartlist_t *half_conns,
+                                            streamid_t stream_id);
+int connection_half_edge_is_valid_end(smartlist_t *half_conns,
+                                      streamid_t stream_id);
+int connection_half_edge_is_valid_resolved(smartlist_t *half_conns,
+                                           streamid_t stream_id);
+
 /** @name Begin-cell flags
  *
  * These flags are used in RELAY_BEGIN cells to change the default behavior

+ 34 - 0
src/core/or/half_edge_st.h

@@ -0,0 +1,34 @@
+/* Copyright (c) 2001 Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef HALF_EDGE_ST_H
+#define HALF_EDGE_ST_H
+
+#include "core/or/or.h"
+
+/**
+ * Struct to track a connection that we closed that the other end
+ * still thinks is open. Exists in origin_circuit_t.half_streams until
+ * we get an end cell or a resolved cell for this stream id.
+ */
+typedef struct half_edge_t {
+  /** stream_id for the half-closed connection */
+  streamid_t stream_id;
+
+  /** How many sendme's can the other end still send, based on how
+   * much data we had sent at the time of close */
+  int sendmes_pending;
+
+  /** How much more data can the other end still send, based on
+   * our deliver window */
+  int data_pending;
+
+  /** Is there a connected cell pending? */
+  int connected_pending : 1;
+} half_edge_t;
+
+#endif
+

+ 4 - 0
src/core/or/origin_circuit_st.h

@@ -78,6 +78,10 @@ struct origin_circuit_t {
    * associated with this circuit. */
   edge_connection_t *p_streams;
 
+  /** Smartlist of half-closed streams (half_edge_t*) that still
+   * have pending activity */
+  smartlist_t *half_streams;
+
   /** Bytes read on this circuit since last call to
    * control_event_circ_bandwidth_used().  Only used if we're configured
    * to emit CIRC_BW events. */

+ 73 - 2
src/core/or/relay.c

@@ -255,7 +255,9 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
     edge_connection_t *conn = NULL;
 
     if (circ->purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING) {
-      pathbias_check_probe_response(circ, cell);
+      if (pathbias_check_probe_response(circ, cell) == -1) {
+        pathbias_count_valid_cells(circ, cell);
+      }
 
       /* We need to drop this cell no matter what to avoid code that expects
        * a certain purpose (such as the hidserv code). */
@@ -1561,6 +1563,17 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
                "stream_id. Dropping.");
         return 0;
       } else if (!conn) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+          if (connection_half_edge_is_valid_data(ocirc->half_streams,
+                                                 rh.stream_id)) {
+            circuit_read_valid_data(ocirc, rh.length);
+            log_info(domain,
+                     "data cell on circ %u valid on half-closed "
+                     "stream id %d", ocirc->global_identifier, rh.stream_id);
+          }
+        }
+
         log_info(domain,"data cell dropped, unknown stream (streamid %d).",
                  rh.stream_id);
         return 0;
@@ -1602,6 +1615,20 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
       reason = rh.length > 0 ?
         get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC;
       if (!conn) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+          if (connection_half_edge_is_valid_end(ocirc->half_streams,
+                                                rh.stream_id)) {
+
+            circuit_read_valid_data(ocirc, rh.length);
+            log_info(domain,
+                     "end cell (%s) on circ %u valid on half-closed "
+                     "stream id %d",
+                     stream_end_reason_to_string(reason),
+                     ocirc->global_identifier, rh.stream_id);
+            return 0;
+          }
+        }
         log_info(domain,"end cell (%s) dropped, unknown stream.",
                  stream_end_reason_to_string(reason));
         return 0;
@@ -1737,7 +1764,14 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
                "'truncated' unsupported at non-origin. Dropping.");
         return 0;
       }
-      circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint,
+
+      /* Count the truncated as valid, for completeness. The
+       * circuit is being torn down anyway, though.  */
+      if (CIRCUIT_IS_ORIGIN(circ)) {
+        circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ),
+                                rh.length);
+      }
+      circuit_truncated(TO_ORIGIN_CIRCUIT(circ),
                         get_uint8(cell->payload + RELAY_HEADER_SIZE));
       return 0;
     case RELAY_COMMAND_CONNECTED:
@@ -1746,6 +1780,19 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
                "'connected' unsupported while open. Closing circ.");
         return -END_CIRC_REASON_TORPROTOCOL;
       }
+
+      if (CIRCUIT_IS_ORIGIN(circ)) {
+        origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+        if (connection_half_edge_is_valid_connected(ocirc->half_streams,
+                                                    rh.stream_id)) {
+          circuit_read_valid_data(ocirc, rh.length);
+          log_info(domain,
+                   "connected cell on circ %u valid on half-closed "
+                   "stream id %d", ocirc->global_identifier, rh.stream_id);
+          return 0;
+        }
+      }
+
       log_info(domain,
                "'connected' received on circid %u for streamid %d, "
                "no conn attached anymore. Ignoring.",
@@ -1794,6 +1841,17 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
         return 0;
       }
       if (!conn) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+          if (connection_half_edge_is_valid_sendme(ocirc->half_streams,
+                                                   rh.stream_id)) {
+            circuit_read_valid_data(ocirc, rh.length);
+            log_info(domain,
+                    "sendme cell on circ %u valid on half-closed "
+                    "stream id %d", ocirc->global_identifier, rh.stream_id);
+          }
+        }
+
         log_info(domain,"sendme cell dropped, unknown stream (streamid %d).",
                  rh.stream_id);
         return 0;
@@ -1857,6 +1915,19 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
                "'resolved' unsupported while open. Closing circ.");
         return -END_CIRC_REASON_TORPROTOCOL;
       }
+
+      if (CIRCUIT_IS_ORIGIN(circ)) {
+        origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+        if (connection_half_edge_is_valid_resolved(ocirc->half_streams,
+                                                    rh.stream_id)) {
+          circuit_read_valid_data(ocirc, rh.length);
+          log_info(domain,
+                   "resolved cell on circ %u valid on half-closed "
+                   "stream id %d", ocirc->global_identifier, rh.stream_id);
+          return 0;
+        }
+      }
+
       log_info(domain,
                "'resolved' received, no conn attached anymore. Ignoring.");
       return 0;

+ 63 - 0
src/feature/client/circpathbias.c

@@ -901,6 +901,7 @@ pathbias_check_probe_response(circuit_t *circ, const cell_t *cell)
     /* Check nonce */
     if (ipv4_host == ocirc->pathbias_probe_nonce) {
       pathbias_mark_use_success(ocirc);
+      circuit_read_valid_data(ocirc, rh.length);
       circuit_mark_for_close(circ, END_CIRC_REASON_FINISHED);
       log_info(LD_CIRC,
                "Got valid path bias probe back for circ %d, stream %d.",
@@ -921,6 +922,68 @@ pathbias_check_probe_response(circuit_t *circ, const cell_t *cell)
   return -1;
 }
 
+/**
+ * Check if a cell is counts as valid data for a circuit,
+ * and if so, count it as valid.
+ */
+void
+pathbias_count_valid_cells(circuit_t *circ, const cell_t *cell)
+{
+  origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+  relay_header_t rh;
+
+  relay_header_unpack(&rh, cell->payload);
+
+  /* Check to see if this is a cell from a previous connection,
+   * or is a request to close the circuit. */
+  switch (rh.command) {
+    case RELAY_COMMAND_TRUNCATED:
+      /* Truncated cells can arrive on path bias circs. When they do,
+       * just process them. This closes the circ, but it was junk anyway.
+       * No reason to wait for the probe. */
+      circuit_read_valid_data(ocirc, rh.length);
+      circuit_truncated(TO_ORIGIN_CIRCUIT(circ),
+                        get_uint8(cell->payload + RELAY_HEADER_SIZE));
+
+      break;
+
+    case RELAY_COMMAND_END:
+      if (connection_half_edge_is_valid_end(ocirc->half_streams,
+                                             rh.stream_id)) {
+        circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh.length);
+      }
+      break;
+
+    case RELAY_COMMAND_DATA:
+      if (connection_half_edge_is_valid_data(ocirc->half_streams,
+                                             rh.stream_id)) {
+        circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh.length);
+      }
+      break;
+
+    case RELAY_COMMAND_SENDME:
+      if (connection_half_edge_is_valid_sendme(ocirc->half_streams,
+                                             rh.stream_id)) {
+        circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh.length);
+      }
+      break;
+
+    case RELAY_COMMAND_CONNECTED:
+      if (connection_half_edge_is_valid_connected(ocirc->half_streams,
+                                                  rh.stream_id)) {
+        circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh.length);
+      }
+      break;
+
+    case RELAY_COMMAND_RESOLVED:
+      if (connection_half_edge_is_valid_resolved(ocirc->half_streams,
+                                                 rh.stream_id)) {
+        circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh.length);
+      }
+      break;
+  }
+}
+
 /**
  * Check if a circuit was used and/or closed successfully.
  *

+ 1 - 0
src/feature/client/circpathbias.h

@@ -20,6 +20,7 @@ void pathbias_count_build_success(origin_circuit_t *circ);
 int pathbias_count_build_attempt(origin_circuit_t *circ);
 int pathbias_check_close(origin_circuit_t *circ, int reason);
 int pathbias_check_probe_response(circuit_t *circ, const cell_t *cell);
+void pathbias_count_valid_cells(circuit_t *circ, const cell_t *cell);
 void pathbias_count_use_attempt(origin_circuit_t *circ);
 void pathbias_mark_use_success(origin_circuit_t *circ);
 void pathbias_mark_use_rollback(origin_circuit_t *circ);

+ 1 - 1
src/lib/container/smartlist.c

@@ -408,7 +408,7 @@ smartlist_uniq(smartlist_t *sl,
  * less than member, and greater than 0 if key is greater then member.
  */
 void *
-smartlist_bsearch(smartlist_t *sl, const void *key,
+smartlist_bsearch(const smartlist_t *sl, const void *key,
                   int (*compare)(const void *key, const void **member))
 {
   int found, idx;

+ 1 - 1
src/lib/container/smartlist.h

@@ -64,7 +64,7 @@ const uint8_t *smartlist_get_most_frequent_digest256(smartlist_t *sl);
 void smartlist_uniq_strings(smartlist_t *sl);
 void smartlist_uniq_digests(smartlist_t *sl);
 void smartlist_uniq_digests256(smartlist_t *sl);
-void *smartlist_bsearch(smartlist_t *sl, const void *key,
+void *smartlist_bsearch(const smartlist_t *sl, const void *key,
                         int (*compare)(const void *key, const void **member));
 int smartlist_bsearch_idx(const smartlist_t *sl, const void *key,
                           int (*compare)(const void *key, const void **member),

+ 603 - 37
src/test/test_relaycell.c

@@ -10,17 +10,20 @@
 #include "app/config/config.h"
 #include "core/mainloop/connection.h"
 #include "lib/crypt_ops/crypto.h"
+#include "lib/crypt_ops/crypto_rand.h"
 #include "core/or/circuitbuild.h"
 #include "core/or/circuitlist.h"
 #include "core/or/connection_edge.h"
 #include "core/or/relay.h"
 #include "test/test.h"
+#include "test/log_test_helpers.h"
 
 #include "core/or/cell_st.h"
 #include "core/or/crypt_path_st.h"
 #include "core/or/entry_connection_st.h"
 #include "core/or/origin_circuit_st.h"
 #include "core/or/socks_request_st.h"
+#include "core/or/half_edge_st.h"
 
 static int srm_ncalls;
 static entry_connection_t *srm_conn;
@@ -35,6 +38,18 @@ void connection_free_minimal(connection_t*);
 int connected_cell_format_payload(uint8_t *payload_out,
                               const tor_addr_t *addr,
                               uint32_t ttl);
+int pathbias_count_valid_cells(origin_circuit_t *circ,
+                              cell_t *cell);
+half_edge_t *connection_half_edge_find_stream_id(
+                                    const smartlist_t *half_conns,
+                                    streamid_t stream_id);
+void connection_half_edge_add(const edge_connection_t *conn,
+                         origin_circuit_t *circ);
+
+int mock_send_command(streamid_t stream_id, circuit_t *circ,
+                               uint8_t relay_command, const char *payload,
+                               size_t payload_len, crypt_path_t *cpath_layer,
+                               const char *filename, int lineno);
 
 /* Mock replacement for connection_ap_hannshake_socks_resolved() */
 static void
@@ -107,6 +122,16 @@ mock_connection_mark_unattached_ap_(entry_connection_t *conn, int endreason,
   conn->edge_.end_reason = endreason;
 }
 
+static void
+mock_mark_circ_for_close(circuit_t *circ, int reason, int line,
+                          const char *file)
+{
+  (void)reason; (void)line; (void)file;
+
+  circ->marked_for_close = 1;
+  return;
+}
+
 static void
 mock_mark_for_close(connection_t *conn,
                         int line, const char *file)
@@ -125,19 +150,38 @@ mock_start_reading(connection_t *conn)
   return;
 }
 
-static void
-test_circbw_relay(void *arg)
+int
+mock_send_command(streamid_t stream_id, circuit_t *circ,
+                               uint8_t relay_command, const char *payload,
+                               size_t payload_len, crypt_path_t *cpath_layer,
+                               const char *filename, int lineno)
+{
+ (void)stream_id; (void)circ;
+ (void)relay_command; (void)payload;
+ (void)payload_len; (void)cpath_layer;
+ (void)filename; (void)lineno;
+
+ return 0;
+}
+
+static entry_connection_t *
+fake_entry_conn(origin_circuit_t *oncirc, streamid_t id)
 {
-  cell_t cell;
-  relay_header_t rh;
-  tor_addr_t addr;
   edge_connection_t *edgeconn;
   entry_connection_t *entryconn;
-  origin_circuit_t *circ;
-  int delivered = 0;
-  int overhead = 0;
 
-  (void)arg;
+  entryconn = entry_connection_new(CONN_TYPE_AP, AF_INET);
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn);
+  edgeconn->base_.state = AP_CONN_STATE_CONNECT_WAIT;
+  edgeconn->deliver_window = STREAMWINDOW_START;
+  edgeconn->package_window = STREAMWINDOW_START;
+
+  edgeconn->stream_id = id;
+  edgeconn->on_circuit = TO_CIRCUIT(oncirc);
+  edgeconn->cpath_layer = oncirc->cpath;
+
+  return entryconn;
+}
 
 #define PACK_CELL(id, cmd, body_s) do {                                  \
     memset(&cell, 0, sizeof(cell));                                     \
@@ -160,18 +204,521 @@ test_circbw_relay(void *arg)
     tt_int_op(circ->n_overhead_read_circ_bw, OP_EQ, overhead); \
  } while (0)
 
+static int
+subtest_circbw_halfclosed(origin_circuit_t *circ, streamid_t init_id)
+{
+  cell_t cell;
+  relay_header_t rh;
+  edge_connection_t *edgeconn;
+  entry_connection_t *entryconn2=NULL;
+  entry_connection_t *entryconn3=NULL;
+  entry_connection_t *entryconn4=NULL;
+  int delivered = circ->n_delivered_read_circ_bw;
+  int overhead = circ->n_overhead_read_circ_bw;
+
+  /* Make new entryconns */
+  entryconn2 = fake_entry_conn(circ, init_id);
+  entryconn2->socks_request->has_finished = 1;
+  entryconn3 = fake_entry_conn(circ, init_id+1);
+  entryconn3->socks_request->has_finished = 1;
+  entryconn4 = fake_entry_conn(circ, init_id+2);
+  entryconn4->socks_request->has_finished = 1;
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn2);
+  edgeconn->package_window = 23;
+  edgeconn->base_.state = AP_CONN_STATE_OPEN;
+
+  int data_cells = edgeconn->deliver_window;
+  int sendme_cells = (STREAMWINDOW_START-edgeconn->package_window)
+                             /STREAMWINDOW_INCREMENT;
+  ENTRY_TO_CONN(entryconn2)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn2)->outbuf_flushlen = 0;
+  connection_edge_reached_eof(edgeconn);
+
+  /* Data cell not in the half-opened list */
+  PACK_CELL(4000, RELAY_COMMAND_DATA, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                       circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Sendme cell not in the half-opened list */
+  PACK_CELL(4000, RELAY_COMMAND_SENDME, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Connected cell not in the half-opened list */
+  PACK_CELL(4000, RELAY_COMMAND_CONNECTED, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Resolved cell not in the half-opened list */
+  PACK_CELL(4000, RELAY_COMMAND_RESOLVED, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Connected cell: not counted -- we were open */
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn2);
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_CONNECTED, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* DATA cells up to limit */
+  while (data_cells > 0) {
+    ENTRY_TO_CONN(entryconn2)->marked_for_close = 0;
+    ENTRY_TO_CONN(entryconn2)->outbuf_flushlen = 0;
+    PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_DATA, "Data1234");
+    if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+      pathbias_count_valid_cells(circ, &cell);
+    else
+      connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                       circ->cpath);
+    ASSERT_COUNTED_BW();
+    data_cells--;
+  }
+  ENTRY_TO_CONN(entryconn2)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn2)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_DATA, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* SENDME cells up to limit */
+  while (sendme_cells > 0) {
+    ENTRY_TO_CONN(entryconn2)->marked_for_close = 0;
+    ENTRY_TO_CONN(entryconn2)->outbuf_flushlen = 0;
+    PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_SENDME, "Data1234");
+    if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+      pathbias_count_valid_cells(circ, &cell);
+    else
+      connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                       circ->cpath);
+    ASSERT_COUNTED_BW();
+    sendme_cells--;
+  }
+  ENTRY_TO_CONN(entryconn2)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn2)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_SENDME, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Only one END cell */
+  ENTRY_TO_CONN(entryconn2)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn2)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_END, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_COUNTED_BW();
+
+  ENTRY_TO_CONN(entryconn2)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn2)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_END, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn3);
+  edgeconn->base_.state = AP_CONN_STATE_OPEN;
+  ENTRY_TO_CONN(entryconn3)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn3)->outbuf_flushlen = 0;
+  /* sendme cell on open entryconn with full window */
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_SENDME, "Data1234");
+  int ret =
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
+                                     circ->cpath);
+  tt_int_op(ret, OP_EQ, -END_CIRC_REASON_TORPROTOCOL);
+  ASSERT_UNCOUNTED_BW();
+
+  /* connected cell on a after EOF */
+  ENTRY_TO_CONN(entryconn3)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn3)->outbuf_flushlen = 0;
+  edgeconn->base_.state = AP_CONN_STATE_CONNECT_WAIT;
+  connection_edge_reached_eof(edgeconn);
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_CONNECTED, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ),  NULL,
+                                     circ->cpath);
+  ASSERT_COUNTED_BW();
+
+  ENTRY_TO_CONN(entryconn3)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn3)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_CONNECTED, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ),  NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* DATA and SENDME after END cell */
+  ENTRY_TO_CONN(entryconn3)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn3)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_END, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ),  NULL,
+                                     circ->cpath);
+  ASSERT_COUNTED_BW();
+
+  ENTRY_TO_CONN(entryconn3)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn3)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_SENDME, "Data1234");
+  ret =
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  tt_int_op(ret, OP_NE, -END_CIRC_REASON_TORPROTOCOL);
+  ASSERT_UNCOUNTED_BW();
+
+  ENTRY_TO_CONN(entryconn3)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn3)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_DATA, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Resolved: 1 counted, more not */
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn4);
+  entryconn4->socks_request->command = SOCKS_COMMAND_RESOLVE;
+  edgeconn->base_.state = AP_CONN_STATE_RESOLVE_WAIT;
+  edgeconn->on_circuit = TO_CIRCUIT(circ);
+  ENTRY_TO_CONN(entryconn4)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn4)->outbuf_flushlen = 0;
+  connection_edge_reached_eof(edgeconn);
+
+  ENTRY_TO_CONN(entryconn4)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn4)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_RESOLVED,
+            "\x04\x04\x12\x00\x00\x01\x00\x00\x02\x00");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_COUNTED_BW();
+
+  ENTRY_TO_CONN(entryconn4)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn4)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_RESOLVED,
+            "\x04\x04\x12\x00\x00\x01\x00\x00\x02\x00");
+  connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Data not counted after resolved */
+  ENTRY_TO_CONN(entryconn4)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn4)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_DATA, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* End not counted after resolved */
+  ENTRY_TO_CONN(entryconn4)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn4)->outbuf_flushlen = 0;
+  PACK_CELL(edgeconn->stream_id, RELAY_COMMAND_END, "Data1234");
+  if (circ->base_.purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING)
+    pathbias_count_valid_cells(circ, &cell);
+  else
+    connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  connection_free_minimal(ENTRY_TO_CONN(entryconn2));
+  connection_free_minimal(ENTRY_TO_CONN(entryconn3));
+  connection_free_minimal(ENTRY_TO_CONN(entryconn4));
+  return 1;
+ done:
+  connection_free_minimal(ENTRY_TO_CONN(entryconn2));
+  connection_free_minimal(ENTRY_TO_CONN(entryconn3));
+  connection_free_minimal(ENTRY_TO_CONN(entryconn4));
+  return 0;
+}
+
+static int
+halfstream_insert(origin_circuit_t *circ, edge_connection_t *edgeconn,
+                  streamid_t *streams, int num, int random)
+{
+  int inserted = 0;
+
+  /* Insert num random elements */
+  while (inserted < num) {
+    streamid_t id;
+
+    if (random)
+      id = (streamid_t)crypto_rand_int(65535)+1;
+    else
+      id = get_unique_stream_id_by_circ(circ);
+
+    edgeconn->stream_id = id;
+
+    /* Ensure it isn't there */
+    if (connection_half_edge_find_stream_id(circ->half_streams, id)) {
+      continue;
+    }
+
+    connection_half_edge_add(edgeconn, circ);
+    if (streams)
+      streams[inserted] = id;
+    inserted++;
+  }
+
+  return inserted;
+}
+
+static void
+subtest_halfstream_insertremove(int num)
+{
+  origin_circuit_t *circ =
+      helper_create_origin_circuit(CIRCUIT_PURPOSE_C_GENERAL, 0);
+  edge_connection_t *edgeconn;
+  entry_connection_t *entryconn;
+  streamid_t *streams = tor_malloc_zero(num*sizeof(streamid_t));
+  int i = 0;
+
+  circ->cpath->state = CPATH_STATE_AWAITING_KEYS;
+  circ->cpath->deliver_window = CIRCWINDOW_START;
+
+  entryconn = fake_entry_conn(circ, 23);
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn);
+
+  /* Explicity test all operations on an absent stream list */
+  tt_int_op(connection_half_edge_is_valid_data(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_connected(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_sendme(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_resolved(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+            23), OP_EQ, 0);
+
+  /* Insert a duplicate element; verify that other elements absent;
+   * ensure removing it once works */
+  edgeconn->stream_id = 23;
+  connection_half_edge_add(edgeconn, circ);
+  connection_half_edge_add(edgeconn, circ);
+  connection_half_edge_add(edgeconn, circ);
+
+  /* Verify that other elements absent */
+  tt_int_op(connection_half_edge_is_valid_data(circ->half_streams,
+            22), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_connected(circ->half_streams,
+            22), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_sendme(circ->half_streams,
+            22), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_resolved(circ->half_streams,
+            22), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+            22), OP_EQ, 0);
+
+  tt_int_op(connection_half_edge_is_valid_data(circ->half_streams,
+            24), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_connected(circ->half_streams,
+            24), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_sendme(circ->half_streams,
+            24), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_resolved(circ->half_streams,
+            24), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+            24), OP_EQ, 0);
+
+  /* Verify we only remove it once */
+  tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+            23), OP_EQ, 1);
+  tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+            23), OP_EQ, 0);
+
+  halfstream_insert(circ, edgeconn, streams, num, 1);
+
+  /* Remove half of them */
+  for (i = 0; i < num/2; i++) {
+    tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+                                                streams[i]),
+              OP_EQ, 1);
+  }
+
+  /* Verify first half of list is gone */
+  for (i = 0; i < num/2; i++) {
+    tt_ptr_op(connection_half_edge_find_stream_id(circ->half_streams,
+                                                  streams[i]),
+              OP_EQ, NULL);
+  }
+
+  /* Verify second half of list is present */
+  for (; i < num; i++) {
+    tt_ptr_op(connection_half_edge_find_stream_id(circ->half_streams,
+                                                  streams[i]),
+              OP_NE, NULL);
+  }
+
+  /* Remove other half. Verify list is empty. */
+  for (i = num/2; i < num; i++) {
+    tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+                                                streams[i]),
+              OP_EQ, 1);
+  }
+  tt_int_op(smartlist_len(circ->half_streams), OP_EQ, 0);
+
+  /* Explicity test all operations on an empty stream list */
+  tt_int_op(connection_half_edge_is_valid_data(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_connected(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_sendme(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_resolved(circ->half_streams,
+            23), OP_EQ, 0);
+  tt_int_op(connection_half_edge_is_valid_end(circ->half_streams,
+            23), OP_EQ, 0);
+
+  /* For valgrind, leave some around then free the circ */
+  halfstream_insert(circ, edgeconn, NULL, 10, 0);
+
+ done:
+  tor_free(streams);
+  circuit_free_(TO_CIRCUIT(circ));
+  connection_free_minimal(ENTRY_TO_CONN(entryconn));
+}
+
+static void
+test_halfstream_insertremove(void *arg)
+{
+  (void)arg;
+
+  /* Suppress the WARN message we generate in this test */
+  setup_full_capture_of_logs(LOG_WARN);
+
+  /* Test insertion and removal with a few different sizes */
+  subtest_halfstream_insertremove(10);
+  subtest_halfstream_insertremove(100);
+  subtest_halfstream_insertremove(1000);
+}
+
+static void
+test_halfstream_wrap(void *arg)
+{
+  origin_circuit_t *circ =
+      helper_create_origin_circuit(CIRCUIT_PURPOSE_C_GENERAL, 0);
+  edge_connection_t *edgeconn;
+  entry_connection_t *entryconn;
+
+  circ->cpath->state = CPATH_STATE_AWAITING_KEYS;
+  circ->cpath->deliver_window = CIRCWINDOW_START;
+
+  entryconn = fake_entry_conn(circ, 23);
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn);
+
+  (void)arg;
+
+  /* Suppress the WARN message we generate in this test */
+  setup_full_capture_of_logs(LOG_WARN);
+  MOCK(connection_mark_for_close_internal_, mock_mark_for_close);
+
+  /* Verify that get_unique_stream_id_by_circ() can wrap uint16_t */
+  circ->next_stream_id = 65530;
+  halfstream_insert(circ, edgeconn, NULL, 7, 0);
+  tt_int_op(circ->next_stream_id, OP_EQ, 2);
+  tt_int_op(smartlist_len(circ->half_streams), OP_EQ, 7);
+
+  /* Insert full-1 */
+  halfstream_insert(circ, edgeconn, NULL,
+                    65534-smartlist_len(circ->half_streams), 0);
+  tt_int_op(smartlist_len(circ->half_streams), OP_EQ, 65534);
+
+  /* Verify that we can get_unique_stream_id_by_circ() successfully */
+  edgeconn->stream_id = get_unique_stream_id_by_circ(circ);
+  tt_int_op(edgeconn->stream_id, OP_NE, 0); /* 0 is failure */
+
+  /* Insert an opened stream on the circ with that id */
+  ENTRY_TO_CONN(entryconn)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn)->outbuf_flushlen = 0;
+  edgeconn->base_.state = AP_CONN_STATE_CONNECT_WAIT;
+  circ->p_streams = edgeconn;
+
+  /* Verify that get_unique_stream_id_by_circ() fails */
+  tt_int_op(get_unique_stream_id_by_circ(circ), OP_EQ, 0); /* 0 is failure */
+
+  /* eof the one opened stream. Verify it is now in half-closed */
+  tt_int_op(smartlist_len(circ->half_streams), OP_EQ, 65534);
+  connection_edge_reached_eof(edgeconn);
+  tt_int_op(smartlist_len(circ->half_streams), OP_EQ, 65535);
+
+  /* Verify get_unique_stream_id_by_circ() fails due to full half-closed */
+  circ->p_streams = NULL;
+  tt_int_op(get_unique_stream_id_by_circ(circ), OP_EQ, 0); /* 0 is failure */
+
+ done:
+  circuit_free_(TO_CIRCUIT(circ));
+  connection_free_minimal(ENTRY_TO_CONN(entryconn));
+  UNMOCK(connection_mark_for_close_internal_);
+}
+
+static void
+test_circbw_relay(void *arg)
+{
+  cell_t cell;
+  relay_header_t rh;
+  tor_addr_t addr;
+  edge_connection_t *edgeconn;
+  entry_connection_t *entryconn1=NULL;
+  origin_circuit_t *circ;
+  int delivered = 0;
+  int overhead = 0;
+
+  (void)arg;
+
   MOCK(connection_mark_unattached_ap_, mock_connection_mark_unattached_ap_);
   MOCK(connection_start_reading, mock_start_reading);
   MOCK(connection_mark_for_close_internal_, mock_mark_for_close);
+  MOCK(relay_send_command_from_edge_, mock_send_command);
+  MOCK(circuit_mark_for_close_, mock_mark_circ_for_close);
 
-  entryconn = entry_connection_new(CONN_TYPE_AP, AF_INET);
-  edgeconn = ENTRY_TO_EDGE_CONN(entryconn);
-  edgeconn->base_.state = AP_CONN_STATE_CONNECT_WAIT;
-  edgeconn->deliver_window = 1000;
   circ = helper_create_origin_circuit(CIRCUIT_PURPOSE_C_GENERAL, 0);
-  edgeconn->cpath_layer = circ->cpath;
   circ->cpath->state = CPATH_STATE_AWAITING_KEYS;
-  circ->cpath->deliver_window = 1000;
+  circ->cpath->deliver_window = CIRCWINDOW_START;
+
+  entryconn1 = fake_entry_conn(circ, 1);
+  edgeconn = ENTRY_TO_EDGE_CONN(entryconn1);
 
   /* Stream id 0: Not counted */
   PACK_CELL(0, RELAY_COMMAND_END, "Data1234");
@@ -197,7 +744,7 @@ test_circbw_relay(void *arg)
 
   /* Properly formatted resolved cell in correct state: counted */
   edgeconn->base_.state = AP_CONN_STATE_RESOLVE_WAIT;
-  entryconn->socks_request->command = SOCKS_COMMAND_RESOLVE;
+  entryconn1->socks_request->command = SOCKS_COMMAND_RESOLVE;
   edgeconn->on_circuit = TO_CIRCUIT(circ);
   PACK_CELL(1, RELAY_COMMAND_RESOLVED,
             "\x04\x04\x12\x00\x00\x01\x00\x00\x02\x00");
@@ -206,7 +753,7 @@ test_circbw_relay(void *arg)
   ASSERT_COUNTED_BW();
 
   edgeconn->base_.state = AP_CONN_STATE_OPEN;
-  entryconn->socks_request->has_finished = 1;
+  entryconn1->socks_request->has_finished = 1;
 
   /* Connected cell after open: not counted */
   PACK_CELL(1, RELAY_COMMAND_CONNECTED, "Data1234");
@@ -227,42 +774,43 @@ test_circbw_relay(void *arg)
   ASSERT_UNCOUNTED_BW();
 
   /* Data cell on stream 0: not counted */
-  PACK_CELL(1, RELAY_COMMAND_DATA, "Data1234");
+  PACK_CELL(0, RELAY_COMMAND_DATA, "Data1234");
   connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
                                      circ->cpath);
   ASSERT_UNCOUNTED_BW();
 
   /* Data cell on open connection: counted */
-  ENTRY_TO_CONN(entryconn)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn1)->marked_for_close = 0;
   PACK_CELL(1, RELAY_COMMAND_DATA, "Data1234");
   connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
                                      circ->cpath);
   ASSERT_COUNTED_BW();
 
   /* Empty Data cell on open connection: not counted */
-  ENTRY_TO_CONN(entryconn)->marked_for_close = 0;
+  ENTRY_TO_CONN(entryconn1)->marked_for_close = 0;
   PACK_CELL(1, RELAY_COMMAND_DATA, "");
   connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
                                      circ->cpath);
   ASSERT_UNCOUNTED_BW();
 
   /* Sendme on valid stream: counted */
-  ENTRY_TO_CONN(entryconn)->outbuf_flushlen = 0;
+  edgeconn->package_window -= STREAMWINDOW_INCREMENT;
+  ENTRY_TO_CONN(entryconn1)->outbuf_flushlen = 0;
   PACK_CELL(1, RELAY_COMMAND_SENDME, "Data1234");
   connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
                                      circ->cpath);
   ASSERT_COUNTED_BW();
 
   /* Sendme on valid stream with full window: not counted */
-  ENTRY_TO_CONN(entryconn)->outbuf_flushlen = 0;
+  ENTRY_TO_CONN(entryconn1)->outbuf_flushlen = 0;
   PACK_CELL(1, RELAY_COMMAND_SENDME, "Data1234");
-  edgeconn->package_window = 500;
+  edgeconn->package_window = STREAMWINDOW_START;
   connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
                                      circ->cpath);
   ASSERT_UNCOUNTED_BW();
 
   /* Sendme on unknown stream: not counted */
-  ENTRY_TO_CONN(entryconn)->outbuf_flushlen = 0;
+  ENTRY_TO_CONN(entryconn1)->outbuf_flushlen = 0;
   PACK_CELL(1, RELAY_COMMAND_SENDME, "Data1234");
   connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
                                      circ->cpath);
@@ -281,18 +829,6 @@ test_circbw_relay(void *arg)
                                      circ->cpath);
   ASSERT_COUNTED_BW();
 
-  /* End cell on non-closed connection: counted */
-  PACK_CELL(1, RELAY_COMMAND_END, "Data1234");
-  connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
-                                     circ->cpath);
-  ASSERT_COUNTED_BW();
-
-  /* End cell on connection that already got one: not counted */
-  PACK_CELL(1, RELAY_COMMAND_END, "Data1234");
-  connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
-                                     circ->cpath);
-  ASSERT_UNCOUNTED_BW();
-
   /* Invalid extended cell: not counted */
   PACK_CELL(1, RELAY_COMMAND_EXTENDED2, "Data1234");
   connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
@@ -318,12 +854,40 @@ test_circbw_relay(void *arg)
                                      circ->cpath);
   ASSERT_COUNTED_BW();
 
+  /* End cell on non-closed connection: counted */
+  PACK_CELL(1, RELAY_COMMAND_END, "Data1234");
+  connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), edgeconn,
+                                     circ->cpath);
+  ASSERT_COUNTED_BW();
+
+  /* End cell on connection that already got one: not counted */
+  PACK_CELL(1, RELAY_COMMAND_END, "Data1234");
+  connection_edge_process_relay_cell(&cell, TO_CIRCUIT(circ), NULL,
+                                     circ->cpath);
+  ASSERT_UNCOUNTED_BW();
+
+  /* Simulate closed stream on entryconn, then test: */
+  if (!subtest_circbw_halfclosed(circ, 2))
+    goto done;
+
+  circ->base_.purpose = CIRCUIT_PURPOSE_PATH_BIAS_TESTING;
+  if (!subtest_circbw_halfclosed(circ, 6))
+    goto done;
+
+  /* Path bias: truncated */
+  tt_int_op(circ->base_.marked_for_close, OP_EQ, 0);
+  PACK_CELL(0, RELAY_COMMAND_TRUNCATED, "Data1234");
+  pathbias_count_valid_cells(circ, &cell);
+  tt_int_op(circ->base_.marked_for_close, OP_EQ, 1);
+
  done:
   UNMOCK(connection_start_reading);
   UNMOCK(connection_mark_unattached_ap_);
   UNMOCK(connection_mark_for_close_internal_);
+  UNMOCK(relay_send_command_from_edge_);
+  UNMOCK(circuit_mark_for_close_);
   circuit_free_(TO_CIRCUIT(circ));
-  connection_free_minimal(ENTRY_TO_CONN(entryconn));
+  connection_free_minimal(ENTRY_TO_CONN(entryconn1));
 }
 
 /* Tests for connection_edge_process_resolved_cell().
@@ -511,6 +1075,8 @@ test_relaycell_resolved(void *arg)
 struct testcase_t relaycell_tests[] = {
   { "resolved", test_relaycell_resolved, TT_FORK, NULL, NULL },
   { "circbw", test_circbw_relay, TT_FORK, NULL, NULL },
+  { "halfstream", test_halfstream_insertremove, TT_FORK, NULL, NULL },
+  { "streamwrap", test_halfstream_wrap, TT_FORK, NULL, NULL },
   END_OF_TESTCASES
 };