Explorar o código

Merge branch 'bug10169_025_v2'

Conflicts:
	src/test/test.c
Nick Mathewson hai 10 anos
pai
achega
ab225aaf28

+ 4 - 0
changes/bug10169

@@ -0,0 +1,4 @@
+  o Major features:
+    - Also consider stream buffer sizes when calculating OOM
+      conditions. Rename MaxMemInCellQueues to MaxMemInQueues. Fixes
+      bug 10169.

+ 3 - 0
changes/bug9686

@@ -0,0 +1,3 @@
+  o Minor changes:
+    - Decrease the lower limit of MaxMemInQueues to 256 MBytes, to
+      appease raspberry pi users. Fixes bug 9686.

+ 5 - 5
doc/tor.1.txt

@@ -1727,13 +1727,13 @@ is non-zero):
     localhost, RFC1918 addresses, and so on. This can create security issues;
     localhost, RFC1918 addresses, and so on. This can create security issues;
     you should probably leave it off. (Default: 0)
     you should probably leave it off. (Default: 0)
 
 
-[[MaxMemInCellQueues]] **MaxMemInCellQueues**  __N__ **bytes**|**KB**|**MB**|**GB**::
+[[MaxMemInQueues]] **MaxMemInQueues**  __N__ **bytes**|**KB**|**MB**|**GB**::
     This option configures a threshold above which Tor will assume that it
     This option configures a threshold above which Tor will assume that it
-    needs to stop queueing cells because it's about to run out of memory.
-    If it hits this threshold, it will begin killing circuits until it
-    has recovered at least 10% of this memory.  Do not set this option too
+    needs to stop queueing or buffering data because it's about to run out of
+    memory.  If it hits this threshold, it will begin killing circuits until
+    it has recovered at least 10% of this memory.  Do not set this option too
     low, or your relay may be unreliable under load.  This option only
     low, or your relay may be unreliable under load.  This option only
-    affects circuit queues, so the actual process size will be larger than
+    affects some queues, so the actual process size will be larger than
     this. (Default: 8GB)
     this. (Default: 8GB)
 
 
 DIRECTORY SERVER OPTIONS
 DIRECTORY SERVER OPTIONS

+ 43 - 1
src/common/compat_libevent.c

@@ -626,7 +626,9 @@ tor_add_bufferevent_to_rate_limit_group(struct bufferevent *bev,
 }
 }
 #endif
 #endif
 
 
-#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= V(2,1,1)
+
+#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= V(2,1,1) \
+  && !defined(TOR_UNIT_TESTS)
 void
 void
 tor_gettimeofday_cached(struct timeval *tv)
 tor_gettimeofday_cached(struct timeval *tv)
 {
 {
@@ -659,5 +661,45 @@ tor_gettimeofday_cache_clear(void)
 {
 {
   cached_time_hires.tv_sec = 0;
   cached_time_hires.tv_sec = 0;
 }
 }
+
+#ifdef TOR_UNIT_TESTS
+/** For testing: force-update the cached time to a given value. */
+void
+tor_gettimeofday_cache_set(const struct timeval *tv)
+{
+  tor_assert(tv);
+  memcpy(&cached_time_hires, tv, sizeof(*tv));
+}
+#endif
 #endif
 #endif
 
 
+/**
+ * As tor_gettimeofday_cached, but can never move backwards in time.
+ *
+ * The returned value may diverge from wall-clock time, since wall-clock time
+ * can trivially be adjusted backwards, and this can't.  Don't mix wall-clock
+ * time with these values in the same calculation.
+ *
+ * Depending on implementation, this function may or may not "smooth out" huge
+ * jumps forward in wall-clock time.  It may or may not keep its results
+ * advancing forward (as opposed to stalling) if the wall-clock time goes
+ * backwards.  The current implementation does neither of these.
+ *
+ * This function is not thread-safe; do not call it outside the main thread.
+ *
+ * In future versions of Tor, this may return a time that does not have its
+ * origin at the Unix epoch.
+ */
+void
+tor_gettimeofday_cached_monotonic(struct timeval *tv)
+{
+  struct timeval last_tv = { 0, 0 };
+
+  tor_gettimeofday_cached(tv);
+  if (timercmp(tv, &last_tv, <)) {
+    memcpy(tv, &last_tv, sizeof(struct timeval));
+  } else {
+    memcpy(&last_tv, tv, sizeof(struct timeval));
+  }
+}
+

+ 4 - 0
src/common/compat_libevent.h

@@ -91,6 +91,10 @@ int tor_add_bufferevent_to_rate_limit_group(struct bufferevent *bev,
 
 
 void tor_gettimeofday_cached(struct timeval *tv);
 void tor_gettimeofday_cached(struct timeval *tv);
 void tor_gettimeofday_cache_clear(void);
 void tor_gettimeofday_cache_clear(void);
+#ifdef TOR_UNIT_TESTS
+void tor_gettimeofday_cache_set(const struct timeval *tv);
+#endif
+void tor_gettimeofday_cached_monotonic(struct timeval *tv);
 
 
 #endif
 #endif
 
 

+ 103 - 6
src/or/buffers.c

@@ -62,6 +62,8 @@ static int parse_socks_client(const uint8_t *data, size_t datalen,
                               int state, char **reason,
                               int state, char **reason,
                               ssize_t *drain_out);
                               ssize_t *drain_out);
 
 
+#define DEBUG_CHUNK_ALLOC
+
 /* Chunk manipulation functions */
 /* Chunk manipulation functions */
 
 
 /** A single chunk on a buffer or in a freelist. */
 /** A single chunk on a buffer or in a freelist. */
@@ -69,7 +71,12 @@ typedef struct chunk_t {
   struct chunk_t *next; /**< The next chunk on the buffer or freelist. */
   struct chunk_t *next; /**< The next chunk on the buffer or freelist. */
   size_t datalen; /**< The number of bytes stored in this chunk */
   size_t datalen; /**< The number of bytes stored in this chunk */
   size_t memlen; /**< The number of usable bytes of storage in <b>mem</b>. */
   size_t memlen; /**< The number of usable bytes of storage in <b>mem</b>. */
+#ifdef DEBUG_CHUNK_ALLOC
+  size_t DBG_alloc;
+#endif
   char *data; /**< A pointer to the first byte of data stored in <b>mem</b>. */
   char *data; /**< A pointer to the first byte of data stored in <b>mem</b>. */
+  uint32_t inserted_time; /**< Timestamp in truncated ms since epoch
+                           * when this chunk was inserted. */
   char mem[FLEXIBLE_ARRAY_MEMBER]; /**< The actual memory used for storage in
   char mem[FLEXIBLE_ARRAY_MEMBER]; /**< The actual memory used for storage in
                 * this chunk. */
                 * this chunk. */
 } chunk_t;
 } chunk_t;
@@ -141,6 +148,9 @@ static chunk_freelist_t freelists[] = {
  * could help with? */
  * could help with? */
 static uint64_t n_freelist_miss = 0;
 static uint64_t n_freelist_miss = 0;
 
 
+/** DOCDOC */
+static size_t total_bytes_allocated_in_chunks = 0;
+
 static void assert_freelist_ok(chunk_freelist_t *fl);
 static void assert_freelist_ok(chunk_freelist_t *fl);
 
 
 /** Return the freelist to hold chunks of size <b>alloc</b>, or NULL if
 /** Return the freelist to hold chunks of size <b>alloc</b>, or NULL if
@@ -174,6 +184,11 @@ chunk_free_unchecked(chunk_t *chunk)
   } else {
   } else {
     if (freelist)
     if (freelist)
       ++freelist->n_free;
       ++freelist->n_free;
+#ifdef DEBUG_CHUNK_ALLOC
+    tor_assert(alloc  == chunk->DBG_alloc);
+#endif
+    tor_assert(total_bytes_allocated_in_chunks >= alloc);
+    total_bytes_allocated_in_chunks -= alloc;
     tor_free(chunk);
     tor_free(chunk);
   }
   }
 }
 }
@@ -200,6 +215,10 @@ chunk_new_with_alloc_size(size_t alloc)
     else
     else
       ++n_freelist_miss;
       ++n_freelist_miss;
     ch = tor_malloc(alloc);
     ch = tor_malloc(alloc);
+#ifdef DEBUG_CHUNK_ALLOC
+    ch->DBG_alloc = alloc;
+#endif
+    total_bytes_allocated_in_chunks += alloc;
   }
   }
   ch->next = NULL;
   ch->next = NULL;
   ch->datalen = 0;
   ch->datalen = 0;
@@ -211,6 +230,13 @@ chunk_new_with_alloc_size(size_t alloc)
 static void
 static void
 chunk_free_unchecked(chunk_t *chunk)
 chunk_free_unchecked(chunk_t *chunk)
 {
 {
+  if (!chunk)
+    return;
+#ifdef DEBUG_CHUNK_ALLOC
+  tor_assert(CHUNK_ALLOC_SIZE(chunk->memlen) == chunk->DBG_alloc);
+#endif
+  tor_assert(total_bytes_allocated_in_chunks >= CHUNK_ALLOC_SIZE(chunk->memlen));
+  total_bytes_allocated_in_chunks -= CHUNK_ALLOC_SIZE(chunk->memlen);
   tor_free(chunk);
   tor_free(chunk);
 }
 }
 static INLINE chunk_t *
 static INLINE chunk_t *
@@ -220,7 +246,11 @@ chunk_new_with_alloc_size(size_t alloc)
   ch = tor_malloc(alloc);
   ch = tor_malloc(alloc);
   ch->next = NULL;
   ch->next = NULL;
   ch->datalen = 0;
   ch->datalen = 0;
+#ifdef DEBUG_CHUNK_ALLOC
+  ch->DBG_alloc = alloc;
+#endif
   ch->memlen = CHUNK_SIZE_WITH_ALLOC(alloc);
   ch->memlen = CHUNK_SIZE_WITH_ALLOC(alloc);
+  total_bytes_allocated_in_chunks += alloc;
   ch->data = &ch->mem[0];
   ch->data = &ch->mem[0];
   return ch;
   return ch;
 }
 }
@@ -232,11 +262,17 @@ static INLINE chunk_t *
 chunk_grow(chunk_t *chunk, size_t sz)
 chunk_grow(chunk_t *chunk, size_t sz)
 {
 {
   off_t offset;
   off_t offset;
+  size_t memlen_orig = chunk->memlen;
   tor_assert(sz > chunk->memlen);
   tor_assert(sz > chunk->memlen);
   offset = chunk->data - chunk->mem;
   offset = chunk->data - chunk->mem;
   chunk = tor_realloc(chunk, CHUNK_ALLOC_SIZE(sz));
   chunk = tor_realloc(chunk, CHUNK_ALLOC_SIZE(sz));
   chunk->memlen = sz;
   chunk->memlen = sz;
   chunk->data = chunk->mem + offset;
   chunk->data = chunk->mem + offset;
+#ifdef DEBUG_CHUNK_ALLOC
+  tor_assert(chunk->DBG_alloc == CHUNK_ALLOC_SIZE(memlen_orig));
+  chunk->DBG_alloc = CHUNK_ALLOC_SIZE(sz);
+#endif
+  total_bytes_allocated_in_chunks += CHUNK_ALLOC_SIZE(sz) - CHUNK_ALLOC_SIZE(memlen_orig);
   return chunk;
   return chunk;
 }
 }
 
 
@@ -261,12 +297,14 @@ preferred_chunk_size(size_t target)
 }
 }
 
 
 /** Remove from the freelists most chunks that have not been used since the
 /** Remove from the freelists most chunks that have not been used since the
- * last call to buf_shrink_freelists(). */
-void
+ * last call to buf_shrink_freelists().   Return the amount of memory
+ * freed. */
+size_t
 buf_shrink_freelists(int free_all)
 buf_shrink_freelists(int free_all)
 {
 {
 #ifdef ENABLE_BUF_FREELISTS
 #ifdef ENABLE_BUF_FREELISTS
   int i;
   int i;
+  size_t total_freed = 0;
   disable_control_logging();
   disable_control_logging();
   for (i = 0; freelists[i].alloc_size; ++i) {
   for (i = 0; freelists[i].alloc_size; ++i) {
     int slack = freelists[i].slack;
     int slack = freelists[i].slack;
@@ -298,6 +336,12 @@ buf_shrink_freelists(int free_all)
       *chp = NULL;
       *chp = NULL;
       while (chunk) {
       while (chunk) {
         chunk_t *next = chunk->next;
         chunk_t *next = chunk->next;
+#ifdef DEBUG_CHUNK_ALLOC
+        tor_assert(chunk->DBG_alloc == CHUNK_ALLOC_SIZE(chunk->memlen));
+#endif
+        tor_assert(total_bytes_allocated_in_chunks >= CHUNK_ALLOC_SIZE(chunk->memlen));
+        total_bytes_allocated_in_chunks -= CHUNK_ALLOC_SIZE(chunk->memlen);
+        total_freed += CHUNK_ALLOC_SIZE(chunk->memlen);
         tor_free(chunk);
         tor_free(chunk);
         chunk = next;
         chunk = next;
         --n_to_free;
         --n_to_free;
@@ -315,18 +359,21 @@ buf_shrink_freelists(int free_all)
       }
       }
       // tor_assert(!n_to_free);
       // tor_assert(!n_to_free);
       freelists[i].cur_length = new_length;
       freelists[i].cur_length = new_length;
+      tor_assert(orig_n_to_skip == new_length);
       log_info(LD_MM, "Cleaned freelist for %d-byte chunks: original "
       log_info(LD_MM, "Cleaned freelist for %d-byte chunks: original "
-               "length %d, kept %d, dropped %d.",
+               "length %d, kept %d, dropped %d. New length is %d",
                (int)freelists[i].alloc_size, orig_length,
                (int)freelists[i].alloc_size, orig_length,
-               orig_n_to_skip, orig_n_to_free);
+               orig_n_to_skip, orig_n_to_free, new_length);
     }
     }
     freelists[i].lowest_length = freelists[i].cur_length;
     freelists[i].lowest_length = freelists[i].cur_length;
     assert_freelist_ok(&freelists[i]);
     assert_freelist_ok(&freelists[i]);
   }
   }
  done:
  done:
   enable_control_logging();
   enable_control_logging();
+  return total_freed;
 #else
 #else
   (void) free_all;
   (void) free_all;
+  return 0;
 #endif
 #endif
 }
 }
 
 
