Browse Source

Temp changes for cell events

Steven Engler 4 years ago
parent
commit
b30fea06c9

+ 1 - 1
src/app/config/config.c

@@ -7992,7 +7992,7 @@ init_libevent(const or_options_t *options)
   cfg.msec_per_tick = options->TokenBucketRefillInterval;
 
   /* Don't use any additional eventloops (only use the mainloop). */
-  int num_additional_eventloops = 4;
+  int num_additional_eventloops = 2;
 
   tor_libevent_initialize(&cfg, num_additional_eventloops);
   log_info(LD_CONFIG, "Initializing libevent with %d additional eventloops.",

+ 3 - 1
src/core/mainloop/connection.c

@@ -4465,7 +4465,7 @@ connection_flush(connection_t *conn)
  * Return true iff it is okay to queue bytes on <b>conn</b>'s outbuf for
  * writing.
  */
-static int
+int
 connection_may_write_to_buf(connection_t *conn)
 {
   /* if it's marked for close, only allow write if we mean to flush it */
@@ -4544,6 +4544,8 @@ connection_write_to_buf_impl_,(const char *string, size_t len,
   if (!connection_may_write_to_buf(conn))
     return;
 
+  tor_assert(conn->safe_conn == NULL);
+
   size_t written;
 
   if (conn->safe_conn == NULL) {

+ 1 - 0
src/core/mainloop/connection.h

@@ -220,6 +220,7 @@ int connection_wants_to_flush(connection_t *conn);
 int connection_outbuf_too_full(connection_t *conn);
 int connection_handle_write(connection_t *conn, int force);
 int connection_flush(connection_t *conn);
+int connection_may_write_to_buf(connection_t *conn);
 
 MOCK_DECL(void, connection_write_to_buf_impl_,
           (const char *string, size_t len, connection_t *conn, int zlib));

+ 11 - 7
src/core/mainloop/mainloop.c

@@ -1029,7 +1029,8 @@ conn_close_if_marked(int i)
       connection_wants_to_flush(conn)) {
     /* s == -1 means it's an incomplete edge connection, or that the socket
      * has already been closed as unflushable. */
-    ssize_t sz = connection_bucket_write_limit(conn, now);
+    //ssize_t sz = connection_bucket_write_limit(conn, now);
+    ssize_t sz = 0;
     if (!conn->hold_open_until_flushed)
       log_info(LD_NET,
                "Conn (addr %s, fd %d, type %s, state %d) marked, but wants "
@@ -1039,6 +1040,7 @@ conn_close_if_marked(int i)
                (int)conn->outbuf_flushlen,
                 conn->marked_for_close_file, conn->marked_for_close);
     if (conn->linked_conn) {
+      sz = connection_bucket_write_limit(conn, now);
       retval = buf_move_to_buf(conn->linked_conn->inbuf, conn->outbuf,
                                &conn->outbuf_flushlen);
       if (retval >= 0) {
@@ -1062,6 +1064,7 @@ conn_close_if_marked(int i)
       } else
         retval = -1; /* never flush non-open broken tls connections */
     } else {
+      sz = connection_bucket_write_limit(conn, now);
       retval = buf_flush_to_socket(conn->outbuf, conn->s, sz,
                                    &conn->outbuf_flushlen);
     }
@@ -1100,12 +1103,13 @@ conn_close_if_marked(int i)
       return 0;
     }
     if (connection_wants_to_flush(conn)) {
-      log_fn(LOG_INFO, LD_NET, "We stalled too much while trying to write %d "
+      //log_fn(LOG_INFO, LD_NET, "We stalled too much while trying to write %d "
+      log_fn(LOG_INFO, LD_NET, "We stalled too much while trying to write <?> "
              "bytes to address %s.  If this happens a lot, either "
              "something is wrong with your network connection, or "
              "something is wrong with theirs. "
              "(fd %d, type %s, state %d, marked at %s:%d).",
-             (int)connection_get_outbuf_len(conn),
+             //(int)connection_get_outbuf_len(conn),
              escaped_safe_str_client(conn->address),
              (int)conn->s, conn_type_to_string(conn->type), conn->state,
              conn->marked_for_close_file,
@@ -1213,7 +1217,6 @@ directory_info_has_arrived(time_t now, int from_cache, int suppress_logs)
 static void
 run_connection_housekeeping(int i, time_t now)
 {
-  cell_t cell;
   connection_t *conn = smartlist_get(connection_array, i);
   const or_options_t *options = get_options();
   or_connection_t *or_conn;
@@ -1332,9 +1335,10 @@ run_connection_housekeeping(int i, time_t now)
     /* send a padding cell */
     log_fn(LOG_DEBUG,LD_OR,"Sending keepalive to (%s:%d)",
            conn->address, conn->port);
-    memset(&cell,0,sizeof(cell_t));
-    cell.command = CELL_PADDING;
-    connection_or_write_cell_to_buf(&cell, or_conn);
+    cell_t *cell = tor_malloc_zero(sizeof(cell_t));
+    //memset(&cell,0,sizeof(cell_t));
+    cell->command = CELL_PADDING;
+    connection_or_write_cell_to_buf(cell, or_conn);
   } else {
     channelpadding_decide_to_pad_channel(chan);
   }

+ 1 - 1
src/core/or/channel.c

@@ -1501,7 +1501,7 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *cell)
   /* Whatever happens, we free the cell. Either an error occurred or the cell
    * was put on the connection outbuf, both cases we have ownership of the
    * cell and we free it. */
-  packed_cell_free(cell);
+  //packed_cell_free(cell);
   return ret;
 }
 

+ 29 - 20
src/core/or/channelpadding.c

@@ -295,26 +295,30 @@ STATIC int
 channelpadding_send_disable_command(channel_t *chan)
 {
   channelpadding_negotiate_t disable;
-  cell_t cell;
+  //cell_t cell;
+  cell_t *cell = tor_malloc_zero(sizeof(cell_t));
 
   tor_assert(chan);
   tor_assert(BASE_CHAN_TO_TLS(chan)->conn->link_proto >=
              MIN_LINK_PROTO_FOR_CHANNEL_PADDING);
 
-  memset(&cell, 0, sizeof(cell_t));
+  //memset(&cell, 0, sizeof(cell_t));
   memset(&disable, 0, sizeof(channelpadding_negotiate_t));
-  cell.command = CELL_PADDING_NEGOTIATE;
+  cell->command = CELL_PADDING_NEGOTIATE;
 
   channelpadding_negotiate_set_command(&disable, CHANNELPADDING_COMMAND_STOP);
 
-  if (channelpadding_negotiate_encode(cell.payload, CELL_PAYLOAD_SIZE,
-                                      &disable) < 0)
+  if (channelpadding_negotiate_encode(cell->payload, CELL_PAYLOAD_SIZE,
+                                      &disable) < 0) {
+    tor_free(cell);
     return -1;
-
-  if (chan->write_cell(chan, &cell) == 1)
+  }
+  if (chan->write_cell(chan, cell) == 1) {
     return 0;
-  else
+  } else {
+    tor_free(cell);
     return -1;
+  }
 }
 
 /**
@@ -328,28 +332,32 @@ channelpadding_send_enable_command(channel_t *chan, uint16_t low_timeout,
                                    uint16_t high_timeout)
 {
   channelpadding_negotiate_t enable;
-  cell_t cell;
+  //cell_t cell;
+  cell_t *cell = tor_malloc_zero(sizeof(cell_t));
 
   tor_assert(chan);
   tor_assert(BASE_CHAN_TO_TLS(chan)->conn->link_proto >=
              MIN_LINK_PROTO_FOR_CHANNEL_PADDING);
 
-  memset(&cell, 0, sizeof(cell_t));
+  //memset(&cell, 0, sizeof(cell_t));
   memset(&enable, 0, sizeof(channelpadding_negotiate_t));
-  cell.command = CELL_PADDING_NEGOTIATE;
+  cell->command = CELL_PADDING_NEGOTIATE;
 
   channelpadding_negotiate_set_command(&enable, CHANNELPADDING_COMMAND_START);
   channelpadding_negotiate_set_ito_low_ms(&enable, low_timeout);
   channelpadding_negotiate_set_ito_high_ms(&enable, high_timeout);
 
-  if (channelpadding_negotiate_encode(cell.payload, CELL_PAYLOAD_SIZE,
-                                      &enable) < 0)
+  if (channelpadding_negotiate_encode(cell->payload, CELL_PAYLOAD_SIZE,
+                                      &enable) < 0) {
+    tor_free(cell);
     return -1;
-
-  if (chan->write_cell(chan, &cell) == 1)
+  }
+  if (chan->write_cell(chan, cell) == 1) {
     return 0;
-  else
+  } else {
+    tor_free(cell);
     return -1;
+  }
 }
 
 /**
@@ -362,7 +370,7 @@ channelpadding_send_enable_command(channel_t *chan, uint16_t low_timeout,
 static void
 channelpadding_send_padding_cell_for_callback(channel_t *chan)
 {
-  cell_t cell;
+  //cell_t cell;
 
   /* Check that the channel is still valid and open */
   if (!chan || chan->state != CHANNEL_STATE_OPEN) {
@@ -406,9 +414,10 @@ channelpadding_send_padding_cell_for_callback(channel_t *chan)
 
   /* Send the padding cell. This will cause the channel to get a
    * fresh timestamp_active */
-  memset(&cell, 0, sizeof(cell));
-  cell.command = CELL_PADDING;
-  chan->write_cell(chan, &cell);
+  //memset(&cell, 0, sizeof(cell));
+  cell_t *cell = tor_malloc_zero(sizeof(cell_t));
+  cell->command = CELL_PADDING;
+  chan->write_cell(chan, cell);
 }
 
 /**

+ 19 - 5
src/core/or/channeltls.c

@@ -826,14 +826,20 @@ channel_tls_write_cell_method(channel_t *chan, cell_t *cell)
   return written;
 }
 
+static void
+void_packed_cell_free(void *void_packed_cell)
+{
+  packed_cell_free_((packed_cell_t *)void_packed_cell);
+}
+
 /**
  * Write a packed cell to a channel_tls_t.
  *
  * This implements the write_packed_cell method for channel_tls_t; given a
  * channel_tls_t and a packed_cell_t, transmit the packed_cell_t.
  *
- * Return 0 on success or negative value on error. The caller must free the
- * packed cell.
+ * Return 0 on success or negative value on error. The caller must not free
+ * the packed cell.
  */
 static int
 channel_tls_write_packed_cell_method(channel_t *chan,
@@ -841,14 +847,22 @@ channel_tls_write_packed_cell_method(channel_t *chan,
 {
   tor_assert(chan);
   channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
-  size_t cell_network_size = get_cell_network_size(chan->wide_circ_ids);
+  //size_t cell_network_size = get_cell_network_size(chan->wide_circ_ids);
 
   tor_assert(tlschan);
   tor_assert(packed_cell);
 
   if (tlschan->conn) {
-    connection_buf_add(packed_cell->body, cell_network_size,
-                            TO_CONN(tlschan->conn));
+    //connection_buf_add(packed_cell->body, cell_network_size,
+    //                        TO_CONN(tlschan->conn));
+    if (connection_may_write_to_buf(TO_CONN(tlschan->conn))) {
+      event_data_t event_data = { .ptr = packed_cell };
+      event_source_publish(TO_CONN(tlschan->conn)->event_source,
+                           or_conn_outgoing_packed_cell, event_data,
+                           void_packed_cell_free);
+    } else {
+      packed_cell_free(packed_cell);
+    }
   } else {
     log_info(LD_CHANNEL,
              "something called write_packed_cell on a tlschan "

+ 52 - 19
src/core/or/connection_or.c

@@ -84,6 +84,9 @@
 
 event_label_t or_conn_link_protocol_version_ev = EVENT_LABEL_UNSET;
 event_label_t or_conn_open_ev = EVENT_LABEL_UNSET;
+event_label_t or_conn_outgoing_packed_cell = EVENT_LABEL_UNSET;
+event_label_t or_conn_outgoing_fixed_cell = EVENT_LABEL_UNSET;
+event_label_t or_conn_outgoing_variable_cell = EVENT_LABEL_UNSET;
 
 //static int connection_tls_finish_handshake(or_connection_t *conn);
 static int connection_or_launch_v3_or_handshake(or_connection_t *conn);
@@ -126,11 +129,20 @@ or_conn_register_events(event_registry_t *registry)
 {
   tor_assert(or_conn_link_protocol_version_ev == EVENT_LABEL_UNSET);
   tor_assert(or_conn_open_ev == EVENT_LABEL_UNSET);
+  tor_assert(or_conn_outgoing_packed_cell == EVENT_LABEL_UNSET);
+  tor_assert(or_conn_outgoing_fixed_cell == EVENT_LABEL_UNSET);
+  tor_assert(or_conn_outgoing_variable_cell == EVENT_LABEL_UNSET);
 
   or_conn_link_protocol_version_ev = \
     event_registry_register_event(registry, "Decided protocol version");
   or_conn_open_ev = \
     event_registry_register_event(registry, "Connection is open");
+  or_conn_outgoing_packed_cell = \
+    event_registry_register_event(registry, "Outgoing packed cell");
+  or_conn_outgoing_fixed_cell = \
+    event_registry_register_event(registry, "Outgoing fixed cell");
+  or_conn_outgoing_variable_cell = \
+    event_registry_register_event(registry, "Outgoing variable cell");
 }
 
 /** Global map between Extended ORPort identifiers and OR
@@ -2415,13 +2427,13 @@ connection_or_set_state_open(or_connection_t *conn)
 void
 connection_or_write_cell_to_buf(const cell_t *cell, or_connection_t *conn)
 {
-  packed_cell_t networkcell;
-  size_t cell_network_size = get_cell_network_size(conn->wide_circ_ids);
+  //packed_cell_t networkcell;
+  //size_t cell_network_size = get_cell_network_size(conn->wide_circ_ids);
 
   tor_assert(cell);
   tor_assert(conn);
 
-  cell_pack(&networkcell, cell, conn->wide_circ_ids);
+  //cell_pack(&networkcell, cell, conn->wide_circ_ids);
 
   /* We need to count padding cells from this non-packed code path
    * since they are sent via chan->write_cell() (which is not packed) */
@@ -2429,7 +2441,11 @@ connection_or_write_cell_to_buf(const cell_t *cell, or_connection_t *conn)
   if (cell->command == CELL_PADDING)
     rep_hist_padding_count_write(PADDING_TYPE_CELL);
 
-  connection_buf_add(networkcell.body, cell_network_size, TO_CONN(conn));
+  //connection_buf_add(networkcell.body, cell_network_size, TO_CONN(conn));
+
+  event_data_t event_data = { .ptr = cell };
+  event_source_publish(TO_CONN(conn)->event_source,
+                       or_conn_outgoing_fixed_cell, event_data, tor_free_);
 
   /* Touch the channel's active timestamp if there is one */
   if (conn->chan) {
@@ -2446,6 +2462,12 @@ connection_or_write_cell_to_buf(const cell_t *cell, or_connection_t *conn)
     or_handshake_state_record_cell(conn, conn->handshake_state, cell, 0);
 }
 
+static void
+void_var_cell_free(void *void_var_cell)
+{
+  var_cell_free_((var_cell_t *)void_var_cell);
+}
+
 /** Pack a variable-length <b>cell</b> into wire-format, and write it onto
  * <b>conn</b>'s outbuf.  Right now, this <em>DOES NOT</em> support cells that
  * affect a circuit.
@@ -2454,14 +2476,20 @@ MOCK_IMPL(void,
 connection_or_write_var_cell_to_buf,(const var_cell_t *cell,
                                      or_connection_t *conn))
 {
-  int n;
-  char hdr[VAR_CELL_MAX_HEADER_SIZE];
+  //int n;
+  //char hdr[VAR_CELL_MAX_HEADER_SIZE];
   tor_assert(cell);
   tor_assert(conn);
-  n = var_cell_pack_header(cell, hdr, conn->wide_circ_ids);
-  connection_buf_add(hdr, n, TO_CONN(conn));
-  connection_buf_add((char*)cell->payload,
-                          cell->payload_len, TO_CONN(conn));
+  //n = var_cell_pack_header(cell, hdr, conn->wide_circ_ids);
+  //connection_buf_add(hdr, n, TO_CONN(conn));
+  //connection_buf_add((char*)cell->payload,
+  //                        cell->payload_len, TO_CONN(conn));
+
+  event_data_t event_data = { .ptr = cell };
+  event_source_publish(TO_CONN(conn)->event_source,
+                       or_conn_outgoing_variable_cell, event_data,
+                       void_var_cell_free);
+
   if (conn->base_.state == OR_CONN_STATE_OR_HANDSHAKING_V3)
     or_handshake_state_record_var_cell(conn, conn->handshake_state, cell, 0);
 
@@ -2598,7 +2626,7 @@ connection_or_send_versions(or_connection_t *conn, int v3_plus)
   connection_or_write_var_cell_to_buf(cell, conn);
   conn->handshake_state->sent_versions_at = time(NULL);
 
-  var_cell_free(cell);
+  //var_cell_free(cell);
   return 0;
 }
 
@@ -2632,7 +2660,8 @@ netinfo_addr_from_tor_addr(const tor_addr_t *tor_addr)
 MOCK_IMPL(int,
 connection_or_send_netinfo,(or_connection_t *conn))
 {
-  cell_t cell;
+  //cell_t cell;
+  cell_t *cell = NULL;
   time_t now = time(NULL);
   const routerinfo_t *me;
   int r = -1;
@@ -2645,8 +2674,9 @@ connection_or_send_netinfo,(or_connection_t *conn))
     return 0;
   }
 
-  memset(&cell, 0, sizeof(cell_t));
-  cell.command = CELL_NETINFO;
+  //memset(&cell, 0, sizeof(cell_t));
+  cell = tor_malloc_zero(sizeof(*cell));
+  cell->command = CELL_NETINFO;
 
   netinfo_cell_t *netinfo_cell = netinfo_cell_new();
 
@@ -2688,18 +2718,20 @@ connection_or_send_netinfo,(or_connection_t *conn))
   if ((errmsg = netinfo_cell_check(netinfo_cell))) {
     log_warn(LD_OR, "Failed to validate NETINFO cell with error: %s",
                     errmsg);
+    tor_free(cell);
     goto cleanup;
   }
 
-  if (netinfo_cell_encode(cell.payload, CELL_PAYLOAD_SIZE,
+  if (netinfo_cell_encode(cell->payload, CELL_PAYLOAD_SIZE,
                           netinfo_cell) < 0) {
     log_warn(LD_OR, "Failed generating NETINFO cell");
+    tor_free(cell);
     goto cleanup;
   }
 
   conn->handshake_state->digest_sent_data = 0;
   conn->handshake_state->sent_netinfo = 1;
-  connection_or_write_cell_to_buf(&cell, conn);
+  connection_or_write_cell_to_buf(cell, conn);
 
   r = 0;
  cleanup:
@@ -2851,7 +2883,7 @@ connection_or_send_certs_cell(or_connection_t *conn)
   cell->payload_len = enc_len;
 
   connection_or_write_var_cell_to_buf(cell, conn);
-  var_cell_free(cell);
+  //var_cell_free(cell);
   certs_cell_free(certs_cell);
   //tor_x509_cert_free(own_link_cert);
 
@@ -2934,6 +2966,7 @@ connection_or_send_auth_challenge_cell(or_connection_t *conn)
   if (len != cell->payload_len) {
     /* LCOV_EXCL_START */
     log_warn(LD_BUG, "Encoded auth challenge cell length not as expected");
+    var_cell_free(cell);
     goto done;
     /* LCOV_EXCL_STOP */
   }
@@ -2943,7 +2976,7 @@ connection_or_send_auth_challenge_cell(or_connection_t *conn)
   r = 0;
 
  done:
-  var_cell_free(cell);
+  //var_cell_free(cell);
   auth_challenge_cell_free(ac);
 
   return r;
@@ -3245,7 +3278,7 @@ connection_or_send_authenticate_cell,(or_connection_t *conn, int authtype))
     return -1;
   }
   connection_or_write_var_cell_to_buf(cell, conn);
-  var_cell_free(cell);
+  //var_cell_free(cell);
 
   return 0;
 }

+ 3 - 0
src/core/or/connection_or.h

@@ -17,6 +17,9 @@
 
 extern event_label_t or_conn_link_protocol_version_ev;
 extern event_label_t or_conn_open_ev;
+extern event_label_t or_conn_outgoing_packed_cell;
+extern event_label_t or_conn_outgoing_fixed_cell;
+extern event_label_t or_conn_outgoing_variable_cell;
 
 struct ed25519_public_key_t;
 struct ed25519_keypair_t;

+ 139 - 0
src/core/or/safe_connection.c

@@ -8,6 +8,7 @@
 #include "core/or/connection_or.h"
 #include "core/or/var_cell_st.h"
 #include "core/or/cell_st.h"
+#include "core/or/cell_queue_st.h"
 
 event_label_t safe_or_conn_tcp_connecting_ev = EVENT_LABEL_UNSET;
 event_label_t safe_or_conn_tls_handshaking_ev = EVENT_LABEL_UNSET;
@@ -52,6 +53,10 @@ safe_or_connection_socket_added_cb(safe_connection_t *safe_conn);
 static void
 safe_or_connection_outbuf_modified_cb(safe_connection_t *safe_conn);
 
+static void
+safe_or_conn_outgoing_cell_cb(event_label_t label, event_data_t data,
+                              void *context);
+
 static void
 process_cells_from_inbuf(safe_or_connection_t *safe_or_conn);
 
@@ -136,6 +141,39 @@ socket_rw_state_set(socket_rw_state_t *rw_state,
 
 /********************************************************/
 
+/*
+void
+safe_cell_queue_init(safe_cell_queue_t *queue)
+{
+  tor_assert(queue != NULL);
+  memset(queue, 0, sizeof(*queue));
+
+  tor_mutex_init(&queue->lock);
+  TOR_SIMPLEQ_INIT(&queue->head);
+}
+
+void
+safe_cell_queue_append(safe_cell_queue_t *queue,
+                       generic_cell_t *cell)
+{
+  tor_assert(queue != NULL);
+  tor_assert(cell != NULL);
+  tor_mutex_acquire(&queue->lock);
+
+  TOR_TAILQ_INSERT_TAIL(&queue->head, cell);
+
+  tor_mutex_release(&queue->lock);
+}
+
+generic_cell_t *
+safe_cell_queue_pop(safe_cell_queue_t *queue)
+{
+
+}
+*/
+
+/********************************************************/
+
 void
 safe_connection_init(safe_connection_t *safe_conn, uint32_t type_magic,
                      event_source_t *conn_event_source,
@@ -497,6 +535,15 @@ safe_or_connection_new(bool requires_buffers, bool is_outgoing,
   event_listener_set_callback(TO_SAFE_CONN(safe_or_conn)->event_listener,
                               or_conn_open_ev,
                               NULL, safe_or_conn_open_cb);
+  event_listener_set_callback(TO_SAFE_CONN(safe_or_conn)->event_listener,
+                              or_conn_outgoing_packed_cell,
+                              NULL, safe_or_conn_outgoing_cell_cb);
+  event_listener_set_callback(TO_SAFE_CONN(safe_or_conn)->event_listener,
+                              or_conn_outgoing_fixed_cell,
+                              NULL, safe_or_conn_outgoing_cell_cb);
+  event_listener_set_callback(TO_SAFE_CONN(safe_or_conn)->event_listener,
+                              or_conn_outgoing_variable_cell,
+                              NULL, safe_or_conn_outgoing_cell_cb);
 
   if (conn_event_source) {
     event_source_subscribe(conn_event_source,
@@ -505,6 +552,15 @@ safe_or_connection_new(bool requires_buffers, bool is_outgoing,
     event_source_subscribe(conn_event_source,
                            TO_SAFE_CONN(safe_or_conn)->event_listener,
                            or_conn_open_ev);
+    event_source_subscribe(conn_event_source,
+                           TO_SAFE_CONN(safe_or_conn)->event_listener,
+                           or_conn_outgoing_packed_cell);
+    event_source_subscribe(conn_event_source,
+                           TO_SAFE_CONN(safe_or_conn)->event_listener,
+                           or_conn_outgoing_fixed_cell);
+    event_source_subscribe(conn_event_source,
+                           TO_SAFE_CONN(safe_or_conn)->event_listener,
+                           or_conn_outgoing_variable_cell);
   }
 
   safe_or_conn->link_protocol = 0; // unknown protocol
@@ -540,6 +596,9 @@ safe_or_connection_socket_added_cb(safe_connection_t *safe_conn)
 static void
 safe_or_connection_outbuf_modified_cb(safe_connection_t *safe_conn)
 {
+  log_warn(LD_OR, "Nothing should write directly to an OR conn buffer");
+  tor_assert(0);
+
   tor_assert(safe_conn != NULL);
   safe_or_connection_t *safe_or_conn = TO_SAFE_OR_CONN(safe_conn);
 
@@ -1507,6 +1566,86 @@ safe_or_connection_write_cb(safe_connection_t *safe_conn)
 
 /********************************************************/
 
+/*
+static void
+append_to_incoming_cell_queue(safe_or_connection_t *safe_or_conn,
+                              generic_cell_t *cell)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_mutex_acquire(&safe_or_conn->incoming_cell_queue->lock);
+
+  TOR_TAILQ_INSERT_TAIL(&safe_or_conn->incoming_cell_queue->head, cell);
+
+  tor_mutex_release(&safe_or_conn->incoming_cell_queue->lock);
+}
+*/
+
+static void
+safe_or_conn_outgoing_cell_cb(event_label_t label, event_data_t data,
+                              void *context)
+{
+  safe_or_connection_t *safe_or_conn = context;
+  tor_assert(safe_or_conn != NULL);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+
+  if (safe_or_conn->state == SAFE_OR_CONN_STATE_CLOSED) {
+    tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+    return;
+  }
+  tor_assert(safe_or_conn->state == SAFE_OR_CONN_STATE_LINK_HANDSHAKING ||
+             safe_or_conn->state == SAFE_OR_CONN_STATE_OPEN);
+
+  struct buf_t *outbuf = TO_SAFE_CONN(safe_or_conn)->outbuf;
+  int rv = -1;
+
+  if (label == or_conn_outgoing_packed_cell) {
+    packed_cell_t *packed_cell = data.ptr;
+    tor_assert(packed_cell != NULL);
+    size_t cell_network_size = \
+      get_cell_network_size(safe_or_conn->wide_circ_ids?1:0);
+    tor_assert(packed_cell_get_command(packed_cell,
+      safe_or_conn->wide_circ_ids?1:0) != 0);
+
+    rv = buf_add(outbuf, packed_cell->body, cell_network_size);
+  } else if (label == or_conn_outgoing_fixed_cell) {
+    cell_t *cell = data.ptr;
+    tor_assert(cell != NULL);
+    //tor_assert(cell->command != 0); // PADDING cells have command == 0
+    size_t cell_network_size = \
+      get_cell_network_size(safe_or_conn->wide_circ_ids?1:0);
+
+    packed_cell_t packed_cell;
+    cell_pack(&packed_cell, cell, safe_or_conn->wide_circ_ids?1:0);
+
+    rv = buf_add(outbuf, packed_cell.body, cell_network_size);
+  } else if (label == or_conn_outgoing_variable_cell) {
+    var_cell_t *var_cell = data.ptr;
+    tor_assert(var_cell != NULL);
+    tor_assert(var_cell->command != 0);
+    char header[VAR_CELL_MAX_HEADER_SIZE];
+    int header_len = var_cell_pack_header(var_cell, header,
+                                          safe_or_conn->wide_circ_ids?1:0);
+    rv = buf_add(outbuf, header, header_len);
+    if (rv >= 0) {
+      rv = buf_add(outbuf, (char *)var_cell->payload, var_cell->payload_len);
+    }
+  } else {
+    log_warn(LD_OR, "Received an unexpected event type");
+    tor_assert_nonfatal_unreached_once();
+  }
+
+  if (rv < 0) {
+    log_warn(LD_OR, "Safe OR connection could not write to outgoing buffer");
+    tor_assert(safe_or_connection_update_state(safe_or_conn,
+      SAFE_OR_CONN_STATE_CLOSED) == E_SUCCESS);
+  } else {
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+  }
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+}
+
 static bool
 fetch_cell(safe_or_connection_t *safe_or_conn, char *cell_buf)
 {

+ 21 - 0
src/core/or/safe_connection.h

@@ -26,6 +26,25 @@ typedef struct link_handshaking_ev_data_t {
 
 void link_handshaking_ev_free(void *ptr);
 
+/*
+typedef struct generic_cell_t {
+  TOR_SIMPLEQ_ENTRY(safe_cell_t) next;
+  enum {
+    CELL_TYPE_FIXED,
+    CELL_TYPE_VAR,
+  } type;
+  union {
+    cell_t *fixed_cell;
+    var_cell_t *var_cell;
+  } data;
+} generic_cell_t;
+
+typedef struct safe_cell_queue_t {
+  tor_mutex_t lock;
+  TOR_SIMPLEQ_HEAD(safe_cell_queue_head_t, generic_cell_t) head;
+} safe_cell_queue_t;
+*/
+
 //#define SAFE_BASE_CONN_MAGIC 0x64DB4EE2u
 #define SAFE_OR_CONN_MAGIC 0x1221ABBAu
 
@@ -89,6 +108,8 @@ typedef struct safe_or_connection_t {
   bool wide_circ_ids;
   bool waiting_for_link_protocol;
 
+  //safe_cell_queue_t incoming_cell_queue;
+
   socket_rw_state_t tor_read_wanted;
   socket_rw_state_t tor_write_wanted;
   socket_rw_state_t tls_read_wanted;

+ 29 - 19
src/core/or/scheduler_kist.c

@@ -16,6 +16,7 @@
 #include "lib/math/fp.h"
 
 #include "core/or/or_connection_st.h"
+#include "core/or/connection_or.h"
 
 #ifdef HAVE_SYS_IOCTL_H
 #include <sys/ioctl.h>
@@ -138,14 +139,17 @@ channel_outbuf_length(channel_t *chan)
   return len;
 }
 
+/* Little helper function for HT_FOREACH_FN. */
 static int
-each_channel_update_outbuf(outbuf_table_ent_t *ent, void *data)
+each_channel_wakeup_listeners(outbuf_table_ent_t *ent, void *data)
 {
   (void) data;
   connection_t *conn = TO_CONN(BASE_CHAN_TO_TLS(ent->chan)->conn);
-  tor_assert(conn->safe_conn != NULL);
-  safe_connection_start_caring_about_modified(conn->safe_conn);
-  safe_connection_outbuf_modified(conn->safe_conn);
+  event_source_deliver_silently(conn->event_source,
+                                or_conn_outgoing_packed_cell, false);
+  event_source_wakeup_listener(conn->event_source,
+                               or_conn_outgoing_packed_cell);
+  return 0; /* Returning non-zero removes the element from the table. */
 }
 
 /* Little helper function for HT_FOREACH_FN. */
@@ -370,7 +374,7 @@ init_socket_info(socket_table_t *table, const channel_t *chan)
 
 /* Add chan to the outbuf table if it isn't already in it. If it is, then don't
  * do anything */
-static void
+static bool
 outbuf_table_add(outbuf_table_t *table, channel_t *chan)
 {
   outbuf_table_ent_t search, *ent;
@@ -382,7 +386,9 @@ outbuf_table_add(outbuf_table_t *table, channel_t *chan)
     ent = tor_malloc_zero(sizeof(*ent));
     ent->chan = chan;
     HT_INSERT(outbuf_table_s, table, ent);
+    return true;
   }
+  return false;
 }
 
 static void
@@ -472,14 +478,14 @@ update_socket_written(socket_table_t *table, channel_t *chan, size_t bytes)
  * by only writing a channel's outbuf to the kernel if it has 8 cells or more
  * in it.
  */
-MOCK_IMPL(int, channel_should_write_to_kernel,
-          (outbuf_table_t *table, channel_t *chan))
-{
-  outbuf_table_add(table, chan);
-  /* CELL_MAX_NETWORK_SIZE * 8 because we only want to write the outbuf to the
-   * kernel if there's 8 or more cells waiting */
-  return channel_outbuf_length(chan) > (CELL_MAX_NETWORK_SIZE * 8);
-}
+//MOCK_IMPL(int, channel_should_write_to_kernel,
+//          (outbuf_table_t *table, channel_t *chan))
+//{
+//  outbuf_table_add(table, chan);
+//  /* CELL_MAX_NETWORK_SIZE * 8 because we only want to write the outbuf to the
+//   * kernel if there's 8 or more cells waiting */
+//  return channel_outbuf_length(chan) > (CELL_MAX_NETWORK_SIZE * 8);
+//}
 
 /* Little helper function to write a channel's outbuf all the way to the
  * kernel */
@@ -630,11 +636,15 @@ kist_scheduler_run(void)
        */
       continue;
     }
-    outbuf_table_add(&outbuf_table, chan);
-
-    connection_t *conn = TO_CONN(BASE_CHAN_TO_TLS(chan)->conn);
-    tor_assert(conn->safe_conn != NULL);
-    safe_connection_stop_caring_about_modified(conn->safe_conn);
+    bool added_new = outbuf_table_add(&outbuf_table, chan);
+
+    if (added_new) {
+      connection_t *conn = TO_CONN(BASE_CHAN_TO_TLS(chan)->conn);
+      event_source_deliver_silently(conn->event_source,
+                                    or_conn_outgoing_packed_cell, true);
+      //tor_assert(conn->safe_conn != NULL);
+      //safe_connection_stop_caring_about_modified(conn->safe_conn);
+    }
 
     /* if we have switched to a new channel, consider writing the previous
      * channel's outbuf to the kernel. */
@@ -746,7 +756,7 @@ kist_scheduler_run(void)
   /* Write the outbuf of any channels that still have data */
   //HT_FOREACH_FN(outbuf_table_s, &outbuf_table, each_channel_write_to_kernel,
   //              NULL);
-  HT_FOREACH_FN(outbuf_table_s, &outbuf_table, each_channel_update_outbuf,
+  HT_FOREACH_FN(outbuf_table_s, &outbuf_table, each_channel_wakeup_listeners,
                 NULL);
   /* We are done with it. */
   HT_FOREACH_FN(outbuf_table_s, &outbuf_table, free_outbuf_info_by_ent, NULL);