Browse Source

Refactor the PIR subprocess routines into their own file

This is in preparation for having a PIR client subprocess as well.
Ian Goldberg 5 years ago
parent
commit
70c4062d76
4 changed files with 328 additions and 229 deletions
  1. 2 0
      src/core/include.am
  2. 9 229
      src/feature/hs/hs_cache.c
  3. 291 0
      src/feature/hs/hs_pirprocess.c
  4. 26 0
      src/feature/hs/hs_pirprocess.h

+ 2 - 0
src/core/include.am

@@ -94,6 +94,7 @@ LIBTOR_APP_A_SOURCES = 				\
 	src/feature/hs/hs_descriptor.c		\
 	src/feature/hs/hs_ident.c		\
 	src/feature/hs/hs_intropoint.c		\
+	src/feature/hs/hs_pirprocess.c		\
 	src/feature/hs/hs_service.c		\
 	src/feature/hs/hs_stats.c		\
 	src/feature/hs_common/replaycache.c	\
@@ -319,6 +320,7 @@ noinst_HEADERS +=					\
 	src/feature/hs/hs_descriptor.h			\
 	src/feature/hs/hs_ident.h			\
 	src/feature/hs/hs_intropoint.h			\
+	src/feature/hs/hs_pirprocess.h			\
 	src/feature/hs/hs_service.h			\
 	src/feature/hs/hs_stats.h			\
 	src/feature/hs/hsdir_index_st.h			\

+ 9 - 229
src/feature/hs/hs_cache.c

@@ -26,16 +26,7 @@
 
 #include "feature/nodelist/networkstatus_st.h"
 
-#define SUBPROCESS_PRIVATE
-#include "lib/process/env.h"
-#include "lib/process/subprocess.h"
-#include "lib/evloop/compat_libevent.h"
-
-#include <event2/event.h>
-
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
+#include "feature/hs/hs_pirprocess.h"
 
 static int cached_client_descriptor_has_expired(time_t now,
            const hs_cache_client_descriptor_t *cached_desc);
@@ -978,19 +969,6 @@ hs_cache_get_max_descriptor_size(void)
                                             HS_DESC_MAX_LEN, 1, INT32_MAX);
 }
 
-static process_handle_t *pirserver;
-static buf_t *pirserver_stdin_buf;
-static struct event *pirserver_stdin_ev;
-static struct event *pirserver_stdout_ev;
-static struct event *pirserver_stderr_ev;
-
-typedef enum {
-    PIRSERVER_READSTATE_HEADER,
-    PIRSERVER_READSTATE_BODY
-} PIRServerReadState;
-
-#define PIRSERVER_HDR_SIZE 13
-
 #define PIRSERVER_REQUEST_PARAMS 0x01
 #define PIRSERVER_REQUEST_STORE 0x02
 #define PIRSERVER_REQUEST_LOOKUP 0x03
@@ -1013,199 +991,15 @@ hs_cache_pirserver_received(const unsigned char *hdrbuf,
     }
 }
 
