shim_ipc_helper.c 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877
  1. /* Copyright (C) 2014 Stony Brook University
  2. This file is part of Graphene Library OS.
  3. Graphene Library OS is free software: you can redistribute it and/or
  4. modify it under the terms of the GNU Lesser General Public License
  5. as published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. Graphene Library OS is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Lesser General Public License for more details.
  11. You should have received a copy of the GNU Lesser General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>. */
  13. /*
  14. * shim_ipc_helper.c
  15. *
  16. * This file contains code to create an IPC helper thread inside library OS and maintain bookkeeping
  17. * of IPC ports.
  18. */
  19. #include <list.h>
  20. #include <pal.h>
  21. #include <pal_error.h>
  22. #include <shim_checkpoint.h>
  23. #include <shim_handle.h>
  24. #include <shim_internal.h>
  25. #include <shim_ipc.h>
  26. #include <shim_profile.h>
  27. #include <shim_thread.h>
  28. #include <shim_utils.h>
  29. #define IPC_HELPER_STACK_SIZE (g_pal_alloc_align * 4)
  30. #define PORT_MGR_ALLOC 32
  31. #define OBJ_TYPE struct shim_ipc_port
  32. #include "memmgr.h"
  33. static MEM_MGR port_mgr;
  34. DEFINE_LISTP(shim_ipc_port);
  35. static LISTP_TYPE(shim_ipc_port) port_list;
  36. static enum { HELPER_NOTALIVE, HELPER_ALIVE } ipc_helper_state;
  37. static struct shim_thread* ipc_helper_thread;
  38. static struct shim_lock ipc_helper_lock;
  39. static AEVENTTYPE install_new_event;
  40. static int create_ipc_helper(void);
  41. static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port);
/* Dispatch table of IPC message handlers, indexed by IPC message code. Entries are invoked from
 * receive_ipc_message(); a missing/NULL entry means the message code is ignored. The IPC_NS_*
 * macros expand to the per-namespace groups of callbacks. */
