shim_ipc_helper.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817
  1. /* Copyright (C) 2014 Stony Brook University
  2. This file is part of Graphene Library OS.
  3. Graphene Library OS is free software: you can redistribute it and/or
  4. modify it under the terms of the GNU Lesser General Public License
  5. as published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. Graphene Library OS is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Lesser General Public License for more details.
  11. You should have received a copy of the GNU Lesser General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>. */
  13. /*
  14. * shim_ipc_helper.c
  15. *
  16. * This file contains code to create an IPC helper thread inside library OS
  17. * and maintain bookkeeping of IPC ports.
  18. */
  19. #include <shim_internal.h>
  20. #include <shim_utils.h>
  21. #include <shim_thread.h>
  22. #include <shim_handle.h>
  23. #include <shim_ipc.h>
  24. #include <shim_checkpoint.h>
  25. #include <shim_profile.h>
  26. #include <pal.h>
  27. #include <pal_error.h>
  28. #include <list.h>
/* Stack size for the IPC helper thread (4 pages, allocated in
 * shim_ipc_helper_prepare() via allocate_stack()). */
#define IPC_HELPER_STACK_SIZE (allocsize * 4)

/* memmgr.h is a template header: PAGE_SIZE/OBJ_TYPE must be defined before
 * inclusion; it instantiates an object manager for shim_ipc_port objects. */
#define PORT_MGR_ALLOC 32
#define PAGE_SIZE allocsize
#define OBJ_TYPE struct shim_ipc_port
#include "memmgr.h"
/* allocator for shim_ipc_port objects, created in init_ipc_ports() */
static MEM_MGR port_mgr;

/* global list of all currently known IPC ports; protected by ipc_helper_lock */
DEFINE_LISTP(shim_ipc_port);
static LISTP_TYPE(shim_ipc_port) port_list;

/* can be read without ipc_helper_lock but always written with lock held */
static enum { HELPER_NOTALIVE, HELPER_ALIVE } ipc_helper_state;

/* internal thread running shim_ipc_helper(); NULL when not alive */
static struct shim_thread* ipc_helper_thread;
/* protects port_list, ipc_helper_state and ipc_helper_thread */
static struct shim_lock ipc_helper_lock;

/* event used to kick the IPC helper out of DkObjectsWaitAny() whenever the
 * set of ports changes (or on termination) */
static AEVENTTYPE install_new_event;

static int create_ipc_helper(void);
static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port);
/* Dispatch table mapping IPC message codes to their handlers; indexed by
 * msg->code in receive_ipc_message(). Order must match the IPC code enum
 * (including the PROFILE-conditional slot), hence the positional comments. */
static ipc_callback ipc_callbacks[IPC_CODE_NUM] = {
    /* RESP */            &ipc_resp_callback,
    /* CHECKPOINT */      &ipc_checkpoint_callback,

    /* parents and children */
    /* CLD_EXIT */        &ipc_cld_exit_callback,
#ifdef PROFILE
    /* CLD_PROFILE */     &ipc_cld_profile_callback,
#endif

    /* pid namespace */
    IPC_NS_CALLBACKS(pid)
    /* PID_KILL */        &ipc_pid_kill_callback,
    /* PID_GETSTATUS */   &ipc_pid_getstatus_callback,
    /* PID_RETSTATUS */   &ipc_pid_retstatus_callback,
    /* PID_GETMETA */     &ipc_pid_getmeta_callback,
    /* PID_RETMETA */     &ipc_pid_retmeta_callback,
    /* PID_NOP */         &ipc_pid_nop_callback,
    /* PID_SENDRPC */     &ipc_pid_sendrpc_callback,

    /* sysv namespace */
    IPC_NS_CALLBACKS(sysv)
    IPC_NS_KEY_CALLBACKS(sysv)
    /* SYSV_DELRES */     &ipc_sysv_delres_callback,
    /* SYSV_MOVRES */     &ipc_sysv_movres_callback,
    /* SYSV_MSGSND */     &ipc_sysv_msgsnd_callback,
    /* SYSV_MSGRCV */     &ipc_sysv_msgrcv_callback,
    /* SYSV_MSGMOV */     &ipc_sysv_msgmov_callback,
    /* SYSV_SEMOP */      &ipc_sysv_semop_callback,
    /* SYSV_SEMCTL */     &ipc_sysv_semctl_callback,
    /* SYSV_SEMRET */     &ipc_sysv_semret_callback,
    /* SYSV_SEMMOV */     &ipc_sysv_semmov_callback,
};
/* Turn one shim_ipc_info entry into a registered IPC port.
 *
 * info: process-info entry describing the peer (may be NULL: no-op).
 * hdl:  pre-existing PAL handle to use, or NULL to (re)connect via info->uri.
 * type: IPC_PORT_* type bits for the new port.
 *
 * Returns 0 on success (including the benign "nothing to do" cases) or
 * -PAL_ERRNO if reconnecting via DkStreamOpen() failed. On success the
 * created port is stored in info->port by add_ipc_port_by_id(). */
