shim_ipc_helper.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. /* Copyright (C) 2014 Stony Brook University
  2. This file is part of Graphene Library OS.
  3. Graphene Library OS is free software: you can redistribute it and/or
  4. modify it under the terms of the GNU Lesser General Public License
  5. as published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. Graphene Library OS is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Lesser General Public License for more details.
  11. You should have received a copy of the GNU Lesser General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>. */
  13. /*
  14. * shim_ipc_helper.c
  15. *
  16. * This file contains code to create an IPC helper thread inside library OS
  17. * and maintain bookkeeping of IPC ports.
  18. */
  19. #include <shim_internal.h>
  20. #include <shim_utils.h>
  21. #include <shim_thread.h>
  22. #include <shim_handle.h>
  23. #include <shim_ipc.h>
  24. #include <shim_checkpoint.h>
  25. #include <shim_profile.h>
  26. #include <pal.h>
  27. #include <pal_error.h>
  28. #include <list.h>
  29. #define IPC_HELPER_STACK_SIZE (allocsize * 4)
  30. #define PORT_MGR_ALLOC 32
  31. #define PAGE_SIZE allocsize
  32. #define OBJ_TYPE struct shim_ipc_port
  33. #include "memmgr.h"
/* Pool allocator for struct shim_ipc_port objects (configured via the
 * OBJ_TYPE/PAGE_SIZE macros defined before including memmgr.h). */
static MEM_MGR port_mgr;
/* Global list of all IPC ports of this process. In this file it is only
 * accessed with ipc_helper_lock held; the IPC helper thread waits on the PAL
 * handles of the ports in this list. */
DEFINE_LISTP(shim_ipc_port);
static LISTP_TYPE(shim_ipc_port) port_list;
/* can be read without ipc_helper_lock but always written with lock held */
static enum { HELPER_NOTALIVE, HELPER_ALIVE } ipc_helper_state;
/* Thread object of the IPC helper (NULL when none); written under ipc_helper_lock */
static struct shim_thread* ipc_helper_thread;
static struct shim_lock ipc_helper_lock;
/* Set whenever a port is added/deleted (and on termination) to wake up the IPC
 * helper thread so that it rebuilds the list of handles it waits on. */
static AEVENTTYPE install_new_event;
static int create_ipc_helper(void);
static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port);
/* Dispatch table indexed by the IPC message code: receive_ipc_message() invokes
 * ipc_callbacks[msg->code] for each incoming message. The entry order must match
 * the order of the IPC code enumeration declared elsewhere (presumably shim_ipc.h
 * -- verify when adding codes). IPC_NS_CALLBACKS()/IPC_NS_KEY_CALLBACKS()
 * presumably expand to a run of per-namespace callback entries. */
