/* Copyright (C) 2014 Stony Brook University
   This file is part of Graphene Library OS.

   Graphene Library OS is free software: you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public License
   as published by the Free Software Foundation, either version 3 of the
   License, or (at your option) any later version.

   Graphene Library OS is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>. */

/*
 * shim_ipc_helper.c
 *
 * This file contains code to create an IPC helper thread inside library OS and maintain
 * bookkeeping of IPC ports.
 */

#include <list.h>
#include <pal.h>
#include <pal_error.h>
#include <shim_checkpoint.h>
#include <shim_handle.h>
#include <shim_internal.h>
#include <shim_ipc.h>
#include <shim_profile.h>
#include <shim_thread.h>
#include <shim_utils.h>

#define IPC_HELPER_STACK_SIZE (g_pal_alloc_align * 4)

static struct shim_lock ipc_port_mgr_lock;

#define SYSTEM_LOCK()   lock(&ipc_port_mgr_lock)
#define SYSTEM_UNLOCK() unlock(&ipc_port_mgr_lock)
#define SYSTEM_LOCKED() locked(&ipc_port_mgr_lock)

#define PORT_MGR_ALLOC 32
#define OBJ_TYPE struct shim_ipc_port
#include "memmgr.h"

static MEM_MGR port_mgr;

DEFINE_LISTP(shim_ipc_port);
static LISTP_TYPE(shim_ipc_port) port_list;

static enum { HELPER_NOTALIVE, HELPER_ALIVE } ipc_helper_state;

static struct shim_thread* ipc_helper_thread;

/* protects ipc_helper_state, ipc_helper_thread, and port_list */
static struct shim_lock ipc_helper_lock;

static AEVENTTYPE install_new_event;

static int create_ipc_helper(void);

static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port);

static ipc_callback ipc_callbacks[IPC_CODE_NUM] = {
    /* RESP */       &ipc_resp_callback,
    /* CHECKPOINT */ &ipc_checkpoint_callback,

    /* parents and children */
    /* CLD_EXIT */ &ipc_cld_exit_callback,
#ifdef PROFILE
    /* CLD_PROFILE */ &ipc_cld_profile_callback,
#endif

    /* pid namespace */
    IPC_NS_CALLBACKS(pid)
    /* PID_KILL */      &ipc_pid_kill_callback,
    /* PID_GETSTATUS */ &ipc_pid_getstatus_callback,
    /* PID_RETSTATUS */ &ipc_pid_retstatus_callback,
    /* PID_GETMETA */   &ipc_pid_getmeta_callback,
    /* PID_RETMETA */   &ipc_pid_retmeta_callback,
    /* PID_NOP */       &ipc_pid_nop_callback,
    /* PID_SENDRPC */   &ipc_pid_sendrpc_callback,

    /* sysv namespace */
    IPC_NS_CALLBACKS(sysv)
    IPC_NS_KEY_CALLBACKS(sysv)
    /* SYSV_DELRES */ &ipc_sysv_delres_callback,
    /* SYSV_MOVRES */ &ipc_sysv_movres_callback,
    /* SYSV_MSGSND */ &ipc_sysv_msgsnd_callback,
    /* SYSV_MSGRCV */ &ipc_sysv_msgrcv_callback,
    /* SYSV_MSGMOV */ &ipc_sysv_msgmov_callback,
    /* SYSV_SEMOP */  &ipc_sysv_semop_callback,
    /* SYSV_SEMCTL */ &ipc_sysv_semctl_callback,
    /* SYSV_SEMRET */ &ipc_sysv_semret_callback,
    /* SYSV_SEMMOV */ &ipc_sysv_semmov_callback,
};

static int init_self_ipc_port(void) {
    lock(&cur_process.lock);

    if (!cur_process.self) {
        /* very first process or clone/fork case: create IPC port from scratch */
        cur_process.self = create_ipc_info_cur_process(/*is_self_ipc_info=*/true);
        if (!cur_process.self) {
            unlock(&cur_process.lock);
            return -EACCES;
        }
    } else {
        /* execve case: inherited IPC port from parent process */
        assert(cur_process.self->pal_handle && !qstrempty(&cur_process.self->uri));

        add_ipc_port_by_id(cur_process.self->vmid, cur_process.self->pal_handle, IPC_PORT_SERVER,
                           /*fini=*/NULL, &cur_process.self->port);
    }

    unlock(&cur_process.lock);
    return 0;
}

static int init_parent_ipc_port(void) {
    if (!PAL_CB(parent_process) || !cur_process.parent) {
        /* no parent process, no sense in creating parent IPC port */
        return 0;
    }
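    /* cur_process.lock protects the parent ipc_info fields while we attach the inherited PAL
     * handle and register the parent port below */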
    lock(&cur_process.lock);

    assert(cur_process.parent && cur_process.parent->vmid);

    /* for execve case, my parent is the parent of my parent (current process transparently
     * inherits the "real" parent through already opened pal_handle on "temporary" parent's
     * cur_process.parent) */
    if (!cur_process.parent->pal_handle) {
        /* for clone/fork case, parent is connected on parent_process */
        cur_process.parent->pal_handle = PAL_CB(parent_process);
    }

    add_ipc_port_by_id(cur_process.parent->vmid, cur_process.parent->pal_handle,
                       IPC_PORT_DIRPRT | IPC_PORT_LISTEN, /*fini=*/NULL,
                       &cur_process.parent->port);

    unlock(&cur_process.lock);
    return 0;
}

static int init_ns_ipc_port(int ns_idx) {
    if (!cur_process.ns[ns_idx]) {
        /* no NS info from parent process, no sense in creating NS IPC port */
        return 0;
    }

    if (!cur_process.ns[ns_idx]->pal_handle && qstrempty(&cur_process.ns[ns_idx]->uri)) {
        /* there is no connection to NS leader via PAL handle and there is no URI to find NS
         * leader: do not create NS IPC port now, it will be created on-demand during NS leader
         * lookup */
        return 0;
    }

    lock(&cur_process.lock);

    if (!cur_process.ns[ns_idx]->pal_handle) {
        debug("Reconnecting IPC port %s\n", qstrgetstr(&cur_process.ns[ns_idx]->uri));
        cur_process.ns[ns_idx]->pal_handle =
            DkStreamOpen(qstrgetstr(&cur_process.ns[ns_idx]->uri), 0, 0, 0, 0);
        if (!cur_process.ns[ns_idx]->pal_handle) {
            unlock(&cur_process.lock);
            return -PAL_ERRNO;
        }
    }

    IDTYPE type = (ns_idx == PID_NS) ? IPC_PORT_PIDLDR : IPC_PORT_SYSVLDR;
    add_ipc_port_by_id(cur_process.ns[ns_idx]->vmid, cur_process.ns[ns_idx]->pal_handle,
                       type | IPC_PORT_LISTEN, /*fini=*/NULL, &cur_process.ns[ns_idx]->port);

    unlock(&cur_process.lock);
    return 0;
}

int init_ipc_ports(void) {
    if (!create_lock(&ipc_port_mgr_lock)) {
        return -ENOMEM;
    }

    if (!(port_mgr = create_mem_mgr(init_align_up(PORT_MGR_ALLOC))))
        return -ENOMEM;

    int ret;
    if ((ret = init_self_ipc_port()) < 0)
        return ret;
    if ((ret = init_parent_ipc_port()) < 0)
        return ret;
    if ((ret = init_ns_ipc_port(PID_NS)) < 0)
        return ret;
    if ((ret = init_ns_ipc_port(SYSV_NS)) < 0)
        return ret;

    return 0;
}

int init_ipc_helper(void) {
    /* early enough in init, can write global vars without the lock */
    ipc_helper_state = HELPER_NOTALIVE;
    if (!create_lock(&ipc_helper_lock)) {
        return -ENOMEM;
    }
    create_event(&install_new_event);

    /* some IPC ports were already added before this point, so spawn IPC helper thread (and
     * enable locking mechanisms if not done already since we are going in multi-threaded mode) */
    enable_locking();

    lock(&ipc_helper_lock);
    int ret = create_ipc_helper();
    unlock(&ipc_helper_lock);

    return ret;
}

static struct shim_ipc_port* __create_ipc_port(PAL_HANDLE hdl) {
    struct shim_ipc_port* port =
        get_mem_obj_from_mgr_enlarge(port_mgr, size_align_up(PORT_MGR_ALLOC));
    if (!port)
        return NULL;

    memset(port, 0, sizeof(struct shim_ipc_port));

    port->pal_handle = hdl;
    INIT_LIST_HEAD(port, list);
    INIT_LISTP(&port->msgs);
    REF_SET(port->ref_count, 0);

    if (!create_lock(&port->msgs_lock)) {
        free_mem_obj_to_mgr(port_mgr, port);
        return NULL;
    }

    return port;
}

static void __free_ipc_port(struct shim_ipc_port* port) {
    assert(locked(&ipc_helper_lock));

    if (port->pal_handle) {
        DkObjectClose(port->pal_handle);
        port->pal_handle = NULL;
    }

    destroy_lock(&port->msgs_lock);
    free_mem_obj_to_mgr(port_mgr, port);
}

static void __get_ipc_port(struct shim_ipc_port* port) {
    REF_INC(port->ref_count);
}

static void __put_ipc_port(struct shim_ipc_port* port) {
    assert(locked(&ipc_helper_lock));

    int ref_count = REF_DEC(port->ref_count);
    if (!ref_count)
        __free_ipc_port(port);
}
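/* Locking convention for the reference counting above: the double-underscore helpers assume
 * ipc_helper_lock is already held (freeing a port touches the global port manager), while the
 * public wrappers below take the lock themselves, and only when the count may drop to zero. */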
void get_ipc_port(struct shim_ipc_port* port) {
    /* no need to grab ipc_helper_lock because __get_ipc_port() does not touch global state */
    __get_ipc_port(port);
}

void put_ipc_port(struct shim_ipc_port* port) {
    /* this is atomic so we don't grab lock in common case of ref_count > 0 */
    int ref_count = REF_DEC(port->ref_count);

    if (!ref_count) {
        lock(&ipc_helper_lock);
        __free_ipc_port(port);
        unlock(&ipc_helper_lock);
    }
}

static void __add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid, IDTYPE type, port_fini fini) {
    assert(locked(&ipc_helper_lock));

    port->type |= type;
    if (vmid && !port->vmid)
        port->vmid = vmid;

    /* find empty slot in fini callbacks and register callback */
    if (fini) {
        bool found_empty_slot = false;
        __UNUSED(found_empty_slot);
        for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
            if (!port->fini[i] || port->fini[i] == fini) {
                port->fini[i] = fini;
                found_empty_slot = true;
                break;
            }
        assert(found_empty_slot);
    }

    /* add to port list if not there already */
    if (LIST_EMPTY(port, list)) {
        __get_ipc_port(port);
        LISTP_ADD(port, &port_list, list);
    }

    /* wake up IPC helper thread so that it picks up added port */
    if (ipc_helper_state == HELPER_ALIVE)
        set_event(&install_new_event, 1);
}

static void __del_ipc_port(struct shim_ipc_port* port) {
    assert(locked(&ipc_helper_lock));

    debug("Deleting port %p (handle %p) of process %u\n", port, port->pal_handle,
          port->vmid & 0xFFFF);

    DkStreamDelete(port->pal_handle, 0);
    LISTP_DEL_INIT(port, &port_list, list);

    /* Check for pending messages on port (threads might be blocking for responses) */
    lock(&port->msgs_lock);
    struct shim_ipc_msg_duplex* msg;
    struct shim_ipc_msg_duplex* tmp;
    LISTP_FOR_EACH_ENTRY_SAFE(msg, tmp, &port->msgs, list) {
        LISTP_DEL_INIT(msg, &port->msgs, list);
        msg->retval = -ECONNRESET;
        if (msg->thread) {
            debug("Deleted pending message on port %p, wake up blocking thread %d\n", port,
                  msg->thread->tid);
            thread_wakeup(msg->thread);
        }
    }
    unlock(&port->msgs_lock);

    __put_ipc_port(port);

    /* wake up IPC helper thread so that it forgets about deleted port */
    if (ipc_helper_state == HELPER_ALIVE)
        set_event(&install_new_event, 1);
}

void add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid, IDTYPE type, port_fini fini) {
    debug("Adding port %p (handle %p) for process %u (type=%04x)\n", port, port->pal_handle,
          port->vmid & 0xFFFF, type);

    lock(&ipc_helper_lock);
    __add_ipc_port(port, vmid, type, fini);
    unlock(&ipc_helper_lock);
}

void add_ipc_port_by_id(IDTYPE vmid, PAL_HANDLE hdl, IDTYPE type, port_fini fini,
                        struct shim_ipc_port** portptr) {
    debug("Adding port (handle %p) for process %u (type %04x)\n", hdl, vmid & 0xFFFF, type);

    struct shim_ipc_port* port = NULL;
    if (portptr)
        *portptr = NULL;

    assert(hdl);
    lock(&ipc_helper_lock);

    /* check if port with this PAL handle already exists, then we only need to update its vmid,
     * type, and fini callback */
    struct shim_ipc_port* tmp;
    LISTP_FOR_EACH_ENTRY(tmp, &port_list, list) {
        if (tmp->pal_handle == hdl) {
            port = tmp;
            break;
        }
    }

    if (!port) {
        /* port does not yet exist, create it */
        port = __create_ipc_port(hdl);
        if (!port) {
            debug("Failed to create IPC port for handle %p\n", hdl);
            goto out;
        }
    }

    /* add/update port */
    __add_ipc_port(port, vmid, type, fini);

    if (portptr) {
        __get_ipc_port(port);
        *portptr = port;
    }

out:
    unlock(&ipc_helper_lock);
}

void del_ipc_port_fini(struct shim_ipc_port* port, unsigned int exitcode) {
    lock(&ipc_helper_lock);

    if (LIST_EMPTY(port, list)) {
        unlock(&ipc_helper_lock);
        return;
    }

    /* prevent __del_ipc_port() from freeing port since we need it for fini callbacks */
    __get_ipc_port(port);
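    /* drop the list reference; the temporary reference taken above keeps the port object alive
     * until the fini callbacks below have run */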
    __del_ipc_port(port);
    unlock(&ipc_helper_lock);

    for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
        if (port->fini[i]) {
            (port->fini[i])(port, port->vmid, exitcode);
            port->fini[i] = NULL;
        }

    put_ipc_port(port);
}

void del_all_ipc_ports(void) {
    lock(&ipc_helper_lock);

    struct shim_ipc_port* port;
    struct shim_ipc_port* tmp;
    LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list) {
        __del_ipc_port(port);
    }

    unlock(&ipc_helper_lock);
}

struct shim_ipc_port* lookup_ipc_port(IDTYPE vmid, IDTYPE type) {
    struct shim_ipc_port* port = NULL;

    assert(vmid && type);
    lock(&ipc_helper_lock);

    struct shim_ipc_port* tmp;
    LISTP_FOR_EACH_ENTRY(tmp, &port_list, list) {
        if (tmp->vmid == vmid && (tmp->type & type)) {
            debug("Found port %p (handle %p) for process %u (type %04x)\n", tmp, tmp->pal_handle,
                  tmp->vmid & 0xFFFF, tmp->type);
            port = tmp;
            __get_ipc_port(port);
            break;
        }
    }

    unlock(&ipc_helper_lock);
    return port;
}

#define PORTS_ON_STACK_CNT 32

int broadcast_ipc(struct shim_ipc_msg* msg, int target_type, struct shim_ipc_port* exclude_port) {
    int ret;
    struct shim_ipc_port* port;
    struct shim_ipc_port** target_ports;
    size_t target_ports_cnt = 0;

    assert(target_type);
    lock(&ipc_helper_lock);

    /* Collect all ports with appropriate types. In common case, stack-allocated array of
     * PORTS_ON_STACK_CNT ports is enough. If there are more ports, we will allocate a bigger
     * array on the heap and collect all ports again. */
    struct shim_ipc_port* target_ports_stack[PORTS_ON_STACK_CNT];
    LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
        if (port == exclude_port)
            continue;
        if (port->type & target_type) {
            if (target_ports_cnt < PORTS_ON_STACK_CNT)
                target_ports_stack[target_ports_cnt] = port;
            target_ports_cnt++;
        }
    }

    target_ports = target_ports_stack;

    if (target_ports_cnt > PORTS_ON_STACK_CNT) {
        /* Rare case when there are more than PORTS_ON_STACK_CNT ports. Allocate big-enough
         * array on the heap and collect all ports again. */
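        /* ipc_helper_lock is still held, so port_list cannot change between the first and
         * second collection passes; the assert below relies on this */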
        size_t cnt = 0;
        struct shim_ipc_port** target_ports_heap =
            malloc(sizeof(struct shim_ipc_port*) * target_ports_cnt);
        if (!target_ports_heap) {
            unlock(&ipc_helper_lock);
            debug("Allocation of target_ports_heap failed\n");
            return -ENOMEM;
        }

        LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
            if (port == exclude_port)
                continue;
            if (port->type & target_type)
                target_ports_heap[cnt++] = port;
        }

        target_ports = target_ports_heap;
        assert(cnt == target_ports_cnt);
    }

    for (size_t i = 0; i < target_ports_cnt; i++)
        __get_ipc_port(target_ports[i]);

    unlock(&ipc_helper_lock);

    /* send msg to each collected port (note that ports cannot be freed in meantime) */
    for (size_t i = 0; i < target_ports_cnt; i++) {
        port = target_ports[i];

        debug("Broadcast to port %p (handle %p) for process %u (type %x, target %x)\n", port,
              port->pal_handle, port->vmid & 0xFFFF, port->type, target_type);

        msg->dst = port->vmid;
        ret = send_ipc_message(msg, port);
        if (ret < 0) {
            debug("Broadcast to port %p (handle %p) for process %u failed (errno = %d)!\n", port,
                  port->pal_handle, port->vmid & 0xFFFF, ret);
            goto out;
        }
    }

    ret = 0;
out:
    for (size_t i = 0; i < target_ports_cnt; i++)
        put_ipc_port(target_ports[i]);

    if (target_ports != target_ports_stack)
        free(target_ports);

    return ret;
}

static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port) {
    struct shim_ipc_resp* resp = (struct shim_ipc_resp*)&msg->msg;
    debug("IPC callback from %u: IPC_RESP(%d)\n", msg->src & 0xFFFF, resp->retval);

    if (!msg->seq)
        return resp->retval;

    /* find a corresponding request msg for this response msg */
    struct shim_ipc_msg_duplex* req_msg = pop_ipc_msg_duplex(port, msg->seq);

    /* if some thread is waiting for response, wake it with response retval */
    if (req_msg) {
        req_msg->retval = resp->retval;
        if (req_msg->thread)
            thread_wakeup(req_msg->thread);
        return 0;
    }

    return resp->retval;
}

int send_response_ipc_message(struct shim_ipc_port* port, IDTYPE dest, int ret,
                              unsigned long seq) {
    ret = (ret == RESPONSE_CALLBACK) ? 0 : ret;
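    /* (RESPONSE_CALLBACK is how callbacks request an acknowledgment even on success, hence the
     * mapping to retval 0 above) */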
    /* create IPC_RESP msg to send to dest, with sequence number seq, and in-body retval ret */
    size_t total_msg_size = get_ipc_msg_size(sizeof(struct shim_ipc_resp));
    struct shim_ipc_msg* resp_msg = __alloca(total_msg_size);
    init_ipc_msg(resp_msg, IPC_RESP, total_msg_size, dest);
    resp_msg->seq = seq;

    struct shim_ipc_resp* resp = (struct shim_ipc_resp*)resp_msg->msg;
    resp->retval = ret;

    debug("IPC send to %u: IPC_RESP(%d)\n", resp_msg->dst & 0xFFFF, ret);
    return send_ipc_message(resp_msg, port);
}

static int receive_ipc_message(struct shim_ipc_port* port) {
    int ret;
    size_t readahead = IPC_MSG_MINIMAL_SIZE * 2;
    size_t bufsize   = IPC_MSG_MINIMAL_SIZE + readahead;

    struct shim_ipc_msg* msg = malloc(bufsize);
    if (!msg) {
        return -ENOMEM;
    }

    size_t expected_size = IPC_MSG_MINIMAL_SIZE;
    size_t bytes = 0;

    do {
        while (bytes < expected_size) {
            /* grow msg buffer to accommodate bigger messages */
            if (expected_size + readahead > bufsize) {
                while (expected_size + readahead > bufsize)
                    bufsize *= 2;
                void* tmp_buf = malloc(bufsize);
                if (!tmp_buf) {
                    ret = -ENOMEM;
                    goto out;
                }
                memcpy(tmp_buf, msg, bytes);
                free(msg);
                msg = tmp_buf;
            }

            PAL_NUM read =
                DkStreamRead(port->pal_handle, /*offset=*/0, expected_size - bytes + readahead,
                             (void*)msg + bytes, NULL, 0);

            if (read == PAL_STREAM_ERROR) {
                if (PAL_ERRNO == EINTR || PAL_ERRNO == EAGAIN || PAL_ERRNO == EWOULDBLOCK)
                    continue;

                debug("Port %p (handle %p) closed while receiving IPC message\n", port,
                      port->pal_handle);
                del_ipc_port_fini(port, -ECHILD);
                ret = -PAL_ERRNO;
                goto out;
            }

            bytes += read;

            /* extract actual msg size from msg header and continue reading msg body */
            if (bytes >= IPC_MSG_MINIMAL_SIZE)
                expected_size = msg->size;
        }

        debug(
            "Received IPC message from port %p (handle %p): code=%d size=%lu "
            "src=%u dst=%u seq=%lx\n",
            port, port->pal_handle, msg->code, msg->size, msg->src & 0xFFFF, msg->dst & 0xFFFF,
            msg->seq);

        /* skip messages coming from myself (in case of broadcast) */
        if (msg->src != cur_process.vmid) {
            if (msg->code < IPC_CODE_NUM && ipc_callbacks[msg->code]) {
                /* invoke callback to this msg */
                ret = (*ipc_callbacks[msg->code])(msg, port);
                if ((ret < 0 || ret == RESPONSE_CALLBACK) && msg->seq) {
                    /* send IPC_RESP message to sender of this msg */
                    ret = send_response_ipc_message(port, msg->src, ret, msg->seq);
                    if (ret < 0) {
                        debug("Sending IPC_RESP msg on port %p (handle %p) to %u failed\n", port,
                              port->pal_handle, msg->src & 0xFFFF);
                        ret = -PAL_ERRNO;
                        goto out;
                    }
                }
            }
        }

        bytes -= expected_size; /* one message was received and handled */

        if (bytes > 0) {
            /* we may have started reading the next message, move this message to beginning of
             * msg buffer and reset expected size */
            memmove(msg, (void*)msg + expected_size, bytes);

            expected_size = IPC_MSG_MINIMAL_SIZE;
            if (bytes >= IPC_MSG_MINIMAL_SIZE)
                expected_size = msg->size;
        }
    } while (bytes > 0);

    ret = 0;
out:
    free(msg);
    return ret;
}

/* Main routine of the IPC helper thread. IPC helper thread is spawned when the first IPC port is
 * added and is terminated only when the whole Graphene application terminates. IPC helper thread
 * runs in an endless loop and waits on port events (either the addition/removal of ports or
 * actual port events: acceptance of new client or receiving/sending messages). In particular,
 * IPC helper thread calls receive_ipc_message() if a message arrives on port.
 *
 * Other threads add and remove IPC ports via add_ipc_xxx() and del_ipc_xxx() functions. These
 * ports are added to port_list which the IPC helper thread consults before each
 * DkStreamsWaitEvents().
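 *
 * Adding or removing a port also sets install_new_event, which wakes the helper out of
 * DkStreamsWaitEvents() so that it re-reads port_list before the next wait.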
 *
 * Note that ports are copied from the global port_list to a local array (`ports` below). This is
 * because ports may be removed from port_list by other threads while the IPC helper thread is
 * waiting on DkStreamsWaitEvents(). For this reason, the IPC helper thread also gets references
 * to all current ports and puts them after handling all ports in the local array.
 *
 * Previous implementation went to great lengths to keep changes to the list of current ports to
 * a minimum (instead of repopulating the list before each wait like in current code).
 * Unfortunately, this resulted in undue complexity. Current implementation should perform fine
 * for usual case of <100 IPC ports and with IPC helper thread always running in background on
 * its own core. */
noreturn static void shim_ipc_helper(void* dummy) {
    __UNUSED(dummy);
    struct shim_thread* self = get_cur_thread();

    /* Initialize two lists:
     * - `ports` collects IPC port objects and is the main list we process here
     * - `pals` collects PAL handles of IPC port objects; always contains install_new_event */
    size_t ports_cnt = 0;
    size_t ports_max_cnt = 32;
    struct shim_ipc_port** ports = malloc(sizeof(*ports) * ports_max_cnt);
    if (!ports) {
        debug("shim_ipc_helper: allocation of ports failed\n");
        goto out_err;
    }
    PAL_HANDLE* pals = malloc(sizeof(*pals) * (1 + ports_max_cnt));
    if (!pals) {
        debug("shim_ipc_helper: allocation of pals failed\n");
        goto out_err;
    }

    /* allocate one memory region to hold two PAL_FLG arrays: events and revents */
    PAL_FLG* pal_events = malloc(sizeof(*pal_events) * (1 + ports_max_cnt) * 2);
    if (!pal_events) {
        debug("shim_ipc_helper: allocation of pal_events failed\n");
        goto out_err;
    }
    PAL_FLG* ret_events = pal_events + 1 + ports_max_cnt;

    PAL_HANDLE install_new_event_pal = event_handle(&install_new_event);
    pals[0]       = install_new_event_pal;
    pal_events[0] = PAL_WAIT_READ;
    ret_events[0] = 0;

    while (true) {
        lock(&ipc_helper_lock);
        if (ipc_helper_state != HELPER_ALIVE) {
            ipc_helper_thread = NULL;
            unlock(&ipc_helper_lock);
            break;
        }

        /* iterate through all known ports from `port_list` to repopulate `ports` */
        ports_cnt = 0;
        struct shim_ipc_port* port;
        struct shim_ipc_port* tmp;
        LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list) {
            /* get port reference so it is not freed while we wait on/handle it */
            __get_ipc_port(port);

            if (ports_cnt == ports_max_cnt) {
                /* grow `ports` and `pals` to accommodate more objects */
                struct shim_ipc_port** tmp_ports = malloc(sizeof(*tmp_ports) * ports_max_cnt * 2);
                if (!tmp_ports) {
                    debug("shim_ipc_helper: allocation of tmp_ports failed\n");
                    goto out_err_unlock;
                }
                PAL_HANDLE* tmp_pals = malloc(sizeof(*tmp_pals) * (1 + ports_max_cnt * 2));
                if (!tmp_pals) {
                    debug("shim_ipc_helper: allocation of tmp_pals failed\n");
                    goto out_err_unlock;
                }
                PAL_FLG* tmp_pal_events =
                    malloc(sizeof(*tmp_pal_events) * (2 + ports_max_cnt * 4));
                if (!tmp_pal_events) {
                    debug("shim_ipc_helper: allocation of tmp_pal_events failed\n");
                    goto out_err_unlock;
                }
                PAL_FLG* tmp_ret_events = tmp_pal_events + 1 + ports_max_cnt * 2;

                memcpy(tmp_ports, ports, sizeof(*tmp_ports) * ports_max_cnt);
                memcpy(tmp_pals, pals, sizeof(*tmp_pals) * (1 + ports_max_cnt));
                memcpy(tmp_pal_events, pal_events, sizeof(*tmp_pal_events) * (1 + ports_max_cnt));
                memcpy(tmp_ret_events, ret_events, sizeof(*tmp_ret_events) * (1 + ports_max_cnt));

                ports_max_cnt *= 2;

                free(ports);
                free(pals);
                free(pal_events);

                ports      = tmp_ports;
                pals       = tmp_pals;
                pal_events = tmp_pal_events;
                ret_events = tmp_ret_events;
            }

            /* re-add this port to ports/pals/events */
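            /* slot 0 in pals/pal_events is reserved for install_new_event, so port i occupies
             * slot i + 1 */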
            ports[ports_cnt]          = port;
            pals[ports_cnt + 1]       = port->pal_handle;
            pal_events[ports_cnt + 1] = PAL_WAIT_READ;
            ret_events[ports_cnt + 1] = 0;
            ports_cnt++;

            debug("Listening to process %u on port %p (handle %p, type %04x)\n",
                  port->vmid & 0xFFFF, port, port->pal_handle, port->type);
        }

        unlock(&ipc_helper_lock);

        /* wait on collected ports' PAL handles + install_new_event_pal */
        PAL_BOL polled =
            DkStreamsWaitEvents(ports_cnt + 1, pals, pal_events, ret_events, NO_TIMEOUT);

        for (size_t i = 0; polled && i < ports_cnt + 1; i++) {
            if (ret_events[i]) {
                if (pals[i] == install_new_event_pal) {
                    /* some thread wants to install new event; this event is found in `ports`,
                     * so just re-init install_new_event */
                    debug("New IPC event was requested (port was added/removed)\n");
                    clear_event(&install_new_event);
                    continue;
                }

                /* it is not install_new_event handle, so must be one of ports */
                assert(i > 0);
                struct shim_ipc_port* polled_port = ports[i - 1];
                assert(polled_port);

                if (polled_port->type & IPC_PORT_SERVER) {
                    /* server port: accept client, create client port, and add it to port list */
                    PAL_HANDLE client = DkStreamWaitForClient(polled_port->pal_handle);
                    if (client) {
                        /* type of client port is the same as original server port but with
                         * LISTEN (for remote client) and without SERVER (doesn't wait for new
                         * clients) */
                        IDTYPE client_type =
                            (polled_port->type & ~IPC_PORT_SERVER) | IPC_PORT_LISTEN;
                        add_ipc_port_by_id(polled_port->vmid, client, client_type, NULL, NULL);
                    } else {
                        debug("Port %p (handle %p) was removed during accepting client\n",
                              polled_port, polled_port->pal_handle);
                        del_ipc_port_fini(polled_port, -ECHILD);
                    }
                } else {
                    PAL_STREAM_ATTR attr;
                    if (DkStreamAttributesQueryByHandle(polled_port->pal_handle, &attr)) {
                        /* can read on this port, so receive messages */
                        if (attr.readable) {
                            /* NOTE: IPC helper thread does not handle failures currently */
                            receive_ipc_message(polled_port);
                        }

                        if (attr.disconnected) {
                            debug("Port %p (handle %p) disconnected\n", polled_port,
                                  polled_port->pal_handle);
                            del_ipc_port_fini(polled_port, -ECONNRESET);
                        }
                    } else {
                        debug("Port %p (handle %p) was removed during attr querying\n",
                              polled_port, polled_port->pal_handle);
                        del_ipc_port_fini(polled_port, -PAL_ERRNO);
                    }
                }
            }
        }

        /* done handling ports; put their references so they can be freed */
        for (size_t i = 0; i < ports_cnt; i++)
            put_ipc_port(ports[i]);
    }

    free(ports);
    free(pals);
    free(pal_events);

    __disable_preempt(self->shim_tcb);
    put_thread(self);
    debug("IPC helper thread terminated\n");

    DkThreadExit(/*clear_child_tid=*/NULL);

out_err_unlock:
    unlock(&ipc_helper_lock);
out_err:
    debug("Terminating the process due to a fatal error in ipc helper\n");
    put_thread(self);
    DkProcessExit(1);
}

static void shim_ipc_helper_prepare(void* arg) {
    struct shim_thread* self = (struct shim_thread*)arg;
    if (!arg)
        return;

    shim_tcb_init();
    set_cur_thread(self);
    update_fs_base(0);
    debug_setbuf(shim_get_tcb(), true);

    lock(&ipc_helper_lock);
    bool notme = (self != ipc_helper_thread);
    unlock(&ipc_helper_lock);

    void* stack = allocate_stack(IPC_HELPER_STACK_SIZE, g_pal_alloc_align, false);

    if (notme || !stack) {
        free(stack);
        put_thread(self);
        DkThreadExit(/*clear_child_tid=*/NULL);
        return;
    }

    debug("IPC helper thread started\n");

    /* swap stack to be sure we don't drain the small stack PAL provides */
    self->stack_top = stack + IPC_HELPER_STACK_SIZE;
    self->stack     = stack;
    __SWITCH_STACK(self->stack_top, shim_ipc_helper, NULL);
}

/* this should be called with the ipc_helper_lock held */
static int create_ipc_helper(void) {
    assert(locked(&ipc_helper_lock));

    if (ipc_helper_state == HELPER_ALIVE)
        return 0;
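    /* the helper starts in shim_ipc_helper_prepare(), which sets up the internal thread's TCB
     * and a dedicated stack before switching to the shim_ipc_helper() main loop */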
    struct shim_thread* new = get_new_internal_thread();
    if (!new)
        return -ENOMEM;

    ipc_helper_thread = new;
    ipc_helper_state  = HELPER_ALIVE;

    PAL_HANDLE handle = thread_create(shim_ipc_helper_prepare, new);

    if (!handle) {
        int ret = -PAL_ERRNO; /* put_thread() may overwrite errno */
        ipc_helper_thread = NULL;
        ipc_helper_state  = HELPER_NOTALIVE;
        put_thread(new);
        return ret;
    }

    new->pal_handle = handle;
    return 0;
}

/* On success, the reference to ipc helper thread is returned with refcount incremented. It is
 * the responsibility of caller to wait for ipc helper's exit and then release the final
 * reference to free related resources (it is problematic for the thread itself to release its
 * own resources e.g. stack). */
struct shim_thread* terminate_ipc_helper(void) {
    /* First check if thread is alive. */
    lock(&ipc_helper_lock);
    if (ipc_helper_state != HELPER_ALIVE) {
        unlock(&ipc_helper_lock);
        return NULL;
    }
    unlock(&ipc_helper_lock);

    /* NOTE: Graphene doesn't have an abstraction of a queue of pending signals between
     * communicating processes (instead all communication is done over streams). Thus, app code
     * like this (found in e.g. Lmbench's bw_unix):
     *     kill(child, SIGKILL);
     *     exit(0);
     * results in a data race between the SIGKILL message sent over IPC stream and the parent
     * process exiting. In the worst case, the parent will exit before the SIGKILL message goes
     * through the host-OS stream, the host OS will close the stream, and the message will never
     * be seen by child. To prevent such cases, we simply wait for a bit before exiting. */
    debug("Waiting for 0.5s for all in-flight IPC messages to reach their destinations\n");
    DkThreadDelayExecution(500000); /* in microseconds */

    lock(&ipc_helper_lock);
    if (ipc_helper_state != HELPER_ALIVE) {
        unlock(&ipc_helper_lock);
        return NULL;
    }

    struct shim_thread* ret = ipc_helper_thread;
    if (ret)
        get_thread(ret);

    ipc_helper_state = HELPER_NOTALIVE;
    unlock(&ipc_helper_lock);

    /* force wake up of ipc helper thread so that it exits */
    set_event(&install_new_event, 1);
    return ret;
}