static int init_ipc_port(struct shim_ipc_info* info, PAL_HANDLE hdl, IDTYPE type) {
    if (!info)
        return 0;

    /* IPC_FORCE_RECONNECT is a sentinel (set e.g. during checkpoint/restore)
     * meaning the old handle is stale and the stream must be re-opened. */
    if (info->pal_handle == IPC_FORCE_RECONNECT) {
        info->pal_handle = NULL;
        if (!hdl && !qstrempty(&info->uri)) {
            debug("Reconnecting IPC port %s\n", qstrgetstr(&info->uri));
            hdl = DkStreamOpen(qstrgetstr(&info->uri), 0, 0, 0, 0);
            if (!hdl)
                return -PAL_ERRNO;
        }
        info->pal_handle = hdl;
    }

    if (!info->pal_handle)
        info->pal_handle = hdl;

    /* vmid 0 means "self": don't tag the port with our own vmid */
    if (info->pal_handle)
        add_ipc_port_by_id(info->vmid == cur_process.vmid ? 0 : info->vmid,
                           info->pal_handle, type, NULL, &info->port);
    return 0;
}
/* Create the port-object allocator and register the initial set of IPC ports:
 * our own server port, the port to the parent process (if any), and the PID
 * and SysV namespace-leader ports. Returns 0 on success, negative errno on
 * failure. */
int init_ipc_ports(void) {
    int ret = 0;

    if (!(port_mgr = create_mem_mgr(init_align_up(PORT_MGR_ALLOC))))
        return -ENOMEM;

    /* listening server port for incoming connections to this process */
    if ((ret = init_ipc_port(cur_process.self, NULL, IPC_PORT_SERVER)) < 0)
        return ret;

    /* port to the direct parent, using the PAL-provided parent handle */
    if (PAL_CB(parent_process) &&
        (ret = init_ipc_port(cur_process.parent, PAL_CB(parent_process),
                             IPC_PORT_DIRPRT|IPC_PORT_LISTEN)) < 0)
        return ret;

    /* ports to the PID and SysV namespace leaders (reconnected via URI) */
    if ((ret = init_ipc_port(cur_process.ns[PID_NS], NULL,
                             IPC_PORT_PIDLDR|IPC_PORT_LISTEN)) < 0)
        return ret;
    if ((ret = init_ipc_port(cur_process.ns[SYSV_NS], NULL,
                             IPC_PORT_SYSVLDR|IPC_PORT_LISTEN)) < 0)
        return ret;

    return 0;
}
/* Initialize IPC helper state (lock, wake-up event) and spawn the helper
 * thread. Called once during LibOS initialization; always returns 0
 * (NOTE(review): the return value of create_ipc_helper() is ignored here —
 * a spawn failure is silently dropped; confirm this is intended). */
int init_ipc_helper(void) {
    /* early enough in init, can write global vars without the lock */
    ipc_helper_state = HELPER_NOTALIVE;
    create_lock(&ipc_helper_lock);
    create_event(&install_new_event);

    /* some IPC ports were already added before this point, so spawn IPC
     * helper thread (and enable locking mechanisms if not done already
     * since we are going in multi-threaded mode) */
    enable_locking();
    lock(&ipc_helper_lock);
    create_ipc_helper();
    unlock(&ipc_helper_lock);
    return 0;
}
/* Allocate and zero-initialize a new port object for PAL handle `hdl`.
 * The port starts with ref_count 0 and an unlinked list node; the caller
 * (add_ipc_port_by_id) links it into port_list which takes the first
 * reference. Returns NULL on allocation failure. */
static struct shim_ipc_port* __create_ipc_port(PAL_HANDLE hdl) {
    struct shim_ipc_port* port =
        get_mem_obj_from_mgr_enlarge(port_mgr, size_align_up(PORT_MGR_ALLOC));
    if (!port)
        return NULL;

    memset(port, 0, sizeof(struct shim_ipc_port));
    port->pal_handle = hdl;
    INIT_LIST_HEAD(port, list);
    INIT_LISTP(&port->msgs);
    REF_SET(port->ref_count, 0);
    create_lock(&port->msgs_lock);
    return port;
}
/* Destroy a port whose ref count dropped to zero: close its PAL handle,
 * tear down the message lock, and return the object to the port manager. */
static void __free_ipc_port(struct shim_ipc_port* port) {
    if (port->pal_handle) {
        DkObjectClose(port->pal_handle);
        port->pal_handle = NULL;
    }
    destroy_lock(&port->msgs_lock);
    free_mem_obj_to_mgr(port_mgr, port);
}
/* Take a reference on `port` (atomic increment). */
static void __get_ipc_port(struct shim_ipc_port* port) {
    REF_INC(port->ref_count);
}
/* Drop a reference on `port`; frees the port when the count hits zero.
 * Must be called with ipc_helper_lock held (see put_ipc_port()). */
