浏览代码

Merge branch 'bufferevent5'

Nick Mathewson 15 年之前
父节点
当前提交
5c83c06c98

+ 9 - 0
changes/bufferevent-support

@@ -0,0 +1,9 @@
+ o Major features
+   - Tor can now optionally build with the "bufferevents" buffered IO
+     backend provided by Libevent, when building with Libevent 2.0.7-rc
+     or later.  To use this feature, make sure you have the latest possible
+     version of Libevent, and run autoconf with the --enable-bufferevents
+     flag.  Using this feature will make our networking code more flexible,
+     lets us stack layers on each other, and let us use more efficient
+     zero-copy transports where available.
+

+ 54 - 3
configure.in

@@ -118,6 +118,9 @@ if test "$enable_local_appdata" = "yes"; then
             [Defined if we default to host local appdata paths on Windows])
 fi
 
+AC_ARG_ENABLE(bufferevents,
+     AS_HELP_STRING(--enable-bufferevents, use Libevent's buffered IO.))
+
 AC_PROG_CC
 AC_PROG_CPP
 AC_PROG_MAKE_SET
@@ -223,7 +226,7 @@ dnl -------------------------------------------------------------------
 dnl Check for functions before libevent, since libevent-1.2 apparently
 dnl exports strlcpy without defining it in a header.
 
-AC_CHECK_FUNCS(gettimeofday ftime socketpair uname inet_aton strptime getrlimit strlcat strlcpy strtoull getaddrinfo localtime_r gmtime_r memmem strtok_r writev readv flock prctl vasprintf)
+AC_CHECK_FUNCS(gettimeofday ftime socketpair uname inet_aton strptime getrlimit strlcat strlcpy strtoull getaddrinfo localtime_r gmtime_r memmem strtok_r flock prctl vasprintf)
 
 using_custom_malloc=no
 if test x$enable_openbsd_malloc = xyes ; then
@@ -302,7 +305,7 @@ AC_CHECK_MEMBERS([struct event.min_heap_idx], , ,
 [#include <event.h>
 ])
 
-AC_CHECK_HEADERS(event2/event.h event2/dns.h)
+AC_CHECK_HEADERS(event2/event.h event2/dns.h event2/bufferevent_ssl.h)
 
 LIBS="$save_LIBS"
 LDFLAGS="$save_LDFLAGS"
@@ -322,6 +325,54 @@ else
 fi
 AC_SUBST(TOR_LIBEVENT_LIBS)
 
+dnl This isn't the best test for Libevent 2.0.3-alpha.  Once it's released,
+dnl we can do much better.
+if test "$enable_bufferevents" = "yes" ; then
+  if test "$ac_cv_header_event2_bufferevent_ssl_h" != "yes" ; then
+    AC_MSG_ERROR([You've asked for bufferevent support, but you're using a version of Libevent without SSL support.  This won't work.  We need Libevent 2.0.7-rc or later, and you don't seem to even have Libevent 2.0.3-alpha.])
+  else
+
+    CPPFLAGS="$CPPFLAGS $TOR_CPPFLAGS_libevent"
+
+    # Check for the right version.  First see if version detection works.
+    AC_MSG_CHECKING([whether we can detect the Libevent version])
+    AC_COMPILE_IFELSE([
+#include <event2/event.h>
+#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 10
+#error
+int x = y(zz);
+#else
+int x = 1;
+#endif
+  ], [event_version_number_works=yes; AC_MSG_RESULT([yes]) ],
+     [event_version_number_works=no;  AC_MSG_RESULT([no])])
+    if test "$event_version_number_works" != 'yes'; then
+      AC_MSG_WARN([Version detection on Libevent seems broken.  Your Libevent installation is probably screwed up or very old.])
+    else
+      AC_MSG_CHECKING([whether Libevent is new enough for bufferevents])
+      AC_COMPILE_IFELSE([
+#include <event2/event.h>
+#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000700
+#error
+int x = y(zz);
+#else
+int x = 1;
+#endif
+   ], [ AC_MSG_RESULT([yes]) ],
+      [ AC_MSG_RESULT([no])
+        AC_MSG_ERROR([Libevent does not seem new enough to support bufferevents.  We require 2.0.7-rc or later]) ] )
+    fi
+  fi
+fi
+
+LIBS="$save_LIBS"
+LDFLAGS="$save_LDFLAGS"
+CPPFLAGS="$save_CPPFLAGS"
+
+AM_CONDITIONAL(USE_BUFFEREVENTS, test "$enable_bufferevents" = "yes")
+if test "$enable_bufferevents" = "yes"; then
+   AC_DEFINE(USE_BUFFEREVENTS, 1, [Defined if we're going to use Libevent's buffered IO API])
+fi
 
 dnl ------------------------------------------------------
 dnl Where do you live, openssl?  And how do we call you?
@@ -393,7 +444,7 @@ AC_SYS_LARGEFILE
 
 AC_CHECK_HEADERS(unistd.h string.h signal.h sys/stat.h sys/types.h fcntl.h sys/fcntl.h sys/time.h errno.h assert.h time.h, , AC_MSG_WARN(Some headers were not found, compilation may fail.  If compilation succeeds, please send your orconfig.h to the developers so we can fix this warning.))
 
-AC_CHECK_HEADERS(netdb.h sys/ioctl.h sys/socket.h arpa/inet.h netinet/in.h pwd.h grp.h sys/un.h sys/uio.h)
+AC_CHECK_HEADERS(netdb.h sys/ioctl.h sys/socket.h arpa/inet.h netinet/in.h pwd.h grp.h sys/un.h)
 
 dnl These headers are not essential
 

+ 26 - 0
src/common/compat_libevent.c

@@ -551,3 +551,29 @@ periodic_timer_free(periodic_timer_t *timer)
   tor_free(timer);
 }
 
+#ifdef USE_BUFFEREVENTS
+static const struct timeval *one_tick = NULL;
+/**
+ * Return a special timeout to be passed whenever libevent's O(1) timeout
+ * implementation should be used. Only use this when the timer is supposed
+ * to fire after 1 / TOR_LIBEVENT_TICKS_PER_SECOND seconds have passed.
+*/
+const struct timeval *
+tor_libevent_get_one_tick_timeout(void)
+{
+  if (PREDICT_UNLIKELY(one_tick == NULL)) {
+    struct event_base *base = tor_libevent_get_base();
+    struct timeval tv;
+    if (TOR_LIBEVENT_TICKS_PER_SECOND == 1) {
+      tv.tv_sec = 1;
+      tv.tv_usec = 0;
+    } else {
+      tv.tv_sec = 0;
+      tv.tv_usec = 1000000 / TOR_LIBEVENT_TICKS_PER_SECOND;
+    }
+    one_tick = event_base_init_common_timeout(base, &tv);
+  }
+  return one_tick;
+}
+#endif
+

+ 8 - 0
src/common/compat_libevent.h

@@ -8,6 +8,9 @@
 
 struct event;
 struct event_base;
+#ifdef USE_BUFFEREVENTS
+struct bufferevent;
+#endif
 
 #ifdef HAVE_EVENT2_EVENT_H
 #include <event2/util.h>
@@ -61,5 +64,10 @@ void tor_check_libevent_version(const char *m, int server,
 void tor_check_libevent_header_compatibility(void);
 const char *tor_libevent_get_version_str(void);
 
+#ifdef USE_BUFFEREVENTS
+#define TOR_LIBEVENT_TICKS_PER_SECOND 3
+const struct timeval *tor_libevent_get_one_tick_timeout(void);
+#endif
+
 #endif
 

+ 2 - 1
src/common/crypto.h

@@ -238,7 +238,8 @@ void secret_to_key(char *key_out, size_t key_out_len, const char *secret,
                    size_t secret_len, const char *s2k_specifier);
 
 #ifdef CRYPTO_PRIVATE
-/* Prototypes for private functions only used by tortls.c and crypto.c */
+/* Prototypes for private functions only used by tortls.c, crypto.c, and the
+ * unit tests. */
 struct rsa_st;
 struct evp_pkey_st;
 struct dh_st;

+ 137 - 42
src/common/tortls.c

@@ -44,7 +44,14 @@
 #error "We require OpenSSL >= 0.9.7"
 #endif
 
+#ifdef USE_BUFFEREVENTS
+#include <event2/bufferevent_ssl.h>
+#include <event2/buffer.h>
+#include "compat_libevent.h"
+#endif
+
 #define CRYPTO_PRIVATE /* to import prototypes from crypto.h */
+#define TORTLS_PRIVATE
 
 #include "crypto.h"
 #include "tortls.h"
@@ -107,6 +114,7 @@ struct tor_tls_t {
   enum {
     TOR_TLS_ST_HANDSHAKE, TOR_TLS_ST_OPEN, TOR_TLS_ST_GOTCLOSE,
     TOR_TLS_ST_SENTCLOSE, TOR_TLS_ST_CLOSED, TOR_TLS_ST_RENEGOTIATE,
+    TOR_TLS_ST_BUFFEREVENT
   } state : 3; /**< The current SSL state, depending on which operations have
                 * completed successfully. */
   unsigned int isServer:1; /**< True iff this is a server-side connection */
@@ -187,7 +195,6 @@ static X509* tor_tls_create_certificate(crypto_pk_env_t *rsa,
                                         const char *cname,
                                         const char *cname_sign,
                                         unsigned int lifetime);
-static void tor_tls_unblock_renegotiation(tor_tls_t *tls);
 
 /** Global tls context. We keep it here because nobody else needs to
  * touch it. */
@@ -1024,7 +1031,7 @@ tor_tls_set_renegotiate_callback(tor_tls_t *tls,
 /** If this version of openssl requires it, turn on renegotiation on
  * <b>tls</b>.
  */
-static void
+void
 tor_tls_unblock_renegotiation(tor_tls_t *tls)
 {
   /* Yes, we know what we are doing here.  No, we do not treat a renegotiation
@@ -1192,56 +1199,86 @@ tor_tls_handshake(tor_tls_t *tls)
   }
   if (r == TOR_TLS_DONE) {
     tls->state = TOR_TLS_ST_OPEN;
-    if (tls->isServer) {
-      SSL_set_info_callback(tls->ssl, NULL);
-      SSL_set_verify(tls->ssl, SSL_VERIFY_PEER, always_accept_verify_cb);
-      /* There doesn't seem to be a clear OpenSSL API to clear mode flags. */
-      tls->ssl->mode &= ~SSL_MODE_NO_AUTO_CHAIN;
+    return tor_tls_finish_handshake(tls);
+  }
+  return r;
+}
+
+/** Perform the final part of the intial TLS handshake on <b>tls</b>.  This
+ * should be called for the first handshake only: it determines whether the v1
+ * or the v2 handshake was used, and adjusts things for the renegotiation
+ * handshake as appropriate.
+ *
+ * tor_tls_handshake() calls this on its own; you only need to call this if
+ * bufferevent is doing the handshake for you.
+ */
+int
+tor_tls_finish_handshake(tor_tls_t *tls)
+{
+  int r = TOR_TLS_DONE;
+  if (tls->isServer) {
+    SSL_set_info_callback(tls->ssl, NULL);
+    SSL_set_verify(tls->ssl, SSL_VERIFY_PEER, always_accept_verify_cb);
+    /* There doesn't seem to be a clear OpenSSL API to clear mode flags. */
+    tls->ssl->mode &= ~SSL_MODE_NO_AUTO_CHAIN;
 #ifdef V2_HANDSHAKE_SERVER
-      if (tor_tls_client_is_using_v2_ciphers(tls->ssl, ADDR(tls))) {
-        /* This check is redundant, but back when we did it in the callback,
-         * we might have not been able to look up the tor_tls_t if the code
-         * was buggy.  Fixing that. */
-        if (!tls->wasV2Handshake) {
-          log_warn(LD_BUG, "For some reason, wasV2Handshake didn't"
-                   " get set. Fixing that.");
-        }
-        tls->wasV2Handshake = 1;
-        log_debug(LD_HANDSHAKE,
-                  "Completed V2 TLS handshake with client; waiting "
-                  "for renegotiation.");
-      } else {
-        tls->wasV2Handshake = 0;
+    if (tor_tls_client_is_using_v2_ciphers(tls->ssl, ADDR(tls))) {
+      /* This check is redundant, but back when we did it in the callback,
+       * we might have not been able to look up the tor_tls_t if the code
+       * was buggy.  Fixing that. */
+      if (!tls->wasV2Handshake) {
+        log_warn(LD_BUG, "For some reason, wasV2Handshake didn't"
+                 " get set. Fixing that.");
       }
-#endif
+      tls->wasV2Handshake = 1;
+      log_debug(LD_HANDSHAKE, "Completed V2 TLS handshake with client; waiting"
+                " for renegotiation.");
     } else {
+      tls->wasV2Handshake = 0;
+    }
+#endif
+  } else {
 #ifdef V2_HANDSHAKE_CLIENT
-      /* If we got no ID cert, we're a v2 handshake. */
-      X509 *cert = SSL_get_peer_certificate(tls->ssl);
-      STACK_OF(X509) *chain = SSL_get_peer_cert_chain(tls->ssl);
-      int n_certs = sk_X509_num(chain);
-      if (n_certs > 1 || (n_certs == 1 && cert != sk_X509_value(chain, 0))) {
-        log_debug(LD_HANDSHAKE, "Server sent back multiple certificates; it "
-                  "looks like a v1 handshake on %p", tls);
-        tls->wasV2Handshake = 0;
-      } else {
-        log_debug(LD_HANDSHAKE,
-                  "Server sent back a single certificate; looks like "
-                  "a v2 handshake on %p.", tls);
-        tls->wasV2Handshake = 1;
-      }
-      if (cert)
-        X509_free(cert);
+    /* If we got no ID cert, we're a v2 handshake. */
+    X509 *cert = SSL_get_peer_certificate(tls->ssl);
+    STACK_OF(X509) *chain = SSL_get_peer_cert_chain(tls->ssl);
+    int n_certs = sk_X509_num(chain);
+    if (n_certs > 1 || (n_certs == 1 && cert != sk_X509_value(chain, 0))) {
+      log_debug(LD_HANDSHAKE, "Server sent back multiple certificates; it "
+                "looks like a v1 handshake on %p", tls);
+      tls->wasV2Handshake = 0;
+    } else {
+      log_debug(LD_HANDSHAKE,
+                "Server sent back a single certificate; looks like "
+                "a v2 handshake on %p.", tls);
+      tls->wasV2Handshake = 1;
+    }
+    if (cert)
+      X509_free(cert);
 #endif
-      if (SSL_set_cipher_list(tls->ssl, SERVER_CIPHER_LIST) == 0) {
-        tls_log_errors(NULL, LOG_WARN, LD_HANDSHAKE, "re-setting ciphers");
-        r = TOR_TLS_ERROR_MISC;
-      }
+    if (SSL_set_cipher_list(tls->ssl, SERVER_CIPHER_LIST) == 0) {
+      tls_log_errors(NULL, LOG_WARN, LD_HANDSHAKE, "re-setting ciphers");
+      r = TOR_TLS_ERROR_MISC;
     }
   }
   return r;
 }
 
+#ifdef USE_BUFFEREVENTS
+/** Put <b>tls</b>, which must be a client connection, into renegotiation
+ * mode. */
+int
+tor_tls_start_renegotiating(tor_tls_t *tls)
+{
+  int r = SSL_renegotiate(tls->ssl);
+  if (r <= 0) {
+    return tor_tls_get_error(tls, r, 0, "renegotiating", LOG_WARN,
+                             LD_HANDSHAKE);
+  }
+  return 0;
+}
+#endif
+
 /** Client only: Renegotiate a TLS session.  When finished, returns
  * TOR_TLS_DONE.  On failure, returns TOR_TLS_ERROR, TOR_TLS_WANTREAD, or
  * TOR_TLS_WANTWRITE.
@@ -1458,6 +1495,8 @@ tor_tls_verify(int severity, tor_tls_t *tls, crypto_pk_env_t **identity_key)
     log_fn(severity,LD_PROTOCOL,"No distinct identity certificate found");
     goto done;
   }
+  tls_log_errors(tls, severity, LD_HANDSHAKE, "before verifying certificate");
+
   if (!(id_pkey = X509_get_pubkey(id_cert)) ||
       X509_verify(cert, id_pkey) <= 0) {
     log_fn(severity,LD_PROTOCOL,"X509_verify on cert and pkey returned <= 0");
@@ -1629,3 +1668,59 @@ tor_tls_get_buffer_sizes(tor_tls_t *tls,
   *wbuf_bytes = tls->ssl->s3->wbuf.left;
 }
 
+#ifdef USE_BUFFEREVENTS
+/** Construct and return an TLS-encrypting bufferevent to send data over
+ * <b>socket</b>, which must match the socket of the underlying bufferevent
+ * <b>bufev_in</b>.  The TLS object <b>tls</b> is used for encryption.
+ *
+ * This function will either create a filtering bufferevent that wraps around
+ * <b>bufev_in</b>, or it will free bufev_in and return a new bufferevent that
+ * uses the <b>tls</b> to talk to the network directly.  Do not use
+ * <b>bufev_in</b> after calling this function.
+ *
+ * The connection will start out doing a server handshake if <b>receiving</b>
+ * is strue, and a client handshake otherwise.
+ *
+ * Returns NULL on failure.
+ */
+struct bufferevent *
+tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in,
+                         evutil_socket_t socket, int receiving)
+{
+  struct bufferevent *out;
+  const enum bufferevent_ssl_state state = receiving ?
+    BUFFEREVENT_SSL_ACCEPTING : BUFFEREVENT_SSL_CONNECTING;
+
+#if 0
+  (void) socket;
+  out = bufferevent_openssl_filter_new(tor_libevent_get_base(),
+                                       bufev_in,
+                                       tls->ssl,
+                                       state,
+                                       BEV_OPT_DEFER_CALLBACKS);
+#else
+  if (bufev_in) {
+    evutil_socket_t s = bufferevent_getfd(bufev_in);
+    tor_assert(s == -1 || s == socket);
+    tor_assert(evbuffer_get_length(bufferevent_get_input(bufev_in)) == 0);
+    tor_assert(evbuffer_get_length(bufferevent_get_output(bufev_in)) == 0);
+    tor_assert(BIO_number_read(SSL_get_rbio(tls->ssl)) == 0);
+    tor_assert(BIO_number_written(SSL_get_rbio(tls->ssl)) == 0);
+    bufferevent_free(bufev_in);
+  }
+  tls->state = TOR_TLS_ST_BUFFEREVENT;
+
+  /* Current versions (as of 2.0.7-rc) of Libevent need to defer
+   * bufferevent_openssl callbacks, or else our callback functions will
+   * get called reentrantly, which is bad for us.
+   */
+  out = bufferevent_openssl_socket_new(tor_libevent_get_base(),
+                                       socket,
+                                       tls->ssl,
+                                       state,
+                                       BEV_OPT_DEFER_CALLBACKS);
+#endif
+  return out;
+}
+#endif
+

+ 9 - 0
src/common/tortls.h

@@ -64,7 +64,9 @@ int tor_tls_check_lifetime(tor_tls_t *tls, int tolerance);
 int tor_tls_read(tor_tls_t *tls, char *cp, size_t len);
 int tor_tls_write(tor_tls_t *tls, const char *cp, size_t n);
 int tor_tls_handshake(tor_tls_t *tls);
+int tor_tls_finish_handshake(tor_tls_t *tls);
 int tor_tls_renegotiate(tor_tls_t *tls);
+void tor_tls_unblock_renegotiation(tor_tls_t *tls);
 void tor_tls_block_renegotiation(tor_tls_t *tls);
 int tor_tls_shutdown(tor_tls_t *tls);
 int tor_tls_get_pending_bytes(tor_tls_t *tls);
@@ -85,5 +87,12 @@ int tor_tls_used_v1_handshake(tor_tls_t *tls);
 
 void _check_no_tls_errors(const char *fname, int line);
 
+#ifdef USE_BUFFEREVENTS
+int tor_tls_start_renegotiating(tor_tls_t *tls);
+struct bufferevent *tor_tls_init_bufferevent(tor_tls_t *tls,
+                                     struct bufferevent *bufev_in,
+                                     evutil_socket_t socket, int receiving);
+#endif
+
 #endif
 

+ 8 - 1
src/or/Makefile.am

@@ -40,10 +40,17 @@ AM_CPPFLAGS = -DSHARE_DATADIR="\"$(datadir)\"" \
 # This seems to matter nowhere but on windows, but I assure you that it
 # matters a lot there, and is quite hard to debug if you forget to do it.
 
+if USE_BUFFEREVENTS
+levent_openssl_lib = -levent_openssl
+else
+levent_openssl_lib =
+endif
+
 tor_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ @TOR_LDFLAGS_libevent@
 tor_LDADD = ./libtor.a ../common/libor.a ../common/libor-crypto.a \
 	../common/libor-event.a \
-	@TOR_ZLIB_LIBS@ -lm @TOR_LIBEVENT_LIBS@ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@
+	@TOR_ZLIB_LIBS@ -lm @TOR_LIBEVENT_LIBS@ @TOR_OPENSSL_LIBS@ \
+	@TOR_LIB_WS32@ @TOR_LIB_GDI@ $(levent_openssl_lib)
 
 noinst_HEADERS = buffers.h circuitbuild.h circuitlist.h circuituse.h \
 	command.h config.h connection_edge.h connection.h connection_or.h \

+ 503 - 112
src/or/buffers.c

@@ -23,9 +23,6 @@
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
-#ifdef HAVE_SYS_UIO_H
-#include <sys/uio.h>
-#endif
 
 //#define PARANOIA
 
@@ -56,6 +53,13 @@
  * forever.
  */
 
+static int parse_socks(const char *data, size_t datalen, socks_request_t *req,
+                       int log_sockstype, int safe_socks, ssize_t *drain_out,
+                       size_t *want_length_out);
+static int parse_socks_client(const uint8_t *data, size_t datalen,
+                              int state, char **reason,
+                              ssize_t *drain_out);
+
 /* Chunk manipulation functions */
 
 /** A single chunk on a buffer or in a freelist. */
@@ -544,6 +548,7 @@ buf_free(buf_t *buf)
 {
   if (!buf)
     return;
+
   buf_clear(buf);
   buf->magic = 0xdeadbeef;
   tor_free(buf);
@@ -575,10 +580,6 @@ buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
   return chunk;
 }
 
-/** If we're using readv and writev, how many chunks are we willing to
- * read/write at a time? */
-#define N_IOV 3
-
 /** Read up to <b>at_most</b> bytes from the socket <b>fd</b> into
  * <b>chunk</b> (which must be on <b>buf</b>). If we get an EOF, set
  * *<b>reached_eof</b> to 1.  Return -1 on error, 0 on eof or blocking,
@@ -588,25 +589,9 @@ read_to_chunk(buf_t *buf, chunk_t *chunk, int fd, size_t at_most,
               int *reached_eof, int *socket_error)
 {
   ssize_t read_result;
-#if 0 && defined(HAVE_READV) && !defined(WIN32)
-  struct iovec iov[N_IOV];
-  int i;
-  size_t remaining = at_most;
-  for (i=0; chunk && i < N_IOV && remaining; ++i) {
-    iov[i].iov_base = CHUNK_WRITE_PTR(chunk);
-    if (remaining > CHUNK_REMAINING_CAPACITY(chunk))
-      iov[i].iov_len = CHUNK_REMAINING_CAPACITY(chunk);
-    else
-      iov[i].iov_len = remaining;
-    remaining -= iov[i].iov_len;
-    chunk = chunk->next;
-  }
-  read_result = readv(fd, iov, i);
-#else
   if (at_most > CHUNK_REMAINING_CAPACITY(chunk))
     at_most = CHUNK_REMAINING_CAPACITY(chunk);
   read_result = tor_socket_recv(fd, CHUNK_WRITE_PTR(chunk), at_most, 0);
-#endif
 
   if (read_result < 0) {
     int e = tor_socket_errno(fd);
@@ -625,14 +610,6 @@ read_to_chunk(buf_t *buf, chunk_t *chunk, int fd, size_t at_most,
     return 0;
   } else { /* actually got bytes. */
     buf->datalen += read_result;
-#if 0 && defined(HAVE_READV) && !defined(WIN32)
-    while ((size_t)read_result > CHUNK_REMAINING_CAPACITY(chunk)) {
-      chunk->datalen += CHUNK_REMAINING_CAPACITY(chunk);
-      read_result -= CHUNK_REMAINING_CAPACITY(chunk);
-      chunk = chunk->next;
-      tor_assert(chunk);
-    }
-#endif
     chunk->datalen += read_result;
     log_debug(LD_NET,"Read %ld bytes. %d on inbuf.", (long)read_result,
               (int)buf->datalen);
@@ -768,25 +745,10 @@ flush_chunk(int s, buf_t *buf, chunk_t *chunk, size_t sz,
             size_t *buf_flushlen)
 {
   ssize_t write_result;
-#if 0 && defined(HAVE_WRITEV) && !defined(WIN32)
-  struct iovec iov[N_IOV];
-  int i;
-  size_t remaining = sz;
-  for (i=0; chunk && i < N_IOV && remaining; ++i) {
-    iov[i].iov_base = chunk->data;
-    if (remaining > chunk->datalen)
-      iov[i].iov_len = chunk->datalen;
-    else
-      iov[i].iov_len = remaining;
-    remaining -= iov[i].iov_len;
-    chunk = chunk->next;
-  }
-  write_result = writev(s, iov, i);
-#else
+
   if (sz > chunk->datalen)
     sz = chunk->datalen;
   write_result = tor_socket_send(s, chunk->data, sz, 0);
-#endif
 
   if (write_result < 0) {
     int e = tor_socket_errno(s);
@@ -1053,6 +1015,103 @@ fetch_var_cell_from_buf(buf_t *buf, var_cell_t **out, int linkproto)
   return 1;
 }
 
+#ifdef USE_BUFFEREVENTS
+/** Try to read <b>n</b> bytes from <b>buf</b> at <b>pos</b> (which may be
+ * NULL for the start of the buffer), copying the data only if necessary.  Set
+ * *<b>data</b> to a pointer to the desired bytes.  Set <b>free_out</b> to 1
+ * if we needed to malloc *<b>data</b> because the original bytes were
+ * noncontiguous; 0 otherwise.  Return the number of bytes actually available
+ * at <b>data</b>.
+ */
+static ssize_t
+inspect_evbuffer(struct evbuffer *buf, char **data, size_t n, int *free_out,
+                 struct evbuffer_ptr *pos)
+{
+  int n_vecs, i;
+
+  if (evbuffer_get_length(buf) < n)
+    n = evbuffer_get_length(buf);
+  if (n == 0)
+    return 0;
+  n_vecs = evbuffer_peek(buf, n, pos, NULL, 0);
+  tor_assert(n_vecs > 0);
+  if (n_vecs == 1) {
+    struct evbuffer_iovec v;
+    i = evbuffer_peek(buf, n, pos, &v, 1);
+    tor_assert(i == 1);
+    *data = v.iov_base;
+    *free_out = 0;
+    return v.iov_len;
+  } else {
+    struct evbuffer_iovec *vecs =
+      tor_malloc(sizeof(struct evbuffer_iovec)*n_vecs);
+    size_t copied = 0;
+    i = evbuffer_peek(buf, n, NULL, vecs, n_vecs);
+    tor_assert(i == n_vecs);
+    *data = tor_malloc(n);
+    for (i=0; i < n_vecs; ++i) {
+      size_t copy = n - copied;
+      if (copy > vecs[i].iov_len)
+        copy = vecs[i].iov_len;
+      tor_assert(copied+copy <= n);
+      memcpy(data+copied, vecs[i].iov_base, copy);
+      copied += copy;
+    }
+    *free_out = 1;
+    return copied;
+  }
+}
+
+/** As fetch_var_cell_from_buf, buf works on an evbuffer. */
+int
+fetch_var_cell_from_evbuffer(struct evbuffer *buf, var_cell_t **out,
+                             int linkproto)
+{
+  char *hdr = NULL;
+  int free_hdr = 0;
+  size_t n;
+  size_t buf_len;
+  uint8_t command;
+  uint16_t cell_length;
+  var_cell_t *cell;
+  int result = 0;
+  if (linkproto == 1)
+    return 0;
+
+  *out = NULL;
+  buf_len = evbuffer_get_length(buf);
+  if (buf_len < VAR_CELL_HEADER_SIZE)
+    return 0;
+
+  n = inspect_evbuffer(buf, &hdr, VAR_CELL_HEADER_SIZE, &free_hdr, NULL);
+  tor_assert(n >= VAR_CELL_HEADER_SIZE);
+
+  command = get_uint8(hdr+2);
+  if (!(CELL_COMMAND_IS_VAR_LENGTH(command))) {
+    goto done;
+  }
+
+  cell_length = ntohs(get_uint16(hdr+3));
+  if (buf_len < (size_t)(VAR_CELL_HEADER_SIZE+cell_length)) {
+    result = 1; /* Not all here yet. */
+    goto done;
+  }
+
+  cell = var_cell_new(cell_length);
+  cell->command = command;
+  cell->circ_id = ntohs(get_uint16(hdr));
+  evbuffer_drain(buf, VAR_CELL_HEADER_SIZE);
+  evbuffer_remove(buf, cell->payload, cell_length);
+  *out = cell;
+  result = 1;
+
+ done:
+  if (free_hdr && hdr)
+    tor_free(hdr);
+  return result;
+}
+#endif
+
 /** Move up to *<b>buf_flushlen</b> bytes from <b>buf_in</b> to
  * <b>buf_out</b>, and modify *<b>buf_flushlen</b> appropriately.
  * Return the number of bytes actually copied.
@@ -1296,6 +1355,94 @@ fetch_from_buf_http(buf_t *buf,
   return 1;
 }
 
+#ifdef USE_BUFFEREVENTS
+/** As fetch_from_buf_http, buf works on an evbuffer. */
+int
+fetch_from_evbuffer_http(struct evbuffer *buf,
+                    char **headers_out, size_t max_headerlen,
+                    char **body_out, size_t *body_used, size_t max_bodylen,
+                    int force_complete)
+{
+  struct evbuffer_ptr crlf, content_length;
+  size_t headerlen, bodylen, contentlen;
+
+  /* Find the first \r\n\r\n in the buffer */
+  crlf = evbuffer_search(buf, "\r\n\r\n", 4, NULL);
+  if (crlf.pos < 0) {
+    /* We didn't find one. */
+    if (evbuffer_get_length(buf) > max_headerlen)
+      return -1; /* Headers too long. */
+    return 0; /* Headers not here yet. */
+  } else if (crlf.pos > (int)max_headerlen) {
+    return -1; /* Headers too long. */
+  }
+
+  headerlen = crlf.pos + 4;  /* Skip over the \r\n\r\n */
+  bodylen = evbuffer_get_length(buf) - headerlen;
+  if (bodylen > max_bodylen)
+    return -1; /* body too long */
+
+  /* Look for the first occurrence of CONTENT_LENGTH insize buf before the
+   * crlfcrlf */
+  content_length = evbuffer_search_range(buf, CONTENT_LENGTH,
+                                         strlen(CONTENT_LENGTH), NULL, &crlf);
+
+  if (content_length.pos >= 0) {
+    /* We found a content_length: parse it and figure out if the body is here
+     * yet. */
+    struct evbuffer_ptr eol;
+    char *data = NULL;
+    int free_data = 0;
+    int n, i;
+    n = evbuffer_ptr_set(buf, &content_length, strlen(CONTENT_LENGTH),
+                         EVBUFFER_PTR_ADD);
+    tor_assert(n == 0);
+    eol = evbuffer_search_eol(buf, &content_length, NULL, EVBUFFER_EOL_CRLF);
+    tor_assert(eol.pos > content_length.pos);
+    tor_assert(eol.pos <= crlf.pos);
+    inspect_evbuffer(buf, &data, eol.pos - content_length.pos, &free_data,
+                         &content_length);
+
+    i = atoi(data);
+    if (free_data)
+      tor_free(data);
+    if (i < 0) {
+      log_warn(LD_PROTOCOL, "Content-Length is less than zero; it looks like "
+               "someone is trying to crash us.");
+      return -1;
+    }
+    contentlen = i;
+    /* if content-length is malformed, then our body length is 0. fine. */
+    log_debug(LD_HTTP,"Got a contentlen of %d.",(int)contentlen);
+    if (bodylen < contentlen) {
+      if (!force_complete) {
+        log_debug(LD_HTTP,"body not all here yet.");
+        return 0; /* not all there yet */
+      }
+    }
+    if (bodylen > contentlen) {
+      bodylen = contentlen;
+      log_debug(LD_HTTP,"bodylen reduced to %d.",(int)bodylen);
+    }
+  }
+
+  if (headers_out) {
+    *headers_out = tor_malloc(headerlen+1);
+    evbuffer_remove(buf, *headers_out, headerlen);
+    (*headers_out)[headerlen] = '\0';
+  }
+  if (body_out) {
+    tor_assert(headers_out);
+    tor_assert(body_used);
+    *body_used = bodylen;
+    *body_out = tor_malloc(bodylen+1);
+    evbuffer_remove(buf, *body_out, bodylen);
+    (*body_out)[bodylen] = '\0';
+  }
+  return 1;
+}
+#endif
+
 /** There is a (possibly incomplete) socks handshake on <b>buf</b>, of one
  * of the forms
  *  - socks4: "socksheader username\\0"
@@ -1324,6 +1471,128 @@ fetch_from_buf_http(buf_t *buf,
 int
 fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
                      int log_sockstype, int safe_socks)
+{
+  int res;
+  ssize_t n_drain;
+  size_t want_length = 128;
+
+  if (buf->datalen < 2) /* version and another byte */
+    return 0;
+
+  do {
+    n_drain = 0;
+    buf_pullup(buf, want_length, 0);
+    tor_assert(buf->head && buf->head->datalen >= 2);
+    want_length = 0;
+
+    res = parse_socks(buf->head->data, buf->head->datalen, req, log_sockstype,
+                      safe_socks, &n_drain, &want_length);
+
+    if (n_drain < 0)
+      buf_clear(buf);
+    else if (n_drain > 0)
+      buf_remove_from_front(buf, n_drain);
+
+  } while (res == 0 && buf->head &&
+           buf->datalen > buf->head->datalen &&
+           want_length < buf->head->datalen);
+
+  return res;
+}
+
+#ifdef USE_BUFFEREVENTS
+/* As fetch_from_buf_socks(), but targets an evbuffer instead. */
+int
+fetch_from_evbuffer_socks(struct evbuffer *buf, socks_request_t *req,
+                          int log_sockstype, int safe_socks)
+{
+  char *data;
+  ssize_t n_drain;
+  size_t datalen, buflen, want_length;
+  int res;
+
+  buflen = evbuffer_get_length(buf);
+  if (buflen < 2)
+    return 0;
+
+  {
+    /* See if we can find the socks request in the first chunk of the buffer.
+     */
+    struct evbuffer_iovec v;
+    int i;
+    want_length = evbuffer_get_contiguous_space(buf);
+    n_drain = 0;
+    i = evbuffer_peek(buf, want_length, NULL, &v, 1);
+    tor_assert(i == 1);
+    data = v.iov_base;
+    datalen = v.iov_len;
+
+    res = parse_socks(data, datalen, req, log_sockstype,
+                      safe_socks, &n_drain, &want_length);
+
+    if (n_drain < 0)
+      evbuffer_drain(buf, evbuffer_get_length(buf));
+    else if (n_drain > 0)
+      evbuffer_drain(buf, n_drain);
+
+    if (res)
+      return res;
+  }
+
+  /* Okay, the first chunk of the buffer didn't have a complete socks request.
+   * That means that either we don't have a whole socks request at all, or
+   * it's gotten split up.  We're going to try passing parse_socks() bigger
+   * and bigger chunks until either it says "Okay, I got it", or it says it
+   * will need more data than we currently have. */
+
+  /* Loop while we have more data that we haven't given parse_socks() yet. */
+  while (evbuffer_get_length(buf) > datalen) {
+    int free_data = 0;
+    n_drain = 0;
+    data = NULL;
+    datalen = inspect_evbuffer(buf, &data, want_length, &free_data, NULL);
+
+    res = parse_socks(data, datalen, req, log_sockstype,
+                      safe_socks, &n_drain, &want_length);
+
+    if (free_data)
+      tor_free(data);
+
+    if (n_drain < 0)
+      evbuffer_drain(buf, evbuffer_get_length(buf));
+    else if (n_drain > 0)
+      evbuffer_drain(buf, n_drain);
+
+    if (res) /* If res is nonzero, parse_socks() made up its mind. */
+      return res;
+
+    /* If parse_socks says that we want less data than we actually tried to
+       give it, we've got some kind of weird situation; just exit the loop for
+       now.
+    */
+    if (want_length <= datalen)
+      break;
+    /* Otherwise, it wants more data than we gave it.  If we can provide more
+     * data than we gave it, we'll try to do so in the next iteration of the
+     * loop. If we can't, the while loop will exit.  It's okay if it asked for
+     * more than we have total; maybe it doesn't really need so much. */
+  }
+
+  return res;
+}
+#endif
+
+/** Implementation helper to implement fetch_from_*_socks.  Instead of looking
+ * at a buffer's contents, we look at the <b>datalen</b> bytes of data in
+ * <b>data</b>. Instead of removing data from the buffer, we set
+ * <b>drain_out</b> to the amount of data that should be removed (or -1 if the
+ * buffer should be cleared).  Instead of pulling more data into the first
+ * chunk of the buffer, we set *<b>want_length_out</b> to the number of bytes
+ * we'd like to see in the input buffer, if they're available. */
+static int
+parse_socks(const char *data, size_t datalen, socks_request_t *req,
+            int log_sockstype, int safe_socks, ssize_t *drain_out,
+            size_t *want_length_out)
 {
   unsigned int len;
   char tmpbuf[TOR_ADDR_BUF_LEN+1];
@@ -1338,25 +1607,20 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
    * then log a warning to let him know that it might be unwise. */
   static int have_warned_about_unsafe_socks = 0;
 
-  if (buf->datalen < 2) /* version and another byte */
-    return 0;
-
-  buf_pullup(buf, 128, 0);
-  tor_assert(buf->head && buf->head->datalen >= 2);
-
-  socksver = *buf->head->data;
+  socksver = *data;
 
   switch (socksver) { /* which version of socks? */
 
     case 5: /* socks5 */
 
       if (req->socks_version != 5) { /* we need to negotiate a method */
-        unsigned char nummethods = (unsigned char)*(buf->head->data+1);
+        unsigned char nummethods = (unsigned char)*(data+1);
         tor_assert(!req->socks_version);
-        if (buf->datalen < 2u+nummethods)
+        if (datalen < 2u+nummethods) {
+          *want_length_out = 2u+nummethods;
           return 0;
-        buf_pullup(buf, 2u+nummethods, 0);
-        if (!nummethods || !memchr(buf->head->data+2, 0, nummethods)) {
+        }
+        if (!nummethods || !memchr(data+2, 0, nummethods)) {
           log_warn(LD_APP,
                    "socks5: offered methods don't include 'no auth'. "
                    "Rejecting.");
@@ -1367,7 +1631,7 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
         }
         /* remove packet from buf. also remove any other extraneous
          * bytes, to support broken socks clients. */
-        buf_clear(buf);
+        *drain_out = -1;
 
         req->replylen = 2; /* 2 bytes of response */
         req->reply[0] = 5; /* socks5 reply */
@@ -1378,10 +1642,11 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
       }
       /* we know the method; read in the request */
       log_debug(LD_APP,"socks5: checking request");
-      if (buf->datalen < 8) /* basic info plus >=2 for addr plus 2 for port */
+      if (datalen < 8) {/* basic info plus >=2 for addr plus 2 for port */
+        *want_length_out = 8;
         return 0; /* not yet */
-      tor_assert(buf->head->datalen >= 8);
-      req->command = (unsigned char) *(buf->head->data+1);
+      }
+      req->command = (unsigned char) *(data+1);
       if (req->command != SOCKS_COMMAND_CONNECT &&
           req->command != SOCKS_COMMAND_RESOLVE &&
           req->command != SOCKS_COMMAND_RESOLVE_PTR) {
@@ -1390,19 +1655,21 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
                  req->command);
         return -1;
       }
-      switch (*(buf->head->data+3)) { /* address type */
+      switch (*(data+3)) { /* address type */
         case 1: /* IPv4 address */
         case 4: /* IPv6 address */ {
-          const int is_v6 = *(buf->head->data+3) == 4;
+          const int is_v6 = *(data+3) == 4;
           const unsigned addrlen = is_v6 ? 16 : 4;
           log_debug(LD_APP,"socks5: ipv4 address type");
-          if (buf->datalen < 6+addrlen) /* ip/port there? */
+          if (datalen < 6+addrlen) {/* ip/port there? */
+            *want_length_out = 6+addrlen;
             return 0; /* not yet */
+          }
 
           if (is_v6)
-            tor_addr_from_ipv6_bytes(&destaddr, buf->head->data+4);
+            tor_addr_from_ipv6_bytes(&destaddr, data+4);
           else
-            tor_addr_from_ipv4n(&destaddr, get_uint32(buf->head->data+4));
+            tor_addr_from_ipv4n(&destaddr, get_uint32(data+4));
 
           tor_addr_to_str(tmpbuf, &destaddr, sizeof(tmpbuf), 1);
 
@@ -1414,8 +1681,8 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
             return -1;
           }
           strlcpy(req->address,tmpbuf,sizeof(req->address));
-          req->port = ntohs(get_uint16(buf->head->data+4+addrlen));
-          buf_remove_from_front(buf, 6+addrlen);
+          req->port = ntohs(get_uint16(data+4+addrlen));
+          *drain_out = 6+addrlen;
           if (req->command != SOCKS_COMMAND_RESOLVE_PTR &&
               !addressmap_have_mapping(req->address,0) &&
               !have_warned_about_unsafe_socks) {
@@ -1446,21 +1713,21 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
                      "hostname type. Rejecting.");
             return -1;
           }
-          len = (unsigned char)*(buf->head->data+4);
-          if (buf->datalen < 7+len) /* addr/port there? */
+          len = (unsigned char)*(data+4);
+          if (datalen < 7+len) { /* addr/port there? */
+            *want_length_out = 7+len;
             return 0; /* not yet */
-          buf_pullup(buf, 7+len, 0);
-          tor_assert(buf->head->datalen >= 7+len);
+          }
           if (len+1 > MAX_SOCKS_ADDR_LEN) {
             log_warn(LD_APP,
                      "socks5 hostname is %d bytes, which doesn't fit in "
                      "%d. Rejecting.", len+1,MAX_SOCKS_ADDR_LEN);
             return -1;
           }
-          memcpy(req->address,buf->head->data+5,len);
+          memcpy(req->address,data+5,len);
           req->address[len] = 0;
-          req->port = ntohs(get_uint16(buf->head->data+5+len));
-          buf_remove_from_front(buf, 5+len+2);
+          req->port = ntohs(get_uint16(data+5+len));
+          *drain_out = 5+len+2;
           if (!tor_strisprint(req->address) || strchr(req->address,'\"')) {
             log_warn(LD_PROTOCOL,
                      "Your application (using socks5 to port %d) gave Tor "
@@ -1476,7 +1743,7 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
           return 1;
         default: /* unsupported */
           log_warn(LD_APP,"socks5: unsupported address type %d. Rejecting.",
-                   (int) *(buf->head->data+3));
+                   (int) *(data+3));
           return -1;
       }
       tor_assert(0);
@@ -1485,10 +1752,12 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
       /* http://archive.socks.permeo.com/protocol/socks4a.protocol */
 
       req->socks_version = 4;
-      if (buf->datalen < SOCKS4_NETWORK_LEN) /* basic info available? */
+      if (datalen < SOCKS4_NETWORK_LEN) {/* basic info available? */
+        *want_length_out = SOCKS4_NETWORK_LEN;
         return 0; /* not yet */
-      buf_pullup(buf, 1280, 0);
-      req->command = (unsigned char) *(buf->head->data+1);
+      }
+      // buf_pullup(buf, 1280, 0);
+      req->command = (unsigned char) *(data+1);
       if (req->command != SOCKS_COMMAND_CONNECT &&
           req->command != SOCKS_COMMAND_RESOLVE) {
         /* not a connect or resolve? we don't support it. (No resolve_ptr with
@@ -1498,8 +1767,8 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
         return -1;
       }
 
-      req->port = ntohs(get_uint16(buf->head->data+2));
-      destip = ntohl(get_uint32(buf->head->data+4));
+      req->port = ntohs(get_uint16(data+2));
+      destip = ntohl(get_uint32(data+4));
       if ((!req->port && req->command!=SOCKS_COMMAND_RESOLVE) || !destip) {
         log_warn(LD_APP,"socks4: Port or DestIP is zero. Rejecting.");
         return -1;
@@ -1519,17 +1788,18 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
         socks4_prot = socks4;
       }
 
-      next = memchr(buf->head->data+SOCKS4_NETWORK_LEN, 0,
-                    buf->head->datalen-SOCKS4_NETWORK_LEN);
+      next = memchr(data+SOCKS4_NETWORK_LEN, 0,
+                    datalen-SOCKS4_NETWORK_LEN);
       if (!next) {
-        if (buf->head->datalen >= 1024) {
+        if (datalen >= 1024) {
           log_debug(LD_APP, "Socks4 user name too long; rejecting.");
           return -1;
         }
         log_debug(LD_APP,"socks4: Username not here yet.");
+        *want_length_out = datalen+1024; /* ???? */
         return 0;
       }
-      tor_assert(next < CHUNK_WRITE_PTR(buf->head));
+      tor_assert(next < data+datalen);
 
       startaddr = NULL;
       if (socks4_prot != socks4a &&
@@ -1554,18 +1824,20 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
           return -1;
       }
       if (socks4_prot == socks4a) {
-        if (next+1 == CHUNK_WRITE_PTR(buf->head)) {
+        if (next+1 == data+datalen) {
           log_debug(LD_APP,"socks4: No part of destaddr here yet.");
+          *want_length_out = datalen + 1024; /* More than we need, but safe */
           return 0;
         }
         startaddr = next+1;
-        next = memchr(startaddr, 0, CHUNK_WRITE_PTR(buf->head)-startaddr);
+        next = memchr(startaddr, 0, data + datalen - startaddr);
         if (!next) {
-          if (buf->head->datalen >= 1024) {
+          if (datalen >= 1024) {
             log_debug(LD_APP,"socks4: Destaddr too long.");
             return -1;
           }
           log_debug(LD_APP,"socks4: Destaddr not all here yet.");
+          *want_length_out = datalen + 1024;
           return 0;
         }
         if (MAX_SOCKS_ADDR_LEN <= next-startaddr) {
@@ -1591,7 +1863,7 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
         return -1;
       }
       /* next points to the final \0 on inbuf */
-      buf_remove_from_front(buf, next - buf->head->data + 1);
+      *drain_out = next - data + 1;
       return 1;
 
     case 'G': /* get */
@@ -1629,9 +1901,10 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
     default: /* version is not socks4 or socks5 */
       log_warn(LD_APP,
                "Socks version %d not recognized. (Tor is not an http proxy.)",
-               *(buf->head->data));
+               *(data));
       {
-        char *tmp = tor_strndup(buf->head->data, 8); /*XXXX what if longer?*/
+        /* Tell the controller the first 8 bytes. */
+        char *tmp = tor_strndup(data, datalen < 8 ? datalen : 8);
         control_event_client_status(LOG_WARN,
                                     "SOCKS_UNKNOWN_PROTOCOL DATA=\"%s\"",
                                     escaped(tmp));
@@ -1653,21 +1926,63 @@ fetch_from_buf_socks(buf_t *buf, socks_request_t *req,
 int
 fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
 {
-  unsigned char *data;
-  size_t addrlen;
-
+  ssize_t drain = 0;
+  int r;
   if (buf->datalen < 2)
     return 0;
 
   buf_pullup(buf, 128, 0);
   tor_assert(buf->head && buf->head->datalen >= 2);
 
-  data = (unsigned char *) buf->head->data;
+  r = parse_socks_client((uint8_t*)buf->head->data, buf->head->datalen,
+                         state, reason, &drain);
+  if (drain > 0)
+    buf_remove_from_front(buf, drain);
+  else if (drain < 0)
+    buf_clear(buf);
+
+  return r;
+}
+
+#ifdef USE_BUFFEREVENTS
+/** As fetch_from_buf_socks_client, buf works on an evbuffer */
+int
+fetch_from_evbuffer_socks_client(struct evbuffer *buf, int state,
+                                 char **reason)
+{
+  ssize_t drain = 0;
+  uint8_t *data;
+  size_t datalen;
+  int r;
+
+  data = evbuffer_pullup(buf, 128); /* Make sure we have at least 128
+                                     * contiguous bytes if possible. */
+  datalen = evbuffer_get_contiguous_space(buf);
+  r = parse_socks_client(data, datalen, state, reason, &drain);
+  if (drain > 0)
+    evbuffer_drain(buf, drain);
+  else
+    evbuffer_drain(buf, evbuffer_get_length(buf));
+
+  return r;
+}
+#endif
+
+/** Implementation logic for fetch_from_*_socks_client. */
+static int
+parse_socks_client(const uint8_t *data, size_t datalen,
+                   int state, char **reason,
+                   ssize_t *drain_out)
+{
+  unsigned int addrlen;
+  *drain_out = 0;
+  if (datalen < 2)
+    return 0;
 
   switch (state) {
     case PROXY_SOCKS4_WANT_CONNECT_OK:
       /* Wait for the complete response */
-      if (buf->head->datalen < 8)
+      if (datalen < 8)
         return 0;
 
       if (data[1] != 0x5a) {
@@ -1676,7 +1991,7 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
       }
 
       /* Success */
-      buf_remove_from_front(buf, 8);
+      *drain_out = 8;
       return 1;
 
     case PROXY_SOCKS5_WANT_AUTH_METHOD_NONE:
@@ -1688,7 +2003,7 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
       }
 
       log_info(LD_NET, "SOCKS 5 client: continuing without authentication");
-      buf_clear(buf);
+      *drain_out = -1;
       return 1;
 
     case PROXY_SOCKS5_WANT_AUTH_METHOD_RFC1929:
@@ -1698,11 +2013,11 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
         case 0x00:
           log_info(LD_NET, "SOCKS 5 client: we have auth details but server "
                             "doesn't require authentication.");
-          buf_clear(buf);
+          *drain_out = -1;
           return 1;
         case 0x02:
           log_info(LD_NET, "SOCKS 5 client: need authentication.");
-          buf_clear(buf);
+          *drain_out = -1;
           return 2;
         /* fall through */
       }
@@ -1719,7 +2034,7 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
       }
 
       log_info(LD_NET, "SOCKS 5 client: authentication successful.");
-      buf_clear(buf);
+      *drain_out = -1;
       return 1;
 
     case PROXY_SOCKS5_WANT_CONNECT_OK:
@@ -1728,7 +2043,7 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
        * the data used */
 
       /* wait for address type field to arrive */
-      if (buf->datalen < 4)
+      if (datalen < 4)
         return 0;
 
       switch (data[3]) {
@@ -1739,7 +2054,7 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
           addrlen = 16;
           break;
         case 0x03: /* fqdn (can this happen here?) */
-          if (buf->datalen < 5)
+          if (datalen < 5)
             return 0;
           addrlen = 1 + data[4];
           break;
@@ -1749,7 +2064,7 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
       }
 
       /* wait for address and port */
-      if (buf->datalen < 6 + addrlen)
+      if (datalen < 6 + addrlen)
         return 0;
 
       if (data[1] != 0x00) {
@@ -1757,7 +2072,7 @@ fetch_from_buf_socks_client(buf_t *buf, int state, char **reason)
         return -1;
       }
 
-      buf_remove_from_front(buf, 6 + addrlen);
+      *drain_out = 6 + addrlen;
       return 1;
   }
 
@@ -1783,6 +2098,27 @@ peek_buf_has_control0_command(buf_t *buf)
   return 0;
 }
 
+#ifdef USE_BUFFEREVENTS
+int
+peek_evbuffer_has_control0_command(struct evbuffer *buf)
+{
+  int result = 0;
+  if (evbuffer_get_length(buf) >= 4) {
+    int free_out = 0;
+    char *data = NULL;
+    size_t n = inspect_evbuffer(buf, &data, 4, &free_out, NULL);
+    uint16_t cmd;
+    tor_assert(n >= 4);
+    cmd = ntohs(get_uint16(data+2));
+    if (cmd <= 0x14)
+      result = 1;
+    if (free_out)
+      tor_free(data);
+  }
+  return result;
+}
+#endif
+
 /** Return the index within <b>buf</b> at which <b>ch</b> first appears,
  * or -1 if <b>ch</b> does not appear on buf. */
 static off_t
@@ -1800,12 +2136,12 @@ buf_find_offset_of_char(buf_t *buf, char ch)
   return -1;
 }
 
-/** Try to read a single LF-terminated line from <b>buf</b>, and write it,
- * NUL-terminated, into the *<b>data_len</b> byte buffer at <b>data_out</b>.
- * Set *<b>data_len</b> to the number of bytes in the line, not counting the
- * terminating NUL.  Return 1 if we read a whole line, return 0 if we don't
- * have a whole line yet, and return -1 if the line length exceeds
- * *<b>data_len</b>.
+/** Try to read a single LF-terminated line from <b>buf</b>, and write it
+ * (including the LF), NUL-terminated, into the *<b>data_len</b> byte buffer
+ * at <b>data_out</b>.  Set *<b>data_len</b> to the number of bytes in the
+ * line, not counting the terminating NUL.  Return 1 if we read a whole line,
+ * return 0 if we don't have a whole line yet, and return -1 if the line
+ * length exceeds *<b>data_len</b>.
  */
 int
 fetch_from_buf_line(buf_t *buf, char *data_out, size_t *data_len)
@@ -1879,6 +2215,61 @@ write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
   return 0;
 }
 
+#ifdef USE_BUFFEREVENTS
+int
+write_to_evbuffer_zlib(struct evbuffer *buf, tor_zlib_state_t *state,
+                       const char *data, size_t data_len,
+                       int done)
+{
+  char *next;
+  size_t old_avail, avail;
+  int over = 0, n;
+  struct evbuffer_iovec vec[1];
+  do {
+    int need_new_chunk = 0;
+    {
+      size_t cap = data_len / 4;
+      if (cap < 128)
+        cap = 128;
+      /* XXXX NM this strategy is fragmentation-prone. We should really have
+       * two iovecs, and write first into the one, and then into the
+       * second if the first gets full. */
+      n = evbuffer_reserve_space(buf, cap, vec, 1);
+      tor_assert(n == 1);
+    }
+
+    next = vec[0].iov_base;
+    avail = old_avail = vec[0].iov_len;
+
+    switch (tor_zlib_process(state, &next, &avail, &data, &data_len, done)) {
+      case TOR_ZLIB_DONE:
+        over = 1;
+        break;
+      case TOR_ZLIB_ERR:
+        return -1;
+      case TOR_ZLIB_OK:
+        if (data_len == 0)
+          over = 1;
+        break;
+      case TOR_ZLIB_BUF_FULL:
+        if (avail) {
+          /* Zlib says we need more room (ZLIB_BUF_FULL).  Start a new chunk
+           * automatically, whether were going to or not. */
+          need_new_chunk = 1;
+        }
+        break;
+    }
+
+    /* XXXX possible infinite loop on BUF_FULL. */
+    vec[0].iov_len = old_avail - avail;
+    evbuffer_commit_space(buf, vec, 1);
+
+  } while (!over);
+  check();
+  return 0;
+}
+#endif
+
 /** Log an error and exit if <b>buf</b> is corrupted.
  */
 void

+ 17 - 0
src/or/buffers.h

@@ -48,6 +48,23 @@ int fetch_from_buf_line(buf_t *buf, char *data_out, size_t *data_len);
 
 int peek_buf_has_control0_command(buf_t *buf);
 
+#ifdef USE_BUFFEREVENTS
+int fetch_var_cell_from_evbuffer(struct evbuffer *buf, var_cell_t **out,
+                                 int linkproto);
+int fetch_from_evbuffer_socks(struct evbuffer *buf, socks_request_t *req,
+                              int log_sockstype, int safe_socks);
+int fetch_from_evbuffer_socks_client(struct evbuffer *buf, int state,
+                                     char **reason);
+int fetch_from_evbuffer_http(struct evbuffer *buf,
+                        char **headers_out, size_t max_headerlen,
+                        char **body_out, size_t *body_used, size_t max_bodylen,
+                        int force_complete);
+int peek_evbuffer_has_control0_command(struct evbuffer *buf);
+int write_to_evbuffer_zlib(struct evbuffer *buf, tor_zlib_state_t *state,
+                           const char *data, size_t data_len,
+                           int done);
+#endif
+
 void assert_buf_ok(buf_t *buf);
 
 #ifdef BUFFERS_PRIVATE

+ 11 - 0
src/or/config.c

@@ -1231,6 +1231,17 @@ options_act(or_options_t *old_options)
   if (accounting_is_enabled(options))
     configure_accounting(time(NULL));
 
+#ifdef USE_BUFFEREVENTS
+  /* If we're using the bufferevents implementation and our rate limits
+   * changed, we need to tell the rate-limiting system about it. */
+  if (!old_options ||
+      old_options->BandwidthRate != options->BandwidthRate ||
+      old_options->BandwidthBurst != options->BandwidthBurst ||
+      old_options->RelayBandwidthRate != options->RelayBandwidthRate ||
+      old_options->RelayBandwidthBurst != options->RelayBandwidthBurst)
+    connection_bucket_init();
+#endif
+
   /* Change the cell EWMA settings */
   cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus());
 

+ 412 - 27
src/or/connection.c

@@ -36,6 +36,10 @@
 #include "router.h"
 #include "routerparse.h"
 
+#ifdef USE_BUFFEREVENTS
+#include <event2/event.h>
+#endif
+
 static connection_t *connection_create_listener(
                                struct sockaddr *listensockaddr,
                                socklen_t listensocklen, int type,
@@ -45,8 +49,10 @@ static void connection_init(time_t now, connection_t *conn, int type,
 static int connection_init_accepted_conn(connection_t *conn,
                                          uint8_t listener_type);
 static int connection_handle_listener_read(connection_t *conn, int new_type);
+#ifndef USE_BUFFEREVENTS
 static int connection_bucket_should_increase(int bucket,
                                              or_connection_t *conn);
+#endif
 static int connection_finished_flushing(connection_t *conn);
 static int connection_flushed_some(connection_t *conn);
 static int connection_finished_connecting(connection_t *conn);
@@ -183,6 +189,26 @@ conn_state_to_string(int type, int state)
   return buf;
 }
 
+#ifdef USE_BUFFEREVENTS
+/** Return true iff the connection's type is one that can use a
+    bufferevent-based implementation. */
+int
+connection_type_uses_bufferevent(connection_t *conn)
+{
+  switch (conn->type) {
+    case CONN_TYPE_AP:
+    case CONN_TYPE_EXIT:
+    case CONN_TYPE_DIR:
+    case CONN_TYPE_CONTROL:
+    case CONN_TYPE_OR:
+    case CONN_TYPE_CPUWORKER:
+      return 1;
+    default:
+      return 0;
+  }
+}
+#endif
+
 /** Allocate and return a new dir_connection_t, initialized as by
  * connection_init(). */
 dir_connection_t *
@@ -308,10 +334,13 @@ connection_init(time_t now, connection_t *conn, int type, int socket_family)
 
   conn->type = type;
   conn->socket_family = socket_family;
-  if (!connection_is_listener(conn)) { /* listeners never use their buf */
+#ifndef USE_BUFFEREVENTS
+  if (!connection_is_listener(conn)) {
+    /* listeners never use their buf */
     conn->inbuf = buf_new();
     conn->outbuf = buf_new();
   }
+#endif
 
   conn->timestamp_created = now;
   conn->timestamp_lastread = now;
@@ -377,7 +406,8 @@ _connection_free(connection_t *conn)
              "bytes on inbuf, %d on outbuf.",
              conn_type_to_string(conn->type),
              conn_state_to_string(conn->type, conn->state),
-             (int)buf_datalen(conn->inbuf), (int)buf_datalen(conn->outbuf));
+             (int)connection_get_inbuf_len(conn),
+             (int)connection_get_outbuf_len(conn));
   }
 
   if (!connection_is_listener(conn)) {
@@ -424,6 +454,15 @@ _connection_free(connection_t *conn)
 
   tor_free(conn->read_event); /* Probably already freed by connection_free. */
   tor_free(conn->write_event); /* Probably already freed by connection_free. */
+  IF_HAS_BUFFEREVENT(conn, {
+      /* This was a workaround to handle bugs in some old versions of libevent
+       * where callbacks can occur after calling bufferevent_free().  Setting
+       * the callbacks to NULL prevented this.  It shouldn't be necessary any
+       * more, but let's not tempt fate for now.  */
+      bufferevent_setcb(conn->bufev, NULL, NULL, NULL, NULL);
+      bufferevent_free(conn->bufev);
+      conn->bufev = NULL;
+  });
 
   if (conn->type == CONN_TYPE_DIR) {
     dir_connection_t *dir_conn = TO_DIR_CONN(conn);
@@ -450,6 +489,11 @@ _connection_free(connection_t *conn)
     log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest");
     connection_or_remove_from_identity_map(TO_OR_CONN(conn));
   }
+#ifdef USE_BUFFEREVENTS
+  if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bucket_cfg) {
+    ev_token_bucket_cfg_free(TO_OR_CONN(conn)->bucket_cfg);
+  }
+#endif
 
   memset(mem, 0xCC, memlen); /* poison memory */
   tor_free(mem);
@@ -675,10 +719,9 @@ connection_close_immediate(connection_t *conn)
   conn->s = -1;
   if (conn->linked)
     conn->linked_conn_is_closed = 1;
-  if (!connection_is_listener(conn)) {
+  if (conn->outbuf)
     buf_clear(conn->outbuf);
-    conn->outbuf_flushlen = 0;
-  }
+  conn->outbuf_flushlen = 0;
 }
 
 /** Mark <b>conn</b> to be closed next time we loop through
@@ -1319,7 +1362,7 @@ connection_connect(connection_t *conn, const char *address,
          escaped_safe_str_client(address),
          port, inprogress?"in progress":"established", s);
   conn->s = s;
-  if (connection_add(conn) < 0) /* no space, forget it */
+  if (connection_add_connecting(conn) < 0) /* no space, forget it */
     return -1;
   return inprogress ? 0 : 1;
 }
@@ -1542,6 +1585,19 @@ connection_send_socks5_connect(connection_t *conn)
   conn->proxy_state = PROXY_SOCKS5_WANT_CONNECT_OK;
 }
 
