|
@@ -19,7 +19,7 @@ static connection_t *connection_create_listener(const char *listenaddress,
|
|
|
static int connection_init_accepted_conn(connection_t *conn,
|
|
|
uint8_t listener_type);
|
|
|
static int connection_handle_listener_read(connection_t *conn, int new_type);
|
|
|
-static int connection_receiver_bucket_should_increase(or_connection_t *conn);
|
|
|
+static int connection_read_bucket_should_increase(or_connection_t *conn);
|
|
|
static int connection_finished_flushing(connection_t *conn);
|
|
|
static int connection_flushed_some(connection_t *conn);
|
|
|
static int connection_finished_connecting(connection_t *conn);
|
|
@@ -1103,59 +1103,55 @@ retry_all_listeners(int force, smartlist_t *replaced_conns,
|
|
|
|
|
|
extern int global_read_bucket, global_write_bucket;
|
|
|
|
|
|
-/** How many bytes at most can we read onto this connection? */
|
|
|
static int
|
|
|
-connection_bucket_read_limit(connection_t *conn)
|
|
|
+connection_bucket_round_robin(int base, int global_bucket, int conn_bucket)
|
|
|
{
|
|
|
int at_most;
|
|
|
- int base = connection_speaks_cells(conn) ?
|
|
|
- CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
|
|
|
|
|
|
/* Do a rudimentary round-robin so one circuit can't hog a connection.
|
|
|
* Pick at most 32 cells, at least 4 cells if possible, and if we're in
|
|
|
* the middle pick 1/8 of the available bandwidth. */
|
|
|
- at_most = global_read_bucket / 8;
|
|
|
+ at_most = global_bucket / 8;
|
|
|
at_most -= (at_most % base); /* round down */
|
|
|
if (at_most > 32*base) /* 16 KB */
|
|
|
at_most = 32*base;
|
|
|
else if (at_most < 4*base) /* 2 KB */
|
|
|
at_most = 4*base;
|
|
|
|
|
|
- if (at_most > global_read_bucket)
|
|
|
- at_most = global_read_bucket;
|
|
|
+ if (at_most > global_bucket)
|
|
|
+ at_most = global_bucket;
|
|
|
|
|
|
- if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
|
|
|
- or_connection_t *or_conn = TO_OR_CONN(conn);
|
|
|
- if (at_most > or_conn->receiver_bucket)
|
|
|
- at_most = or_conn->receiver_bucket;
|
|
|
- }
|
|
|
+ if (conn_bucket >= 0 && at_most > conn_bucket)
|
|
|
+ at_most = conn_bucket;
|
|
|
|
|
|
if (at_most < 0)
|
|
|
return 0;
|
|
|
return at_most;
|
|
|
}
|
|
|
|
|
|
+/** How many bytes at most can we read onto this connection? */
|
|
|
+static int
|
|
|
+connection_bucket_read_limit(connection_t *conn)
|
|
|
+{
|
|
|
+ int base = connection_speaks_cells(conn) ?
|
|
|
+ CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
|
|
|
+ int conn_bucket = -1;
|
|
|
+ if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
|
|
|
+ or_connection_t *or_conn = TO_OR_CONN(conn);
|
|
|
+ conn_bucket = or_conn->read_bucket;
|
|
|
+ }
|
|
|
+ return connection_bucket_round_robin(base, global_read_bucket, conn_bucket);
|
|
|
+}
|
|
|
+
|
|
|
/** How many bytes at most can we write onto this connection? */
|
|
|
int
|
|
|
connection_bucket_write_limit(connection_t *conn)
|
|
|
{
|
|
|
- unsigned int at_most;
|
|
|
-
|
|
|
- /* do a rudimentary round-robin so one circuit can't hog a connection */
|
|
|
- if (connection_speaks_cells(conn)) {
|
|
|
- at_most = 32*(CELL_NETWORK_SIZE);
|
|
|
- } else {
|
|
|
- at_most = 32*(RELAY_PAYLOAD_SIZE);
|
|
|
- }
|
|
|
-
|
|
|
- if (at_most > conn->outbuf_flushlen)
|
|
|
- at_most = conn->outbuf_flushlen;
|
|
|
+ int base = connection_speaks_cells(conn) ?
|
|
|
+ CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
|
|
|
|
|
|
-#if 0 /* don't enable til we actually do write limiting */
|
|
|
- if (at_most > global_write_bucket)
|
|
|
- at_most = global_write_bucket;
|
|
|
-#endif
|
|
|
- return at_most;
|
|
|
+ return connection_bucket_round_robin(base, global_write_bucket,
|
|
|
+ conn->outbuf_flushlen);
|
|
|
}
|
|
|
|
|
|
/** Return 1 if the global write bucket has no bytes in it,
|
|
@@ -1172,31 +1168,56 @@ connection_read_bucket_decrement(connection_t *conn, int num_read)
|
|
|
{
|
|
|
global_read_bucket -= num_read;
|
|
|
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
|
|
|
- TO_OR_CONN(conn)->receiver_bucket -= num_read;
|
|
|
+ TO_OR_CONN(conn)->read_bucket -= num_read;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/** If we have exhausted our global read bucket, or the read bucket for conn,
|
|
|
+/** If we have exhausted our global buckets, or the buckets for conn,
|
|
|
* stop reading. */
|
|
|
static void
|
|
|
-connection_consider_empty_buckets(connection_t *conn)
|
|
|
+connection_consider_empty_read_buckets(connection_t *conn)
|
|
|
{
|
|
|
if (global_read_bucket <= 0) {
|
|
|
- LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,"global bucket exhausted. Pausing."));
|
|
|
+ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
|
|
|
+ "global read bucket exhausted. Pausing."));
|
|
|
conn->wants_to_read = 1;
|
|
|
connection_stop_reading(conn);
|
|
|
return;
|
|
|
}
|
|
|
if (connection_speaks_cells(conn) &&
|
|
|
conn->state == OR_CONN_STATE_OPEN &&
|
|
|
- TO_OR_CONN(conn)->receiver_bucket <= 0) {
|
|
|
+ TO_OR_CONN(conn)->read_bucket <= 0) {
|
|
|
LOG_FN_CONN(conn,
|
|
|
- (LOG_DEBUG,LD_NET,"receiver bucket exhausted. Pausing."));
|
|
|
+ (LOG_DEBUG,LD_NET,"read bucket exhausted. Pausing."));
|
|
|
conn->wants_to_read = 1;
|
|
|
connection_stop_reading(conn);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/** If we have exhausted our global buckets, or the buckets for conn,
|
|
|
+ * stop writing. */
|
|
|
+static void
|
|
|
+connection_consider_empty_write_buckets(connection_t *conn)
|
|
|
+{
|
|
|
+ if (global_write_bucket <= 0) {
|
|
|
+ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
|
|
|
+ "global write bucket exhausted. Pausing."));
|
|
|
+ conn->wants_to_write = 1;
|
|
|
+ connection_stop_writing(conn);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+#if 0
|
|
|
+ if (connection_speaks_cells(conn) &&
|
|
|
+ conn->state == OR_CONN_STATE_OPEN &&
|
|
|
+ TO_OR_CONN(conn)->write_bucket <= 0) {
|
|
|
+ LOG_FN_CONN(conn,
|
|
|
+ (LOG_DEBUG,LD_NET,"write bucket exhausted. Pausing."));
|
|
|
+ conn->wants_to_write = 1;
|
|
|
+ connection_stop_writing(conn);
|
|
|
+ }
|
|
|
+#endif
|
|
|
+}
|
|
|
+
|
|
|
/** Initialize the global read bucket to options->BandwidthBurst. */
|
|
|
void
|
|
|
connection_bucket_init(void)
|
|
@@ -1209,24 +1230,25 @@ connection_bucket_init(void)
|
|
|
|
|
|
/** A second has rolled over; increment buckets appropriately. */
|
|
|
void
|
|
|
-connection_bucket_refill(struct timeval *now)
|
|
|
+connection_bucket_refill(int seconds_elapsed)
|
|
|
{
|
|
|
int i, n;
|
|
|
connection_t *conn;
|
|
|
connection_t **carray;
|
|
|
or_options_t *options = get_options();
|
|
|
- /* Not used, but it should be! We might have rolled over more than one
|
|
|
- * second! XXXX */
|
|
|
- (void) now;
|
|
|
|
|
|
/* refill the global buckets */
|
|
|
if (global_read_bucket < (int)options->BandwidthBurst) {
|
|
|
- global_read_bucket += (int)options->BandwidthRate;
|
|
|
- log_debug(LD_NET,"global_read_bucket now %d.", global_read_bucket);
|
|
|
+ global_read_bucket += (int)options->BandwidthRate*seconds_elapsed;
|
|
|
+ if (global_read_bucket > (int)options->BandwidthBurst)
|
|
|
+ global_read_bucket = (int)options->BandwidthBurst;
|
|
|
+ log(LOG_DEBUG, LD_NET,"global_read_bucket now %d.", global_read_bucket);
|
|
|
}
|
|
|
if (global_write_bucket < (int)options->BandwidthBurst) {
|
|
|
- global_write_bucket += (int)options->BandwidthRate;
|
|
|
- log_debug(LD_NET,"global_write_bucket now %d.", global_write_bucket);
|
|
|
+ global_write_bucket += (int)options->BandwidthRate*seconds_elapsed;
|
|
|
+ if (global_write_bucket > (int)options->BandwidthBurst)
|
|
|
+ global_write_bucket = (int)options->BandwidthBurst;
|
|
|
+ log(LOG_DEBUG, LD_NET,"global_write_bucket now %d.", global_write_bucket);
|
|
|
}
|
|
|
|
|
|
/* refill the per-connection buckets */
|
|
@@ -1236,30 +1258,32 @@ connection_bucket_refill(struct timeval *now)
|
|
|
|
|
|
if (connection_speaks_cells(conn)) {
|
|
|
or_connection_t *or_conn = TO_OR_CONN(conn);
|
|
|
- if (connection_receiver_bucket_should_increase(or_conn)) {
|
|
|
- or_conn->receiver_bucket += or_conn->bandwidthrate;
|
|
|
- if (or_conn->receiver_bucket > or_conn->bandwidthburst)
|
|
|
- or_conn->receiver_bucket = or_conn->bandwidthburst;
|
|
|
+ if (connection_read_bucket_should_increase(or_conn)) {
|
|
|
+ or_conn->read_bucket += or_conn->bandwidthrate*seconds_elapsed;
|
|
|
+ if (or_conn->read_bucket > or_conn->bandwidthburst)
|
|
|
+ or_conn->read_bucket = or_conn->bandwidthburst;
|
|
|
//log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i,
|
|
|
- // conn->receiver_bucket);
|
|
|
+ // conn->read_bucket);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (conn->wants_to_read == 1 /* it's marked to turn reading back on now */
|
|
|
&& global_read_bucket > 0 /* and we're allowed to read */
|
|
|
- && global_write_bucket > 0 /* and we're allowed to write (XXXX,
|
|
|
- * not the best place to check this.) */
|
|
|
&& (!connection_speaks_cells(conn) ||
|
|
|
conn->state != OR_CONN_STATE_OPEN ||
|
|
|
- TO_OR_CONN(conn)->receiver_bucket > 0)) {
|
|
|
- /* and either a non-cell conn or a cell conn with non-empty bucket */
|
|
|
- LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,"waking up conn (fd %d)",conn->s));
|
|
|
+ TO_OR_CONN(conn)->read_bucket > 0)) {
|
|
|
+ /* and either a non-cell conn or a cell conn with non-empty bucket */
|
|
|
+ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
|
|
|
+ "waking up conn (fd %d) for read",conn->s));
|
|
|
conn->wants_to_read = 0;
|
|
|
connection_start_reading(conn);
|
|
|
- if (conn->wants_to_write == 1) {
|
|
|
- conn->wants_to_write = 0;
|
|
|
- connection_start_writing(conn);
|
|
|
- }
|
|
|
+ }
|
|
|
+ if (conn->wants_to_write == 1 &&
|
|
|
+ global_write_bucket > 0) { /* and we're allowed to write */
|
|
|
+ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
|
|
|
+ "waking up conn (fd %d) for write",conn->s));
|
|
|
+ conn->wants_to_write = 0;
|
|
|
+ connection_start_writing(conn);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1268,13 +1292,13 @@ connection_bucket_refill(struct timeval *now)
|
|
|
* should add another pile of tokens to it?
|
|
|
*/
|
|
|
static int
|
|
|
-connection_receiver_bucket_should_increase(or_connection_t *conn)
|
|
|
+connection_read_bucket_should_increase(or_connection_t *conn)
|
|
|
{
|
|
|
tor_assert(conn);
|
|
|
|
|
|
if (conn->_base.state != OR_CONN_STATE_OPEN)
|
|
|
return 0; /* only open connections play the rate limiting game */
|
|
|
- if (conn->receiver_bucket >= conn->bandwidthburst)
|
|
|
+ if (conn->read_bucket >= conn->bandwidthburst)
|
|
|
return 0;
|
|
|
|
|
|
return 1;
|
|
@@ -1470,7 +1494,7 @@ connection_read_to_buf(connection_t *conn, int *max_to_read)
|
|
|
/* Call even if result is 0, since the global read bucket may
|
|
|
* have reached 0 on a different conn, and this guy needs to
|
|
|
* know to stop reading. */
|
|
|
- connection_consider_empty_buckets(conn);
|
|
|
+ connection_consider_empty_read_buckets(conn);
|
|
|
|
|
|
return 0;
|
|
|
}
|
|
@@ -1644,8 +1668,13 @@ connection_handle_write(connection_t *conn)
|
|
|
/* already marked */
|
|
|
return -1;
|
|
|
}
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
+ /* Call even if result is 0, since the global write bucket may
|
|
|
+ * have reached 0 on a different conn, and this guy needs to
|
|
|
+ * know to stop writing. */
|
|
|
+ connection_consider_empty_write_buckets(conn);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -2267,7 +2296,7 @@ assert_connection_ok(connection_t *conn, time_t now)
|
|
|
* gave a bad cert/etc, then we won't have assigned bandwidth,
|
|
|
* yet it will be open. -RD
|
|
|
*/
|
|
|
-// tor_assert(conn->receiver_bucket >= 0);
|
|
|
+// tor_assert(conn->read_bucket >= 0);
|
|
|
}
|
|
|
// tor_assert(conn->addr && conn->port);
|
|
|
tor_assert(conn->address);
|