static void __put_ipc_port(struct shim_ipc_port* port) {
    int ref_count = REF_DEC(port->ref_count);
    if (!ref_count)
        __free_ipc_port(port);
}
/* Public wrapper: take a reference on `port`. */
void get_ipc_port(struct shim_ipc_port* port) {
    /* no need to grab ipc_helper_lock because __get_ipc_port() is atomic */
    __get_ipc_port(port);
}
/* Public wrapper: drop a reference on `port`, freeing it under
 * ipc_helper_lock if this was the last reference.
 *
 * NOTE(review): the decrement happens outside the lock, and the count is not
 * re-checked after acquiring it — this presumes no thread can take a new
 * reference once the count reaches zero (i.e. the port is already unlinked
 * from port_list); verify against callers. */
void put_ipc_port(struct shim_ipc_port* port) {
    /* this is atomic so we don't grab lock in common case of ref_count > 0 */
    int ref_count = REF_DEC(port->ref_count);
    if (!ref_count) {
        lock(&ipc_helper_lock);
        __free_ipc_port(port);
        unlock(&ipc_helper_lock);
    }
}
/* Add (or update) a port in the global list; must be called with
 * ipc_helper_lock held.
 *
 * vmid: peer process id (0 for "unknown/self-server"); only recorded if the
 *       port doesn't already have one.
 * type: IPC_PORT_* bits OR-ed into the port's existing type.
 * fini: optional cleanup callback invoked by del_ipc_port_fini(); stored in
 *       the first free (or matching) slot of port->fini[]. */
static void __add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid,
                           IDTYPE type, port_fini fini) {
    assert(vmid != cur_process.vmid); /* no sense in IPCing myself */

    port->type |= type;
    if (vmid && !port->vmid)
        port->vmid = vmid;

    /* find empty slot in fini callbacks and register callback */
    if (fini) {
        for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
            if (!port->fini[i] || port->fini[i] == fini) {
                port->fini[i] = fini;
                break;
            }
    }

    /* add to port list if not there already; the list owns one reference */
    if (LIST_EMPTY(port, list)) {
        __get_ipc_port(port);
        LISTP_ADD(port, &port_list, list);
    }

    /* wake up IPC helper thread so that it picks up added port */
    if (ipc_helper_state == HELPER_ALIVE)
        set_event(&install_new_event, 1);
}
/* Remove a port from the global list; must be called with ipc_helper_lock
 * held. Deletes the underlying stream, fails any requests still queued on
 * the port (waking their blocked threads with -ECONNRESET), and drops the
 * list's reference — which may free the port. */
static void __del_ipc_port(struct shim_ipc_port* port) {
    debug("Deleting port %p (handle %p) of process %u\n",
          port, port->pal_handle, port->vmid & 0xFFFF);

    DkStreamDelete(port->pal_handle, 0);
    LISTP_DEL_INIT(port, &port_list, list);

    /* Check for pending messages on port (threads might be blocking for responses) */
    lock(&port->msgs_lock);
    struct shim_ipc_msg_duplex* msg;
    struct shim_ipc_msg_duplex* tmp;
    LISTP_FOR_EACH_ENTRY_SAFE(msg, tmp, &port->msgs, list) {
        LISTP_DEL_INIT(msg, &port->msgs, list);
        msg->retval = -ECONNRESET;
        if (msg->thread) {
            debug("Deleted pending message on port %p, wake up blocking thread %d\n",
                  port, msg->thread->tid);
            thread_wakeup(msg->thread);
        }
    }
    unlock(&port->msgs_lock);

    /* drop the reference held by port_list */
    __put_ipc_port(port);

    /* wake up IPC helper thread so that it forgets about deleted port */
    if (ipc_helper_state == HELPER_ALIVE)
        set_event(&install_new_event, 1);
}
/* Public wrapper around __add_ipc_port(): add/update an existing port object
 * under ipc_helper_lock. See __add_ipc_port() for parameter semantics. */
void add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid, IDTYPE type, port_fini fini) {
    debug("Adding port %p (handle %p) for process %u (type=%04x)\n",
          port, port->pal_handle, port->vmid & 0xFFFF, type);

    lock(&ipc_helper_lock);
    __add_ipc_port(port, vmid, type, fini);
    unlock(&ipc_helper_lock);
}
/* Find-or-create a port for PAL handle `hdl` and register it with the given
 * vmid/type/fini (see __add_ipc_port()). If `portptr` is non-NULL, on success
 * it receives the port with an extra reference the caller must put; it is set
 * to NULL up-front so it is NULL on failure. Allocation failure is only
 * logged — no error is returned. */