static ipc_callback ipc_callbacks[IPC_CODE_NUM] = {
    /* RESP */ &ipc_resp_callback,
    /* CHECKPOINT */ &ipc_checkpoint_callback,

    /* parents and children */
    /* CLD_EXIT */ &ipc_cld_exit_callback,
#ifdef PROFILE
    /* CLD_PROFILE */ &ipc_cld_profile_callback,
#endif

    /* pid namespace */
    IPC_NS_CALLBACKS(pid)
    /* PID_KILL */ &ipc_pid_kill_callback,
    /* PID_GETSTATUS */ &ipc_pid_getstatus_callback,
    /* PID_RETSTATUS */ &ipc_pid_retstatus_callback,
    /* PID_GETMETA */ &ipc_pid_getmeta_callback,
    /* PID_RETMETA */ &ipc_pid_retmeta_callback,
    /* PID_NOP */ &ipc_pid_nop_callback,
    /* PID_SENDRPC */ &ipc_pid_sendrpc_callback,

    /* sysv namespace */
    IPC_NS_CALLBACKS(sysv)
    IPC_NS_KEY_CALLBACKS(sysv)
    /* SYSV_DELRES */ &ipc_sysv_delres_callback,
    /* SYSV_MOVRES */ &ipc_sysv_movres_callback,
    /* SYSV_MSGSND */ &ipc_sysv_msgsnd_callback,
    /* SYSV_MSGRCV */ &ipc_sysv_msgrcv_callback,
    /* SYSV_MSGMOV */ &ipc_sysv_msgmov_callback,
    /* SYSV_SEMOP */ &ipc_sysv_semop_callback,
    /* SYSV_SEMCTL */ &ipc_sysv_semctl_callback,
    /* SYSV_SEMRET */ &ipc_sysv_semret_callback,
    /* SYSV_SEMMOV */ &ipc_sysv_semmov_callback,
};
  72. static int init_self_ipc_port(void) {
  73. lock(&cur_process.lock);
  74. if (!cur_process.self) {
  75. /* very first process or clone/fork case: create IPC port from scratch */
  76. cur_process.self = create_ipc_info_cur_process(/*is_self_ipc_info=*/true);
  77. if (!cur_process.self) {
  78. unlock(&cur_process.lock);
  79. return -EACCES;
  80. }
  81. } else {
  82. /* execve case: inherited IPC port from parent process */
  83. assert(cur_process.self->pal_handle && !qstrempty(&cur_process.self->uri));
  84. add_ipc_port_by_id(cur_process.self->vmid, cur_process.self->pal_handle, IPC_PORT_SERVER,
  85. /*fini=*/NULL, &cur_process.self->port);
  86. }
  87. unlock(&cur_process.lock);
  88. return 0;
  89. }
  90. static int init_parent_ipc_port(void) {
  91. if (!PAL_CB(parent_process) || !cur_process.parent) {
  92. /* no parent process, no sense in creating parent IPC port */
  93. return 0;
  94. }
  95. lock(&cur_process.lock);
  96. assert(cur_process.parent && cur_process.parent->vmid);
  97. /* for execve case, my parent is the parent of my parent (current process transparently inherits
  98. * the "real" parent through already opened pal_handle on "temporary" parent's
  99. * cur_process.parent) */
  100. if (!cur_process.parent->pal_handle) {
  101. /* for clone/fork case, parent is connected on parent_process */
  102. cur_process.parent->pal_handle = PAL_CB(parent_process);
  103. }
  104. add_ipc_port_by_id(cur_process.parent->vmid, cur_process.parent->pal_handle,
  105. IPC_PORT_DIRPRT | IPC_PORT_LISTEN,
  106. /*fini=*/NULL, &cur_process.parent->port);
  107. unlock(&cur_process.lock);
  108. return 0;
  109. }
  110. static int init_ns_ipc_port(int ns_idx) {
  111. if (!cur_process.ns[ns_idx]) {
  112. /* no NS info from parent process, no sense in creating NS IPC port */
  113. return 0;
  114. }
  115. if (!cur_process.ns[ns_idx]->pal_handle && qstrempty(&cur_process.ns[ns_idx]->uri)) {
  116. /* there is no connection to NS leader via PAL handle and there is no URI to find NS leader:
  117. * do not create NS IPC port now, it will be created on-demand during NS leader lookup */
  118. return 0;
  119. }
  120. lock(&cur_process.lock);
  121. if (!cur_process.ns[ns_idx]->pal_handle) {
  122. debug("Reconnecting IPC port %s\n", qstrgetstr(&cur_process.ns[ns_idx]->uri));
  123. cur_process.ns[ns_idx]->pal_handle =
  124. DkStreamOpen(qstrgetstr(&cur_process.ns[ns_idx]->uri), 0, 0, 0, 0);
  125. if (!cur_process.ns[ns_idx]->pal_handle) {
  126. unlock(&cur_process.lock);
  127. return -PAL_ERRNO;
  128. }
  129. }
  130. IDTYPE type = (ns_idx == PID_NS) ? IPC_PORT_PIDLDR : IPC_PORT_SYSVLDR;
  131. add_ipc_port_by_id(cur_process.ns[ns_idx]->vmid, cur_process.ns[ns_idx]->pal_handle,
  132. type | IPC_PORT_LISTEN,
  133. /*fini=*/NULL, &cur_process.ns[ns_idx]->port);
  134. unlock(&cur_process.lock);
  135. return 0;
  136. }
  137. int init_ipc_ports(void) {
  138. if (!(port_mgr = create_mem_mgr(init_align_up(PORT_MGR_ALLOC))))
  139. return -ENOMEM;
  140. int ret;
  141. if ((ret = init_self_ipc_port()) < 0)
  142. return ret;
  143. if ((ret = init_parent_ipc_port()) < 0)
  144. return ret;
  145. if ((ret = init_ns_ipc_port(PID_NS)) < 0)
  146. return ret;
  147. if ((ret = init_ns_ipc_port(SYSV_NS)) < 0)
  148. return ret;
  149. return 0;
  150. }
/* One-time initialization of IPC helper machinery: state, lock, wakeup event, and the helper
 * thread itself. Always returns 0. */
int init_ipc_helper(void) {
    /* early enough in init, can write global vars without the lock */
    ipc_helper_state = HELPER_NOTALIVE;
    create_lock(&ipc_helper_lock);
    create_event(&install_new_event);

    /* some IPC ports were already added before this point, so spawn IPC helper thread (and enable
     * locking mechanisms if not done already since we are going in multi-threaded mode) */
    enable_locking();
    lock(&ipc_helper_lock);
    create_ipc_helper();
    unlock(&ipc_helper_lock);
    return 0;
}
  164. static struct shim_ipc_port* __create_ipc_port(PAL_HANDLE hdl) {
  165. struct shim_ipc_port* port =
  166. get_mem_obj_from_mgr_enlarge(port_mgr, size_align_up(PORT_MGR_ALLOC));
  167. if (!port)
  168. return NULL;
  169. memset(port, 0, sizeof(struct shim_ipc_port));
  170. port->pal_handle = hdl;
  171. INIT_LIST_HEAD(port, list);
  172. INIT_LISTP(&port->msgs);
  173. REF_SET(port->ref_count, 0);
  174. create_lock(&port->msgs_lock);
  175. return port;
  176. }