+/** DOCDOC */
+static int
+connection_fetch_from_buf_socks_client(connection_t *conn,
+                                       int state, char **reason)
+{
+  IF_HAS_BUFFEREVENT(conn, {
+    struct evbuffer *input = bufferevent_get_input(conn->bufev);
+    return fetch_from_evbuffer_socks_client(input, state, reason);
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return fetch_from_buf_socks_client(conn->inbuf, state, reason);
+  }
+}
+
 /** Call this from connection_*_process_inbuf() to advance the proxy
  * handshake.
  *
@@ -1569,17 +1625,17 @@ connection_read_proxy_handshake(connection_t *conn)
       break;
 
     case PROXY_SOCKS4_WANT_CONNECT_OK:
-      ret = fetch_from_buf_socks_client(conn->inbuf,
-                                        conn->proxy_state,
-                                        &reason);
+      ret = connection_fetch_from_buf_socks_client(conn,
+                                                   conn->proxy_state,
+                                                   &reason);
       if (ret == 1)
         conn->proxy_state = PROXY_CONNECTED;
       break;
 
     case PROXY_SOCKS5_WANT_AUTH_METHOD_NONE:
-      ret = fetch_from_buf_socks_client(conn->inbuf,
-                                        conn->proxy_state,
-                                        &reason);
+      ret = connection_fetch_from_buf_socks_client(conn,
+                                                   conn->proxy_state,
+                                                   &reason);
       /* no auth needed, do connect */
       if (ret == 1) {
         connection_send_socks5_connect(conn);
@@ -1588,9 +1644,9 @@ connection_read_proxy_handshake(connection_t *conn)
       break;
 
     case PROXY_SOCKS5_WANT_AUTH_METHOD_RFC1929:
-      ret = fetch_from_buf_socks_client(conn->inbuf,
-                                        conn->proxy_state,
-                                        &reason);
+      ret = connection_fetch_from_buf_socks_client(conn,
+                                                   conn->proxy_state,
+                                                   &reason);
 
       /* send auth if needed, otherwise do connect */
       if (ret == 1) {
@@ -1625,9 +1681,9 @@ connection_read_proxy_handshake(connection_t *conn)
       break;
 
     case PROXY_SOCKS5_WANT_AUTH_RFC1929_OK:
-      ret = fetch_from_buf_socks_client(conn->inbuf,
-                                        conn->proxy_state,
-                                        &reason);
+      ret = connection_fetch_from_buf_socks_client(conn,
+                                                   conn->proxy_state,
+                                                   &reason);
       /* send the connect request */
       if (ret == 1) {
         connection_send_socks5_connect(conn);
@@ -1636,9 +1692,9 @@ connection_read_proxy_handshake(connection_t *conn)
       break;
 
     case PROXY_SOCKS5_WANT_CONNECT_OK:
-      ret = fetch_from_buf_socks_client(conn->inbuf,
-                                        conn->proxy_state,
-                                        &reason);
+      ret = connection_fetch_from_buf_socks_client(conn,
+                                                   conn->proxy_state,
+                                                   &reason);
       if (ret == 1)
         conn->proxy_state = PROXY_CONNECTED;
       break;
@@ -1902,6 +1958,9 @@ connection_is_rate_limited(connection_t *conn)
     return 1;
 }
 
+#ifdef USE_BUFFEREVENTS
+static struct bufferevent_rate_limit_group *global_rate_limit = NULL;
+#else
 extern int global_read_bucket, global_write_bucket;
 extern int global_relayed_read_bucket, global_relayed_write_bucket;
 
@@ -1909,11 +1968,13 @@ extern int global_relayed_read_bucket, global_relayed_write_bucket;
  * we are likely to run dry again this second, so be stingy with the
  * tokens we just put in. */
 static int write_buckets_empty_last_second = 0;
+#endif
 
 /** How many seconds of no active local circuits will make the
  * connection revert to the "relayed" bandwidth class? */
 #define CLIENT_IDLE_TIME_FOR_PRIORITY 30
 
+#ifndef USE_BUFFEREVENTS
 /** Return 1 if <b>conn</b> should use tokens from the "relayed"
  * bandwidth rates, else 0. Currently, only OR conns with bandwidth
  * class 1, and directory conns that are serving data out, count.
@@ -2024,6 +2085,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
   return connection_bucket_round_robin(base, priority,
                                        global_bucket, conn_bucket);
 }
+#else
+static ssize_t
+connection_bucket_read_limit(connection_t *conn, time_t now)
+{
+  (void) now;
+  return bufferevent_get_max_to_read(conn->bufev);
+}
+ssize_t
+connection_bucket_write_limit(connection_t *conn, time_t now)
+{
+  (void) now;
+  return bufferevent_get_max_to_write(conn->bufev);
+}
+#endif
 
 /** Return 1 if the global write buckets are low enough that we
  * shouldn't send <b>attempt</b> bytes of low-priority directory stuff
@@ -2048,8 +2123,12 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
 int
 global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
 {
+#ifdef USE_BUFFEREVENTS
+  ssize_t smaller_bucket = bufferevent_get_max_to_write(conn->bufev);
+#else
   int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
                        global_write_bucket : global_relayed_write_bucket;
+#endif
   if (authdir_mode(get_options()) && priority>1)
     return 0; /* there's always room to answer v2 if we're an auth dir */
 
@@ -2059,8 +2138,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
   if (smaller_bucket < (int)attempt)
     return 1; /* not enough space no matter the priority */
 
+#ifndef USE_BUFFEREVENTS
   if (write_buckets_empty_last_second)
     return 1; /* we're already hitting our limits, no more please */
+#endif
 
   if (priority == 1) { /* old-style v1 query */
     /* Could we handle *two* of these requests within the next two seconds? */
@@ -2076,6 +2157,7 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
   return 0;
 }
 
+#ifndef USE_BUFFEREVENTS
 /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
  * onto <b>conn</b>. Decrement buckets appropriately. */
 static void
@@ -2319,6 +2401,88 @@ connection_bucket_should_increase(int bucket, or_connection_t *conn)
 
   return 1;
 }
+#else
+
+static void
+connection_buckets_decrement(connection_t *conn, time_t now,
+                             size_t num_read, size_t num_written)
+{
+  (void) conn;
+  (void) now;
+  (void) num_read;
+  (void) num_written;
+  /* Libevent does this for us. */
+}
+void
+connection_bucket_refill(int seconds_elapsed, time_t now)
+{
+  (void) seconds_elapsed;
+  (void) now;
+  /* Libevent does this for us. */
+}
+void
+connection_bucket_init(void)
+{
+  or_options_t *options = get_options();
+  const struct timeval *tick = tor_libevent_get_one_tick_timeout();
+  struct ev_token_bucket_cfg *bucket_cfg;
+
+  uint64_t rate, burst;
+  if (options->RelayBandwidthRate) {
+    rate = options->RelayBandwidthRate;
+    burst = options->RelayBandwidthBurst;
+  } else {
+    rate = options->BandwidthRate;
+    burst = options->BandwidthBurst;
+  }
+
+  rate /= TOR_LIBEVENT_TICKS_PER_SECOND;
+  bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst,
+                                       (uint32_t)rate, (uint32_t)burst,
+                                       tick);
+
+  if (!global_rate_limit) {
+    global_rate_limit =
+      bufferevent_rate_limit_group_new(tor_libevent_get_base(), bucket_cfg);
+  } else {
+    bufferevent_rate_limit_group_set_cfg(global_rate_limit, bucket_cfg);
+  }
+  ev_token_bucket_cfg_free(bucket_cfg);
+}
+
+void
+connection_get_rate_limit_totals(uint64_t *read_out, uint64_t *written_out)
+{
+  if (global_rate_limit == NULL) {
+    *read_out = *written_out = 0;
+  } else {
+    bufferevent_rate_limit_group_get_totals(
+      global_rate_limit, read_out, written_out);
+  }
+}
+
+/** DOCDOC */
+void
+connection_enable_rate_limiting(connection_t *conn)
+{
+  if (conn->bufev) {
+    if (!global_rate_limit)
+      connection_bucket_init();
+    bufferevent_add_to_rate_limit_group(conn->bufev, global_rate_limit);
+  }
+}
+
+static void
+connection_consider_empty_write_buckets(connection_t *conn)
+{
+  (void) conn;
+}
+static void
+connection_consider_empty_read_buckets(connection_t *conn)
+{
+  (void) conn;
+}
+#endif
 
 /** Read bytes from conn-\>s and process them.
  *
@@ -2572,13 +2736,13 @@ connection_read_to_buf(connection_t *conn, int *max_to_read, int *socket_error)
   }
 
   if (n_read > 0) { /* change *max_to_read */
-    /*XXXX021 check for overflow*/
+    /*XXXX022 check for overflow*/
     *max_to_read = (int)(at_most - n_read);
   }
 
   if (conn->type == CONN_TYPE_AP) {
     edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
-    /*XXXX021 check for overflow*/
+    /*XXXX022 check for overflow*/
     edge_conn->n_read += (int)n_read;
   }
 