void add_ipc_port_by_id(IDTYPE vmid, PAL_HANDLE hdl, IDTYPE type,
                        port_fini fini, struct shim_ipc_port** portptr) {
    debug("Adding port (handle %p) for process %u (type %04x)\n",
          hdl, vmid & 0xFFFF, type);

    struct shim_ipc_port* port = NULL;
    if (portptr)
        *portptr = NULL;

    assert(hdl && PAL_GET_TYPE(hdl));
    lock(&ipc_helper_lock);

    /* check if port with this PAL handle already exists, then we only
     * need to update its vmid, type, and fini callback */
    struct shim_ipc_port* tmp;
    LISTP_FOR_EACH_ENTRY(tmp, &port_list, list)
        if (tmp->pal_handle == hdl) {
            port = tmp;
            break;
        }

    if (!port) {
        /* port does not yet exist, create it */
        port = __create_ipc_port(hdl);
        if (!port) {
            debug("Failed to create IPC port for handle %p\n", hdl);
            goto out;
        }
    }

    /* add/update port */
    __add_ipc_port(port, vmid, type, fini);

    if (portptr) {
        /* hand a referenced port back to the caller */
        __get_ipc_port(port);
        *portptr = port;
    }

out:
    unlock(&ipc_helper_lock);
}
/* Remove `port` from the global list and invoke all of its registered fini
 * callbacks with `exitcode`. A temporary reference keeps the port object
 * alive across __del_ipc_port() so the callbacks can still read it; it is
 * released at the end (possibly freeing the port). No-op if the port is not
 * currently linked in port_list. */
void del_ipc_port_fini(struct shim_ipc_port* port, unsigned int exitcode) {
    lock(&ipc_helper_lock);

    if (LIST_EMPTY(port, list)) {
        unlock(&ipc_helper_lock);
        return;
    }

    /* prevent __del_ipc_port() from freeing port since we need it for fini callbacks */
    __get_ipc_port(port);
    __del_ipc_port(port);
    unlock(&ipc_helper_lock);

    /* callbacks run without the lock held */
    for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
        if (port->fini[i])
            (port->fini[i])(port, port->vmid, exitcode);

    __put_ipc_port(port);
}
/* Remove every port from the global list (used during process teardown).
 * Uses the _SAFE iterator because __del_ipc_port() unlinks the current
 * entry. Fini callbacks are NOT invoked here. */
void del_all_ipc_ports(void) {
    lock(&ipc_helper_lock);

    struct shim_ipc_port* port;
    struct shim_ipc_port* tmp;
    LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list)
        __del_ipc_port(port);

    unlock(&ipc_helper_lock);
}
/* Find the first port matching `vmid` whose type intersects `type`.
 * Returns the port with a reference taken (caller must put_ipc_port()),
 * or NULL if no such port exists. */
struct shim_ipc_port* lookup_ipc_port(IDTYPE vmid, IDTYPE type) {
    struct shim_ipc_port* port = NULL;
    assert(vmid && type);

    lock(&ipc_helper_lock);
    struct shim_ipc_port* tmp;
    LISTP_FOR_EACH_ENTRY(tmp, &port_list, list) {
        if (tmp->vmid == vmid && (tmp->type & type)) {
            debug("found port %p (handle %p) for process %u (type %04x)\n",
                  tmp, tmp->pal_handle, tmp->vmid & 0xFFFF, tmp->type);
            port = tmp;
            __get_ipc_port(port);
            break;
        }
    }
    unlock(&ipc_helper_lock);
    return port;
}
  296. int broadcast_ipc(struct shim_ipc_msg* msg, int target_type, struct shim_ipc_port* exclude_port) {
  297. int ret;
  298. struct shim_ipc_port* port;
  299. struct shim_ipc_port** target_ports;
  300. int target_ports_cnt = 0;
  301. assert(target_type);
  302. lock(&ipc_helper_lock);
  303. /* Collect all ports with appropriate types. In common case, stack-allocated
  304. * array of 32 ports is enough. If there are more ports, we will allocate
  305. * a bigger array on the heap and collect all ports again. */
  306. struct shim_ipc_port* target_ports_stack[32];
  307. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  308. if (port == exclude_port)
  309. continue;
  310. if (port->type & target_type) {
  311. if (target_ports_cnt < 32)
  312. target_ports_stack[target_ports_cnt] = port;
  313. target_ports_cnt++;
  314. }
  315. }
  316. target_ports = target_ports_stack;
  317. if (target_ports_cnt > 32) {
  318. /* Rare case when there are more than 32 ports. Allocate big-enough
  319. * array on the heap and collect all ports again. */
  320. int cnt = 0;
  321. struct shim_ipc_port** target_ports_heap =
  322. malloc(sizeof(struct shim_ipc_port *) * target_ports_cnt);
  323. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  324. if (port == exclude_port)
  325. continue;
  326. if (port->type & target_type)
  327. target_ports_heap[cnt++] = port;
  328. }
  329. target_ports = target_ports_heap;
  330. assert(cnt == target_ports_cnt);
  331. }
  332. unlock(&ipc_helper_lock);
  333. for (int i = 0; i < target_ports_cnt; i++)
  334. get_ipc_port(target_ports[i]);
  335. /* send msg to each collected port (note that ports cannot be freed in meantime) */
  336. for (int i = 0; i < target_ports_cnt; i++) {
  337. port = target_ports[i];
  338. debug("Broadcast to port %p (handle %p) for process %u (type %x, target %x)\n",
  339. port, port->pal_handle, port->vmid & 0xFFFF, port->type, target_type);
  340. msg->dst = port->vmid;
  341. ret = send_ipc_message(msg, port);
  342. if (ret < 0) {
  343. debug("Broadcast to port %p (handle %p) for process %u failed (errno = %d)!\n",
  344. port, port->pal_handle, port->vmid & 0xFFFF, ret);
  345. goto out;
  346. }
  347. }
  348. ret = 0;
  349. out:
  350. for (int i = 0; i < target_ports_cnt; i++)
  351. put_ipc_port(target_ports[i]);
  352. if (target_ports != target_ports_stack)
  353. free(target_ports);
  354. return ret;
  355. }