@@ -376,9 +423,10 @@ struct buf_t {
  *
  *
  * If <b>nulterminate</b> is true, ensure that there is a 0 byte in
  * If <b>nulterminate</b> is true, ensure that there is a 0 byte in
  * buf->head->mem right after all the data. */
  * buf->head->mem right after all the data. */
-static void
+STATIC void
 buf_pullup(buf_t *buf, size_t bytes, int nulterminate)
 buf_pullup(buf_t *buf, size_t bytes, int nulterminate)
 {
 {
+  /* XXXX nothing uses nulterminate; remove it. */
   chunk_t *dest, *src;
   chunk_t *dest, *src;
   size_t capacity;
   size_t capacity;
   if (!buf->head)
   if (!buf->head)
@@ -450,6 +498,20 @@ buf_pullup(buf_t *buf, size_t bytes, int nulterminate)
   check();
   check();
 }
 }
 
 
+#ifdef TOR_UNIT_TESTS
+void
+buf_get_first_chunk_data(const buf_t *buf, const char **cp, size_t *sz)
+{
+  if (!buf || !buf->head) {
+    *cp = NULL;
+    *sz = 0;
+  } else {
+    *cp = buf->head->data;
+    *sz = buf->head->datalen;
+  }
+}
+#endif
+
 /** Resize buf so it won't hold extra memory that we haven't been
 /** Resize buf so it won't hold extra memory that we haven't been
  * using lately.
  * using lately.
  */
  */
@@ -504,6 +566,12 @@ buf_new(void)
   return buf;
   return buf;
 }
 }
 
 
+size_t
+buf_get_default_chunk_size(const buf_t *buf)
+{
+  return buf->default_chunk_size;
+}
+
 /** Remove all data from <b>buf</b>. */
 /** Remove all data from <b>buf</b>. */
 void
 void
 buf_clear(buf_t *buf)
 buf_clear(buf_t *buf)
@@ -531,7 +599,7 @@ buf_allocation(const buf_t *buf)
   size_t total = 0;
   size_t total = 0;
   const chunk_t *chunk;
   const chunk_t *chunk;
   for (chunk = buf->head; chunk; chunk = chunk->next) {
   for (chunk = buf->head; chunk; chunk = chunk->next) {
-    total += chunk->memlen;
+    total += CHUNK_ALLOC_SIZE(chunk->memlen);
   }
   }
   return total;
   return total;
 }
 }
