Browse Source

refactor bandwidth-control token buckets

this is a checkpoint commit; there still remain some
bugs, er, somewhere.


svn:r1269
Roger Dingledine 21 years ago
parent
commit
703b2d3cf8
3 changed files with 129 additions and 82 deletions
  1. 119 48
      src/or/connection.c
  2. 7 29
      src/or/main.c
  3. 3 5
      src/or/or.h

+ 119 - 48
src/or/connection.c

@@ -8,8 +8,6 @@
 
 extern or_options_t options; /* command-line and config-file options */
 
-extern int global_read_bucket;
-
 char *conn_type_to_string[] = {
   "",            /* 0 */
   "OP listener", /* 1 */
@@ -72,6 +70,7 @@ char *conn_state_to_string[][_CONN_TYPE_MAX+1] = {
 
 static int connection_init_accepted_conn(connection_t *conn);
 static int connection_handle_listener_read(connection_t *conn, int new_type);
+static int connection_receiver_bucket_should_increase(connection_t *conn);
 
 /**************************************************************/
 
@@ -444,6 +443,121 @@ int retry_all_connections(void) {
   return 0;
 }
 
+extern int global_read_bucket;
+
+/* how many bytes at most can we read onto this connection? */
+int connection_bucket_read_limit(connection_t *conn) {
+  int at_most;
+
+  if(options.LinkPadding) {
+    at_most = global_read_bucket;
+  } else {
+    /* do a rudimentary round-robin so one circuit can't hog a connection */
+    if(connection_speaks_cells(conn)) {
+      at_most = 32*(CELL_NETWORK_SIZE);
+    } else {
+      at_most = 32*(RELAY_PAYLOAD_SIZE);
+    }
+
+    if(at_most > global_read_bucket)
+      at_most = global_read_bucket;
+  }
+
+  if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN)
+    if(at_most > conn->receiver_bucket)
+      at_most = conn->receiver_bucket;
+
+  return at_most;
+}
+
+/* we just read num_read onto conn. Decrement buckets appropriately. */
+void connection_bucket_decrement(connection_t *conn, int num_read) {
+  global_read_bucket -= num_read; assert(global_read_bucket >= 0);
+  if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+    conn->receiver_bucket -= num_read; assert(conn->receiver_bucket >= 0);
+  }
+  if(global_read_bucket == 0) {
+    log_fn(LOG_DEBUG,"global bucket exhausted. Pausing.");
+    conn->wants_to_read = 1;
+    connection_stop_reading(conn);
+    return;
+  }
+  if(connection_speaks_cells(conn) &&
+     conn->state == OR_CONN_STATE_OPEN &&
+     conn->receiver_bucket == 0) {
+      log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing.");
+      conn->wants_to_read = 1;
+      connection_stop_reading(conn);
+  }
+}
+
+/* keep a timeval to know when time has passed enough to refill buckets */
+static struct timeval current_time;
+
+void connection_bucket_init() {
+  tor_gettimeofday(&current_time);
+  global_read_bucket = options.BandwidthBurst; /* start it at max traffic */
+}
+
+/* some time has passed; increment buckets appropriately. */
+void connection_bucket_refill(struct timeval *now) {
+  int i, n;
+  connection_t *conn;
+  connection_t **carray;
+
+  if(now->tv_sec <= current_time.tv_sec)
+    return; /* wait until the second has rolled over */
+
+  current_time.tv_sec = now->tv_sec; /* update current_time */
+  /* (ignore usecs for now) */
+
+  /* refill the global bucket */
+  if(global_read_bucket < options.BandwidthBurst) {
+    global_read_bucket += options.BandwidthRate;
+    log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket);
+  }
+
+  /* refill the per-connection buckets */
+  get_connection_array(&carray,&n);
+  for(i=0;i<n;i++) {
+    conn = carray[i];
+
+    if(connection_receiver_bucket_should_increase(conn)) {
+      conn->receiver_bucket += conn->bandwidth;
+      //log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket);
+    }
+
+    if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */
+       && global_read_bucket > 0 /* and we're allowed to read */
+       && (!connection_speaks_cells(conn) ||
+           conn->state != OR_CONN_STATE_OPEN ||
+           conn->receiver_bucket > 0)) {
+      /* and either a non-cell conn or a cell conn with non-empty bucket */
+      conn->wants_to_read = 0;
+      connection_start_reading(conn);
+      if(conn->wants_to_write == 1) {
+        conn->wants_to_write = 0;
+        connection_start_writing(conn);
+      }
+    }
+  }
+}
+
+static int connection_receiver_bucket_should_increase(connection_t *conn) {
+  assert(conn);
+
+  if(!connection_speaks_cells(conn))
+    return 0; /* edge connections don't use receiver_buckets */
+  if(conn->state != OR_CONN_STATE_OPEN)
+    return 0; /* only open connections play the rate limiting game */
+
+  assert(conn->bandwidth > 0);
+  if(conn->receiver_bucket > 9*conn->bandwidth)
+    return 0;
+
+  return 1;
+}
+
 int connection_handle_read(connection_t *conn) {
 
   conn->timestamp_lastread = time(NULL);
@@ -482,27 +596,14 @@ int connection_read_to_buf(connection_t *conn) {
   int result;
   int at_most;
 
-  if(options.LinkPadding) {
-    at_most = global_read_bucket;
-  } else {
-    /* do a rudimentary round-robin so one connection can't hog a thickpipe */
-    if(connection_speaks_cells(conn)) {
-      at_most = 32*(CELL_NETWORK_SIZE);
-    } else {
-      at_most = 32*(RELAY_PAYLOAD_SIZE);
-    }
-
-    if(at_most > global_read_bucket)
-      at_most = global_read_bucket;
-  }
+  /* how many bytes are we allowed to read? */
+  at_most = connection_bucket_read_limit(conn);
 
   if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) {
     if(conn->state == OR_CONN_STATE_HANDSHAKING)
       return connection_tls_continue_handshake(conn);
 
     /* else open, or closing */
-    if(at_most > conn->receiver_bucket)
-      at_most = conn->receiver_bucket;
     result = read_to_buf_tls(conn->tls, at_most, conn->inbuf);
 
     switch(result) {
@@ -527,22 +628,7 @@ int connection_read_to_buf(connection_t *conn) {
       return -1;
   }
 
-  global_read_bucket -= result; assert(global_read_bucket >= 0);
-  if(global_read_bucket == 0) {
-    log_fn(LOG_DEBUG,"global bucket exhausted. Pausing.");
-    conn->wants_to_read = 1;
-    connection_stop_reading(conn);
-    return 0;
-  }
-  if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
-    conn->receiver_bucket -= result; assert(conn->receiver_bucket >= 0);
-    if(conn->receiver_bucket == 0) {
-      log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing.");
-      conn->wants_to_read = 1;
-      connection_stop_reading(conn);
-      return 0;
-    }
-  }
+  connection_bucket_decrement(conn, result);
   return 0;
 }
 
@@ -754,21 +840,6 @@ connection_t *connection_get_by_type_state_lastwritten(int type, int state) {
   return best;
 }
 
-int connection_receiver_bucket_should_increase(connection_t *conn) {
-  assert(conn);
-
-  if(!connection_speaks_cells(conn))
-    return 0; /* edge connections don't use receiver_buckets */
-  if(conn->state != OR_CONN_STATE_OPEN)
-    return 0; /* only open connections play the rate limiting game */
-
-  assert(conn->bandwidth > 0);
-  if(conn->receiver_bucket > 9*conn->bandwidth)
-    return 0;
-
-  return 1;
-}
-
 int connection_is_listener(connection_t *conn) {
   if(conn->type == CONN_TYPE_OR_LISTENER ||
      conn->type == CONN_TYPE_AP_LISTENER ||

+ 7 - 29
src/or/main.c

@@ -11,7 +11,6 @@ static int init_from_config(int argc, char **argv);
 
 /********* START VARIABLES **********/
 
-extern char *conn_type_to_string[];
 extern char *conn_state_to_string[][_CONN_TYPE_MAX+1];
 
 or_options_t options; /* command-line and config-file options */
@@ -281,23 +280,6 @@ static void run_connection_housekeeping(int i, time_t now) {
   cell_t cell;
   connection_t *conn = connection_array[i];
 
-  if(connection_receiver_bucket_should_increase(conn)) {
-    conn->receiver_bucket += conn->bandwidth;
-    //        log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket);
-  }
-
-  if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */
-     && global_read_bucket > 0 /* and we're allowed to read */
-     && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) {
-    /* and either a non-cell conn or a cell conn with non-empty bucket */
-    conn->wants_to_read = 0;
-    connection_start_reading(conn);
-    if(conn->wants_to_write == 1) {
-      conn->wants_to_write = 0;
-      connection_start_writing(conn);
-    }
-  }
-
   /* check connections to see whether we should send a keepalive, expire, or wait */
   if(!connection_speaks_cells(conn))
     return;
@@ -397,16 +379,6 @@ static void run_scheduled_events(time_t now) {
     }
   }
 
-  /* 4. Every second, we check how much bandwidth we've consumed and
-   *    increment global_read_bucket.
-   */
-  stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket;
-  if(global_read_bucket < options.BandwidthBurst) {
-    global_read_bucket += options.BandwidthRate;
-    log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket);
-  }
-  stats_prev_global_read_bucket = global_read_bucket;
-
   /* 5. We do housekeeping for each connection... */
   for(i=0;i<nfds;i++) {
     run_connection_housekeeping(i, now);
@@ -433,6 +405,12 @@ static int prepare_for_poll(void) {
 
   tor_gettimeofday(&now);
 
+  /* Check how much bandwidth we've consumed,
+   * and increment the token buckets. */
+  stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket;
+  connection_bucket_refill(&now);
+  stats_prev_global_read_bucket = global_read_bucket;
+
   if(now.tv_sec > current_second) { /* the second has rolled over. check more stuff. */
 
     ++stats_n_seconds_reading;
@@ -486,7 +464,7 @@ static int init_from_config(int argc, char **argv) {
     log_fn(LOG_DEBUG, "Successfully opened DebugLogFile '%s'.", options.DebugLogFile);
   }
 
-  global_read_bucket = options.BandwidthBurst; /* start it at max traffic */
+  connection_bucket_init();
   stats_prev_global_read_bucket = global_read_bucket;
 
   if(options.RunAsDaemon) {

+ 3 - 5
src/or/or.h

@@ -664,8 +664,8 @@ int getconfig(int argc, char **argv, or_options_t *options);
 
 /********************************* connection.c ***************************/
 
-#define CONN_TYPE_TO_STRING(t) (((t) < _CONN_TYPE_MIN || (t) > _CONN_TYPE_MAX) ? "Unknown" : \
-	                            conn_type_to_string[(t)])
+#define CONN_TYPE_TO_STRING(t) (((t) < _CONN_TYPE_MIN || (t) > _CONN_TYPE_MAX) ? \
+  "Unknown" : conn_type_to_string[(t)])
 
 extern char *conn_type_to_string[];
 
@@ -711,13 +711,11 @@ connection_t *connection_get_by_type(int type);
 connection_t *connection_get_by_type_state(int type, int state);
 connection_t *connection_get_by_type_state_lastwritten(int type, int state);
 
-int connection_receiver_bucket_should_increase(connection_t *conn);
-
 #define connection_speaks_cells(conn) ((conn)->type == CONN_TYPE_OR)
 #define connection_has_pending_tls_data(conn) \
   ((conn)->type == CONN_TYPE_OR && \
    (conn)->state == OR_CONN_STATE_OPEN && \
-   tor_tls_get_pending_bytes(conn->tls))
+   tor_tls_get_pending_bytes((conn)->tls))
 int connection_is_listener(connection_t *conn);
 int connection_state_is_open(connection_t *conn);