/* Handler for IPC_RESP messages: match the response (by sequence number) to
 * a pending request on this port and wake the thread waiting on it.
 * Returns 0 if a request was matched, otherwise passes through the in-body
 * retval (also for unsolicited responses with seq == 0). */
static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port) {
    struct shim_ipc_resp* resp = (struct shim_ipc_resp*) &msg->msg;
    debug("IPC callback from %u: IPC_RESP(%d)\n", msg->src & 0xFFFF, resp->retval);

    if (!msg->seq)
        return resp->retval;

    /* find a corresponding request msg for this response msg */
    struct shim_ipc_msg_duplex* req_msg = pop_ipc_msg_duplex(port, msg->seq);

    /* if some thread waits for response, wake it up with response retval */
    if (req_msg) {
        req_msg->retval = resp->retval;
        if (req_msg->thread)
            thread_wakeup(req_msg->thread);
        return 0;
    }

    return resp->retval;
}
/* Build and send an IPC_RESP message on `port` to process `dest`, echoing
 * sequence number `seq` and carrying `ret` as the in-body retval
 * (RESPONSE_CALLBACK is normalized to 0). The message is stack-allocated
 * via __alloca(). Returns the result of send_ipc_message(). */
int send_response_ipc_message(struct shim_ipc_port* port, IDTYPE dest, int ret, uint64_t seq) {
    ret = (ret == RESPONSE_CALLBACK) ? 0 : ret;

    /* create IPC_RESP msg to send to dest, with sequence number seq, and in-body retval ret */
    size_t total_msg_size = get_ipc_msg_size(sizeof(struct shim_ipc_resp));
    struct shim_ipc_msg* resp_msg = __alloca(total_msg_size);
    init_ipc_msg(resp_msg, IPC_RESP, total_msg_size, dest);
    resp_msg->seq = seq;

    struct shim_ipc_resp* resp = (struct shim_ipc_resp *) resp_msg->msg;
    resp->retval = ret;

    debug("IPC send to %u: IPC_RESP(%d)\n", resp_msg->dst & 0xFFFF, ret);
    return send_ipc_message(resp_msg, port);
}
  384. static int receive_ipc_message(struct shim_ipc_port* port) {
  385. int ret;
  386. int readahead = IPC_MSG_MINIMAL_SIZE * 2;
  387. int bufsize = IPC_MSG_MINIMAL_SIZE + readahead;
  388. struct shim_ipc_msg* msg = malloc(bufsize);
  389. int expected_size = IPC_MSG_MINIMAL_SIZE;
  390. int bytes = 0;
  391. do {
  392. while (bytes < expected_size) {
  393. /* grow msg buffer to accomodate bigger messages */
  394. if (expected_size + readahead > bufsize) {
  395. while (expected_size + readahead > bufsize)
  396. bufsize *= 2;
  397. void* tmp_buf = malloc(bufsize);
  398. memcpy(tmp_buf, msg, bytes);
  399. free(msg);
  400. msg = tmp_buf;
  401. }
  402. int read = DkStreamRead(port->pal_handle, /*offset*/ 0, expected_size - bytes + readahead,
  403. (void *) msg + bytes, NULL, 0);
  404. if (!read) {
  405. if (PAL_ERRNO == EINTR || PAL_ERRNO == EAGAIN || PAL_ERRNO == EWOULDBLOCK)
  406. continue;
  407. debug("Port %p (handle %p) closed while receiving IPC message\n", port, port->pal_handle);
  408. del_ipc_port_fini(port, -ECHILD);
  409. ret = -PAL_ERRNO;
  410. goto out;
  411. }
  412. bytes += read;
  413. /* extract actual msg size from msg header and continue reading msg body */
  414. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  415. expected_size = msg->size;
  416. }
  417. debug("Received IPC message from port %p (handle %p): code=%d size=%lu src=%u dst=%u seq=%lx\n",
  418. port, port->pal_handle, msg->code, msg->size, msg->src & 0xFFFF, msg->dst & 0xFFFF, msg->seq);
  419. /* skip messages coming from myself (in case of broadcast) */
  420. if (msg->src != cur_process.vmid) {
  421. if (msg->code < IPC_CODE_NUM && ipc_callbacks[msg->code]) {
  422. /* invoke callback to this msg */
  423. ret = (*ipc_callbacks[msg->code]) (msg, port);
  424. if ((ret < 0 || ret == RESPONSE_CALLBACK) && msg->seq) {
  425. /* send IPC_RESP message to sender of this msg */
  426. ret = send_response_ipc_message(port, msg->src, ret, msg->seq);
  427. if (ret < 0) {
  428. debug("Sending IPC_RESP msg on port %p (handle %p) to %u failed\n",
  429. port, port->pal_handle, msg->src & 0xFFFF);
  430. ret = -PAL_ERRNO;
  431. goto out;
  432. }
  433. }
  434. }
  435. }
  436. bytes -= expected_size; /* one message was received and handled */
  437. if (bytes > 0) {
  438. /* we may have started reading the next message, move this message
  439. * to beginning of msg buffer and reset expected size */
  440. memmove(msg, (void *) msg + expected_size, bytes);
  441. expected_size = IPC_MSG_MINIMAL_SIZE;
  442. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  443. expected_size = msg->size;
  444. }
  445. } while (bytes > 0);
  446. ret = 0;
  447. out:
  448. free(msg);
  449. return ret;
  450. }
  451. /* Main routine of the IPC helper thread. IPC helper thread is spawned when
  452. * the first IPC port is added and is terminated only when the whole Graphene
  453. * application terminates. IPC helper thread runs in an endless loop and waits
  454. * on port events (either the addition/removal of ports or actual port events:
  455. * acceptance of new client or receiving/sending messages). In particular,
  456. * IPC helper thread calls receive_ipc_message() if a message arrives on port.
  457. *
  458. * Other threads add and remove IPC ports via add_ipc_xxx() and del_ipc_xxx()
  459. * functions. These ports are added to port_list which the IPC helper thread
  460. * consults before each new DkObjectsWaitAny().
  461. *
  462. * Note that ports are copied from global port_list to local object_list. This
  463. * is because ports may be removed from port_list by other threads while IPC
464. helper thread is waiting on DkObjectsWaitAny(). For this reason the IPC thread
465. also gets references to all current ports and puts them after handling all
466. ports in object_list.
  467. *
  468. * Previous implementation went to great lengths to keep changes to the list of
  469. * current ports to a minimum (instead of repopulating the list before each wait
  470. * like in current code). Unfortunately, this resulted in undue complexity.
  471. * Current implementation should perform fine for usual case of <100 IPC ports
  472. * and with IPC helper thread always running in background on its own core.
  473. */
