Browse Source

Added the safe connections (not working yet)

Added the code for the thread-safe connection objects, which contain
the socket, buffers, event source, etc for connections. This is only
used for OR connections.

This code does not fully work yet, but is getting close.
Steven Engler 4 years ago
parent
commit
75e677b13d

+ 43 - 42
src/app/main/main.c

@@ -326,53 +326,54 @@ dumpmemusage(int severity)
 static void
 dumpstats(int severity)
 {
+  log_warn(LD_NET, "Dumping stats has been reduced");
   time_t now = time(NULL);
   time_t elapsed;
-  size_t rbuf_cap, wbuf_cap, rbuf_len, wbuf_len;
+//  size_t rbuf_cap, wbuf_cap, rbuf_len, wbuf_len;
 
   tor_log(severity, LD_GENERAL, "Dumping stats:");
 
-  SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) {
-    int i = conn_sl_idx;
-    tor_log(severity, LD_GENERAL,
-        "Conn %d (socket %d) type %d (%s), state %d (%s), created %d secs ago",
-        i, (int)conn->s, conn->type, conn_type_to_string(conn->type),
-        conn->state, conn_state_to_string(conn->type, conn->state),
-        (int)(now - conn->timestamp_created));
-    if (!connection_is_listener(conn)) {
-      tor_log(severity,LD_GENERAL,
-          "Conn %d is to %s:%d.", i,
-          safe_str_client(conn->address),
-          conn->port);
-      tor_log(severity,LD_GENERAL,
-          "Conn %d: %d bytes waiting on inbuf (len %d, last read %d secs ago)",
-          i,
-          (int)connection_get_inbuf_len(conn),
-          (int)buf_allocation(conn->inbuf),
-          (int)(now - conn->timestamp_last_read_allowed));
-      tor_log(severity,LD_GENERAL,
-          "Conn %d: %d bytes waiting on outbuf "
-          "(len %d, last written %d secs ago)",i,
-          (int)connection_get_outbuf_len(conn),
-          (int)buf_allocation(conn->outbuf),
-          (int)(now - conn->timestamp_last_write_allowed));
-      if (conn->type == CONN_TYPE_OR) {
-        or_connection_t *or_conn = TO_OR_CONN(conn);
-        if (or_conn->tls) {
-          if (tor_tls_get_buffer_sizes(or_conn->tls, &rbuf_cap, &rbuf_len,
-                                       &wbuf_cap, &wbuf_len) == 0) {
-            tor_log(severity, LD_GENERAL,
-                "Conn %d: %d/%d bytes used on OpenSSL read buffer; "
-                "%d/%d bytes used on write buffer.",
-                i, (int)rbuf_len, (int)rbuf_cap, (int)wbuf_len, (int)wbuf_cap);
-          }
-        }
-      }
-    }
-    circuit_dump_by_conn(conn, severity); /* dump info about all the circuits
-                                           * using this conn */
-  } SMARTLIST_FOREACH_END(conn);
-
+//  SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) {
+//    int i = conn_sl_idx;
+//    tor_log(severity, LD_GENERAL,
+//        "Conn %d (socket %d) type %d (%s), state %d (%s), created %d secs ago",
+//        i, (int)conn->s, conn->type, conn_type_to_string(conn->type),
+//        conn->state, conn_state_to_string(conn->type, conn->state),
+//        (int)(now - conn->timestamp_created));
+//    if (!connection_is_listener(conn)) {
+//      tor_log(severity,LD_GENERAL,
+//          "Conn %d is to %s:%d.", i,
+//          safe_str_client(conn->address),
+//          conn->port);
+//      tor_log(severity,LD_GENERAL,
+//          "Conn %d: %d bytes waiting on inbuf (len %d, last read %d secs ago)",
+//          i,
+//          (int)connection_get_inbuf_len(conn),
+//          (int)buf_allocation(conn->inbuf),
+//          (int)(now - conn->timestamp_last_read_allowed));
+//      tor_log(severity,LD_GENERAL,
+//          "Conn %d: %d bytes waiting on outbuf "
+//          "(len %d, last written %d secs ago)",i,
+//          (int)connection_get_outbuf_len(conn),
+//          (int)buf_allocation(conn->outbuf),
+//          (int)(now - conn->timestamp_last_write_allowed));
+//      if (conn->type == CONN_TYPE_OR) {
+//        or_connection_t *or_conn = TO_OR_CONN(conn);
+//        if (or_conn->tls) {
+//          if (tor_tls_get_buffer_sizes(or_conn->tls, &rbuf_cap, &rbuf_len,
+//                                       &wbuf_cap, &wbuf_len) == 0) {
+//            tor_log(severity, LD_GENERAL,
+//                "Conn %d: %d/%d bytes used on OpenSSL read buffer; "
+//                "%d/%d bytes used on write buffer.",
+//                i, (int)rbuf_len, (int)rbuf_cap, (int)wbuf_len, (int)wbuf_cap);
+//          }
+//        }
+//      }
+//    }
+//    circuit_dump_by_conn(conn, severity); /* dump info about all the circuits
+//                                           * using this conn */
+//  } SMARTLIST_FOREACH_END(conn);
+//
   channel_dumpstats(severity);
   channel_listener_dumpstats(severity);
 

+ 2 - 0
src/core/include.am

@@ -55,6 +55,7 @@ LIBTOR_APP_A_SOURCES = 				\
 	src/core/or/protover_rust.c		\
 	src/core/or/reasons.c			\
 	src/core/or/relay.c			\
+	src/core/or/safe_connection.c	\
 	src/core/or/scheduler.c			\
 	src/core/or/scheduler_kist.c		\
 	src/core/or/scheduler_vanilla.c		\
@@ -286,6 +287,7 @@ noinst_HEADERS +=					\
 	src/core/or/reasons.h				\
 	src/core/or/relay.h				\
 	src/core/or/relay_crypto_st.h			\
+	src/core/or/safe_connection.h		\
 	src/core/or/scheduler.h				\
 	src/core/or/sendme.h				\
 	src/core/or/server_port_cfg_st.h		\

File diff suppressed because it is too large
+ 501 - 296
src/core/mainloop/connection.c


+ 93 - 35
src/core/mainloop/mainloop.c

@@ -209,6 +209,8 @@ static void conn_write_callback(evutil_socket_t fd, short event, void *_conn);
 static void shutdown_did_not_work_callback(evutil_socket_t fd, short event,
                                            void *arg) ATTR_NORETURN;
 
+static event_registry_t *event_registry = NULL;
+
 /****************************************************************************
  *
  * This section contains accessors and other methods on the connection_array
@@ -248,29 +250,51 @@ note_that_we_maybe_cant_complete_circuits(void)
 int
 connection_add_impl(connection_t *conn, int is_connecting)
 {
-  tor_assert(conn);
-  tor_assert(SOCKET_OK(conn->s) ||
-             conn->linked ||
-             (conn->type == CONN_TYPE_AP &&
-              TO_EDGE_CONN(conn)->is_dns_request));
+  (void) is_connecting;
+
+  tor_assert(conn != NULL);
 
   tor_assert(conn->conn_array_index == -1); /* can only connection_add once */
   conn->conn_array_index = smartlist_len(connection_array);
   smartlist_add(connection_array, conn);
 
-  (void) is_connecting;
+  event_listener_attach(conn->event_listener, tor_libevent_get_base());
+  // TODO: check for error
+
+  if (conn->safe_conn == NULL) {
+    tor_assert(conn->type != CONN_TYPE_OR);
+    tor_assert(SOCKET_OK(conn->s) ||
+               conn->linked ||
+               (conn->type == CONN_TYPE_AP &&
+                TO_EDGE_CONN(conn)->is_dns_request));
+
+    if (SOCKET_OK(conn->s) || conn->linked) {
+      conn->read_event = tor_event_new(tor_libevent_get_base(),
+           conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn);
+      conn->write_event = tor_event_new(tor_libevent_get_base(),
+           conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn);
+      /* XXXX CHECK FOR NULL RETURN! */
+    }
 
-  if (SOCKET_OK(conn->s) || conn->linked) {
-    conn->read_event = tor_event_new(tor_libevent_get_base(),
-         conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn);
-    conn->write_event = tor_event_new(tor_libevent_get_base(),
-         conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn);
-    /* XXXX CHECK FOR NULL RETURN! */
-  }
+    log_debug(LD_NET, "new conn type %s, socket %d, address %s, n_conns %d.",
+              conn_type_to_string(conn->type), (int)conn->s, conn->address,
+              smartlist_len(connection_array));
+  } else {
+    tor_assert(conn->type == CONN_TYPE_OR);
 
-  log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.",
-            conn_type_to_string(conn->type), (int)conn->s, conn->address,
-            smartlist_len(connection_array));
+    error_t rv = safe_connection_register_events(conn->safe_conn,
+                                                 tor_libevent_get_base());
+
+    if (rv != E_SUCCESS) {
+      smartlist_remove(connection_array, conn);
+      conn->conn_array_index = -1;
+      return 1;
+    }
+
+    log_debug(LD_NET, "new conn type %s, address %s, n_conns %d.",
+              conn_type_to_string(conn->type), conn->address,
+              smartlist_len(connection_array));
+  }
 
   return 0;
 }
@@ -279,19 +303,29 @@ connection_add_impl(connection_t *conn, int is_connecting)
 void
 connection_unregister_events(connection_t *conn)
 {
-  if (conn->read_event) {
-    if (event_del(conn->read_event))
-      log_warn(LD_BUG, "Error removing read event for %d", (int)conn->s);
-    tor_free(conn->read_event);
-  }
-  if (conn->write_event) {
-    if (event_del(conn->write_event))
-      log_warn(LD_BUG, "Error removing write event for %d", (int)conn->s);
-    tor_free(conn->write_event);
-  }
-  if (conn->type == CONN_TYPE_AP_DNS_LISTENER) {
-    dnsserv_close_listener(conn);
+  tor_assert(conn != NULL);
+  if (conn->safe_conn == NULL) {
+    tor_assert(conn->type != CONN_TYPE_OR);
+    if (conn->read_event) {
+      if (event_del(conn->read_event))
+        log_warn(LD_BUG, "Error removing read event for %d", (int)conn->s);
+      tor_free(conn->read_event);
+    }
+    if (conn->write_event) {
+      if (event_del(conn->write_event))
+        log_warn(LD_BUG, "Error removing write event for %d", (int)conn->s);
+      tor_free(conn->write_event);
+    }
+    if (conn->type == CONN_TYPE_AP_DNS_LISTENER) {
+      dnsserv_close_listener(conn);
+    }
+  } else {
+    tor_assert(conn->type == CONN_TYPE_OR);
+    safe_connection_unregister_events(conn->safe_conn);
   }
+
+  event_listener_detach(conn->event_listener);
+  // TODO: do we want to process the remaining events?
 }
 
 /** Remove the connection from the global list, and remove the
@@ -306,7 +340,7 @@ connection_remove(connection_t *conn)
 
   tor_assert(conn);
 
-  log_debug(LD_NET,"removing socket %d (type %s), n_conns now %d",
+  log_debug(LD_NET, "removing socket %d (type %s), n_conns now %d",
             (int)conn->s, conn_type_to_string(conn->type),
             smartlist_len(connection_array));
 
@@ -614,6 +648,7 @@ MOCK_IMPL(void,
 connection_stop_reading,(connection_t *conn))
 {
   tor_assert(conn);
+  tor_assert(conn->type != CONN_TYPE_OR);
 
   if (connection_check_event(conn, conn->read_event) < 0) {
     return;
@@ -636,6 +671,7 @@ MOCK_IMPL(void,
 connection_start_reading,(connection_t *conn))
 {
   tor_assert(conn);
+  tor_assert(conn->type != CONN_TYPE_OR);
 
   if (connection_check_event(conn, conn->read_event) < 0) {
     return;
@@ -659,6 +695,7 @@ int
 connection_is_writing(connection_t *conn)
 {
   tor_assert(conn);
+  tor_assert(conn->type != CONN_TYPE_OR);
 
   return conn->writing_to_linked_conn ||
     (conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL));
@@ -669,6 +706,7 @@ MOCK_IMPL(void,
 connection_stop_writing,(connection_t *conn))
 {
   tor_assert(conn);
+  tor_assert(conn->type != CONN_TYPE_OR);
 
   if (connection_check_event(conn, conn->write_event) < 0) {
     return;
@@ -692,6 +730,7 @@ MOCK_IMPL(void,
 connection_start_writing,(connection_t *conn))
 {
   tor_assert(conn);
+  tor_assert(conn->type != CONN_TYPE_OR);
 
   if (connection_check_event(conn, conn->write_event) < 0) {
     return;
@@ -1009,8 +1048,12 @@ conn_close_if_marked(int i)
                 connection_wants_to_flush(conn));
     } else if (connection_speaks_cells(conn)) {
       if (conn->state == OR_CONN_STATE_OPEN) {
-        retval = buf_flush_to_tls(conn->outbuf, TO_OR_CONN(conn)->tls, sz,
-                               &conn->outbuf_flushlen);
+        // TODO: force TLS flush here, need to communicate to other thread somehow
+        //       really we need to tell the thread to close, and whether it should
+        //       try to flush or not
+        retval = -1;
+        //retval = buf_flush_to_tls(conn->outbuf, TO_OR_CONN(conn)->tls, sz,
+        //                       &conn->outbuf_flushlen);
       } else
         retval = -1; /* never flush non-open broken tls connections */
     } else {
@@ -1214,7 +1257,7 @@ run_connection_housekeeping(int i, time_t now)
      the connection or send a keepalive, depending. */
 
   or_conn = TO_OR_CONN(conn);
-  tor_assert(conn->outbuf);
+  //tor_assert(conn->outbuf);
 
   chan = TLS_CHAN_TO_BASE(or_conn->chan);
   tor_assert(chan);
@@ -1247,7 +1290,9 @@ run_connection_housekeeping(int i, time_t now)
     }
   } else if (we_are_hibernating() &&
              ! have_any_circuits &&
-             !connection_get_outbuf_len(conn)) {
+//             !connection_get_outbuf_len(conn)) {
+// TODO: we should have a way to make sure the safe connection has no data to send
+             1) {
     /* We're hibernating or shutting down, there's no circuits, and nothing to
      * flush.*/
     log_info(LD_OR,"Expiring non-used OR connection to fd %d (%s:%d) "
@@ -1273,10 +1318,12 @@ run_connection_housekeeping(int i, time_t now)
            "Expiring stuck OR connection to fd %d (%s:%d). (%d bytes to "
            "flush; %d seconds since last write)",
            (int)conn->s, conn->address, conn->port,
-           (int)connection_get_outbuf_len(conn),
+           //(int)connection_get_outbuf_len(conn),
+           12345678, // TODO: do something sensible here
            (int)(now-conn->timestamp_last_write_allowed));
     connection_or_close_normally(TO_OR_CONN(conn), 0);
-  } else if (past_keepalive && !connection_get_outbuf_len(conn)) {
+  } else if (past_keepalive && 1) { //!connection_get_outbuf_len(conn)) {
+// TODO: we should have a way to make sure the safe connection has no data to send
     /* send a padding cell */
     log_fn(LOG_DEBUG,LD_OR,"Sending keepalive to (%s:%d)",
            conn->address, conn->port);
@@ -2332,6 +2379,15 @@ initialize_mainloop_events(void)
   }
 }
 
+static void
+init_event_registry(void)
+{
+  tor_assert(event_registry == NULL);
+  event_registry = event_registry_new();
+
+  safe_or_conn_register_events(event_registry);
+}
+
 static void
 tor_eventloop_thread(void)
 {
@@ -2363,6 +2419,8 @@ do_main_loop(void)
   tor_assert(periodic_events_initialized);
   initialize_mainloop_events();
 
+  init_event_registry();
+
   periodic_events_connect_all();
 
   struct timeval one_second = { 1, 0 };

+ 3 - 2
src/core/or/channeltls.c

@@ -1415,7 +1415,7 @@ enter_v3_handshake_with_cell(var_cell_t *cell, channel_tls_t *chan)
            "Received a cell while TLS-handshaking, not in "
            "OR_HANDSHAKING_V3, on a connection we originated.");
   }