/* Release all resources of port: close its PAL handle (if still open), destroy its messages
 * lock, and return the object to the port memory manager. Called when the last reference to
 * the port is dropped (see __put_ipc_port()/put_ipc_port()). */
static void __free_ipc_port(struct shim_ipc_port* port) {
    if (port->pal_handle) {
        DkObjectClose(port->pal_handle);
        port->pal_handle = NULL;
    }
    destroy_lock(&port->msgs_lock);
    free_mem_obj_to_mgr(port_mgr, port);
}
/* Increment port's reference count (REF_INC is atomic, see get_ipc_port()). */
static void __get_ipc_port(struct shim_ipc_port* port) {
    REF_INC(port->ref_count);
}
  188. static void __put_ipc_port(struct shim_ipc_port* port) {
  189. int ref_count = REF_DEC(port->ref_count);
  190. if (!ref_count)
  191. __free_ipc_port(port);
  192. }
/* Public wrapper to take a reference on port. */
void get_ipc_port(struct shim_ipc_port* port) {
    /* no need to grab ipc_helper_lock because __get_ipc_port() is atomic */
    __get_ipc_port(port);
}
  197. void put_ipc_port(struct shim_ipc_port* port) {
  198. /* this is atomic so we don't grab lock in common case of ref_count > 0 */
  199. int ref_count = REF_DEC(port->ref_count);
  200. if (!ref_count) {
  201. lock(&ipc_helper_lock);
  202. __free_ipc_port(port);
  203. unlock(&ipc_helper_lock);
  204. }
  205. }
  206. static void __add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid, IDTYPE type, port_fini fini) {
  207. port->type |= type;
  208. if (vmid && !port->vmid)
  209. port->vmid = vmid;
  210. /* find empty slot in fini callbacks and register callback */
  211. if (fini) {
  212. bool found_empty_slot = false;
  213. __UNUSED(found_empty_slot);
  214. for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
  215. if (!port->fini[i] || port->fini[i] == fini) {
  216. port->fini[i] = fini;
  217. found_empty_slot = true;
  218. break;
  219. }
  220. assert(found_empty_slot);
  221. }
  222. /* add to port list if not there already */
  223. if (LIST_EMPTY(port, list)) {
  224. __get_ipc_port(port);
  225. LISTP_ADD(port, &port_list, list);
  226. }
  227. /* wake up IPC helper thread so that it picks up added port */
  228. if (ipc_helper_state == HELPER_ALIVE)
  229. set_event(&install_new_event, 1);
  230. }
/* Unregister port under ipc_helper_lock: close the underlying stream, remove the port from
 * port_list, fail all messages still pending on it (waking their blocked threads), and drop the
 * reference owned by port_list. */
static void __del_ipc_port(struct shim_ipc_port* port) {
    debug("Deleting port %p (handle %p) of process %u\n", port, port->pal_handle,
          port->vmid & 0xFFFF);

    DkStreamDelete(port->pal_handle, 0);
    LISTP_DEL_INIT(port, &port_list, list);

    /* Check for pending messages on port (threads might be blocking for responses) */
    lock(&port->msgs_lock);
    struct shim_ipc_msg_duplex* msg;
    struct shim_ipc_msg_duplex* tmp;
    LISTP_FOR_EACH_ENTRY_SAFE(msg, tmp, &port->msgs, list) {
        LISTP_DEL_INIT(msg, &port->msgs, list);
        /* port is going away, so every pending request fails with connection-reset */
        msg->retval = -ECONNRESET;
        if (msg->thread) {
            debug("Deleted pending message on port %p, wake up blocking thread %d\n", port,
                  msg->thread->tid);
            thread_wakeup(msg->thread);
        }
    }
    unlock(&port->msgs_lock);

    /* release the reference that port_list held */
    __put_ipc_port(port);

    /* wake up IPC helper thread so that it forgets about deleted port */
    if (ipc_helper_state == HELPER_ALIVE)
        set_event(&install_new_event, 1);
}
/* Thread-safe wrapper over __add_ipc_port(): registers port with the given vmid/type/fini under
 * ipc_helper_lock. */