/* Body of the IPC helper thread (see the block comment above for the overall
 * design). Loops while ipc_helper_state == HELPER_ALIVE:
 *   1. handle the PAL handle that woke us (install_new_event or a port),
 *   2. put references taken on the previous iteration's ports,
 *   3. re-snapshot port_list into object_list/palhandle_list under the lock,
 *   4. block in DkObjectsWaitAny() on all port handles + install_new_event.
 * On exit (state flipped by terminate_ipc_helper()), drops all references,
 * frees the lists, clears the globals, and terminates via DkThreadExit(). */
noreturn static void shim_ipc_helper(void* dummy) {
    __UNUSED(dummy);
    struct shim_thread* self = get_cur_thread();

    PAL_HANDLE polled = NULL;

    /* Initialize two lists:
     * - object_list collects IPC port objects and is the main handled list
     * - palhandle_list collects corresponding PAL handles of IPC port objects
     *   and is needed for DkObjectsWaitAny(.., <array-of-PAL-handles>, ..)
     *   interface; palhandle_list always contains at least install_new_event
     *
     * We allocate these two lists on the heap so they do not overflow the
     * limited PAL stack. We grow them at runtime if needed.
     *
     * NOTE(review): the mallocs below (and in the grow path) are unchecked;
     * an OOM here would crash the helper thread.
     */
    size_t object_list_size = 0;
    size_t object_list_maxsize = 32;
    struct shim_ipc_port** object_list =
        malloc(sizeof(struct shim_ipc_port *) * object_list_maxsize);
    PAL_HANDLE* palhandle_list =
        malloc(sizeof(PAL_HANDLE) * (1 + object_list_maxsize));

    /* slot 0 is permanently reserved for the "ports changed" wake-up event */
    PAL_HANDLE install_new_event_hdl = event_handle(&install_new_event);
    palhandle_list[0] = install_new_event_hdl;

    while (ipc_helper_state == HELPER_ALIVE) {
        struct shim_ipc_port* polled_port = NULL;

        if (polled == install_new_event_hdl) {
            /* some thread wants to install new event; this event is found
             * in object_list below, so just re-init install_new_event */
            debug("New IPC event was requested (port was added/removed)\n");
            clear_event(&install_new_event);
        } else {
            /* it is not install_new_event handle, so must be one of ports */
            for (size_t i = 0; i < object_list_size; i++)
                if (polled == object_list[i]->pal_handle) {
                    polled_port = object_list[i];
                    break;
                }
        }

        if (polled_port) {
            if (polled_port->type & IPC_PORT_SERVER) {
                /* if polled port is server port, accept a client,
                 * create client port, and add it to port list */
                PAL_HANDLE client = DkStreamWaitForClient(polled_port->pal_handle);
                if (client) {
                    /* type of client port is the same as original server port
                     * but with LISTEN (for remote client) and without SERVER
                     * (this port doesn't wait for new clients) */
                    IDTYPE client_type = (polled_port->type & ~IPC_PORT_SERVER) | IPC_PORT_LISTEN;
                    add_ipc_port_by_id(polled_port->vmid, client, client_type, NULL, NULL);
                } else {
                    debug("Port %p (handle %p) was removed during accepting client\n",
                          polled_port, polled_port->pal_handle);
                    del_ipc_port_fini(polled_port, -ECHILD);
                }
            } else {
                PAL_STREAM_ATTR attr;
                if (DkStreamAttributesQueryByHandle(polled_port->pal_handle, &attr)) {
                    /* can read on this port, so receive messages */
                    if (attr.readable) {
                        /* NOTE: IPC helper thread does not handle failures currently */
                        receive_ipc_message(polled_port);
                    }
                    if (attr.disconnected) {
                        debug("Port %p (handle %p) disconnected\n",
                              polled_port, polled_port->pal_handle);
                        del_ipc_port_fini(polled_port, -ECONNRESET);
                    }
                } else {
                    debug("Port %p (handle %p) was removed during attr querying\n",
                          polled_port, polled_port->pal_handle);
                    del_ipc_port_fini(polled_port, -PAL_ERRNO);
                }
            }
        }

        /* done handling ports; put their references so they can be freed */
        for (size_t i = 0; i < object_list_size; i++) {
            struct shim_ipc_port* port = object_list[i];
            put_ipc_port(port);
        }

        lock(&ipc_helper_lock);

        /* iterate through all ports to repopulate object_list */
        object_list_size = 0;
        struct shim_ipc_port* port;
        struct shim_ipc_port* tmp;
        LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list) {
            /* get port reference so it is not freed while we wait on/handle it */
            __get_ipc_port(port);

            /* grow object_list and palhandle_list to accommodate more objects */
            if (object_list_size == object_list_maxsize) {
                struct shim_ipc_port** tmp_array = malloc(
                    sizeof(struct shim_ipc_port *) * (object_list_maxsize * 2));
                PAL_HANDLE* tmp_pal_array = malloc(
                    sizeof(PAL_HANDLE) * (1 + object_list_maxsize * 2));
                memcpy(tmp_array, object_list,
                       sizeof(struct shim_ipc_port *) * (object_list_maxsize));
                memcpy(tmp_pal_array, palhandle_list,
                       sizeof(PAL_HANDLE) * (1 + object_list_size));
                object_list_maxsize *= 2;
                free(object_list);
                free(palhandle_list);
                object_list = tmp_array;
                palhandle_list = tmp_pal_array;
            }

            /* re-add this port to object_list and palhandle_list;
             * palhandle_list is offset by 1 because of install_new_event */
            object_list[object_list_size] = port;
            palhandle_list[object_list_size + 1] = port->pal_handle;
            object_list_size++;

            debug("Listening to process %u on port %p (handle %p, type %04x)\n",
                  port->vmid & 0xFFFF, port, port->pal_handle, port->type);
        }

        unlock(&ipc_helper_lock);

        /* wait on collected ports' PAL handles + install_new_event_hdl */
        polled = DkObjectsWaitAny(object_list_size + 1, palhandle_list, NO_TIMEOUT);

        /* ensure that while loop breaks on ipc_helper_state change */
        COMPILER_BARRIER();
    }

    /* IPC thread exits; put acquired port references so they can be freed */
    for (size_t i = 0; i < object_list_size; i++) {
        struct shim_ipc_port* port = object_list[i];
        put_ipc_port(port);
    }

    free(object_list);
    free(palhandle_list);

    lock(&ipc_helper_lock);
    ipc_helper_state = HELPER_NOTALIVE;
    ipc_helper_thread = NULL;
    unlock(&ipc_helper_lock);

    put_thread(self);
    debug("IPC helper thread terminated\n");

    DkThreadExit();
}
/* Entry point of the freshly spawned IPC helper thread (runs on the small
 * PAL-provided stack): set up TLS/TCB, verify we are still the designated
 * helper (terminate_ipc_helper() may have raced us), allocate a proper
 * LibOS stack, and switch onto it into shim_ipc_helper(). `arg` is the
 * shim_thread created by create_ipc_helper(). */