static ipc_callback ipc_callbacks[IPC_CODE_NUM] = {
/* RESP */ &ipc_resp_callback,
/* CHECKPOINT */ &ipc_checkpoint_callback,
/* parents and children */
/* CLD_EXIT */ &ipc_cld_exit_callback,
/* CLD_JOIN */ &ipc_cld_join_callback,
#ifdef PROFILE
/* CLD_PROFILE */ &ipc_cld_profile_callback,
#endif
/* pid namespace */
IPC_NS_CALLBACKS(pid)
/* PID_KILL */ &ipc_pid_kill_callback,
/* PID_GETSTATUS */ &ipc_pid_getstatus_callback,
/* PID_RETSTATUS */ &ipc_pid_retstatus_callback,
/* PID_GETMETA */ &ipc_pid_getmeta_callback,
/* PID_RETMETA */ &ipc_pid_retmeta_callback,
/* PID_NOP */ &ipc_pid_nop_callback,
/* PID_SENDRPC */ &ipc_pid_sendrpc_callback,
/* sysv namespace */
IPC_NS_CALLBACKS(sysv)
IPC_NS_KEY_CALLBACKS(sysv)
/* SYSV_DELRES */ &ipc_sysv_delres_callback,
/* SYSV_MOVRES */ &ipc_sysv_movres_callback,
/* SYSV_MSGSND */ &ipc_sysv_msgsnd_callback,
/* SYSV_MSGRCV */ &ipc_sysv_msgrcv_callback,
/* SYSV_MSGMOV */ &ipc_sysv_msgmov_callback,
/* SYSV_SEMOP */ &ipc_sysv_semop_callback,
/* SYSV_SEMCTL */ &ipc_sysv_semctl_callback,
/* SYSV_SEMRET */ &ipc_sysv_semret_callback,
/* SYSV_SEMMOV */ &ipc_sysv_semmov_callback,
};
  75. static int init_ipc_port(struct shim_ipc_info* info, PAL_HANDLE hdl, IDTYPE type) {
  76. if (!info)
  77. return 0;
  78. if (info->pal_handle == IPC_FORCE_RECONNECT) {
  79. info->pal_handle = NULL;
  80. if (!hdl && !qstrempty(&info->uri)) {
  81. debug("Reconnecting IPC port %s\n", qstrgetstr(&info->uri));
  82. hdl = DkStreamOpen(qstrgetstr(&info->uri), 0, 0, 0, 0);
  83. if (!hdl)
  84. return -PAL_ERRNO;
  85. }
  86. info->pal_handle = hdl;
  87. }
  88. if (!info->pal_handle)
  89. info->pal_handle = hdl;
  90. if (info->pal_handle)
  91. add_ipc_port_by_id(info->vmid == cur_process.vmid ? 0 : info->vmid,
  92. info->pal_handle, type, NULL, &info->port);
  93. return 0;
  94. }
  95. int init_ipc_ports(void) {
  96. int ret = 0;
  97. if (!(port_mgr = create_mem_mgr(init_align_up(PORT_MGR_ALLOC))))
  98. return -ENOMEM;
  99. if ((ret = init_ipc_port(cur_process.self, NULL, IPC_PORT_SERVER)) < 0)
  100. return ret;
  101. if (PAL_CB(parent_process) &&
  102. (ret = init_ipc_port(cur_process.parent, PAL_CB(parent_process),
  103. IPC_PORT_DIRPRT|IPC_PORT_LISTEN)) < 0)
  104. return ret;
  105. if ((ret = init_ipc_port(cur_process.ns[PID_NS], NULL,
  106. IPC_PORT_PIDLDR|IPC_PORT_LISTEN)) < 0)
  107. return ret;
  108. if ((ret = init_ipc_port(cur_process.ns[SYSV_NS], NULL,
  109. IPC_PORT_SYSVLDR|IPC_PORT_LISTEN)) < 0)
  110. return ret;
  111. return 0;
  112. }
  113. int init_ipc_helper(void) {
  114. /* early enough in init, can write global vars without the lock */
  115. ipc_helper_state = HELPER_NOTALIVE;
  116. create_lock(&ipc_helper_lock);
  117. create_event(&install_new_event);
  118. /* some IPC ports were already added before this point, so spawn IPC
  119. * helper thread (and enable locking mechanisms if not done already
  120. * since we are going in multi-threaded mode) */
  121. enable_locking();
  122. lock(&ipc_helper_lock);
  123. create_ipc_helper();
  124. unlock(&ipc_helper_lock);
  125. return 0;
  126. }
  127. static struct shim_ipc_port* __create_ipc_port(PAL_HANDLE hdl) {
  128. struct shim_ipc_port* port =
  129. get_mem_obj_from_mgr_enlarge(port_mgr, size_align_up(PORT_MGR_ALLOC));
  130. if (!port)
  131. return NULL;
  132. memset(port, 0, sizeof(struct shim_ipc_port));
  133. port->pal_handle = hdl;
  134. INIT_LIST_HEAD(port, list);
  135. INIT_LISTP(&port->msgs);
  136. REF_SET(port->ref_count, 0);
  137. create_lock(&port->msgs_lock);
  138. return port;
  139. }
  140. static void __free_ipc_port(struct shim_ipc_port* port) {
  141. if (port->pal_handle) {
  142. DkObjectClose(port->pal_handle);
  143. port->pal_handle = NULL;
  144. }
  145. destroy_lock(&port->msgs_lock);
  146. free_mem_obj_to_mgr(port_mgr, port);
  147. }
  148. static void __get_ipc_port(struct shim_ipc_port* port) {
  149. REF_INC(port->ref_count);
  150. }
  151. static void __put_ipc_port(struct shim_ipc_port* port) {
  152. int ref_count = REF_DEC(port->ref_count);
  153. if (!ref_count)
  154. __free_ipc_port(port);
  155. }
  156. void get_ipc_port(struct shim_ipc_port* port) {
  157. /* no need to grab ipc_helper_lock because __get_ipc_port() is atomic */
  158. __get_ipc_port(port);
  159. }
  160. void put_ipc_port(struct shim_ipc_port* port) {
  161. /* this is atomic so we don't grab lock in common case of ref_count > 0 */
  162. int ref_count = REF_DEC(port->ref_count);
  163. if (!ref_count) {
  164. lock(&ipc_helper_lock);
  165. __free_ipc_port(port);
  166. unlock(&ipc_helper_lock);
  167. }
  168. }
  169. static void __add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid,
  170. IDTYPE type, port_fini fini) {
  171. assert(vmid != cur_process.vmid); /* no sense in IPCing myself */
  172. port->type |= type;
  173. if (vmid && !port->vmid)
  174. port->vmid = vmid;
  175. /* find empty slot in fini callbacks and register callback */
  176. if (fini) {
  177. for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
  178. if (!port->fini[i] || port->fini[i] == fini) {
  179. port->fini[i] = fini;
  180. break;
  181. }
  182. }
  183. /* add to port list if not there already */
  184. if (LIST_EMPTY(port, list)) {
  185. __get_ipc_port(port);
  186. LISTP_ADD(port, &port_list, list);
  187. }
  188. /* wake up IPC helper thread so that it picks up added port */
  189. if (ipc_helper_state == HELPER_ALIVE)
  190. set_event(&install_new_event, 1);
  191. }
  192. static void __del_ipc_port(struct shim_ipc_port* port) {
  193. debug("Deleting port %p (handle %p) of process %u\n",
  194. port, port->pal_handle, port->vmid & 0xFFFF);
  195. DkStreamDelete(port->pal_handle, 0);
  196. LISTP_DEL_INIT(port, &port_list, list);
  197. /* Check for pending messages on port (threads might be blocking for responses) */
  198. lock(&port->msgs_lock);
  199. struct shim_ipc_msg_duplex* msg;
  200. struct shim_ipc_msg_duplex* tmp;
  201. LISTP_FOR_EACH_ENTRY_SAFE(msg, tmp, &port->msgs, list) {
  202. LISTP_DEL_INIT(msg, &port->msgs, list);
  203. msg->retval = -ECONNRESET;
  204. if (msg->thread) {
  205. debug("Deleted pending message on port %p, wake up blocking thread %d\n",
  206. port, msg->thread->tid);
  207. thread_wakeup(msg->thread);
  208. }
  209. }
  210. unlock(&port->msgs_lock);
  211. __put_ipc_port(port);
  212. /* wake up IPC helper thread so that it forgets about deleted port */
  213. if (ipc_helper_state == HELPER_ALIVE)
  214. set_event(&install_new_event, 1);
  215. }
  216. void add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid, IDTYPE type, port_fini fini) {
  217. debug("Adding port %p (handle %p) for process %u (type=%04x)\n",
  218. port, port->pal_handle, port->vmid & 0xFFFF, type);
  219. lock(&ipc_helper_lock);
  220. __add_ipc_port(port, vmid, type, fini);
  221. unlock(&ipc_helper_lock);
  222. }
  223. void add_ipc_port_by_id(IDTYPE vmid, PAL_HANDLE hdl, IDTYPE type,
  224. port_fini fini, struct shim_ipc_port** portptr) {
  225. debug("Adding port (handle %p) for process %u (type %04x)\n",
  226. hdl, vmid & 0xFFFF, type);
  227. struct shim_ipc_port* port = NULL;
  228. if (portptr)
  229. *portptr = NULL;
  230. assert(hdl && PAL_GET_TYPE(hdl));
  231. lock(&ipc_helper_lock);
  232. /* check if port with this PAL handle already exists, then we only
  233. * need to update its vmid, type, and fini callback */
  234. struct shim_ipc_port* tmp;
  235. LISTP_FOR_EACH_ENTRY(tmp, &port_list, list)
  236. if (tmp->pal_handle == hdl) {
  237. port = tmp;
  238. break;
  239. }
  240. if (!port) {
  241. /* port does not yet exist, create it */
  242. port = __create_ipc_port(hdl);
  243. if (!port) {
  244. debug("Failed to create IPC port for handle %p\n", hdl);
  245. goto out;
  246. }
  247. }
  248. /* add/update port */
  249. __add_ipc_port(port, vmid, type, fini);
  250. if (portptr) {
  251. __get_ipc_port(port);
  252. *portptr = port;
  253. }
  254. out:
  255. unlock(&ipc_helper_lock);
  256. }
  257. void del_ipc_port_fini(struct shim_ipc_port* port, unsigned int exitcode) {
  258. lock(&ipc_helper_lock);
  259. if (LIST_EMPTY(port, list)) {
  260. unlock(&ipc_helper_lock);
  261. return;
  262. }
  263. /* prevent __del_ipc_port() from freeing port since we need it for fini callbacks */
  264. __get_ipc_port(port);
  265. __del_ipc_port(port);
  266. unlock(&ipc_helper_lock);
  267. for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
  268. if (port->fini[i])
  269. (port->fini[i])(port, port->vmid, exitcode);
  270. __put_ipc_port(port);
  271. }
  272. void del_all_ipc_ports(void) {
  273. lock(&ipc_helper_lock);
  274. struct shim_ipc_port* port;
  275. struct shim_ipc_port* tmp;
  276. LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list)
  277. __del_ipc_port(port);
  278. unlock(&ipc_helper_lock);
  279. }
  280. struct shim_ipc_port* lookup_ipc_port(IDTYPE vmid, IDTYPE type) {
  281. struct shim_ipc_port* port = NULL;
  282. assert(vmid && type);
  283. lock(&ipc_helper_lock);
  284. struct shim_ipc_port* tmp;
  285. LISTP_FOR_EACH_ENTRY(tmp, &port_list, list) {
  286. if (tmp->vmid == vmid && (tmp->type & type)) {
  287. debug("found port %p (handle %p) for process %u (type %04x)\n",
  288. tmp, tmp->pal_handle, tmp->vmid & 0xFFFF, tmp->type);
  289. port = tmp;
  290. __get_ipc_port(port);
  291. break;
  292. }
  293. }
  294. unlock(&ipc_helper_lock);
  295. return port;
  296. }
  297. int broadcast_ipc(struct shim_ipc_msg* msg, int target_type, struct shim_ipc_port* exclude_port) {
  298. int ret;
  299. struct shim_ipc_port* port;
  300. struct shim_ipc_port** target_ports;
  301. int target_ports_cnt = 0;
  302. assert(target_type);
  303. lock(&ipc_helper_lock);
  304. /* Collect all ports with appropriate types. In common case, stack-allocated
  305. * array of 32 ports is enough. If there are more ports, we will allocate
  306. * a bigger array on the heap and collect all ports again. */
  307. struct shim_ipc_port* target_ports_stack[32];
  308. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  309. if (port == exclude_port)
  310. continue;
  311. if (port->type & target_type) {
  312. if (target_ports_cnt < 32)
  313. target_ports_stack[target_ports_cnt] = port;
  314. target_ports_cnt++;
  315. }
  316. }
  317. target_ports = target_ports_stack;
  318. if (target_ports_cnt > 32) {
  319. /* Rare case when there are more than 32 ports. Allocate big-enough
  320. * array on the heap and collect all ports again. */
  321. int cnt = 0;
  322. struct shim_ipc_port** target_ports_heap =
  323. malloc(sizeof(struct shim_ipc_port *) * target_ports_cnt);
  324. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  325. if (port == exclude_port)
  326. continue;
  327. if (port->type & target_type)
  328. target_ports_heap[cnt++] = port;
  329. }
  330. target_ports = target_ports_heap;
  331. assert(cnt == target_ports_cnt);
  332. }
  333. unlock(&ipc_helper_lock);
  334. for (int i = 0; i < target_ports_cnt; i++)
  335. get_ipc_port(target_ports[i]);
  336. /* send msg to each collected port (note that ports cannot be freed in meantime) */
  337. for (int i = 0; i < target_ports_cnt; i++) {
  338. port = target_ports[i];
  339. debug("Broadcast to port %p (handle %p) for process %u (type %x, target %x)\n",
  340. port, port->pal_handle, port->vmid & 0xFFFF, port->type, target_type);
  341. msg->dst = port->vmid;
  342. ret = send_ipc_message(msg, port);
  343. if (ret < 0) {
  344. debug("Broadcast to port %p (handle %p) for process %u failed (errno = %d)!\n",
  345. port, port->pal_handle, port->vmid & 0xFFFF, ret);
  346. goto out;
  347. }
  348. }
  349. ret = 0;
  350. out:
  351. for (int i = 0; i < target_ports_cnt; i++)
  352. put_ipc_port(target_ports[i]);
  353. if (target_ports != target_ports_stack)
  354. free(target_ports);
  355. return ret;
  356. }
  357. static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port) {
  358. struct shim_ipc_resp* resp = (struct shim_ipc_resp*) &msg->msg;
  359. debug("IPC callback from %u: IPC_RESP(%d)\n", msg->src & 0xFFFF, resp->retval);
  360. if (!msg->seq)
  361. return resp->retval;
  362. /* find a corresponding request msg for this response msg */
  363. struct shim_ipc_msg_duplex* req_msg = pop_ipc_msg_duplex(port, msg->seq);
  364. /* if some thread waits for response, wake it up with response retval */
  365. if (req_msg) {
  366. req_msg->retval = resp->retval;
  367. if (req_msg->thread)
  368. thread_wakeup(req_msg->thread);
  369. return 0;
  370. }
  371. return resp->retval;
  372. }
  373. int send_response_ipc_message(struct shim_ipc_port* port, IDTYPE dest, int ret, uint64_t seq) {
  374. ret = (ret == RESPONSE_CALLBACK) ? 0 : ret;
  375. /* create IPC_RESP msg to send to dest, with sequence number seq, and in-body retval ret */
  376. size_t total_msg_size = get_ipc_msg_size(sizeof(struct shim_ipc_resp));
  377. struct shim_ipc_msg* resp_msg = __alloca(total_msg_size);
  378. init_ipc_msg(resp_msg, IPC_RESP, total_msg_size, dest);
  379. resp_msg->seq = seq;
  380. struct shim_ipc_resp* resp = (struct shim_ipc_resp *) resp_msg->msg;
  381. resp->retval = ret;
  382. debug("IPC send to %u: IPC_RESP(%d)\n", resp_msg->dst & 0xFFFF, ret);
  383. return send_ipc_message(resp_msg, port);
  384. }
  385. static int receive_ipc_message(struct shim_ipc_port* port) {
  386. int ret;
  387. int readahead = IPC_MSG_MINIMAL_SIZE * 2;
  388. int bufsize = IPC_MSG_MINIMAL_SIZE + readahead;
  389. struct shim_ipc_msg* msg = malloc(bufsize);
  390. int expected_size = IPC_MSG_MINIMAL_SIZE;
  391. int bytes = 0;
  392. do {
  393. while (bytes < expected_size) {
  394. /* grow msg buffer to accomodate bigger messages */
  395. if (expected_size + readahead > bufsize) {
  396. while (expected_size + readahead > bufsize)
  397. bufsize *= 2;
  398. void* tmp_buf = malloc(bufsize);
  399. memcpy(tmp_buf, msg, bytes);
  400. free(msg);
  401. msg = tmp_buf;
  402. }
  403. /* TODO: Add while-loop to receive all msg */
  404. int read = DkStreamRead(port->pal_handle, /*offset*/ 0, expected_size - bytes + readahead,
  405. (void *) msg + bytes, NULL, 0);
  406. if (read == 0) {
  407. /* TODO: Handle benign EINTR case? */
  408. debug("Port %p (handle %p) closed while receiving IPC message\n", port, port->pal_handle);
  409. del_ipc_port_fini(port, -ECHILD);
  410. ret = -PAL_ERRNO;
  411. goto out;
  412. }
  413. bytes += read;
  414. /* extract actual msg size from msg header and continue reading msg body */
  415. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  416. expected_size = msg->size;
  417. }
  418. debug("Received IPC message from port %p (handle %p): code=%d size=%lu src=%u dst=%u seq=%lx\n",
  419. port, port->pal_handle, msg->code, msg->size, msg->src & 0xFFFF, msg->dst & 0xFFFF, msg->seq);
  420. /* skip messages coming from myself (in case of broadcast) */
  421. if (msg->src != cur_process.vmid) {
  422. if (msg->code < IPC_CODE_NUM && ipc_callbacks[msg->code]) {
  423. /* invoke callback to this msg */
  424. ret = (*ipc_callbacks[msg->code]) (msg, port);
  425. if ((ret < 0 || ret == RESPONSE_CALLBACK) && msg->seq) {
  426. /* send IPC_RESP message to sender of this msg */
  427. ret = send_response_ipc_message(port, msg->src, ret, msg->seq);
  428. if (ret < 0) {
  429. debug("Sending IPC_RESP msg on port %p (handle %p) to %u failed\n",
  430. port, port->pal_handle, msg->src & 0xFFFF);
  431. ret = -PAL_ERRNO;
  432. goto out;
  433. }
  434. }
  435. }
  436. }
  437. bytes -= expected_size; /* one message was received and handled */
  438. if (bytes > 0) {
  439. /* we may have started reading the next message, move this message
  440. * to beginning of msg buffer and reset expected size */
  441. memmove(msg, (void *) msg + expected_size, bytes);
  442. expected_size = IPC_MSG_MINIMAL_SIZE;
  443. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  444. expected_size = msg->size;
  445. }
  446. } while (bytes > 0);
  447. ret = 0;
  448. out:
  449. free(msg);
  450. return ret;
  451. }
  452. /* Main routine of the IPC helper thread. IPC helper thread is spawned when
  453. * the first IPC port is added and is terminated only when the whole Graphene
  454. * application terminates. IPC helper thread runs in an endless loop and waits
  455. * on port events (either the addition/removal of ports or actual port events:
  456. * acceptance of new client or receiving/sending messages). In particular,
  457. * IPC helper thread calls receive_ipc_message() if a message arrives on port.
  458. *
  459. * Other threads add and remove IPC ports via add_ipc_xxx() and del_ipc_xxx()
  460. * functions. These ports are added to port_list which the IPC helper thread
  461. * consults before each new DkObjectsWaitAny().
  462. *
  463. * Note that ports are copied from global port_list to local object_list. This
  464. * is because ports may be removed from port_list by other threads while IPC
  465. * helper thread is waiting on DkObjectsWaitAny(). For this reason IPC thread
  466. * also get references to all current ports and puts them after handling all
  467. * ports in object_list.
  468. *
  469. * Previous implementation went to great lengths to keep changes to the list of
  470. * current ports to a minimum (instead of repopulating the list before each wait
  471. * like in current code). Unfortunately, this resulted in undue complexity.
  472. * Current implementation should perform fine for usual case of <100 IPC ports
  473. * and with IPC helper thread always running in background on its own core.
  474. */
  475. noreturn static void shim_ipc_helper(void* dummy) {
  476. __UNUSED(dummy);
  477. struct shim_thread* self = get_cur_thread();
  478. PAL_HANDLE polled = NULL;
  479. /* Initialize two lists:
  480. * - object_list collects IPC port objects and is the main handled list
  481. * - palhandle_list collects corresponding PAL handles of IPC port objects
  482. * and is needed for DkObjectsWaitAny(.., <arrya-of-PAL-handles>, ..)
  483. * interface; palhandle_list always contains at least install_new_event
  484. *
  485. * We allocate these two lists on the heap so they do not overflow the
  486. * limited PAL stack. We grow them at runtime if needed.
  487. */
  488. size_t object_list_size = 0;
  489. size_t object_list_maxsize = 32;
  490. struct shim_ipc_port** object_list =
  491. malloc(sizeof(struct shim_ipc_port *) * object_list_maxsize);
  492. PAL_HANDLE* palhandle_list =
  493. malloc(sizeof(PAL_HANDLE) * (1 + object_list_maxsize));
  494. PAL_HANDLE install_new_event_hdl = event_handle(&install_new_event);
  495. palhandle_list[0] = install_new_event_hdl;
  496. while (ipc_helper_state == HELPER_ALIVE) {
  497. struct shim_ipc_port* polled_port = NULL;
  498. if (polled == install_new_event_hdl) {
  499. /* some thread wants to install new event; this event is found
  500. * in object_list below, so just re-init install_new_event */
  501. debug("New IPC event was requested (port was added/removed)\n");
  502. clear_event(&install_new_event);
  503. } else {
  504. /* it is not install_new_event handle, so must be one of ports */
  505. for (size_t i = 0; i < object_list_size; i++)
  506. if (polled == object_list[i]->pal_handle) {
  507. polled_port = object_list[i];
  508. break;
  509. }
  510. }
  511. if (polled_port) {
  512. if (polled_port->type & IPC_PORT_SERVER) {
  513. /* if polled port is server port, accept a client,
  514. * create client port, and add it to port list */
  515. PAL_HANDLE client = DkStreamWaitForClient(polled_port->pal_handle);
  516. if (client) {
  517. /* type of client port is the same as original server port
  518. * but with LISTEN (for remote client) and without SERVER
  519. * (this port doesn't wait for new clients) */
  520. IDTYPE client_type = (polled_port->type & ~IPC_PORT_SERVER) | IPC_PORT_LISTEN;
  521. add_ipc_port_by_id(polled_port->vmid, client, client_type, NULL, NULL);
  522. } else {
  523. debug("Port %p (handle %p) was removed during accepting client\n",
  524. polled_port, polled_port->pal_handle);
  525. del_ipc_port_fini(polled_port, -ECHILD);
  526. }
  527. } else {
  528. PAL_STREAM_ATTR attr;
  529. if (DkStreamAttributesQueryByHandle(polled_port->pal_handle, &attr)) {
  530. /* can read on this port, so receive messages */
  531. if (attr.readable) {
  532. /* TODO: IPC helper thread does not handle failures currently */
  533. receive_ipc_message(polled_port);
  534. }
  535. if (attr.disconnected) {
  536. debug("Port %p (handle %p) disconnected\n",
  537. polled_port, polled_port->pal_handle);
  538. del_ipc_port_fini(polled_port, -ECONNRESET);
  539. }
  540. } else {
  541. debug("Port %p (handle %p) was removed during attr querying\n",
  542. polled_port, polled_port->pal_handle);
  543. del_ipc_port_fini(polled_port, -PAL_ERRNO);
  544. }
  545. }
  546. }
  547. /* done handling ports; put their references so they can be freed */
  548. for (size_t i = 0; i < object_list_size; i++) {
  549. struct shim_ipc_port* port = object_list[i];
  550. put_ipc_port(port);
  551. }
  552. lock(&ipc_helper_lock);
  553. /* iterate through all ports to repopulate object_list */
  554. object_list_size = 0;
  555. struct shim_ipc_port* port;
  556. struct shim_ipc_port* tmp;
  557. LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list) {
  558. /* get port reference so it is not freed while we wait on/handle it */
  559. __get_ipc_port(port);
  560. /* grow object_list and palhandle_list to accomodate more objects */
  561. if (object_list_size == object_list_maxsize) {
  562. struct shim_ipc_port** tmp_array = malloc(
  563. sizeof(struct shim_ipc_port *) * (object_list_maxsize * 2));
  564. PAL_HANDLE* tmp_pal_array = malloc(
  565. sizeof(PAL_HANDLE) * (1 + object_list_maxsize * 2));
  566. memcpy(tmp_array, object_list,
  567. sizeof(struct shim_ipc_port *) * (object_list_maxsize));
  568. memcpy(tmp_pal_array, palhandle_list,
  569. sizeof(PAL_HANDLE) * (1 + object_list_size));
  570. object_list_maxsize *= 2;
  571. free(object_list);
  572. free(palhandle_list);
  573. object_list = tmp_array;
  574. palhandle_list = tmp_pal_array;
  575. }
  576. /* re-add this port to object_list and palhandle_list */
  577. object_list[object_list_size] = port;
  578. palhandle_list[object_list_size + 1] = port->pal_handle;
  579. object_list_size++;
  580. debug("Listening to process %u on port %p (handle %p, type %04x)\n",
  581. port->vmid & 0xFFFF, port, port->pal_handle, port->type);
  582. }
  583. unlock(&ipc_helper_lock);
  584. /* wait on collected ports' PAL handles + install_new_event_hdl */
  585. polled = DkObjectsWaitAny(object_list_size + 1, palhandle_list, NO_TIMEOUT);
  586. /* ensure that while loop breaks on ipc_helper_state change */
  587. COMPILER_BARRIER();
  588. }
  589. /* IPC thread exits; put acquired port references so they can be freed */
  590. for (size_t i = 0; i < object_list_size; i++) {
  591. struct shim_ipc_port* port = object_list[i];
  592. put_ipc_port(port);
  593. }
  594. free(object_list);
  595. free(palhandle_list);
  596. lock(&ipc_helper_lock);
  597. ipc_helper_state = HELPER_NOTALIVE;
  598. ipc_helper_thread = NULL;
  599. unlock(&ipc_helper_lock);
  600. put_thread(self);
  601. debug("IPC helper thread terminated\n");
  602. DkThreadExit();
  603. }
  604. static void shim_ipc_helper_prepare(void* arg) {
  605. struct shim_thread * self = (struct shim_thread *) arg;
  606. if (!arg)
  607. return;
  608. __libc_tcb_t tcb;
  609. allocate_tls(&tcb, false, self);
  610. debug_setbuf(&tcb.shim_tcb, true);
  611. debug("Set tcb to %p\n", &tcb);
  612. lock(&ipc_helper_lock);
  613. bool notme = (self != ipc_helper_thread);
  614. unlock(&ipc_helper_lock);
  615. void* stack = allocate_stack(IPC_HELPER_STACK_SIZE, allocsize, false);
  616. if (notme || !stack) {
  617. put_thread(self);
  618. DkThreadExit();
  619. return;
  620. }
  621. debug("IPC helper thread started\n");
  622. /* swap stack to be sure we don't drain the small stack PAL provides */
  623. self->stack_top = stack + IPC_HELPER_STACK_SIZE;
  624. self->stack = stack;
  625. __SWITCH_STACK(self->stack_top, shim_ipc_helper, NULL);
  626. }
  627. /* this should be called with the ipc_helper_lock held */
  628. static int create_ipc_helper(void) {
  629. if (ipc_helper_state == HELPER_ALIVE)
  630. return 0;
  631. struct shim_thread * new = get_new_internal_thread();
  632. if (!new)
  633. return -ENOMEM;
  634. ipc_helper_thread = new;
  635. ipc_helper_state = HELPER_ALIVE;
  636. PAL_HANDLE handle = thread_create(shim_ipc_helper_prepare, new);
  637. if (!handle) {
  638. ipc_helper_thread = NULL;
  639. ipc_helper_state = HELPER_NOTALIVE;
  640. put_thread(new);
  641. return -PAL_ERRNO;
  642. }
  643. new->pal_handle = handle;
  644. return 0;
  645. }
  646. /* On success, the reference to ipc helper thread is returned with refcount
  647. * incremented. It is the responsibility of caller to wait for ipc helper's
  648. * exit and then release the final reference to free related resources (it is
  649. * problematic for the thread itself to release its own resources e.g. stack).
  650. */
  651. struct shim_thread* terminate_ipc_helper(void) {
  652. if (ipc_helper_state != HELPER_ALIVE)
  653. return NULL;
  654. /* NOTE: Graphene doesn't have an abstraction of a queue of pending signals
  655. * between communicating processes (instead all communication is done over
  656. * streams). Thus, app code like this (found in e.g. Lmbench's bw_unix):
  657. * kill(child, SIGKILL);
  658. * exit(0);
  659. * results in a data race between the SIGKILL message sent over IPC stream
  660. * and the parent process exiting. In the worst case, the parent will exit
  661. * before the SIGKILL message goes through the host-OS stream, the host OS
  662. * will close the stream, and the message will never be seen by child. To
  663. * prevent such cases, we simply wait for a bit before exiting.
  664. * */
  665. debug("Waiting for 0.5s for all in-flight IPC messages to reach their destinations\n");
  666. DkThreadDelayExecution(500);
  667. lock(&ipc_helper_lock);
  668. struct shim_thread* ret = ipc_helper_thread;
  669. if (ret)
  670. get_thread(ret);
  671. ipc_helper_state = HELPER_NOTALIVE;
  672. unlock(&ipc_helper_lock);
  673. /* force wake up of ipc helper thread so that it exits */
  674. set_event(&install_new_event, 1);
  675. return ret;
  676. }