void add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid, IDTYPE type, port_fini fini) {
    debug("Adding port %p (handle %p) for process %u (type=%04x)\n", port, port->pal_handle,
          port->vmid & 0xFFFF, type);

    lock(&ipc_helper_lock);
    __add_ipc_port(port, vmid, type, fini);
    unlock(&ipc_helper_lock);
}
  262. void add_ipc_port_by_id(IDTYPE vmid, PAL_HANDLE hdl, IDTYPE type, port_fini fini,
  263. struct shim_ipc_port** portptr) {
  264. debug("Adding port (handle %p) for process %u (type %04x)\n", hdl, vmid & 0xFFFF, type);
  265. struct shim_ipc_port* port = NULL;
  266. if (portptr)
  267. *portptr = NULL;
  268. assert(hdl);
  269. lock(&ipc_helper_lock);
  270. /* check if port with this PAL handle already exists, then we only need to update its vmid,
  271. * type, and fini callback */
  272. struct shim_ipc_port* tmp;
  273. LISTP_FOR_EACH_ENTRY(tmp, &port_list, list) {
  274. if (tmp->pal_handle == hdl) {
  275. port = tmp;
  276. break;
  277. }
  278. }
  279. if (!port) {
  280. /* port does not yet exist, create it */
  281. port = __create_ipc_port(hdl);
  282. if (!port) {
  283. debug("Failed to create IPC port for handle %p\n", hdl);
  284. goto out;
  285. }
  286. }
  287. /* add/update port */
  288. __add_ipc_port(port, vmid, type, fini);
  289. if (portptr) {
  290. __get_ipc_port(port);
  291. *portptr = port;
  292. }
  293. out:
  294. unlock(&ipc_helper_lock);
  295. }
/* Delete port and then invoke all fini callbacks registered on it, passing exitcode. The
 * callbacks run outside ipc_helper_lock; a temporary reference keeps the port alive until they
 * are done. No-op if the port is not on port_list (already deleted). */