-/* This is called when the pirserver has output for us. */
-static void
-hs_cache_pirserver_stdoutcb(evutil_socket_t fd, short what,
-        ATTR_UNUSED void *arg) {
-    static PIRServerReadState readstate = PIRSERVER_READSTATE_HEADER;
-    static size_t readoff = 0;
-    static size_t readleft = PIRSERVER_HDR_SIZE;
-    static unsigned char hdrbuf[PIRSERVER_HDR_SIZE];
-    static char *bodybuf = NULL;
-
-    if (!(what & EV_READ)) {
-        /* Not sure why we're here */
-        return;
-    }
-
-    if (readstate == PIRSERVER_READSTATE_HEADER) {
-        int res = read(fd, hdrbuf + readoff, readleft);
-        if (res <= 0) return;
-        readoff += res;
-        readleft -= res;
-        if (readleft == 0) {
-            readleft = ntohl(*(uint32_t*)(hdrbuf+PIRSERVER_HDR_SIZE-4));
-            tor_free(bodybuf);
-            if (readleft > 0) {
-                bodybuf = tor_malloc(readleft);
-                readoff = 0;
-                readstate = PIRSERVER_READSTATE_BODY;
-            } else {
-                hs_cache_pirserver_received(hdrbuf, NULL, 0);
-                readoff = 0;
-                readleft = PIRSERVER_HDR_SIZE;
-                readstate = PIRSERVER_READSTATE_HEADER;
-            }
-        }
-    } else if (readstate == PIRSERVER_READSTATE_BODY) {
-        int res = read(fd, bodybuf + readoff, readleft);
-        if (res <= 0) return;
-        readoff += res;
-        readleft -= res;
-        if (readleft == 0) {
-            /* Reading is complete */
-            hs_cache_pirserver_received(hdrbuf, bodybuf, readoff);
-
-            /* Prepare for the next output from the PIR server */
-            tor_free(bodybuf);
-            readoff = 0;
-            readleft = PIRSERVER_HDR_SIZE;
-            readstate = PIRSERVER_READSTATE_HEADER;
-        }
-    }
-}
-
-/* This is called when the pirserver is ready to read from its stdin. */
-static void
-hs_cache_pirserver_stdincb(evutil_socket_t fd, short what,
-        ATTR_UNUSED void *arg) {
-    int res;
-    size_t bufsize = buf_datalen(pirserver_stdin_buf);
-    char *netbuf = NULL;
-
-    if (!(what & EV_WRITE)) {
-        /* Not sure why we're here */
-        log_info(LD_DIRSERV,"PIRSERVER bailing");
-        return;
-    }
-
-    if (bufsize == 0) {
-        log_err(LD_DIRSERV,"PIRSERVER trying to write 0-length buffer");
-        return;
-    }
-
-    netbuf = tor_malloc(bufsize);
-    if (netbuf == NULL) {
-        log_err(LD_DIRSERV,"PIRSERVER failed to allocate buffer");
-        return;
-    }
-
-    /* One might think that just calling buf_flush_to_socket would be
-     * the thing to do, but that function ends up calling sendto()
-     * instead of write(), which doesn't work on pipes.  So we do it
-     * more manually.  Using a bufferevent may be another option. */
-    buf_peek(pirserver_stdin_buf, netbuf, bufsize);
-    res = write(fd, netbuf, bufsize);
-    tor_free(netbuf);
-    if (res < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-        /* Try writing again later */
-        event_add(pirserver_stdin_ev, NULL);
-        return;
-    }
-    if (res <= 0) {
-        /* Stop trying to write. */
-        return;
-    }
-    buf_drain(pirserver_stdin_buf, res);
-    bufsize -= res;
-
-    if (bufsize > 0) {
-        /* There's more to write */
-        event_add(pirserver_stdin_ev, NULL);
-    }
-}
-
-/* This is called when the pirserver writes something to its stderr. */
-static void
-hs_cache_pirserver_stderrcb(evutil_socket_t fd, short what,
-        ATTR_UNUSED void *arg) {
-    if (!(what & EV_READ)) {
-        /* Not sure why we're here */
-        return;
-    }
-
-    char buf[1000];
-    int res = read(fd, buf, sizeof(buf)-1);
-    if (res <= 0) return;
-    buf[res] = '\0';
-    log_info(LD_DIRSERV,"PIRSERVER %s", escaped(buf));
-}
+static pir_process_t pirserver;
 
 /* Poke the hidden service cache PIR subsystem to launch the PIR
    server if needed. */
 static void
 hs_cache_pir_poke(void)
 {
-    int res;
-    const char *pirserverpath = getenv("PIR_SERVER_PATH");
-    smartlist_t *env_vars = get_current_process_environment_variables();
-    const char *argv[2];
-    process_environment_t *env;
-
-    if (pirserver && pirserver->status == PROCESS_STATUS_RUNNING) {
-        /* The PIR server appears to be open */
-        return;
-    }
-
-    if (pirserver) {
-        if (pirserver_stdin_buf) {
-            buf_free(pirserver_stdin_buf);
-            pirserver_stdin_buf = NULL;
-        }
-        if (pirserver_stdin_ev) {
-            event_free(pirserver_stdin_ev);
-            pirserver_stdin_ev = NULL;
-        }
-        if (pirserver_stdout_ev) {
-            event_free(pirserver_stdout_ev);
-            pirserver_stdout_ev = NULL;
-        }
-        if (pirserver_stderr_ev) {
-            event_free(pirserver_stderr_ev);
-            pirserver_stderr_ev = NULL;
-        }
-        tor_process_handle_destroy(pirserver, 1);
-        pirserver = NULL;
-    }
-
-    if (!pirserverpath) {
-        /* We don't have a configured PIR server */
-        return;
-    }
-
-    argv[0] = pirserverpath;
-    argv[1] = NULL;
-
-    env = process_environment_make(env_vars);
-
-    res = tor_spawn_background(pirserverpath, argv, env, &pirserver);
-
-    SMARTLIST_FOREACH(env_vars, void *, x, tor_free(x));
-    smartlist_free(env_vars);
-
-    if (res != PROCESS_STATUS_RUNNING) {
-        /* Launch failure */
-        return;
-    }
-
-    /* Create a libevent event to listen to the PIR server's responses. */
-    pirserver_stdout_ev = event_new(tor_libevent_get_base(),
-        pirserver->stdout_pipe, EV_READ|EV_PERSIST,
-        hs_cache_pirserver_stdoutcb, NULL);
-    event_add(pirserver_stdout_ev, NULL);
-
-    /* And one to listen to the PIR server's stderr. */
-    pirserver_stderr_ev = event_new(tor_libevent_get_base(),
-        pirserver->stderr_pipe, EV_READ|EV_PERSIST,
-        hs_cache_pirserver_stderrcb, NULL);
-    event_add(pirserver_stderr_ev, NULL);
-
-    /* And one for writability to the pirserver's stdin, but don't add
-     * it just yet.  Also create the buffer it will use. */
-    pirserver_stdin_buf = buf_new();
-    pirserver_stdin_ev = event_new(tor_libevent_get_base(),
-        pirserver->stdin_pipe, EV_WRITE, hs_cache_pirserver_stdincb,
-        NULL);
+    hs_pirprocess_poke(getenv("PIR_SERVER_PATH"), "PIRSERVER",
+        hs_cache_pirserver_received, &pirserver);
 }
 
 /* Initialize the hidden service cache PIR subsystem. */