@@ -2600,11 +2764,202 @@ connection_read_to_buf(connection_t *conn, int *max_to_read, int *socket_error)
   return 0;
 }
 
+#ifdef USE_BUFFEREVENTS
+/* XXXX These generic versions could be simplified by making them
+   type-specific */
+
+/** Callback: Invoked whenever bytes are added to or drained from an input
+ * evbuffer.  Used to track the number of bytes read. */
+static void
+evbuffer_inbuf_callback(struct evbuffer *buf,
+                        const struct evbuffer_cb_info *info, void *arg)
+{
+  connection_t *conn = arg;
+  (void) buf;
+  /* XXXX These need to get real counts on the non-nested TLS case. - NM */
+  if (info->n_added) {
+    time_t now = approx_time();
+    conn->timestamp_lastread = now;
+    connection_buckets_decrement(conn, now, info->n_added, 0);
+    connection_consider_empty_read_buckets(conn);
+    if (conn->type == CONN_TYPE_AP) {
+      edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+      /*XXXX022 check for overflow*/
+      edge_conn->n_read += (int)info->n_added;
+    }
+  }
+}
+
+/** Callback: Invoked whenever bytes are added to or drained from an output
+ * evbuffer.  Used to track the number of bytes written. */
+static void
+evbuffer_outbuf_callback(struct evbuffer *buf,
+                         const struct evbuffer_cb_info *info, void *arg)
+{
+  connection_t *conn = arg;
+  (void)buf;
+  if (info->n_deleted) {
+    time_t now = approx_time();
+    conn->timestamp_lastwritten = now;
+    connection_buckets_decrement(conn, now, 0, info->n_deleted);
+    connection_consider_empty_write_buckets(conn);
+    if (conn->type == CONN_TYPE_AP) {
+      edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+      /*XXXX022 check for overflow*/
+      edge_conn->n_written += (int)info->n_deleted;
+    }
+  }
+}
+
+/** Callback: invoked whenever a bufferevent has read data. */
+void
+connection_handle_read_cb(struct bufferevent *bufev, void *arg)
+{
+  connection_t *conn = arg;
+  (void) bufev;
+  if (!conn->marked_for_close)
+    if (connection_process_inbuf(conn, 1)<0) /* XXXX Always 1? */
+      connection_mark_for_close(conn);
+}
+
+/** Callback: invoked whenever a bufferevent has written data. */
+void
+connection_handle_write_cb(struct bufferevent *bufev, void *arg)
+{
+  connection_t *conn = arg;
+  struct evbuffer *output;
+  if (connection_flushed_some(conn)<0) {
+    connection_mark_for_close(conn);
+    return;
+  }
+
+  output = bufferevent_get_output(bufev);
+  if (!evbuffer_get_length(output)) {
+    connection_finished_flushing(conn);
+    if (conn->marked_for_close && conn->hold_open_until_flushed) {
+      conn->hold_open_until_flushed = 0;
+      if (conn->linked) {
+        /* send eof */
+        bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED);
+      }
+    }
+  }
+}
+
+/** Callback: invoked whenever a bufferevent has had an event (like a
+ * connection, or an eof, or an error) occur. */
+void
+connection_handle_event_cb(struct bufferevent *bufev, short event, void *arg)
+{
+  connection_t *conn = arg;
+  (void) bufev;
+  if (event & BEV_EVENT_CONNECTED) {
+    tor_assert(connection_state_is_connecting(conn));
+    if (connection_finished_connecting(conn)<0)
+      return;
+  }
+  if (event & BEV_EVENT_EOF) {
+    if (!conn->marked_for_close) {
+      conn->inbuf_reached_eof = 1;
+      if (connection_reached_eof(conn)<0)
+        return;
+    }
+  }
+  if (event & BEV_EVENT_ERROR) {
+    int socket_error = evutil_socket_geterror(conn->s);
+    if (conn->type == CONN_TYPE_OR &&
+        conn->state == OR_CONN_STATE_CONNECTING) {
+      connection_or_connect_failed(TO_OR_CONN(conn),
+                                   errno_to_orconn_end_reason(socket_error),
+                                   tor_socket_strerror(socket_error));
+    } else if (CONN_IS_EDGE(conn)) {
+      edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+      if (!edge_conn->edge_has_sent_end)
+        connection_edge_end_errno(edge_conn);
+      if (edge_conn->socks_request) /* broken, don't send a socks reply back */
+        edge_conn->socks_request->has_finished = 1;
+    }
+    connection_close_immediate(conn); /* Connection is dead. */
+    if (!conn->marked_for_close)
+      connection_mark_for_close(conn);
+  }
+}
+
+/** Set up the generic callbacks for the bufferevent on <b>conn</b>. */
+void
+connection_configure_bufferevent_callbacks(connection_t *conn)
+{
+  struct bufferevent *bufev;
+  struct evbuffer *input, *output;
+  tor_assert(conn->bufev);
+  bufev = conn->bufev;
+  bufferevent_setcb(bufev,
+                    connection_handle_read_cb,
+                    connection_handle_write_cb,
+                    connection_handle_event_cb,
+                    conn);
+
+  input = bufferevent_get_input(bufev);
+  output = bufferevent_get_output(bufev);
+  evbuffer_add_cb(input, evbuffer_inbuf_callback, conn);
+  evbuffer_add_cb(output, evbuffer_outbuf_callback, conn);
+}
+#endif
+
 /** A pass-through to fetch_from_buf. */
 int
 connection_fetch_from_buf(char *string, size_t len, connection_t *conn)
 {
-  return fetch_from_buf(string, len, conn->inbuf);
+  IF_HAS_BUFFEREVENT(conn, {
+    /* XXX overflow -seb */
+    return (int)bufferevent_read(conn->bufev, string, len);
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return fetch_from_buf(string, len, conn->inbuf);
+  }
+}
+
+/** As fetch_from_buf_line(), but read from a connection's input buffer. */
+int
+connection_fetch_from_buf_line(connection_t *conn, char *data,
+                               size_t *data_len)
+{
+  IF_HAS_BUFFEREVENT(conn, {
+    int r;
+    size_t eol_len=0;
+    struct evbuffer *input = bufferevent_get_input(conn->bufev);
+    struct evbuffer_ptr ptr =
+      evbuffer_search_eol(input, NULL, &eol_len, EVBUFFER_EOL_LF);
+    if (ptr.pos == -1)
+      return 0; /* No EOL found. */
+    if ((size_t)ptr.pos+eol_len >= *data_len) {
+      return -1; /* Too long */
+    }
+    *data_len = ptr.pos+eol_len;
+    r = evbuffer_remove(input, data, ptr.pos+eol_len);
+    tor_assert(r >= 0);
+    data[ptr.pos+eol_len] = '\0';
+    return 1;
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return fetch_from_buf_line(conn->inbuf, data, data_len);
+  }
+}
+
+/** As fetch_from_buf_http, but fetches from a conncetion's input buffer_t or
+ * its bufferevent as appropriate. */
+int
+connection_fetch_from_buf_http(connection_t *conn,
+                               char **headers_out, size_t max_headerlen,
+                               char **body_out, size_t *body_used,
+                               size_t max_bodylen, int force_complete)
+{
+  IF_HAS_BUFFEREVENT(conn, {
+    struct evbuffer *input = bufferevent_get_input(conn->bufev);
+    return fetch_from_evbuffer_http(input, headers_out, max_headerlen,
+                            body_out, body_used, max_bodylen, force_complete);
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return fetch_from_buf_http(conn->inbuf, headers_out, max_headerlen,
+                            body_out, body_used, max_bodylen, force_complete);
+  }
 }
 
 /** Return conn-\>outbuf_flushlen: how many bytes conn wants to flush
@@ -2725,6 +3080,7 @@ connection_handle_write_impl(connection_t *conn, int force)
 
     /* If we just flushed the last bytes, check if this tunneled dir
      * request is done. */
+    /* XXXX move this to flushed_some or finished_flushing -NM */
     if (buf_datalen(conn->outbuf) == 0 && conn->dirreq_id)
       geoip_change_dirreq_state(conn->dirreq_id, DIRREQ_TUNNELED,
                                 DIRREQ_OR_CONN_BUFFER_FLUSHED);
@@ -2780,7 +3136,7 @@ connection_handle_write_impl(connection_t *conn, int force)
 
   if (conn->type == CONN_TYPE_AP) {
     edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
-    /*XXXX021 check for overflow.*/
+    /*XXXX022 check for overflow.*/
     edge_conn->n_written += (int)n_written;
   }
 
@@ -2853,6 +3209,22 @@ _connection_write_to_buf_impl(const char *string, size_t len,
   if (conn->marked_for_close && !conn->hold_open_until_flushed)
     return;
 
+  IF_HAS_BUFFEREVENT(conn, {
+    if (zlib) {
+      int done = zlib < 0;
+      r = write_to_evbuffer_zlib(bufferevent_get_output(conn->bufev),
+                                 TO_DIR_CONN(conn)->zlib_state,
+                                 string, len, done);
+    } else {
+      r = bufferevent_write(conn->bufev, string, len);
+    }
+    if (r < 0) {
+      /* XXXX mark for close? */
+      log_warn(LD_NET, "bufferevent_write failed! That shouldn't happen.");
+    }
+    return;
+  });
+
   old_datalen = buf_datalen(conn->outbuf);
   if (zlib) {
     dir_connection_t *dir_conn = TO_DIR_CONN(conn);
@@ -3264,6 +3636,9 @@ connection_finished_flushing(connection_t *conn)
 
 //  log_fn(LOG_DEBUG,"entered. Socket %u.", conn->s);
 
+  IF_HAS_NO_BUFFEREVENT(conn)
+    connection_stop_writing(conn);
+
   switch (conn->type) {
     case CONN_TYPE_OR:
       return connection_or_finished_flushing(TO_OR_CONN(conn));
@@ -3389,6 +3764,16 @@ assert_connection_ok(connection_t *conn, time_t now)
   tor_assert(conn);
   tor_assert(conn->type >= _CONN_TYPE_MIN);
   tor_assert(conn->type <= _CONN_TYPE_MAX);
+
+#ifdef USE_BUFFEREVENTS
+  if (conn->bufev) {
+    tor_assert(conn->read_event == NULL);
+    tor_assert(conn->write_event == NULL);
+    tor_assert(conn->inbuf == NULL);
+    tor_assert(conn->outbuf == NULL);
+  }
+#endif
+
   switch (conn->type) {
     case CONN_TYPE_OR:
       tor_assert(conn->magic == OR_CONNECTION_MAGIC);
@@ -3427,10 +3812,10 @@ assert_connection_ok(connection_t *conn, time_t now)
    * marked_for_close. */
 
   /* buffers */
-  if (!connection_is_listener(conn)) {
+  if (conn->inbuf)
     assert_buf_ok(conn->inbuf);
+  if (conn->outbuf)
     assert_buf_ok(conn->outbuf);
-  }
 
   if (conn->type == CONN_TYPE_OR) {
     or_connection_t *or_conn = TO_OR_CONN(conn);

+ 55 - 0
src/or/connection.h

@@ -12,6 +12,9 @@
 #ifndef _TOR_CONNECTION_H
 #define _TOR_CONNECTION_H
 
+/* XXXX For buf_datalen in inline function */
+#include "buffers.h"
+
 const char *conn_type_to_string(int type);
 const char *conn_state_to_string(int type, int state);
 
@@ -31,6 +34,15 @@ void _connection_mark_for_close(connection_t *conn,int line, const char *file);
 #define connection_mark_for_close(c) \
   _connection_mark_for_close((c), __LINE__, _SHORT_FILE_)
 
+#define connection_mark_and_flush(c)                                    \
+  do {                                                                  \
+    connection_t *tmp_conn_ = (c);                                      \
+    _connection_mark_for_close(tmp_conn_, __LINE__, _SHORT_FILE_);      \
+    tmp_conn_->hold_open_until_flushed = 1;                             \
+    IF_HAS_BUFFEREVENT(tmp_conn_,                                       \
+                       connection_start_writing(tmp_conn_));            \
+  } while (0)
+
 void connection_expire_held_open(void);
 
 int connection_connect(connection_t *conn, const char *address,
@@ -51,6 +63,12 @@ void connection_bucket_refill(int seconds_elapsed, time_t now);
 int connection_handle_read(connection_t *conn);
 
 int connection_fetch_from_buf(char *string, size_t len, connection_t *conn);
+int connection_fetch_from_buf_line(connection_t *conn, char *data,
+                                   size_t *data_len);
+int connection_fetch_from_buf_http(connection_t *conn,
+                               char **headers_out, size_t max_headerlen,
+                               char **body_out, size_t *body_used,
+                               size_t max_bodylen, int force_complete);
 
 int connection_wants_to_flush(connection_t *conn);
 int connection_outbuf_too_full(connection_t *conn);
@@ -73,6 +91,29 @@ connection_write_to_buf_zlib(const char *string, size_t len,
   _connection_write_to_buf_impl(string, len, TO_CONN(conn), done ? -1 : 1);
 }
 
+static size_t connection_get_inbuf_len(connection_t *conn);
+static size_t connection_get_outbuf_len(connection_t *conn);
+
+static INLINE size_t
+connection_get_inbuf_len(connection_t *conn)
+{
+  IF_HAS_BUFFEREVENT(conn, {
+    return evbuffer_get_length(bufferevent_get_input(conn->bufev));
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return conn->inbuf ? buf_datalen(conn->inbuf) : 0;
+  }
+}
+
+static INLINE size_t
+connection_get_outbuf_len(connection_t *conn)
+{
+  IF_HAS_BUFFEREVENT(conn, {
+    return evbuffer_get_length(bufferevent_get_output(conn->bufev));
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return conn->outbuf ? buf_datalen(conn->outbuf) : 0;
+  }
+}
+
 connection_t *connection_get_by_global_id(uint64_t id);
 
 connection_t *connection_get_by_type(int type);
@@ -96,5 +137,19 @@ int connection_or_nonopen_was_started_here(or_connection_t *conn);
 void connection_dump_buffer_mem_stats(int severity);
 void remove_file_if_very_old(const char *fname, time_t now);
 
+#ifdef USE_BUFFEREVENTS
+int connection_type_uses_bufferevent(connection_t *conn);
+void connection_configure_bufferevent_callbacks(connection_t *conn);
+void connection_handle_read_cb(struct bufferevent *bufev, void *arg);
+void connection_handle_write_cb(struct bufferevent *bufev, void *arg);
+void connection_handle_event_cb(struct bufferevent *bufev, short event,
+                                 void *arg);
+void connection_get_rate_limit_totals(uint64_t *read_out,
+                                      uint64_t *written_out);
+void connection_enable_rate_limiting(connection_t *conn);
+#else
+#define connection_type_uses_bufferevent(c) (0)
+#endif
+
 #endif
 

+ 28 - 15
src/or/connection_edge.c

@@ -92,6 +92,7 @@ _connection_mark_unattached_ap(edge_connection_t *conn, int endreason,
 
   _connection_mark_for_close(TO_CONN(conn), line, file);
   conn->_base.hold_open_until_flushed = 1;
+  IF_HAS_BUFFEREVENT(TO_CONN(conn), connection_start_writing(TO_CONN(conn)));
   conn->end_reason = endreason;
 }
 
@@ -100,7 +101,7 @@ _connection_mark_unattached_ap(edge_connection_t *conn, int endreason,
 int
 connection_edge_reached_eof(edge_connection_t *conn)
 {
-  if (buf_datalen(conn->_base.inbuf) &&
+  if (connection_get_inbuf_len(TO_CONN(conn)) &&
       connection_state_is_open(TO_CONN(conn))) {
     /* it still has stuff to process. don't let it die yet. */
     return 0;
@@ -191,8 +192,7 @@ connection_edge_destroy(circid_t circ_id, edge_connection_t *conn)
       conn->edge_has_sent_end = 1;
       conn->end_reason = END_STREAM_REASON_DESTROY;
       conn->end_reason |= END_STREAM_REASON_FLAG_ALREADY_SENT_CLOSED;
-      connection_mark_for_close(TO_CONN(conn));
-      conn->_base.hold_open_until_flushed = 1;
+      connection_mark_and_flush(TO_CONN(conn));
     }
   }
   conn->cpath_layer = NULL;
@@ -319,7 +319,6 @@ connection_edge_finished_flushing(edge_connection_t *conn)
   switch (conn->_base.state) {
     case AP_CONN_STATE_OPEN:
     case EXIT_CONN_STATE_OPEN:
-      connection_stop_writing(TO_CONN(conn));
       connection_edge_consider_sending_sendme(conn);
       return 0;
     case AP_CONN_STATE_SOCKS_WAIT:
@@ -328,7 +327,7 @@ connection_edge_finished_flushing(edge_connection_t *conn)
     case AP_CONN_STATE_CIRCUIT_WAIT:
     case AP_CONN_STATE_CONNECT_WAIT:
     case AP_CONN_STATE_CONTROLLER_WAIT:
-      connection_stop_writing(TO_CONN(conn));
+    case AP_CONN_STATE_RESOLVE_WAIT:
       return 0;
     default:
       log_warn(LD_BUG, "Called in unexpected state %d.",conn->_base.state);
@@ -358,8 +357,9 @@ connection_edge_finished_connecting(edge_connection_t *edge_conn)
   rep_hist_note_exit_stream_opened(conn->port);
 
   conn->state = EXIT_CONN_STATE_OPEN;
-  connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */
-  if (connection_wants_to_flush(conn)) /* in case there are any queued relay
+  IF_HAS_NO_BUFFEREVENT(conn)
+    connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */
+  if (connection_get_outbuf_len(conn)) /* in case there are any queued relay
                                         * cells */
     connection_start_writing(conn);
   /* deliver a 'connected' relay cell back through the circuit. */
@@ -1895,8 +1895,14 @@ connection_ap_handshake_process_socks(edge_connection_t *conn)
 
   log_debug(LD_APP,"entered.");
 
-  sockshere = fetch_from_buf_socks(conn->_base.inbuf, socks,
-                                   options->TestSocks, options->SafeSocks);
+  IF_HAS_BUFFEREVENT(TO_CONN(conn), {
+    struct evbuffer *input = bufferevent_get_input(conn->_base.bufev);
+    sockshere = fetch_from_evbuffer_socks(input, socks,
+                                     options->TestSocks, options->SafeSocks);
+  }) ELSE_IF_NO_BUFFEREVENT {
+    sockshere = fetch_from_buf_socks(conn->_base.inbuf, socks,
+                                     options->TestSocks, options->SafeSocks);
+  };
   if (sockshere == 0) {
     if (socks->replylen) {
       connection_write_to_buf(socks->reply, socks->replylen, TO_CONN(conn));
@@ -1997,7 +2003,7 @@ connection_ap_process_natd(edge_connection_t *conn)
 
   /* look for LF-terminated "[DEST ip_addr port]"
    * where ip_addr is a dotted-quad and port is in string form */
-  err = fetch_from_buf_line(conn->_base.inbuf, tmp_buf, &tlen);
+  err = connection_fetch_from_buf_line(TO_CONN(conn), tmp_buf, &tlen);
   if (err == 0)
     return 0;
   if (err < 0) {
@@ -2104,8 +2110,10 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn)
                ap_conn->socks_request->port);
   payload_len = (int)strlen(payload)+1;
 
-  log_debug(LD_APP,
-            "Sending relay cell to begin stream %d.", ap_conn->stream_id);
+  log_info(LD_APP,
+           "Sending relay cell %d to begin stream %d.",
+           (int)ap_conn->use_begindir,
+           ap_conn->stream_id);
 
   begin_type = ap_conn->use_begindir ?
                  RELAY_COMMAND_BEGIN_DIR : RELAY_COMMAND_BEGIN;
@@ -2213,9 +2221,11 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn)
  * and call connection_ap_handshake_attach_circuit(conn) on it.
  *
  * Return the other end of the linked connection pair, or -1 if error.
+ * DOCDOC partner.
  */
 edge_connection_t *
-connection_ap_make_link(char *address, uint16_t port,
+connection_ap_make_link(connection_t *partner,
+                        char *address, uint16_t port,
                         const char *digest, int use_begindir, int want_onehop)
 {
   edge_connection_t *conn;
@@ -2250,6 +2260,8 @@ connection_ap_make_link(char *address, uint16_t port,
   tor_addr_make_unspec(&conn->_base.addr);
   conn->_base.port = 0;
 
+  connection_link_connections(partner, TO_CONN(conn));
+
   if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
     connection_free(TO_CONN(conn));
     return NULL;
@@ -2767,12 +2779,13 @@ connection_exit_connect(edge_connection_t *edge_conn)
   }
 
   conn->state = EXIT_CONN_STATE_OPEN;
-  if (connection_wants_to_flush(conn)) {
+  if (connection_get_outbuf_len(conn)) {
     /* in case there are any queued data cells */
     log_warn(LD_BUG,"newly connected conn had data waiting!");
 //    connection_start_writing(conn);
   }
-  connection_watch_events(conn, READ_EVENT);
+  IF_HAS_NO_BUFFEREVENT(conn)
+    connection_watch_events(conn, READ_EVENT);
 
   /* also, deliver a 'connected' cell back through the circuit. */
   if (connection_edge_is_rendezvous_stream(edge_conn)) {

+ 2 - 1
src/or/connection_edge.h

@@ -29,7 +29,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn);
 int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
 int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
 
-edge_connection_t  *connection_ap_make_link(char *address, uint16_t port,
+edge_connection_t  *connection_ap_make_link(connection_t *partner,
+                                            char *address, uint16_t port,
                                             const char *digest,
                                             int use_begindir, int want_onehop);
 void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,

+ 119 - 11
src/or/connection_or.c

@@ -28,6 +28,10 @@
 #include "router.h"
 #include "routerlist.h"
 
+#ifdef USE_BUFFEREVENTS
+#include <event2/bufferevent_ssl.h>
+#endif
+
 static int connection_tls_finish_handshake(or_connection_t *conn);
 static int connection_or_process_cells_from_inbuf(or_connection_t *conn);
 static int connection_or_send_versions(or_connection_t *conn);
@@ -37,6 +41,12 @@ static int connection_or_check_valid_tls_handshake(or_connection_t *conn,
                                                    int started_here,
                                                    char *digest_rcvd_out);
 
+#ifdef USE_BUFFEREVENTS
+static void connection_or_handle_event_cb(struct bufferevent *bufev,
+                                          short event, void *arg);
+#include <event2/buffer.h>/*XXXX REMOVE */
+#endif
+
 /**************************************************************/
 
 /** Map from identity digest of connected OR or desired OR to a connection_t
@@ -248,7 +258,7 @@ connection_or_process_inbuf(or_connection_t *conn)
 int
 connection_or_flushed_some(or_connection_t *conn)
 {
-  size_t datalen = buf_datalen(conn->_base.outbuf);
+  size_t datalen = connection_get_outbuf_len(TO_CONN(conn));
   /* If we're under the low water mark, add cells until we're just over the
    * high water mark. */
   if (datalen < OR_CONN_LOWWATER) {
@@ -281,7 +291,6 @@ connection_or_finished_flushing(or_connection_t *conn)
     case OR_CONN_STATE_PROXY_HANDSHAKING:
     case OR_CONN_STATE_OPEN:
     case OR_CONN_STATE_OR_HANDSHAKING:
-      connection_stop_writing(TO_CONN(conn));
       break;
     default:
       log_err(LD_BUG,"Called in unexpected state %d.", conn->_base.state);
@@ -379,6 +388,22 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
 
   conn->bandwidthrate = rate;
   conn->bandwidthburst = burst;
+#ifdef USE_BUFFEREVENTS
+  {
+    const struct timeval *tick = tor_libevent_get_one_tick_timeout();
+    struct ev_token_bucket_cfg *cfg, *old_cfg;
+    int rate_per_tick = rate / TOR_LIBEVENT_TICKS_PER_SECOND;
+    cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick,
+                                  burst, tick);
+    old_cfg = conn->bucket_cfg;
+    if (conn->_base.bufev)
+      bufferevent_set_rate_limit(conn->_base.bufev, cfg);
+    if (old_cfg)
+      ev_token_bucket_cfg_free(old_cfg);
+    conn->bucket_cfg = cfg;
+    (void) reset; /* No way to do this with libevent yet. */
+  }
+#else
   if (reset) { /* set up the token buckets to be full */
     conn->read_bucket = conn->write_bucket = burst;
     return;
@@ -389,6 +414,7 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
     conn->read_bucket = burst;
   if (conn->write_bucket > burst)
     conn->write_bucket = burst;
+#endif
 }
 
 /** Either our set of relays or our per-conn rate limits have changed.
@@ -852,6 +878,7 @@ int
 connection_tls_start_handshake(or_connection_t *conn, int receiving)
 {
   conn->_base.state = OR_CONN_STATE_TLS_HANDSHAKING;
+  tor_assert(!conn->tls);
   conn->tls = tor_tls_new(conn->_base.s, receiving);
   tor_tls_set_logged_address(conn->tls, // XXX client and relay?
       escaped_safe_str(conn->_base.address));
@@ -859,12 +886,34 @@ connection_tls_start_handshake(or_connection_t *conn, int receiving)
     log_warn(LD_BUG,"tor_tls_new failed. Closing.");
     return -1;
   }
+#ifdef USE_BUFFEREVENTS
+  if (connection_type_uses_bufferevent(TO_CONN(conn))) {
+    struct bufferevent *b =
+      tor_tls_init_bufferevent(conn->tls, conn->_base.bufev, conn->_base.s,
+                               receiving);
+    if (!b) {
+      log_warn(LD_BUG,"tor_tls_init_bufferevent failed. Closing.");
+      return -1;
+    }
+    conn->_base.bufev = b;
+    if (conn->bucket_cfg)
+      bufferevent_set_rate_limit(conn->_base.bufev, conn->bucket_cfg);
+    connection_enable_rate_limiting(TO_CONN(conn));
+    bufferevent_setcb(b, connection_handle_read_cb,
+                      connection_handle_write_cb,
+                      connection_or_handle_event_cb,
+                      TO_CONN(conn));
+  }
+#endif
   connection_start_reading(TO_CONN(conn));
   log_debug(LD_HANDSHAKE,"starting TLS handshake on fd %d", conn->_base.s);
   note_crypto_pk_op(receiving ? TLS_HANDSHAKE_S : TLS_HANDSHAKE_C);
 
-  if (connection_tls_continue_handshake(conn) < 0) {
-    return -1;
+  IF_HAS_BUFFEREVENT(TO_CONN(conn), {
+    /* ???? */;
+  }) ELSE_IF_NO_BUFFEREVENT {
+    if (connection_tls_continue_handshake(conn) < 0)
+      return -1;
   }
   return 0;
 }
@@ -949,6 +998,55 @@ connection_tls_continue_handshake(or_connection_t *conn)
   return 0;
 }
 
+#ifdef USE_BUFFEREVENTS
+static void
+connection_or_handle_event_cb(struct bufferevent *bufev, short event,
+                              void *arg)
+{
+  struct or_connection_t *conn = TO_OR_CONN(arg);
+
+  /* XXXX cut-and-paste code; should become a function. */
+  if (event & BEV_EVENT_CONNECTED) {
+    if (conn->_base.state == OR_CONN_STATE_TLS_HANDSHAKING) {
+      if (tor_tls_finish_handshake(conn->tls) < 0) {
+        log_warn(LD_OR, "Problem finishing handshake");
+        connection_mark_for_close(TO_CONN(conn));
+        return;
+      }
+    }
+
+    if (! tor_tls_used_v1_handshake(conn->tls)) {
+      if (!tor_tls_is_server(conn->tls)) {
+        if (conn->_base.state == OR_CONN_STATE_TLS_HANDSHAKING) {
+          conn->_base.state = OR_CONN_STATE_TLS_CLIENT_RENEGOTIATING;
+          tor_tls_unblock_renegotiation(conn->tls);
+          if (bufferevent_ssl_renegotiate(conn->_base.bufev)<0) {
+            log_warn(LD_OR, "Start_renegotiating went badly.");
+            connection_mark_for_close(TO_CONN(conn));
+          }
+          tor_tls_unblock_renegotiation(conn->tls);
+          return; /* ???? */
+        }
+      } else {
+        /* improved handshake, but not a client. */
+        tor_tls_set_renegotiate_callback(conn->tls,
+                                         connection_or_tls_renegotiated_cb,
+                                         conn);
+        conn->_base.state = OR_CONN_STATE_TLS_SERVER_RENEGOTIATING;
+        /* return 0; */
+        return; /* ???? */
+      }
+    }
+    connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT);
+    if (connection_tls_finish_handshake(conn) < 0)
+      connection_mark_for_close(TO_CONN(conn)); /* ???? */
+    return;
+  }
+
+  connection_handle_event_cb(bufev, event, arg);
+}
+#endif
+
 /** Return 1 if we initiated this connection, or 0 if it started
  * out as an incoming connection.
  */
@@ -1209,8 +1307,12 @@ connection_or_set_state_open(or_connection_t *conn)
 
   or_handshake_state_free(conn->handshake_state);
   conn->handshake_state = NULL;
+  IF_HAS_BUFFEREVENT(TO_CONN(conn), {
+    connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT);
+  }) ELSE_IF_NO_BUFFEREVENT {
+    connection_start_reading(TO_CONN(conn));
+  }
 
-  connection_start_reading(TO_CONN(conn));
   circuit_n_conn_done(conn, 1); /* send the pending creates, if any. */
 
   return 0;
@@ -1254,12 +1356,18 @@ connection_or_write_var_cell_to_buf(const var_cell_t *cell,
     conn->timestamp_last_added_nonpadding = approx_time();
 }
 
-/** See whether there's a variable-length cell waiting on <b>conn</b>'s
+/** See whether there's a variable-length cell waiting on <b>or_conn</b>'s
  * inbuf.  Return values as for fetch_var_cell_from_buf(). */
 static int
-connection_fetch_var_cell_from_buf(or_connection_t *conn, var_cell_t **out)
+connection_fetch_var_cell_from_buf(or_connection_t *or_conn, var_cell_t **out)
 {
-  return fetch_var_cell_from_buf(conn->_base.inbuf, out, conn->link_proto);
+  connection_t *conn = TO_CONN(or_conn);
+  IF_HAS_BUFFEREVENT(conn, {
+    struct evbuffer *input = bufferevent_get_input(conn->bufev);
+    return fetch_var_cell_from_evbuffer(input, out, or_conn->link_proto);
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return fetch_var_cell_from_buf(conn->inbuf, out, or_conn->link_proto);
+  }
 }
 
 /** Process cells from <b>conn</b>'s inbuf.
@@ -1277,7 +1385,7 @@ connection_or_process_cells_from_inbuf(or_connection_t *conn)
   while (1) {
     log_debug(LD_OR,
               "%d: starting, inbuf_datalen %d (%d pending in tls object).",
-              conn->_base.s,(int)buf_datalen(conn->_base.inbuf),
+              conn->_base.s,(int)connection_get_inbuf_len(TO_CONN(conn)),
               tor_tls_get_pending_bytes(conn->tls));
     if (connection_fetch_var_cell_from_buf(conn, &var_cell)) {
       if (!var_cell)
@@ -1288,8 +1396,8 @@ connection_or_process_cells_from_inbuf(or_connection_t *conn)
     } else {
       char buf[CELL_NETWORK_SIZE];
       cell_t cell;
-      if (buf_datalen(conn->_base.inbuf) < CELL_NETWORK_SIZE) /* whole response
-                                                                 available? */
+      if (connection_get_inbuf_len(TO_CONN(conn))
+          < CELL_NETWORK_SIZE) /* whole response available? */
         return 0; /* not yet */
 
       circuit_build_times_network_is_live(&circ_times);

+ 13 - 4
src/or/control.c

@@ -2719,8 +2719,6 @@ int
 connection_control_finished_flushing(control_connection_t *conn)
 {
   tor_assert(conn);
-
-  connection_stop_writing(TO_CONN(conn));
   return 0;
 }
 
@@ -2755,6 +2753,17 @@ is_valid_initial_command(control_connection_t *conn, const char *cmd)
  * interfaces is broken. */
 #define MAX_COMMAND_LINE_LENGTH (1024*1024)
 
+static int
+peek_connection_has_control0_command(connection_t *conn)
+{
+  IF_HAS_BUFFEREVENT(conn, {
+    struct evbuffer *input = bufferevent_get_input(conn->bufev);
+    return peek_evbuffer_has_control0_command(input);
+  }) ELSE_IF_NO_BUFFEREVENT {
+    return peek_buf_has_control0_command(conn->inbuf);
+  }
+}
+
 /** Called when data has arrived on a v1 control connection: Try to fetch
  * commands from conn->inbuf, and execute them.
  */
@@ -2777,7 +2786,7 @@ connection_control_process_inbuf(control_connection_t *conn)
   }
 
   if (conn->_base.state == CONTROL_CONN_STATE_NEEDAUTH &&
-      peek_buf_has_control0_command(conn->_base.inbuf)) {
+      peek_connection_has_control0_command(TO_CONN(conn))) {
     /* Detect v0 commands and send a "no more v0" message. */
     size_t body_len;
     char buf[128];
@@ -2801,7 +2810,7 @@ connection_control_process_inbuf(control_connection_t *conn)
     /* First, fetch a line. */
     do {
       data_len = conn->incoming_cmd_len - conn->incoming_cmd_cur_len;
-      r = fetch_from_buf_line(conn->_base.inbuf,
+      r = connection_fetch_from_buf_line(TO_CONN(conn),
                               conn->incoming_cmd+conn->incoming_cmd_cur_len,
                               &data_len);
       if (r == 0)

+ 3 - 4
src/or/cpuworker.c

@@ -62,7 +62,6 @@ connection_cpu_finished_flushing(connection_t *conn)
 {
   tor_assert(conn);
   tor_assert(conn->type == CONN_TYPE_CPUWORKER);
-  connection_stop_writing(conn);
   return 0;
 }
 
@@ -141,13 +140,13 @@ connection_cpu_process_inbuf(connection_t *conn)
   tor_assert(conn);
   tor_assert(conn->type == CONN_TYPE_CPUWORKER);
 
-  if (!buf_datalen(conn->inbuf))
+  if (!connection_get_inbuf_len(conn))
     return 0;
 
   if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
-    if (buf_datalen(conn->inbuf) < LEN_ONION_RESPONSE) /* answer available? */
+    if (connection_get_inbuf_len(conn) < LEN_ONION_RESPONSE)
       return 0; /* not yet */
-    tor_assert(buf_datalen(conn->inbuf) == LEN_ONION_RESPONSE);
+    tor_assert(connection_get_inbuf_len(conn) == LEN_ONION_RESPONSE);
 
     connection_fetch_from_buf(&success,1,conn);
     connection_fetch_from_buf(buf,LEN_ONION_RESPONSE-1,conn);

+ 11 - 7
src/or/directory.c

@@ -892,14 +892,14 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
      * hook up both sides
      */
     linked_conn =
-      connection_ap_make_link(conn->_base.address, conn->_base.port,
+      connection_ap_make_link(TO_CONN(conn),
+                              conn->_base.address, conn->_base.port,
                               digest, use_begindir, conn->dirconn_direct);
     if (!linked_conn) {
       log_warn(LD_NET,"Making tunnel to dirserver failed.");
       connection_mark_for_close(TO_CONN(conn));
       return;
     }
-    connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn));
 
     if (connection_add(TO_CONN(conn)) < 0) {
       log_warn(LD_NET,"Unable to add connection for link to dirserver.");
@@ -912,8 +912,12 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
                            payload, payload_len,
                            supports_conditional_consensus,
                            if_modified_since);
+
     connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT);
-    connection_start_reading(TO_CONN(linked_conn));
+    IF_HAS_BUFFEREVENT(TO_CONN(linked_conn), {
+      connection_watch_events(TO_CONN(linked_conn), READ_EVENT|WRITE_EVENT);
+    }) ELSE_IF_NO_BUFFEREVENT
+      connection_start_reading(TO_CONN(linked_conn));
   }
 }
 
@@ -1466,7 +1470,7 @@ connection_dir_client_reached_eof(dir_connection_t *conn)
   int was_compressed=0;
   time_t now = time(NULL);
 
-  switch (fetch_from_buf_http(conn->_base.inbuf,
+  switch (connection_fetch_from_buf_http(TO_CONN(conn),
                               &headers, MAX_HEADERS_SIZE,
                               &body, &body_len, MAX_DIR_DL_SIZE,
                               allow_partial)) {
@@ -2134,7 +2138,7 @@ connection_dir_process_inbuf(dir_connection_t *conn)
     return 0;
   }
 
-  if (buf_datalen(conn->_base.inbuf) > MAX_DIRECTORY_OBJECT_SIZE) {
+  if (connection_get_inbuf_len(TO_CONN(conn)) > MAX_DIRECTORY_OBJECT_SIZE) {
     log_warn(LD_HTTP, "Too much data received from directory connection: "
              "denial of service attempt, or you need to upgrade?");
     connection_mark_for_close(TO_CONN(conn));
@@ -3299,7 +3303,7 @@ directory_handle_command(dir_connection_t *conn)
   tor_assert(conn);
   tor_assert(conn->_base.type == CONN_TYPE_DIR);
 
-  switch (fetch_from_buf_http(conn->_base.inbuf,
+  switch (connection_fetch_from_buf_http(TO_CONN(conn),
                               &headers, MAX_HEADERS_SIZE,
                               &body, &body_len, MAX_DIR_UL_SIZE, 0)) {
     case -1: /* overflow */
@@ -3352,10 +3356,10 @@ connection_dir_finished_flushing(dir_connection_t *conn)
                               DIRREQ_DIRECT,
                               DIRREQ_FLUSHING_DIR_CONN_FINISHED);
   switch (conn->_base.state) {
+    case DIR_CONN_STATE_CONNECTING:
     case DIR_CONN_STATE_CLIENT_SENDING:
       log_debug(LD_DIR,"client finished sending command.");
       conn->_base.state = DIR_CONN_STATE_CLIENT_READING;
-      connection_stop_writing(TO_CONN(conn));
       return 0;
     case DIR_CONN_STATE_SERVER_WRITING:
       log_debug(LD_DIRSERV,"Finished writing server response. Closing.");

+ 4 - 4
src/or/dirserv.c

@@ -3389,7 +3389,7 @@ connection_dirserv_add_servers_to_outbuf(dir_connection_t *conn)
   time_t publish_cutoff = time(NULL)-ROUTER_MAX_AGE_TO_PUBLISH;
 
   while (smartlist_len(conn->fingerprint_stack) &&
-         buf_datalen(conn->_base.outbuf) < DIRSERV_BUFFER_MIN) {
+         connection_get_outbuf_len(TO_CONN(conn)) < DIRSERV_BUFFER_MIN) {
     const char *body;
     char *fp = smartlist_pop_last(conn->fingerprint_stack);
     signed_descriptor_t *sd = NULL;
@@ -3489,7 +3489,7 @@ connection_dirserv_add_dir_bytes_to_outbuf(dir_connection_t *conn)
   ssize_t bytes;
   int64_t remaining;
 
-  bytes = DIRSERV_BUFFER_MIN - buf_datalen(conn->_base.outbuf);
+  bytes = DIRSERV_BUFFER_MIN - connection_get_outbuf_len(TO_CONN(conn));
   tor_assert(bytes > 0);
   tor_assert(conn->cached_dir);
   if (bytes < 8192)
@@ -3528,7 +3528,7 @@ static int
 connection_dirserv_add_networkstatus_bytes_to_outbuf(dir_connection_t *conn)
 {
 
-  while (buf_datalen(conn->_base.outbuf) < DIRSERV_BUFFER_MIN) {
+  while (connection_get_outbuf_len(TO_CONN(conn)) < DIRSERV_BUFFER_MIN) {
     if (conn->cached_dir) {
       int uncompressing = (conn->zlib_state != NULL);
       int r = connection_dirserv_add_dir_bytes_to_outbuf(conn);
@@ -3574,7 +3574,7 @@ connection_dirserv_flushed_some(dir_connection_t *conn)
 {
   tor_assert(conn->_base.state == DIR_CONN_STATE_SERVER_WRITING);
 
-  if (buf_datalen(conn->_base.outbuf) >= DIRSERV_BUFFER_MIN)
+  if (connection_get_outbuf_len(TO_CONN(conn)) >= DIRSERV_BUFFER_MIN)
     return 0;
 
   switch (conn->dir_spool_src) {

+ 184 - 17
src/or/main.c

@@ -56,6 +56,10 @@
 #include <event.h>
 #endif
 
+#ifdef USE_BUFFEREVENTS
+#include <event2/bufferevent.h>
+#endif
+
 void evdns_shutdown(int);
 
 /********* PROTOTYPES **********/
@@ -72,6 +76,7 @@ static int connection_should_read_from_linked_conn(connection_t *conn);
 
 /********* START VARIABLES **********/
 
+#ifndef USE_BUFFEREVENTS
 int global_read_bucket; /**< Max number of bytes I can read this second. */
 int global_write_bucket; /**< Max number of bytes I can write this second. */
 
@@ -79,13 +84,17 @@ int global_write_bucket; /**< Max number of bytes I can write this second. */
 int global_relayed_read_bucket;
 /** Max number of relayed (bandwidth class 1) bytes I can write this second. */
 int global_relayed_write_bucket;
-
 /** What was the read bucket before the last second_elapsed_callback() call?
  * (used to determine how many bytes we've read). */
 static int stats_prev_global_read_bucket;
 /** What was the write bucket before the last second_elapsed_callback() call?
  * (used to determine how many bytes we've written). */
 static int stats_prev_global_write_bucket;
+#else
+static uint64_t stats_prev_n_read = 0;
+static uint64_t stats_prev_n_written = 0;
+#endif
+
 /* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
 /** How many bytes have we read since we started the process? */
 static uint64_t stats_n_bytes_read = 0;
@@ -151,12 +160,38 @@ int can_complete_circuit=0;
 *
 ****************************************************************************/
 
+#ifdef USE_BUFFEREVENTS
+static void
+free_old_inbuf(connection_t *conn)
+{
+  if (! conn->inbuf)
+    return;
+
+  tor_assert(conn->outbuf);
+  tor_assert(buf_datalen(conn->inbuf) == 0);
+  tor_assert(buf_datalen(conn->outbuf) == 0);
+  buf_free(conn->inbuf);
+  buf_free(conn->outbuf);
+  conn->inbuf = conn->outbuf = NULL;
+
+  if (conn->read_event) {
+    event_del(conn->read_event);
+    tor_event_free(conn->read_event);
+  }
+  if (conn->write_event) {
+    event_del(conn->read_event);
+    tor_event_free(conn->write_event);
+  }
+  conn->read_event = conn->write_event = NULL;
+}
+#endif
+
 /** Add <b>conn</b> to the array of connections that we can poll on.  The
  * connection's socket must be set; the connection starts out
  * non-reading and non-writing.
  */
 int
-connection_add(connection_t *conn)
+connection_add_impl(connection_t *conn, int is_connecting)
 {
   tor_assert(conn);
   tor_assert(conn->s >= 0 ||
@@ -168,11 +203,59 @@ connection_add(connection_t *conn)
   conn->conn_array_index = smartlist_len(connection_array);
   smartlist_add(connection_array, conn);
 
-  if (conn->s >= 0 || conn->linked) {
+#ifdef USE_BUFFEREVENTS
+  if (connection_type_uses_bufferevent(conn)) {
+    if (conn->s >= 0 && !conn->linked) {
+      conn->bufev = bufferevent_socket_new(
+                         tor_libevent_get_base(),
+                         conn->s,
+                         BEV_OPT_DEFER_CALLBACKS);
+      /* XXXX CHECK FOR NULL RETURN! */
+      if (is_connecting) {
+        /* Put the bufferevent into a "connecting" state so that we'll get
+         * a "connected" event callback on successful write. */
+        bufferevent_socket_connect(conn->bufev, NULL, 0);
+      }
+      connection_configure_bufferevent_callbacks(conn);
+    } else if (conn->linked && conn->linked_conn &&
+               connection_type_uses_bufferevent(conn->linked_conn)) {
+      tor_assert(conn->s < 0);
+      if (!conn->bufev) {
+        struct bufferevent *pair[2] = { NULL, NULL };
+        /* XXXX CHECK FOR ERROR RETURN! */
+        bufferevent_pair_new(tor_libevent_get_base(),
+                             BEV_OPT_DEFER_CALLBACKS,
+                             pair);
+        tor_assert(pair[0]);
+        conn->bufev = pair[0];
+        conn->linked_conn->bufev = pair[1];
+      } /* else the other side already was added, and got a bufferevent_pair */
+      connection_configure_bufferevent_callbacks(conn);
+    }
+
+    if (conn->bufev && conn->inbuf) {
+      /* XXX Instead we should assert that there is no inbuf, once we
+       * have linked connections using bufferevents. */
+      free_old_inbuf(conn);
+    }
+
+    if (conn->linked_conn && conn->linked_conn->bufev &&
+        conn->linked_conn->inbuf) {
+      /* XXX Instead we should assert that there is no inbuf, once we
+       * have linked connections using bufferevents. */
+      free_old_inbuf(conn->linked_conn);
+    }
+  }
+#else
+  (void) is_connecting;
+#endif
+
+  if (!HAS_BUFFEREVENT(conn) && (conn->s >= 0 || conn->linked)) {
     conn->read_event = tor_event_new(tor_libevent_get_base(),
          conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn);
     conn->write_event = tor_event_new(tor_libevent_get_base(),
          conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn);
+    /* XXXX CHECK FOR NULL RETURN! */
   }
 
   log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.",
@@ -196,6 +279,12 @@ connection_unregister_events(connection_t *conn)
       log_warn(LD_BUG, "Error removing write event for %d", conn->s);
     tor_free(conn->write_event);
   }
+#ifdef USE_BUFFEREVENTS
+  if (conn->bufev) {
+    bufferevent_free(conn->bufev);
+    conn->bufev = NULL;
+  }
+#endif
   if (conn->dns_server_port) {
     dnsserv_close_listener(conn);
   }
@@ -310,6 +399,17 @@ get_connection_array(void)
 void
 connection_watch_events(connection_t *conn, watchable_events_t events)
 {
+  IF_HAS_BUFFEREVENT(conn, {
+      short ev = ((short)events) & (EV_READ|EV_WRITE);
+      short old_ev = bufferevent_get_enabled(conn->bufev);
+      if ((ev & ~old_ev) != 0) {
+        bufferevent_enable(conn->bufev, ev);
+      }
+      if ((old_ev & ~ev) != 0) {
+        bufferevent_disable(conn->bufev, old_ev & ~ev);
+      }
+      return;
+  });
   if (events & READ_EVENT)
     connection_start_reading(conn);
   else
@@ -327,6 +427,9 @@ connection_is_reading(connection_t *conn)
 {
   tor_assert(conn);
 
+  IF_HAS_BUFFEREVENT(conn,
+    return (bufferevent_get_enabled(conn->bufev) & EV_READ) != 0;
+  );
   return conn->reading_from_linked_conn ||
     (conn->read_event && event_pending(conn->read_event, EV_READ, NULL));
 }
@@ -336,6 +439,12 @@ void
 connection_stop_reading(connection_t *conn)
 {
   tor_assert(conn);
+
+  IF_HAS_BUFFEREVENT(conn, {
+      bufferevent_disable(conn->bufev, EV_READ);
+      return;
+  });
+
   tor_assert(conn->read_event);
 
   if (conn->linked) {
@@ -355,6 +464,12 @@ void
 connection_start_reading(connection_t *conn)
 {
   tor_assert(conn);
+
+  IF_HAS_BUFFEREVENT(conn, {
+      bufferevent_enable(conn->bufev, EV_READ);
+      return;
+  });
+
   tor_assert(conn->read_event);
 
   if (conn->linked) {
@@ -376,6 +491,10 @@ connection_is_writing(connection_t *conn)
 {
   tor_assert(conn);
 
+  IF_HAS_BUFFEREVENT(conn,
+    return (bufferevent_get_enabled(conn->bufev) & EV_WRITE) != 0;
+  );
+
   return conn->writing_to_linked_conn ||
     (conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL));
 }
@@ -385,6 +504,12 @@ void
 connection_stop_writing(connection_t *conn)
 {
   tor_assert(conn);
+
+  IF_HAS_BUFFEREVENT(conn, {
+      bufferevent_disable(conn->bufev, EV_WRITE);
+      return;
+  });
+
   tor_assert(conn->write_event);
 
   if (conn->linked) {
@@ -405,6 +530,12 @@ void
 connection_start_writing(connection_t *conn)
 {
   tor_assert(conn);
+
+  IF_HAS_BUFFEREVENT(conn, {
+      bufferevent_enable(conn->bufev, EV_WRITE);
+      return;
+  });
+
   tor_assert(conn->write_event);
 
   if (conn->linked) {
@@ -590,7 +721,20 @@ conn_close_if_marked(int i)
   assert_connection_ok(conn, now);
   /* assert_all_pending_dns_resolves_ok(); */
 
+#ifdef USE_BUFFEREVENTS
+  if (conn->bufev && conn->hold_open_until_flushed) {
+    if (conn->linked) {
+      /* We need to do this explicitly so that the linked connection
+       * notices that there was an EOF. */
+      bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED);
+    }
+    if (evbuffer_get_length(bufferevent_get_output(conn->bufev)))
+      return 0;
+  }
+#endif
+
   log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
+  IF_HAS_BUFFEREVENT(conn, goto unlink);
   if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
     /* s == -1 means it's an incomplete edge connection, or that the socket
      * has already been closed as unflushable. */
@@ -613,8 +757,8 @@ conn_close_if_marked(int i)
       }
       log_debug(LD_GENERAL, "Flushed last %d bytes from a linked conn; "
                "%d left; flushlen %d; wants-to-flush==%d", retval,
-               (int)buf_datalen(conn->outbuf),
-               (int)conn->outbuf_flushlen,
+                (int)connection_get_outbuf_len(conn),
+                (int)conn->outbuf_flushlen,
                 connection_wants_to_flush(conn));
     } else if (connection_speaks_cells(conn)) {
       if (conn->state == OR_CONN_STATE_OPEN) {
@@ -651,13 +795,17 @@ conn_close_if_marked(int i)
              "something is wrong with your network connection, or "
              "something is wrong with theirs. "
              "(fd %d, type %s, state %d, marked at %s:%d).",
-             (int)buf_datalen(conn->outbuf),
+             (int)connection_get_outbuf_len(conn),
              escaped_safe_str_client(conn->address),
              conn->s, conn_type_to_string(conn->type), conn->state,
              conn->marked_for_close_file,
              conn->marked_for_close);
     }
   }
+
+#ifdef USE_BUFFEREVENTS
+ unlink:
+#endif
   connection_unlink(conn); /* unlink, remove, free */
   return 1;
 }
@@ -744,7 +892,8 @@ run_connection_housekeeping(int i, time_t now)
   int past_keepalive =
     now >= conn->timestamp_lastwritten + options->KeepalivePeriod;
 
-  if (conn->outbuf && !buf_datalen(conn->outbuf) && conn->type == CONN_TYPE_OR)
+  if (conn->outbuf && !connection_get_outbuf_len(conn) &&
+      conn->type == CONN_TYPE_OR)
     TO_OR_CONN(conn)->timestamp_lastempty = now;
 
   if (conn->marked_for_close) {
@@ -764,7 +913,7 @@ run_connection_housekeeping(int i, time_t now)
     /* This check is temporary; it's to let us know whether we should consider
      * parsing partial serverdesc responses. */
     if (conn->purpose == DIR_PURPOSE_FETCH_SERVERDESC &&
-        buf_datalen(conn->inbuf)>=1024) {
+        connection_get_inbuf_len(conn) >= 1024) {
       log_info(LD_DIR,"Trying to extract information from wedged server desc "
                "download.");
       connection_dir_reached_eof(TO_DIR_CONN(conn));
@@ -781,7 +930,11 @@ run_connection_housekeeping(int i, time_t now)
      the connection or send a keepalive, depending. */
 
   or_conn = TO_OR_CONN(conn);
+#ifdef USE_BUFFEREVENTS
+  tor_assert(conn->bufev);
+#else
   tor_assert(conn->outbuf);
+#endif
 
   if (or_conn->is_bad_for_new_circs && !or_conn->n_circuits) {
     /* It's bad for new circuits, and has no unmarked circuits on it:
@@ -803,13 +956,12 @@ run_connection_housekeeping(int i, time_t now)
       connection_mark_for_close(conn);
     }
   } else if (we_are_hibernating() && !or_conn->n_circuits &&
-             !buf_datalen(conn->outbuf)) {
+             !connection_get_outbuf_len(conn)) {
     /* We're hibernating, there's no circuits, and nothing to flush.*/
     log_info(LD_OR,"Expiring non-used OR connection to fd %d (%s:%d) "
              "[Hibernating or exiting].",
              conn->s,conn->address, conn->port);
-    connection_mark_for_close(conn);
-    conn->hold_open_until_flushed = 1;
+    connection_mark_and_flush(conn);
   } else if (!or_conn->n_circuits &&
              now >= or_conn->timestamp_last_added_nonpadding +
                                          IDLE_OR_CONN_TIMEOUT) {
@@ -817,7 +969,6 @@ run_connection_housekeeping(int i, time_t now)
              "[idle %d].", conn->s,conn->address, conn->port,
              (int)(now - or_conn->timestamp_last_added_nonpadding));
     connection_mark_for_close(conn);
-    conn->hold_open_until_flushed = 1;
   } else if (
       now >= or_conn->timestamp_lastempty + options->KeepalivePeriod*10 &&
       now >= conn->timestamp_lastwritten + options->KeepalivePeriod*10) {
@@ -825,10 +976,10 @@ run_connection_housekeeping(int i, time_t now)
            "Expiring stuck OR connection to fd %d (%s:%d). (%d bytes to "
            "flush; %d seconds since last write)",
            conn->s, conn->address, conn->port,
-           (int)buf_datalen(conn->outbuf),
+           (int)connection_get_outbuf_len(conn),
            (int)(now-conn->timestamp_lastwritten));
     connection_mark_for_close(conn);
-  } else if (past_keepalive && !buf_datalen(conn->outbuf)) {
+  } else if (past_keepalive && !connection_get_outbuf_len(conn)) {
     /* send a padding cell */
     log_fn(LOG_DEBUG,LD_OR,"Sending keepalive to (%s:%d)",
            conn->address, conn->port);
@@ -1249,6 +1400,9 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
   size_t bytes_written;
   size_t bytes_read;
   int seconds_elapsed;
+#ifdef USE_BUFFEREVENTS
+  uint64_t cur_read,cur_written;
+#endif
   or_options_t *options = get_options();
   (void)timer;
   (void)arg;
@@ -1260,9 +1414,15 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
   update_approx_time(now);
 
   /* the second has rolled over. check more stuff. */
+  seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#ifdef USE_BUFFEREVENTS
+  connection_get_rate_limit_totals(&cur_read, &cur_written);
+  bytes_written = (size_t)(cur_written - stats_prev_n_written);
+  bytes_read = (size_t)(cur_read - stats_prev_n_read);
+#else
   bytes_written = stats_prev_global_write_bucket - global_write_bucket;
   bytes_read = stats_prev_global_read_bucket - global_read_bucket;
-  seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#endif
   stats_n_bytes_read += bytes_read;
   stats_n_bytes_written += bytes_written;
   if (accounting_is_enabled(options) && seconds_elapsed >= 0)
@@ -1272,8 +1432,13 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
 
   if (seconds_elapsed > 0)
     connection_bucket_refill(seconds_elapsed, now);
+#ifdef USE_BUFFEREVENTS
+  stats_prev_n_written = cur_written;
+  stats_prev_n_read = cur_read;
+#else
   stats_prev_global_read_bucket = global_read_bucket;
   stats_prev_global_write_bucket = global_write_bucket;
+#endif
 
   if (server_mode(options) &&
       !we_are_hibernating() &&
@@ -1474,8 +1639,10 @@ do_main_loop(void)
 
   /* Set up our buckets */
   connection_bucket_init();
+#ifndef USE_BUFFEREVENTS
   stats_prev_global_read_bucket = global_read_bucket;
   stats_prev_global_write_bucket = global_write_bucket;
+#endif
 
   /* initialize the bootstrap status events to know we're starting up */
   control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
@@ -1719,13 +1886,13 @@ dumpstats(int severity)
       log(severity,LD_GENERAL,
           "Conn %d: %d bytes waiting on inbuf (len %d, last read %d secs ago)",
           i,
-          (int)buf_datalen(conn->inbuf),
+          (int)connection_get_inbuf_len(conn),
           (int)buf_allocation(conn->inbuf),
           (int)(now - conn->timestamp_lastread));
       log(severity,LD_GENERAL,
           "Conn %d: %d bytes waiting on outbuf "
           "(len %d, last written %d secs ago)",i,
-          (int)buf_datalen(conn->outbuf),
+          (int)connection_get_outbuf_len(conn),
           (int)buf_allocation(conn->outbuf),
           (int)(now - conn->timestamp_lastwritten));
       if (conn->type == CONN_TYPE_OR) {

+ 4 - 1
src/or/main.h

@@ -14,7 +14,9 @@
 
 extern int can_complete_circuit;
 
-int connection_add(connection_t *conn);
+int connection_add_impl(connection_t *conn, int is_connecting);
+#define connection_add(conn) connection_add_impl((conn), 0)
+#define connection_add_connecting(conn) connection_add_impl((conn), 1)
 int connection_remove(connection_t *conn);
 void connection_unregister_events(connection_t *conn);
 int connection_in_array(connection_t *conn);
@@ -24,6 +26,7 @@ int connection_is_on_closeable_list(connection_t *conn);
 smartlist_t *get_connection_array(void);
 
 typedef enum watchable_events {
+  /* Yes, it is intentional that these match Libevent's EV_READ and EV_WRITE */
   READ_EVENT=0x02,
   WRITE_EVENT=0x04
 } watchable_events_t;

+ 64 - 2
src/or/or.h

@@ -83,6 +83,13 @@
 #define snprintf _snprintf
 #endif
 
+#ifdef USE_BUFFEREVENTS
+#include <event2/bufferevent.h>
+#include <event2/buffer.h>
+#include <event2/util.h>
+#endif
+
+#include "crypto.h"
 #include "tortls.h"
 #include "../common/torlog.h"
 #include "container.h"
@@ -959,6 +966,7 @@ typedef struct connection_t {
   /** Our socket; -1 if this connection is closed, or has no socket. */
   evutil_socket_t s;
   int conn_array_index; /**< Index into the global connection array. */
+
   struct event *read_event; /**< Libevent event structure. */
   struct event *write_event; /**< Libevent event structure. */
   buf_t *inbuf; /**< Buffer holding data read over this connection. */
@@ -969,6 +977,11 @@ typedef struct connection_t {
                               * read? */
   time_t timestamp_lastwritten; /**< When was the last time libevent said we
                                  * could write? */
+
+#ifdef USE_BUFFEREVENTS
+  struct bufferevent *bufev; /**< A Libevent buffered IO structure. */
+#endif
+
   time_t timestamp_created; /**< When was this connection_t created? */
 
   /* XXXX_IP6 make this IPv6-capable */
@@ -1065,10 +1078,16 @@ typedef struct or_connection_t {
   /* bandwidth* and *_bucket only used by ORs in OPEN state: */
   int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */
   int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */
+#ifndef USE_BUFFEREVENTS
   int read_bucket; /**< When this hits 0, stop receiving. Every second we
                     * add 'bandwidthrate' to this, capping it at
                     * bandwidthburst. (OPEN ORs only) */
   int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */
+#else
+  /** DOCDOC */
+  /* XXXX we could share this among all connections. */
+  struct ev_token_bucket_cfg *bucket_cfg;
+#endif
   int n_circuits; /**< How many circuits use this connection as p_conn or
                    * n_conn ? */
 
@@ -1264,6 +1283,51 @@ static INLINE control_connection_t *TO_CONTROL_CONN(connection_t *c)
   return DOWNCAST(control_connection_t, c);
 }
 
+/* Conditional macros to help write code that works whether bufferevents are
+   disabled or not.
+
+   We can't just write:
+      if (conn->bufev) {
+        do bufferevent stuff;
+      } else {
+        do other stuff;
+      }
+   because the bufferevent stuff won't even compile unless we have a fairly
+   new version of Libevent.  Instead, we say:
+      IF_HAS_BUFFEREVENT(conn, { do_bufferevent_stuff } );
+   or:
+      IF_HAS_BUFFEREVENT(conn, {
+        do bufferevent stuff;
+      }) ELSE_IF_NO_BUFFEREVENT {
+        do non-bufferevent stuff;
+      }
+   If we're compiling with bufferevent support, then the macros expand more or
+   less to:
+      if (conn->bufev) {
+        do_bufferevent_stuff;
+      } else {
+        do non-bufferevent stuff;
+      }
+   and if we aren't using bufferevents, they expand more or less to:
+      { do non-bufferevent stuff; }
+*/
+#ifdef USE_BUFFEREVENTS
+#define HAS_BUFFEREVENT(c) (((c)->bufev) != NULL)
+#define IF_HAS_BUFFEREVENT(c, stmt)                \
+  if ((c)->bufev) do {                             \
+      stmt ;                                       \
+  } while (0)
+#define ELSE_IF_NO_BUFFEREVENT ; else
+#define IF_HAS_NO_BUFFEREVENT(c)                   \
+  if (NULL == (c)->bufev)
+#else
+#define HAS_BUFFEREVENT(c) (0)
+#define IF_HAS_BUFFEREVENT(c, stmt) (void)0
+#define ELSE_IF_NO_BUFFEREVENT ;
+#define IF_HAS_NO_BUFFEREVENT(c)                \
+  if (1)
+#endif
+
 /** What action type does an address policy indicate: accept or reject? */
 typedef enum {
   ADDR_POLICY_ACCEPT=1,
@@ -2921,8 +2985,6 @@ struct socks_request_t {
                               * every connection. */
 };
 
-/* all the function prototypes go here */
-
 /********************************* circuitbuild.c **********************/
 
 /** How many hops does a general-purpose circuit have by default? */

+ 5 - 6
src/or/relay.c

@@ -1157,8 +1157,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
       if (!conn->_base.marked_for_close) {
         /* only mark it if not already marked. it's possible to
          * get the 'end' right around when the client hangs up on us. */
-        connection_mark_for_close(TO_CONN(conn));
-        conn->_base.hold_open_until_flushed = 1;
+        connection_mark_and_flush(TO_CONN(conn));
       }
       return 0;
     case RELAY_COMMAND_EXTEND:
@@ -1365,7 +1364,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
     return 0;
   }
 
-  amount_to_process = buf_datalen(conn->_base.inbuf);
+  amount_to_process = connection_get_inbuf_len(TO_CONN(conn));
 
   if (!amount_to_process)
     return 0;
@@ -1384,7 +1383,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
   connection_fetch_from_buf(payload, length, TO_CONN(conn));
 
   log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s,
-            (int)length, (int)buf_datalen(conn->_base.inbuf));
+            (int)length, (int)connection_get_inbuf_len(TO_CONN(conn)));
 
   if (connection_edge_send_command(conn, RELAY_COMMAND_DATA,
                                    payload, length) < 0 )
@@ -2214,7 +2213,7 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
       edge->edge_blocked_on_circ = block;
     }
 
-    if (!conn->read_event) {
+    if (!conn->read_event && !HAS_BUFFEREVENT(conn)) {
       /* This connection is a placeholder for something; probably a DNS
        * request.  It can't actually stop or start reading.*/
       continue;
@@ -2415,7 +2414,7 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
     make_circuit_active_on_conn(circ, orconn);
   }
 
-  if (! buf_datalen(orconn->_base.outbuf)) {
+  if (! connection_get_outbuf_len(TO_CONN(orconn))) {
     /* There is no data at all waiting to be sent on the outbuf.  Add a
      * cell, so that we can notice when it gets flushed, flushed_some can
      * get called, and we can start putting more data onto the buffer then.

+ 10 - 1
src/test/Makefile.am

@@ -21,10 +21,19 @@ test_SOURCES = \
 	test_util.c \
 	tinytest.c
 
+if USE_BUFFEREVENTS
+levent_openssl_lib = -levent_openssl
+else
+levent_openssl_lib =
+endif
+
 test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \
         @TOR_LDFLAGS_libevent@
 test_LDADD = ../or/libtor.a ../common/libor.a ../common/libor-crypto.a \
 	../common/libor-event.a \
-	@TOR_ZLIB_LIBS@ -lm @TOR_LIBEVENT_LIBS@ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@
+	@TOR_ZLIB_LIBS@ -lm @TOR_LIBEVENT_LIBS@ @TOR_OPENSSL_LIBS@ \
+	@TOR_LIB_WS32@ @TOR_LIB_GDI@ $(levent_openssl_lib)
 
 noinst_HEADERS = tinytest.h tinytest_macros.h test.h
+
+