void del_ipc_port_fini(struct shim_ipc_port* port, unsigned int exitcode) {
    lock(&ipc_helper_lock);

    if (LIST_EMPTY(port, list)) {
        /* port is not on the global list, nothing to do */
        unlock(&ipc_helper_lock);
        return;
    }

    /* prevent __del_ipc_port() from freeing port since we need it for fini callbacks */
    __get_ipc_port(port);
    __del_ipc_port(port);
    unlock(&ipc_helper_lock);

    for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
        if (port->fini[i]) {
            (port->fini[i])(port, port->vmid, exitcode);
            port->fini[i] = NULL;
        }

    /* drop the temporary reference taken above (may free the port) */
    __put_ipc_port(port);
}
  313. void del_all_ipc_ports(void) {
  314. lock(&ipc_helper_lock);
  315. struct shim_ipc_port* port;
  316. struct shim_ipc_port* tmp;
  317. LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list) {
  318. __del_ipc_port(port);
  319. }
  320. unlock(&ipc_helper_lock);
  321. }
  322. struct shim_ipc_port* lookup_ipc_port(IDTYPE vmid, IDTYPE type) {
  323. struct shim_ipc_port* port = NULL;
  324. assert(vmid && type);
  325. lock(&ipc_helper_lock);
  326. struct shim_ipc_port* tmp;
  327. LISTP_FOR_EACH_ENTRY(tmp, &port_list, list) {
  328. if (tmp->vmid == vmid && (tmp->type & type)) {
  329. debug("Found port %p (handle %p) for process %u (type %04x)\n", tmp, tmp->pal_handle,
  330. tmp->vmid & 0xFFFF, tmp->type);
  331. port = tmp;
  332. __get_ipc_port(port);
  333. break;
  334. }
  335. }
  336. unlock(&ipc_helper_lock);
  337. return port;
  338. }
  339. #define PORTS_ON_STACK_CNT 32
  340. int broadcast_ipc(struct shim_ipc_msg* msg, int target_type, struct shim_ipc_port* exclude_port) {
  341. int ret;
  342. struct shim_ipc_port* port;
  343. struct shim_ipc_port** target_ports;
  344. size_t target_ports_cnt = 0;
  345. assert(target_type);
  346. lock(&ipc_helper_lock);
  347. /* Collect all ports with appropriate types. In common case, stack-allocated array of
  348. * PORTS_ON_STACK_CNT ports is enough. If there are more ports, we will allocate a bigger array
  349. * on the heap and collect all ports again. */
  350. struct shim_ipc_port* target_ports_stack[PORTS_ON_STACK_CNT];
  351. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  352. if (port == exclude_port)
  353. continue;
  354. if (port->type & target_type) {
  355. if (target_ports_cnt < PORTS_ON_STACK_CNT)
  356. target_ports_stack[target_ports_cnt] = port;
  357. target_ports_cnt++;
  358. }
  359. }
  360. target_ports = target_ports_stack;
  361. if (target_ports_cnt > PORTS_ON_STACK_CNT) {
  362. /* Rare case when there are more than PORTS_ON_STACK_CNT ports. Allocate big-enough array on
  363. * the heap and collect all ports again. */
  364. size_t cnt = 0;
  365. struct shim_ipc_port** target_ports_heap =
  366. malloc(sizeof(struct shim_ipc_port*) * target_ports_cnt);
  367. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  368. if (port == exclude_port)
  369. continue;
  370. if (port->type & target_type)
  371. target_ports_heap[cnt++] = port;
  372. }
  373. target_ports = target_ports_heap;
  374. assert(cnt == target_ports_cnt);
  375. }
  376. for (size_t i = 0; i < target_ports_cnt; i++)
  377. __get_ipc_port(target_ports[i]);
  378. unlock(&ipc_helper_lock);
  379. /* send msg to each collected port (note that ports cannot be freed in meantime) */
  380. for (size_t i = 0; i < target_ports_cnt; i++) {
  381. port = target_ports[i];
  382. debug("Broadcast to port %p (handle %p) for process %u (type %x, target %x)\n",
  383. port, port->pal_handle, port->vmid & 0xFFFF, port->type, target_type);
  384. msg->dst = port->vmid;
  385. ret = send_ipc_message(msg, port);
  386. if (ret < 0) {
  387. debug("Broadcast to port %p (handle %p) for process %u failed (errno = %d)!\n",
  388. port, port->pal_handle, port->vmid & 0xFFFF, ret);
  389. goto out;
  390. }
  391. }
  392. ret = 0;
  393. out:
  394. for (size_t i = 0; i < target_ports_cnt; i++)
  395. put_ipc_port(target_ports[i]);
  396. if (target_ports != target_ports_stack)
  397. free(target_ports);
  398. return ret;
  399. }
  400. static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port) {
  401. struct shim_ipc_resp* resp = (struct shim_ipc_resp*)&msg->msg;
  402. debug("IPC callback from %u: IPC_RESP(%d)\n", msg->src & 0xFFFF, resp->retval);
  403. if (!msg->seq)
  404. return resp->retval;
  405. /* find a corresponding request msg for this response msg */
  406. struct shim_ipc_msg_duplex* req_msg = pop_ipc_msg_duplex(port, msg->seq);
  407. /* if some thread is waiting for response, wake it with response retval */
  408. if (req_msg) {
  409. req_msg->retval = resp->retval;
  410. if (req_msg->thread)
  411. thread_wakeup(req_msg->thread);
  412. return 0;
  413. }
  414. return resp->retval;
  415. }
  416. int send_response_ipc_message(struct shim_ipc_port* port, IDTYPE dest, int ret, unsigned long seq) {
  417. ret = (ret == RESPONSE_CALLBACK) ? 0 : ret;
  418. /* create IPC_RESP msg to send to dest, with sequence number seq, and in-body retval ret */
  419. size_t total_msg_size = get_ipc_msg_size(sizeof(struct shim_ipc_resp));
  420. struct shim_ipc_msg* resp_msg = __alloca(total_msg_size);
  421. init_ipc_msg(resp_msg, IPC_RESP, total_msg_size, dest);
  422. resp_msg->seq = seq;
  423. struct shim_ipc_resp* resp = (struct shim_ipc_resp*)resp_msg->msg;
  424. resp->retval = ret;
  425. debug("IPC send to %u: IPC_RESP(%d)\n", resp_msg->dst & 0xFFFF, ret);
  426. return send_ipc_message(resp_msg, port);
  427. }
  428. static int receive_ipc_message(struct shim_ipc_port* port) {
  429. int ret;
  430. size_t readahead = IPC_MSG_MINIMAL_SIZE * 2;
  431. size_t bufsize = IPC_MSG_MINIMAL_SIZE + readahead;
  432. struct shim_ipc_msg* msg = malloc(bufsize);
  433. size_t expected_size = IPC_MSG_MINIMAL_SIZE;
  434. size_t bytes = 0;
  435. do {
  436. while (bytes < expected_size) {
  437. /* grow msg buffer to accomodate bigger messages */
  438. if (expected_size + readahead > bufsize) {
  439. while (expected_size + readahead > bufsize)
  440. bufsize *= 2;
  441. void* tmp_buf = malloc(bufsize);
  442. memcpy(tmp_buf, msg, bytes);
  443. free(msg);
  444. msg = tmp_buf;
  445. }
  446. PAL_NUM read =
  447. DkStreamRead(port->pal_handle, /*offset=*/0, expected_size - bytes + readahead,
  448. (void*)msg + bytes, NULL, 0);
  449. if (read == PAL_STREAM_ERROR) {
  450. if (PAL_ERRNO == EINTR || PAL_ERRNO == EAGAIN || PAL_ERRNO == EWOULDBLOCK)
  451. continue;
  452. debug("Port %p (handle %p) closed while receiving IPC message\n", port,
  453. port->pal_handle);
  454. del_ipc_port_fini(port, -ECHILD);
  455. ret = -PAL_ERRNO;
  456. goto out;
  457. }
  458. bytes += read;
  459. /* extract actual msg size from msg header and continue reading msg body */
  460. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  461. expected_size = msg->size;
  462. }
  463. debug(
  464. "Received IPC message from port %p (handle %p): code=%d size=%lu "
  465. "src=%u dst=%u seq=%lx\n",
  466. port, port->pal_handle, msg->code, msg->size, msg->src & 0xFFFF, msg->dst & 0xFFFF,
  467. msg->seq);
  468. /* skip messages coming from myself (in case of broadcast) */
  469. if (msg->src != cur_process.vmid) {
  470. if (msg->code < IPC_CODE_NUM && ipc_callbacks[msg->code]) {
  471. /* invoke callback to this msg */
  472. ret = (*ipc_callbacks[msg->code])(msg, port);
  473. if ((ret < 0 || ret == RESPONSE_CALLBACK) && msg->seq) {
  474. /* send IPC_RESP message to sender of this msg */
  475. ret = send_response_ipc_message(port, msg->src, ret, msg->seq);
  476. if (ret < 0) {
  477. debug("Sending IPC_RESP msg on port %p (handle %p) to %u failed\n", port,
  478. port->pal_handle, msg->src & 0xFFFF);
  479. ret = -PAL_ERRNO;
  480. goto out;
  481. }
  482. }
  483. }
  484. }
  485. bytes -= expected_size; /* one message was received and handled */
  486. if (bytes > 0) {
  487. /* we may have started reading the next message, move this message to beginning of msg
  488. * buffer and reset expected size */
  489. memmove(msg, (void*)msg + expected_size, bytes);
  490. expected_size = IPC_MSG_MINIMAL_SIZE;
  491. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  492. expected_size = msg->size;
  493. }
  494. } while (bytes > 0);
  495. ret = 0;
  496. out:
  497. free(msg);
  498. return ret;
  499. }
  500. /* Main routine of the IPC helper thread. IPC helper thread is spawned when the first IPC port is
  501. * added and is terminated only when the whole Graphene application terminates. IPC helper thread
  502. * runs in an endless loop and waits on port events (either the addition/removal of ports or actual
  503. * port events: acceptance of new client or receiving/sending messages). In particular, IPC helper
  504. * thread calls receive_ipc_message() if a message arrives on port.
  505. *
  506. * Other threads add and remove IPC ports via add_ipc_xxx() and del_ipc_xxx() functions. These ports
  507. * are added to port_list which the IPC helper thread consults before each new DkObjectsWaitAny().
  508. *
  509. * Note that ports are copied from global port_list to local object_list. This is because ports may
  510. * be removed from port_list by other threads while IPC helper thread is waiting on
  511. * DkObjectsWaitAny(). For this reason IPC thread also get references to all current ports and puts
  512. * them after handling all ports in object_list.
  513. *
  514. * Previous implementation went to great lengths to keep changes to the list of current ports to a
  515. * minimum (instead of repopulating the list before each wait like in current code). Unfortunately,
  516. * this resulted in undue complexity. Current implementation should perform fine for usual case of
  517. * <100 IPC ports and with IPC helper thread always running in background on its own core.
  518. */