@@ -1213,27 +1007,13 @@ static void
 hs_cache_pir_init(void)
 {
     pirserver = NULL;
-    pirserver_stdin_buf = NULL;
-    pirserver_stdin_ev = NULL;
-    pirserver_stdout_ev = NULL;
-    pirserver_stderr_ev = NULL;
 }
 
 static int
 hs_cache_pirserver_send(const unsigned char *buf, size_t len)
 {
     hs_cache_pir_poke();
-    if (pirserver == NULL || pirserver->status != PROCESS_STATUS_RUNNING) {
-        /* Launch failed */
-        return -1;
-    }
-
-    /* Write the data to the stdin buffer */
-    if (len > 0) {
-        buf_add(pirserver_stdin_buf, (const char *)buf, len);
-        event_add(pirserver_stdin_ev, NULL);
-    }
-    return len;
+    return hs_pirprocess_send(pirserver, buf, len);
 }
 
 static int
@@ -1255,7 +1035,7 @@ hs_cache_pirserver_insert_desc(hs_cache_dir_descriptor_t *desc)
     len = DIGEST256_LEN + encoded_desc_len;
     *(uint32_t*)(hdr+9) = htonl(len);
 
-    res = hs_cache_pirserver_send(hdr, PIRSERVER_HDR_SIZE);
+    res = hs_cache_pirserver_send(hdr, PIRPROCESS_HDR_SIZE);
     if (res <= 0) return -1;
     written += res;
 
