
Merge remote-tracking branch 'github/lazy_bucket_refill'

Nick Mathewson 6 years ago
commit 3527f4b8a4
11 changed files with 252 additions and 170 deletions
  1. changes/bug25373 (+7 -0)
  2. changes/bug25828 (+7 -0)
  3. doc/tor.1.txt (+5 -3)
  4. src/common/token_bucket.c (+9 -4)
  5. src/common/token_bucket.h (+2 -2)
  6. src/or/connection.c (+185 -66)
  7. src/or/connection.h (+6 -2)
  8. src/or/hibernate.c (+1 -1)
  9. src/or/main.c (+28 -92)
  10. src/or/main.h (+1 -0)
  11. src/or/or.h (+1 -0)

+ 7 - 0
changes/bug25373

@@ -0,0 +1,7 @@
+  o Major features (main loop, CPU wakeup):
+    - The bandwidth-limitation logic has been refactored so that
+      bandwidth calculations are performed on-demand, rather than
+      every TokenBucketRefillInterval milliseconds.
+      This change should improve the granularity of our bandwidth
+      calculations, and limit the number of times that the Tor process needs
+      to wake up when it is idle. Closes ticket 25373.
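The idea behind the on-demand calculation is that a token bucket can be topped up lazily, from the timestamp of its last refill, at the moment a caller tries to spend tokens, so an idle process never has to wake up just to refill. A minimal illustrative sketch in C (simplified; the type and function names here are invented for illustration and are not the token_bucket_rw_t API):

    #include <stdint.h>

    /* A simplified bucket that refills itself on demand instead of on a
     * periodic timer. */
    typedef struct demo_bucket_t {
      uint32_t rate;           /* tokens gained per timestamp unit */
      uint32_t burst;          /* maximum number of tokens the bucket holds */
      uint32_t level;          /* tokens currently available */
      uint32_t last_refill_ts; /* timestamp of the last refill */
    } demo_bucket_t;

    /* Top the bucket up for however much time has passed since the last
     * refill; called lazily, right before tokens are needed. */
    static void
    demo_bucket_refill(demo_bucket_t *b, uint32_t now_ts)
    {
      uint64_t gained = (uint64_t)(now_ts - b->last_refill_ts) * b->rate;
      uint64_t level = (uint64_t)b->level + gained;
      b->level = (level > b->burst) ? b->burst : (uint32_t)level;
      b->last_refill_ts = now_ts;
    }

    /* Caller pattern: refill just before spending tokens. */
    static uint32_t
    demo_bucket_take(demo_bucket_t *b, uint32_t wanted, uint32_t now_ts)
    {
      demo_bucket_refill(b, now_ts);
      uint32_t granted = (wanted < b->level) ? wanted : b->level;
      b->level -= granted;
      return granted;
    }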

+ 7 - 0
changes/bug25828

@@ -0,0 +1,7 @@
+  o Minor bugfixes (bandwidth management):
+    - Consider ourselves "low on write bandwidth" if we have exhausted our
+      write bandwidth some time in the last second. This was the
+      documented behavior before, but the actual behavior was to change
+      this value every TokenBucketRefillInterval. Fixes bug 25828; bugfix on
+      0.2.3.5-alpha.
+
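Concretely, the documented behavior is restored by recording when a write bucket last ran dry and treating "within the last second" as low on write bandwidth, instead of recomputing a flag every TokenBucketRefillInterval. A simplified standalone version of the logic added to src/or/connection.c in this merge:

    #include <time.h>

    /* When did a global write bucket last become empty?  Initialized far in
     * the past so a fresh process does not count as low on bandwidth. */
    static time_t write_buckets_last_empty_at = -100;

    /* Call this when token_bucket_rw_dec() reports the write side ran dry. */
    static void
    note_write_bucket_empty(time_t now)
    {
      write_buckets_last_empty_at = now;
    }

    /* "Low on write bandwidth" means we ran dry some time in the last second. */
    static int
    write_bandwidth_is_low(time_t now)
    {
      return (now - write_buckets_last_empty_at) <= 1;
    }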

+ 5 - 3
doc/tor.1.txt

@@ -1286,9 +1286,11 @@ The following options are useful only for clients (that is, if
     2 minutes)
 
 [[TokenBucketRefillInterval]] **TokenBucketRefillInterval** __NUM__ [**msec**|**second**]::
-    Set the refill interval of Tor's token bucket to NUM milliseconds.
-    NUM must be between 1 and 1000, inclusive.  Note that the configured
-    bandwidth limits are still expressed in bytes per second: this
+    Set the refill delay interval of Tor's token bucket to NUM milliseconds.
+    NUM must be between 1 and 1000, inclusive.  When Tor is out of bandwidth,
+    on a connection or globally, it will wait up to this long before it tries
+    to use that connection again.
+    Note that bandwidth limits are still expressed in bytes per second: this
     option only affects the frequency with which Tor checks to see whether
     previously exhausted connections may read again.
     Can not be changed while tor is running. (Default: 100 msec)
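With the new behavior, the configured interval simply becomes the delay after which blocked connections are retried; a sketch of the conversion (mirroring reenable_blocked_connection_init() in src/or/connection.c further down):

    #include <sys/time.h>

    /* Convert TokenBucketRefillInterval (milliseconds, 1..1000) into the
     * delay after which connections blocked on bandwidth are woken again. */
    static struct timeval
    refill_interval_to_delay(int msec)
    {
      struct timeval delay;
      delay.tv_sec = msec / 1000;
      delay.tv_usec = (msec % 1000) * 1000;
      return delay;
    }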

+ 9 - 4
src/common/token_bucket.c

@@ -238,13 +238,18 @@ token_bucket_rw_dec_write(token_bucket_rw_t *bucket,
 
 /**
  * As token_bucket_rw_dec_read and token_bucket_rw_dec_write, in a single
- * operation.
+ * operation.  Return a bitmask of TB_READ and TB_WRITE to indicate
+ * which buckets became empty.
  */
-void
+int
 token_bucket_rw_dec(token_bucket_rw_t *bucket,
                     ssize_t n_read, ssize_t n_written)
 {
-  token_bucket_rw_dec_read(bucket, n_read);
-  token_bucket_rw_dec_write(bucket, n_written);
+  int flags = 0;
+  if (token_bucket_rw_dec_read(bucket, n_read))
+    flags |= TB_READ;
+  if (token_bucket_rw_dec_write(bucket, n_written))
+    flags |= TB_WRITE;
+  return flags;
 }
 
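The new bitmask lets a caller decrement both buckets and learn, in one call, which side ran dry; condensed from connection_buckets_decrement() further down in this merge:

    /* Condensed from connection_buckets_decrement() in src/or/connection.c:
     * combine the flags from the relayed and global buckets, and remember
     * when the write side last became empty. */
    unsigned flags = 0;
    if (connection_counts_as_relayed_traffic(conn, now)) {
      flags = token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written);
    }
    flags |= token_bucket_rw_dec(&global_bucket, num_read, num_written);
    if (flags & TB_WRITE) {
      write_buckets_last_empty_at = now;
    }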

+ 2 - 2
src/common/token_bucket.h

@@ -85,8 +85,8 @@ int token_bucket_rw_dec_read(token_bucket_rw_t *bucket,
 int token_bucket_rw_dec_write(token_bucket_rw_t *bucket,
                               ssize_t n);
 
-void token_bucket_rw_dec(token_bucket_rw_t *bucket,
-                         ssize_t n_read, ssize_t n_written);
+int token_bucket_rw_dec(token_bucket_rw_t *bucket,
+                        ssize_t n_read, ssize_t n_written);
 
 static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket);
 static inline size_t

+ 185 - 66
src/or/connection.c

@@ -1,4 +1,4 @@
- /* Copyright (c) 2001 Matej Pfajfar.
+/* Copyright (c) 2001 Matej Pfajfar.
  * Copyright (c) 2001-2004, Roger Dingledine.
  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
  * Copyright (c) 2007-2017, The Tor Project, Inc. */
@@ -85,6 +85,7 @@
 #include "ext_orport.h"
 #include "geoip.h"
 #include "main.h"
+#include "hibernate.h"
 #include "hs_common.h"
 #include "hs_ident.h"
 #include "nodelist.h"
@@ -137,6 +138,8 @@ static const char *proxy_type_to_string(int proxy_type);
 static int get_proxy_type(void);
 const tor_addr_t *conn_get_outbound_address(sa_family_t family,
                   const or_options_t *options, unsigned int conn_type);
+static void reenable_blocked_connection_init(const or_options_t *options);
+static void reenable_blocked_connection_schedule(void);
 
 /** The last addresses that our network interface seemed to have been
  * binding to.  We use this as one way to detect when our IP changes.
@@ -772,8 +775,8 @@ connection_close_immediate(connection_t *conn)
   connection_unregister_events(conn);
 
   /* Prevent the event from getting unblocked. */
-  conn->read_blocked_on_bw =
-    conn->write_blocked_on_bw = 0;
+  conn->read_blocked_on_bw = 0;
+  conn->write_blocked_on_bw = 0;
 
   if (SOCKET_OK(conn->s))
     tor_close_socket(conn->s);
@@ -2814,10 +2817,10 @@ connection_is_rate_limited(connection_t *conn)
     return 1;
 }
 
-/** Did either global write bucket run dry last second? If so,
- * we are likely to run dry again this second, so be stingy with the
- * tokens we just put in. */
-static int write_buckets_empty_last_second = 0;
+/** When was either global write bucket last empty? If this was recent, then
+ * we're probably low on bandwidth, and we should be stingy with our bandwidth
+ * usage. */
+static time_t write_buckets_last_empty_at = -100;
 
 /** How many seconds of no active local circuits will make the
  * connection revert to the "relayed" bandwidth class? */
@@ -2845,14 +2848,14 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now)
  * write many of them or just a few; and <b>conn_bucket</b> (if
  * non-negative) provides an upper limit for our answer. */
 static ssize_t
-connection_bucket_round_robin(int base, int priority,
-                              ssize_t global_bucket_val, ssize_t conn_bucket)
+connection_bucket_get_share(int base, int priority,
+                            ssize_t global_bucket_val, ssize_t conn_bucket)
 {
   ssize_t at_most;
   ssize_t num_bytes_high = (priority ? 32 : 16) * base;
   ssize_t num_bytes_low = (priority ? 4 : 2) * base;
 
-  /* Do a rudimentary round-robin so one circuit can't hog a connection.
+  /* Do a rudimentary limiting so one circuit can't hog a connection.
    * Pick at most 32 cells, at least 4 cells if possible, and if we're in
    * the middle pick 1/8 of the available bandwidth. */
   at_most = global_bucket_val / 8;
@@ -2899,8 +2902,8 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
     global_bucket_val = MIN(global_bucket_val, relayed);
   }
 
-  return connection_bucket_round_robin(base, priority,
-                                       global_bucket_val, conn_bucket);
+  return connection_bucket_get_share(base, priority,
+                                     global_bucket_val, conn_bucket);
 }
 
 /** How many bytes at most can we write onto this connection? */
@@ -2931,8 +2934,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
     global_bucket_val = MIN(global_bucket_val, relayed);
   }
 
-  return connection_bucket_round_robin(base, priority,
-                                       global_bucket_val, conn_bucket);
+  return connection_bucket_get_share(base, priority,
+                                     global_bucket_val, conn_bucket);
 }
 
 /** Return 1 if the global write buckets are low enough that we
@@ -2969,8 +2972,11 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
   if (smaller_bucket < attempt)
     return 1; /* not enough space no matter the priority */
 
-  if (write_buckets_empty_last_second)
-    return 1; /* we're already hitting our limits, no more please */
+  {
+    const time_t diff = approx_time() - write_buckets_last_empty_at;
+    if (diff <= 1)
+      return 1; /* we're already hitting our limits, no more please */
+  }
 
   if (priority == 1) { /* old-style v1 query */
     /* Could we handle *two* of these requests within the next two seconds? */
@@ -2986,6 +2992,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
   return 0;
 }
 
+/** When did we last tell the accounting subsystem about transmitted
+ * bandwidth? */
+static time_t last_recorded_accounting_at = 0;
+
 /** Helper: adjusts our bandwidth history and informs the controller as
  * appropriate, given that we have just read <b>num_read</b> bytes and written
  * <b>num_written</b> bytes on <b>conn</b>. */
@@ -3016,6 +3026,20 @@ record_num_bytes_transferred_impl(connection_t *conn,
   }
   if (conn->type == CONN_TYPE_EXIT)
     rep_hist_note_exit_bytes(conn->port, num_written, num_read);
+
+  /* Remember these bytes towards statistics. */
+  stats_increment_bytes_read_and_written(num_read, num_written);
+
+  /* Remember these bytes towards accounting. */
+  if (accounting_is_enabled(get_options())) {
+    if (now > last_recorded_accounting_at && last_recorded_accounting_at) {
+      accounting_add_bytes(num_read, num_written,
+                           (int)(now - last_recorded_accounting_at));
+    } else {
+      accounting_add_bytes(num_read, num_written, 0);
+    }
+    last_recorded_accounting_at = now;
+  }
 }
 
 /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
@@ -3042,19 +3066,54 @@ connection_buckets_decrement(connection_t *conn, time_t now,
   if (!connection_is_rate_limited(conn))
     return; /* local IPs are free */
 
+  unsigned flags = 0;
   if (connection_counts_as_relayed_traffic(conn, now)) {
-    token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written);
+    flags = token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written);
+  }
+  flags |= token_bucket_rw_dec(&global_bucket, num_read, num_written);
+
+  if (flags & TB_WRITE) {
+    write_buckets_last_empty_at = now;
   }
-  token_bucket_rw_dec(&global_bucket, num_read, num_written);
   if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
     or_connection_t *or_conn = TO_OR_CONN(conn);
     token_bucket_rw_dec(&or_conn->bucket, num_read, num_written);
   }
 }
 
+/**
+ * Mark <b>conn</b> as needing to stop reading because bandwidth has been
+ * exhausted.  If <b>is_global_bw</b>, it is closing because global bandwidth
+ * limit has been exhausted.  Otherwise, it is closing because its own
+ * bandwidth limit has been exhausted.
+ */
+void
+connection_read_bw_exhausted(connection_t *conn, bool is_global_bw)
+{
+  (void)is_global_bw;
+  conn->read_blocked_on_bw = 1;
+  connection_stop_reading(conn);
+  reenable_blocked_connection_schedule();
+}
+
+/**
+ * Mark <b>conn</b> as needing to stop writing because write bandwidth has
+ * been exhausted.  If <b>is_global_bw</b>, it is closing because global
+ * bandwidth limit has been exhausted.  Otherwise, it is closing because its
+ * own bandwidth limit has been exhausted.
+*/
+void
+connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
+{
+  (void)is_global_bw;
+  conn->write_blocked_on_bw = 1;
+  connection_stop_writing(conn);
+  reenable_blocked_connection_schedule();
+}
+
 /** If we have exhausted our global buckets, or the buckets for conn,
  * stop reading. */
-static void
+void
 connection_consider_empty_read_buckets(connection_t *conn)
 {
   const char *reason;
@@ -3062,6 +3121,8 @@ connection_consider_empty_read_buckets(connection_t *conn)
   if (!connection_is_rate_limited(conn))
     return; /* Always okay. */
 
+  int is_global = 1;
+
   if (token_bucket_rw_get_read(&global_bucket) <= 0) {
     reason = "global read bucket exhausted. Pausing.";
   } else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
@@ -3071,17 +3132,17 @@ connection_consider_empty_read_buckets(connection_t *conn)
              conn->state == OR_CONN_STATE_OPEN &&
              token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
     reason = "connection read bucket exhausted. Pausing.";
+    is_global = false;
   } else
     return; /* all good, no need to stop it */
 
   LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
-  conn->read_blocked_on_bw = 1;
-  connection_stop_reading(conn);
+  connection_read_bw_exhausted(conn, is_global);
 }
 
 /** If we have exhausted our global buckets, or the buckets for conn,
  * stop writing. */
-static void
+void
 connection_consider_empty_write_buckets(connection_t *conn)
 {
   const char *reason;
@@ -3089,6 +3150,7 @@ connection_consider_empty_write_buckets(connection_t *conn)
   if (!connection_is_rate_limited(conn))
     return; /* Always okay. */
 
+  bool is_global = true;
   if (token_bucket_rw_get_write(&global_bucket) <= 0) {
     reason = "global write bucket exhausted. Pausing.";
   } else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
@@ -3098,12 +3160,12 @@ connection_consider_empty_write_buckets(connection_t *conn)
              conn->state == OR_CONN_STATE_OPEN &&
              token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) <= 0) {
     reason = "connection write bucket exhausted. Pausing.";
+    is_global = false;
   } else
     return; /* all good, no need to stop it */
 
   LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
-  conn->write_blocked_on_bw = 1;
-  connection_stop_writing(conn);
+  connection_write_bw_exhausted(conn, is_global);
 }
 
 /** Initialize the global buckets to the values configured in the
@@ -3128,6 +3190,8 @@ connection_bucket_init(void)
                       (int32_t)options->BandwidthBurst,
                       now_ts);
   }
+
+  reenable_blocked_connection_init(options);
 }
 
 /** Update the global connection bucket settings to a new value. */
@@ -3148,57 +3212,104 @@ connection_bucket_adjust(const or_options_t *options)
   }
 }
 
-/** Time has passed; increment buckets appropriately. */
-void
-connection_bucket_refill(time_t now, uint32_t now_ts)
+/**
+ * Cached value of the last coarse-timestamp when we refilled the
+ * global buckets.
+ */
+static uint32_t last_refilled_global_buckets_ts=0;
+/**
+ * Refill the token buckets for a single connection <b>conn</b>, and the
+ * global token buckets as appropriate.  Requires that <b>now_ts</b> is
+ * the time in coarse timestamp units.
+ */
+static void
+connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
 {
-  smartlist_t *conns = get_connection_array();
+  /* Note that we only check for equality here: the underlying
+   * token bucket functions can handle moving backwards in time if they
+   * need to. */
+  if (now_ts != last_refilled_global_buckets_ts) {
+    token_bucket_rw_refill(&global_bucket, now_ts);
+    token_bucket_rw_refill(&global_relayed_bucket, now_ts);
+    last_refilled_global_buckets_ts = now_ts;
+  }
 
-  write_buckets_empty_last_second =
-    token_bucket_rw_get_write(&global_bucket) <= 0 ||
-    token_bucket_rw_get_write(&global_relayed_bucket) <= 0;
+  if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+    or_connection_t *or_conn = TO_OR_CONN(conn);
+    token_bucket_rw_refill(&or_conn->bucket, now_ts);
+  }
+}
 
-  /* refill the global buckets */
-  token_bucket_rw_refill(&global_bucket, now_ts);
-  token_bucket_rw_refill(&global_relayed_bucket, now_ts);
+/**
+ * Event to re-enable all connections that were previously blocked on read or
+ * write.
+ */
+static mainloop_event_t *reenable_blocked_connections_ev = NULL;
 
-  /* refill the per-connection buckets */
-  SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
-    if (connection_speaks_cells(conn)) {
-      or_connection_t *or_conn = TO_OR_CONN(conn);
+/** True iff reenable_blocked_connections_ev is currently scheduled. */
+static int reenable_blocked_connections_is_scheduled = 0;
 
-      if (conn->state == OR_CONN_STATE_OPEN) {
-        token_bucket_rw_refill(&or_conn->bucket, now_ts);
-      }
-    }
+/** Delay after which to run reenable_blocked_connections_ev. */
+static struct timeval reenable_blocked_connections_delay;
 
-    if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
-        && token_bucket_rw_get_read(&global_bucket) > 0 /* and we can read */
-        && (!connection_counts_as_relayed_traffic(conn, now) ||
-            token_bucket_rw_get_read(&global_relayed_bucket) > 0)
-        && (!connection_speaks_cells(conn) ||
-            conn->state != OR_CONN_STATE_OPEN ||
-            token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) > 0)) {
-        /* and either a non-cell conn or a cell conn with non-empty bucket */
-      LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
-                         "waking up conn (fd %d) for read", (int)conn->s));
-      conn->read_blocked_on_bw = 0;
+/**
+ * Re-enable all connections that were previously blocked on read or write.
+ * This event is scheduled after enough time has elapsed to be sure
+ * that the buckets will refill when the connections have something to do.
+ */
+static void
+reenable_blocked_connections_cb(mainloop_event_t *ev, void *arg)
+{
+  (void)ev;
+  (void)arg;
+  SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) {
+    if (conn->read_blocked_on_bw == 1) {
       connection_start_reading(conn);
+      conn->read_blocked_on_bw = 0;
     }
-
-    if (conn->write_blocked_on_bw == 1
-        && token_bucket_rw_get_write(&global_bucket) > 0 /* and we can write */
-        && (!connection_counts_as_relayed_traffic(conn, now) ||
-            token_bucket_rw_get_write(&global_relayed_bucket) > 0)
-        && (!connection_speaks_cells(conn) ||
-            conn->state != OR_CONN_STATE_OPEN ||
-            token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) > 0)) {
-      LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
-                         "waking up conn (fd %d) for write", (int)conn->s));
-      conn->write_blocked_on_bw = 0;
+    if (conn->write_blocked_on_bw == 1) {
       connection_start_writing(conn);
+      conn->write_blocked_on_bw = 0;
     }
   } SMARTLIST_FOREACH_END(conn);
+
+  reenable_blocked_connections_is_scheduled = 0;
+}
+
+/**
+ * Initialize the mainloop event that we use to wake up connections that
+ * find themselves blocked on bandwidth.
+ */
+static void
+reenable_blocked_connection_init(const or_options_t *options)
+{
+  if (! reenable_blocked_connections_ev) {
+    reenable_blocked_connections_ev =
+      mainloop_event_new(reenable_blocked_connections_cb, NULL);
+    reenable_blocked_connections_is_scheduled = 0;
+  }
+  time_t sec = options->TokenBucketRefillInterval / 1000;
+  int msec = (options->TokenBucketRefillInterval % 1000);
+  reenable_blocked_connections_delay.tv_sec = sec;
+  reenable_blocked_connections_delay.tv_usec = msec * 1000;
+}
+
+/**
+ * Called when we have blocked a connection for being low on bandwidth:
+ * schedule an event to reenable such connections, if it is not already
+ * scheduled.
+ */
+static void
+reenable_blocked_connection_schedule(void)
+{
+  if (reenable_blocked_connections_is_scheduled)
+    return;
+  if (BUG(reenable_blocked_connections_ev == NULL)) {
+    reenable_blocked_connection_init(get_options());
+  }
+  mainloop_event_schedule(reenable_blocked_connections_ev,
+                          &reenable_blocked_connections_delay);
+  reenable_blocked_connections_is_scheduled = 1;
 }
 
 /** Read bytes from conn-\>s and process them.
@@ -3221,6 +3332,8 @@ connection_handle_read_impl(connection_t *conn)
 
   conn->timestamp_last_read_allowed = approx_time();
 
+  connection_bucket_refill_single(conn, monotime_coarse_get_stamp());
+
   switch (conn->type) {
     case CONN_TYPE_OR_LISTENER:
       return connection_handle_listener_read(conn, CONN_TYPE_OR);
@@ -3645,6 +3758,8 @@ connection_handle_write_impl(connection_t *conn, int force)
 
   conn->timestamp_last_write_allowed = now;
 
+  connection_bucket_refill_single(conn, monotime_coarse_get_stamp());
+
   /* Sometimes, "writable" means "connected". */
   if (connection_state_is_connecting(conn)) {
     if (getsockopt(conn->s, SOL_SOCKET, SO_ERROR, (void*)&e, &len) < 0) {
@@ -3758,8 +3873,7 @@ connection_handle_write_impl(connection_t *conn, int force)
         /* Make sure to avoid a loop if the receive buckets are empty. */
         log_debug(LD_NET,"wanted read.");
         if (!connection_is_reading(conn)) {
-          connection_stop_writing(conn);
-          conn->write_blocked_on_bw = 1;
+          connection_write_bw_exhausted(conn, true);
           /* we'll start reading again when we get more tokens in our
            * read bucket; then we'll start writing again too.
            */
@@ -5109,6 +5223,11 @@ connection_free_all(void)
 
   tor_free(last_interface_ipv4);
   tor_free(last_interface_ipv6);
+  last_recorded_accounting_at = 0;
+
+  mainloop_event_free(reenable_blocked_connections_ev);
+  reenable_blocked_connections_is_scheduled = 0;
+  memset(&reenable_blocked_connections_delay, 0, sizeof(struct timeval));
 }
 
 /** Log a warning, and possibly emit a control event, that <b>received</b> came

+ 6 - 2
src/or/connection.h

@@ -123,8 +123,12 @@ ssize_t connection_bucket_write_limit(connection_t *conn, time_t now);
 int global_write_bucket_low(connection_t *conn, size_t attempt, int priority);
 void connection_bucket_init(void);
 void connection_bucket_adjust(const or_options_t *options);
-void connection_bucket_refill(time_t now,
-                              uint32_t now_ts);
+void connection_bucket_refill_all(time_t now,
+                                  uint32_t now_ts);
+void connection_read_bw_exhausted(connection_t *conn, bool is_global_bw);
+void connection_write_bw_exhausted(connection_t *conn, bool is_global_bw);
+void connection_consider_empty_read_buckets(connection_t *conn);
+void connection_consider_empty_write_buckets(connection_t *conn);
 
 int connection_handle_read(connection_t *conn);
 

+ 1 - 1
src/or/hibernate.c

@@ -297,7 +297,7 @@ accounting_get_end_time,(void))
   return interval_end_time;
 }
 
-/** Called from main.c to tell us that <b>seconds</b> seconds have
+/** Called from connection.c to tell us that <b>seconds</b> seconds have
  * passed, <b>n_read</b> bytes have been read, and <b>n_written</b>
  * bytes have been written. */
 void

+ 28 - 92
src/or/main.c

@@ -159,13 +159,6 @@ token_bucket_rw_t global_bucket;
 /* Token bucket for relayed traffic. */
 token_bucket_rw_t global_relayed_bucket;
 
-/** What was the read/write bucket before the last second_elapsed_callback()
- * call?  (used to determine how many bytes we've read). */
-static size_t stats_prev_global_read_bucket;
-/** What was the write bucket before the last second_elapsed_callback() call?
- * (used to determine how many bytes we've written). */
-static size_t stats_prev_global_write_bucket;
-
 /* DOCDOC stats_prev_n_read */
 static uint64_t stats_prev_n_read = 0;
 /* DOCDOC stats_prev_n_written */
@@ -479,21 +472,37 @@ get_connection_array, (void))
   return connection_array;
 }
 
-/** Provides the traffic read and written over the life of the process. */
-
+/**
+ * Return the amount of network traffic read, in bytes, over the life of this
+ * process.
+ */
 MOCK_IMPL(uint64_t,
 get_bytes_read,(void))
 {
   return stats_n_bytes_read;
 }
 
-/* DOCDOC get_bytes_written */
+/**
+ * Return the amount of network traffic written, in bytes, over the life of
+ * this process.
+ */
 MOCK_IMPL(uint64_t,
 get_bytes_written,(void))
 {
   return stats_n_bytes_written;
 }
 
+/**
+ * Increment the amount of network traffic read and written, over the life of
+ * this process.
+ */
+void
+stats_increment_bytes_read_and_written(uint64_t r, uint64_t w)
+{
+  stats_n_bytes_read += r;
+  stats_n_bytes_written += w;
+}
+
 /** Set the event mask on <b>conn</b> to <b>events</b>.  (The event
  * mask is a bitmask whose bits are READ_EVENT and WRITE_EVENT)
  */
@@ -1025,19 +1034,22 @@ conn_close_if_marked(int i)
          * busy Libevent loops where we keep ending up here and returning
          * 0 until we are no longer blocked on bandwidth.
          */
-        if (connection_is_writing(conn)) {
-          conn->write_blocked_on_bw = 1;
-          connection_stop_writing(conn);
+        connection_consider_empty_read_buckets(conn);
+        connection_consider_empty_write_buckets(conn);
+
+        /* Make sure that consider_empty_buckets really disabled the
+         * connection: */
+        if (BUG(connection_is_writing(conn))) {
+          connection_write_bw_exhausted(conn, true);
         }
-        if (connection_is_reading(conn)) {
+        if (BUG(connection_is_reading(conn))) {
           /* XXXX+ We should make this code unreachable; if a connection is
            * marked for close and flushing, there is no point in reading to it
            * at all. Further, checking at this point is a bit of a hack: it
            * would make much more sense to react in
            * connection_handle_read_impl, or to just stop reading in
            * mark_and_flush */
-          conn->read_blocked_on_bw = 1;
-          connection_stop_reading(conn);
+          connection_read_bw_exhausted(conn, true/* kludge. */);
         }
       }
       return 0;
@@ -2358,63 +2370,6 @@ systemd_watchdog_callback(periodic_timer_t *timer, void *arg)
 }
 #endif /* defined(HAVE_SYSTEMD_209) */
 
-/** Timer: used to invoke refill_callback(). */
-static periodic_timer_t *refill_timer = NULL;
-
-/** Millisecond when refall_callback was last invoked. */
-static struct timeval refill_timer_current_millisecond;
-
-/** Libevent callback: invoked periodically to refill token buckets
- * and count r/w bytes. */
-static void
-refill_callback(periodic_timer_t *timer, void *arg)
-{
-  struct timeval now;
-
-  size_t bytes_written;
-  size_t bytes_read;
-  int milliseconds_elapsed = 0;
-  int seconds_rolled_over = 0;
-
-  const or_options_t *options = get_options();
-
-  (void)timer;
-  (void)arg;
-
-  tor_gettimeofday(&now);
-
-  /* If this is our first time, no time has passed. */
-  if (refill_timer_current_millisecond.tv_sec) {
-    long mdiff = tv_mdiff(&refill_timer_current_millisecond, &now);
-    if (mdiff > INT_MAX)
-      mdiff = INT_MAX;
-    milliseconds_elapsed = (int)mdiff;
-    seconds_rolled_over = (int)(now.tv_sec -
-                                refill_timer_current_millisecond.tv_sec);
-  }
-
-  bytes_written = stats_prev_global_write_bucket -
-    token_bucket_rw_get_write(&global_bucket);
-  bytes_read = stats_prev_global_read_bucket -
-    token_bucket_rw_get_read(&global_bucket);
-
-  stats_n_bytes_read += bytes_read;
-  stats_n_bytes_written += bytes_written;
-  if (accounting_is_enabled(options) && milliseconds_elapsed >= 0)
-    accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over);
-
-  if (milliseconds_elapsed > 0) {
-    connection_bucket_refill((time_t)now.tv_sec,
-                             monotime_coarse_get_stamp());
-  }
-
-  stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket);
-  stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket);
-
-  /* remember what time it is, for next time */
-  refill_timer_current_millisecond = now;
-}
-
 #ifndef _WIN32
 /** Called when a possibly ignorable libevent error occurs; ensures that we
  * don't get into an infinite loop by ignoring too many errors from
@@ -2618,8 +2573,6 @@ do_main_loop(void)
 
   /* Set up our buckets */
   connection_bucket_init();
-  stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket);
-  stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket);
 
   /* initialize the bootstrap status events to know we're starting up */
   control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
@@ -2717,20 +2670,6 @@ do_main_loop(void)
   }
 #endif /* defined(HAVE_SYSTEMD_209) */
 
-  if (!refill_timer) {
-    struct timeval refill_interval;
-    int msecs = get_options()->TokenBucketRefillInterval;
-
-    refill_interval.tv_sec =  msecs/1000;
-    refill_interval.tv_usec = (msecs%1000)*1000;
-
-    refill_timer = periodic_timer_new(tor_libevent_get_base(),
-                                      &refill_interval,
-                                      refill_callback,
-                                      NULL);
-    tor_assert(refill_timer);
-  }
-
 #ifdef HAVE_SYSTEMD
   {
     const int r = sd_notify(0, "READY=1");
@@ -3487,7 +3426,6 @@ tor_free_all(int postfork)
   smartlist_free(active_linked_connection_lst);
   periodic_timer_free(second_timer);
   teardown_periodic_events();
-  periodic_timer_free(refill_timer);
   tor_event_free(shutdown_did_not_work_event);
   tor_event_free(initialize_periodic_events_event);
   mainloop_event_free(directory_all_unreachable_cb_event);
@@ -3499,7 +3437,6 @@ tor_free_all(int postfork)
 
   memset(&global_bucket, 0, sizeof(global_bucket));
   memset(&global_relayed_bucket, 0, sizeof(global_relayed_bucket));
-  stats_prev_global_read_bucket = stats_prev_global_write_bucket = 0;
   stats_prev_n_read = stats_prev_n_written = 0;
   stats_n_bytes_read = stats_n_bytes_written = 0;
   time_of_process_start = 0;
@@ -3516,7 +3453,6 @@ tor_free_all(int postfork)
   heartbeat_callback_first_time = 1;
   n_libevent_errors = 0;
   current_second = 0;
-  memset(&refill_timer_current_millisecond, 0, sizeof(struct timeval));
 
   if (!postfork) {
     release_lockfile();

+ 1 - 0
src/or/main.h

@@ -28,6 +28,7 @@ int connection_is_on_closeable_list(connection_t *conn);
 MOCK_DECL(smartlist_t *, get_connection_array, (void));
 MOCK_DECL(uint64_t,get_bytes_read,(void));
 MOCK_DECL(uint64_t,get_bytes_written,(void));
+void stats_increment_bytes_read_and_written(uint64_t r, uint64_t w);
 
 /** Bitmask for events that we can turn on and off with
  * connection_watch_events. */

+ 1 - 0
src/or/or.h

@@ -57,6 +57,7 @@
 #ifdef HAVE_TIME_H
 #include <time.h>
 #endif
+#include <stdbool.h>
 
 #ifdef _WIN32
 #include <winsock2.h>