/** * \file hs_pirprocess.c * \brief Handle hidden service PIR helper processes. **/ #include "core/or/or.h" #include "feature/hs/hs_pirprocess.h" #define SUBPROCESS_PRIVATE #include "lib/process/env.h" #include "lib/process/subprocess.h" #include "lib/evloop/compat_libevent.h" #include "feature/dircommon/dir_connection_st.h" #include #ifdef HAVE_UNISTD_H #include #endif typedef enum { PIRPROCESS_READSTATE_HEADER, PIRPROCESS_READSTATE_BODY } PIRProcessReadState; struct pir_process_st { process_handle_t *process; buf_t *stdin_buf; buf_t *stderr_buf; struct event *stdin_ev; struct event *stdout_ev; struct event *stderr_ev; char *loglabel; pir_process_msghandler_t msghandler; PIRProcessReadState readstate; size_t readoff, readleft; unsigned char hdrbuf[PIRPROCESS_HDR_SIZE]; char *bodybuf; }; /* This is called when the pir process has output for us. */ static void pirprocess_stdoutcb(evutil_socket_t fd, short what, void *arg) { pir_process_t handle = (pir_process_t)arg; if (!(what & EV_READ)) { /* Not sure why we're here */ return; } if (handle->readstate == PIRPROCESS_READSTATE_HEADER) { int res = read(fd, handle->hdrbuf + handle->readoff, handle->readleft); if (res <= 0) { /* Tell the caller that the PIR process failed to send us any output. */ tor_free(handle->bodybuf); handle->readoff = 0; handle->readleft = PIRPROCESS_HDR_SIZE; handle->readstate = PIRPROCESS_READSTATE_HEADER; (handle->msghandler)(NULL, NULL, 0); return; } handle->readoff += res; handle->readleft -= res; if (handle->readleft == 0) { handle->readleft = ntohl(*(uint32_t*) (handle->hdrbuf+PIRPROCESS_HDR_SIZE-4)); tor_free(handle->bodybuf); if (handle->readleft > 0) { handle->bodybuf = tor_malloc(handle->readleft+1); handle->bodybuf[handle->readleft] = '\0'; handle->readoff = 0; handle->readstate = PIRPROCESS_READSTATE_BODY; } else { (handle->msghandler)(handle->hdrbuf, NULL, 0); handle->readoff = 0; handle->readleft = PIRPROCESS_HDR_SIZE; handle->readstate = PIRPROCESS_READSTATE_HEADER; } } } else if (handle->readstate == PIRPROCESS_READSTATE_BODY) { int res = read(fd, handle->bodybuf + handle->readoff, handle->readleft); if (res <= 0) { /* Tell the caller that the PIR process failed to send us any output. */ tor_free(handle->bodybuf); handle->readoff = 0; handle->readleft = PIRPROCESS_HDR_SIZE; handle->readstate = PIRPROCESS_READSTATE_HEADER; (handle->msghandler)(NULL, NULL, 0); return; } handle->readoff += res; handle->readleft -= res; if (handle->readleft == 0) { /* Reading is complete */ (handle->msghandler)(handle->hdrbuf, handle->bodybuf, handle->readoff); /* Prepare for the next output from the PIR server */ tor_free(handle->bodybuf); handle->readoff = 0; handle->readleft = PIRPROCESS_HDR_SIZE; handle->readstate = PIRPROCESS_READSTATE_HEADER; } } } /* This is called when the pir process is ready to read from its stdin. */ static void pirprocess_stdincb(evutil_socket_t fd, short what, void *arg) { pir_process_t handle = (pir_process_t)arg; int res; size_t bufsize = buf_datalen(handle->stdin_buf); char *netbuf = NULL; if (!(what & EV_WRITE)) { /* Not sure why we're here */ log_info(LD_DIRSERV,"PIRSERVER bailing"); return; } if (bufsize == 0) { log_err(LD_DIRSERV,"PIRSERVER trying to write 0-length buffer"); return; } netbuf = tor_malloc(bufsize); if (netbuf == NULL) { log_err(LD_DIRSERV,"PIRSERVER failed to allocate buffer"); return; } /* One might think that just calling buf_flush_to_socket would be * the thing to do, but that function ends up calling sendto() * instead of write(), which doesn't work on pipes. So we do it * more manually. Using a bufferevent may be another option. */ buf_peek(handle->stdin_buf, netbuf, bufsize); res = write(fd, netbuf, bufsize); tor_free(netbuf); if (res < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { /* Try writing again later */ event_add(handle->stdin_ev, NULL); return; } if (res <= 0) { /* Stop trying to write. */ return; } buf_drain(handle->stdin_buf, res); bufsize -= res; if (bufsize > 0) { /* There's more to write */ event_add(handle->stdin_ev, NULL); } } /* This is called when the pir process writes something to its stderr. */ static void pirprocess_stderrcb(evutil_socket_t fd, short what, void *arg) { pir_process_t handle = (pir_process_t)arg; if (!(what & EV_READ)) { /* Not sure why we're here */ return; } /* Add the data to the stderr buffer */ char buf[1000]; int res = read(fd, buf, sizeof(buf)-1); if (res <= 0) return; buf[res] = '\0'; buf_add(handle->stderr_buf, buf, res); /* Read lines from the stderr_buf */ while (1) { size_t linesize = 0; res = buf_get_line(handle->stderr_buf, NULL, &linesize); if (res == 0) { /* No complete lines in the buffer */ break; } char *linebuf = tor_malloc(linesize); buf_get_line(handle->stderr_buf, linebuf, &linesize); if (linesize > 0 && linebuf[linesize-1] == '\n') { linebuf[linesize-1] = '\0'; } log_info(LD_DIRSERV,"%s %s", handle->loglabel, escaped(linebuf)); tor_free(linebuf); } } void hs_pirprocess_close(pir_process_t *handlep) { pir_process_t handle; if (!handlep) return; if (!*handlep) return; handle = *handlep; if (handle->stdin_buf) { buf_free(handle->stdin_buf); } if (handle->stderr_buf) { buf_free(handle->stderr_buf); } if (handle->stdin_ev) { event_free(handle->stdin_ev); } if (handle->stdout_ev) { event_free(handle->stdout_ev); } if (handle->stderr_ev) { event_free(handle->stderr_ev); } if (handle->process) { tor_process_handle_destroy(handle->process, 1); } if (handle->loglabel) { tor_free(handle->loglabel); } if (handle->bodybuf) { tor_free(handle->bodybuf); } tor_free(handle); *handlep = NULL; } void hs_pirprocess_poke(const char *path, const char *loglabel, pir_process_msghandler_t msghandler, pir_process_t *handlep) { int res; smartlist_t *env_vars = get_current_process_environment_variables(); const char *argv[2]; process_environment_t *env; pir_process_t handle; if (!handlep) return; if (*handlep) { handle = *handlep; if (handle->process && handle->process->status == PROCESS_STATUS_RUNNING) { /* The PIR process appears to be open */ return; } } /* Close everything we've got and start over */ hs_pirprocess_close(handlep); if (!path) { /* We don't have a configured PIR server */ return; } *handlep = tor_malloc_zero(sizeof(struct pir_process_st)); handle = *handlep; /* Start the process */ argv[0] = path; argv[1] = NULL; env = process_environment_make(env_vars); res = tor_spawn_background(path, argv, env, &(handle->process)); SMARTLIST_FOREACH(env_vars, void *, x, tor_free(x)); smartlist_free(env_vars); if (res != PROCESS_STATUS_RUNNING) { /* Launch failure */ hs_pirprocess_close(handlep); return; } handle->loglabel = tor_strdup(loglabel); handle->msghandler = msghandler; /* Create a libevent event to listen to the PIR process' responses. */ handle->stdout_ev = event_new(tor_libevent_get_base(), handle->process->stdout_pipe, EV_READ|EV_PERSIST, pirprocess_stdoutcb, handle); event_add(handle->stdout_ev, NULL); /* And one to listen to the PIR server's stderr. */ handle->stderr_ev = event_new(tor_libevent_get_base(), handle->process->stderr_pipe, EV_READ|EV_PERSIST, pirprocess_stderrcb, handle); event_add(handle->stderr_ev, NULL); /* And one for writability to the pir process' stdin, but don't add * it just yet. Also create the buffer it will use. */ handle->stdin_buf = buf_new(); handle->stderr_buf = buf_new(); handle->stdin_ev = event_new(tor_libevent_get_base(), handle->process->stdin_pipe, EV_WRITE, pirprocess_stdincb, handle); handle->readstate = PIRPROCESS_READSTATE_HEADER; handle->readoff = 0; handle->readleft = PIRPROCESS_HDR_SIZE; handle->bodybuf = NULL; } int hs_pirprocess_send(pir_process_t handle, const unsigned char *buf, size_t len) { if (handle == NULL || handle->process == NULL || handle->process->status != PROCESS_STATUS_RUNNING) { /* We don't have a process to talk to */ return -1; } /* Write the data to the stdin buffer */ if (len > 0) { buf_add(handle->stdin_buf, (const char *)buf, len); event_add(handle->stdin_ev, NULL); } return len; } typedef struct { dir_connection_t *conn; pir_process_abort_fn abort_fn; } pirprocess_digestmap_entry_t; /* What we really want is a map from uint64_t to * pirprocess_digestmap_entry_t*, but we've got an implementation of * char[20] to void*, which we'll repurpose for this use. Note that the * dir_connection_t* pointers in this map are *not* owned by this map, * and this map must not free them. */ static digestmap_t *pirprocess_reqid_map; /* The last-used request id */ static uint64_t pirprocess_last_reqid; void hs_pirprocess_init(void) { pirprocess_reqid_map = digestmap_new(); pirprocess_last_reqid = 0; } static void entry_abort_free(void *p) { pirprocess_digestmap_entry_t *entry = (pirprocess_digestmap_entry_t *)p; if (entry->abort_fn) { (entry->abort_fn)(entry->conn); } tor_free(entry); } void hs_pirprocess_abort_all(void) { digestmap_free(pirprocess_reqid_map, entry_abort_free); hs_pirprocess_init(); } void hs_pirprocess_free_all(void) { digestmap_free(pirprocess_reqid_map, tor_free_); } uint64_t hs_pirprocess_alloc_reqid(dir_connection_t *dir_conn, pir_process_abort_fn abort_fn) { char mapbuf[DIGEST_LEN]; pirprocess_digestmap_entry_t *entry; uint64_t reqid = dir_conn->pirprocess_reqid; if (reqid > 0) { return reqid; } ++pirprocess_last_reqid; if (pirprocess_last_reqid == 0) { /* We wrapped a 64-bit integer?! */ pirprocess_last_reqid = 1; } reqid = pirprocess_last_reqid; memset(mapbuf, 0, DIGEST_LEN); memmove(mapbuf, &reqid, sizeof(reqid)); entry = tor_malloc(sizeof(*entry)); entry->conn = dir_conn; entry->abort_fn = abort_fn; digestmap_set(pirprocess_reqid_map, mapbuf, entry); dir_conn->pirprocess_reqid = reqid; return dir_conn->pirprocess_reqid; } void hs_pirprocess_dealloc_reqid(dir_connection_t *dir_conn) { char mapbuf[DIGEST_LEN]; uint64_t reqid = dir_conn->pirprocess_reqid; pirprocess_digestmap_entry_t *entry; if (reqid == 0) return; memset(mapbuf, 0, DIGEST_LEN); memmove(mapbuf, &reqid, sizeof(reqid)); entry = digestmap_remove(pirprocess_reqid_map, mapbuf); tor_free(entry); dir_conn->pirprocess_reqid = 0; } dir_connection_t * hs_pirprocess_lookup_reqid(uint64_t reqid) { char mapbuf[DIGEST_LEN]; pirprocess_digestmap_entry_t *entry; memset(mapbuf, 0, DIGEST_LEN); memmove(mapbuf, &reqid, sizeof(reqid)); entry = digestmap_get(pirprocess_reqid_map, mapbuf); return entry->conn; }