-  connection_or_block_renegotiation(chan->conn);
+  //connection_or_block_renegotiation(chan->conn);
   chan->conn->base_.state = OR_CONN_STATE_OR_HANDSHAKING_V3;
   if (connection_init_or_handshake_state(chan->conn, started_here) < 0) {
     connection_or_close_for_error(chan->conn, 0);
@@ -2119,7 +2119,8 @@ channel_tls_process_certs_cell(var_cell_t *cell, channel_tls_t *chan)
   const common_digests_t *checked_rsa_id = NULL;
   or_handshake_certs_check_both(severity,
                                 chan->conn->handshake_state->certs,
-                                chan->conn->tls,
+                                //chan->conn->tls,
+                                chan->conn->tls_peer_cert,
                                 time(NULL),
                                 &checked_ed_id,
                                 &checked_rsa_id);

File diff suppressed because it is too large
+ 522 - 354
src/core/or/connection_or.c


+ 5 - 2
src/core/or/connection_or.h

@@ -17,6 +17,7 @@ struct ed25519_keypair_t;
 
 or_connection_t *TO_OR_CONN(connection_t *);
 
+#include "lib/evloop/events.h"
 #include "core/or/orconn_event.h"
 
 void connection_or_clear_identity(or_connection_t *conn);
@@ -59,8 +60,8 @@ void connection_or_report_broken_states(int severity, int domain);
 void connection_or_event_status(or_connection_t *conn,
                                 or_conn_status_event_t tp, int reason);
 
-MOCK_DECL(int,connection_tls_start_handshake,(or_connection_t *conn,
-                                              int receiving));
+//MOCK_DECL(int,connection_tls_start_handshake,(or_connection_t *conn,
+//                                              int receiving));
 int connection_tls_continue_handshake(or_connection_t *conn);
 void connection_or_set_canonical(or_connection_t *or_conn,
                                  int is_canonical);
@@ -78,6 +79,8 @@ int connection_or_client_learned_peer_id(or_connection_t *conn,
                               const struct ed25519_public_key_t *ed_peer_id);
 time_t connection_or_client_used(or_connection_t *conn);
 MOCK_DECL(int, connection_or_get_num_circuits, (or_connection_t *conn));
+void connection_or_process_event(event_label_t label, event_data_t data,
+                                 void *context);
 void or_handshake_state_free_(or_handshake_state_t *state);
 #define or_handshake_state_free(state) \
   FREE_AND_NULL(or_handshake_state_t, or_handshake_state_free_, (state))

+ 5 - 0
src/core/or/connection_st.h

@@ -7,6 +7,8 @@
 #ifndef CONNECTION_ST_H
 #define CONNECTION_ST_H
 
+#include "core/or/safe_connection.h"
+
 struct buf_t;
 
 /* Values for connection_t.magic: used to make sure that downcasts (casts from
@@ -41,6 +43,9 @@ struct connection_t {
   uint32_t magic; /**< For memory debugging: must equal one of
                    * *_CONNECTION_MAGIC. */
 
+  safe_connection_t *safe_conn;
+  event_listener_t *event_listener;
+
   uint8_t state; /**< Current state of this connection. */
   unsigned int type:5; /**< What kind of connection is this? */
   unsigned int purpose:5; /**< Only used for DIR and EXIT types currently. */

+ 6 - 3
src/core/or/or_connection_st.h

@@ -36,7 +36,7 @@ struct or_connection_t {
 
   char *nickname; /**< Nickname of OR on other side (if any). */
 
-  struct tor_tls_t *tls; /**< TLS connection state. */
+  //struct tor_tls_t *tls; /**< TLS connection state. */
   int tls_error; /**< Last tor_tls error code. */
   /** When we last used this conn for any client traffic. If not
    * recent, we can rate limit it further. */
@@ -81,14 +81,17 @@ struct or_connection_t {
 
   time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/
 
-  token_bucket_rw_t bucket; /**< Used for rate limiting when the connection is
-                          * in state CONN_OPEN. */
+  //token_bucket_rw_t bucket; /**< Used for rate limiting when the connection is
+  //                        * in state CONN_OPEN. */
 
   /*
    * Count the number of bytes flushed out on this orconn, and the number of
    * bytes TLS actually sent - used for overhead estimation for scheduling.
    */
   uint64_t bytes_xmitted, bytes_xmitted_by_tls;
+
+  tor_x509_cert_t *tls_own_cert;
+  tor_x509_cert_t *tls_peer_cert;
 };
 
 #endif /* !defined(OR_CONNECTION_ST_H) */

+ 1 - 2
src/core/or/orconn_event.h

@@ -31,8 +31,7 @@
 #define OR_CONN_STATE_CONNECTING 1
 /** State for a connection to an OR: waiting for proxy handshake to complete */
 #define OR_CONN_STATE_PROXY_HANDSHAKING 2
-/** State for an OR connection client: SSL is handshaking, not done
- * yet. */
+/** State for an OR connection: SSL is handshaking, not done yet. */
 #define OR_CONN_STATE_TLS_HANDSHAKING 3
 /** State for a connection to an OR: We're doing a second SSL handshake for
  * renegotiation purposes. (V2 handshake only.) */

+ 3 - 0
src/core/or/relay.h

@@ -12,6 +12,9 @@
 #ifndef TOR_RELAY_H
 #define TOR_RELAY_H
 
+#include <stdint.h>
+#include "core/or/or.h"
+
 extern uint64_t stats_n_relay_cells_relayed;
 extern uint64_t stats_n_relay_cells_delivered;
 extern uint64_t stats_n_circ_max_cell_reached;

+ 1408 - 0
src/core/or/safe_connection.c

@@ -0,0 +1,1408 @@
+#include "core/or/safe_connection.h"
+#include "app/config/config.h"
+#include "lib/net/buffers_net.h"
+#include "lib/tls/tortls.h"
+#include "lib/tls/buffers_tls.h"
+
+event_label_t safe_or_conn_tcp_connecting_ev = EVENT_LABEL_UNSET;
+event_label_t safe_or_conn_tls_handshaking_ev = EVENT_LABEL_UNSET;
+event_label_t safe_or_conn_link_handshaking_ev = EVENT_LABEL_UNSET;
+event_label_t safe_or_conn_open_ev = EVENT_LABEL_UNSET;
+event_label_t safe_or_conn_closed_ev = EVENT_LABEL_UNSET;
+event_label_t safe_or_conn_has_buffered_data_ev = EVENT_LABEL_UNSET;
+
+static void
+safe_connection_refresh_events(safe_connection_t *safe_conn);
+
+static void
+safe_or_connection_refresh_bucket_rw_states(safe_or_connection_t *safe_or_conn);
+
+static tor_error_t
+safe_or_connection_update_state(safe_or_connection_t *safe_or_conn,
+                                or_conn_state_t new_state);
+
+static bool
+safe_or_connection_is_read_wanted(safe_connection_t *safe_conn);
+
+static bool
+safe_or_connection_is_write_wanted(safe_connection_t *safe_conn);
+
+static void
+safe_or_connection_read_cb(safe_connection_t *safe_conn);
+
+static void
+safe_or_connection_write_cb(safe_connection_t *safe_conn);
+
+static void
+safe_or_connection_socket_added_cb(safe_connection_t *safe_conn);
+
+static void
+safe_or_connection_outbuf_modified_cb(safe_connection_t *safe_conn);
+
+/********************************************************/
+
+safe_or_connection_t *
+TO_SAFE_OR_CONN(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  tor_assert(safe_conn->magic == SAFE_OR_CONN_MAGIC);
+  return DOWNCAST(safe_or_connection_t, safe_conn);
+}
+
+void
+safe_or_conn_register_events(event_registry_t *registry)
+{
+  tor_assert(safe_or_conn_tcp_connecting_ev == EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_tls_handshaking_ev == EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_link_handshaking_ev == EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_open_ev == EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_closed_ev == EVENT_LABEL_UNSET);
+  tor_assert(safe_or_conn_has_buffered_data_ev == EVENT_LABEL_UNSET);
+
+  safe_or_conn_tcp_connecting_ev = \
+    event_registry_register_event(registry, "OR Connection Connecting");
+  safe_or_conn_tls_handshaking_ev = \
+    event_registry_register_event(registry, "Starting OR TLS Handshake");
+  safe_or_conn_link_handshaking_ev = \
+    event_registry_register_event(registry, "Starting OR Link Handshake");
+  safe_or_conn_open_ev = \
+    event_registry_register_event(registry, "OR Connection Open");
+  safe_or_conn_closed_ev = \
+    event_registry_register_event(registry, "OR Connection Closed");
+  safe_or_conn_has_buffered_data_ev = \
+    event_registry_register_event(registry, "OR Connection Has Data In Buffer");
+}
+
+/********************************************************/
+
+void
+link_handshaking_ev_free(void *ptr)
+{
+  // we don't need to free the certs since we passed the ownership
+  tor_free(ptr);
+}
+
+/********************************************************/
+
+static void
+socket_rw_state_init(socket_rw_state_t *rw_state,
+                     bool initial_state)
+{
+  tor_assert(rw_state != NULL);
+
+  rw_state->state = initial_state;
+}
+
+static bool
+socket_rw_state_get(socket_rw_state_t *rw_state)
+{
+  tor_assert(rw_state != NULL);
+
+  return rw_state->state;
+}
+
+static void
+socket_rw_state_set(socket_rw_state_t *rw_state,
+                    bool new_state,
+                    safe_connection_t *safe_conn)
+{
+  tor_assert(rw_state != NULL);
+  tor_assert(safe_conn != NULL);
+
+  if (new_state != rw_state->state) {
+    rw_state->state = new_state;
+    safe_connection_refresh_events(safe_conn);
+  }
+}
+
+/********************************************************/
+
+void
+safe_connection_init(safe_connection_t *safe_conn, uint32_t type_magic,
+                     bool (*is_read_wanted)(safe_connection_t *),
+                     bool (*is_write_wanted)(safe_connection_t *),
+                     void (*read_cb)(safe_connection_t *),
+                     void (*write_cb)(safe_connection_t *),
+                     void (*socket_added_cb)(safe_connection_t *),
+                     void (*inbuf_modified_cb)(safe_connection_t *),
+                     void (*outbuf_modified_cb)(safe_connection_t *),
+                     bool requires_buffers, bool linked)
+{
+  tor_assert(safe_conn != NULL);
+  tor_assert(is_read_wanted != NULL);
+  tor_assert(is_write_wanted != NULL);
+  tor_assert(read_cb != NULL);
+  tor_assert(write_cb != NULL);
+
+  memset(safe_conn, 0, sizeof(*safe_conn));
+
+  safe_conn->magic = type_magic;
+  safe_conn->socket = TOR_INVALID_SOCKET;
+  safe_conn->linked = linked;
+
+  safe_conn->event_source = event_source_new();
+
+  socket_rw_state_init(&safe_conn->read_allowed, true);
+  socket_rw_state_init(&safe_conn->write_allowed, true);
+
+  tor_mutex_init(&safe_conn->lock);
+
+  safe_conn->is_read_wanted = is_read_wanted;
+  safe_conn->is_write_wanted = is_write_wanted;
+  safe_conn->read_cb = read_cb;
+  safe_conn->write_cb = write_cb;
+  safe_conn->socket_added_cb = socket_added_cb;
+  safe_conn->inbuf_modified_cb = inbuf_modified_cb;
+  safe_conn->outbuf_modified_cb = outbuf_modified_cb;
+
+  if (requires_buffers) {
+    safe_conn->inbuf = buf_new();
+    safe_conn->outbuf = buf_new();
+  }
+}
+
+void
+safe_connection_set_socket(safe_connection_t *safe_conn, tor_socket_t socket)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+  tor_assert(!safe_conn->linked);
+  tor_assert(SOCKET_OK(socket));
+
+  if (safe_conn->socket != TOR_INVALID_SOCKET) {
+    log_warn(LD_BUG, "We're overwriting a previous socket");
+  }
+  safe_conn->socket = socket;
+
+  if (safe_conn->socket_added_cb != NULL) {
+    safe_conn->socket_added_cb(safe_conn);
+  }
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+static void
+safe_connection_read_cb(evutil_socket_t ev_sock, short fd, void *void_safe_conn)
+{
+  (void)ev_sock;
+  (void)fd;
+  safe_connection_t *safe_conn = void_safe_conn;
+
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+  tor_assert(safe_conn->read_cb != NULL);
+  //tor_assert(safe_conn->read_event != NULL);
+
+  // NOTE: the below requires obtaining a lock on the event base, which adds
+  //       unnecessary slowness
+  // XX: Is the above true?
+  //if (!event_pending(safe_conn->read_event, EV_READ, NULL)) {
+  //  // another thread may have disabled this event between when the
+  //  // callback started, and when we acquired the lock above
+  //  return;
+  //}
+
+  //if (!safe_conn->read_allowed || !safe_conn->read_wanted) {
+  //  // we shouldn't be reading
+  //  return;
+  //}
+
+  safe_conn->read_cb(safe_conn);
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+static void
+safe_connection_write_cb(evutil_socket_t ev_sock, short fd, void *void_safe_conn)
+{
+  (void)ev_sock;
+  (void)fd;
+  safe_connection_t *safe_conn = void_safe_conn;
+
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+  tor_assert(safe_conn->write_cb != NULL);
+  //tor_assert(safe_conn->write_event != NULL);
+
+  // NOTE: the below requires obtaining a lock on the event base, which adds
+  //       unnecessary slowness
+  // XX: Is the above true?
+  //if (!event_pending(safe_conn->write_event, EV_WRITE, NULL)) {
+  //  // another thread may have disabled this event between when the
+  //  // callback started, and when we acquired the lock above
+  //  return;
+  //}
+
+  //if (!safe_conn->write_allowed || !safe_conn->write_wanted) {
+  //  // we shouldn't be writing
+  //  return;
+  //}
+
+  safe_conn->write_cb(safe_conn);
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+void
+safe_connection_subscribe(safe_connection_t *safe_conn,
+                          event_listener_t *listener, event_label_t label,
+                          bool send_full_event)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+
+  event_source_subscribe(safe_conn->event_source, listener, label,
+                         send_full_event);
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+void
+safe_connection_unsubscribe_all(safe_connection_t *safe_conn,
+                                event_listener_t *listener)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+
+  event_source_unsubscribe_all(safe_conn->event_source, listener);
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+void
+safe_connection_unregister_events(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+
+  if (safe_conn->read_event != NULL) {
+    tor_event_free(safe_conn->read_event);
+  }
+  if (safe_conn->write_event != NULL) {
+    tor_event_free(safe_conn->write_event);
+  }
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+tor_error_t
+safe_connection_register_events(safe_connection_t *safe_conn,
+                                struct event_base *event_base)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+  tor_assert(safe_conn->read_cb != NULL);
+  tor_assert(safe_conn->write_cb != NULL);
+  tor_assert(safe_conn->linked != SOCKET_OK(safe_conn->socket));
+  // is either linked or has a socket, but not both (or neither)
+
+  safe_connection_unregister_events(safe_conn);
+
+  safe_conn->read_event = tor_event_new(event_base, safe_conn->socket,
+                                        EV_READ|EV_PERSIST,
+                                        safe_connection_read_cb, safe_conn);
+  safe_conn->write_event = tor_event_new(event_base, safe_conn->socket,
+                                         EV_WRITE|EV_PERSIST,
+                                         safe_connection_write_cb, safe_conn);
+
+  if (safe_conn->read_event == NULL || safe_conn->write_event == NULL) {
+    log_warn(LD_BUG, "Could not set events for %d", (int)safe_conn->socket);
+    safe_connection_unregister_events(safe_conn);
+    tor_mutex_release(&safe_conn->lock);
+    return E_ERROR;
+  }
+
+  safe_connection_refresh_events(safe_conn);
+
+  tor_mutex_release(&safe_conn->lock);
+  return E_SUCCESS;
+}
+
+static void
+safe_connection_refresh_events(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+  tor_assert(safe_conn->is_read_wanted != NULL);
+  tor_assert(safe_conn->is_write_wanted != NULL);
+
+  if (safe_conn->read_event != NULL) {
+    if (socket_rw_state_get(&safe_conn->read_allowed) &&
+        safe_conn->is_read_wanted(safe_conn)) {
+      event_add(safe_conn->read_event, NULL);
+    } else {
+      event_del(safe_conn->read_event);
+    }
+  }
+
+  if (safe_conn->write_event != NULL) {
+    if (socket_rw_state_get(&safe_conn->write_allowed) &&
+        safe_conn->is_write_wanted(safe_conn)) {
+      event_add(safe_conn->write_event, NULL);
+    } else {
+      event_del(safe_conn->write_event);
+    }
+  }
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+void
+safe_connection_set_read_permission(safe_connection_t *safe_conn,
+                                    bool read_allowed)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+  socket_rw_state_set(&safe_conn->read_allowed, read_allowed, safe_conn);
+  tor_mutex_release(&safe_conn->lock);
+}
+
+void
+safe_connection_set_write_permission(safe_connection_t *safe_conn,
+                                     bool write_allowed)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+  socket_rw_state_set(&safe_conn->write_allowed, write_allowed, safe_conn);
+  tor_mutex_release(&safe_conn->lock);
+}
+
+void
+safe_connection_inbuf_modified(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+
+  if (safe_conn->inbuf_modified_cb != NULL) {
+    safe_conn->inbuf_modified_cb(safe_conn);
+  }
+
+  tor_mutex_release(&safe_conn->lock);
+}
+
+void
+safe_connection_outbuf_modified(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  tor_mutex_acquire(&safe_conn->lock);
+
+  if (safe_conn->outbuf_modified_cb != NULL) {
+    safe_conn->outbuf_modified_cb(safe_conn);
+  }
+  
+  tor_mutex_release(&safe_conn->lock);
+}
+
+//void
+//safe_connection_use_inbuf(safe_connection_t *safe_conn,
+//                          int (*f)(struct buf_t *, void *, void **),
+//                          void *data,
+//                          void **ret_val)
+//{
+//  tor_assert(safe_conn != NULL);
+//  tor_assert(f != NULL);
+//  tor_mutex_acquire(&safe_conn->lock);
+//
+//  int rv = f(safe_conn->inbuf, data, ret_val);
+//
+//  tor_mutex_release(&safe_conn->lock);
+//
+//  return rv;
+//}
+
+/********************************************************/
+
+safe_or_connection_t *
+safe_or_connection_new(bool requires_buffers, bool is_outgoing,
+                       const char *remote_address_str)
+{
+  safe_or_connection_t *safe_or_conn = \
+    tor_malloc_zero(sizeof(safe_or_connection_t));
+
+  safe_connection_init(TO_SAFE_CONN(safe_or_conn),
+                       SAFE_OR_CONN_MAGIC,
+                       safe_or_connection_is_read_wanted,
+                       safe_or_connection_is_write_wanted,
+                       safe_or_connection_read_cb,
+                       safe_or_connection_write_cb,
+                       safe_or_connection_socket_added_cb,
+                       NULL,
+                       safe_or_connection_outbuf_modified_cb,
+                       requires_buffers, false);
+
+  token_bucket_rw_init(&safe_or_conn->bucket, 1, 1, time(NULL));
+  safe_or_conn->is_outgoing = is_outgoing;
+  if (remote_address_str != NULL) {
+    safe_or_conn->remote_address_str = \
+      tor_strdup(escaped_safe_str(remote_address_str));
+    // the function 'escaped_safe_str' must be run in the main thread
+  } else {
+    safe_or_conn->remote_address_str = NULL;
+    log_warn(LD_OR, "No remote address string was provided");
+  }
+
+  // these states should be set by 'safe_or_connection_update_state()'
+  socket_rw_state_init(&safe_or_conn->tor_read_wanted,  false);
+  socket_rw_state_init(&safe_or_conn->tor_write_wanted, false);
+  socket_rw_state_init(&safe_or_conn->tls_read_wanted,  false);
+  socket_rw_state_init(&safe_or_conn->tls_write_wanted, false);
+  socket_rw_state_init(&safe_or_conn->bucket_read_allowed,  false);
+  socket_rw_state_init(&safe_or_conn->bucket_write_allowed, false);
+  safe_or_connection_refresh_bucket_rw_states(safe_or_conn);
+
+  tor_assert(safe_or_connection_update_state(safe_or_conn,
+    SAFE_OR_CONN_STATE_NO_SOCKET) == E_SUCCESS);
+
+  return safe_or_conn;
+}
+
+static void
+safe_or_connection_socket_added_cb(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+
+  tor_assert(safe_or_connection_update_state(TO_SAFE_OR_CONN(safe_conn),
+    SAFE_OR_CONN_STATE_TCP_CONNECTING) == E_SUCCESS);
+  // it might already be connected, but it should be fine to transition
+  // through this state first
+}
+
+static void
+safe_or_connection_outbuf_modified_cb(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  safe_or_connection_t *safe_or_conn = TO_SAFE_OR_CONN(safe_conn);
+
+  if (safe_or_conn->state == SAFE_OR_CONN_STATE_LINK_HANDSHAKING ||
+      safe_or_conn->state == SAFE_OR_CONN_STATE_OPEN) {
+    if (buf_datalen(TO_SAFE_CONN(safe_or_conn)->outbuf) > 0) {
+      socket_rw_state_set(&safe_or_conn->tor_write_wanted, true,
+                          TO_SAFE_CONN(safe_or_conn));
+    }
+  } else {
+    log_warn(LD_OR, "The outbuf was modified when in a state where it "
+                    "shouldn't be modified (state %d)", safe_or_conn->state);
+  }
+}
+
+static void
+safe_or_connection_refresh_bucket_rw_states(safe_or_connection_t *safe_or_conn)
+{
+  if (token_bucket_rw_get_read(&safe_or_conn->bucket) > 0) {
+    // token bucket is not empty, so we can read now
+    socket_rw_state_set(&safe_or_conn->bucket_read_allowed, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    log_debug(LD_OR, "Token bucket for %p read is non-empty", safe_or_conn);
+  } else {
+    // token bucket is empty, so can't read now
+    socket_rw_state_set(&safe_or_conn->bucket_read_allowed, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    log_debug(LD_OR, "Token bucket for %p read is empty", safe_or_conn);
+  }
+  if (token_bucket_rw_get_write(&safe_or_conn->bucket) > 0) {
+    // token bucket is not empty, so we can write now
+    socket_rw_state_set(&safe_or_conn->bucket_write_allowed, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    log_debug(LD_OR, "Token bucket for %p write is non-empty", safe_or_conn);
+  } else {
+    // token bucket is empty, so can't write now
+    socket_rw_state_set(&safe_or_conn->bucket_write_allowed, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    log_debug(LD_OR, "Token bucket for %p write is empty", safe_or_conn);
+  }
+}
+
+// TODO: we should get rid of this at some point
+void
+safe_or_connection_get_tls_desc(safe_or_connection_t *safe_or_conn,
+                                char *buf, size_t buf_size)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_assert(buf != NULL);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+  tor_assert(safe_or_conn->tls != NULL);
+
+  tor_tls_get_state_description(safe_or_conn->tls, buf, buf_size);
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+}
+
+int
+safe_or_connection_tls_secrets(safe_or_connection_t *safe_or_conn,
+                               uint8_t *secrets_out)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_assert(secrets_out != NULL);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+  tor_assert(safe_or_conn->tls != NULL);
+
+  int rv = tor_tls_get_tlssecrets(safe_or_conn->tls, secrets_out);
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+  return rv;
+}
+
+int
+safe_or_connection_key_material(safe_or_connection_t *safe_or_conn,
+                                uint8_t *secrets_out,
+                                const uint8_t *context,
+                                size_t context_len, const char *label)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+  tor_assert(safe_or_conn->tls != NULL);
+
+  int rv = tor_tls_export_key_material(safe_or_conn->tls, secrets_out,
+                                       context, context_len, label);
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+  return rv;
+}
+
+void
+safe_or_connection_refill_buckets(safe_or_connection_t *safe_or_conn,
+                                  uint32_t now_ts)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+  tor_assert(&safe_or_conn->bucket != NULL);
+
+  token_bucket_rw_refill(&safe_or_conn->bucket, now_ts);
+  safe_or_connection_refresh_bucket_rw_states(safe_or_conn);
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+}
+
+// TODO: this might be better implemented as a message so that we don't need
+//       to wait for the lock (but would require us to add a listener to the
+//       safe conn
+void
+safe_or_connection_adjust_buckets(safe_or_connection_t *safe_or_conn,
+                                  uint32_t rate, uint32_t burst,
+                                  bool reset, uint32_t now_ts)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_mutex_acquire(&TO_SAFE_CONN(safe_or_conn)->lock);
+  tor_assert(&safe_or_conn->bucket != NULL);
+
+  token_bucket_rw_adjust(&safe_or_conn->bucket, rate, burst);
+  if (reset) {
+    token_bucket_rw_reset(&safe_or_conn->bucket, now_ts);
+    safe_or_connection_refresh_bucket_rw_states(safe_or_conn);
+  }
+
+  tor_mutex_release(&TO_SAFE_CONN(safe_or_conn)->lock);
+}
+
+static void
+safe_or_connection_decrement_buckets(safe_or_connection_t *safe_or_conn,
+                                     size_t num_read, size_t num_written,
+                                     bool use_conn_buckets)
+{
+  if (use_conn_buckets) {
+    token_bucket_rw_dec(&safe_or_conn->bucket, num_read, num_written);
+  }
+  safe_or_connection_refresh_bucket_rw_states(safe_or_conn);
+}
+
+static size_t
+safe_or_connection_max_bytes_can_read(safe_or_connection_t *safe_or_conn,
+                                      bool use_conn_buckets)
+{
+  // this function may become more complicated if we add support for global
+  // buckets in the future
+  // note: that would be a bad way to do it, since instead we should borrow
+  // some space from the global bucket, and then commit it once the read
+  // is actually finished
+  if (use_conn_buckets) {
+    return token_bucket_rw_get_read(&safe_or_conn->bucket);
+  } else {
+    return SIZE_MAX;
+  }
+}
+
+static size_t
+safe_or_connection_max_bytes_can_write(safe_or_connection_t *safe_or_conn,
+                                       bool use_conn_buckets)
+{
+  // this function may become more complicated if we add support for global
+  // buckets in the future
+  // note: that would be a bad way to do it, since instead we should borrow
+  // some space from the global bucket, and then commit it once the write
+  // is actually finished
+  if (use_conn_buckets) {
+    return token_bucket_rw_get_write(&safe_or_conn->bucket);
+  } else {
+    return SIZE_MAX;
+  }
+}
+
+static bool
+safe_or_connection_is_read_wanted(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  safe_or_connection_t *safe_or_conn = TO_SAFE_OR_CONN(safe_conn);
+
+  return socket_rw_state_get(&safe_or_conn->tls_read_wanted) ||
+         (socket_rw_state_get(&safe_or_conn->tor_read_wanted) &&
+          socket_rw_state_get(&safe_or_conn->bucket_read_allowed));
+}
+
+static bool
+safe_or_connection_is_write_wanted(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  safe_or_connection_t *safe_or_conn = TO_SAFE_OR_CONN(safe_conn);
+
+  return socket_rw_state_get(&safe_or_conn->tls_write_wanted) ||
+         (socket_rw_state_get(&safe_or_conn->tor_write_wanted) &&
+          socket_rw_state_get(&safe_or_conn->bucket_write_allowed));
+}
+
+static tor_error_t
+safe_or_connection_update_state(safe_or_connection_t *safe_or_conn,
+                                or_conn_state_t new_state)
+{
+  if (new_state == safe_or_conn->state) {
+    log_warn(LD_OR, "Trying to change to the current state (or_conn_state_t) "
+                    "of %d", new_state);
+  }
+
+  event_data_t null_data = { .ptr = NULL };
+  // this is used by several cases below
+
+  switch (new_state) {
+  case SAFE_OR_CONN_STATE_UNINITIALIZED:
+    tor_assert_unreached();
+    break;
+  case SAFE_OR_CONN_STATE_NO_SOCKET:
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    break;
+  case SAFE_OR_CONN_STATE_TCP_CONNECTING:
+    // the socket was EINPROGRESS, so wait for the socket to become
+    // writable
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                         safe_or_conn_tcp_connecting_ev,
+                         null_data, NULL);
+    break;
+  case SAFE_OR_CONN_STATE_PROXY_HANDSHAKING:
+    log_warn(LD_OR, "Relay connection proxy handshake state has not yet "
+                    "been implemented");
+    tor_assert(0);
+    break;
+  case SAFE_OR_CONN_STATE_TLS_HANDSHAKING:
+  {
+    // begin the handshake when either the socket is readable or
+    // writable
+    if (safe_or_conn->tls != NULL) {
+      log_warn(LD_OR, "safe_or_conn->tls should not be set");
+      return E_ERROR;
+    }
+    bool is_receiving = !safe_or_conn->is_outgoing;
+    if (TO_SAFE_CONN(safe_or_conn)->socket == TOR_INVALID_SOCKET) {
+      log_warn(LD_OR, "No socket was set yet");
+      return E_ERROR;
+    }
+    safe_or_conn->tls = tor_tls_new(TO_SAFE_CONN(safe_or_conn)->socket,
+                                    is_receiving);
+    if (safe_or_conn->tls == NULL) {
+      log_warn(LD_OR, "Could not create a new tor TLS object");
+      return E_ERROR;
+    }
+    if (safe_or_conn->remote_address_str != NULL) {
+      tor_tls_set_logged_address(safe_or_conn->tls,
+                                 safe_or_conn->remote_address_str);
+    }
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                         safe_or_conn_tls_handshaking_ev,
+                         null_data, NULL);
+    break;
+  }
+  case SAFE_OR_CONN_STATE_LINK_HANDSHAKING:
+  {
+    if (safe_or_conn->tls == NULL) {
+      log_warn(LD_OR, "safe_or_conn->tls was not set");
+      return E_ERROR;
+    }
+
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+
+    link_handshaking_ev_data *handshake_data = \
+      tor_malloc_zero(sizeof(link_handshaking_ev_data));
+    handshake_data->tls_own_cert = tor_tls_get_own_cert(safe_or_conn->tls);
+    handshake_data->tls_peer_cert = tor_tls_get_peer_cert(safe_or_conn->tls);
+
+    event_data_t ev_data = { .ptr = handshake_data };
+    event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                         safe_or_conn_link_handshaking_ev,
+                         ev_data, link_handshaking_ev_free);
+    break;
+  }
+  case SAFE_OR_CONN_STATE_OPEN:
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                         safe_or_conn_open_ev, null_data, NULL);
+    break;
+  case SAFE_OR_CONN_STATE_CLOSED:
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                         safe_or_conn_closed_ev, null_data, NULL);
+    break;
+  default:
+    log_warn(LD_OR, "Unexpected state");
+    tor_assert(0);
+    break;
+  }
+
+  log_warn(LD_OR, "Safe OR conn changed from state %d to state %d",
+            safe_or_conn->state, new_state);
+  // TODO: change back to debug ^^
+  safe_or_conn->state = new_state;
+
+  return E_SUCCESS;
+}
+
+static tor_error_t
+safe_or_connection_check_tcp_connection(safe_or_connection_t *safe_or_conn)
+{
+  tor_assert(safe_or_conn != NULL);
+
+  int e;
+  socklen_t len = (socklen_t)sizeof(e);
+
+  if (getsockopt(TO_SAFE_CONN(safe_or_conn)->socket, SOL_SOCKET, SO_ERROR,
+                 (void *)&e, &len) < 0) {
+    log_warn(LD_BUG, "getsockopt() syscall failed");
+    return E_ERROR;
+  }
+
+  if (e != 0) {
+    // some sort of error, but maybe just inprogress still
+    if (!ERRNO_IS_CONN_EINPROGRESS(e)) {
+      log_info(LD_NET, "In-progress connect failed. Removing. (%s)",
+               tor_socket_strerror(e));
+      return E_ERROR;
+    } else {
+      // no change, see if next time is better
+      return E_SUCCESS;
+    }
+  }
+
+  // there was no error
+  return safe_or_connection_update_state(safe_or_conn,
+           SAFE_OR_CONN_STATE_TLS_HANDSHAKING);
+}
+
+static int
+safe_or_connection_read_tls(safe_or_connection_t *safe_or_conn,
+                            size_t suggested_bytes_to_read,
+                            size_t *total_bytes_read)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_assert(suggested_bytes_to_read > 0);
+  *total_bytes_read = 0;
+
+  {
+    size_t bytes_read = 0;
+    int tls_rv = buf_read_from_tls(TO_SAFE_CONN(safe_or_conn)->inbuf,
+                                   safe_or_conn->tls,
+                                   suggested_bytes_to_read,
+                                   &bytes_read);
+    *total_bytes_read += bytes_read;
+
+    if (tls_rv != TOR_TLS_DONE) {
+      return tls_rv;
+    }
+  }
+
+  int pending_bytes_to_read = tor_tls_get_pending_bytes(safe_or_conn->tls);
+  if (pending_bytes_to_read > 0) {
+    size_t bytes_read = 0;
+    int tls_rv = buf_read_from_tls(TO_SAFE_CONN(safe_or_conn)->inbuf,
+                                   safe_or_conn->tls,
+                                   pending_bytes_to_read,
+                                   &bytes_read);
+    if (PREDICT_LIKELY(SIZE_MAX-(*total_bytes_read) > bytes_read)) {
+      *total_bytes_read += bytes_read;
+    } else {
+      *total_bytes_read = SIZE_MAX;
+    }
+
+    tor_assert(tls_rv != TOR_TLS_WANTREAD && tls_rv != TOR_TLS_WANTWRITE);
+    // we don't expect either of these when reading pending bytes
+    if (tls_rv != TOR_TLS_DONE) {
+      return tls_rv;
+    }
+  }
+
+  return TOR_TLS_DONE;
+}
+
+static int
+safe_or_connection_write_tls(safe_or_connection_t *safe_or_conn,
+                             size_t max_bytes_to_write,
+                             size_t *total_bytes_written)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_assert(max_bytes_to_write > 0);
+  *total_bytes_written = 0;
+
+  size_t bytes_written = 0;
+  int tls_rv = buf_flush_to_tls(TO_SAFE_CONN(safe_or_conn)->outbuf,
+                                safe_or_conn->tls,
+                                max_bytes_to_write,
+                                &bytes_written);
+  *total_bytes_written += bytes_written;
+
+  return tls_rv;
+}
+
+// this function will be needed when proxies are supported
+/*
+static tor_error_t
+safe_or_connection_read_plaintext(safe_or_connection_t *safe_or_conn)
+{
+  tor_assert(safe_or_conn != NULL);
+
+  uint32_t coarse_time = monotime_coarse_get_stamp();
+  safe_or_connection_refill_buckets(safe_or_conn, coarse_time);
+
+  size_t bytes_to_read = safe_or_connection_max_bytes_can_read(safe_or_conn);
+
+  if (bytes_to_read == 0) {
+    log_debug(LD_NET, "Read callback running, but not supposed to read bytes.");
+    return E_SUCCESS;
+  }
+
+  size_t buf_initial_size = buf_datalen(TO_SAFE_CONN(safe_or_conn)->inbuf);
+  size_t bytes_read = 0;
+  int reached_eof = 0;
+  int socket_error = 0;
+  // STEVE: if reusing this with control connections, then need to wrap
+  //        with 'CONN_LOG_PROTECT' (see connection.c,
+  //        !connection_speaks_cells, !conn->linked_conn. )
+  int rv = buf_read_from_socket(TO_SAFE_CONN(safe_or_conn)->inbuf,
+                                TO_SAFE_CONN(safe_or_conn)->socket,
+                                bytes_to_read, &reached_eof,
+                                &socket_error);
+  if (rv < 0) {
+    log_debug(LD_NET, "OR plaintext connection closed on read error.");
+    // TODO: need to send the socket_error back to the main thread
+    return E_ERROR;
+  } else if(rv == 0 && reached_eof != 0) {
+    // close the connection normally
+    log_debug(LD_NET, "OR plaintext connection closed on read eof.");
+    // return an error so that the calling function will close it
+    return E_ERROR;
+  } else {
+    bytes_read = rv;
+  }
+
+  if (PREDICT_LIKELY(bytes_read < SIZE_MAX)) {
+    tor_assert(bytes_read == \
+               buf_datalen(TO_SAFE_CONN(safe_or_conn)->inbuf)-buf_initial_size);
+  } else {
+    log_warn(LD_NET, "We read an unexpectedly large number of bytes: %zu "
+                     ">= SIZE_MAX",
+             bytes_read);
+  }
+
+  log_debug(LD_NET, "OR plaintext read of %ld", (long)bytes_read);
+
+  safe_or_connection_decrement_buckets(safe_or_conn, bytes_read, 0);
+  return E_SUCCESS;
+}
+*/
+
+static tor_error_t
+safe_or_connection_read_encrypted(safe_or_connection_t *safe_or_conn,
+                                  bool use_conn_buckets)
+{
+  tor_assert(safe_or_conn != NULL);
+
+  uint32_t coarse_time = monotime_coarse_get_stamp();
+  safe_or_connection_refill_buckets(safe_or_conn, coarse_time);
+
+  size_t suggested_bytes_to_read = \
+    safe_or_connection_max_bytes_can_read(safe_or_conn, use_conn_buckets);
+  // we may read slightly more than this due to pending TLS bytes
+
+  log_warn(LD_NET, "suggested_bytes_to_read: %ld", suggested_bytes_to_read);
+  // TODO: remove this ^^
+
+  if (suggested_bytes_to_read == 0) {
+    log_debug(LD_NET, "Read callback running, but not supposed to read bytes.");
+    return E_SUCCESS;
+  }
+
+  size_t buf_initial_size = buf_datalen(TO_SAFE_CONN(safe_or_conn)->inbuf);
+  size_t bytes_read = 0;
+  int tls_rv = safe_or_connection_read_tls(safe_or_conn,
+                                           suggested_bytes_to_read,
+                                           &bytes_read);
+  switch (tls_rv) {
+  case TOR_TLS_CLOSE:
+  case TOR_TLS_ERROR_IO:
+    log_debug(LD_NET, "TLS connection closed %son read. Closing.",
+              tls_rv == TOR_TLS_CLOSE ? "cleanly " : "");
+    return E_ERROR;
+  CASE_TOR_TLS_ERROR_ANY_NONIO:
+    log_debug(LD_NET, "TLS error [%s]. Breaking.",
+              tor_tls_err_to_string(tls_rv));
+    return E_ERROR;
+  case TOR_TLS_WANTWRITE:
+    // we need to wait for the socket to become writable
+    // before we can do another read
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    break;
+  case TOR_TLS_WANTREAD:
+    // we need to wait for the socket to become readable
+    // again, then do another read
+    break;
+  default:
+    break;
+  }
+
+  if (PREDICT_LIKELY(bytes_read < SIZE_MAX)) {
+    size_t buf_len_diff = buf_datalen(TO_SAFE_CONN(safe_or_conn)->inbuf)-buf_initial_size;
+    if (bytes_read != buf_len_diff) {
+      log_warn(LD_OR, "Doesn't match! bytes_read: %ld, buf_len_diff: %ld",
+               bytes_read, buf_len_diff);
+      tor_assert_nonfatal_unreached_once();
+    }
+  } else {
+    log_warn(LD_NET, "We read an unexpectedly large number of bytes: %zu "
+                     ">= SIZE_MAX",
+             bytes_read);
+  }
+
+  // let any listeners know that we have new data in our incoming buffer
+  if (bytes_read > 0) {
+    event_data_t null_data = { .ptr = NULL };
+    event_source_publish(TO_SAFE_CONN(safe_or_conn)->event_source,
+                         safe_or_conn_has_buffered_data_ev, null_data, NULL);
+  }
+
+  size_t tls_bytes_read = 0;
+  size_t tls_bytes_written = 0;
+  tor_tls_get_n_raw_bytes(safe_or_conn->tls, &tls_bytes_read,
+                          &tls_bytes_written);
+  log_warn(LD_NET, "After TLS read of %ld: %ld read, %ld written",
+            (long)bytes_read, (long)tls_bytes_read, (long)tls_bytes_written);
+  // TODO: change this back to debug ^^
+
+  safe_or_connection_decrement_buckets(safe_or_conn, tls_bytes_read,
+                                       tls_bytes_written, use_conn_buckets);
+
+  // TODO: if get_options()->TestingEnableConnBwEvent, increase conn stats?
+
+  return E_SUCCESS;
+}
+
+static tor_error_t
+safe_or_connection_write_encrypted(safe_or_connection_t *safe_or_conn,
+                                   bool use_conn_buckets)
+{
+  tor_assert(safe_or_conn != NULL);
+
+  uint32_t coarse_time = monotime_coarse_get_stamp();
+  safe_or_connection_refill_buckets(safe_or_conn, coarse_time);
+
+  size_t max_bytes_to_write = \
+    safe_or_connection_max_bytes_can_write(safe_or_conn, use_conn_buckets);
+
+  log_warn(LD_NET, "max_bytes_to_write: %ld", max_bytes_to_write);
+  // TODO: remove this ^^
+
+  if (max_bytes_to_write == 0) {
+    log_debug(LD_NET, "Write callback running, but not supposed to write bytes.");
+    return E_SUCCESS;
+  }
+
+  size_t buf_initial_size = buf_datalen(TO_SAFE_CONN(safe_or_conn)->outbuf);
+  size_t bytes_written = 0;
+  max_bytes_to_write = MIN(max_bytes_to_write, buf_initial_size);
+  int tls_rv = safe_or_connection_write_tls(safe_or_conn,
+                                            max_bytes_to_write,
+                                            &bytes_written);
+  switch (tls_rv) {
+  case TOR_TLS_CLOSE:
+  case TOR_TLS_ERROR_IO:
+    log_debug(LD_NET, "TLS connection closed %son write. Closing.",
+              tls_rv == TOR_TLS_CLOSE ? "cleanly " : "");
+    return E_ERROR;
+  CASE_TOR_TLS_ERROR_ANY_NONIO:
+    log_debug(LD_NET, "TLS error [%s]. Breaking.",
+              tor_tls_err_to_string(tls_rv));
+    return E_ERROR;
+  case TOR_TLS_WANTWRITE:
+    // we need to wait for the socket to become writable
+    // again, then do another write
+    log_warn(LD_NET, "Wants write");
+    // TODO: remove this ^^
+    break;
+  case TOR_TLS_WANTREAD:
+    // we need to wait for the socket to become readable
+    // before we can do another write
+    log_warn(LD_NET, "Wants read");
+    // TODO: remove this ^^
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    break;
+  default:
+    break;
+  }
+
+  if (PREDICT_LIKELY(bytes_written < SIZE_MAX)) {
+    size_t buf_len_diff = buf_initial_size-buf_datalen(TO_SAFE_CONN(safe_or_conn)->outbuf);
+    if (bytes_written != buf_len_diff) {
+      log_warn(LD_OR, "Doesn't match! bytes_written: %ld, buf_len_diff: %ld",
+               bytes_written, buf_len_diff);
+      tor_assert_nonfatal_unreached_once();
+    }
+  } else {
+    log_warn(LD_NET, "We wrote an unexpectedly large number of bytes: %zu "
+                     ">= SIZE_MAX",
+             bytes_written);
+  }
+
+  // fixes a throughput problem in old versions of Windows
+  // TODO: we should still include this, but needs to be moved here since it's
+  //       currently static
+  //update_send_buffer_size(TO_SAFE_CONN(safe_or_conn)->socket);
+
+  if (buf_datalen(TO_SAFE_CONN(safe_or_conn)->outbuf) == 0) {
+    // we have no more data to write
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+  }
+
+  size_t tls_bytes_read = 0;
+  size_t tls_bytes_written = 0;
+  tor_tls_get_n_raw_bytes(safe_or_conn->tls, &tls_bytes_read,
+                          &tls_bytes_written);
+  log_warn(LD_NET, "After TLS write of %ld: %ld read, %ld written",
+            (long)bytes_written, (long)tls_bytes_read, (long)tls_bytes_written);
+  // TODO: change this back to debug ^^
+
+  safe_or_connection_decrement_buckets(safe_or_conn, tls_bytes_read,
+                                       tls_bytes_written, use_conn_buckets);
+
+  // TODO: if get_options()->TestingEnableConnBwEvent, increase conn stats?
+
+  return E_SUCCESS;
+}
+
+static tor_error_t
+safe_or_connection_tls_handshake(safe_or_connection_t *safe_or_conn)
+{
+  tor_assert(safe_or_conn != NULL);
+  check_no_tls_errors();
+
+  int result = tor_tls_handshake(safe_or_conn->tls);
+  
+  switch (result) {
+  CASE_TOR_TLS_ERROR_ANY:
+    log_info(LD_OR, "TLS error [%s]",
+             tor_tls_err_to_string(result));
+    return E_ERROR;
+  case TOR_TLS_CLOSE:
+    log_info(LD_OR, "TLS closed");
+    return E_ERROR;
+  case TOR_TLS_WANTWRITE:
+    // we need to wait for the socket to become writable
+    // before we can continue the handshake
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    return E_SUCCESS;
+  case TOR_TLS_WANTREAD:
+    // we need to wait for the socket to become readable
+    // before we can continue the handshake
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, true,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    return E_SUCCESS;
+  case TOR_TLS_DONE:
+    // the TLS handshake has finished, but not the entire link handshake
+    if (tor_tls_is_server(safe_or_conn->tls)) {
+      // we didn't start the handshake, so prepare for a v3 handshake
+      log_debug(LD_OR, "Done with initial SSL handshake (receiver-side)");
+    } else {
+      // we need to start the v3 handshake
+      log_debug(LD_OR, "Done with initial SSL handshake (initiator-side)");
+      //if (connection_or_launch_v3_or_handshake(conn) < 0) {
+      //  return E_ERROR;
+      //}
+    }
+    return safe_or_connection_update_state(safe_or_conn,
+             SAFE_OR_CONN_STATE_LINK_HANDSHAKING);
+  default:
+    log_warn(LD_OR, "Unexpected return value from handshake");
+    return E_ERROR;
+  }
+}
+
+/*
+static int
+safe_or_connection_tls_finish_v1_handshake(safe_or_connection_t *safe_or_conn)
+{
+  tor_assert(safe_or_conn != NULL);
+  tor_assert(tor_tls_used_v1_handshake(safe_or_conn->tls));
+  tor_assert(tor_tls_is_server(safe_or_conn->tls));
+  tor_assert(!safe_or_conn->is_outgoing);
+  // we should not be making v1 handshakes, but we may receive v1 handshakes
+
+  log_debug(LD_HANDSHAKE, "%s tls v1 handshake on %p with %s done, using "
+                          "ciphersuite %s. verifying.",
+            safe_or_conn->is_outgoing?"Outgoing":"Incoming",
+            safe_or_conn,
+            safe_or_conn->remote_address_str,
+            tor_tls_get_ciphersuite_name(safe_or_conn->tls));
+
+  //tor_tls_block_renegotiation(safe_or_conn->tls);
+
+  char digest_rcvd[DIGEST_LEN] = {0};
+  // TODO fix below
+  if (connection_or_check_valid_tls_handshake(conn, started_here,
+                                              digest_rcvd) < 0) {
+    return -1;
+  }
+
+  // TODO in main thread
+  //circuit_build_times_network_is_live(get_circuit_build_times_mutable());
+  //conn->link_proto = 1;
+  //connection_or_init_conn_from_address(conn, &conn->base_.addr,
+  //                                     conn->base_.port, digest_rcvd,
+  //                                     NULL, 0);
+  //rep_hist_note_negotiated_link_proto(1, started_here);
+  //return connection_or_set_state_open(conn);
+
+  return 0;
+}
+*/
+
+static void
+safe_or_connection_read_cb(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  safe_or_connection_t *safe_or_conn = TO_SAFE_OR_CONN(safe_conn);
+
+  log_warn(LD_OR, "OR Connection Read  (state=%d, obj=%p, %s)!!!!",
+           safe_or_conn->state, safe_or_conn,
+           safe_or_conn->is_outgoing?"outgoing":"incoming");
+  // TODO: remove this ^^
+
+  //if (safe_or_conn->tls_write_waiting_on_socket_readable) {
+  //  // since the socket is now readable, we can re-enable TLS write again
+  //  safe_or_conn->tls_write_waiting_on_socket_readable = false;
+  //  safe_connection_set_write_state(TO_SAFE_CONN(safe_or_conn), true);
+  //}
+
+  switch (safe_or_conn->state) {
+  case SAFE_OR_CONN_STATE_UNINITIALIZED:
+    tor_assert_unreached();
+    break;
+  case SAFE_OR_CONN_STATE_TCP_CONNECTING:
+    // we shouldn't get here, so make sure we're not wanting to read
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    log_warn(LD_OR, "Connecting OR conection wants to read");
+    break;
+  case SAFE_OR_CONN_STATE_PROXY_HANDSHAKING:
+    log_warn(LD_OR, "Relay connection proxy handshaking state has not yet "
+                    "been implemented");
+    tor_assert(0);
+    // we are performing the proxy handshake
+    //tor_error_t rv = safe_or_connection_plaintext(safe_or_conn);
+    //if (rv != E_SUCCESS) {
+    //  tor_assert(safe_or_connection_update_state(safe_or_conn,
+    //    SAFE_OR_CONN_STATE_CLOSED) == E_SUCCESS);
+    //}
+    break;
+  case SAFE_OR_CONN_STATE_TLS_HANDSHAKING:
+  {
+    // we are performing the initial TLS handshake
+    tor_error_t rv = safe_or_connection_tls_handshake(safe_or_conn);
+    if (rv != E_SUCCESS) {
+      tor_assert(safe_or_connection_update_state(safe_or_conn,
+        SAFE_OR_CONN_STATE_CLOSED) == E_SUCCESS);
+    }
+    break;
+  }
+  case SAFE_OR_CONN_STATE_LINK_HANDSHAKING:
+  case SAFE_OR_CONN_STATE_OPEN:
+  {
+    // performing the link handshake, or the handshake has already
+    // completed and we're sending/receiving cells
+    if (socket_rw_state_get(&safe_or_conn->tls_read_wanted)) {
+      // since the socket is now readable, we can re-enable writing again
+      socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                          TO_SAFE_CONN(safe_or_conn));
+      socket_rw_state_set(&safe_or_conn->tor_write_wanted, true,
+                          TO_SAFE_CONN(safe_or_conn));
+    }
+    // TODO: we may not actually want to read here now that the states are
+    // updated, should we re-check?
+
+    bool use_conn_buckets = (safe_or_conn->state == SAFE_OR_CONN_STATE_OPEN);
+
+    tor_error_t rv = safe_or_connection_read_encrypted(safe_or_conn,
+                                                       use_conn_buckets);
+    if (rv != E_SUCCESS) {
+      tor_assert(safe_or_connection_update_state(safe_or_conn,
+        SAFE_OR_CONN_STATE_CLOSED) == E_SUCCESS);
+    }
+    break;
+  }
+  case SAFE_OR_CONN_STATE_CLOSED:
+  case SAFE_OR_CONN_STATE_NO_SOCKET:
+    // we shouldn't get here, so make sure we're not wanting to read
+    socket_rw_state_set(&safe_or_conn->tls_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_read_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    log_warn(LD_OR, "Closed OR conection wants to read");
+    break;
+  default:
+    log_warn(LD_OR, "Unexpected safe OR connection state");
+    tor_assert(0);
+    break;
+  }
+}
+
+static void
+safe_or_connection_write_cb(safe_connection_t *safe_conn)
+{
+  tor_assert(safe_conn != NULL);
+  safe_or_connection_t *safe_or_conn = TO_SAFE_OR_CONN(safe_conn);
+
+  log_warn(LD_OR, "OR Connection Write (state=%d, obj=%p, %s)!!!!",
+           safe_or_conn->state, safe_or_conn,
+           safe_or_conn->is_outgoing?"outgoing":"incoming");
+  // TODO: remove this ^^
+
+  switch (safe_or_conn->state) {
+  case SAFE_OR_CONN_STATE_UNINITIALIZED:
+    tor_assert_unreached();
+    break;
+  case SAFE_OR_CONN_STATE_TCP_CONNECTING:
+  {
+    // the socket was connecting and is now ready to write, so we
+    // should check for errors before using the socket
+    tor_error_t rv = safe_or_connection_check_tcp_connection(safe_or_conn);
+    if (rv != E_SUCCESS) {
+      tor_assert(safe_or_connection_update_state(safe_or_conn,
+        SAFE_OR_CONN_STATE_CLOSED));
+    }
+    break;
+  }
+  case SAFE_OR_CONN_STATE_PROXY_HANDSHAKING:
+    log_warn(LD_OR, "Relay connection proxy handshaking state has not yet "
+                    "been implemented");
+    tor_assert(0);
+    // we are performing the proxy handshake
+    break;
+  case SAFE_OR_CONN_STATE_TLS_HANDSHAKING:
+  {
+    // we are performing the initial TLS handshake
+    tor_error_t rv = safe_or_connection_tls_handshake(safe_or_conn);
+    if (rv != E_SUCCESS) {
+      tor_assert(safe_or_connection_update_state(safe_or_conn,
+        SAFE_OR_CONN_STATE_CLOSED));
+    }
+    break;
+  }
+  case SAFE_OR_CONN_STATE_LINK_HANDSHAKING:
+  case SAFE_OR_CONN_STATE_OPEN:
+  {
+    // performing the link handshake, or the handshake has already
+    // completed and we're sending/receiving cells
+    if (socket_rw_state_get(&safe_or_conn->tls_write_wanted)) {
+      // since the socket is now writable, we can re-enable reading again
+      socket_rw_state_set(&safe_or_conn->tls_write_wanted, false,
+                          TO_SAFE_CONN(safe_or_conn));
+      socket_rw_state_set(&safe_or_conn->tor_read_wanted, true,
+                          TO_SAFE_CONN(safe_or_conn));
+    }
+    // TODO: we may not actually want to write here now that the states are
+    // updated, should we re-check?
+
+    bool use_conn_buckets = (safe_or_conn->state == SAFE_OR_CONN_STATE_OPEN);
+
+    tor_error_t rv = safe_or_connection_write_encrypted(safe_or_conn,
+                                                        use_conn_buckets);
+    if (rv != E_SUCCESS) {
+      tor_assert(safe_or_connection_update_state(safe_or_conn,
+        SAFE_OR_CONN_STATE_CLOSED));
+    }
+    break;
+  }
+  case SAFE_OR_CONN_STATE_CLOSED:
+  case SAFE_OR_CONN_STATE_NO_SOCKET:
+    // we shouldn't get here, so make sure we're not wanting to write
+    socket_rw_state_set(&safe_or_conn->tls_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    socket_rw_state_set(&safe_or_conn->tor_write_wanted, false,
+                        TO_SAFE_CONN(safe_or_conn));
+    log_warn(LD_OR, "Closed OR conection wants to write");
+    break;
+  default:
+    log_warn(LD_OR, "Unexpected safe OR connection state");
+    tor_assert(0);
+    break;
+  }
+}

+ 177 - 0
src/core/or/safe_connection.h

@@ -0,0 +1,177 @@
+/* Copyright (c) 2013-2019, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef OR_SAFE_CONN_H
+#define OR_SAFE_CONN_H
+
+#include "core/or/relay.h"
+#include "lib/evloop/compat_libevent.h"
+#include "lib/evloop/events.h"
+#include "lib/evloop/token_bucket.h"
+#include "lib/lock/compat_mutex.h"
+#include "lib/tls/x509.h"
+
+extern event_label_t safe_or_conn_tcp_connecting_ev;
+extern event_label_t safe_or_conn_tls_handshaking_ev;
+extern event_label_t safe_or_conn_link_handshaking_ev;
+extern event_label_t safe_or_conn_open_ev;
+extern event_label_t safe_or_conn_closed_ev;
+extern event_label_t safe_or_conn_has_buffered_data_ev;
+
+typedef struct link_handshaking_ev_data_t {
+  tor_x509_cert_t *tls_own_cert; // the ownership is passed in this event
+  tor_x509_cert_t *tls_peer_cert; // the ownership is passed in this event
+} link_handshaking_ev_data;
+
+void link_handshaking_ev_free(void *ptr);
+
+//#define SAFE_BASE_CONN_MAGIC 0x64DB4EE2u
+#define SAFE_OR_CONN_MAGIC 0x1221ABBAu
+
+typedef enum tor_error_t {
+  E_SUCCESS = 0,
+  E_ERROR = 1,
+} tor_error_t;
+
+typedef enum or_conn_state_t {
+  SAFE_OR_CONN_STATE_UNINITIALIZED,
+  SAFE_OR_CONN_STATE_NO_SOCKET,
+  SAFE_OR_CONN_STATE_TCP_CONNECTING,
+  SAFE_OR_CONN_STATE_PROXY_HANDSHAKING,
+  SAFE_OR_CONN_STATE_TLS_HANDSHAKING,
+  SAFE_OR_CONN_STATE_LINK_HANDSHAKING,
+  SAFE_OR_CONN_STATE_OPEN,
+  SAFE_OR_CONN_STATE_CLOSED,
+} or_conn_state_t;
+
+typedef struct socket_rw_state_t {
+  bool state;
+} socket_rw_state_t;
+
+typedef struct safe_connection_t {
+  uint32_t magic;
+  tor_mutex_t lock;
+
+  bool linked;
+  tor_socket_t socket;
+
+  struct event *read_event;
+  struct event *write_event;
+  socket_rw_state_t read_allowed;
+  socket_rw_state_t write_allowed;
+
+  bool (*is_read_wanted)(struct safe_connection_t *);
+  bool (*is_write_wanted)(struct safe_connection_t *);
+  void (*read_cb)(struct safe_connection_t *);
+  void (*write_cb)(struct safe_connection_t *);
+  void (*socket_added_cb)(struct safe_connection_t *);
+  void (*inbuf_modified_cb)(struct safe_connection_t *);
+  void (*outbuf_modified_cb)(struct safe_connection_t *);
+
+  struct buf_t *inbuf;
+  struct buf_t *outbuf;
+
+  event_source_t *event_source;
+} safe_connection_t;
+
+typedef struct safe_or_connection_t {
+  safe_connection_t base_;
+  token_bucket_rw_t bucket;
+  struct tor_tls_t *tls;
+  or_conn_state_t state;
+  bool is_outgoing;
+  char *remote_address_str;
+
+  socket_rw_state_t tor_read_wanted;
+  socket_rw_state_t tor_write_wanted;
+  socket_rw_state_t tls_read_wanted;
+  socket_rw_state_t tls_write_wanted;
+  socket_rw_state_t bucket_read_allowed;
+  socket_rw_state_t bucket_write_allowed;
+
+  //bool tls_read_waiting_on_socket_writable;
+  //bool tls_write_waiting_on_socket_readable;
+} safe_or_connection_t;
+
+safe_or_connection_t *TO_SAFE_OR_CONN(safe_connection_t *safe_conn);
+
+#define TO_SAFE_CONN(c) (&(((c)->base_)))
+
+void safe_or_conn_register_events(event_registry_t *registry);
+
+/********************************************************/
+
+void
+safe_connection_init(safe_connection_t *safe_conn, uint32_t type_magic,
+                     bool (*is_read_wanted)(safe_connection_t *),
+                     bool (*is_write_wanted)(safe_connection_t *),
+                     void (*read_cb)(safe_connection_t *),
+                     void (*write_cb)(safe_connection_t *),
+                     void (*socket_added_cb)(safe_connection_t *),
+                     void (*inbuf_modified_cb)(safe_connection_t *),
+                     void (*outbuf_modified_cb)(safe_connection_t *),
+                     bool requires_buffers, bool linked);
+
+void
+safe_connection_set_socket(safe_connection_t *safe_conn, tor_socket_t socket);
+
+void
+safe_connection_subscribe(safe_connection_t *safe_conn,
+                          event_listener_t *listener, event_label_t label,
+                          bool send_full_event);
+
+void
+safe_connection_unsubscribe_all(safe_connection_t *safe_conn,
+                                event_listener_t *listener);
+
+void
+safe_connection_unregister_events(safe_connection_t *safe_conn);
+
+tor_error_t
+safe_connection_register_events(safe_connection_t *safe_conn,
+                                struct event_base *event_base);
+
+void
+safe_connection_set_read_permission(safe_connection_t *safe_conn,
+                                    bool read_allowed);
+
+void
+safe_connection_set_write_permission(safe_connection_t *safe_conn,
+                                     bool write_allowed);
+
+void
+safe_connection_inbuf_modified(safe_connection_t *safe_conn);
+
+void
+safe_connection_outbuf_modified(safe_connection_t *safe_conn);
+
+/********************************************************/
+
+safe_or_connection_t *
+safe_or_connection_new(bool requires_buffers, bool is_outgoing,
+                       const char *remote_address_str);
+
+void
+safe_or_connection_get_tls_desc(safe_or_connection_t *safe_or_conn,
+                                char *buf, size_t buf_size);
+
+int
+safe_or_connection_tls_secrets(safe_or_connection_t *safe_or_conn,
+                               uint8_t *secrets_out);
+
+int
+safe_or_connection_key_material(safe_or_connection_t *safe_or_conn,
+                                uint8_t *secrets_out,
+                                const uint8_t *context,
+                                size_t context_len, const char *label);
+
+void
+safe_or_connection_refill_buckets(safe_or_connection_t *safe_or_conn,
+                                  uint32_t now_ts);
+
+void
+safe_or_connection_adjust_buckets(safe_or_connection_t *safe_or_conn,
+                                  uint32_t rate, uint32_t burst,
+                                  bool reset, uint32_t now_ts);
+
+#endif

+ 13 - 8
src/feature/nodelist/torcert.c

@@ -505,7 +505,8 @@ or_handshake_certs_free_(or_handshake_certs_t *certs)
 int
 or_handshake_certs_rsa_ok(int severity,
                           or_handshake_certs_t *certs,
-                          tor_tls_t *tls,
+                          //tor_tls_t *tls,
+                          tor_x509_cert_t *peer_cert,
                           time_t now)
 {
   tor_x509_cert_t *link_cert = certs->link_cert;
@@ -515,7 +516,8 @@ or_handshake_certs_rsa_ok(int severity,
   if (certs->started_here) {
     if (! (id_cert && link_cert))
       ERR("The certs we wanted (ID, Link) were missing");
-    if (! tor_tls_cert_matches_key(tls, link_cert))
+    //if (! tor_tls_cert_matches_key(tls, link_cert))
+    if (! tor_tls_cert_matches_key(peer_cert, link_cert))
       ERR("The link certificate didn't match the TLS public key");
     if (! tor_tls_cert_is_valid(severity, link_cert, id_cert, now, 0))
       ERR("The link certificate was not valid");
@@ -540,7 +542,8 @@ or_handshake_certs_rsa_ok(int severity,
 int
 or_handshake_certs_ed25519_ok(int severity,
                               or_handshake_certs_t *certs,
-                              tor_tls_t *tls,
+                              //tor_tls_t *tls,
+                              tor_x509_cert_t *peer_cert,
                               time_t now)
 {
   ed25519_checkable_t check[10];
@@ -565,7 +568,8 @@ or_handshake_certs_ed25519_ok(int severity,
       ERR("No Ed25519 link key");
     {
       /* check for a match with the TLS cert. */
-      tor_x509_cert_t *peer_cert = tor_tls_get_peer_cert(tls);
+      //tor_x509_cert_t *peer_cert = tor_tls_get_peer_cert(tls);
+      
       if (BUG(!peer_cert)) {
         /* This is a bug, because if we got to this point, we are a connection
          * that was initiated here, and we completed a TLS handshake. The
@@ -577,7 +581,7 @@ or_handshake_certs_ed25519_ok(int severity,
       int okay = tor_memeq(peer_cert_digests->d[DIGEST_SHA256],
                            certs->ed_sign_link->signed_key.pubkey,
                            DIGEST256_LEN);
-      tor_x509_cert_free(peer_cert);
+      //tor_x509_cert_free(peer_cert);
       if (!okay)
         ERR("Link certificate does not match TLS certificate");
     }
@@ -684,7 +688,8 @@ check_tap_onion_key_crosscert,(const uint8_t *crosscert,
 void
 or_handshake_certs_check_both(int severity,
                               or_handshake_certs_t *certs,
-                              tor_tls_t *tls,
+                              //tor_tls_t *tls,
+                              tor_x509_cert_t *peer_cert,
                               time_t now,
                               const ed25519_public_key_t **ed_id_out,
                               const common_digests_t **rsa_id_out)
@@ -696,7 +701,7 @@ or_handshake_certs_check_both(int severity,
   *rsa_id_out = NULL;
 
   if (certs->ed_id_sign) {
-    if (or_handshake_certs_ed25519_ok(severity, certs, tls, now)) {
+    if (or_handshake_certs_ed25519_ok(severity, certs, peer_cert, now)) {
       tor_assert(certs->ed_id_sign);
       tor_assert(certs->id_cert);
 
@@ -714,7 +719,7 @@ or_handshake_certs_check_both(int severity,
      * certificates, we expect to verify them! */
   } else {
     /* No ed25519 keys given in the CERTS cell */
-    if (or_handshake_certs_rsa_ok(severity, certs, tls, now)) {
+    if (or_handshake_certs_rsa_ok(severity, certs, peer_cert, now)) {
       *rsa_id_out = tor_x509_cert_get_id_digests(certs->id_cert);
     }
   }

+ 7 - 3
src/feature/nodelist/torcert.h

@@ -5,6 +5,7 @@
 #define TORCERT_H_INCLUDED
 
 #include "lib/crypt_ops/crypto_ed25519.h"
+#include "lib/tls/x509.h"
 
 #define SIGNED_KEY_TYPE_ED25519     0x01
 
@@ -92,15 +93,18 @@ void or_handshake_certs_free_(or_handshake_certs_t *certs);
   FREE_AND_NULL(or_handshake_certs_t, or_handshake_certs_free_, (certs))
 int or_handshake_certs_rsa_ok(int severity,
                               or_handshake_certs_t *certs,
-                              struct tor_tls_t *tls,
+                              //struct tor_tls_t *tls,
+                              tor_x509_cert_t *peer_cert,
                               time_t now);
 int or_handshake_certs_ed25519_ok(int severity,
                                   or_handshake_certs_t *certs,
-                                  struct tor_tls_t *tls,
+                                  //struct tor_tls_t *tls,
+                                  tor_x509_cert_t *peer_cert,
                                   time_t now);
 void or_handshake_certs_check_both(int severity,
                               or_handshake_certs_t *certs,
-                              struct tor_tls_t *tls,
+                              //struct tor_tls_t *tls,
+                              tor_x509_cert_t *peer_cert,
                               time_t now,
                               const ed25519_public_key_t **ed_id_out,
                               const common_digests_t **rsa_id_out);

+ 1 - 1
src/feature/relay/ext_orport.c

@@ -100,7 +100,7 @@ connection_ext_or_transition(or_connection_t *conn)
   channel_t *chan = channel_tls_handle_incoming(conn);
   channel_listener_queue_incoming(channel_tls_get_listener(), chan);
 
-  connection_tls_start_handshake(conn, 1);
+  //connection_tls_start_handshake(conn, 1);
 }
 
 /** Length of authentication cookie. */

+ 0 - 1
src/lib/evloop/compat_libevent.c

@@ -15,7 +15,6 @@
 #include "lib/log/util_bug.h"
 #include "lib/string/compat_string.h"
 
-#include <event2/event.h>
 #include <event2/thread.h>
 #include <string.h>
 

+ 1 - 0
src/lib/evloop/compat_libevent.h

@@ -14,6 +14,7 @@
 #include "lib/malloc/malloc.h"
 #include "lib/thread/threads.h"
 
+#include <event2/event.h>
 #include <stdbool.h>
 
 void configure_libevent_logging(void);

+ 591 - 0
src/lib/evloop/events.c

@@ -0,0 +1,591 @@
+/* Copyright (c) 2013-2019, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "lib/evloop/events.h"
+
+#include "lib/log/util_bug.h"
+
+#include <event2/util.h>
+#include <event2/event.h>
+#include <string.h>
+
+/* How a subscribed listener wants to receive an event. */
+typedef struct event_subscription_t {
+  event_listener_t *listener;
+  bool send_full_event;
+} event_subscription_t;
+
+/* What a listener should do if it receives an event. */
+typedef struct event_callback_t {
+  bool edge_triggered;
+  bool is_edge_pending;
+  void (*process_event_fn)(event_label_t, event_data_t, void *);
+} event_callback_t;
+
+/**************************/
+
+static event_subscription_t *
+event_subscription_new(event_listener_t *listener, bool send_full_event)
+{
+  tor_assert(listener != NULL);
+  event_subscription_t *sub = tor_malloc_zero(sizeof(event_subscription_t));
+  sub->listener = listener;
+  sub->send_full_event = send_full_event;
+  return sub;
+}
+
+static void
+event_subscription_free(event_subscription_t *sub)
+{
+  tor_assert(sub != NULL);
+  memset(sub, 0x00, sizeof(*sub));
+  tor_free(sub);
+}
+
+/**************************/
+
+static event_callback_t *
+event_callback_new(bool edge_triggered,
+                   void (*process_event_fn)(event_label_t, event_data_t, void *))
+{
+  tor_assert(process_event_fn != NULL);
+  event_callback_t *cb = tor_malloc_zero(sizeof(event_callback_t));
+  cb->edge_triggered = edge_triggered;
+  cb->is_edge_pending = false;
+  cb->process_event_fn = process_event_fn;
+  return cb;
+}
+
+static void
+event_callback_free(event_callback_t *cb)
+{
+  tor_assert(cb != NULL);
+  memset(cb, 0x00, sizeof(*cb));
+  tor_free(cb);
+}
+
+/**************************/
+
+static event_wrapper_t *
+event_wrapper_new(event_label_t label,
+                  event_data_t data,
+                  void (*free_data_fn)(void *))
+{
+  event_wrapper_t *wrapper = tor_malloc_zero(sizeof(event_wrapper_t));
+  wrapper->label = label;
+  wrapper->data = data;
+  wrapper->free_data_fn = free_data_fn;
+  return wrapper;
+}
+
+static void
+event_wrapper_free(event_wrapper_t *wrapper)
+{
+  tor_assert(wrapper != NULL);
+  if (wrapper->free_data_fn != NULL) {
+    wrapper->free_data_fn(wrapper->data.ptr);
+  }
+  memset(wrapper, 0x00, sizeof(*wrapper));
+  tor_free(wrapper);
+}
+
+/**************************/
+
+event_registry_t *
+event_registry_new(void)
+{
+  event_registry_t* registry = tor_malloc_zero(sizeof(event_registry_t));
+
+  tor_mutex_init(&registry->lock);
+  registry->events = smartlist_new();
+
+  return registry;
+}
+
+void
+event_registry_free(event_registry_t *registry)
+{
+  tor_assert(registry != NULL);
+
+  tor_mutex_uninit(&registry->lock);
+
+  SMARTLIST_FOREACH_BEGIN(registry->events, char *, help_label) {
+    if (help_label != NULL) {
+      tor_free(help_label);
+    }
+  } SMARTLIST_FOREACH_END(help_label);
+  smartlist_free(registry->events);
+
+  memset(registry, 0x00, sizeof(*registry));
+  tor_free(registry);
+}
+
+event_label_t
+event_registry_register_event(event_registry_t *registry,
+                              const char *help_label)
+{
+  tor_assert(registry != NULL);
+  tor_mutex_acquire(&registry->lock);
+
+  int num_events = smartlist_len(registry->events);
+  if (help_label) {
+    smartlist_add_strdup(registry->events, help_label);
+  } else {
+    smartlist_add(registry->events, NULL);
+  }
+
+  tor_mutex_release(&registry->lock);
+  return (event_label_t)num_events;
+}
+
+const char *
+event_registry_get_help_label(event_registry_t *registry,
+                              event_label_t event_label)
+{
+  tor_assert(registry != NULL);
+  tor_mutex_acquire(&registry->lock);
+
+  int label_index = (int)event_label;
+  tor_assert(label_index >= 0);
+  const char *help_label = smartlist_get(registry->events,
+                                         label_index);
+
+  tor_mutex_release(&registry->lock);
+  return help_label;
+}
+
+/**************************/
+
+static void
+event_listener_eventloop_cb(evutil_socket_t sock, short events, void *arg)
+{
+  event_listener_t *listener = arg;
+  (void) sock;
+  (void) events;
+  event_listener_process(listener);
+}
+
+event_listener_t *
+event_listener_new(void *context)
+{
+  event_listener_t* listener = tor_malloc_zero(sizeof(event_listener_t));
+
+  tor_mutex_init(&listener->lock);
+  listener->is_pending = false;
+  listener->callbacks = smartlist_new();
+  TOR_TAILQ_INIT(&listener->pending_events);
+  listener->context = context;
+  listener->eventloop_ev = NULL;
+
+  return listener;
+}
+
+void
+event_listener_free(event_listener_t *listener)
+{
+  tor_assert(listener != NULL);
+
+  tor_mutex_acquire(&listener->lock);
+
+  if (listener->eventloop_ev != NULL) {
+    event_listener_detach(listener);
+    // this will make sure the libevent callback has stopped
+  }
+
+  while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
+    event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
+    TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
+    event_wrapper_free(wrapper);
+  }
+
+  SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
+    if (cb != NULL) {
+      event_callback_free(cb);
+    }
+  } SMARTLIST_FOREACH_END(cb);
+  smartlist_free(listener->callbacks);
+
+  listener->context = NULL;
+  listener->is_pending = false;
+
+  tor_mutex_release(&listener->lock);
+  tor_mutex_uninit(&listener->lock);
+
+  memset(listener, 0x00, sizeof(*listener));
+  tor_free(listener);
+}
+
+void
+event_listener_attach(event_listener_t *listener, struct event_base *base)
+{
+  tor_assert(listener != NULL);
+  tor_assert(base != NULL);
+
+  tor_mutex_acquire(&listener->lock);
+
+  tor_assert(listener->eventloop_ev == NULL);
+  listener->eventloop_ev = tor_event_new(base, -1,
+                                         EV_READ|EV_PERSIST, // TODO: do we need persist?
+                                         event_listener_eventloop_cb,
+                                         listener);
+
+  if (listener->is_pending) {
+    event_active(listener->eventloop_ev, EV_READ, 1);
+  }
+
+  tor_mutex_release(&listener->lock);
+}
+
+void
+event_listener_detach(event_listener_t *listener)
+{
+  tor_assert(listener != NULL);
+
+  tor_mutex_acquire(&listener->lock);
+
+  if (listener->eventloop_ev != NULL) {
+    tor_event_free(listener->eventloop_ev);
+    listener->eventloop_ev = NULL;
+  }
+
+  tor_mutex_release(&listener->lock);
+}
+
+void
+event_listener_set_callback(event_listener_t *listener, event_label_t label,
+                            bool edge_triggered,
+                            void (*process_event_fn)(event_label_t,
+                                                     event_data_t,
+                                                     void *))
+{
+  tor_assert(listener != NULL);
+  tor_assert(label != EVENT_LABEL_UNSET);
+  tor_assert(process_event_fn != NULL);
+  
+  int index = (int)label;
+  tor_assert(index >= 0);
+
+  event_callback_t *cb = event_callback_new(edge_triggered, process_event_fn);
+
+  if (index >= 1000) {
+    log_warn(LD_BUG, "An event label was very large (%d), but the event "
+                     "listener assumes that event labels are small.", index);
+    /* We're using a smartlist as a lookup table, and assume that the labels are
+       small and therefore the list should not be sparse. If the label is large,
+       then we either have *many* events, or we're choosing our event labels
+       inefficiently. */
+  }
+
+  tor_mutex_acquire(&listener->lock);
+
+  smartlist_grow(listener->callbacks, index+1);
+
+  event_callback_t *existing_cb = smartlist_get(listener->callbacks, index);
+  if (existing_cb != NULL) {
+    // we only support one callback per event type
+    event_callback_free(existing_cb);
+    log_warn(LD_BUG, "We are overriding a previous callback.");
+  }
+  smartlist_set(listener->callbacks, index, cb);
+
+  tor_mutex_release(&listener->lock); 
+}
+
+static void
+event_listener_receive(event_listener_t *listener, event_label_t label,
+                       event_wrapper_t *wrapper)
+{
+  tor_assert(listener != NULL);
+  tor_assert(label != EVENT_LABEL_UNSET);
+
+  int index = (int)label;
+  tor_assert(index >= 0);
+
+  tor_mutex_acquire(&listener->lock);
+
+  if (index >= smartlist_len(listener->callbacks)) {
+    log_warn(LD_BUG, "We don't have a callback for this event");
+    if (wrapper != NULL) {
+      event_wrapper_free(wrapper);
+    }
+    tor_mutex_release(&listener->lock);
+    return;
+  }
+
+  event_callback_t *cb = smartlist_get(listener->callbacks, index);
+  if (cb == NULL) {
+    log_warn(LD_BUG, "We don't have a callback for this event");
+    if (wrapper != NULL) {
+      event_wrapper_free(wrapper);
+    }
+    tor_mutex_release(&listener->lock);
+    return;
+  }
+
+  if (cb->edge_triggered) {
+    cb->is_edge_pending = true;
+    if (wrapper != NULL) {
+      log_warn(LD_BUG, "An edge-triggered event received a full event");
+      event_wrapper_free(wrapper);
+    }
+  } else {
+    tor_assert(wrapper != NULL);
+    TOR_TAILQ_INSERT_TAIL(&listener->pending_events, wrapper, next_event);
+  }
+
+  if (!listener->is_pending) {
+    listener->is_pending = true;
+    if (listener->eventloop_ev != NULL) {
+      event_active(listener->eventloop_ev, EV_READ, 1);
+    }
+  }
+
+  tor_mutex_release(&listener->lock);
+}
+
+void
+event_listener_process(event_listener_t *listener)
+{
+  tor_assert(listener != NULL);
+
+  tor_mutex_acquire(&listener->lock);
+
+  void *context = listener->context;
+  bool more_events = true;
+
+  while (more_events) {
+    // first process edge-triggered events
+    event_data_t null_data = { .ptr = NULL };
+    SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
+      if (cb != NULL && cb->is_edge_pending) {
+        void (*process_event_fn)(event_label_t, event_data_t, void *) = NULL;
+        process_event_fn = cb->process_event_fn;
+        event_label_t label = (int)cb_sl_idx;
+        cb->is_edge_pending = false;
+
+        tor_mutex_release(&listener->lock);
+
+        if (PREDICT_LIKELY(process_event_fn != NULL)) {
+          process_event_fn(label, null_data, context);
+          // edge-triggered events don't have corresponding event data
+        } else {
+          // no callback available
+          log_warn(LD_BUG, "An edge event was received but had no callback");
+        }
+
+        tor_mutex_acquire(&listener->lock);
+        // while we were unlocked, the list may have changed length,
+        // so we make sure it's correct
+        cb_sl_len = smartlist_len(listener->callbacks);
+      }
+    } SMARTLIST_FOREACH_END(cb);
+
+    // then process regular events
+    while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
+      event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
+      TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
+      tor_assert(wrapper != NULL);
+
+      void (*process_event_fn)(event_label_t, event_data_t, void *) = NULL;
+      int index = (int)wrapper->label;
+
+      // do we have a callback for this event label?
+      if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
+        event_callback_t *cb = smartlist_get(listener->callbacks, index);
+        if (cb != NULL) {
+          tor_assert(!cb->edge_triggered);
+          process_event_fn = cb->process_event_fn;
+        }
+      }
+
+      tor_mutex_release(&listener->lock);
+
+      if (PREDICT_LIKELY(process_event_fn != NULL)) {
+        process_event_fn(wrapper->label, wrapper->data, context);
+      } else {
+        // no callback available
+        log_warn(LD_BUG, "An event was received but had no callback");
+      }
+
+      event_wrapper_free(wrapper);
+      tor_mutex_acquire(&listener->lock);
+    }
+
+    // there's a possibility edge events have been added while running callbacks
+    more_events = false;
+    SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
+      if (cb != NULL && cb->is_edge_pending) {
+        more_events = true;
+      }
+    } SMARTLIST_FOREACH_END(cb);
+  }
+
+  listener->is_pending = false;
+
+  tor_mutex_release(&listener->lock);
+}
+
+/**************************/
+
+event_source_t *
+event_source_new(void)
+{
+  event_source_t* source = tor_malloc_zero(sizeof(event_source_t));
+  tor_mutex_init(&source->lock);
+  source->subscriptions = smartlist_new();
+
+  return source;
+}
+
+void
+event_source_free(event_source_t *source)
+{
+  tor_assert(source != NULL);
+
+  tor_mutex_uninit(&source->lock);
+
+  SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
+    if (sub != NULL) {
+      event_subscription_free(sub);
+    }
+  } SMARTLIST_FOREACH_END(sub);
+  smartlist_free(source->subscriptions);
+
+  memset(source, 0x00, sizeof(*source));
+  tor_free(source);
+}
+
+void
+event_source_subscribe(event_source_t *source, event_listener_t *listener,
+                       event_label_t label, bool send_full_event)
+{
+  tor_assert(source != NULL);
+  tor_assert(listener != NULL);
+  tor_assert(label != EVENT_LABEL_UNSET);
+
+  int index = (int)label;
+  tor_assert(index >= 0);
+
+  if (index >= 1000) {
+    log_warn(LD_BUG, "An event label was very large (%d), but the event source "
+                     "assumes that event labels are small.", index);
+    /* We're using a smartlist as a lookup table, and assume that the labels are
+       small and therefore the list should not be sparse. If the label is large,
+       then we either have *many* events, or we're choosing our event labels
+       inefficiently. */
+  }
+
+  event_subscription_t *sub = event_subscription_new(listener, send_full_event);
+
+  tor_mutex_acquire(&source->lock);
+
+  smartlist_grow(source->subscriptions, index+1);
+
+  event_subscription_t *existing_sub = smartlist_get(source->subscriptions, index);
+  if (existing_sub != NULL) {
+    // we only support one listener per event type
+    event_subscription_free(existing_sub);
+    log_warn(LD_BUG, "We are overriding a previous listener.");
+  }
+  smartlist_set(source->subscriptions, index, sub);
+
+  tor_mutex_release(&source->lock);
+}
+
+void
+event_source_unsubscribe(event_source_t *source, event_listener_t *listener,
+                         event_label_t label)
+{
+  tor_assert(source != NULL);
+  tor_assert(listener != NULL);
+  tor_assert(label != EVENT_LABEL_UNSET);
+
+  int index = (int)label;
+  tor_assert(index >= 0);
+
+  tor_mutex_acquire(&source->lock);
+
+  if (index >= smartlist_len(source->subscriptions)) {
+    // there are no subscribers for this event
+    log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
+    tor_mutex_release(&source->lock);
+    return;
+  }
+
+  event_subscription_t *current_sub = smartlist_get(source->subscriptions, index);
+  if (current_sub == NULL || current_sub->listener != listener) {
+    log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
+    tor_mutex_release(&source->lock);
+    return;
+  }
+
+  smartlist_set(source->subscriptions, index, NULL);
+  event_subscription_free(current_sub);
+
+  tor_mutex_release(&source->lock);
+}
+
+void
+event_source_unsubscribe_all(event_source_t *source, event_listener_t *listener)
+{
+  tor_assert(source != NULL);
+  tor_assert(listener != NULL);
+
+  tor_mutex_acquire(&source->lock);
+
+  SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
+    if (sub != NULL && sub->listener == listener) {
+      event_subscription_free(sub);
+      SMARTLIST_REPLACE_CURRENT(source->subscriptions, sub, NULL);
+    }
+  } SMARTLIST_FOREACH_END(sub);
+
+  tor_mutex_release(&source->lock);
+}
+
+void
+event_source_publish(event_source_t *source, event_label_t label,
+                     event_data_t data, void (*free_data_fn)(void *))
+{
+  tor_assert(source != NULL);
+  tor_assert(label != EVENT_LABEL_UNSET);
+
+  int index = (int)label;
+  tor_assert(index >= 0);
+
+  tor_mutex_acquire(&source->lock);
+
+  if (index >= smartlist_len(source->subscriptions)) {
+    // there are no subscribers for this event
+    tor_mutex_release(&source->lock);
+    if (free_data_fn != NULL) {
+      free_data_fn(data.ptr);
+    }
+    return;
+  }
+
+  event_subscription_t *sub = smartlist_get(source->subscriptions, index);
+  if (sub == NULL || sub->listener == NULL) {
+    // there are no subscribers for this event
+    tor_mutex_release(&source->lock);
+    if (free_data_fn != NULL) {
+      free_data_fn(data.ptr);
+    }
+    return;
+  }
+
+  event_wrapper_t *wrapper = NULL;
+
+  if (sub->send_full_event) {
+    wrapper = event_wrapper_new(label, data, free_data_fn);
+  } else {
+    // we don't need to send an event, so free the data now
+    if (free_data_fn != NULL) {
+      free_data_fn(data.ptr);
+    }
+  }
+
+  event_listener_receive(sub->listener, label, wrapper);
+
+  tor_mutex_release(&source->lock);
+}