@@ -1280,7 +1060,7 @@ hs_cache_pirserver_get_params(dir_connection_t *conn)
     /* Ask the pirserver for the params */
 
     int res;
-    unsigned char hdr[PIRSERVER_HDR_SIZE];
+    unsigned char hdr[PIRPROCESS_HDR_SIZE];
 
     /* PIRONION TODO: For now, the request id is just literally the
      * dir_connection_t pointer itself, so that when we get the
@@ -1290,8 +1070,8 @@ hs_cache_pirserver_get_params(dir_connection_t *conn)
     memmove(hdr, (const char *)(&conn), sizeof(conn));
     hdr[8] = PIRSERVER_REQUEST_PARAMS;
     memmove(hdr+9, "\0\0\0\0", 4);
-    res = hs_cache_pirserver_send(hdr, PIRSERVER_HDR_SIZE);
-    if (res < PIRSERVER_HDR_SIZE) {
+    res = hs_cache_pirserver_send(hdr, PIRPROCESS_HDR_SIZE);
+    if (res < PIRPROCESS_HDR_SIZE) {
         return -1;
     }
     return 0;

+ 291 - 0
src/feature/hs/hs_pirprocess.c

@@ -0,0 +1,291 @@
+/**
+ * \file hs_pirprocess.c
+ * \brief Handle hidden service PIR helper processes.
+ **/
+
+#include "core/or/or.h"
+
+#include "feature/hs/hs_pirprocess.h"
+
+#define SUBPROCESS_PRIVATE
+#include "lib/process/env.h"
+#include "lib/process/subprocess.h"
+#include "lib/evloop/compat_libevent.h"
+
+#include <event2/event.h>
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+typedef enum {
+    PIRPROCESS_READSTATE_HEADER,
+    PIRPROCESS_READSTATE_BODY
+} PIRProcessReadState;
+
+struct pir_process_st {
+    process_handle_t *process;
+    buf_t *stdin_buf;
+    struct event *stdin_ev;
+    struct event *stdout_ev;
+    struct event *stderr_ev;
+    char *loglabel;
+    pir_process_msghandler_t msghandler;
+    PIRProcessReadState readstate;
+    size_t readoff, readleft;
+    unsigned char hdrbuf[PIRPROCESS_HDR_SIZE];
+    char *bodybuf;
+};
+
+/* This is called when the pir process has output for us. */
+static void
+pirprocess_stdoutcb(evutil_socket_t fd, short what, void *arg)
+{
+    pir_process_t handle = (pir_process_t)arg;
+
+    if (!(what & EV_READ)) {
+        /* Not sure why we're here */
+        return;
+    }
+
+    if (handle->readstate == PIRPROCESS_READSTATE_HEADER) {
+        int res = read(fd, handle->hdrbuf + handle->readoff, handle->readleft);
+        if (res <= 0) return;
+        handle->readoff += res;
+        handle->readleft -= res;
+        if (handle->readleft == 0) {
+            handle->readleft = ntohl(*(uint32_t*)
+                    (handle->hdrbuf+PIRPROCESS_HDR_SIZE-4));
+            tor_free(handle->bodybuf);
+            if (handle->readleft > 0) {
+                handle->bodybuf = tor_malloc(handle->readleft);
+                handle->readoff = 0;
+                handle->readstate = PIRPROCESS_READSTATE_BODY;
+            } else {
+                (handle->msghandler)(handle->hdrbuf, NULL, 0);
+                handle->readoff = 0;
+                handle->readleft = PIRPROCESS_HDR_SIZE;
+                handle->readstate = PIRPROCESS_READSTATE_HEADER;
+            }
+        }
+    } else if (handle->readstate == PIRPROCESS_READSTATE_BODY) {
+        int res = read(fd, handle->bodybuf + handle->readoff,
+                        handle->readleft);
+        if (res <= 0) return;
+        handle->readoff += res;
+        handle->readleft -= res;
+        if (handle->readleft == 0) {
+            /* Reading is complete */
+            (handle->msghandler)(handle->hdrbuf, handle->bodybuf,
+                handle->readoff);
+
+            /* Prepare for the next output from the PIR server */
+            tor_free(handle->bodybuf);
+            handle->readoff = 0;
+            handle->readleft = PIRPROCESS_HDR_SIZE;
+            handle->readstate = PIRPROCESS_READSTATE_HEADER;
+        }
+    }
+}
+
+/* This is called when the pir process is ready to read from its stdin. */
+static void
+pirprocess_stdincb(evutil_socket_t fd, short what, void *arg)
+{
+    pir_process_t handle = (pir_process_t)arg;
+    int res;
+    size_t bufsize = buf_datalen(handle->stdin_buf);
+    char *netbuf = NULL;
+
+    if (!(what & EV_WRITE)) {
+        /* Not sure why we're here */
+        log_info(LD_DIRSERV,"PIRSERVER bailing");
+        return;
+    }
+
+    if (bufsize == 0) {
+        log_err(LD_DIRSERV,"PIRSERVER trying to write 0-length buffer");
+        return;
+    }
+
+    netbuf = tor_malloc(bufsize);
+    if (netbuf == NULL) {
+        log_err(LD_DIRSERV,"PIRSERVER failed to allocate buffer");
+        return;
+    }
+
+    /* One might think that just calling buf_flush_to_socket would be
+     * the thing to do, but that function ends up calling sendto()
+     * instead of write(), which doesn't work on pipes.  So we do it
+     * more manually.  Using a bufferevent may be another option. */
+    buf_peek(handle->stdin_buf, netbuf, bufsize);
+    res = write(fd, netbuf, bufsize);
+    tor_free(netbuf);
+    if (res < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+        /* Try writing again later */
+        event_add(handle->stdin_ev, NULL);
+        return;
+    }
+    if (res <= 0) {
+        /* Stop trying to write. */
+        return;
+    }
+    buf_drain(handle->stdin_buf, res);
+    bufsize -= res;
+
+    if (bufsize > 0) {
+        /* There's more to write */
+        event_add(handle->stdin_ev, NULL);
+    }
+}
+
+/* This is called when the pir process writes something to its stderr. */
+static void
+pirprocess_stderrcb(evutil_socket_t fd, short what, void *arg)
+{
+    pir_process_t handle = (pir_process_t)arg;
+
+    if (!(what & EV_READ)) {
+        /* Not sure why we're here */
+        return;
+    }
+
+    char buf[1000];
+    int res = read(fd, buf, sizeof(buf)-1);
+    if (res <= 0) return;
+    buf[res] = '\0';
+    log_info(LD_DIRSERV,"%s %s", handle->loglabel, escaped(buf));
+}
+
+void
+hs_pirprocess_close(pir_process_t *handlep)
+{
+    pir_process_t handle;
+
+    if (!handlep) return;
+
+    if (!*handlep) return;
+
+    handle = *handlep;
+
+    if (handle->stdin_buf) {
+        buf_free(handle->stdin_buf);
+    }
+    if (handle->stdin_ev) {
+        event_free(handle->stdin_ev);
+    }
+    if (handle->stdout_ev) {
+        event_free(handle->stdout_ev);
+    }
+    if (handle->stderr_ev) {
+        event_free(handle->stderr_ev);
+    }
+    if (handle->process) {
+        tor_process_handle_destroy(handle->process, 1);
+    }
+    if (handle->loglabel) {
+        tor_free(handle->loglabel);
+    }
+    if (handle->bodybuf) {
+        tor_free(handle->bodybuf);
+    }
+    tor_free(handle);
+    *handlep = NULL;
+}
+
+void
+hs_pirprocess_poke(const char *path, const char *loglabel,
+    pir_process_msghandler_t msghandler, pir_process_t *handlep)
+{
+    int res;
+    smartlist_t *env_vars = get_current_process_environment_variables();
+    const char *argv[2];
+    process_environment_t *env;
+    pir_process_t handle;
+
+    if (!handlep) return;
+
+    if (*handlep) {
+        handle = *handlep;
+
+        if (handle->process &&
+                handle->process->status == PROCESS_STATUS_RUNNING) {
+            /* The PIR process appears to be open */
+            return;
+        }
+    }
+
+    /* Close everything we've got and start over */
+    hs_pirprocess_close(handlep);
+
+    if (!path) {
+        /* We don't have a configured PIR server */
+        return;
+    }
+
+    *handlep = tor_malloc_zero(sizeof(struct pir_process_st));
+    handle = *handlep;
+
+    /* Start the process */
+
+    argv[0] = path;
+    argv[1] = NULL;
+
+    env = process_environment_make(env_vars);
+
+    res = tor_spawn_background(path, argv, env, &(handle->process));
+
+    SMARTLIST_FOREACH(env_vars, void *, x, tor_free(x));
+    smartlist_free(env_vars);
+
+    if (res != PROCESS_STATUS_RUNNING) {
+        /* Launch failure */
+        hs_pirprocess_close(handlep);
+        return;
+    }
+
+    handle->loglabel = tor_strdup(loglabel);
+    handle->msghandler = msghandler;
+
+    /* Create a libevent event to listen to the PIR process' responses. */
+    handle->stdout_ev = event_new(tor_libevent_get_base(),
+        handle->process->stdout_pipe, EV_READ|EV_PERSIST,
+        pirprocess_stdoutcb, handle);
+    event_add(handle->stdout_ev, NULL);
+
+    /* And one to listen to the PIR server's stderr. */
+    handle->stderr_ev = event_new(tor_libevent_get_base(),
+        handle->process->stderr_pipe, EV_READ|EV_PERSIST,
+        pirprocess_stderrcb, handle);
+    event_add(handle->stderr_ev, NULL);
+
+    /* And one for writability to the pir process' stdin, but don't add
+     * it just yet.  Also create the buffer it will use. */
+    handle->stdin_buf = buf_new();
+    handle->stdin_ev = event_new(tor_libevent_get_base(),
+        handle->process->stdin_pipe, EV_WRITE, pirprocess_stdincb,
+        handle);
+
+    handle->readstate = PIRPROCESS_READSTATE_HEADER;
+    handle->readoff = 0;
+    handle->readleft = PIRPROCESS_HDR_SIZE;
+    handle->bodybuf = NULL;
+}
+
+int
+hs_pirprocess_send(pir_process_t handle, const unsigned char *buf,
+    size_t len)
+{
+    if (handle->process == NULL ||
+            handle->process->status != PROCESS_STATUS_RUNNING) {
+        /* We don't have a process to talk to */
+        return -1;
+    }
+
+    /* Write the data to the stdin buffer */
+    if (len > 0) {
+        buf_add(handle->stdin_buf, (const char *)buf, len);
+        event_add(handle->stdin_ev, NULL);
+    }
+    return len;
+}

+ 26 - 0
src/feature/hs/hs_pirprocess.h

@@ -0,0 +1,26 @@
+/**
+ * \file hs_cache.h
+ * \brief Header file for hs_cache.c
+ **/
+
+#ifndef TOR_HS_PIRPROCESS_H
+#define TOR_HS_PIRPROCESS_H
+
+#define PIRPROCESS_HDR_SIZE 13
+
+#include "lib/container/buffers.h"
+
+typedef void (*pir_process_msghandler_t)(const unsigned char *hdrbuf,
+        const char *bodybuf, size_t bodylen);
+
+typedef struct pir_process_st *pir_process_t;
+
+void hs_pirprocess_poke(const char *path, const char *loglabel,
+    pir_process_msghandler_t msghandler, pir_process_t *handlep);
+
+void hs_pirprocess_close(pir_process_t *handlep);
+
+int hs_pirprocess_send(pir_process_t handle, const unsigned char *buf,
+    size_t len);
+
+#endif /* !defined(TOR_HS_PIRPROCESS_H) */