123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- /**
- * \file hs_pirprocess.c
- * \brief Handle hidden service PIR helper processes.
- **/
- #include "core/or/or.h"
- #include "feature/hs/hs_pirprocess.h"
- #define SUBPROCESS_PRIVATE
- #include "lib/process/env.h"
- #include "lib/process/subprocess.h"
- #include "lib/evloop/compat_libevent.h"
- #include "feature/dircommon/dir_connection_st.h"
- #include <event2/event.h>
- #ifdef HAVE_UNISTD_H
- #include <unistd.h>
- #endif
/** Which part of a message from the PIR process' stdout we are currently
 * collecting. */
typedef enum {
  /* Filling hdrbuf with the fixed-size message header. */
  PIRPROCESS_READSTATE_HEADER = 0,
  /* Filling bodybuf with the body whose length the header announced. */
  PIRPROCESS_READSTATE_BODY = 1
} PIRProcessReadState;
- struct pir_process_st {
- process_handle_t *process;
- buf_t *stdin_buf;
- buf_t *stderr_buf;
- struct event *stdin_ev;
- struct event *stdout_ev;
- struct event *stderr_ev;
- char *loglabel;
- pir_process_msghandler_t msghandler;
- PIRProcessReadState readstate;
- size_t readoff, readleft;
- unsigned char hdrbuf[PIRPROCESS_HDR_SIZE];
- char *bodybuf;
- };
- /* This is called when the pir process has output for us. */
- static void
- pirprocess_stdoutcb(evutil_socket_t fd, short what, void *arg)
- {
- pir_process_t handle = (pir_process_t)arg;
- if (!(what & EV_READ)) {
- /* Not sure why we're here */
- return;
- }
- if (handle->readstate == PIRPROCESS_READSTATE_HEADER) {
- int res = read(fd, handle->hdrbuf + handle->readoff, handle->readleft);
- if (res <= 0) return;
- handle->readoff += res;
- handle->readleft -= res;
- if (handle->readleft == 0) {
- handle->readleft = ntohl(*(uint32_t*)
- (handle->hdrbuf+PIRPROCESS_HDR_SIZE-4));
- tor_free(handle->bodybuf);
- if (handle->readleft > 0) {
- handle->bodybuf = tor_malloc(handle->readleft+1);
- handle->bodybuf[handle->readleft] = '\0';
- handle->readoff = 0;
- handle->readstate = PIRPROCESS_READSTATE_BODY;
- } else {
- (handle->msghandler)(handle->hdrbuf, NULL, 0);
- handle->readoff = 0;
- handle->readleft = PIRPROCESS_HDR_SIZE;
- handle->readstate = PIRPROCESS_READSTATE_HEADER;
- }
- }
- } else if (handle->readstate == PIRPROCESS_READSTATE_BODY) {
- int res = read(fd, handle->bodybuf + handle->readoff,
- handle->readleft);
- if (res <= 0) return;
- handle->readoff += res;
- handle->readleft -= res;
- if (handle->readleft == 0) {
- /* Reading is complete */
- (handle->msghandler)(handle->hdrbuf, handle->bodybuf,
- handle->readoff);
- /* Prepare for the next output from the PIR server */
- tor_free(handle->bodybuf);
- handle->readoff = 0;
- handle->readleft = PIRPROCESS_HDR_SIZE;
- handle->readstate = PIRPROCESS_READSTATE_HEADER;
- }
- }
- }
- /* This is called when the pir process is ready to read from its stdin. */
- static void
- pirprocess_stdincb(evutil_socket_t fd, short what, void *arg)
- {
- pir_process_t handle = (pir_process_t)arg;
- int res;
- size_t bufsize = buf_datalen(handle->stdin_buf);
- char *netbuf = NULL;
- if (!(what & EV_WRITE)) {
- /* Not sure why we're here */
- log_info(LD_DIRSERV,"PIRSERVER bailing");
- return;
- }
- if (bufsize == 0) {
- log_err(LD_DIRSERV,"PIRSERVER trying to write 0-length buffer");
- return;
- }
- netbuf = tor_malloc(bufsize);
- if (netbuf == NULL) {
- log_err(LD_DIRSERV,"PIRSERVER failed to allocate buffer");
- return;
- }
- /* One might think that just calling buf_flush_to_socket would be
- * the thing to do, but that function ends up calling sendto()
- * instead of write(), which doesn't work on pipes. So we do it
- * more manually. Using a bufferevent may be another option. */
- buf_peek(handle->stdin_buf, netbuf, bufsize);
- res = write(fd, netbuf, bufsize);
- tor_free(netbuf);
- if (res < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- /* Try writing again later */
- event_add(handle->stdin_ev, NULL);
- return;
- }
- if (res <= 0) {
- /* Stop trying to write. */
- return;
- }
- buf_drain(handle->stdin_buf, res);
- bufsize -= res;
- if (bufsize > 0) {
- /* There's more to write */
- event_add(handle->stdin_ev, NULL);
- }
- }
- /* This is called when the pir process writes something to its stderr. */
- static void
- pirprocess_stderrcb(evutil_socket_t fd, short what, void *arg)
- {
- pir_process_t handle = (pir_process_t)arg;
- if (!(what & EV_READ)) {
- /* Not sure why we're here */
- return;
- }
- /* Add the data to the stderr buffer */
- char buf[1000];
- int res = read(fd, buf, sizeof(buf)-1);
- if (res <= 0) return;
- buf[res] = '\0';
- buf_add(handle->stderr_buf, buf, res);
- /* Read lines from the stderr_buf */
- while (1) {
- size_t linesize = 0;
- res = buf_get_line(handle->stderr_buf, NULL, &linesize);
- if (res == 0) {
- /* No complete lines in the buffer */
- break;
- }
- char *linebuf = tor_malloc(linesize);
- buf_get_line(handle->stderr_buf, linebuf, &linesize);
- if (linesize > 0 && linebuf[linesize-1] == '\n') {
- linebuf[linesize-1] = '\0';
- }
- log_info(LD_DIRSERV,"%s %s", handle->loglabel, escaped(linebuf));
- tor_free(linebuf);
- }
- }
- void
- hs_pirprocess_close(pir_process_t *handlep)
- {
- pir_process_t handle;
- if (!handlep) return;
- if (!*handlep) return;
- handle = *handlep;
- if (handle->stdin_buf) {
- buf_free(handle->stdin_buf);
- }
- if (handle->stderr_buf) {
- buf_free(handle->stderr_buf);
- }
- if (handle->stdin_ev) {
- event_free(handle->stdin_ev);
- }
- if (handle->stdout_ev) {
- event_free(handle->stdout_ev);
- }
- if (handle->stderr_ev) {
- event_free(handle->stderr_ev);
- }
- if (handle->process) {
- tor_process_handle_destroy(handle->process, 1);
- }
- if (handle->loglabel) {
- tor_free(handle->loglabel);
- }
- if (handle->bodybuf) {
- tor_free(handle->bodybuf);
- }
- tor_free(handle);
- *handlep = NULL;
- }
- void
- hs_pirprocess_poke(const char *path, const char *loglabel,
- pir_process_msghandler_t msghandler, pir_process_t *handlep)
- {
- int res;
- smartlist_t *env_vars = get_current_process_environment_variables();
- const char *argv[2];
- process_environment_t *env;
- pir_process_t handle;
- if (!handlep) return;
- if (*handlep) {
- handle = *handlep;
- if (handle->process &&
- handle->process->status == PROCESS_STATUS_RUNNING) {
- /* The PIR process appears to be open */
- return;
- }
- }
- /* Close everything we've got and start over */
- hs_pirprocess_close(handlep);
- if (!path) {
- /* We don't have a configured PIR server */
- return;
- }
- *handlep = tor_malloc_zero(sizeof(struct pir_process_st));
- handle = *handlep;
- /* Start the process */
- argv[0] = path;
- argv[1] = NULL;
- env = process_environment_make(env_vars);
- res = tor_spawn_background(path, argv, env, &(handle->process));
- SMARTLIST_FOREACH(env_vars, void *, x, tor_free(x));
- smartlist_free(env_vars);
- if (res != PROCESS_STATUS_RUNNING) {
- /* Launch failure */
- hs_pirprocess_close(handlep);
- return;
- }
- handle->loglabel = tor_strdup(loglabel);
- handle->msghandler = msghandler;
- /* Create a libevent event to listen to the PIR process' responses. */
- handle->stdout_ev = event_new(tor_libevent_get_base(),
- handle->process->stdout_pipe, EV_READ|EV_PERSIST,
- pirprocess_stdoutcb, handle);
- event_add(handle->stdout_ev, NULL);
- /* And one to listen to the PIR server's stderr. */
- handle->stderr_ev = event_new(tor_libevent_get_base(),
- handle->process->stderr_pipe, EV_READ|EV_PERSIST,
- pirprocess_stderrcb, handle);
- event_add(handle->stderr_ev, NULL);
- /* And one for writability to the pir process' stdin, but don't add
- * it just yet. Also create the buffer it will use. */
- handle->stdin_buf = buf_new();
- handle->stderr_buf = buf_new();
- handle->stdin_ev = event_new(tor_libevent_get_base(),
- handle->process->stdin_pipe, EV_WRITE, pirprocess_stdincb,
- handle);
- handle->readstate = PIRPROCESS_READSTATE_HEADER;
- handle->readoff = 0;
- handle->readleft = PIRPROCESS_HDR_SIZE;
- handle->bodybuf = NULL;
- }
- int
- hs_pirprocess_send(pir_process_t handle, const unsigned char *buf,
- size_t len)
- {
- if (handle->process == NULL ||
- handle->process->status != PROCESS_STATUS_RUNNING) {
- /* We don't have a process to talk to */
- return -1;
- }
- /* Write the data to the stdin buffer */
- if (len > 0) {
- buf_add(handle->stdin_buf, (const char *)buf, len);
- event_add(handle->stdin_ev, NULL);
- }
- return len;
- }
- /* What we really want is a map from uint64_t to dir_connection_t*, but
- * we've got an implementation of char[20] to void*, which we'll
- * repurpose for this use. Note that the dir_connection_t* pointers in
- * this map are *not* owned by this map, and this map must not free
- * them. */
- static digestmap_t *pirprocess_reqid_map;
- /* The last-used request id */
- static uint64_t pirprocess_last_reqid;
- void
- hs_pirprocess_init(void)
- {
- pirprocess_reqid_map = digestmap_new();
- pirprocess_last_reqid = 0;
- }
- void
- hs_pirprocess_free_all(void)
- {
- digestmap_free(pirprocess_reqid_map, NULL);
- }
- uint64_t
- hs_pirprocess_alloc_reqid(dir_connection_t *dir_conn)
- {
- char mapbuf[DIGEST_LEN];
- uint64_t reqid = dir_conn->pirprocess_reqid;
- if (reqid > 0) {
- return reqid;
- }
- ++pirprocess_last_reqid;
- if (pirprocess_last_reqid == 0) {
- /* We wrapped a 64-bit integer?! */
- pirprocess_last_reqid = 1;
- }
- reqid = pirprocess_last_reqid;
- memset(mapbuf, 0, DIGEST_LEN);
- memmove(mapbuf, &reqid, sizeof(reqid));
- digestmap_set(pirprocess_reqid_map, mapbuf, dir_conn);
- dir_conn->pirprocess_reqid = reqid;
- return dir_conn->pirprocess_reqid;
- }
- void
- hs_pirprocess_dealloc_reqid(dir_connection_t *dir_conn)
- {
- char mapbuf[DIGEST_LEN];
- uint64_t reqid = dir_conn->pirprocess_reqid;
- if (reqid == 0) return;
- memset(mapbuf, 0, DIGEST_LEN);
- memmove(mapbuf, &reqid, sizeof(reqid));
- digestmap_remove(pirprocess_reqid_map, mapbuf);
- dir_conn->pirprocess_reqid = 0;
- }
- dir_connection_t *
- hs_pirprocess_lookup_reqid(uint64_t reqid)
- {
- char mapbuf[DIGEST_LEN];
- dir_connection_t *dir_conn;
- memset(mapbuf, 0, DIGEST_LEN);
- memmove(mapbuf, &reqid, sizeof(reqid));
- dir_conn = digestmap_get(pirprocess_reqid_map, mapbuf);
- return dir_conn;
- }
|