noreturn static void shim_ipc_helper(void* dummy) {
    __UNUSED(dummy);
    struct shim_thread* self = get_cur_thread();

    /* handle that DkObjectsWaitAny() reported as signalled on the previous loop iteration */
    PAL_HANDLE polled = NULL;

    /* Initialize two lists:
     * - object_list collects IPC port objects and is the main handled list
     * - palhandle_list collects corresponding PAL handles of IPC port objects and is needed for
     *   DkObjectsWaitAny(.., <array-of-PAL-handles>, ..) interface; palhandle_list always contains
     *   at least install_new_event
     *
     * We allocate these two lists on the heap so they do not overflow the limited PAL stack. We
     * grow them at runtime if needed. */
    size_t object_list_size = 0;
    size_t object_list_maxsize = 32;
    struct shim_ipc_port** object_list =
        malloc(sizeof(struct shim_ipc_port*) * object_list_maxsize);
    PAL_HANDLE* palhandle_list = malloc(sizeof(PAL_HANDLE) * (1 + object_list_maxsize));
    /* NOTE(review): the malloc() results above and in the grow path below are not checked for
     * NULL; on allocation failure this thread would crash — consider handling OOM here. */

    PAL_HANDLE install_new_event_hdl = event_handle(&install_new_event);
    /* slot 0 is permanently the install_new_event handle; ports start at slot 1 */
    palhandle_list[0] = install_new_event_hdl;

    while (true) {
        lock(&ipc_helper_lock);
        if (ipc_helper_state != HELPER_ALIVE) {
            /* helper was asked to terminate (see terminate_ipc_helper()) */
            ipc_helper_thread = NULL;
            unlock(&ipc_helper_lock);
            break;
        }
        unlock(&ipc_helper_lock);

        struct shim_ipc_port* polled_port = NULL;

        if (polled == install_new_event_hdl) {
            /* some thread wants to install new event; this event is found in object_list below, so
             * just re-init install_new_event */
            debug("New IPC event was requested (port was added/removed)\n");
            clear_event(&install_new_event);
        } else {
            /* it is not install_new_event handle, so must be one of ports */
            for (size_t i = 0; i < object_list_size; i++)
                if (polled == object_list[i]->pal_handle) {
                    polled_port = object_list[i];
                    break;
                }
        }

        if (polled_port) {
            if (polled_port->type & IPC_PORT_SERVER) {
                /* if polled port is server port, accept a client, create client port, and add it to
                 * port list */
                PAL_HANDLE client = DkStreamWaitForClient(polled_port->pal_handle);
                if (client) {
                    /* type of client port is the same as original server port but with LISTEN (for
                     * remote client) and without SERVER (this port doesn't wait for new clients) */
                    IDTYPE client_type = (polled_port->type & ~IPC_PORT_SERVER) | IPC_PORT_LISTEN;
                    add_ipc_port_by_id(polled_port->vmid, client, client_type, NULL, NULL);
                } else {
                    debug("Port %p (handle %p) was removed during accepting client\n", polled_port,
                          polled_port->pal_handle);
                    del_ipc_port_fini(polled_port, -ECHILD);
                }
            } else {
                PAL_STREAM_ATTR attr;
                if (DkStreamAttributesQueryByHandle(polled_port->pal_handle, &attr)) {
                    /* can read on this port, so receive messages */
                    if (attr.readable) {
                        /* NOTE: IPC helper thread does not handle failures currently */
                        receive_ipc_message(polled_port);
                    }
                    if (attr.disconnected) {
                        debug("Port %p (handle %p) disconnected\n", polled_port,
                              polled_port->pal_handle);
                        del_ipc_port_fini(polled_port, -ECONNRESET);
                    }
                } else {
                    debug("Port %p (handle %p) was removed during attr querying\n", polled_port,
                          polled_port->pal_handle);
                    del_ipc_port_fini(polled_port, -PAL_ERRNO);
                }
            }
        }

        /* done handling ports; put their references so they can be freed */
        for (size_t i = 0; i < object_list_size; i++)
            put_ipc_port(object_list[i]);

        lock(&ipc_helper_lock);

        /* iterate through all ports to repopulate object_list */
        object_list_size = 0;
        struct shim_ipc_port* port;
        struct shim_ipc_port* tmp;
        LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list) {
            /* get port reference so it is not freed while we wait on/handle it */
            __get_ipc_port(port);

            if (object_list_size == object_list_maxsize) {
                /* grow object_list and palhandle_list to accomodate more objects */
                struct shim_ipc_port** tmp_array =
                    malloc(sizeof(struct shim_ipc_port*) * (object_list_maxsize * 2));
                PAL_HANDLE* tmp_pal_array =
                    malloc(sizeof(PAL_HANDLE) * (1 + object_list_maxsize * 2));
                memcpy(tmp_array, object_list, sizeof(struct shim_ipc_port*) * (object_list_size));
                memcpy(tmp_pal_array, palhandle_list, sizeof(PAL_HANDLE) * (1 + object_list_size));
                object_list_maxsize *= 2;
                free(object_list);
                free(palhandle_list);
                object_list = tmp_array;
                palhandle_list = tmp_pal_array;
            }

            /* re-add this port to object_list and palhandle_list */
            object_list[object_list_size] = port;
            palhandle_list[object_list_size + 1] = port->pal_handle;
            object_list_size++;

            debug("Listening to process %u on port %p (handle %p, type %04x)\n",
                  port->vmid & 0xFFFF, port, port->pal_handle, port->type);
        }

        unlock(&ipc_helper_lock);

        /* wait on collected ports' PAL handles + install_new_event_hdl */
        polled = DkObjectsWaitAny(object_list_size + 1, palhandle_list, NO_TIMEOUT);
    }

    /* IPC thread exits; put acquired port references so they can be freed */
    for (size_t i = 0; i < object_list_size; i++)
        put_ipc_port(object_list[i]);

    free(object_list);
    free(palhandle_list);

    __disable_preempt(self->shim_tcb);
    put_thread(self);
    debug("IPC helper thread terminated\n");

    DkThreadExit(/*clear_child_tid=*/NULL);
}
/* Bootstrap routine of the IPC helper thread, executed on the small PAL-provided stack: set up
 * this thread's TCB, allocate a proper stack, and jump to shim_ipc_helper() on it. Exits
 * immediately if this thread lost the race to be the designated helper or if stack allocation
 * fails. */