+ 102 - 0
src/lib/evloop/events.h

@@ -0,0 +1,102 @@
+/* Copyright (c) 2013-2019, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef EVLOOP_EVENTS_H
+#define EVLOOP_EVENTS_H
+
+#include "lib/lock/compat_mutex.h"
+#include "lib/evloop/compat_libevent.h"
+#include "lib/container/smartlist.h"
+#include "ext/tor_queue.h"
+
+/* The type of event. */
+typedef int64_t event_label_t;
+
+#define EVENT_LABEL_UNSET (-1)
+
+/* Data provided with an event. */
+typedef union event_data_t {
+  void *ptr;
+  uint64_t u64;
+} event_data_t;
+
+/* Object to hold an individual event and associated data. */
+typedef struct event_wrapper_t {
+  TOR_TAILQ_ENTRY(event_wrapper_t) next_event;
+  event_label_t label;
+  event_data_t data;
+  void (*free_data_fn)(void *);
+} event_wrapper_t;
+
+/* A list of events and corresponding help strings. */
+typedef struct event_registry_t {
+  tor_mutex_t lock;
+  smartlist_t *events;
+} event_registry_t;
+
+/* An object that publishes events to any subscribed listeners. */
+typedef struct event_source_t {
+  tor_mutex_t lock;
+  smartlist_t *subscriptions;
+} event_source_t;
+
+/* An object that subscribes to a source and processes new events. */
+typedef struct event_listener_t {
+  tor_mutex_t lock;
+  smartlist_t *callbacks;
+  TOR_TAILQ_HEAD(, event_wrapper_t) pending_events;
+  bool is_pending;
+  struct event *eventloop_ev;
+  void *context;
+} event_listener_t;
+
+/* Create the event registry. */
+event_registry_t *event_registry_new(void);
+
+/* Free the event registry. */
+void event_registry_free(event_registry_t *registry);
+
+/* Register a new event and optionally provide a string representation
+   of the event label. */
+event_label_t event_registry_register_event(event_registry_t *registry,
+                                            const char *help_label);
+
+/* Get the help label string registered for an event label. */
+const char *event_registry_get_help_label(event_registry_t *registry,
+                                          event_label_t event_label);
+
+event_listener_t *event_listener_new(void *context);
+
+void event_listener_free(event_listener_t *listener);
+
+void event_listener_attach(event_listener_t *listener, struct event_base *base);
+
+void event_listener_detach(event_listener_t *listener);
+
+void event_listener_set_callback(event_listener_t *listener, event_label_t label,
+                                 bool edge_triggered,
+                                 void (*process_event_fn)(event_label_t,
+                                                          event_data_t,
+                                                          void *));
+
+void event_listener_process(event_listener_t *listener);
+
+/* Create the event source, which publishes events to listeners. */
+event_source_t *event_source_new(void);
+
+void event_source_free(event_source_t *source);
+
+void event_source_subscribe(event_source_t *source, event_listener_t *listener,
+                            event_label_t label, bool send_full_event);
+
+void event_source_unsubscribe(event_source_t *source,
+                              event_listener_t *listener,
+                              event_label_t label);
+
+void event_source_unsubscribe_all(event_source_t *source,
+                                  event_listener_t *listener);
+
+void event_source_publish(event_source_t *source, event_label_t label,
+                          event_data_t data, void (*free_data_fn)(void *));
+
+#endif

