소스 검색

be more greedy about filling up all relay cells.

this may have some bugs in it still.
and it may end up not being what we want to do.


svn:r2928
Roger Dingledine 21 년 전
부모
커밋
6a516dfdd3
7개의 변경된 파일70개의 추가작업 그리고 39개의 파일을 삭제
  1. 5 3
      src/or/circuitbuild.c
  2. 2 2
      src/or/config.c
  3. 39 16
      src/or/connection.c
  4. 4 3
      src/or/connection_edge.c
  5. 7 5
      src/or/or.h
  6. 9 6
      src/or/relay.c
  7. 4 4
      src/or/rendclient.c

+ 5 - 3
src/or/circuitbuild.c

@@ -298,11 +298,13 @@ circuit_t *circuit_establish_circuit(uint8_t purpose,
 
 /** Find circuits that are waiting on <b>or_conn</b> to become open,
  * if any, and get them to send their create cells forward.
+ *
+ * Status is 1 if connect succeeded, or 0 if connect failed.
  */
-void circuit_n_conn_done(connection_t *or_conn, int success) {
+void circuit_n_conn_done(connection_t *or_conn, int status) {
   circuit_t *circ;
 
-  log_fn(LOG_DEBUG,"or_conn to %s, success=%d", or_conn->nickname, success);
+  log_fn(LOG_DEBUG,"or_conn to %s, status=%d", or_conn->nickname, status);
 
   for(circ=global_circuitlist;circ;circ = circ->next) {
     if (circ->marked_for_close)
@@ -312,7 +314,7 @@ void circuit_n_conn_done(connection_t *or_conn, int success) {
        circ->n_port == or_conn->port &&
        !memcmp(or_conn->identity_digest, circ->n_conn_id_digest, DIGEST_LEN)) {
       tor_assert(circ->state == CIRCUIT_STATE_OR_WAIT);
-      if(!success) { /* or_conn failed; close circ */
+      if(!status) { /* or_conn failed; close circ */
         log_fn(LOG_INFO,"or_conn failed. Closing circ.");
         circuit_mark_for_close(circ);
         continue;

+ 2 - 2
src/or/config.c

@@ -95,8 +95,8 @@ static config_var_t config_vars[] = {
   VAR("Address",             STRING,   Address,              NULL),
   VAR("AllowUnverifiedNodes",CSV,      AllowUnverifiedNodes, "middle,rendezvous"),
   VAR("AuthoritativeDirectory",BOOL,   AuthoritativeDir,     "0"),
-  VAR("BandwidthRate",       MEMUNIT,   BandwidthRate,   "780 KB"),
-  VAR("BandwidthBurst",      MEMUNIT,   BandwidthBurst,  "48 MB"),
+  VAR("BandwidthRate",       MEMUNIT,  BandwidthRate,        "780 KB"),
+  VAR("BandwidthBurst",      MEMUNIT,  BandwidthBurst,       "48 MB"),
   VAR("ClientOnly",          BOOL,     ClientOnly,           "0"),
   VAR("ContactInfo",         STRING,   ContactInfo,          NULL),
   VAR("ControlPort",         UINT,     ControlPort,          "0"),

+ 39 - 16
src/or/connection.c

@@ -87,8 +87,8 @@ static int connection_handle_listener_read(connection_t *conn, int new_type);
 static int connection_receiver_bucket_should_increase(connection_t *conn);
 static int connection_finished_flushing(connection_t *conn);
 static int connection_finished_connecting(connection_t *conn);
-static int connection_read_to_buf(connection_t *conn);
-static int connection_process_inbuf(connection_t *conn);
+static int connection_read_to_buf(connection_t *conn, int *max_to_read);
+static int connection_process_inbuf(connection_t *conn, int package_partial);
 static int connection_bucket_read_limit(connection_t *conn);
 
 /**************************************************************/
@@ -803,6 +803,7 @@ static int connection_receiver_bucket_should_increase(connection_t *conn) {
  * return 0.
  */
 int connection_handle_read(connection_t *conn) {
+  int max_to_read=-1, try_to_read;
 
   conn->timestamp_lastread = time(NULL);
 
@@ -817,16 +818,19 @@ int connection_handle_read(connection_t *conn) {
       return connection_handle_listener_read(conn, CONN_TYPE_CONTROL);
   }
 
-  if(connection_read_to_buf(conn) < 0) {
+loop_again:
+  try_to_read = max_to_read;
+  tor_assert(!conn->marked_for_close);
+  if (connection_read_to_buf(conn, &max_to_read) < 0) {
     /* There's a read error; kill the connection.*/
     connection_close_immediate(conn); /* Don't flush; connection is dead. */
-    if(conn->type == CONN_TYPE_AP || conn->type == CONN_TYPE_EXIT) {
+    if (conn->type == CONN_TYPE_AP || conn->type == CONN_TYPE_EXIT) {
       connection_edge_end(conn, (char)(connection_state_is_open(conn) ?
                           END_STREAM_REASON_MISC : END_STREAM_REASON_CONNECTFAILED),
                           conn->cpath_layer);
     }
     connection_mark_for_close(conn);
-    if(conn->type == CONN_TYPE_DIR &&
+    if (conn->type == CONN_TYPE_DIR &&
        conn->state == DIR_CONN_STATE_CONNECTING) {
        /* it's a directory server and connecting failed: forget about this router */
        /* XXX I suspect pollerr may make Windows not get to this point. :( */
@@ -839,8 +843,17 @@ int connection_handle_read(connection_t *conn) {
     }
     return -1;
   }
-  if(connection_process_inbuf(conn) < 0) {
-//    log_fn(LOG_DEBUG,"connection_process_inbuf returned -1.");
+  if (CONN_IS_EDGE(conn) &&
+      try_to_read != max_to_read) {
+    /* instruct it not to try to package partial cells. */
+    if (connection_process_inbuf(conn, 0) < 0) {
+      return -1;
+    }
+    if (connection_is_reading(conn) && !conn->inbuf_reached_eof)
+      goto loop_again; /* try reading again, in case more is here now */
+  }
+  /* one last try, packaging partial cells and all. */
+  if (connection_process_inbuf(conn, 1) < 0) {
     return -1;
   }
   return 0;
@@ -850,14 +863,19 @@ int connection_handle_read(connection_t *conn) {
  * directly or via TLS. Reduce the token buckets by the number of
  * bytes read.
  *
+ * If *max_to_read is -1, then decide it ourselves, else go with the
+ * value passed to us. When returning, if it's changed, subtract the
+ * number of bytes we read from *max_to_read.
+ *
  * Return -1 if we want to break conn, else return 0.
  */
-static int connection_read_to_buf(connection_t *conn) {
-  int result;
-  int at_most;
+static int connection_read_to_buf(connection_t *conn, int *max_to_read) {
+  int result, at_most = *max_to_read;
 
-  /* how many bytes are we allowed to read? */
-  at_most = connection_bucket_read_limit(conn);
+  if(at_most == -1) { /* we need to initialize it */
+    /* how many bytes are we allowed to read? */
+    at_most = connection_bucket_read_limit(conn);
+  }
 
   if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) {
     if(conn->state == OR_CONN_STATE_HANDSHAKING) {
@@ -898,7 +916,11 @@ static int connection_read_to_buf(connection_t *conn) {
       return -1;
   }
 
-  if(result > 0 && !is_local_IP(conn->addr)) { /* remember it */
+  if (result > 0) { /* change *max_to_read */
+    *max_to_read = at_most - result;
+  }
+
+  if (result > 0 && !is_local_IP(conn->addr)) { /* remember it */
     rep_hist_note_bytes_read(result, time(NULL));
     connection_read_bucket_decrement(conn, result);
   }
@@ -1250,9 +1272,10 @@ int connection_send_destroy(uint16_t circ_id, connection_t *conn) {
 /** Process new bytes that have arrived on conn-\>inbuf.
  *
  * This function just passes conn to the connection-specific
- * connection_*_process_inbuf() function.
+ * connection_*_process_inbuf() function. It also passes in
+ * package_partial if wanted.
  */
-static int connection_process_inbuf(connection_t *conn) {
+static int connection_process_inbuf(connection_t *conn, int package_partial) {
 
   tor_assert(conn);
 
@@ -1261,7 +1284,7 @@ static int connection_process_inbuf(connection_t *conn) {
       return connection_or_process_inbuf(conn);
     case CONN_TYPE_EXIT:
     case CONN_TYPE_AP:
-      return connection_edge_process_inbuf(conn);
+      return connection_edge_process_inbuf(conn, package_partial);
     case CONN_TYPE_DIR:
       return connection_dir_process_inbuf(conn);
     case CONN_TYPE_DNSWORKER:

+ 4 - 3
src/or/connection_edge.c

@@ -32,7 +32,7 @@ static int connection_ap_handshake_process_socks(connection_t *conn);
  * Mark and return -1 if there was an unexpected error with the conn,
  * else return 0.
  */
-int connection_edge_process_inbuf(connection_t *conn) {
+int connection_edge_process_inbuf(connection_t *conn, int package_partial) {
 
   tor_assert(conn);
   tor_assert(conn->type == CONN_TYPE_AP || conn->type == CONN_TYPE_EXIT);
@@ -81,7 +81,7 @@ int connection_edge_process_inbuf(connection_t *conn) {
         log_fn(LOG_WARN,"called with package_window %d. Tell Roger.", conn->package_window);
         return 0;
       }
-      if(connection_edge_package_raw_inbuf(conn) < 0) {
+      if(connection_edge_package_raw_inbuf(conn, package_partial) < 0) {
         connection_edge_end(conn, END_STREAM_REASON_MISC, conn->cpath_layer);
         connection_mark_for_close(conn);
         return -1;
@@ -221,7 +221,8 @@ int connection_edge_finished_connecting(connection_t *conn)
       return 0; /* circuit is closed, don't continue */
   }
   tor_assert(conn->package_window > 0);
-  return connection_edge_process_inbuf(conn); /* in case the server has written anything */
+  /* in case the server has written anything */
+  return connection_edge_process_inbuf(conn, 1);
 }
 
 /** How many times do we retry a general-purpose stream (detach it from

+ 7 - 5
src/or/or.h

@@ -180,6 +180,8 @@ typedef enum {
 #define CONN_TYPE_CONTROL 13
 #define _CONN_TYPE_MAX 13
 
+#define CONN_IS_EDGE(x) ((x)->type == CONN_TYPE_EXIT || (x)->type == CONN_TYPE_AP)
+
 /** State for any listener connection. */
 #define LISTENER_STATE_READY 0
 
@@ -1034,7 +1036,7 @@ void circuit_rep_hist_note_result(circuit_t *circ);
 void circuit_dump_by_conn(connection_t *conn, int severity);
 circuit_t *circuit_establish_circuit(uint8_t purpose,
                                      const char *exit_digest);
-void circuit_n_conn_done(connection_t *or_conn, int success);
+void circuit_n_conn_done(connection_t *or_conn, int status);
 int circuit_send_next_onion_skin(circuit_t *circ);
 int circuit_extend(cell_t *cell, circuit_t *circ);
 int circuit_init_cpath_crypto(crypt_path_t *cpath, char *key_data, int reverse);
@@ -1193,7 +1195,7 @@ int connection_or_nonopen_was_started_here(connection_t *conn);
 
 /********************************* connection_edge.c ***************************/
 
-int connection_edge_process_inbuf(connection_t *conn);
+int connection_edge_process_inbuf(connection_t *conn, int package_partial);
 int connection_edge_destroy(uint16_t circ_id, connection_t *conn);
 int connection_edge_end(connection_t *conn, char reason, crypt_path_t *cpath_layer);
 int connection_edge_finished_flushing(connection_t *conn);
@@ -1204,7 +1206,7 @@ int connection_ap_handshake_send_resolve(connection_t *ap_conn, circuit_t *circ)
 
 int connection_ap_make_bridge(char *address, uint16_t port);
 void connection_ap_handshake_socks_reply(connection_t *conn, char *reply,
-                                         size_t replylen, int success);
+                                         size_t replylen, int status);
 void connection_ap_handshake_socks_resolved(connection_t *conn,
                                             int answer_type,
                                             size_t answer_len,
@@ -1405,7 +1407,7 @@ void relay_header_unpack(relay_header_t *dest, const char *src);
 int connection_edge_send_command(connection_t *fromconn, circuit_t *circ,
                                  int relay_command, const char *payload,
                                  size_t payload_len, crypt_path_t *cpath_layer);
-int connection_edge_package_raw_inbuf(connection_t *conn);
+int connection_edge_package_raw_inbuf(connection_t *conn, int package_partial);
 void connection_edge_consider_sending_sendme(connection_t *conn);
 
 extern uint64_t stats_n_data_cells_packaged;
@@ -1439,7 +1441,7 @@ void rend_client_refetch_renddesc(const char *query);
 int rend_client_remove_intro_point(char *failed_intro, const char *query);
 int rend_client_rendezvous_acked(circuit_t *circ, const char *request, size_t request_len);
 int rend_client_receive_rendezvous(circuit_t *circ, const char *request, size_t request_len);
-void rend_client_desc_fetched(char *query, int success);
+void rend_client_desc_fetched(char *query, int status);
 
 char *rend_client_get_random_intro(char *query);
 int rend_parse_rendezvous_address(char *address);

+ 9 - 6
src/or/relay.c

@@ -590,7 +590,7 @@ connection_edge_process_relay_cell_not_open(
     connection_ap_handshake_socks_reply(conn, NULL, 0, 1);
     conn->socks_request->has_finished = 1;
     /* handle anything that might have queued */
-    if (connection_edge_package_raw_inbuf(conn) < 0) {
+    if (connection_edge_package_raw_inbuf(conn, 1) < 0) {
       connection_edge_end(conn, END_STREAM_REASON_MISC, conn->cpath_layer);
       connection_mark_for_close(conn);
       return 0;
@@ -803,7 +803,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
       conn->package_window += STREAMWINDOW_INCREMENT;
       log_fn(LOG_DEBUG,"stream-level sendme, packagewindow now %d.", conn->package_window);
       connection_start_reading(conn);
-      connection_edge_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */
+      connection_edge_package_raw_inbuf(conn, 1); /* handle whatever might still be on the inbuf */
       return 0;
     case RELAY_COMMAND_RESOLVE:
       if (layer_hint) {
@@ -854,7 +854,7 @@ uint64_t stats_n_data_bytes_received = 0;
  *
  * Return -1 if conn should be marked for close, else return 0.
  */
-int connection_edge_package_raw_inbuf(connection_t *conn) {
+int connection_edge_package_raw_inbuf(connection_t *conn, int package_partial) {
   size_t amount_to_process, length;
   char payload[CELL_PAYLOAD_SIZE];
   circuit_t *circ;
@@ -881,10 +881,13 @@ repeat_connection_edge_package_raw_inbuf:
 
   amount_to_process = buf_datalen(conn->inbuf);
 
-  if(!amount_to_process)
+  if (!amount_to_process)
     return 0;
 
-  if(amount_to_process > RELAY_PAYLOAD_SIZE) {
+  if (!package_partial && amount_to_process < RELAY_PAYLOAD_SIZE)
+    return 0;
+
+  if (amount_to_process > RELAY_PAYLOAD_SIZE) {
     length = RELAY_PAYLOAD_SIZE;
   } else {
     length = amount_to_process;
@@ -982,7 +985,7 @@ circuit_resume_edge_reading_helper(connection_t *conn,
        (layer_hint && conn->package_window > 0 && conn->cpath_layer == layer_hint)) {
       connection_start_reading(conn);
       /* handle whatever might still be on the inbuf */
-      connection_edge_package_raw_inbuf(conn);
+      connection_edge_package_raw_inbuf(conn, 1);
 
       /* If the circuit won't accept any more data, return without looking
        * at any more of the streams. Any connections that should be stopped

+ 4 - 4
src/or/rendclient.c

@@ -369,10 +369,10 @@ rend_client_receive_rendezvous(circuit_t *circ, const char *request, size_t requ
 }
 
 /** Find all the apconns in state AP_CONN_STATE_RENDDESC_WAIT that
- * are waiting on query. If success==1, move them to the next state.
- * If success==0, fail them.
+ * are waiting on query. If status==1, move them to the next state.
+ * If status==0, fail them.
  */
-void rend_client_desc_fetched(char *query, int success) {
+void rend_client_desc_fetched(char *query, int status) {
   connection_t **carray;
   connection_t *conn;
   int n, i;
@@ -388,7 +388,7 @@ void rend_client_desc_fetched(char *query, int success) {
     if (rend_cmp_service_ids(conn->rend_query, query))
       continue;
     /* great, this guy was waiting */
-    if(success ||
+    if(status!=0 ||
        rend_cache_lookup_entry(conn->rend_query, &entry) == 1) {
       /* either this fetch worked, or it failed but there was a
        * valid entry from before which we should reuse */