static void shim_ipc_helper_prepare(void* arg) {
    struct shim_thread* self = (struct shim_thread*)arg;
    if (!arg)
        return;

    shim_tcb_init();
    set_cur_thread(self);
    update_fs_base(0);
    debug_setbuf(shim_get_tcb(), true);

    /* check that we are still the designated IPC helper (see create_ipc_helper()) */
    lock(&ipc_helper_lock);
    bool notme = (self != ipc_helper_thread);
    unlock(&ipc_helper_lock);

    void* stack = allocate_stack(IPC_HELPER_STACK_SIZE, g_pal_alloc_align, false);
    if (notme || !stack) {
        /* bail out; note that free(NULL) is a no-op on the notme-with-stack==NULL path */
        free(stack);
        put_thread(self);
        DkThreadExit(/*clear_child_tid=*/NULL);
        return;
    }
    debug("IPC helper thread started\n");

    /* swap stack to be sure we don't drain the small stack PAL provides */
    self->stack_top = stack + IPC_HELPER_STACK_SIZE;
    self->stack = stack;
    __SWITCH_STACK(self->stack_top, shim_ipc_helper, NULL);
}
/* this should be called with the ipc_helper_lock held */
static int create_ipc_helper(void) {
    /* Spawn the IPC helper thread if it is not already running. Returns 0 on success (or when
     * the helper is already alive), negative errno on failure. */
    if (ipc_helper_state == HELPER_ALIVE)
        return 0;

    struct shim_thread* new = get_new_internal_thread();
    if (!new)
        return -ENOMEM;

    /* publish the helper state before spawning, so that shim_ipc_helper_prepare() (running in
     * the new thread) sees a consistent ipc_helper_thread */
    ipc_helper_thread = new;
    ipc_helper_state = HELPER_ALIVE;

    PAL_HANDLE handle = thread_create(shim_ipc_helper_prepare, new);

    if (!handle) {
        int ret = -PAL_ERRNO; /* put_thread() may overwrite errno */
        /* roll back the state published above */
        ipc_helper_thread = NULL;
        ipc_helper_state = HELPER_NOTALIVE;
        put_thread(new);
        return ret;
    }

    new->pal_handle = handle;
    return 0;
}
  686. /* On success, the reference to ipc helper thread is returned with refcount incremented. It is the
  687. * responsibility of caller to wait for ipc helper's exit and then release the final reference to
  688. * free related resources (it is problematic for the thread itself to release its own resources e.g.
  689. * stack).
  690. */
  691. struct shim_thread* terminate_ipc_helper(void) {
  692. /* First check if thread is alive. */
  693. lock(&ipc_helper_lock);
  694. if (ipc_helper_state != HELPER_ALIVE) {
  695. unlock(&ipc_helper_lock);
  696. return NULL;
  697. }
  698. unlock(&ipc_helper_lock);
  699. /* NOTE: Graphene doesn't have an abstraction of a queue of pending signals between
  700. * communicating processes (instead all communication is done over streams). Thus, app code like
  701. * this (found in e.g. Lmbench's bw_unix):
  702. * kill(child, SIGKILL);
  703. * exit(0);
  704. * results in a data race between the SIGKILL message sent over IPC stream and the parent
  705. * process exiting. In the worst case, the parent will exit before the SIGKILL message goes
  706. * through the host-OS stream, the host OS will close the stream, and the message will never be
  707. * seen by child. To prevent such cases, we simply wait for a bit before exiting.
  708. */
  709. debug(
  710. "Waiting for 0.5s for all in-flight IPC messages to reach their destinations\n");
  711. DkThreadDelayExecution(500000); /* in microseconds */
  712. lock(&ipc_helper_lock);
  713. if (ipc_helper_state != HELPER_ALIVE) {
  714. unlock(&ipc_helper_lock);
  715. return NULL;
  716. }
  717. struct shim_thread* ret = ipc_helper_thread;
  718. if (ret)
  719. get_thread(ret);
  720. ipc_helper_state = HELPER_NOTALIVE;
  721. unlock(&ipc_helper_lock);
  722. /* force wake up of ipc helper thread so that it exits */
  723. set_event(&install_new_event, 1);
  724. return ret;
  725. }