static void shim_ipc_helper_prepare(void* arg) {
    struct shim_thread * self = (struct shim_thread *) arg;
    if (!arg)
        return;

    /* TCB lives on this (PAL) stack; valid because this thread never
     * returns from shim_ipc_helper() */
    __libc_tcb_t tcb;
    allocate_tls(&tcb, false, self);
    debug_setbuf(&tcb.shim_tcb, true);
    debug("Set tcb to %p\n", &tcb);

    lock(&ipc_helper_lock);
    bool notme = (self != ipc_helper_thread);
    unlock(&ipc_helper_lock);

    void* stack = allocate_stack(IPC_HELPER_STACK_SIZE, allocsize, false);

    /* bail out if we were already replaced/cancelled, or if stack
     * allocation failed.
     * NOTE(review): if `notme` is true but `stack` was allocated, the stack
     * appears to be leaked here — confirm. */
    if (notme || !stack) {
        put_thread(self);
        DkThreadExit();
        return;
    }
    debug("IPC helper thread started\n");

    /* swap stack to be sure we don't drain the small stack PAL provides */
    self->stack_top = stack + IPC_HELPER_STACK_SIZE;
    self->stack = stack;
    __SWITCH_STACK(self->stack_top, shim_ipc_helper, NULL);
}
/* Spawn the IPC helper thread if it is not already running.
 * This should be called with the ipc_helper_lock held.
 * Returns 0 on success (or if already alive), -ENOMEM if the internal
 * thread object could not be created, -PAL_ERRNO if PAL thread creation
 * failed (state is rolled back to HELPER_NOTALIVE in that case). */