@@ -564,6 +632,10 @@ static chunk_t *
 chunk_copy(const chunk_t *in_chunk)
 chunk_copy(const chunk_t *in_chunk)
 {
 {
   chunk_t *newch = tor_memdup(in_chunk, CHUNK_ALLOC_SIZE(in_chunk->memlen));
   chunk_t *newch = tor_memdup(in_chunk, CHUNK_ALLOC_SIZE(in_chunk->memlen));
+  total_bytes_allocated_in_chunks += CHUNK_ALLOC_SIZE(in_chunk->memlen);
+#ifdef DEBUG_CHUNK_ALLOC
+  newch->DBG_alloc = CHUNK_ALLOC_SIZE(in_chunk->memlen);
+#endif
   newch->next = NULL;
   newch->next = NULL;
   if (in_chunk->data) {
   if (in_chunk->data) {
     off_t offset = in_chunk->data - in_chunk->mem;
     off_t offset = in_chunk->data - in_chunk->mem;
@@ -599,6 +671,7 @@ static chunk_t *
 buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
 buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
 {
 {
   chunk_t *chunk;
   chunk_t *chunk;
+  struct timeval now;
   if (CHUNK_ALLOC_SIZE(capacity) < buf->default_chunk_size) {
   if (CHUNK_ALLOC_SIZE(capacity) < buf->default_chunk_size) {
     chunk = chunk_new_with_alloc_size(buf->default_chunk_size);
     chunk = chunk_new_with_alloc_size(buf->default_chunk_size);
   } else if (capped && CHUNK_ALLOC_SIZE(capacity) > MAX_CHUNK_ALLOC) {
   } else if (capped && CHUNK_ALLOC_SIZE(capacity) > MAX_CHUNK_ALLOC) {
@@ -606,6 +679,10 @@ buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
   } else {
   } else {
     chunk = chunk_new_with_alloc_size(preferred_chunk_size(capacity));
     chunk = chunk_new_with_alloc_size(preferred_chunk_size(capacity));
   }
   }
+
+  tor_gettimeofday_cached_monotonic(&now);
+  chunk->inserted_time = (uint32_t)tv_to_msec(&now);
+
   if (buf->tail) {
   if (buf->tail) {
     tor_assert(buf->head);
     tor_assert(buf->head);
     buf->tail->next = chunk;
     buf->tail->next = chunk;
@@ -618,6 +695,26 @@ buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
   return chunk;
   return chunk;
 }
 }
 
 
+/** Return the age of the oldest chunk in the buffer <b>buf</b>, in
+ * milliseconds.  Requires the current time, in truncated milliseconds since
+ * the epoch, as its input <b>now</b>.
+ */
+uint32_t
+buf_get_oldest_chunk_timestamp(const buf_t *buf, uint32_t now)
+{
+  if (buf->head) {
+    return now - buf->head->inserted_time;
+  } else {
+    return 0;
+  }
+}
+
+size_t
+buf_get_total_allocation(void)
+{
+  return total_bytes_allocated_in_chunks;
+}
+
 /** Read up to <b>at_most</b> bytes from the socket <b>fd</b> into
 /** Read up to <b>at_most</b> bytes from the socket <b>fd</b> into
  * <b>chunk</b> (which must be on <b>buf</b>). If we get an EOF, set
  * <b>chunk</b> (which must be on <b>buf</b>). If we get an EOF, set
  * *<b>reached_eof</b> to 1.  Return -1 on error, 0 on eof or blocking,
  * *<b>reached_eof</b> to 1.  Return -1 on error, 0 on eof or blocking,

+ 7 - 1
src/or/buffers.h

@@ -16,17 +16,21 @@
 
 
 buf_t *buf_new(void);
 buf_t *buf_new(void);
 buf_t *buf_new_with_capacity(size_t size);
 buf_t *buf_new_with_capacity(size_t size);
+size_t buf_get_default_chunk_size(const buf_t *buf);
 void buf_free(buf_t *buf);
 void buf_free(buf_t *buf);
 void buf_clear(buf_t *buf);
 void buf_clear(buf_t *buf);
 buf_t *buf_copy(const buf_t *buf);
 buf_t *buf_copy(const buf_t *buf);
 void buf_shrink(buf_t *buf);
 void buf_shrink(buf_t *buf);
-void buf_shrink_freelists(int free_all);
+size_t buf_shrink_freelists(int free_all);
 void buf_dump_freelist_sizes(int severity);
 void buf_dump_freelist_sizes(int severity);
 
 
 size_t buf_datalen(const buf_t *buf);
 size_t buf_datalen(const buf_t *buf);
 size_t buf_allocation(const buf_t *buf);
 size_t buf_allocation(const buf_t *buf);
 size_t buf_slack(const buf_t *buf);
 size_t buf_slack(const buf_t *buf);
 
 
+uint32_t buf_get_oldest_chunk_timestamp(const buf_t *buf, uint32_t now);
+size_t buf_get_total_allocation(void);
+
 int read_to_buf(tor_socket_t s, size_t at_most, buf_t *buf, int *reached_eof,
 int read_to_buf(tor_socket_t s, size_t at_most, buf_t *buf, int *reached_eof,
                 int *socket_error);
                 int *socket_error);
 int read_to_buf_tls(tor_tls_t *tls, size_t at_most, buf_t *buf);
 int read_to_buf_tls(tor_tls_t *tls, size_t at_most, buf_t *buf);
@@ -100,6 +104,8 @@ void assert_buf_ok(buf_t *buf);
 
 
 #ifdef BUFFERS_PRIVATE
 #ifdef BUFFERS_PRIVATE
 STATIC int buf_find_string_offset(const buf_t *buf, const char *s, size_t n);
 STATIC int buf_find_string_offset(const buf_t *buf, const char *s, size_t n);
+STATIC void buf_pullup(buf_t *buf, size_t bytes, int nulterminate);
+void buf_get_first_chunk_data(const buf_t *buf, const char **cp, size_t *sz);
 #endif
 #endif
 
 
 #endif
 #endif

+ 134 - 33
src/or/circuitlist.c

@@ -1435,9 +1435,9 @@ circuit_mark_all_dirty_circs_as_unusable(void)
  *   - If circ->rend_splice is set (we are the midpoint of a joined
  *   - If circ->rend_splice is set (we are the midpoint of a joined
  *     rendezvous stream), then mark the other circuit to close as well.
  *     rendezvous stream), then mark the other circuit to close as well.
  */
  */
-void
-circuit_mark_for_close_(circuit_t *circ, int reason, int line,
-                        const char *file)
+MOCK_IMPL(void,
+circuit_mark_for_close_, (circuit_t *circ, int reason, int line,
+                          const char *file))
 {
 {
   int orig_reason = reason; /* Passed to the controller */
   int orig_reason = reason; /* Passed to the controller */
   assert_circuit_ok(circ);
   assert_circuit_ok(circ);
@@ -1612,6 +1612,38 @@ marked_circuit_free_cells(circuit_t *circ)
     cell_queue_clear(& TO_OR_CIRCUIT(circ)->p_chan_cells);
     cell_queue_clear(& TO_OR_CIRCUIT(circ)->p_chan_cells);
 }
 }
 
 
+/** Aggressively free buffer contents on all the buffers of all streams in the
+ * list starting at <b>stream</b>. Return the number of bytes recovered. */
+static size_t
+marked_circuit_streams_free_bytes(edge_connection_t *stream)
+{
+  size_t result = 0;
+  for ( ; stream; stream = stream->next_stream) {
+    connection_t *conn = TO_CONN(stream);
+    if (conn->inbuf) {
+      result += buf_allocation(conn->inbuf);
+      buf_clear(conn->inbuf);
+    }
+    if (conn->outbuf) {
+      result += buf_allocation(conn->outbuf);
+      buf_clear(conn->outbuf);
+    }
+  }
+  return result;
+}
+
+/** Aggressively free buffer contents on all the buffers of all streams on
+ * circuit <b>c</b>. Return the number of bytes recovered. */
+static size_t
+marked_circuit_free_stream_bytes(circuit_t *c)
+{
+  if (CIRCUIT_IS_ORIGIN(c)) {
+    return marked_circuit_streams_free_bytes(TO_ORIGIN_CIRCUIT(c)->p_streams);
+  } else {
+    return marked_circuit_streams_free_bytes(TO_OR_CIRCUIT(c)->n_streams);
+  }
+}
+
 /** Return the number of cells used by the circuit <b>c</b>'s cell queues. */
 /** Return the number of cells used by the circuit <b>c</b>'s cell queues. */
 STATIC size_t
 STATIC size_t
 n_cells_in_circ_queues(const circuit_t *c)
 n_cells_in_circ_queues(const circuit_t *c)
@@ -1632,7 +1664,7 @@ n_cells_in_circ_queues(const circuit_t *c)
  * This function will return incorrect results if the oldest cell queued on
  * This function will return incorrect results if the oldest cell queued on
  * the circuit is older than 2**32 msec (about 49 days) old.
  * the circuit is older than 2**32 msec (about 49 days) old.
  */
  */
-static uint32_t
+STATIC uint32_t
 circuit_max_queued_cell_age(const circuit_t *c, uint32_t now)
 circuit_max_queued_cell_age(const circuit_t *c, uint32_t now)
 {
 {
   uint32_t age = 0;
   uint32_t age = 0;
@@ -1652,20 +1684,68 @@ circuit_max_queued_cell_age(const circuit_t *c, uint32_t now)
   return age;
   return age;
 }
 }
 
 
-/** Temporary variable for circuits_compare_by_oldest_queued_cell_ This is a
- * kludge to work around the fact that qsort doesn't provide a way for
- * comparison functions to take an extra argument. */
-static uint32_t circcomp_now_tmp;
+/** Return the age in milliseconds of the oldest buffer chunk on any stream in
+ * the linked list <b>stream</b>, where age is taken in milliseconds before
+ * the time <b>now</b> (in truncated milliseconds since the epoch). */
+static uint32_t
+circuit_get_streams_max_data_age(const edge_connection_t *stream, uint32_t now)
+{
+  uint32_t age = 0, age2;
+  for (; stream; stream = stream->next_stream) {
+    const connection_t *conn = TO_CONN(stream);
+    if (conn->outbuf) {
+      age2 = buf_get_oldest_chunk_timestamp(conn->outbuf, now);
+      if (age2 > age)
+        age = age2;
+    }
+    if (conn->inbuf) {
+      age2 = buf_get_oldest_chunk_timestamp(conn->inbuf, now);
+      if (age2 > age)
+        age = age2;
+    }
+  }
+
+  return age;
+}
+
+/** Return the age in milliseconds of the oldest buffer chunk on any stream
+ * attached to the circuit <b>c</b>, where age is taken in milliseconds before
+ * the time <b>now</b> (in truncated milliseconds since the epoch). */
+STATIC uint32_t
+circuit_max_queued_data_age(const circuit_t *c, uint32_t now)
+{
+  if (CIRCUIT_IS_ORIGIN(c)) {
+    return circuit_get_streams_max_data_age(
+                           TO_ORIGIN_CIRCUIT((circuit_t*)c)->p_streams, now);
+  } else {
+    return circuit_get_streams_max_data_age(
+                           TO_OR_CIRCUIT((circuit_t*)c)->n_streams, now);
+  }
+}
 
 
-/** Helper to sort a list of circuit_t by age of oldest cell, in descending
- * order. Requires that circcomp_now_tmp is set correctly. */
+/** Return the age of the oldest cell or stream buffer chunk on the circuit
+ * <b>c</b>, where age is taken in milliseconds before the time <b>now</b> (in
+ * truncated milliseconds since the epoch). */
+STATIC uint32_t
+circuit_max_queued_item_age(const circuit_t *c, uint32_t now)
+{
+  uint32_t cell_age = circuit_max_queued_cell_age(c, now);
+  uint32_t data_age = circuit_max_queued_data_age(c, now);
+  if (cell_age > data_age)
+    return cell_age;
+  else
+    return data_age;
+}
+
+/** Helper to sort a list of circuit_t by age of oldest item, in descending
+ * order. */
 static int
 static int
-circuits_compare_by_oldest_queued_cell_(const void **a_, const void **b_)
+circuits_compare_by_oldest_queued_item_(const void **a_, const void **b_)
 {
 {
   const circuit_t *a = *a_;
   const circuit_t *a = *a_;
   const circuit_t *b = *b_;
   const circuit_t *b = *b_;
-  uint32_t age_a = circuit_max_queued_cell_age(a, circcomp_now_tmp);
-  uint32_t age_b = circuit_max_queued_cell_age(b, circcomp_now_tmp);
+  uint32_t age_a = a->age_tmp;
+  uint32_t age_b = b->age_tmp;
 
 
   if (age_a < age_b)
   if (age_a < age_b)
     return 1;
     return 1;
@@ -1675,67 +1755,88 @@ circuits_compare_by_oldest_queued_cell_(const void **a_, const void **b_)
     return -1;
     return -1;
 }
 }
 
 
-#define FRACTION_OF_CELLS_TO_RETAIN_ON_OOM 0.90
+#define FRACTION_OF_DATA_TO_RETAIN_ON_OOM 0.90
 
 
 /** We're out of memory for cells, having allocated <b>current_allocation</b>
 /** We're out of memory for cells, having allocated <b>current_allocation</b>
  * bytes' worth.  Kill the 'worst' circuits until we're under
  * bytes' worth.  Kill the 'worst' circuits until we're under
- * FRACTION_OF_CIRCS_TO_RETAIN_ON_OOM of our maximum usage. */
+ * FRACTION_OF_DATA_TO_RETAIN_ON_OOM of our maximum usage. */
 void
 void
 circuits_handle_oom(size_t current_allocation)
 circuits_handle_oom(size_t current_allocation)
 {
 {
   /* Let's hope there's enough slack space for this allocation here... */
   /* Let's hope there's enough slack space for this allocation here... */
   smartlist_t *circlist = smartlist_new();
   smartlist_t *circlist = smartlist_new();
   circuit_t *circ;
   circuit_t *circ;
-  size_t n_cells_removed=0, n_cells_to_remove;
+  size_t mem_to_recover;
+  size_t mem_recovered=0;
   int n_circuits_killed=0;
   int n_circuits_killed=0;
   struct timeval now;
   struct timeval now;
+  uint32_t now_ms;
   log_notice(LD_GENERAL, "We're low on memory.  Killing circuits with "
   log_notice(LD_GENERAL, "We're low on memory.  Killing circuits with "
              "over-long queues. (This behavior is controlled by "
              "over-long queues. (This behavior is controlled by "
-             "MaxMemInCellQueues.)");
+             "MaxMemInQueues.)");
+
+  {
+    const size_t recovered = buf_shrink_freelists(1);
+    if (recovered >= current_allocation) {
+      log_warn(LD_BUG, "We somehow recovered more memory from freelists "
+               "than we thought we had allocated");
+      current_allocation = 0;
+    } else {
+      current_allocation -= recovered;
+    }
+  }
 
 
   {
   {
-    size_t mem_target = (size_t)(get_options()->MaxMemInCellQueues *
-                                 FRACTION_OF_CELLS_TO_RETAIN_ON_OOM);
-    size_t mem_to_recover;
+    size_t mem_target = (size_t)(get_options()->MaxMemInQueues *
+                                 FRACTION_OF_DATA_TO_RETAIN_ON_OOM);
     if (current_allocation <= mem_target)
     if (current_allocation <= mem_target)
       return;
       return;
     mem_to_recover = current_allocation - mem_target;
     mem_to_recover = current_allocation - mem_target;
-    n_cells_to_remove = CEIL_DIV(mem_to_recover, packed_cell_mem_cost());
   }
   }
 
 
+  tor_gettimeofday_cached_monotonic(&now);
+  now_ms = (uint32_t)tv_to_msec(&now);
+
   /* This algorithm itself assumes that you've got enough memory slack
   /* This algorithm itself assumes that you've got enough memory slack
    * to actually run it. */
    * to actually run it. */
-  TOR_LIST_FOREACH(circ, &global_circuitlist, head)
+  TOR_LIST_FOREACH(circ, &global_circuitlist, head) {
+    circ->age_tmp = circuit_max_queued_item_age(circ, now_ms);
     smartlist_add(circlist, circ);
     smartlist_add(circlist, circ);
-
-  /* Set circcomp_now_tmp so that the sort can work. */
-  tor_gettimeofday_cached(&now);
-  circcomp_now_tmp = (uint32_t)tv_to_msec(&now);
+  }
 
 
   /* This is O(n log n); there are faster algorithms we could use instead.
   /* This is O(n log n); there are faster algorithms we could use instead.
    * Let's hope this doesn't happen enough to be in the critical path. */
    * Let's hope this doesn't happen enough to be in the critical path. */
-  smartlist_sort(circlist, circuits_compare_by_oldest_queued_cell_);
+  smartlist_sort(circlist, circuits_compare_by_oldest_queued_item_);
 
 
   /* Okay, now the worst circuits are at the front of the list. Let's mark
   /* Okay, now the worst circuits are at the front of the list. Let's mark
    * them, and reclaim their storage aggressively. */
    * them, and reclaim their storage aggressively. */
   SMARTLIST_FOREACH_BEGIN(circlist, circuit_t *, circ) {
   SMARTLIST_FOREACH_BEGIN(circlist, circuit_t *, circ) {
     size_t n = n_cells_in_circ_queues(circ);
     size_t n = n_cells_in_circ_queues(circ);
+    size_t freed;
     if (! circ->marked_for_close) {
     if (! circ->marked_for_close) {
       circuit_mark_for_close(circ, END_CIRC_REASON_RESOURCELIMIT);
       circuit_mark_for_close(circ, END_CIRC_REASON_RESOURCELIMIT);
     }
     }
     marked_circuit_free_cells(circ);
     marked_circuit_free_cells(circ);
+    freed = marked_circuit_free_stream_bytes(circ);
 
 
     ++n_circuits_killed;
     ++n_circuits_killed;
-    n_cells_removed += n;
-    if (n_cells_removed >= n_cells_to_remove)
+
+    mem_recovered += n * packed_cell_mem_cost();
+    mem_recovered += freed;
+
+    if (mem_recovered >= mem_to_recover)
       break;
       break;
   } SMARTLIST_FOREACH_END(circ);
   } SMARTLIST_FOREACH_END(circ);
 
 
   clean_cell_pool(); /* In case this helps. */
   clean_cell_pool(); /* In case this helps. */
-
-  log_notice(LD_GENERAL, "Removed "U64_FORMAT" bytes by killing %d circuits.",
-             U64_PRINTF_ARG(n_cells_removed * packed_cell_mem_cost()),
-             n_circuits_killed);
+  buf_shrink_freelists(1); /* This is necessary to actually release buffer
+                              chunks. */
+
+  log_notice(LD_GENERAL, "Removed "U64_FORMAT" bytes by killing %d circuits; "
+             "%d circuits remain alive.",
+             U64_PRINTF_ARG(mem_recovered),
+             n_circuits_killed,
+             smartlist_len(circlist) - n_circuits_killed);
 
 
   smartlist_free(circlist);
   smartlist_free(circlist);
 }
 }

+ 5 - 2
src/or/circuitlist.h

@@ -53,8 +53,8 @@ origin_circuit_t *circuit_find_to_cannibalize(uint8_t purpose,
                                               extend_info_t *info, int flags);
                                               extend_info_t *info, int flags);
 void circuit_mark_all_unused_circs(void);
 void circuit_mark_all_unused_circs(void);
 void circuit_mark_all_dirty_circs_as_unusable(void);
 void circuit_mark_all_dirty_circs_as_unusable(void);
-void circuit_mark_for_close_(circuit_t *circ, int reason,
-                             int line, const char *file);
+MOCK_DECL(void, circuit_mark_for_close_, (circuit_t *circ, int reason,
+                                          int line, const char *file));
 int circuit_get_cpath_len(origin_circuit_t *circ);
 int circuit_get_cpath_len(origin_circuit_t *circ);
 void circuit_clear_cpath(origin_circuit_t *circ);
 void circuit_clear_cpath(origin_circuit_t *circ);
 crypt_path_t *circuit_get_cpath_hop(origin_circuit_t *circ, int hopnum);
 crypt_path_t *circuit_get_cpath_hop(origin_circuit_t *circ, int hopnum);
@@ -76,6 +76,9 @@ void channel_note_destroy_not_pending(channel_t *chan, circid_t id);
 #ifdef CIRCUITLIST_PRIVATE
 #ifdef CIRCUITLIST_PRIVATE
 STATIC void circuit_free(circuit_t *circ);
 STATIC void circuit_free(circuit_t *circ);
 STATIC size_t n_cells_in_circ_queues(const circuit_t *c);
 STATIC size_t n_cells_in_circ_queues(const circuit_t *c);
+STATIC uint32_t circuit_max_queued_data_age(const circuit_t *c, uint32_t now);
+STATIC uint32_t circuit_max_queued_cell_age(const circuit_t *c, uint32_t now);
+STATIC uint32_t circuit_max_queued_item_age(const circuit_t *c, uint32_t now);
 #endif
 #endif
 
 
 #endif
 #endif

+ 5 - 4
src/or/config.c

@@ -85,6 +85,7 @@ static config_abbrev_t option_abbrevs_[] = {
   { "DirFetchPostPeriod", "StatusFetchPeriod", 0, 0},
   { "DirFetchPostPeriod", "StatusFetchPeriod", 0, 0},
   { "DirServer", "DirAuthority", 0, 0}, /* XXXX024 later, make this warn? */
   { "DirServer", "DirAuthority", 0, 0}, /* XXXX024 later, make this warn? */
   { "MaxConn", "ConnLimit", 0, 1},
   { "MaxConn", "ConnLimit", 0, 1},
+  { "MaxMemInCellQueues", "MaxMemInQueues", 0, 0},
   { "ORBindAddress", "ORListenAddress", 0, 0},
   { "ORBindAddress", "ORListenAddress", 0, 0},
   { "DirBindAddress", "DirListenAddress", 0, 0},
   { "DirBindAddress", "DirListenAddress", 0, 0},
   { "SocksBindAddress", "SocksListenAddress", 0, 0},
   { "SocksBindAddress", "SocksListenAddress", 0, 0},
@@ -306,7 +307,7 @@ static config_var_t option_vars_[] = {
   V(MaxAdvertisedBandwidth,      MEMUNIT,  "1 GB"),
   V(MaxAdvertisedBandwidth,      MEMUNIT,  "1 GB"),
   V(MaxCircuitDirtiness,         INTERVAL, "10 minutes"),
   V(MaxCircuitDirtiness,         INTERVAL, "10 minutes"),
   V(MaxClientCircuitsPending,    UINT,     "32"),
   V(MaxClientCircuitsPending,    UINT,     "32"),
-  V(MaxMemInCellQueues,          MEMUNIT,  "8 GB"),
+  V(MaxMemInQueues,              MEMUNIT,  "8 GB"),
   OBSOLETE("MaxOnionsPending"),
   OBSOLETE("MaxOnionsPending"),
   V(MaxOnionQueueDelay,          MSEC_INTERVAL, "1750 msec"),
   V(MaxOnionQueueDelay,          MSEC_INTERVAL, "1750 msec"),
   V(MinMeasuredBWsForAuthToIgnoreAdvertised, INT, "500"),
   V(MinMeasuredBWsForAuthToIgnoreAdvertised, INT, "500"),
@@ -2758,10 +2759,10 @@ options_validate(or_options_t *old_options, or_options_t *options,
     REJECT("If EntryNodes is set, UseEntryGuards must be enabled.");
     REJECT("If EntryNodes is set, UseEntryGuards must be enabled.");
   }
   }
 
 
-  if (options->MaxMemInCellQueues < (500 << 20)) {
-    log_warn(LD_CONFIG, "MaxMemInCellQueues must be at least 500 MB for now. "
+  if (options->MaxMemInQueues < (256 << 20)) {
+    log_warn(LD_CONFIG, "MaxMemInQueues must be at least 256 MB for now. "
              "Ideally, have it as large as you can afford.");
              "Ideally, have it as large as you can afford.");
-    options->MaxMemInCellQueues = (500 << 20);
+    options->MaxMemInQueues = (256 << 20);
   }
   }
 
 
   options->AllowInvalid_ = 0;
   options->AllowInvalid_ = 0;

+ 5 - 3
src/or/or.h

@@ -2816,6 +2816,9 @@ typedef struct circuit_t {
    * more. */
    * more. */
   int deliver_window;
   int deliver_window;
 
 
+  /** Temporary field used during circuits_handle_oom. */
+  uint32_t age_tmp;
+
   /** For storage while n_chan is pending (state CIRCUIT_STATE_CHAN_WAIT). */
   /** For storage while n_chan is pending (state CIRCUIT_STATE_CHAN_WAIT). */
   struct create_cell_t *n_chan_create_cell;
   struct create_cell_t *n_chan_create_cell;
 
 
@@ -3469,9 +3472,8 @@ typedef struct {
   config_line_t *DirPort_lines;
   config_line_t *DirPort_lines;
   config_line_t *DNSPort_lines; /**< Ports to listen on for DNS requests. */
   config_line_t *DNSPort_lines; /**< Ports to listen on for DNS requests. */
 
 
-  uint64_t MaxMemInCellQueues; /**< If we have more memory than this allocated
-                                * for circuit cell queues, run the OOM handler
-                                */
+  uint64_t MaxMemInQueues; /**< If we have more memory than this allocated
+                            * for queues and buffers, run the OOM handler */
 
 
   /** @name port booleans
   /** @name port booleans
    *
    *

+ 13 - 4
src/or/relay.c

@@ -2153,7 +2153,8 @@ cell_queue_append_packed_copy(circuit_t *circ, cell_queue_t *queue,
   (void)circ;
   (void)circ;
   (void)exitward;
   (void)exitward;
   (void)use_stats;
   (void)use_stats;
-  tor_gettimeofday_cached(&now);
+  tor_gettimeofday_cached_monotonic(&now);
+
   copy->inserted_time = (uint32_t)tv_to_msec(&now);
   copy->inserted_time = (uint32_t)tv_to_msec(&now);
 
 
   cell_queue_append(queue, copy);
   cell_queue_append(queue, copy);
@@ -2201,13 +2202,21 @@ packed_cell_mem_cost(void)
   return sizeof(packed_cell_t) + MP_POOL_ITEM_OVERHEAD;
   return sizeof(packed_cell_t) + MP_POOL_ITEM_OVERHEAD;
 }
 }
 
 
+/** DOCDOC */
+STATIC size_t
+cell_queues_get_total_allocation(void)
+{
+  return total_cells_allocated * packed_cell_mem_cost();
+}
+
 /** Check whether we've got too much space used for cells.  If so,
 /** Check whether we've got too much space used for cells.  If so,
  * call the OOM handler and return 1.  Otherwise, return 0. */
  * call the OOM handler and return 1.  Otherwise, return 0. */
-static int
+STATIC int
 cell_queues_check_size(void)
 cell_queues_check_size(void)
 {
 {
-  size_t alloc = total_cells_allocated * packed_cell_mem_cost();
-  if (alloc >= get_options()->MaxMemInCellQueues) {
+  size_t alloc = cell_queues_get_total_allocation();
+  alloc += buf_get_total_allocation();
+  if (alloc >= get_options()->MaxMemInQueues) {
     circuits_handle_oom(alloc);
     circuits_handle_oom(alloc);
     return 1;
     return 1;
   }
   }

+ 2 - 0
src/or/relay.h

@@ -85,6 +85,8 @@ STATIC int connected_cell_parse(const relay_header_t *rh, const cell_t *cell,
                          tor_addr_t *addr_out, int *ttl_out);
                          tor_addr_t *addr_out, int *ttl_out);
 STATIC packed_cell_t *packed_cell_new(void);
 STATIC packed_cell_t *packed_cell_new(void);
 STATIC packed_cell_t *cell_queue_pop(cell_queue_t *queue);
 STATIC packed_cell_t *cell_queue_pop(cell_queue_t *queue);
+STATIC size_t cell_queues_get_total_allocation(void);
+STATIC int cell_queues_check_size(void);
 #endif
 #endif
 
 
 #endif
 #endif

+ 1 - 0
src/test/include.am

@@ -32,6 +32,7 @@ src_test_test_SOURCES = \
 	src/test/test_introduce.c \
 	src/test/test_introduce.c \
 	src/test/test_logging.c \
 	src/test/test_logging.c \
 	src/test/test_microdesc.c \
 	src/test/test_microdesc.c \
+	src/test/test_oom.c \
 	src/test/test_options.c \
 	src/test/test_options.c \
 	src/test/test_pt.c \
 	src/test/test_pt.c \
 	src/test/test_replay.c \
 	src/test/test_replay.c \

+ 2 - 0
src/test/test.c

@@ -1630,6 +1630,7 @@ extern struct testcase_t backtrace_tests[];
 extern struct testcase_t hs_tests[];
 extern struct testcase_t hs_tests[];
 extern struct testcase_t nodelist_tests[];
 extern struct testcase_t nodelist_tests[];
 extern struct testcase_t routerkeys_tests[];
 extern struct testcase_t routerkeys_tests[];
+extern struct testcase_t oom_tests[];
 
 
 static struct testgroup_t testgroups[] = {
 static struct testgroup_t testgroups[] = {
   { "", test_array },
   { "", test_array },
@@ -1656,6 +1657,7 @@ static struct testgroup_t testgroups[] = {
   { "hs/", hs_tests },
   { "hs/", hs_tests },
   { "nodelist/", nodelist_tests },
   { "nodelist/", nodelist_tests },
   { "routerkeys/", routerkeys_tests },
   { "routerkeys/", routerkeys_tests },
+  { "oom/", oom_tests },
   END_OF_GROUPS
   END_OF_GROUPS
 };
 };
 
 

+ 262 - 3
src/test/test_buffers.c

@@ -193,7 +193,120 @@ test_buffers_basic(void *arg)
     buf_free(buf);
     buf_free(buf);
   if (buf2)
   if (buf2)
     buf_free(buf2);
     buf_free(buf2);
+  buf_shrink_freelists(1);
 }
 }
+
+static void
+test_buffer_pullup(void *arg)
+{
+  buf_t *buf;
+  char *stuff, *tmp;
+  const char *cp;
+  size_t sz;
+  (void)arg;
+  stuff = tor_malloc(16384);
+  tmp = tor_malloc(16384);
+
+  /* Note: this test doesn't check the nulterminate argument to buf_pullup,
+     since nothing actually uses it.  We should remove it some time. */
+
+  buf = buf_new_with_capacity(3000); /* rounds up to next power of 2. */
+
+  tt_assert(buf);
+  tt_int_op(buf_get_default_chunk_size(buf), ==, 4096);
+
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+
+  /* There are a bunch of cases for pullup.  One is the trivial case. Let's
+     mess around with an empty buffer. */
+  buf_pullup(buf, 16, 1);
+  buf_get_first_chunk_data(buf, &cp, &sz);
+  tt_ptr_op(cp, ==, NULL);
+  tt_ptr_op(sz, ==, 0);
+
+  /* Let's make sure nothing got allocated */
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+
+  /* Case 1: everything puts into the first chunk with some moving. */
+
+  /* Let's add some data. */
+  crypto_rand(stuff, 16384);
+  write_to_buf(stuff, 3000, buf);
+  write_to_buf(stuff+3000, 3000, buf);
+  buf_get_first_chunk_data(buf, &cp, &sz);
+  tt_ptr_op(cp, !=, NULL);
+  tt_int_op(sz, <=, 4096);
+
+  /* Make room for 3000 bytes in the first chunk, so that the pullup-move code
+   * can get tested. */
+  tt_int_op(fetch_from_buf(tmp, 3000, buf), ==, 3000);
+  test_memeq(tmp, stuff, 3000);
+  buf_pullup(buf, 2048, 0);
+  assert_buf_ok(buf);
+  buf_get_first_chunk_data(buf, &cp, &sz);
+  tt_ptr_op(cp, !=, NULL);
+  tt_int_op(sz, >=, 2048);
+  test_memeq(cp, stuff+3000, 2048);
+  tt_int_op(3000, ==, buf_datalen(buf));
+  tt_int_op(fetch_from_buf(tmp, 3000, buf), ==, 0);
+  test_memeq(tmp, stuff+3000, 2048);
+
+  buf_free(buf);
+
+  /* Now try the large-chunk case. */
+  buf = buf_new_with_capacity(3000); /* rounds up to next power of 2. */
+  write_to_buf(stuff, 4000, buf);
+  write_to_buf(stuff+4000, 4000, buf);
+  write_to_buf(stuff+8000, 4000, buf);
+  write_to_buf(stuff+12000, 4000, buf);
+  tt_int_op(buf_datalen(buf), ==, 16000);
+  buf_get_first_chunk_data(buf, &cp, &sz);
+  tt_ptr_op(cp, !=, NULL);
+  tt_int_op(sz, <=, 4096);
+
+  buf_pullup(buf, 12500, 0);
+  assert_buf_ok(buf);
+  buf_get_first_chunk_data(buf, &cp, &sz);
+  tt_ptr_op(cp, !=, NULL);
+  tt_int_op(sz, >=, 12500);
+  test_memeq(cp, stuff, 12500);
+  tt_int_op(buf_datalen(buf), ==, 16000);
+
+  fetch_from_buf(tmp, 12400, buf);
+  test_memeq(tmp, stuff, 12400);
+  tt_int_op(buf_datalen(buf), ==, 3600);
+  fetch_from_buf(tmp, 3500, buf);
+  test_memeq(tmp, stuff+12400, 3500);
+  fetch_from_buf(tmp, 100, buf);
+  test_memeq(tmp, stuff+15900, 10);
+
+  buf_free(buf);
+
+  /* Make sure that the pull-up-whole-buffer case works */
+  buf = buf_new_with_capacity(3000); /* rounds up to next power of 2. */
+  write_to_buf(stuff, 4000, buf);
+  write_to_buf(stuff+4000, 4000, buf);
+  fetch_from_buf(tmp, 100, buf); /* dump 100 bytes from first chunk */
+  buf_pullup(buf, 16000, 0); /* Way too much. */
+  assert_buf_ok(buf);
+  buf_get_first_chunk_data(buf, &cp, &sz);
+  tt_ptr_op(cp, !=, NULL);
+  tt_int_op(sz, ==, 7900);
+  test_memeq(cp, stuff+100, 7900);
+
+  buf_free(buf);
+  buf = NULL;
+
+  buf_shrink_freelists(1);
+
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+ done:
+  buf_free(buf);
+  buf_shrink_freelists(1);
+  tor_free(stuff);
+  tor_free(tmp);
+}
+
 static void
 static void
 test_buffer_copy(void *arg)
 test_buffer_copy(void *arg)
 {
 {
@@ -257,6 +370,7 @@ test_buffer_copy(void *arg)
     generic_buffer_free(buf);
     generic_buffer_free(buf);
   if (buf2)
   if (buf2)
     generic_buffer_free(buf2);
     generic_buffer_free(buf2);
+  buf_shrink_freelists(1);
 }
 }
 
 
 static void
 static void
@@ -331,12 +445,157 @@ test_buffer_ext_or_cmd(void *arg)
   ext_or_cmd_free(cmd);
   ext_or_cmd_free(cmd);
   generic_buffer_free(buf);
   generic_buffer_free(buf);
   tor_free(tmp);
   tor_free(tmp);
+  buf_shrink_freelists(1);
+}
+
+static void
+test_buffer_allocation_tracking(void *arg)
+{
+  char *junk = tor_malloc(16384);
+  buf_t *buf1 = NULL, *buf2 = NULL;
+  int i;
+
+  (void)arg;
+
+  crypto_rand(junk, 16384);
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+
+  buf1 = buf_new();
+  tt_assert(buf1);
+  buf2 = buf_new();
+  tt_assert(buf2);
+
+  tt_int_op(buf_allocation(buf1), ==, 0);
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+
+  write_to_buf(junk, 4000, buf1);
+  write_to_buf(junk, 4000, buf1);
+  write_to_buf(junk, 4000, buf1);
+  write_to_buf(junk, 4000, buf1);
+  tt_int_op(buf_allocation(buf1), ==, 16384);
+  fetch_from_buf(junk, 100, buf1);
+  tt_int_op(buf_allocation(buf1), ==, 16384); /* still 4 4k chunks */
+
+  tt_int_op(buf_get_total_allocation(), ==, 16384);
+
+  fetch_from_buf(junk, 4096, buf1); /* drop a 1k chunk... */
+  tt_int_op(buf_allocation(buf1), ==, 3*4096); /* now 3 4k chunks */
+
+  tt_int_op(buf_get_total_allocation(), ==, 16384); /* that chunk went onto
+                                                       the freelist. */
+
+  write_to_buf(junk, 4000, buf2);
+  tt_int_op(buf_allocation(buf2), ==, 4096); /* another 4k chunk. */
+  tt_int_op(buf_get_total_allocation(), ==, 16384); /* that chunk came from
+                                                       the freelist. */
+  write_to_buf(junk, 4000, buf2);
+  tt_int_op(buf_allocation(buf2), ==, 8192); /* another 4k chunk. */
+  tt_int_op(buf_get_total_allocation(), ==, 5*4096); /* that chunk was new. */
+
+
+  /* Make a really huge buffer */
+  for (i = 0; i < 1000; ++i) {
+    write_to_buf(junk, 4000, buf2);
+  }
+  tt_int_op(buf_allocation(buf2), >=, 4008000);
+  tt_int_op(buf_get_total_allocation(), >=, 4008000);
+  buf_free(buf2);
+  buf2 = NULL;
+
+  tt_int_op(buf_get_total_allocation(), <, 4008000);
+  buf_shrink_freelists(1);
+  tt_int_op(buf_get_total_allocation(), ==, buf_allocation(buf1));
+  buf_free(buf1);
+  buf1 = NULL;
+  buf_shrink_freelists(1);
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+
+ done:
+  buf_free(buf1);
+  buf_free(buf2);
+  buf_shrink_freelists(1);
+}
+
+static void
+test_buffer_time_tracking(void *arg)
+{
+  buf_t *buf=NULL, *buf2=NULL;
+  struct timeval tv0;
+  const time_t START = 1389288246;
+  const uint32_t START_MSEC = (uint32_t) ((uint64_t)START * 1000);
+  int i;
+  char tmp[4096];
+  (void)arg;
+
+  crypto_rand(tmp, sizeof(tmp));
+
+  tv0.tv_sec = START;
+  tv0.tv_usec = 0;
+
+  buf = buf_new_with_capacity(3000); /* rounds up to next power of 2. */
+  tt_assert(buf);
+
+  /* Empty buffer means the timestamp is 0. */
+  tt_int_op(0, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC));
+  tt_int_op(0, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+1000));
+
+  tor_gettimeofday_cache_set(&tv0);
+  write_to_buf("ABCDEFG", 7, buf);
+  tt_int_op(1000, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+1000));
+
+  buf2 = buf_copy(buf);
+  tt_assert(buf2);
+  tt_int_op(1234, ==, buf_get_oldest_chunk_timestamp(buf2, START_MSEC+1234));
+
+  /* Now add more bytes; enough to overflow the first chunk. */
+  tv0.tv_usec += 123 * 1000;
+  tor_gettimeofday_cache_set(&tv0);
+  for (i = 0; i < 600; ++i)
+    write_to_buf("ABCDEFG", 7, buf);
+  tt_int_op(4207, ==, buf_datalen(buf));
+
+  /* The oldest bytes are still in the front. */
+  tt_int_op(2000, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+2000));
+
+  /* Once those bytes are dropped, the chunk is still on the first
+   * timestamp. */
+  fetch_from_buf(tmp, 100, buf);
+  tt_int_op(2000, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+2000));
+
+  /* But once we discard the whole first chunk, we get the data in the second
+   * chunk. */
+  fetch_from_buf(tmp, 4000, buf);
+  tt_int_op(107, ==, buf_datalen(buf));
+  tt_int_op(2000, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+2123));
+
+  /* This time we'll be grabbing a chunk from the freelist, and making sure
+     its time gets updated */
+  tv0.tv_sec += 5;
+  tv0.tv_usec = 617*1000;
+  tor_gettimeofday_cache_set(&tv0);
+  for (i = 0; i < 600; ++i)
+    write_to_buf("ABCDEFG", 7, buf);
+  tt_int_op(4307, ==, buf_datalen(buf));
+
+  tt_int_op(2000, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+2123));
+  fetch_from_buf(tmp, 4000, buf);
+  fetch_from_buf(tmp, 306, buf);
+  tt_int_op(0, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+5617));
+  tt_int_op(383, ==, buf_get_oldest_chunk_timestamp(buf, START_MSEC+6000));
+
+ done:
+  buf_free(buf);
+  buf_free(buf2);
 }
 }
 
 
 struct testcase_t buffer_tests[] = {
 struct testcase_t buffer_tests[] = {
-  { "basic", test_buffers_basic, 0, NULL, NULL },
-  { "copy", test_buffer_copy, 0, NULL, NULL },
-  { "ext_or_cmd", test_buffer_ext_or_cmd, 0, NULL, NULL },
+  { "basic", test_buffers_basic, TT_FORK, NULL, NULL },
+  { "copy", test_buffer_copy, TT_FORK, NULL, NULL },
+  { "pullup", test_buffer_pullup, TT_FORK, NULL, NULL },
+  { "ext_or_cmd", test_buffer_ext_or_cmd, TT_FORK, NULL, NULL },
+  { "allocation_tracking", test_buffer_allocation_tracking, TT_FORK,
+    NULL, NULL },
+  { "time_tracking", test_buffer_time_tracking, TT_FORK, NULL, NULL },
   END_OF_TESTCASES
   END_OF_TESTCASES
 };
 };
 
 

