
Merge branch 'token_bucket_refactor_squashed'

Nick Mathewson, 6 years ago
commit b152d62cee

+ 5 - 0
changes/ticket25760

@@ -0,0 +1,5 @@
+  o Removed features:
+    - The TestingEnableTbEmptyEvent option has been removed.  It was used
+      in testing simulations to measure how often connection buckets were
+      emptied, in order to improve our scheduling, but it has not
+      been actively used in years. Closes ticket 25760.

+ 3 - 0
changes/ticket25766

@@ -0,0 +1,3 @@
+  o Code simplification and refactoring:
+    - Refactor token-bucket implementations to use a common backend.
+      Closes ticket 25766.

+ 0 - 6
doc/tor.1.txt

@@ -2883,7 +2883,6 @@ The following options are used for running a testing Tor network.
        TestingDirConnectionMaxStall 30 seconds
        TestingEnableConnBwEvent 1
        TestingEnableCellStatsEvent 1
-       TestingEnableTbEmptyEvent 1
 
 [[TestingV3AuthInitialVotingInterval]] **TestingV3AuthInitialVotingInterval** __N__ **minutes**|**hours**::
     Like V3AuthVotingInterval, but for initial voting interval before the first
@@ -3021,11 +3020,6 @@ The following options are used for running a testing Tor network.
     events.  Changing this requires that **TestingTorNetwork** is set.
     (Default: 0)
 
-[[TestingEnableTbEmptyEvent]] **TestingEnableTbEmptyEvent** **0**|**1**::
-    If this option is set, then Tor controllers may register for TB_EMPTY
-    events.  Changing this requires that **TestingTorNetwork** is set.
-    (Default: 0)
-
 [[TestingMinExitFlagThreshold]] **TestingMinExitFlagThreshold**  __N__ **KBytes**|**MBytes**|**GBytes**|**TBytes**|**KBits**|**MBits**|**GBits**|**TBits**::
     Sets a lower-bound for assigning an exit flag when running as an
     authority on a testing network. Overrides the usual default lower bound

+ 13 - 0
src/common/compat_time.c

@@ -830,11 +830,24 @@ monotime_coarse_stamp_units_to_approx_msec(uint64_t units)
   return (abstime_diff * mach_time_info.numer) /
     (mach_time_info.denom * ONE_MILLION);
 }
+uint64_t
+monotime_msec_to_approx_coarse_stamp_units(uint64_t msec)
+{
+  uint64_t abstime_val =
+    (((uint64_t)msec) * ONE_MILLION * mach_time_info.denom) /
+    mach_time_info.numer;
+  return abstime_val >> monotime_shift;
+}
 #else
 uint64_t
 monotime_coarse_stamp_units_to_approx_msec(uint64_t units)
 {
   return (units * 1000) / STAMP_TICKS_PER_SECOND;
 }
+uint64_t
+monotime_msec_to_approx_coarse_stamp_units(uint64_t msec)
+{
+  return (msec * STAMP_TICKS_PER_SECOND) / 1000;
+}
 #endif
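
The new monotime_msec_to_approx_coarse_stamp_units() is just the inverse of the existing monotime_coarse_stamp_units_to_approx_msec(), so converting a duration to stamp units and back should land within the roughly one-percent error that the coarse timestamp units allow. A minimal sketch of that round trip, assuming the Tor build environment (compat_time.h) and that monotime_init() has already run; it mirrors the check added to test_util.c later in this commit:

    #include "compat_time.h"

    /* Illustrative only: round-trip 5000 msec through the new helper. */
    static void
    check_stamp_roundtrip(void)
    {
      uint64_t units = monotime_msec_to_approx_coarse_stamp_units(5000);
      uint64_t msec = monotime_coarse_stamp_units_to_approx_msec(units);
      /* msec should now be close to 5000; the test accepts 4950..5050. */
      (void)msec;
    }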
 

+ 1 - 0
src/common/compat_time.h

@@ -150,6 +150,7 @@ uint32_t monotime_coarse_to_stamp(const monotime_coarse_t *t);
  * into an approximate number of milliseconds.
  */
 uint64_t monotime_coarse_stamp_units_to_approx_msec(uint64_t units);
+uint64_t monotime_msec_to_approx_coarse_stamp_units(uint64_t msec);
 uint32_t monotime_coarse_get_stamp(void);
 
 #if defined(MONOTIME_COARSE_TYPE_IS_DIFFERENT)

+ 2 - 0
src/common/include.am

@@ -97,6 +97,7 @@ LIBOR_A_SRC = \
   src/common/util_process.c				\
   src/common/sandbox.c					\
   src/common/storagedir.c				\
+  src/common/token_bucket.c				\
   src/common/workqueue.c				\
   $(libor_extra_source)					\
   $(threads_impl_source)				\
@@ -184,6 +185,7 @@ COMMONHEADERS = \
   src/common/storagedir.h			\
   src/common/testsupport.h			\
   src/common/timers.h				\
+  src/common/token_bucket.h			\
   src/common/torint.h				\
   src/common/torlog.h				\
   src/common/tortls.h				\

+ 199 - 0
src/common/token_bucket.c

@@ -0,0 +1,199 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file token_bucket.c
+ * \brief Functions to use and manipulate token buckets, used for
+ *    rate-limiting on connections and globally.
+ *
+ * Tor uses these token buckets to keep track of bandwidth usage, and
+ * sometimes other things too.
+ *
+ * The time units we use internally are based on "timestamp" units -- see
+ * monotime_coarse_to_stamp() for a rationale.
+ *
+ * Token buckets may become negative.
+ **/
+
+#define TOKEN_BUCKET_PRIVATE
+
+#include "token_bucket.h"
+#include "util_bug.h"
+
+/** Convert a rate in bytes per second to a rate in bytes per step */
+static uint32_t
+rate_per_sec_to_rate_per_step(uint32_t rate)
+{
+  /*
+    The precise calculation we'd want to do is
+
+    (rate / 1000) * to_approximate_msec(TICKS_PER_STEP).  But to minimize
+    rounding error, we do it this way instead, and divide last.
+  */
+  return (uint32_t)
+    monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000;
+}
+
+/**
+ * Initialize a token bucket in *<b>bucket</b>, set up to allow <b>rate</b>
+ * bytes per second, with a maximum burst of <b>burst</b> bytes. The bucket
+ * is created such that <b>now_ts</b> is the current timestamp.  The bucket
+ * starts out full.
+ */
+void
+token_bucket_init(token_bucket_t *bucket,
+                  uint32_t rate,
+                  uint32_t burst,
+                  uint32_t now_ts)
+{
+  memset(bucket, 0, sizeof(token_bucket_t));
+  token_bucket_adjust(bucket, rate, burst);
+  token_bucket_reset(bucket, now_ts);
+}
+
+/**
+ * Change the configured rate (in bytes per second) and burst (in bytes)
+ * for the token bucket in *<b>bucket</b>.
+ */
+void
+token_bucket_adjust(token_bucket_t *bucket,
+                    uint32_t rate,
+                    uint32_t burst)
+{
+  tor_assert_nonfatal(rate > 0);
+  tor_assert_nonfatal(burst > 0);
+  if (burst > TOKEN_BUCKET_MAX_BURST)
+    burst = TOKEN_BUCKET_MAX_BURST;
+
+  bucket->rate = rate_per_sec_to_rate_per_step(rate);
+  bucket->burst = burst;
+  bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst);
+  bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst);
+}
+
+/**
+ * Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>.
+ */
+void
+token_bucket_reset(token_bucket_t *bucket,
+                   uint32_t now_ts)
+{
+  bucket->read_bucket = bucket->burst;
+  bucket->write_bucket = bucket->burst;
+  bucket->last_refilled_at_ts = now_ts;
+}
+
+/* Helper: see token_bucket_refill */
+static int
+refill_single_bucket(int32_t *bucketptr,
+                     const uint32_t rate,
+                     const int32_t burst,
+                     const uint32_t elapsed_steps)
+{
+  const int was_empty = (*bucketptr <= 0);
+  /* The casts here prevent an underflow.
+   *
+   * Note that even if the bucket value is negative, subtracting it from
+   * "burst" will still produce a correct result.  If this result is
+   * ridiculously high, then the "elapsed_steps > gap / rate" check below
+   * should catch it. */
+  const size_t gap = ((size_t)burst) - ((size_t)*bucketptr);
+
+  if (elapsed_steps > gap / rate) {
+    *bucketptr = burst;
+  } else {
+    *bucketptr += rate * elapsed_steps;
+  }
+
+  return was_empty && *bucketptr > 0;
+}
+
+/**
+ * Refill <b>bucket</b> as appropriate, given that the current timestamp
+ * is <b>now_ts</b>.
+ *
+ * Return a bitmask containing TB_READ iff read bucket was empty and became
+ * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty.
+ */
+int
+token_bucket_refill(token_bucket_t *bucket,
+                    uint32_t now_ts)
+{
+  const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts);
+  if (elapsed_ticks > UINT32_MAX-(300*1000)) {
+    /* Either about 48 days have passed since the last refill, or the
+     * monotonic clock has somehow moved backwards. (We're looking at you,
+     * Windows.).  We accept up to a 5 minute jump backwards as
+     * "unremarkable".
+     */
+    return 0;
+  }
+  const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP;
+
+  if (!elapsed_steps) {
+    /* Note that if less than one whole step elapsed, we don't advance the
+     * time in last_refilled_at_ts. That's intentional: we want to make sure
+     * that we add some bytes to it eventually. */
+    return 0;
+  }
+
+  int flags = 0;
+  if (refill_single_bucket(&bucket->read_bucket,
+                           bucket->rate, bucket->burst, elapsed_steps))
+    flags |= TB_READ;
+  if (refill_single_bucket(&bucket->write_bucket,
+                           bucket->rate, bucket->burst, elapsed_steps))
+    flags |= TB_WRITE;
+
+  bucket->last_refilled_at_ts = now_ts;
+  return flags;
+}
+
+static int
+decrement_single_bucket(int32_t *bucketptr,
+                        ssize_t n)
+{
+  if (BUG(n < 0))
+    return 0;
+  const int becomes_empty = *bucketptr > 0 && n >= *bucketptr;
+  *bucketptr -= n;
+  return becomes_empty;
+}
+
+/**
+ * Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes.
+ *
+ * Return true if the bucket was nonempty and became empty; return false
+ * otherwise.
+ */
+int
+token_bucket_dec_read(token_bucket_t *bucket,
+                      ssize_t n)
+{
+  return decrement_single_bucket(&bucket->read_bucket, n);
+}
+
+/**
+ * Decrement the write token bucket in <b>bucket</b> by <b>n</b> bytes.
+ *
+ * Return true if the bucket was nonempty and became empty; return false
+ * otherwise.
+ */
+int
+token_bucket_dec_write(token_bucket_t *bucket,
+                       ssize_t n)
+{
+  return decrement_single_bucket(&bucket->write_bucket, n);
+}
+
+/**
+ * As token_bucket_dec_read and token_bucket_dec_write, in a single operation.
+ */
+void
+token_bucket_dec(token_bucket_t *bucket,
+                 ssize_t n_read, ssize_t n_written)
+{
+  token_bucket_dec_read(bucket, n_read);
+  token_bucket_dec_write(bucket, n_written);
+}
+
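
Taken together, token_bucket.c and its header give callers a small API: initialize with a rate and burst, call token_bucket_refill() periodically with the current coarse timestamp, decrement as bytes move, and consult the inline getters for the remaining allowance. A minimal, illustrative sketch of that flow, assuming the Tor build environment (the byte counts and the actual I/O are stubbed out):

    #include "compat_time.h"
    #include "token_bucket.h"

    /* Sketch only: rate-limit reads to 16 KB/s with a 64 KB burst. */
    static token_bucket_t limiter;

    static void
    limiter_setup(void)
    {
      token_bucket_init(&limiter, 16*1024, 64*1024,
                        monotime_coarse_get_stamp());
    }

    /* Called once per refill tick and once per read attempt. */
    static void
    limiter_tick_and_read(void)
    {
      int flags = token_bucket_refill(&limiter, monotime_coarse_get_stamp());
      if (flags & TB_READ) {
        /* The read side was empty and has tokens again: a caller would
         * re-enable reading here. */
      }
      size_t allowed = token_bucket_get_read(&limiter);
      if (allowed > 0) {
        ssize_t n = (ssize_t)allowed; /* pretend we read this many bytes */
        if (token_bucket_dec_read(&limiter, n)) {
          /* Bucket just ran dry: a caller would stop reading here. */
        }
      }
    }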

+ 75 - 0
src/common/token_bucket.h

@@ -0,0 +1,75 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file token_bucket.h
+ * \brief Headers for token_bucket.c
+ **/
+
+#ifndef TOR_TOKEN_BUCKET_H
+#define TOR_TOKEN_BUCKET_H
+
+#include "torint.h"
+
+typedef struct token_bucket_t {
+  uint32_t rate;
+  int32_t burst;
+  int32_t read_bucket;
+  int32_t write_bucket;
+  uint32_t last_refilled_at_ts;
+} token_bucket_t;
+
+#define TOKEN_BUCKET_MAX_BURST INT32_MAX
+
+void token_bucket_init(token_bucket_t *bucket,
+                       uint32_t rate,
+                       uint32_t burst,
+                       uint32_t now_ts);
+
+void token_bucket_adjust(token_bucket_t *bucket,
+                         uint32_t rate, uint32_t burst);
+
+void token_bucket_reset(token_bucket_t *bucket,
+                        uint32_t now_ts);
+
+#define TB_READ 1
+#define TB_WRITE 2
+
+int token_bucket_refill(token_bucket_t *bucket,
+                        uint32_t now_ts);
+
+int token_bucket_dec_read(token_bucket_t *bucket,
+                          ssize_t n);
+int token_bucket_dec_write(token_bucket_t *bucket,
+                           ssize_t n);
+
+void token_bucket_dec(token_bucket_t *bucket,
+                      ssize_t n_read, ssize_t n_written);
+
+static inline size_t token_bucket_get_read(const token_bucket_t *bucket);
+static inline size_t
+token_bucket_get_read(const token_bucket_t *bucket)
+{
+  const ssize_t b = bucket->read_bucket;
+  return b >= 0 ? b : 0;
+}
+
+static inline size_t token_bucket_get_write(const token_bucket_t *bucket);
+static inline size_t
+token_bucket_get_write(const token_bucket_t *bucket)
+{
+  const ssize_t b = bucket->write_bucket;
+  return b >= 0 ? b : 0;
+}
+
+#ifdef TOKEN_BUCKET_PRIVATE
+
+/* To avoid making the rates too small, we consider units of "steps",
+ * where a "step" is defined as this many timestamp ticks.  Keep this
+ * a power of two if you can. */
+#define TICKS_PER_STEP 16
+
+#endif
+
+#endif /* TOR_TOKEN_BUCKET_H */
+
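
The TICKS_PER_STEP grouping exists so that the per-step rate computed by rate_per_sec_to_rate_per_step() does not collapse toward zero for modest rates. As a rough worked example (assuming a coarse-stamp tick of about one millisecond, which is the granularity the stamp units approximate): a configured 16 KB/s works out to roughly 16384 * 16 / 1000, i.e. about 262 bytes per 16-tick step, so integer truncation in the stored rate costs well under one percent, whereas a per-tick rate of about 16 bytes would lose a couple of percent to the same truncation.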

+ 7 - 8
src/or/config.c

@@ -337,7 +337,7 @@ static config_var_t option_vars_[] = {
   V(DownloadExtraInfo,           BOOL,     "0"),
   V(TestingEnableConnBwEvent,    BOOL,     "0"),
   V(TestingEnableCellStatsEvent, BOOL,     "0"),
-  V(TestingEnableTbEmptyEvent,   BOOL,     "0"),
+  OBSOLETE("TestingEnableTbEmptyEvent"),
   V(EnforceDistinctSubnets,      BOOL,     "1"),
   V(EntryNodes,                  ROUTERSET,   NULL),
   V(EntryStatistics,             BOOL,     "0"),
@@ -707,7 +707,6 @@ static const config_var_t testing_tor_network_defaults[] = {
   V(TestingDirConnectionMaxStall, INTERVAL, "30 seconds"),
   V(TestingEnableConnBwEvent,    BOOL,     "1"),
   V(TestingEnableCellStatsEvent, BOOL,     "1"),
-  V(TestingEnableTbEmptyEvent,   BOOL,     "1"),
   VAR("___UsingTestNetworkDefaults", BOOL, UsingTestNetworkDefaults_, "1"),
   V(RendPostPeriod,              INTERVAL, "2 minutes"),
 
@@ -2189,6 +2188,12 @@ options_act(const or_options_t *old_options)
         options->PerConnBWBurst != old_options->PerConnBWBurst)
       connection_or_update_token_buckets(get_connection_array(), options);
 
+    if (options->BandwidthRate != old_options->BandwidthRate ||
+        options->BandwidthBurst != old_options->BandwidthBurst ||
+        options->RelayBandwidthRate != old_options->RelayBandwidthRate ||
+        options->RelayBandwidthBurst != old_options->RelayBandwidthBurst)
+      connection_bucket_adjust(options);
+
     if (options->MainloopStats != old_options->MainloopStats) {
       reset_main_loop_counters();
     }
@@ -4459,12 +4464,6 @@ options_validate(or_options_t *old_options, or_options_t *options,
            "Tor networks!");
   }
 
-  if (options->TestingEnableTbEmptyEvent &&
-      !options->TestingTorNetwork && !options->UsingTestNetworkDefaults_) {
-    REJECT("TestingEnableTbEmptyEvent may only be changed in testing "
-           "Tor networks!");
-  }
-
   if (options->TestingTorNetwork) {
     log_warn(LD_CONFIG, "TestingTorNetwork is set. This will make your node "
                         "almost unusable in the public Tor network, and is "

+ 78 - 275
src/or/connection.c

@@ -119,8 +119,6 @@ static connection_t *connection_listener_new(
 static void connection_init(time_t now, connection_t *conn, int type,
                             int socket_family);
 static int connection_handle_listener_read(connection_t *conn, int new_type);
-static int connection_bucket_should_increase(int bucket,
-                                             or_connection_t *conn);
 static int connection_finished_flushing(connection_t *conn);
 static int connection_flushed_some(connection_t *conn);
 static int connection_finished_connecting(connection_t *conn);
@@ -2848,7 +2846,7 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now)
  * non-negative) provides an upper limit for our answer. */
 static ssize_t
 connection_bucket_round_robin(int base, int priority,
-                              ssize_t global_bucket, ssize_t conn_bucket)
+                              ssize_t global_bucket_val, ssize_t conn_bucket)
 {
   ssize_t at_most;
   ssize_t num_bytes_high = (priority ? 32 : 16) * base;
@@ -2857,15 +2855,15 @@ connection_bucket_round_robin(int base, int priority,
   /* Do a rudimentary round-robin so one circuit can't hog a connection.
    * Pick at most 32 cells, at least 4 cells if possible, and if we're in
    * the middle pick 1/8 of the available bandwidth. */
-  at_most = global_bucket / 8;
+  at_most = global_bucket_val / 8;
   at_most -= (at_most % base); /* round down */
   if (at_most > num_bytes_high) /* 16 KB, or 8 KB for low-priority */
     at_most = num_bytes_high;
   else if (at_most < num_bytes_low) /* 2 KB, or 1 KB for low-priority */
     at_most = num_bytes_low;
 
-  if (at_most > global_bucket)
-    at_most = global_bucket;
+  if (at_most > global_bucket_val)
+    at_most = global_bucket_val;
 
   if (conn_bucket >= 0 && at_most > conn_bucket)
     at_most = conn_bucket;
@@ -2881,13 +2879,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
 {
   int base = RELAY_PAYLOAD_SIZE;
   int priority = conn->type != CONN_TYPE_DIR;
-  int conn_bucket = -1;
-  int global_bucket = global_read_bucket;
+  ssize_t conn_bucket = -1;
+  size_t global_bucket_val = token_bucket_get_read(&global_bucket);
 
   if (connection_speaks_cells(conn)) {
     or_connection_t *or_conn = TO_OR_CONN(conn);
     if (conn->state == OR_CONN_STATE_OPEN)
-      conn_bucket = or_conn->read_bucket;
+      conn_bucket = token_bucket_get_read(&or_conn->bucket);
     base = get_cell_network_size(or_conn->wide_circ_ids);
   }
 
@@ -2896,12 +2894,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
     return conn_bucket>=0 ? conn_bucket : 1<<14;
   }
 
-  if (connection_counts_as_relayed_traffic(conn, now) &&
-      global_relayed_read_bucket <= global_read_bucket)
-    global_bucket = global_relayed_read_bucket;
+  if (connection_counts_as_relayed_traffic(conn, now)) {
+    size_t relayed = token_bucket_get_read(&global_relayed_bucket);
+    global_bucket_val = MIN(global_bucket_val, relayed);
+  }
 
   return connection_bucket_round_robin(base, priority,
-                                       global_bucket, conn_bucket);
+                                       global_bucket_val, conn_bucket);
 }
 
 /** How many bytes at most can we write onto this connection? */
@@ -2910,8 +2909,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
 {
   int base = RELAY_PAYLOAD_SIZE;
   int priority = conn->type != CONN_TYPE_DIR;
-  int conn_bucket = (int)conn->outbuf_flushlen;
-  int global_bucket = global_write_bucket;
+  size_t conn_bucket = conn->outbuf_flushlen;
+  size_t global_bucket_val = token_bucket_get_write(&global_bucket);
 
   if (!connection_is_rate_limited(conn)) {
     /* be willing to write to local conns even if our buckets are empty */
@@ -2919,22 +2918,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
   }
 
   if (connection_speaks_cells(conn)) {
-    /* use the per-conn write limit if it's lower, but if it's less
-     * than zero just use zero */
+    /* use the per-conn write limit if it's lower */
     or_connection_t *or_conn = TO_OR_CONN(conn);
     if (conn->state == OR_CONN_STATE_OPEN)
-      if (or_conn->write_bucket < conn_bucket)
-        conn_bucket = or_conn->write_bucket >= 0 ?
-                        or_conn->write_bucket : 0;
+      conn_bucket = MIN(conn_bucket, token_bucket_get_write(&or_conn->bucket));
     base = get_cell_network_size(or_conn->wide_circ_ids);
   }
 
-  if (connection_counts_as_relayed_traffic(conn, now) &&
-      global_relayed_write_bucket <= global_write_bucket)
-    global_bucket = global_relayed_write_bucket;
+  if (connection_counts_as_relayed_traffic(conn, now)) {
+    size_t relayed = token_bucket_get_write(&global_relayed_bucket);
+    global_bucket_val = MIN(global_bucket_val, relayed);
+  }
 
   return connection_bucket_round_robin(base, priority,
-                                       global_bucket, conn_bucket);
+                                       global_bucket_val, conn_bucket);
 }
 
 /** Return 1 if the global write buckets are low enough that we
@@ -2959,15 +2956,15 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
 int
 global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
 {
-  int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
-                       global_write_bucket : global_relayed_write_bucket;
+  size_t smaller_bucket = MIN(token_bucket_get_write(&global_bucket),
+                              token_bucket_get_write(&global_relayed_bucket));
   if (authdir_mode(get_options()) && priority>1)
     return 0; /* there's always room to answer v2 if we're an auth dir */
 
   if (!connection_is_rate_limited(conn))
     return 0; /* local conns don't get limited */
 
-  if (smaller_bucket < (int)attempt)
+  if (smaller_bucket < attempt)
     return 1; /* not enough space no matter the priority */
 
   if (write_buckets_empty_last_second)
@@ -2976,10 +2973,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
   if (priority == 1) { /* old-style v1 query */
     /* Could we handle *two* of these requests within the next two seconds? */
     const or_options_t *options = get_options();
-    int64_t can_write = (int64_t)smaller_bucket
+    size_t can_write = smaller_bucket
       + 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate :
                                          options->BandwidthRate);
-    if (can_write < 2*(int64_t)attempt)
+    if (can_write < 2*attempt)
       return 1;
   } else { /* v2 query */
     /* no further constraints yet */
@@ -3019,57 +3016,6 @@ record_num_bytes_transferred_impl(connection_t *conn,
     rep_hist_note_exit_bytes(conn->port, num_written, num_read);
 }
 
-/** Helper: convert given <b>tvnow</b> time value to milliseconds since
- * midnight. */
-static uint32_t
-msec_since_midnight(const struct timeval *tvnow)
-{
-  return (uint32_t)(((tvnow->tv_sec % 86400L) * 1000L) +
-         ((uint32_t)tvnow->tv_usec / (uint32_t)1000L));
-}
-
-/** Helper: return the time in milliseconds since <b>last_empty_time</b>
- * when a bucket ran empty that previously had <b>tokens_before</b> tokens
- * now has <b>tokens_after</b> tokens after refilling at timestamp
- * <b>tvnow</b>, capped at <b>milliseconds_elapsed</b> milliseconds since
- * last refilling that bucket.  Return 0 if the bucket has not been empty
- * since the last refill or has not been refilled. */
-uint32_t
-bucket_millis_empty(int tokens_before, uint32_t last_empty_time,
-                    int tokens_after, int milliseconds_elapsed,
-                    const struct timeval *tvnow)
-{
-  uint32_t result = 0, refilled;
-  if (tokens_before <= 0 && tokens_after > tokens_before) {
-    refilled = msec_since_midnight(tvnow);
-    result = (uint32_t)((refilled + 86400L * 1000L - last_empty_time) %
-             (86400L * 1000L));
-    if (result > (uint32_t)milliseconds_elapsed)
-      result = (uint32_t)milliseconds_elapsed;
-  }
-  return result;
-}
-
-/** Check if a bucket which had <b>tokens_before</b> tokens and which got
- * <b>tokens_removed</b> tokens removed at timestamp <b>tvnow</b> has run
- * out of tokens, and if so, note the milliseconds since midnight in
- * <b>timestamp_var</b> for the next TB_EMPTY event. */
-void
-connection_buckets_note_empty_ts(uint32_t *timestamp_var,
-                                 int tokens_before, size_t tokens_removed,
-                                 const struct timeval *tvnow)
-{
-  if (tokens_before > 0 && (uint32_t)tokens_before <= tokens_removed)
-    *timestamp_var = msec_since_midnight(tvnow);
-}
-
-/** Last time at which the global or relay buckets were emptied in msec
- * since midnight. */
-static uint32_t global_relayed_read_emptied = 0,
-                global_relayed_write_emptied = 0,
-                global_read_emptied = 0,
-                global_write_emptied = 0;
-
 /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
  * onto <b>conn</b>. Decrement buckets appropriately. */
 static void
@@ -3094,39 +3040,13 @@ connection_buckets_decrement(connection_t *conn, time_t now,
   if (!connection_is_rate_limited(conn))
     return; /* local IPs are free */
 
-  /* If one or more of our token buckets ran dry just now, note the
-   * timestamp for TB_EMPTY events. */
-  if (get_options()->TestingEnableTbEmptyEvent) {
-    struct timeval tvnow;
-    tor_gettimeofday_cached(&tvnow);
-    if (connection_counts_as_relayed_traffic(conn, now)) {
-      connection_buckets_note_empty_ts(&global_relayed_read_emptied,
-                         global_relayed_read_bucket, num_read, &tvnow);
-      connection_buckets_note_empty_ts(&global_relayed_write_emptied,
-                         global_relayed_write_bucket, num_written, &tvnow);
-    }
-    connection_buckets_note_empty_ts(&global_read_emptied,
-                       global_read_bucket, num_read, &tvnow);
-    connection_buckets_note_empty_ts(&global_write_emptied,
-                       global_write_bucket, num_written, &tvnow);
-    if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
-      or_connection_t *or_conn = TO_OR_CONN(conn);
-      connection_buckets_note_empty_ts(&or_conn->read_emptied_time,
-                         or_conn->read_bucket, num_read, &tvnow);
-      connection_buckets_note_empty_ts(&or_conn->write_emptied_time,
-                         or_conn->write_bucket, num_written, &tvnow);
-    }
-  }
-
   if (connection_counts_as_relayed_traffic(conn, now)) {
-    global_relayed_read_bucket -= (int)num_read;
-    global_relayed_write_bucket -= (int)num_written;
+    token_bucket_dec(&global_relayed_bucket, num_read, num_written);
   }
-  global_read_bucket -= (int)num_read;
-  global_write_bucket -= (int)num_written;
+  token_bucket_dec(&global_bucket, num_read, num_written);
   if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
-    TO_OR_CONN(conn)->read_bucket -= (int)num_read;
-    TO_OR_CONN(conn)->write_bucket -= (int)num_written;
+    or_connection_t *or_conn = TO_OR_CONN(conn);
+    token_bucket_dec(&or_conn->bucket, num_read, num_written);
   }
 }
 
@@ -3140,14 +3060,14 @@ connection_consider_empty_read_buckets(connection_t *conn)
   if (!connection_is_rate_limited(conn))
     return; /* Always okay. */
 
-  if (global_read_bucket <= 0) {
+  if (token_bucket_get_read(&global_bucket) <= 0) {
     reason = "global read bucket exhausted. Pausing.";
   } else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
-             global_relayed_read_bucket <= 0) {
+             token_bucket_get_read(&global_relayed_bucket) <= 0) {
     reason = "global relayed read bucket exhausted. Pausing.";
   } else if (connection_speaks_cells(conn) &&
              conn->state == OR_CONN_STATE_OPEN &&
-             TO_OR_CONN(conn)->read_bucket <= 0) {
+             token_bucket_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
     reason = "connection read bucket exhausted. Pausing.";
   } else
     return; /* all good, no need to stop it */
@@ -3167,14 +3087,14 @@ connection_consider_empty_write_buckets(connection_t *conn)
   if (!connection_is_rate_limited(conn))
     return; /* Always okay. */
 
-  if (global_write_bucket <= 0) {
+  if (token_bucket_get_write(&global_bucket) <= 0) {
     reason = "global write bucket exhausted. Pausing.";
   } else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
-             global_relayed_write_bucket <= 0) {
+             token_bucket_get_write(&global_relayed_bucket) <= 0) {
     reason = "global relayed write bucket exhausted. Pausing.";
   } else if (connection_speaks_cells(conn) &&
              conn->state == OR_CONN_STATE_OPEN &&
-             TO_OR_CONN(conn)->write_bucket <= 0) {
+             token_bucket_get_write(&TO_OR_CONN(conn)->bucket) <= 0) {
     reason = "connection write bucket exhausted. Pausing.";
   } else
     return; /* all good, no need to stop it */
@@ -3184,180 +3104,79 @@ connection_consider_empty_write_buckets(connection_t *conn)
   connection_stop_writing(conn);
 }
 
-/** Initialize the global read bucket to options-\>BandwidthBurst. */
+/** Initialize the global buckets to the values configured in the
+ * options */
 void
 connection_bucket_init(void)
 {
   const or_options_t *options = get_options();
-  /* start it at max traffic */
-  global_read_bucket = (int)options->BandwidthBurst;
-  global_write_bucket = (int)options->BandwidthBurst;
+  const uint32_t now_ts = monotime_coarse_get_stamp();
+  token_bucket_init(&global_bucket,
+                    (int32_t)options->BandwidthRate,
+                    (int32_t)options->BandwidthBurst,
+                    now_ts);
   if (options->RelayBandwidthRate) {
-    global_relayed_read_bucket = (int)options->RelayBandwidthBurst;
-    global_relayed_write_bucket = (int)options->RelayBandwidthBurst;
+    token_bucket_init(&global_relayed_bucket,
+                      (int32_t)options->RelayBandwidthRate,
+                      (int32_t)options->RelayBandwidthBurst,
+                      now_ts);
   } else {
-    global_relayed_read_bucket = (int)options->BandwidthBurst;
-    global_relayed_write_bucket = (int)options->BandwidthBurst;
+    token_bucket_init(&global_relayed_bucket,
+                      (int32_t)options->BandwidthRate,
+                      (int32_t)options->BandwidthBurst,
+                      now_ts);
   }
 }
 
-/** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate per
- * second <b>rate</b> and bandwidth burst <b>burst</b>, assuming that
- * <b>milliseconds_elapsed</b> milliseconds have passed since the last
- * call. */
-static void
-connection_bucket_refill_helper(int *bucket, int rate, int burst,
-                                int milliseconds_elapsed,
-                                const char *name)
+/** Update the global connection bucket settings to a new value. */
+void
+connection_bucket_adjust(const or_options_t *options)
 {
-  int starting_bucket = *bucket;
-  if (starting_bucket < burst && milliseconds_elapsed > 0) {
-    int64_t incr = (((int64_t)rate) * milliseconds_elapsed) / 1000;
-    if ((burst - starting_bucket) < incr) {
-      *bucket = burst;  /* We would overflow the bucket; just set it to
-                         * the maximum. */
-    } else {
-      *bucket += (int)incr;
-      if (*bucket > burst || *bucket < starting_bucket) {
-        /* If we overflow the burst, or underflow our starting bucket,
-         * cap the bucket value to burst. */
-        /* XXXX this might be redundant now, but it doesn't show up
-         * in profiles.  Remove it after analysis. */
-        *bucket = burst;
-      }
-    }
-    log_debug(LD_NET,"%s now %d.", name, *bucket);
+  token_bucket_adjust(&global_bucket,
+                      (int32_t)options->BandwidthRate,
+                      (int32_t)options->BandwidthBurst);
+  if (options->RelayBandwidthRate) {
+    token_bucket_adjust(&global_relayed_bucket,
+                        (int32_t)options->RelayBandwidthRate,
+                        (int32_t)options->RelayBandwidthBurst);
+  } else {
+    token_bucket_adjust(&global_relayed_bucket,
+                        (int32_t)options->BandwidthRate,
+                        (int32_t)options->BandwidthBurst);
   }
 }
 
 /** Time has passed; increment buckets appropriately. */
 void
-connection_bucket_refill(int milliseconds_elapsed, time_t now)
+connection_bucket_refill(time_t now, uint32_t now_ts)
 {
-  const or_options_t *options = get_options();
   smartlist_t *conns = get_connection_array();
-  int bandwidthrate, bandwidthburst, relayrate, relayburst;
-
-  int prev_global_read = global_read_bucket;
-  int prev_global_write = global_write_bucket;
-  int prev_relay_read = global_relayed_read_bucket;
-  int prev_relay_write = global_relayed_write_bucket;
-  struct timeval tvnow; /*< Only used if TB_EMPTY events are enabled. */
-
-  bandwidthrate = (int)options->BandwidthRate;
-  bandwidthburst = (int)options->BandwidthBurst;
-
-  if (options->RelayBandwidthRate) {
-    relayrate = (int)options->RelayBandwidthRate;
-    relayburst = (int)options->RelayBandwidthBurst;
-  } else {
-    relayrate = bandwidthrate;
-    relayburst = bandwidthburst;
-  }
-
-  tor_assert(milliseconds_elapsed >= 0);
 
   write_buckets_empty_last_second =
-    global_relayed_write_bucket <= 0 || global_write_bucket <= 0;
+    token_bucket_get_write(&global_bucket) <= 0 ||
+    token_bucket_get_write(&global_relayed_bucket) <= 0;
 
   /* refill the global buckets */
-  connection_bucket_refill_helper(&global_read_bucket,
-                                  bandwidthrate, bandwidthburst,
-                                  milliseconds_elapsed,
-                                  "global_read_bucket");
-  connection_bucket_refill_helper(&global_write_bucket,
-                                  bandwidthrate, bandwidthburst,
-                                  milliseconds_elapsed,
-                                  "global_write_bucket");
-  connection_bucket_refill_helper(&global_relayed_read_bucket,
-                                  relayrate, relayburst,
-                                  milliseconds_elapsed,
-                                  "global_relayed_read_bucket");
-  connection_bucket_refill_helper(&global_relayed_write_bucket,
-                                  relayrate, relayburst,
-                                  milliseconds_elapsed,
-                                  "global_relayed_write_bucket");
-
-  /* If buckets were empty before and have now been refilled, tell any
-   * interested controllers. */
-  if (get_options()->TestingEnableTbEmptyEvent) {
-    uint32_t global_read_empty_time, global_write_empty_time,
-             relay_read_empty_time, relay_write_empty_time;
-    tor_gettimeofday_cached(&tvnow);
-    global_read_empty_time = bucket_millis_empty(prev_global_read,
-                             global_read_emptied, global_read_bucket,
-                             milliseconds_elapsed, &tvnow);
-    global_write_empty_time = bucket_millis_empty(prev_global_write,
-                              global_write_emptied, global_write_bucket,
-                              milliseconds_elapsed, &tvnow);
-    control_event_tb_empty("GLOBAL", global_read_empty_time,
-                           global_write_empty_time, milliseconds_elapsed);
-    relay_read_empty_time = bucket_millis_empty(prev_relay_read,
-                            global_relayed_read_emptied,
-                            global_relayed_read_bucket,
-                            milliseconds_elapsed, &tvnow);
-    relay_write_empty_time = bucket_millis_empty(prev_relay_write,
-                             global_relayed_write_emptied,
-                             global_relayed_write_bucket,
-                             milliseconds_elapsed, &tvnow);
-    control_event_tb_empty("RELAY", relay_read_empty_time,
-                           relay_write_empty_time, milliseconds_elapsed);
-  }
+  token_bucket_refill(&global_bucket, now_ts);
+  token_bucket_refill(&global_relayed_bucket, now_ts);
 
   /* refill the per-connection buckets */
   SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
     if (connection_speaks_cells(conn)) {
       or_connection_t *or_conn = TO_OR_CONN(conn);
-      int orbandwidthrate = or_conn->bandwidthrate;
-      int orbandwidthburst = or_conn->bandwidthburst;
-
-      int prev_conn_read = or_conn->read_bucket;
-      int prev_conn_write = or_conn->write_bucket;
-
-      if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) {
-        connection_bucket_refill_helper(&or_conn->read_bucket,
-                                        orbandwidthrate,
-                                        orbandwidthburst,
-                                        milliseconds_elapsed,
-                                        "or_conn->read_bucket");
-      }
-      if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) {
-        connection_bucket_refill_helper(&or_conn->write_bucket,
-                                        orbandwidthrate,
-                                        orbandwidthburst,
-                                        milliseconds_elapsed,
-                                        "or_conn->write_bucket");
-      }
 
-      /* If buckets were empty before and have now been refilled, tell any
-       * interested controllers. */
-      if (get_options()->TestingEnableTbEmptyEvent) {
-        char *bucket;
-        uint32_t conn_read_empty_time, conn_write_empty_time;
-        tor_asprintf(&bucket, "ORCONN ID="U64_FORMAT,
-                     U64_PRINTF_ARG(or_conn->base_.global_identifier));
-        conn_read_empty_time = bucket_millis_empty(prev_conn_read,
-                               or_conn->read_emptied_time,
-                               or_conn->read_bucket,
-                               milliseconds_elapsed, &tvnow);
-        conn_write_empty_time = bucket_millis_empty(prev_conn_write,
-                                or_conn->write_emptied_time,
-                                or_conn->write_bucket,
-                                milliseconds_elapsed, &tvnow);
-        control_event_tb_empty(bucket, conn_read_empty_time,
-                               conn_write_empty_time,
-                               milliseconds_elapsed);
-        tor_free(bucket);
+      if (conn->state == OR_CONN_STATE_OPEN) {
+        token_bucket_refill(&or_conn->bucket, now_ts);
       }
     }
 
     if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
-        && global_read_bucket > 0 /* and we're allowed to read */
+        && token_bucket_get_read(&global_bucket) > 0 /* and we can read */
         && (!connection_counts_as_relayed_traffic(conn, now) ||
-            global_relayed_read_bucket > 0) /* even if we're relayed traffic */
+            token_bucket_get_read(&global_relayed_bucket) > 0)
         && (!connection_speaks_cells(conn) ||
             conn->state != OR_CONN_STATE_OPEN ||
-            TO_OR_CONN(conn)->read_bucket > 0)) {
+            token_bucket_get_read(&TO_OR_CONN(conn)->bucket) > 0)) {
         /* and either a non-cell conn or a cell conn with non-empty bucket */
       LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
                          "waking up conn (fd %d) for read", (int)conn->s));
@@ -3366,12 +3185,12 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now)
     }
 
     if (conn->write_blocked_on_bw == 1
-        && global_write_bucket > 0 /* and we're allowed to write */
+        && token_bucket_get_write(&global_bucket) > 0 /* and we can write */
         && (!connection_counts_as_relayed_traffic(conn, now) ||
-            global_relayed_write_bucket > 0) /* even if it's relayed traffic */
+            token_bucket_get_write(&global_relayed_bucket) > 0)
         && (!connection_speaks_cells(conn) ||
             conn->state != OR_CONN_STATE_OPEN ||
-            TO_OR_CONN(conn)->write_bucket > 0)) {
+            token_bucket_get_write(&TO_OR_CONN(conn)->bucket) > 0)) {
       LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
                          "waking up conn (fd %d) for write", (int)conn->s));
       conn->write_blocked_on_bw = 0;
@@ -3380,22 +3199,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now)
   } SMARTLIST_FOREACH_END(conn);
 }
 
-/** Is the <b>bucket</b> for connection <b>conn</b> low enough that we
- * should add another pile of tokens to it?
- */
-static int
-connection_bucket_should_increase(int bucket, or_connection_t *conn)
-{
-  tor_assert(conn);
-
-  if (conn->base_.state != OR_CONN_STATE_OPEN)
-    return 0; /* only open connections play the rate limiting game */
-  if (bucket >= conn->bandwidthburst)
-    return 0;
-
-  return 1;
-}
-
 /** Read bytes from conn-\>s and process them.
  *
  * It calls connection_buf_read_from_socket() to bring in any new bytes,

+ 3 - 8
src/or/connection.h

@@ -122,7 +122,9 @@ void connection_mark_all_noncontrol_connections(void);
 ssize_t connection_bucket_write_limit(connection_t *conn, time_t now);
 int global_write_bucket_low(connection_t *conn, size_t attempt, int priority);
 void connection_bucket_init(void);
-void connection_bucket_refill(int seconds_elapsed, time_t now);
+void connection_bucket_adjust(const or_options_t *options);
+void connection_bucket_refill(time_t now,
+                              uint32_t now_ts);
 
 int connection_handle_read(connection_t *conn);
 
@@ -272,13 +274,6 @@ void connection_check_oos(int n_socks, int failed);
 STATIC void connection_free_minimal(connection_t *conn);
 
 /* Used only by connection.c and test*.c */
-uint32_t bucket_millis_empty(int tokens_before, uint32_t last_empty_time,
-                             int tokens_after, int milliseconds_elapsed,
-                             const struct timeval *tvnow);
-void connection_buckets_note_empty_ts(uint32_t *timestamp_var,
-                                      int tokens_before,
-                                      size_t tokens_removed,
-                                      const struct timeval *tvnow);
 MOCK_DECL(STATIC int,connection_connect_sockaddr,
                                             (connection_t *conn,
                                              const struct sockaddr *sa,

+ 3 - 11
src/or/connection_or.c

@@ -793,18 +793,10 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
                                 (int)options->BandwidthBurst, 1, INT32_MAX);
   }
 
-  conn->bandwidthrate = rate;
-  conn->bandwidthburst = burst;
-  if (reset) { /* set up the token buckets to be full */
-    conn->read_bucket = conn->write_bucket = burst;
-    return;
+  token_bucket_adjust(&conn->bucket, rate, burst);
+  if (reset) {
+    token_bucket_reset(&conn->bucket, monotime_coarse_get_stamp());
   }
-  /* If the new token bucket is smaller, take out the extra tokens.
-   * (If it's larger, don't -- the buckets can grow to reach the cap.) */
-  if (conn->read_bucket > burst)
-    conn->read_bucket = burst;
-  if (conn->write_bucket > burst)
-    conn->write_bucket = burst;
 }
 
 /** Either our set of relays or our per-conn rate limits have changed.

+ 0 - 23
src/or/control.c

@@ -1214,7 +1214,6 @@ static const struct control_event_t control_event_table[] = {
   { EVENT_CONF_CHANGED, "CONF_CHANGED"},
   { EVENT_CONN_BW, "CONN_BW" },
   { EVENT_CELL_STATS, "CELL_STATS" },
-  { EVENT_TB_EMPTY, "TB_EMPTY" },
   { EVENT_CIRC_BANDWIDTH_USED, "CIRC_BW" },
   { EVENT_TRANSPORT_LAUNCHED, "TRANSPORT_LAUNCHED" },
   { EVENT_HS_DESC, "HS_DESC" },
@@ -6077,28 +6076,6 @@ control_event_circuit_cell_stats(void)
   return 0;
 }
 
-/** Tokens in <b>bucket</b> have been refilled: the read bucket was empty
- * for <b>read_empty_time</b> millis, the write bucket was empty for
- * <b>write_empty_time</b> millis, and buckets were last refilled
- * <b>milliseconds_elapsed</b> millis ago.  Only emit TB_EMPTY event if
- * either read or write bucket have been empty before. */
-int
-control_event_tb_empty(const char *bucket, uint32_t read_empty_time,
-                       uint32_t write_empty_time,
-                       int milliseconds_elapsed)
-{
-  if (get_options()->TestingEnableTbEmptyEvent &&
-      EVENT_IS_INTERESTING(EVENT_TB_EMPTY) &&
-      (read_empty_time > 0 || write_empty_time > 0)) {
-    send_control_event(EVENT_TB_EMPTY,
-                       "650 TB_EMPTY %s READ=%d WRITTEN=%d "
-                       "LAST=%d\r\n",
-                       bucket, read_empty_time, write_empty_time,
-                       milliseconds_elapsed);
-  }
-  return 0;
-}
-
 /* about 5 minutes worth. */
 #define N_BW_EVENTS_TO_CACHE 300
 /* Index into cached_bw_events to next write. */

+ 1 - 4
src/or/control.h

@@ -59,9 +59,6 @@ int control_event_circ_bandwidth_used(void);
 int control_event_conn_bandwidth(connection_t *conn);
 int control_event_conn_bandwidth_used(void);
 int control_event_circuit_cell_stats(void);
-int control_event_tb_empty(const char *bucket, uint32_t read_empty_time,
-                           uint32_t write_empty_time,
-                           int milliseconds_elapsed);
 void control_event_logmsg(int severity, uint32_t domain, const char *msg);
 int control_event_descriptors_changed(smartlist_t *routers);
 int control_event_address_mapped(const char *from, const char *to,
@@ -194,7 +191,7 @@ void control_free_all(void);
 #define EVENT_CONF_CHANGED            0x0019
 #define EVENT_CONN_BW                 0x001A
 #define EVENT_CELL_STATS              0x001B
-#define EVENT_TB_EMPTY                0x001C
+/* UNUSED :                           0x001C */
 #define EVENT_CIRC_BANDWIDTH_USED     0x001D
 #define EVENT_TRANSPORT_LAUNCHED      0x0020
 #define EVENT_HS_DESC                 0x0021

+ 25 - 21
src/or/main.c

@@ -152,19 +152,19 @@ static void shutdown_did_not_work_callback(evutil_socket_t fd, short event,
                                            void *arg) ATTR_NORETURN;
 
 /********* START VARIABLES **********/
-int global_read_bucket; /**< Max number of bytes I can read this second. */
-int global_write_bucket; /**< Max number of bytes I can write this second. */
-
-/** Max number of relayed (bandwidth class 1) bytes I can read this second. */
-int global_relayed_read_bucket;
-/** Max number of relayed (bandwidth class 1) bytes I can write this second. */
-int global_relayed_write_bucket;
-/** What was the read bucket before the last second_elapsed_callback() call?
- * (used to determine how many bytes we've read). */
-static int stats_prev_global_read_bucket;
+
+/* Token bucket for all traffic. */
+token_bucket_t global_bucket;
+
+/* Token bucket for relayed traffic. */
+token_bucket_t global_relayed_bucket;
+
+/** What was the read bucket before the last second_elapsed_callback()
+ * call?  (used to determine how many bytes we've read). */
+static size_t stats_prev_global_read_bucket;
 /** What was the write bucket before the last second_elapsed_callback() call?
  * (used to determine how many bytes we've written). */
-static int stats_prev_global_write_bucket;
+static size_t stats_prev_global_write_bucket;
 
 /* DOCDOC stats_prev_n_read */
 static uint64_t stats_prev_n_read = 0;
@@ -2389,19 +2389,23 @@ refill_callback(periodic_timer_t *timer, void *arg)
                                 refill_timer_current_millisecond.tv_sec);
   }
 
-  bytes_written = stats_prev_global_write_bucket - global_write_bucket;
-  bytes_read = stats_prev_global_read_bucket - global_read_bucket;
+  bytes_written = stats_prev_global_write_bucket -
+    token_bucket_get_write(&global_bucket);
+  bytes_read = stats_prev_global_read_bucket -
+    token_bucket_get_read(&global_bucket);
 
   stats_n_bytes_read += bytes_read;
   stats_n_bytes_written += bytes_written;
   if (accounting_is_enabled(options) && milliseconds_elapsed >= 0)
     accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over);
 
-  if (milliseconds_elapsed > 0)
-    connection_bucket_refill(milliseconds_elapsed, (time_t)now.tv_sec);
+  if (milliseconds_elapsed > 0) {
+    connection_bucket_refill((time_t)now.tv_sec,
+                             monotime_coarse_get_stamp());
+  }
 
-  stats_prev_global_read_bucket = global_read_bucket;
-  stats_prev_global_write_bucket = global_write_bucket;
+  stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket);
+  stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket);
 
   /* remember what time it is, for next time */
   refill_timer_current_millisecond = now;
@@ -2605,8 +2609,8 @@ do_main_loop(void)
 
   /* Set up our buckets */
   connection_bucket_init();
-  stats_prev_global_read_bucket = global_read_bucket;
-  stats_prev_global_write_bucket = global_write_bucket;
+  stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket);
+  stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket);
 
   /* initialize the bootstrap status events to know we're starting up */
   control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
@@ -3501,8 +3505,8 @@ tor_free_all(int postfork)
   periodic_timer_free(systemd_watchdog_timer);
 #endif
 
-  global_read_bucket = global_write_bucket = 0;
-  global_relayed_read_bucket = global_relayed_write_bucket = 0;
+  memset(&global_bucket, 0, sizeof(global_bucket));
+  memset(&global_relayed_bucket, 0, sizeof(global_relayed_bucket));
   stats_prev_global_read_bucket = stats_prev_global_write_bucket = 0;
   stats_prev_n_read = stats_prev_n_written = 0;
   stats_n_bytes_read = stats_n_bytes_written = 0;
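
The main-loop change keeps the old byte-accounting trick but reads it through the new getters: whatever the global bucket has dropped since the previous post-refill snapshot is the number of bytes actually transferred in the interval. A compressed sketch of that pattern, assuming the Tor build environment and the globals this commit introduces (prev_read and prev_write here are stand-ins for the stats_prev_global_* snapshots):

    #include "or.h"
    #include "compat_time.h"
    #include "connection.h"
    #include "main.h"

    static size_t prev_read, prev_write;

    static void
    account_and_refill(time_t now)
    {
      /* Bytes consumed since the last post-refill snapshot. */
      size_t bytes_read = prev_read - token_bucket_get_read(&global_bucket);
      size_t bytes_written =
        prev_write - token_bucket_get_write(&global_bucket);
      (void)bytes_read; (void)bytes_written; /* fed to stats/accounting */

      connection_bucket_refill(now, monotime_coarse_get_stamp());

      /* Snapshot after the refill, so the next delta measures consumption
       * since this tick. */
      prev_read = token_bucket_get_read(&global_bucket);
      prev_write = token_bucket_get_write(&global_bucket);
    }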

+ 2 - 4
src/or/main.h

@@ -89,10 +89,8 @@ uint64_t get_main_loop_idle_count(void);
 
 extern time_t time_of_process_start;
 extern int quiet_level;
-extern int global_read_bucket;
-extern int global_write_bucket;
-extern int global_relayed_read_bucket;
-extern int global_relayed_write_bucket;
+extern token_bucket_t global_bucket;
+extern token_bucket_t global_relayed_bucket;
 
 #ifdef MAIN_PRIVATE
 STATIC void init_connection_lists(void);

+ 3 - 17
src/or/or.h

@@ -80,6 +80,7 @@
 #include "crypto_curve25519.h"
 #include "crypto_ed25519.h"
 #include "tor_queue.h"
+#include "token_bucket.h"
 #include "util_format.h"
 #include "hs_circuitmap.h"
 
@@ -1660,20 +1661,8 @@ typedef struct or_connection_t {
 
   time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/
 
-  /* bandwidth* and *_bucket only used by ORs in OPEN state: */
-  int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */
-  int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */
-  int read_bucket; /**< When this hits 0, stop receiving. Every second we
-                    * add 'bandwidthrate' to this, capping it at
-                    * bandwidthburst. (OPEN ORs only) */
-  int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */
-
-  /** Last emptied read token bucket in msec since midnight; only used if
-   * TB_EMPTY events are enabled. */
-  uint32_t read_emptied_time;
-  /** Last emptied write token bucket in msec since midnight; only used if
-   * TB_EMPTY events are enabled. */
-  uint32_t write_emptied_time;
+  token_bucket_t bucket; /**< Used for rate limiting when the connection is
+                          * in state CONN_OPEN. */
 
   /*
    * Count the number of bytes flushed out on this orconn, and the number of
@@ -4431,9 +4420,6 @@ typedef struct {
   /** Enable CELL_STATS events.  Only altered on testing networks. */
   int TestingEnableCellStatsEvent;
 
-  /** Enable TB_EMPTY events.  Only altered on testing networks. */
-  int TestingEnableTbEmptyEvent;
-
   /** If true, and we have GeoIP data, and we're a bridge, keep a per-country
    * count of how many client addresses have contacted us so that we can help
    * the bridge authority guess which countries have blocked access to us. */

+ 1 - 0
src/test/include.am

@@ -90,6 +90,7 @@ src_test_test_SOURCES = \
 	src/test/test_address_set.c \
 	src/test/test_bridges.c \
 	src/test/test_buffers.c \
+	src/test/test_bwmgt.c \
 	src/test/test_cell_formats.c \
 	src/test/test_cell_queue.c \
 	src/test/test_channel.c \

+ 1 - 0
src/test/test.c

@@ -813,6 +813,7 @@ struct testgroup_t testgroups[] = {
   { "address_set/", address_set_tests },
   { "bridges/", bridges_tests },
   { "buffer/", buffer_tests },
+  { "bwmgt/", bwmgt_tests },
   { "cellfmt/", cell_format_tests },
   { "cellqueue/", cell_queue_tests },
   { "channel/", channel_tests },

+ 1 - 0
src/test/test.h

@@ -187,6 +187,7 @@ extern struct testcase_t addr_tests[];
 extern struct testcase_t address_tests[];
 extern struct testcase_t address_set_tests[];
 extern struct testcase_t bridges_tests[];
+extern struct testcase_t bwmgt_tests[];
 extern struct testcase_t buffer_tests[];
 extern struct testcase_t cell_format_tests[];
 extern struct testcase_t cell_queue_tests[];

+ 205 - 0
src/test/test_bwmgt.c

@@ -0,0 +1,205 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file test_bwmgt.c
+ * \brief tests for bandwidth management / token bucket functions
+ */
+
+#define TOKEN_BUCKET_PRIVATE
+
+#include "or.h"
+#include "test.h"
+
+#include "token_bucket.h"
+
+// an imaginary time, in timestamp units. Chosen so it will roll over.
+static const uint32_t START_TS = UINT32_MAX-10;
+static const int32_t KB = 1024;
+
+static void
+test_bwmgt_token_buf_init(void *arg)
+{
+  (void)arg;
+  token_bucket_t b;
+
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+  // Burst is correct
+  tt_uint_op(b.burst, OP_EQ, 64*KB);
+  // Rate is correct, within 1 percent.
+  {
+    uint32_t ticks_per_sec =
+      (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000);
+    uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP);
+
+    tt_uint_op(rate_per_sec, OP_GT, 16*KB-160);
+    tt_uint_op(rate_per_sec, OP_LT, 16*KB+160);
+  }
+  // Bucket starts out full:
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS);
+  tt_int_op(b.read_bucket, OP_EQ, 64*KB);
+
+ done:
+  ;
+}
+
+static void
+test_bwmgt_token_buf_adjust(void *arg)
+{
+  (void)arg;
+  token_bucket_t b;
+
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+
+  uint32_t rate_orig = b.rate;
+  // Increasing burst
+  token_bucket_adjust(&b, 16*KB, 128*KB);
+  tt_uint_op(b.rate, OP_EQ, rate_orig);
+  tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
+  tt_uint_op(b.burst, OP_EQ, 128*KB);
+
+  // Decreasing burst but staying above bucket
+  token_bucket_adjust(&b, 16*KB, 96*KB);
+  tt_uint_op(b.rate, OP_EQ, rate_orig);
+  tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
+  tt_uint_op(b.burst, OP_EQ, 96*KB);
+
+  // Decreasing burst below bucket,
+  token_bucket_adjust(&b, 16*KB, 48*KB);
+  tt_uint_op(b.rate, OP_EQ, rate_orig);
+  tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
+  tt_uint_op(b.burst, OP_EQ, 48*KB);
+
+  // Changing rate.
+  token_bucket_adjust(&b, 32*KB, 48*KB);
+  tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10);
+  tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10);
+  tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
+  tt_uint_op(b.burst, OP_EQ, 48*KB);
+
+ done:
+  ;
+}
+
+static void
+test_bwmgt_token_buf_dec(void *arg)
+{
+  (void)arg;
+  token_bucket_t b;
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+
+  // full-to-not-full.
+  tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB));
+  tt_int_op(b.read_bucket, OP_EQ, 63*KB);
+
+  // Full to almost-not-full
+  tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1));
+  tt_int_op(b.read_bucket, OP_EQ, 1);
+
+  // almost-not-full to empty.
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1));
+  tt_int_op(b.read_bucket, OP_EQ, 0);
+
+  // reset bucket, try full-to-empty
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB));
+  tt_int_op(b.read_bucket, OP_EQ, 0);
+
+  // reset bucket, try underflow.
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1));
+  tt_int_op(b.read_bucket, OP_EQ, -1);
+
+  // A second underflow does not make the bucket empty.
+  tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000));
+  tt_int_op(b.read_bucket, OP_EQ, -1001);
+
+ done:
+  ;
+}
+
+static void
+test_bwmgt_token_buf_refill(void *arg)
+{
+  (void)arg;
+  token_bucket_t b;
+  const uint32_t SEC =
+    (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000);
+  printf("%d\n", (int)SEC);
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+
+  /* Make the buffer much emptier, then let one second elapse. */
+  token_bucket_dec_read(&b, 48*KB);
+  tt_int_op(b.read_bucket, OP_EQ, 16*KB);
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC));
+  tt_int_op(b.read_bucket, OP_GT, 32*KB - 300);
+  tt_int_op(b.read_bucket, OP_LT, 32*KB + 300);
+
+  /* Another half second. */
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
+  tt_int_op(b.read_bucket, OP_GT, 40*KB - 400);
+  tt_int_op(b.read_bucket, OP_LT, 40*KB + 400);
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2);
+
+  /* No time: nothing happens. */
+  {
+    const uint32_t bucket_orig = b.read_bucket;
+    tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
+    tt_int_op(b.read_bucket, OP_EQ, bucket_orig);
+  }
+
+  /* Another 30 seconds: fill the bucket. */
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30));
+  tt_int_op(b.read_bucket, OP_EQ, b.burst);
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30);
+
+  /* Another 30 seconds: nothing happens. */
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60));
+  tt_int_op(b.read_bucket, OP_EQ, b.burst);
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60);
+
+  /* Empty the bucket, let two seconds pass, and make sure that a refill is
+   * noticed. */
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst));
+  tt_int_op(0, OP_EQ, b.read_bucket);
+  tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61));
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62));
+  tt_int_op(b.read_bucket, OP_GT, 32*KB-400);
+  tt_int_op(b.read_bucket, OP_LT, 32*KB+400);
+
+  /* Underflow the bucket, make sure we detect when it has tokens again. */
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB));
+  tt_int_op(-16*KB, OP_EQ, b.read_bucket);
+  // half a second passes...
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
+  tt_int_op(b.read_bucket, OP_GT, -8*KB-300);
+  tt_int_op(b.read_bucket, OP_LT, -8*KB+300);
+  // a second passes
+  tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65));
+  tt_int_op(b.read_bucket, OP_GT, 8*KB-400);
+  tt_int_op(b.read_bucket, OP_LT, 8*KB+400);
+
+  // We step a second backwards, and nothing happens.
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
+  tt_int_op(b.read_bucket, OP_GT, 8*KB-400);
+  tt_int_op(b.read_bucket, OP_LT, 8*KB+400);
+
+  // A ridiculous amount of time passes.
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX));
+  tt_int_op(b.read_bucket, OP_EQ, b.burst);
+
+ done:
+  ;
+}
+
+#define BWMGT(name)                                          \
+  { #name, test_bwmgt_ ## name , 0, NULL, NULL }
+
+struct testcase_t bwmgt_tests[] = {
+  BWMGT(token_buf_init),
+  BWMGT(token_buf_adjust),
+  BWMGT(token_buf_dec),
+  BWMGT(token_buf_refill),
+  END_OF_TESTCASES
+};
+
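
The refill test's tolerances follow directly from the configured rate: at 16 KB/s, one second of elapsed stamp time should restore roughly 16 KB, so a bucket drained from 64 KB down to 16 KB is expected back near 32 KB after one second and near 40 KB after another half second, with a few hundred bytes of slack for the msec-to-stamp rounding. The group can be run on its own from the unit-test binary, e.g. ./src/test/test bwmgt/.. (assuming the usual tinytest group-prefix invocation).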

+ 0 - 75
src/test/test_controller_events.c

@@ -11,79 +11,6 @@
 #include "control.h"
 #include "test.h"
 
-static void
-help_test_bucket_note_empty(uint32_t expected_msec_since_midnight,
-                            int tokens_before, size_t tokens_removed,
-                            uint32_t msec_since_epoch)
-{
-  uint32_t timestamp_var = 0;
-  struct timeval tvnow;
-  tvnow.tv_sec = msec_since_epoch / 1000;
-  tvnow.tv_usec = (msec_since_epoch % 1000) * 1000;
-  connection_buckets_note_empty_ts(&timestamp_var, tokens_before,
-                                   tokens_removed, &tvnow);
-  tt_int_op(expected_msec_since_midnight, OP_EQ, timestamp_var);
-
- done:
-  ;
-}
-
-static void
-test_cntev_bucket_note_empty(void *arg)
-{
-  (void)arg;
-
-  /* Two cases with nothing to note, because bucket was empty before;
-   * 86442200 == 1970-01-02 00:00:42.200000 */
-  help_test_bucket_note_empty(0, 0, 0, 86442200);
-  help_test_bucket_note_empty(0, -100, 100, 86442200);
-
-  /* Nothing to note, because bucket has not been emptied. */
-  help_test_bucket_note_empty(0, 101, 100, 86442200);
-
-  /* Bucket was emptied, note 42200 msec since midnight. */
-  help_test_bucket_note_empty(42200, 101, 101, 86442200);
-  help_test_bucket_note_empty(42200, 101, 102, 86442200);
-}
-
-static void
-test_cntev_bucket_millis_empty(void *arg)
-{
-  struct timeval tvnow;
-  (void)arg;
-
-  /* 1970-01-02 00:00:42.200000 */
-  tvnow.tv_sec = 86400 + 42;
-  tvnow.tv_usec = 200000;
-
-  /* Bucket has not been refilled. */
-  tt_int_op(0, OP_EQ, bucket_millis_empty(0, 42120, 0, 100, &tvnow));
-  tt_int_op(0, OP_EQ, bucket_millis_empty(-10, 42120, -10, 100, &tvnow));
-
-  /* Bucket was not empty. */
-  tt_int_op(0, OP_EQ, bucket_millis_empty(10, 42120, 20, 100, &tvnow));
-
-  /* Bucket has been emptied 80 msec ago and has just been refilled. */
-  tt_int_op(80, OP_EQ, bucket_millis_empty(-20, 42120, -10, 100, &tvnow));
-  tt_int_op(80, OP_EQ, bucket_millis_empty(-10, 42120, 0, 100, &tvnow));
-  tt_int_op(80, OP_EQ, bucket_millis_empty(0, 42120, 10, 100, &tvnow));
-
-  /* Bucket has been emptied 180 msec ago, last refill was 100 msec ago
-   * which was insufficient to make it positive, so cap msec at 100. */
-  tt_int_op(100, OP_EQ, bucket_millis_empty(0, 42020, 1, 100, &tvnow));
-
-  /* 1970-01-02 00:00:00:050000 */
-  tvnow.tv_sec = 86400;
-  tvnow.tv_usec = 50000;
-
-  /* Last emptied 30 msec before midnight, tvnow is 50 msec after
-   * midnight, that's 80 msec in total. */
-  tt_int_op(80, OP_EQ, bucket_millis_empty(0, 86400000 - 30, 1, 100, &tvnow));
-
- done:
-  ;
-}
-
 static void
 add_testing_cell_stats_entry(circuit_t *circ, uint8_t command,
                              unsigned int waiting_time,
@@ -395,8 +322,6 @@ test_cntev_event_mask(void *arg)
   { #name, test_cntev_ ## name, flags, 0, NULL }
 
 struct testcase_t controller_event_tests[] = {
-  TEST(bucket_note_empty, TT_FORK),
-  TEST(bucket_millis_empty, TT_FORK),
   TEST(sum_up_cell_stats, TT_FORK),
   TEST(append_cell_stats, TT_FORK),
   TEST(format_cell_stats, TT_FORK),

+ 0 - 10
src/test/test_options.c

@@ -4101,16 +4101,6 @@ test_options_validate__testing_options(void *ignored)
   tt_assert(!msg);
   tor_free(msg);
 
-  free_options_test_data(tdata);
-  tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES
-                                "TestingEnableTbEmptyEvent 1\n"
-                                );
-  ret = options_validate(tdata->old_opt, tdata->opt, tdata->def_opt, 0, &msg);
-  tt_int_op(ret, OP_EQ, -1);
-  tt_str_op(msg, OP_EQ, "TestingEnableTbEmptyEvent may only be changed "
-            "in testing Tor networks!");
-  tor_free(msg);
-
   free_options_test_data(tdata);
   tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES
                                 "TestingEnableTbEmptyEvent 1\n"

+ 7 - 0
src/test/test_util.c

@@ -5907,6 +5907,13 @@ test_util_monotonic_time(void *arg)
   tt_u64_op(coarse_stamp_diff, OP_GE, 120);
   tt_u64_op(coarse_stamp_diff, OP_LE, 1200);
 
+  {
+    uint64_t units = monotime_msec_to_approx_coarse_stamp_units(5000);
+    uint64_t ms = monotime_coarse_stamp_units_to_approx_msec(units);
+    tt_int_op(ms, OP_GE, 4950);
+    tt_int_op(ms, OP_LT, 5050);
+  }
+
  done:
   ;
 }