+ 2 - 0
src/lib/evloop/include.am

@@ -8,6 +8,7 @@ endif
 # ADD_C_FILE: INSERT SOURCES HERE.
 src_lib_libtor_evloop_a_SOURCES =			\
 	src/lib/evloop/compat_libevent.c		\
+	src/lib/evloop/events.c				\
 	src/lib/evloop/evloop_sys.c			\
 	src/lib/evloop/procmon.c			\
 	src/lib/evloop/timers.c				\
@@ -22,6 +23,7 @@ src_lib_libtor_evloop_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
 # ADD_C_FILE: INSERT HEADERS HERE.
 noinst_HEADERS +=					\
 	src/lib/evloop/compat_libevent.h		\
+	src/lib/evloop/events.h				\
 	src/lib/evloop/evloop_sys.h			\
 	src/lib/evloop/procmon.h			\
 	src/lib/evloop/timers.h				\

+ 16 - 27
src/lib/tls/buffers_tls.c

@@ -61,10 +61,10 @@ read_to_chunk_tls(buf_t *buf, chunk_t *chunk, tor_tls_t *tls,
  * ready to write -- or vice versa.
  */
 int
-buf_read_from_tls(buf_t *buf, tor_tls_t *tls, size_t at_most)
+buf_read_from_tls(buf_t *buf, tor_tls_t *tls, size_t at_most, size_t *total_read)
 {
   int r = 0;
-  size_t total_read = 0;
+  *total_read = 0;
 
   check_no_tls_errors();
 
@@ -73,8 +73,8 @@ buf_read_from_tls(buf_t *buf, tor_tls_t *tls, size_t at_most)
   IF_BUG_ONCE(buf->datalen >= INT_MAX - at_most)
     return -1;
 
-  while (at_most > total_read) {
-    size_t readlen = at_most - total_read;
+  while (at_most > *total_read) {
+    size_t readlen = at_most - *total_read;
     chunk_t *chunk;
     if (!buf->tail || CHUNK_REMAINING_CAPACITY(buf->tail) < MIN_READ_LEN) {
       chunk = buf_add_chunk_with_capacity(buf, at_most, 1);
@@ -90,12 +90,11 @@ buf_read_from_tls(buf_t *buf, tor_tls_t *tls, size_t at_most)
     r = read_to_chunk_tls(buf, chunk, tls, readlen);
     if (r < 0)
       return r; /* Error */
-    tor_assert(total_read+r < INT_MAX);
-    total_read += r;
+    *total_read += r;
     if ((size_t)r < readlen) /* eof, block, or no more to read. */
       break;
   }
-  return (int)total_read;
+  return TOR_TLS_DONE;
 }
 
 /** Helper for buf_flush_to_tls(): try to write <b>sz</b> bytes from chunk
@@ -105,8 +104,7 @@ buf_read_from_tls(buf_t *buf, tor_tls_t *tls, size_t at_most)
  * written on success, and a TOR_TLS error code on failure or blocking.
  */
 static inline int
-flush_chunk_tls(tor_tls_t *tls, buf_t *buf, chunk_t *chunk,
-                size_t sz, size_t *buf_flushlen)
+flush_chunk_tls(tor_tls_t *tls, buf_t *buf, chunk_t *chunk, size_t sz)
 {
   int r;
   size_t forced;
@@ -125,13 +123,9 @@ flush_chunk_tls(tor_tls_t *tls, buf_t *buf, chunk_t *chunk,
   r = tor_tls_write(tls, data, sz);
   if (r < 0)
     return r;
-  if (*buf_flushlen > (size_t)r)
-    *buf_flushlen -= r;
-  else
-    *buf_flushlen = 0;
   buf_drain(buf, r);
-  log_debug(LD_NET,"flushed %d bytes, %d ready to flush, %d remain.",
-            r,(int)*buf_flushlen,(int)buf->datalen);
+  log_debug(LD_NET,"flushed %d bytes, %d remain.",
+            r,(int)buf->datalen);
   return r;
 }
 
@@ -140,17 +134,13 @@ flush_chunk_tls(tor_tls_t *tls, buf_t *buf, chunk_t *chunk,
  */
 int
 buf_flush_to_tls(buf_t *buf, tor_tls_t *tls, size_t flushlen,
-              size_t *buf_flushlen)
+                 size_t *total_written)
 {
+  *total_written = 0;
   int r;
-  size_t flushed = 0;
   ssize_t sz;
-  tor_assert(buf_flushlen);
-  if (BUG(*buf_flushlen > buf->datalen)) {
-    *buf_flushlen = buf->datalen;
-  }
-  if (BUG(flushlen > *buf_flushlen)) {
-    flushlen = *buf_flushlen;
+  if (BUG(flushlen > buf->datalen)) {
+    flushlen = buf->datalen;
   }
   sz = (ssize_t) flushlen;
 
@@ -169,14 +159,13 @@ buf_flush_to_tls(buf_t *buf, tor_tls_t *tls, size_t flushlen,
       flushlen0 = 0;
     }
 
-    r = flush_chunk_tls(tls, buf, buf->head, flushlen0, buf_flushlen);
+    r = flush_chunk_tls(tls, buf, buf->head, flushlen0);
     if (r < 0)
       return r;
-    flushed += r;
+    *total_written += r;
     sz -= r;
     if (r == 0) /* Can't flush any more now. */
       break;
   } while (sz > 0);
-  tor_assert(flushed < INT_MAX);
-  return (int)flushed;
+  return TOR_TLS_DONE;
 }

+ 3 - 3
src/lib/tls/buffers_tls.h

@@ -15,9 +15,9 @@
 struct buf_t;
 struct tor_tls_t;
 
-int buf_read_from_tls(struct buf_t *buf,
-                      struct tor_tls_t *tls, size_t at_most);
+int buf_read_from_tls(struct buf_t *buf, struct tor_tls_t *tls,
+                      size_t at_most, size_t *total_read);
 int buf_flush_to_tls(struct buf_t *buf, struct tor_tls_t *tls,
-                     size_t sz, size_t *buf_flushlen);
+                     size_t flushlen, size_t *total_written);
 
 #endif /* !defined(TOR_BUFFERS_TLS_H) */

+ 5 - 5
src/lib/tls/tortls.h

@@ -90,9 +90,9 @@ void tor_tls_context_decref(tor_tls_context_t *ctx);
 tor_tls_context_t *tor_tls_context_get(int is_server);
 tor_tls_t *tor_tls_new(tor_socket_t sock, int is_server);
 void tor_tls_set_logged_address(tor_tls_t *tls, const char *address);
-void tor_tls_set_renegotiate_callback(tor_tls_t *tls,
-                                      void (*cb)(tor_tls_t *, void *arg),
-                                      void *arg);
+//void tor_tls_set_renegotiate_callback(tor_tls_t *tls,
+//                                      void (*cb)(tor_tls_t *, void *arg),
+//                                      void *arg);
 int tor_tls_is_server(tor_tls_t *tls);
 void tor_tls_release_socket(tor_tls_t *tls);
 void tor_tls_free_(tor_tls_t *tls);
@@ -120,10 +120,10 @@ int tor_tls_get_buffer_sizes(tor_tls_t *tls,
 
 MOCK_DECL(double, tls_get_write_overhead_ratio, (void));
 
-int tor_tls_used_v1_handshake(tor_tls_t *tls);
+//int tor_tls_used_v1_handshake(tor_tls_t *tls);
 int tor_tls_get_num_server_handshakes(tor_tls_t *tls);
 int tor_tls_server_got_renegotiate(tor_tls_t *tls);
-MOCK_DECL(int,tor_tls_cert_matches_key,(const tor_tls_t *tls,
+MOCK_DECL(int,tor_tls_cert_matches_key,(const struct tor_x509_cert_t *peer_cert,
                                         const struct tor_x509_cert_t *cert));
 MOCK_DECL(int,tor_tls_get_tlssecrets,(tor_tls_t *tls, uint8_t *secrets_out));
 #ifdef ENABLE_OPENSSL

+ 11 - 11
src/lib/tls/tortls_nss.c

@@ -441,17 +441,17 @@ tor_tls_new(tor_socket_t sock, int is_server)
   return tls;
 }
 
-void
-tor_tls_set_renegotiate_callback(tor_tls_t *tls,
-                                 void (*cb)(tor_tls_t *, void *arg),
-                                 void *arg)
-{
-  tor_assert(tls);
-  (void)cb;
-  (void)arg;
-
-  /* We don't support renegotiation-based TLS with NSS. */
-}
+//void
+//tor_tls_set_renegotiate_callback(tor_tls_t *tls,
+//                                 void (*cb)(tor_tls_t *, void *arg),
+//                                 void *arg)
+//{
+//  tor_assert(tls);
+//  (void)cb;
+//  (void)arg;
+//
+//  /* We don't support renegotiation-based TLS with NSS. */
+//}
 
 /**
  * Tell the TLS library that the underlying socket for <b>tls</b> has been

+ 96 - 101
src/lib/tls/tortls_openssl.c

@@ -476,9 +476,10 @@ static const char CLIENT_CIPHER_LIST[] =
  * the key certified in <b>cert</b> is the same as the key they used to do it.
  */
 MOCK_IMPL(int,
-tor_tls_cert_matches_key,(const tor_tls_t *tls, const tor_x509_cert_t *cert))
+tor_tls_cert_matches_key,(const tor_x509_cert_t *peer_cert, const tor_x509_cert_t *cert))
 {
-  tor_x509_cert_t *peer = tor_tls_get_peer_cert((tor_tls_t *)tls);
+  //tor_x509_cert_t *peer = tor_tls_get_peer_cert((tor_tls_t *)tls);
+  const tor_x509_cert_t *peer = peer_cert;
   if (!peer)
     return 0;
 
@@ -491,7 +492,7 @@ tor_tls_cert_matches_key,(const tor_tls_t *tls, const tor_x509_cert_t *cert))
 
   result = link_key && cert_key && EVP_PKEY_cmp(cert_key, link_key) == 1;
 
-  tor_x509_cert_free(peer);
+  //tor_x509_cert_free(peer);
   if (link_key)
     EVP_PKEY_free(link_key);
   if (cert_key)
@@ -938,57 +939,57 @@ tor_tls_client_is_using_v2_ciphers(const SSL *ssl)
  *         do not send or request extra certificates in v2 handshakes.</li>
  * <li>To detect renegotiation</li></ul>
  */
-void
-tor_tls_server_info_callback(const SSL *ssl, int type, int val)
-{
-  tor_tls_t *tls;
-  (void) val;
-
-  IF_BUG_ONCE(ssl == NULL) {
-    return; // LCOV_EXCL_LINE
-  }
-
-  tor_tls_debug_state_callback(ssl, type, val);
-
-  if (type != SSL_CB_ACCEPT_LOOP)
-    return;
-
-  OSSL_HANDSHAKE_STATE ssl_state = SSL_get_state(ssl);
-  if (! STATE_IS_SW_SERVER_HELLO(ssl_state))
-    return;
-  tls = tor_tls_get_by_ssl(ssl);
-  if (tls) {
-    /* Check whether we're watching for renegotiates.  If so, this is one! */
-    if (tls->negotiated_callback)
-      tls->got_renegotiate = 1;
-  } else {
-    log_warn(LD_BUG, "Couldn't look up the tls for an SSL*. How odd!");
-    return;
-  }
-
-  /* Now check the cipher list. */
-  if (tor_tls_client_is_using_v2_ciphers(ssl)) {
-    if (tls->wasV2Handshake)
-      return; /* We already turned this stuff off for the first handshake;
-               * This is a renegotiation. */
-
-    /* Yes, we're casting away the const from ssl.  This is very naughty of us.
-     * Let's hope openssl doesn't notice! */
-
-    /* Set SSL_MODE_NO_AUTO_CHAIN to keep from sending back any extra certs. */
-    SSL_set_mode((SSL*) ssl, SSL_MODE_NO_AUTO_CHAIN);
-    /* Don't send a hello request. */
-    SSL_set_verify((SSL*) ssl, SSL_VERIFY_NONE, NULL);
-
-    if (tls) {
-      tls->wasV2Handshake = 1;
-    } else {
-      /* LCOV_EXCL_START this line is not reachable */
-      log_warn(LD_BUG, "Couldn't look up the tls for an SSL*. How odd!");
-      /* LCOV_EXCL_STOP */
-    }
-  }
-}
+//void
+//tor_tls_server_info_callback(const SSL *ssl, int type, int val)
+//{
+//  tor_tls_t *tls;
+//  (void) val;
+//
+//  IF_BUG_ONCE(ssl == NULL) {
+//    return; // LCOV_EXCL_LINE
+//  }
+//
+//  tor_tls_debug_state_callback(ssl, type, val);
+//
+//  if (type != SSL_CB_ACCEPT_LOOP)
+//    return;
+//
+//  OSSL_HANDSHAKE_STATE ssl_state = SSL_get_state(ssl);
+//  if (! STATE_IS_SW_SERVER_HELLO(ssl_state))
+//    return;
+//  tls = tor_tls_get_by_ssl(ssl);
+//  if (tls) {
+//    /* Check whether we're watching for renegotiates.  If so, this is one! */
+//    if (tls->negotiated_callback)
+//      tls->got_renegotiate = 1;
+//  } else {
+//    log_warn(LD_BUG, "Couldn't look up the tls for an SSL*. How odd!");
+//    return;
+//  }
+//
+//  /* Now check the cipher list. */
+//  if (tor_tls_client_is_using_v2_ciphers(ssl)) {
+//    if (tls->wasV2Handshake)
+//      return; /* We already turned this stuff off for the first handshake;
+//               * This is a renegotiation. */
+//
+//    /* Yes, we're casting away the const from ssl.  This is very naughty of us.
+//     * Let's hope openssl doesn't notice! */
+//
+//    /* Set SSL_MODE_NO_AUTO_CHAIN to keep from sending back any extra certs. */
+//    SSL_set_mode((SSL*) ssl, SSL_MODE_NO_AUTO_CHAIN);
+//    /* Don't send a hello request. */
+//    SSL_set_verify((SSL*) ssl, SSL_VERIFY_NONE, NULL);
+//
+//    if (tls) {
+//      tls->wasV2Handshake = 1;
+//    } else {
+//      /* LCOV_EXCL_START this line is not reachable */
+//      log_warn(LD_BUG, "Couldn't look up the tls for an SSL*. How odd!");
+//      /* LCOV_EXCL_STOP */
+//    }
+//  }
+//}
 
 /** Callback to get invoked on a server after we've read the list of ciphers
  * the client supports, but before we pick our own ciphersuite.
@@ -1106,9 +1107,16 @@ tor_tls_new(tor_socket_t sock, int isServer)
              result->last_read_count, result->last_write_count);
   }
   if (isServer) {
-    SSL_set_info_callback(result->ssl, tor_tls_server_info_callback);
+    //SSL_set_info_callback(result->ssl, tor_tls_server_info_callback);
   } else {
-    SSL_set_info_callback(result->ssl, tor_tls_debug_state_callback);
+    //SSL_set_info_callback(result->ssl, tor_tls_debug_state_callback);
+  }
+
+  if (isServer) {
+    // set SSL_MODE_NO_AUTO_CHAIN to keep from sending back any extra certs
+    SSL_set_mode(result->ssl, SSL_MODE_NO_AUTO_CHAIN);
+    // don't send a hello request
+    SSL_set_verify(result->ssl, SSL_VERIFY_NONE, NULL);
   }
 
   if (isServer)
@@ -1127,20 +1135,20 @@ tor_tls_new(tor_socket_t sock, int isServer)
  * next gets a client-side renegotiate in the middle of a read.  Do not
  * invoke this function until <em>after</em> initial handshaking is done!
  */
-void
-tor_tls_set_renegotiate_callback(tor_tls_t *tls,
-                                 void (*cb)(tor_tls_t *, void *arg),
-                                 void *arg)
-{
-  tls->negotiated_callback = cb;
-  tls->callback_arg = arg;
-  tls->got_renegotiate = 0;
-  if (cb) {
-    SSL_set_info_callback(tls->ssl, tor_tls_server_info_callback);
-  } else {
-    SSL_set_info_callback(tls->ssl, tor_tls_debug_state_callback);
-  }
-}
+//void
+//tor_tls_set_renegotiate_callback(tor_tls_t *tls,
+//                                 void (*cb)(tor_tls_t *, void *arg),
+//                                 void *arg)
+//{
+//  tls->negotiated_callback = cb;
+//  tls->callback_arg = arg;
+//  tls->got_renegotiate = 0;
+//  if (cb) {
+//    SSL_set_info_callback(tls->ssl, tor_tls_server_info_callback);
+//  } else {
+//    SSL_set_info_callback(tls->ssl, tor_tls_debug_state_callback);
+//  }
+//}
 
 /** If this version of openssl requires it, turn on renegotiation on
  * <b>tls</b>.
@@ -1321,22 +1329,23 @@ tor_tls_handshake(tor_tls_t *tls)
 
   OSSL_HANDSHAKE_STATE newstate = SSL_get_state(tls->ssl);
 
-  if (oldstate != newstate)
+  if (oldstate != newstate) {
     log_debug(LD_HANDSHAKE, "After call, %p was in state %s",
               tls, SSL_state_string_long(tls->ssl));
-  /* We need to call this here and not earlier, since OpenSSL has a penchant
-   * for clearing its flags when you say accept or connect. */
-  tor_tls_unblock_renegotiation(tls);
-  r = tor_tls_get_error(tls,r,0, "handshaking", LOG_INFO, LD_HANDSHAKE);
+  }
+
+  r = tor_tls_get_error(tls, r, 0, "handshaking", LOG_INFO, LD_HANDSHAKE);
   if (ERR_peek_error() != 0) {
     tls_log_errors(tls, tls->isServer ? LOG_INFO : LOG_WARN, LD_HANDSHAKE,
                    "handshaking");
     return TOR_TLS_ERROR_MISC;
   }
+
   if (r == TOR_TLS_DONE) {
     tls->state = TOR_TLS_ST_OPEN;
     return tor_tls_finish_handshake(tls);
   }
+
   return r;
 }
 
@@ -1353,32 +1362,18 @@ tor_tls_finish_handshake(tor_tls_t *tls)
 {
   int r = TOR_TLS_DONE;
   check_no_tls_errors();
+
   if (tls->isServer) {
-    SSL_set_info_callback(tls->ssl, NULL);
     SSL_set_verify(tls->ssl, SSL_VERIFY_PEER, always_accept_verify_cb);
     SSL_clear_mode(tls->ssl, SSL_MODE_NO_AUTO_CHAIN);
-    if (tor_tls_client_is_using_v2_ciphers(tls->ssl)) {
-      /* This check is redundant, but back when we did it in the callback,
-       * we might have not been able to look up the tor_tls_t if the code
-       * was buggy.  Fixing that. */
-      if (!tls->wasV2Handshake) {
-        log_warn(LD_BUG, "For some reason, wasV2Handshake didn't"
-                 " get set. Fixing that.");
-      }
-      tls->wasV2Handshake = 1;
-      log_debug(LD_HANDSHAKE, "Completed V2 TLS handshake with client; waiting"
-                " for renegotiation.");
-    } else {
-      tls->wasV2Handshake = 0;
-    }
+    log_debug(LD_HANDSHAKE, "Completed V2 TLS handshake with client; waiting "
+                            "for renegotiation.");
   } else {
-    /* Client-side */
-    tls->wasV2Handshake = 1;
     /* XXXX this can move, probably? -NM */
-    if (SSL_set_cipher_list(tls->ssl, SERVER_CIPHER_LIST) == 0) {
-      tls_log_errors(NULL, LOG_WARN, LD_HANDSHAKE, "re-setting ciphers");
-      r = TOR_TLS_ERROR_MISC;
-    }
+    //if (SSL_set_cipher_list(tls->ssl, SERVER_CIPHER_LIST) == 0) {
+    //  tls_log_errors(NULL, LOG_WARN, LD_HANDSHAKE, "re-setting ciphers");
+    //  r = TOR_TLS_ERROR_MISC;
+    //}
   }
   tls_log_errors(NULL, LOG_WARN, LD_NET, "finishing the handshake");
   return r;
@@ -1558,11 +1553,11 @@ check_no_tls_errors_(const char *fname, int line)
 
 /** Return true iff the initial TLS connection at <b>tls</b> did not use a v2
  * TLS handshake. Output is undefined if the handshake isn't finished. */
-int
-tor_tls_used_v1_handshake(tor_tls_t *tls)
-{
-  return ! tls->wasV2Handshake;
-}
+//int
+//tor_tls_used_v1_handshake(tor_tls_t *tls)
+//{
+//  return ! tls->wasV2Handshake;
+//}
 
 /** Return true iff the server TLS connection <b>tls</b> got the renegotiation
  * request it was waiting for. */

+ 5 - 5
src/lib/tls/tortls_st.h

@@ -41,11 +41,11 @@ struct tor_tls_t {
                                        * depending on which operations
                                        * have completed successfully. */
   unsigned int isServer:1; /**< True iff this is a server-side connection */
-  unsigned int wasV2Handshake:1; /**< True iff the original handshake for
-                                  * this connection used the updated version
-                                  * of the connection protocol (client sends
-                                  * different cipher list, server sends only
-                                  * one certificate). */
+  //unsigned int wasV2Handshake:1; /**< True iff the original handshake for
+  //                                * this connection used the updated version
+  //                                * of the connection protocol (client sends
+  //                                * different cipher list, server sends only
+  //                                * one certificate). */
   /** True iff we should call negotiated_callback when we're done reading. */
   unsigned int got_renegotiate:1;
 #ifdef ENABLE_OPENSSL

Some files were not shown because too many files changed in this diff