+ 348 - 0
src/test/test_oom.c

@@ -0,0 +1,348 @@
+/* Copyright (c) 2014, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/* Unit tests for OOM handling logic */
+
+#define RELAY_PRIVATE
+#define BUFFERS_PRIVATE
+#define CIRCUITLIST_PRIVATE
+#include "or.h"
+#include "buffers.h"
+#include "circuitlist.h"
+#include "compat_libevent.h"
+#include "connection.h"
+#include "config.h"
+#include "mempool.h"
+#include "relay.h"
+#include "test.h"
+
+/* small replacement mock for circuit_mark_for_close_ to avoid doing all
+ * the other bookkeeping that comes with marking circuits. */
+static void
+circuit_mark_for_close_dummy_(circuit_t *circ, int reason, int line,
+                              const char *file)
+{
+  (void) reason;
+  if (circ->marked_for_close) {
+    TT_FAIL(("Circuit already marked for close at %s:%d, but we are marking "
+             "it again at %s:%d",
+             circ->marked_for_close_file, (int)circ->marked_for_close,
+             file, line));
+  }
+
+  circ->marked_for_close = line;
+  circ->marked_for_close_file = file;
+}
+
+static circuit_t *
+dummy_or_circuit_new(int n_p_cells, int n_n_cells)
+{
+  or_circuit_t *circ = or_circuit_new(0, NULL);
+  int i;
+  cell_t cell;
+
+  for (i=0; i < n_p_cells; ++i) {
+    crypto_rand((void*)&cell, sizeof(cell));
+    cell_queue_append_packed_copy(TO_CIRCUIT(circ), &circ->p_chan_cells,
+                                  0, &cell, 1, 0);
+  }
+
+  for (i=0; i < n_n_cells; ++i) {
+    crypto_rand((void*)&cell, sizeof(cell));
+    cell_queue_append_packed_copy(TO_CIRCUIT(circ),
+                                  &TO_CIRCUIT(circ)->n_chan_cells,
+                                  1, &cell, 1, 0);
+  }
+
+  TO_CIRCUIT(circ)->purpose = CIRCUIT_PURPOSE_OR;
+  return TO_CIRCUIT(circ);
+}
+
+static circuit_t *
+dummy_origin_circuit_new(int n_cells)
+{
+  origin_circuit_t *circ = origin_circuit_new();
+  int i;
+  cell_t cell;
+
+  for (i=0; i < n_cells; ++i) {
+    crypto_rand((void*)&cell, sizeof(cell));
+    cell_queue_append_packed_copy(TO_CIRCUIT(circ),
+                                  &TO_CIRCUIT(circ)->n_chan_cells,
+                                  1, &cell, 1, 0);
+  }
+
+  TO_CIRCUIT(circ)->purpose = CIRCUIT_PURPOSE_C_GENERAL;
+  return TO_CIRCUIT(circ);
+}
+
+static void
+add_bytes_to_buf(generic_buffer_t *buf, size_t n_bytes)
+{
+  char b[3000];
+
+  while (n_bytes) {
+    size_t this_add = n_bytes > sizeof(buf) ? sizeof(buf) : n_bytes;
+    crypto_rand(b, sizeof(b));
+    generic_buffer_add(buf, b, this_add);
+    n_bytes -= this_add;
+  }
+}
+
+static edge_connection_t *
+dummy_edge_conn_new(circuit_t *circ,
+                    int type, size_t in_bytes, size_t out_bytes)
+{
+  edge_connection_t *conn;
+
+  if (type == CONN_TYPE_EXIT)
+    conn = edge_connection_new(type, AF_INET);
+  else
+    conn = ENTRY_TO_EDGE_CONN(entry_connection_new(type, AF_INET));
+
+  /* We add these bytes directly to the buffers, to avoid all the
+   * edge connection read/write machinery. */
+  add_bytes_to_buf(TO_CONN(conn)->inbuf, in_bytes);
+  add_bytes_to_buf(TO_CONN(conn)->outbuf, out_bytes);
+
+  conn->on_circuit = circ;
+  if (type == CONN_TYPE_EXIT) {
+    or_circuit_t *oc  = TO_OR_CIRCUIT(circ);
+    conn->next_stream = oc->n_streams;
+    oc->n_streams = conn;
+  } else {
+    origin_circuit_t *oc = TO_ORIGIN_CIRCUIT(circ);
+    conn->next_stream = oc->p_streams;
+    oc->p_streams = conn;
+  }
+
+  return conn;
+}
+
+/** Run unit tests for buffers.c */
+static void
+test_oom_circbuf(void *arg)
+{
+  or_options_t *options = get_options_mutable();
+  circuit_t *c1 = NULL, *c2 = NULL, *c3 = NULL, *c4 = NULL;
+  struct timeval tv = { 1389631048, 0 };
+
+  (void) arg;
+
+  MOCK(circuit_mark_for_close_, circuit_mark_for_close_dummy_);
+  init_cell_pool();
+
+  /* Far too low for real life. */
+  options->MaxMemInQueues = 256*packed_cell_mem_cost();
+  options->CellStatistics = 0;
+
+  tt_int_op(cell_queues_check_size(), ==, 0); /* We don't start out OOM. */
+  tt_int_op(cell_queues_get_total_allocation(), ==, 0);
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+
+  /* Now we're going to fake up some circuits and get them added to the global
+     circuit list. */
+  tv.tv_usec = 0;
+  tor_gettimeofday_cache_set(&tv);
+  c1 = dummy_origin_circuit_new(30);
+  tv.tv_usec = 10*1000;
+  tor_gettimeofday_cache_set(&tv);
+  c2 = dummy_or_circuit_new(20, 20);
+
+  tt_int_op(packed_cell_mem_cost(), ==,
+            sizeof(packed_cell_t) + MP_POOL_ITEM_OVERHEAD);
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * 70);
+  tt_int_op(cell_queues_check_size(), ==, 0); /* We are still not OOM */
+
+  tv.tv_usec = 20*1000;
+  tor_gettimeofday_cache_set(&tv);
+  c3 = dummy_or_circuit_new(100, 85);
+  tt_int_op(cell_queues_check_size(), ==, 0); /* We are still not OOM */
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * 255);
+
+  tv.tv_usec = 30*1000;
+  tor_gettimeofday_cache_set(&tv);
+  /* Adding this cell will trigger our OOM handler. */
+  c4 = dummy_or_circuit_new(2, 0);
+
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * 257);
+
+  tt_int_op(cell_queues_check_size(), ==, 1); /* We are now OOM */
+
+  tt_assert(c1->marked_for_close);
+  tt_assert(! c2->marked_for_close);
+  tt_assert(! c3->marked_for_close);
+  tt_assert(! c4->marked_for_close);
+
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * (257 - 30));
+
+  circuit_free(c1);
+  tv.tv_usec = 0;
+  tor_gettimeofday_cache_set(&tv); /* go back in time */
+  c1 = dummy_or_circuit_new(90, 0);
+
+  tv.tv_usec = 40*1000; /* go back to the future */
+  tor_gettimeofday_cache_set(&tv);
+
+  tt_int_op(cell_queues_check_size(), ==, 1); /* We are now OOM */
+
+  tt_assert(c1->marked_for_close);
+  tt_assert(! c2->marked_for_close);
+  tt_assert(! c3->marked_for_close);
+  tt_assert(! c4->marked_for_close);
+
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * (257 - 30));
+
+ done:
+  circuit_free(c1);
+  circuit_free(c2);
+  circuit_free(c3);
+  circuit_free(c4);
+
+  UNMOCK(circuit_mark_for_close_);
+}
+
+/** Run unit tests for buffers.c */
+static void
+test_oom_streambuf(void *arg)
+{
+  or_options_t *options = get_options_mutable();
+  circuit_t *c1 = NULL, *c2 = NULL, *c3 = NULL, *c4 = NULL, *c5 = NULL;
+  struct timeval tv = { 1389641159, 0 };
+  uint32_t tvms;
+  int i;
+
+  (void) arg;
+
+  MOCK(circuit_mark_for_close_, circuit_mark_for_close_dummy_);
+  init_cell_pool();
+
+  /* Far too low for real life. */
+  options->MaxMemInQueues = 81*packed_cell_mem_cost() + 4096 * 34;
+  options->CellStatistics = 0;
+
+  tt_int_op(cell_queues_check_size(), ==, 0); /* We don't start out OOM. */
+  tt_int_op(cell_queues_get_total_allocation(), ==, 0);
+  tt_int_op(buf_get_total_allocation(), ==, 0);
+
+  /* Start all circuits with a bit of data queued in cells */
+  tv.tv_usec = 500*1000; /* go halfway into the second. */
+  tor_gettimeofday_cache_set(&tv);
+  c1 = dummy_or_circuit_new(10,10);
+  tv.tv_usec = 510*1000;
+  tor_gettimeofday_cache_set(&tv);
+  c2 = dummy_origin_circuit_new(20);
+  tv.tv_usec = 520*1000;
+  tor_gettimeofday_cache_set(&tv);
+  c3 = dummy_or_circuit_new(20,20);
+  tv.tv_usec = 530*1000;
+  tor_gettimeofday_cache_set(&tv);
+  c4 = dummy_or_circuit_new(0,0);
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * 80);
+
+  tv.tv_usec = 600*1000;
+  tor_gettimeofday_cache_set(&tv);
+
+  /* Add some connections to c1...c4. */
+  for (i = 0; i < 4; ++i) {
+    edge_connection_t *ec;
+    /* link it to a circuit */
+    tv.tv_usec += 10*1000;
+    tor_gettimeofday_cache_set(&tv);
+    ec = dummy_edge_conn_new(c1, CONN_TYPE_EXIT, 1000, 1000);
+    tt_assert(ec);
+    tv.tv_usec += 10*1000;
+    tor_gettimeofday_cache_set(&tv);
+    ec = dummy_edge_conn_new(c2, CONN_TYPE_AP, 1000, 1000);
+    tt_assert(ec);
+    tv.tv_usec += 10*1000;
+    tor_gettimeofday_cache_set(&tv);
+    ec = dummy_edge_conn_new(c4, CONN_TYPE_EXIT, 1000, 1000); /* Yes, 4 twice*/
+    tt_assert(ec);
+    tv.tv_usec += 10*1000;
+    tor_gettimeofday_cache_set(&tv);
+    ec = dummy_edge_conn_new(c4, CONN_TYPE_EXIT, 1000, 1000);
+    tt_assert(ec);
+  }
+
+  tv.tv_sec += 1;
+  tv.tv_usec = 0;
+  tvms = (uint32_t) tv_to_msec(&tv);
+
+  tt_int_op(circuit_max_queued_cell_age(c1, tvms), ==, 500);
+  tt_int_op(circuit_max_queued_cell_age(c2, tvms), ==, 490);
+  tt_int_op(circuit_max_queued_cell_age(c3, tvms), ==, 480);
+  tt_int_op(circuit_max_queued_cell_age(c4, tvms), ==, 0);
+
+  tt_int_op(circuit_max_queued_data_age(c1, tvms), ==, 390);
+  tt_int_op(circuit_max_queued_data_age(c2, tvms), ==, 380);
+  tt_int_op(circuit_max_queued_data_age(c3, tvms), ==, 0);
+  tt_int_op(circuit_max_queued_data_age(c4, tvms), ==, 370);
+
+  tt_int_op(circuit_max_queued_item_age(c1, tvms), ==, 500);
+  tt_int_op(circuit_max_queued_item_age(c2, tvms), ==, 490);
+  tt_int_op(circuit_max_queued_item_age(c3, tvms), ==, 480);
+  tt_int_op(circuit_max_queued_item_age(c4, tvms), ==, 370);
+
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * 80);
+  tt_int_op(buf_get_total_allocation(), ==, 4096*16*2);
+
+  /* Now give c4 a very old buffer of modest size */
+  {
+    edge_connection_t *ec;
+    tv.tv_sec -= 1;
+    tv.tv_usec = 0;
+    tor_gettimeofday_cache_set(&tv);
+    ec = dummy_edge_conn_new(c4, CONN_TYPE_EXIT, 1000, 1000);
+    tt_assert(ec);
+  }
+  tt_int_op(buf_get_total_allocation(), ==, 4096*17*2);
+  tt_int_op(circuit_max_queued_item_age(c4, tvms), ==, 1000);
+
+  tt_int_op(cell_queues_check_size(), ==, 0);
+
+  /* And run over the limit. */
+  tv.tv_usec = 800*1000;
+  tor_gettimeofday_cache_set(&tv);
+  c5 = dummy_or_circuit_new(0,5);
+
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * 85);
+  tt_int_op(buf_get_total_allocation(), ==, 4096*17*2);
+
+  tt_int_op(cell_queues_check_size(), ==, 1); /* We are now OOM */
+
+  /* C4 should have died. */
+  tt_assert(! c1->marked_for_close);
+  tt_assert(! c2->marked_for_close);
+  tt_assert(! c3->marked_for_close);
+  tt_assert(c4->marked_for_close);
+  tt_assert(! c5->marked_for_close);
+
+  tt_int_op(cell_queues_get_total_allocation(), ==,
+            packed_cell_mem_cost() * 85);
+  tt_int_op(buf_get_total_allocation(), ==, 4096*8*2);
+
+ done:
+  circuit_free(c1);
+  circuit_free(c2);
+  circuit_free(c3);
+  circuit_free(c4);
+  circuit_free(c5);
+
+  UNMOCK(circuit_mark_for_close_);
+}
+
+struct testcase_t oom_tests[] = {
+  { "circbuf", test_oom_circbuf, TT_FORK, NULL, NULL },
+  { "streambuf", test_oom_streambuf, TT_FORK, NULL, NULL },
+  END_OF_TESTCASES
+};
+