static int create_ipc_helper(void) {
    if (ipc_helper_state == HELPER_ALIVE)
        return 0;

    struct shim_thread * new = get_new_internal_thread();
    if (!new)
        return -ENOMEM;

    /* publish state before starting the thread so the helper's main loop
     * condition is already HELPER_ALIVE when it begins */
    ipc_helper_thread = new;
    ipc_helper_state = HELPER_ALIVE;

    PAL_HANDLE handle = thread_create(shim_ipc_helper_prepare, new);

    if (!handle) {
        /* roll back: thread never started */
        ipc_helper_thread = NULL;
        ipc_helper_state = HELPER_NOTALIVE;
        put_thread(new);
        return -PAL_ERRNO;
    }

    new->pal_handle = handle;
    return 0;
}
/* Ask the IPC helper thread to exit.
 *
 * On success, the reference to ipc helper thread is returned with refcount
 * incremented. It is the responsibility of caller to wait for ipc helper's
 * exit and then release the final reference to free related resources (it is
 * problematic for the thread itself to release its own resources e.g. stack).
 * Returns NULL if the helper is not currently alive. */
struct shim_thread* terminate_ipc_helper(void) {
    if (ipc_helper_state != HELPER_ALIVE)
        return NULL;

    /* NOTE: Graphene doesn't have an abstraction of a queue of pending signals
     * between communicating processes (instead all communication is done over
     * streams). Thus, app code like this (found in e.g. Lmbench's bw_unix):
     *     kill(child, SIGKILL);
     *     exit(0);
     * results in a data race between the SIGKILL message sent over IPC stream
     * and the parent process exiting. In the worst case, the parent will exit
     * before the SIGKILL message goes through the host-OS stream, the host OS
     * will close the stream, and the message will never be seen by child. To
     * prevent such cases, we simply wait for a bit before exiting.
     * */
    debug("Waiting for 0.5s for all in-flight IPC messages to reach their destinations\n");
    DkThreadDelayExecution(500);

    lock(&ipc_helper_lock);
    struct shim_thread* ret = ipc_helper_thread;
    if (ret)
        get_thread(ret);
    /* flipping the state makes the helper's while-loop condition false */
    ipc_helper_state = HELPER_NOTALIVE;
    unlock(&ipc_helper_lock);

    /* force wake up of ipc helper thread so that it exits */
    set_event(&install_new_event, 1);
    return ret;
}