shim_ipc_helper.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. /* Copyright (C) 2014 Stony Brook University
  2. This file is part of Graphene Library OS.
  3. Graphene Library OS is free software: you can redistribute it and/or
  4. modify it under the terms of the GNU Lesser General Public License
  5. as published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. Graphene Library OS is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Lesser General Public License for more details.
  11. You should have received a copy of the GNU Lesser General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>. */
  13. /*
  14. * shim_ipc_helper.c
  15. *
  16. * This file contains code to create an IPC helper thread inside library OS
  17. * and maintain bookkeeping of IPC ports.
  18. */
  19. #include <shim_internal.h>
  20. #include <shim_utils.h>
  21. #include <shim_thread.h>
  22. #include <shim_handle.h>
  23. #include <shim_ipc.h>
  24. #include <shim_checkpoint.h>
  25. #include <shim_profile.h>
  26. #include <pal.h>
  27. #include <pal_error.h>
  28. #include <list.h>
  29. #define IPC_HELPER_STACK_SIZE (allocsize * 4)
  30. #define PORT_MGR_ALLOC 32
  31. #define PAGE_SIZE allocsize
  32. #define OBJ_TYPE struct shim_ipc_port
  33. #include "memmgr.h"
/* Memory manager for shim_ipc_port objects (see OBJ_TYPE/memmgr.h above). */
static MEM_MGR port_mgr;

DEFINE_LISTP(shim_ipc_port);
/* Global list of all live IPC ports; protected by ipc_helper_lock. */
static LISTP_TYPE(shim_ipc_port) port_list;

/* Liveness state of the single IPC helper thread. */
static enum { HELPER_NOTALIVE, HELPER_ALIVE } ipc_helper_state;
/* The helper thread itself (NULL when not alive); protected by ipc_helper_lock. */
static struct shim_thread* ipc_helper_thread;

/* Protects port_list, ipc_helper_state and ipc_helper_thread. */
static struct shim_lock ipc_helper_lock;

/* Event used to kick the helper thread out of DkObjectsWaitAny() whenever
 * a port is added or removed, so it re-reads port_list. */
static AEVENTTYPE install_new_event;

static int create_ipc_helper(void);
static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port);
/* Dispatch table for incoming IPC messages, indexed by msg->code in
 * receive_ipc_message(). NOTE: entry order is positional and must match the
 * numeric IPC_* message codes exactly — do not reorder. */
static ipc_callback ipc_callbacks[IPC_CODE_NUM] = {
    /* RESP */          &ipc_resp_callback,
    /* CHECKPOINT */    &ipc_checkpoint_callback,

    /* parents and children */
    /* CLD_EXIT */      &ipc_cld_exit_callback,
#ifdef PROFILE
    /* CLD_PROFILE */   &ipc_cld_profile_callback,
#endif

    /* pid namespace */
    IPC_NS_CALLBACKS(pid)
    /* PID_KILL */      &ipc_pid_kill_callback,
    /* PID_GETSTATUS */ &ipc_pid_getstatus_callback,
    /* PID_RETSTATUS */ &ipc_pid_retstatus_callback,
    /* PID_GETMETA */   &ipc_pid_getmeta_callback,
    /* PID_RETMETA */   &ipc_pid_retmeta_callback,
    /* PID_NOP */       &ipc_pid_nop_callback,
    /* PID_SENDRPC */   &ipc_pid_sendrpc_callback,

    /* sysv namespace */
    IPC_NS_CALLBACKS(sysv)
    IPC_NS_KEY_CALLBACKS(sysv)
    /* SYSV_DELRES */   &ipc_sysv_delres_callback,
    /* SYSV_MOVRES */   &ipc_sysv_movres_callback,
    /* SYSV_MSGSND */   &ipc_sysv_msgsnd_callback,
    /* SYSV_MSGRCV */   &ipc_sysv_msgrcv_callback,
    /* SYSV_MSGMOV */   &ipc_sysv_msgmov_callback,
    /* SYSV_SEMOP */    &ipc_sysv_semop_callback,
    /* SYSV_SEMCTL */   &ipc_sysv_semctl_callback,
    /* SYSV_SEMRET */   &ipc_sysv_semret_callback,
    /* SYSV_SEMMOV */   &ipc_sysv_semmov_callback,
};
  73. static int init_self_ipc_port(void) {
  74. lock(&cur_process.lock);
  75. if (!cur_process.self) {
  76. /* very first process or clone/fork case: create IPC port from scratch */
  77. cur_process.self = create_ipc_info_cur_process(/*is_self_ipc_info=*/true);
  78. if (!cur_process.self) {
  79. unlock(&cur_process.lock);
  80. return -EACCES;
  81. }
  82. } else {
  83. /* execve case: inherited IPC port from parent process */
  84. assert(cur_process.self->pal_handle && !qstrempty(&cur_process.self->uri));
  85. add_ipc_port_by_id(cur_process.self->vmid,
  86. cur_process.self->pal_handle,
  87. IPC_PORT_SERVER,
  88. /*fini=*/NULL,
  89. &cur_process.self->port);
  90. }
  91. unlock(&cur_process.lock);
  92. return 0;
  93. }
  94. static int init_parent_ipc_port(void) {
  95. if (!PAL_CB(parent_process) || !cur_process.parent) {
  96. /* no parent process, no sense in creating parent IPC port */
  97. return 0;
  98. }
  99. lock(&cur_process.lock);
  100. assert(cur_process.parent && cur_process.parent->vmid);
  101. /* for execve case, my parent is the parent of my parent (current
  102. * process transparently inherits the "real" parent through already
  103. * opened pal_handle on "temporary" parent's cur_process.parent) */
  104. if (!cur_process.parent->pal_handle) {
  105. /* for clone/fork case, parent is connected on parent_process */
  106. cur_process.parent->pal_handle = PAL_CB(parent_process);
  107. }
  108. add_ipc_port_by_id(cur_process.parent->vmid,
  109. cur_process.parent->pal_handle,
  110. IPC_PORT_DIRPRT | IPC_PORT_LISTEN,
  111. /*fini=*/NULL,
  112. &cur_process.parent->port);
  113. unlock(&cur_process.lock);
  114. return 0;
  115. }
  116. static int init_ns_ipc_port(int ns_idx) {
  117. if (!cur_process.ns[ns_idx]) {
  118. /* no NS info from parent process, no sense in creating NS IPC port */
  119. return 0;
  120. }
  121. if (!cur_process.ns[ns_idx]->pal_handle && qstrempty(&cur_process.ns[ns_idx]->uri)) {
  122. /* there is no connection to NS leader via PAL handle and there is no
  123. * URI to find NS leader: do not create NS IPC port now, it will be
  124. * created on-demand during NS leader lookup */
  125. return 0;
  126. }
  127. lock(&cur_process.lock);
  128. if (!cur_process.ns[ns_idx]->pal_handle) {
  129. debug("Reconnecting IPC port %s\n", qstrgetstr(&cur_process.ns[ns_idx]->uri));
  130. cur_process.ns[ns_idx]->pal_handle = DkStreamOpen(qstrgetstr(&cur_process.ns[ns_idx]->uri), 0, 0, 0, 0);
  131. if (!cur_process.ns[ns_idx]->pal_handle) {
  132. unlock(&cur_process.lock);
  133. return -PAL_ERRNO;
  134. }
  135. }
  136. IDTYPE type = (ns_idx == PID_NS) ? IPC_PORT_PIDLDR : IPC_PORT_SYSVLDR;
  137. add_ipc_port_by_id(cur_process.ns[ns_idx]->vmid,
  138. cur_process.ns[ns_idx]->pal_handle,
  139. type | IPC_PORT_LISTEN,
  140. /*fini=*/NULL,
  141. &cur_process.ns[ns_idx]->port);
  142. unlock(&cur_process.lock);
  143. return 0;
  144. }
  145. int init_ipc_ports(void) {
  146. if (!(port_mgr = create_mem_mgr(init_align_up(PORT_MGR_ALLOC))))
  147. return -ENOMEM;
  148. int ret;
  149. if ((ret = init_self_ipc_port()) < 0)
  150. return ret;
  151. if ((ret = init_parent_ipc_port()) < 0)
  152. return ret;
  153. if ((ret = init_ns_ipc_port(PID_NS)) < 0)
  154. return ret;
  155. if ((ret = init_ns_ipc_port(SYSV_NS)) < 0)
  156. return ret;
  157. return 0;
  158. }
  159. int init_ipc_helper(void) {
  160. /* early enough in init, can write global vars without the lock */
  161. ipc_helper_state = HELPER_NOTALIVE;
  162. create_lock(&ipc_helper_lock);
  163. create_event(&install_new_event);
  164. /* some IPC ports were already added before this point, so spawn IPC
  165. * helper thread (and enable locking mechanisms if not done already
  166. * since we are going in multi-threaded mode) */
  167. enable_locking();
  168. lock(&ipc_helper_lock);
  169. create_ipc_helper();
  170. unlock(&ipc_helper_lock);
  171. return 0;
  172. }
  173. static struct shim_ipc_port* __create_ipc_port(PAL_HANDLE hdl) {
  174. struct shim_ipc_port* port =
  175. get_mem_obj_from_mgr_enlarge(port_mgr, size_align_up(PORT_MGR_ALLOC));
  176. if (!port)
  177. return NULL;
  178. memset(port, 0, sizeof(struct shim_ipc_port));
  179. port->pal_handle = hdl;
  180. INIT_LIST_HEAD(port, list);
  181. INIT_LISTP(&port->msgs);
  182. REF_SET(port->ref_count, 0);
  183. create_lock(&port->msgs_lock);
  184. return port;
  185. }
  186. static void __free_ipc_port(struct shim_ipc_port* port) {
  187. if (port->pal_handle) {
  188. DkObjectClose(port->pal_handle);
  189. port->pal_handle = NULL;
  190. }
  191. destroy_lock(&port->msgs_lock);
  192. free_mem_obj_to_mgr(port_mgr, port);
  193. }
  194. static void __get_ipc_port(struct shim_ipc_port* port) {
  195. REF_INC(port->ref_count);
  196. }
  197. static void __put_ipc_port(struct shim_ipc_port* port) {
  198. int ref_count = REF_DEC(port->ref_count);
  199. if (!ref_count)
  200. __free_ipc_port(port);
  201. }
  202. void get_ipc_port(struct shim_ipc_port* port) {
  203. /* no need to grab ipc_helper_lock because __get_ipc_port() is atomic */
  204. __get_ipc_port(port);
  205. }
/* Public refcount release for a port.
 *
 * The decrement itself is atomic, so the common case (ref_count stays > 0)
 * needs no lock. Only when we dropped the last reference do we take
 * ipc_helper_lock to free the port, serializing against the helper thread
 * and list operations that may touch it concurrently. */
void put_ipc_port(struct shim_ipc_port* port) {
    /* this is atomic so we don't grab lock in common case of ref_count > 0 */
    int ref_count = REF_DEC(port->ref_count);

    if (!ref_count) {
        lock(&ipc_helper_lock);
        __free_ipc_port(port);
        unlock(&ipc_helper_lock);
    }
}
  215. static void __add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid,
  216. IDTYPE type, port_fini fini) {
  217. port->type |= type;
  218. if (vmid && !port->vmid)
  219. port->vmid = vmid;
  220. /* find empty slot in fini callbacks and register callback */
  221. if (fini) {
  222. bool found_empty_slot = false;
  223. for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
  224. if (!port->fini[i] || port->fini[i] == fini) {
  225. port->fini[i] = fini;
  226. found_empty_slot = true;
  227. break;
  228. }
  229. assert(found_empty_slot);
  230. }
  231. /* add to port list if not there already */
  232. if (LIST_EMPTY(port, list)) {
  233. __get_ipc_port(port);
  234. LISTP_ADD(port, &port_list, list);
  235. }
  236. /* wake up IPC helper thread so that it picks up added port */
  237. if (ipc_helper_state == HELPER_ALIVE)
  238. set_event(&install_new_event, 1);
  239. }
/* Unlink port from port_list, fail all requests still pending on it, and
 * drop the list's reference (which may free the port). Must be called with
 * ipc_helper_lock held. */
static void __del_ipc_port(struct shim_ipc_port* port) {
    debug("Deleting port %p (handle %p) of process %u\n",
          port, port->pal_handle, port->vmid & 0xFFFF);

    DkStreamDelete(port->pal_handle, 0);
    LISTP_DEL_INIT(port, &port_list, list);

    /* Check for pending messages on port (threads might be blocking for responses) */
    lock(&port->msgs_lock);
    struct shim_ipc_msg_duplex* msg;
    struct shim_ipc_msg_duplex* tmp;
    LISTP_FOR_EACH_ENTRY_SAFE(msg, tmp, &port->msgs, list) {
        LISTP_DEL_INIT(msg, &port->msgs, list);
        /* the connection is gone: fail the request and unblock its sender */
        msg->retval = -ECONNRESET;
        if (msg->thread) {
            debug("Deleted pending message on port %p, wake up blocking thread %d\n",
                  port, msg->thread->tid);
            thread_wakeup(msg->thread);
        }
    }
    unlock(&port->msgs_lock);

    /* drop the reference that was taken when the port joined port_list */
    __put_ipc_port(port);

    /* wake up IPC helper thread so that it forgets about deleted port */
    if (ipc_helper_state == HELPER_ALIVE)
        set_event(&install_new_event, 1);
}
  264. void add_ipc_port(struct shim_ipc_port* port, IDTYPE vmid, IDTYPE type, port_fini fini) {
  265. debug("Adding port %p (handle %p) for process %u (type=%04x)\n",
  266. port, port->pal_handle, port->vmid & 0xFFFF, type);
  267. lock(&ipc_helper_lock);
  268. __add_ipc_port(port, vmid, type, fini);
  269. unlock(&ipc_helper_lock);
  270. }
  271. void add_ipc_port_by_id(IDTYPE vmid, PAL_HANDLE hdl, IDTYPE type,
  272. port_fini fini, struct shim_ipc_port** portptr) {
  273. debug("Adding port (handle %p) for process %u (type %04x)\n",
  274. hdl, vmid & 0xFFFF, type);
  275. struct shim_ipc_port* port = NULL;
  276. if (portptr)
  277. *portptr = NULL;
  278. assert(hdl && PAL_GET_TYPE(hdl));
  279. lock(&ipc_helper_lock);
  280. /* check if port with this PAL handle already exists, then we only
  281. * need to update its vmid, type, and fini callback */
  282. struct shim_ipc_port* tmp;
  283. LISTP_FOR_EACH_ENTRY(tmp, &port_list, list)
  284. if (tmp->pal_handle == hdl) {
  285. port = tmp;
  286. break;
  287. }
  288. if (!port) {
  289. /* port does not yet exist, create it */
  290. port = __create_ipc_port(hdl);
  291. if (!port) {
  292. debug("Failed to create IPC port for handle %p\n", hdl);
  293. goto out;
  294. }
  295. }
  296. /* add/update port */
  297. __add_ipc_port(port, vmid, type, fini);
  298. if (portptr) {
  299. __get_ipc_port(port);
  300. *portptr = port;
  301. }
  302. out:
  303. unlock(&ipc_helper_lock);
  304. }
/* Delete port and run its registered fini callbacks with exitcode.
 * Safe to call on an already-deleted port (no-op). The fini callbacks are
 * invoked after ipc_helper_lock is released. */
void del_ipc_port_fini(struct shim_ipc_port* port, unsigned int exitcode) {
    lock(&ipc_helper_lock);

    if (LIST_EMPTY(port, list)) {
        /* port is not on port_list, i.e. already deleted: nothing to do */
        unlock(&ipc_helper_lock);
        return;
    }

    /* prevent __del_ipc_port() from freeing port since we need it for fini callbacks */
    __get_ipc_port(port);
    __del_ipc_port(port);
    unlock(&ipc_helper_lock);

    /* run and clear each registered fini callback exactly once */
    for (int i = 0; i < MAX_IPC_PORT_FINI_CB; i++)
        if (port->fini[i]) {
            (port->fini[i])(port, port->vmid, exitcode);
            port->fini[i] = NULL;
        }

    /* drop our temporary reference; this may be the last one and free port */
    __put_ipc_port(port);
}
  322. void del_all_ipc_ports(void) {
  323. lock(&ipc_helper_lock);
  324. struct shim_ipc_port* port;
  325. struct shim_ipc_port* tmp;
  326. LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list)
  327. __del_ipc_port(port);
  328. unlock(&ipc_helper_lock);
  329. }
  330. struct shim_ipc_port* lookup_ipc_port(IDTYPE vmid, IDTYPE type) {
  331. struct shim_ipc_port* port = NULL;
  332. assert(vmid && type);
  333. lock(&ipc_helper_lock);
  334. struct shim_ipc_port* tmp;
  335. LISTP_FOR_EACH_ENTRY(tmp, &port_list, list) {
  336. if (tmp->vmid == vmid && (tmp->type & type)) {
  337. debug("Found port %p (handle %p) for process %u (type %04x)\n",
  338. tmp, tmp->pal_handle, tmp->vmid & 0xFFFF, tmp->type);
  339. port = tmp;
  340. __get_ipc_port(port);
  341. break;
  342. }
  343. }
  344. unlock(&ipc_helper_lock);
  345. return port;
  346. }
  347. #define PORTS_ON_STACK_CNT 32
  348. int broadcast_ipc(struct shim_ipc_msg* msg, int target_type, struct shim_ipc_port* exclude_port) {
  349. int ret;
  350. struct shim_ipc_port* port;
  351. struct shim_ipc_port** target_ports;
  352. size_t target_ports_cnt = 0;
  353. assert(target_type);
  354. lock(&ipc_helper_lock);
  355. /* Collect all ports with appropriate types. In common case, stack-allocated
  356. * array of PORTS_ON_STACK_CNT ports is enough. If there are more ports, we
  357. * will allocate a bigger array on the heap and collect all ports again. */
  358. struct shim_ipc_port* target_ports_stack[PORTS_ON_STACK_CNT];
  359. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  360. if (port == exclude_port)
  361. continue;
  362. if (port->type & target_type) {
  363. if (target_ports_cnt < PORTS_ON_STACK_CNT)
  364. target_ports_stack[target_ports_cnt] = port;
  365. target_ports_cnt++;
  366. }
  367. }
  368. target_ports = target_ports_stack;
  369. if (target_ports_cnt > PORTS_ON_STACK_CNT) {
  370. /* Rare case when there are more than PORTS_ON_STACK_CNT ports. Allocate
  371. * big-enough array on the heap and collect all ports again. */
  372. size_t cnt = 0;
  373. struct shim_ipc_port** target_ports_heap =
  374. malloc(sizeof(struct shim_ipc_port *) * target_ports_cnt);
  375. LISTP_FOR_EACH_ENTRY(port, &port_list, list) {
  376. if (port == exclude_port)
  377. continue;
  378. if (port->type & target_type)
  379. target_ports_heap[cnt++] = port;
  380. }
  381. target_ports = target_ports_heap;
  382. assert(cnt == target_ports_cnt);
  383. }
  384. for (size_t i = 0; i < target_ports_cnt; i++)
  385. __get_ipc_port(target_ports[i]);
  386. unlock(&ipc_helper_lock);
  387. /* send msg to each collected port (note that ports cannot be freed in meantime) */
  388. for (size_t i = 0; i < target_ports_cnt; i++) {
  389. port = target_ports[i];
  390. debug("Broadcast to port %p (handle %p) for process %u (type %x, target %x)\n",
  391. port, port->pal_handle, port->vmid & 0xFFFF, port->type, target_type);
  392. msg->dst = port->vmid;
  393. ret = send_ipc_message(msg, port);
  394. if (ret < 0) {
  395. debug("Broadcast to port %p (handle %p) for process %u failed (errno = %d)!\n",
  396. port, port->pal_handle, port->vmid & 0xFFFF, ret);
  397. goto out;
  398. }
  399. }
  400. ret = 0;
  401. out:
  402. for (size_t i = 0; i < target_ports_cnt; i++)
  403. put_ipc_port(target_ports[i]);
  404. if (target_ports != target_ports_stack)
  405. free(target_ports);
  406. return ret;
  407. }
  408. static int ipc_resp_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port) {
  409. struct shim_ipc_resp* resp = (struct shim_ipc_resp*) &msg->msg;
  410. debug("IPC callback from %u: IPC_RESP(%d)\n", msg->src & 0xFFFF, resp->retval);
  411. if (!msg->seq)
  412. return resp->retval;
  413. /* find a corresponding request msg for this response msg */
  414. struct shim_ipc_msg_duplex* req_msg = pop_ipc_msg_duplex(port, msg->seq);
  415. /* if some thread is waiting for response, wake it with response retval */
  416. if (req_msg) {
  417. req_msg->retval = resp->retval;
  418. if (req_msg->thread)
  419. thread_wakeup(req_msg->thread);
  420. return 0;
  421. }
  422. return resp->retval;
  423. }
  424. int send_response_ipc_message(struct shim_ipc_port* port, IDTYPE dest, int ret, unsigned long seq) {
  425. ret = (ret == RESPONSE_CALLBACK) ? 0 : ret;
  426. /* create IPC_RESP msg to send to dest, with sequence number seq, and in-body retval ret */
  427. size_t total_msg_size = get_ipc_msg_size(sizeof(struct shim_ipc_resp));
  428. struct shim_ipc_msg* resp_msg = __alloca(total_msg_size);
  429. init_ipc_msg(resp_msg, IPC_RESP, total_msg_size, dest);
  430. resp_msg->seq = seq;
  431. struct shim_ipc_resp* resp = (struct shim_ipc_resp *)resp_msg->msg;
  432. resp->retval = ret;
  433. debug("IPC send to %u: IPC_RESP(%d)\n", resp_msg->dst & 0xFFFF, ret);
  434. return send_ipc_message(resp_msg, port);
  435. }
  436. static int receive_ipc_message(struct shim_ipc_port* port) {
  437. int ret;
  438. size_t readahead = IPC_MSG_MINIMAL_SIZE * 2;
  439. size_t bufsize = IPC_MSG_MINIMAL_SIZE + readahead;
  440. struct shim_ipc_msg* msg = malloc(bufsize);
  441. size_t expected_size = IPC_MSG_MINIMAL_SIZE;
  442. size_t bytes = 0;
  443. do {
  444. while (bytes < expected_size) {
  445. /* grow msg buffer to accomodate bigger messages */
  446. if (expected_size + readahead > bufsize) {
  447. while (expected_size + readahead > bufsize)
  448. bufsize *= 2;
  449. void* tmp_buf = malloc(bufsize);
  450. memcpy(tmp_buf, msg, bytes);
  451. free(msg);
  452. msg = tmp_buf;
  453. }
  454. size_t read = DkStreamRead(port->pal_handle, /*offset=*/0, expected_size - bytes + readahead,
  455. (void *) msg + bytes, NULL, 0);
  456. if (!read) {
  457. if (PAL_ERRNO == EINTR || PAL_ERRNO == EAGAIN || PAL_ERRNO == EWOULDBLOCK)
  458. continue;
  459. debug("Port %p (handle %p) closed while receiving IPC message\n", port, port->pal_handle);
  460. del_ipc_port_fini(port, -ECHILD);
  461. ret = -PAL_ERRNO;
  462. goto out;
  463. }
  464. bytes += read;
  465. /* extract actual msg size from msg header and continue reading msg body */
  466. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  467. expected_size = msg->size;
  468. }
  469. debug("Received IPC message from port %p (handle %p): code=%d size=%lu src=%u dst=%u seq=%lx\n",
  470. port, port->pal_handle, msg->code, msg->size, msg->src & 0xFFFF, msg->dst & 0xFFFF, msg->seq);
  471. /* skip messages coming from myself (in case of broadcast) */
  472. if (msg->src != cur_process.vmid) {
  473. if (msg->code < IPC_CODE_NUM && ipc_callbacks[msg->code]) {
  474. /* invoke callback to this msg */
  475. ret = (*ipc_callbacks[msg->code])(msg, port);
  476. if ((ret < 0 || ret == RESPONSE_CALLBACK) && msg->seq) {
  477. /* send IPC_RESP message to sender of this msg */
  478. ret = send_response_ipc_message(port, msg->src, ret, msg->seq);
  479. if (ret < 0) {
  480. debug("Sending IPC_RESP msg on port %p (handle %p) to %u failed\n",
  481. port, port->pal_handle, msg->src & 0xFFFF);
  482. ret = -PAL_ERRNO;
  483. goto out;
  484. }
  485. }
  486. }
  487. }
  488. bytes -= expected_size; /* one message was received and handled */
  489. if (bytes > 0) {
  490. /* we may have started reading the next message, move this message
  491. * to beginning of msg buffer and reset expected size */
  492. memmove(msg, (void *)msg + expected_size, bytes);
  493. expected_size = IPC_MSG_MINIMAL_SIZE;
  494. if (bytes >= IPC_MSG_MINIMAL_SIZE)
  495. expected_size = msg->size;
  496. }
  497. } while (bytes > 0);
  498. ret = 0;
  499. out:
  500. free(msg);
  501. return ret;
  502. }
  503. /* Main routine of the IPC helper thread. IPC helper thread is spawned when
  504. * the first IPC port is added and is terminated only when the whole Graphene
  505. * application terminates. IPC helper thread runs in an endless loop and waits
  506. * on port events (either the addition/removal of ports or actual port events:
  507. * acceptance of new client or receiving/sending messages). In particular,
  508. * IPC helper thread calls receive_ipc_message() if a message arrives on port.
  509. *
  510. * Other threads add and remove IPC ports via add_ipc_xxx() and del_ipc_xxx()
  511. * functions. These ports are added to port_list which the IPC helper thread
  512. * consults before each new DkObjectsWaitAny().
  513. *
  514. * Note that ports are copied from global port_list to local object_list. This
  515. * is because ports may be removed from port_list by other threads while IPC
  516. * helper thread is waiting on DkObjectsWaitAny(). For this reason IPC thread
  517. * also get references to all current ports and puts them after handling all
  518. * ports in object_list.
  519. *
  520. * Previous implementation went to great lengths to keep changes to the list of
  521. * current ports to a minimum (instead of repopulating the list before each wait
  522. * like in current code). Unfortunately, this resulted in undue complexity.
  523. * Current implementation should perform fine for usual case of <100 IPC ports
  524. * and with IPC helper thread always running in background on its own core.
  525. */
/* Main loop of the IPC helper thread (see the block comment above for the
 * overall design). Each iteration: handle the handle that DkObjectsWaitAny()
 * reported, drop references to the previous snapshot of ports, take a fresh
 * snapshot of port_list under ipc_helper_lock, then wait again. */
noreturn static void shim_ipc_helper(void* dummy) {
    __UNUSED(dummy);
    struct shim_thread* self = get_cur_thread();

    /* handle returned by the last DkObjectsWaitAny(); NULL on first pass so
     * the first iteration only builds the initial snapshot */
    PAL_HANDLE polled = NULL;

    /* Initialize two lists:
     * - object_list collects IPC port objects and is the main handled list
     * - palhandle_list collects corresponding PAL handles of IPC port objects
     *   and is needed for DkObjectsWaitAny(.., <array-of-PAL-handles>, ..)
     *   interface; palhandle_list always contains at least install_new_event
     *
     * We allocate these two lists on the heap so they do not overflow the
     * limited PAL stack. We grow them at runtime if needed. */
    size_t object_list_size = 0;
    size_t object_list_maxsize = 32;
    struct shim_ipc_port** object_list =
        malloc(sizeof(struct shim_ipc_port *) * object_list_maxsize);
    PAL_HANDLE* palhandle_list =
        malloc(sizeof(PAL_HANDLE) * (1 + object_list_maxsize));

    /* slot 0 is permanently the "ports changed" wakeup event */
    PAL_HANDLE install_new_event_hdl = event_handle(&install_new_event);
    palhandle_list[0] = install_new_event_hdl;

    while (true) {
        /* check for termination request from terminate_ipc_helper() */
        lock(&ipc_helper_lock);
        if (ipc_helper_state != HELPER_ALIVE) {
            ipc_helper_thread = NULL;
            unlock(&ipc_helper_lock);
            break;
        }
        unlock(&ipc_helper_lock);

        struct shim_ipc_port* polled_port = NULL;

        if (polled == install_new_event_hdl) {
            /* some thread wants to install new event; this event is found
             * in object_list below, so just re-init install_new_event */
            debug("New IPC event was requested (port was added/removed)\n");
            clear_event(&install_new_event);
        } else {
            /* it is not install_new_event handle, so must be one of ports */
            for (size_t i = 0; i < object_list_size; i++)
                if (polled == object_list[i]->pal_handle) {
                    polled_port = object_list[i];
                    break;
                }
        }

        if (polled_port) {
            if (polled_port->type & IPC_PORT_SERVER) {
                /* if polled port is server port, accept a client,
                 * create client port, and add it to port list */
                PAL_HANDLE client = DkStreamWaitForClient(polled_port->pal_handle);
                if (client) {
                    /* type of client port is the same as original server port
                     * but with LISTEN (for remote client) and without SERVER
                     * (this port doesn't wait for new clients) */
                    IDTYPE client_type = (polled_port->type & ~IPC_PORT_SERVER) | IPC_PORT_LISTEN;
                    add_ipc_port_by_id(polled_port->vmid, client, client_type, NULL, NULL);
                } else {
                    debug("Port %p (handle %p) was removed during accepting client\n",
                          polled_port, polled_port->pal_handle);
                    del_ipc_port_fini(polled_port, -ECHILD);
                }
            } else {
                PAL_STREAM_ATTR attr;
                if (DkStreamAttributesQueryByHandle(polled_port->pal_handle, &attr)) {
                    /* can read on this port, so receive messages */
                    if (attr.readable) {
                        /* NOTE: IPC helper thread does not handle failures currently */
                        receive_ipc_message(polled_port);
                    }
                    if (attr.disconnected) {
                        debug("Port %p (handle %p) disconnected\n",
                              polled_port, polled_port->pal_handle);
                        del_ipc_port_fini(polled_port, -ECONNRESET);
                    }
                } else {
                    debug("Port %p (handle %p) was removed during attr querying\n",
                          polled_port, polled_port->pal_handle);
                    del_ipc_port_fini(polled_port, -PAL_ERRNO);
                }
            }
        }

        /* done handling ports; put their references so they can be freed */
        for (size_t i = 0; i < object_list_size; i++)
            put_ipc_port(object_list[i]);

        lock(&ipc_helper_lock);

        /* iterate through all ports to repopulate object_list */
        object_list_size = 0;
        struct shim_ipc_port* port;
        struct shim_ipc_port* tmp;
        LISTP_FOR_EACH_ENTRY_SAFE(port, tmp, &port_list, list) {
            /* get port reference so it is not freed while we wait on/handle it */
            __get_ipc_port(port);

            if (object_list_size == object_list_maxsize) {
                /* grow object_list and palhandle_list to accomodate more objects */
                struct shim_ipc_port** tmp_array = malloc(
                    sizeof(struct shim_ipc_port *) * (object_list_maxsize * 2));
                PAL_HANDLE* tmp_pal_array = malloc(
                    sizeof(PAL_HANDLE) * (1 + object_list_maxsize * 2));
                memcpy(tmp_array, object_list,
                       sizeof(struct shim_ipc_port *) * (object_list_size));
                memcpy(tmp_pal_array, palhandle_list,
                       sizeof(PAL_HANDLE) * (1 + object_list_size));
                object_list_maxsize *= 2;
                free(object_list);
                free(palhandle_list);
                object_list = tmp_array;
                palhandle_list = tmp_pal_array;
            }

            /* re-add this port to object_list and palhandle_list; PAL-handle
             * slot is shifted by one because slot 0 is install_new_event */
            object_list[object_list_size] = port;
            palhandle_list[object_list_size + 1] = port->pal_handle;
            object_list_size++;

            debug("Listening to process %u on port %p (handle %p, type %04x)\n",
                  port->vmid & 0xFFFF, port, port->pal_handle, port->type);
        }

        unlock(&ipc_helper_lock);

        /* wait on collected ports' PAL handles + install_new_event_hdl */
        polled = DkObjectsWaitAny(object_list_size + 1, palhandle_list, NO_TIMEOUT);
    }

    /* IPC thread exits; put acquired port references so they can be freed */
    for (size_t i = 0; i < object_list_size; i++)
        put_ipc_port(object_list[i]);

    free(object_list);
    free(palhandle_list);

    put_thread(self);
    debug("IPC helper thread terminated\n");

    DkThreadExit();
}
/* Trampoline for the IPC helper thread: runs first on the small PAL-provided
 * stack, sets up TLS/debug buffers, allocates a proper stack, and switches
 * to it before entering shim_ipc_helper(). Never returns on success. */
static void shim_ipc_helper_prepare(void* arg) {
    struct shim_thread * self = (struct shim_thread *) arg;
    if (!arg)
        return;

    __libc_tcb_t tcb;
    allocate_tls(&tcb, false, self);
    debug_setbuf(&tcb.shim_tcb, true);
    debug("Set tcb to %p\n", &tcb);

    /* detect a race where we are no longer the registered helper thread
     * (e.g. helper was torn down before we got scheduled) */
    lock(&ipc_helper_lock);
    bool notme = (self != ipc_helper_thread);
    unlock(&ipc_helper_lock);

    void* stack = allocate_stack(IPC_HELPER_STACK_SIZE, allocsize, false);

    if (notme || !stack) {
        /* stale helper or no stack: bail out (free(NULL) is a no-op) */
        free(stack);
        put_thread(self);
        DkThreadExit();
        return;
    }
    debug("IPC helper thread started\n");

    /* swap stack to be sure we don't drain the small stack PAL provides */
    self->stack_top = stack + IPC_HELPER_STACK_SIZE;
    self->stack = stack;
    __SWITCH_STACK(self->stack_top, shim_ipc_helper, NULL);
}
  676. /* this should be called with the ipc_helper_lock held */
  677. static int create_ipc_helper(void) {
  678. if (ipc_helper_state == HELPER_ALIVE)
  679. return 0;
  680. struct shim_thread * new = get_new_internal_thread();
  681. if (!new)
  682. return -ENOMEM;
  683. ipc_helper_thread = new;
  684. ipc_helper_state = HELPER_ALIVE;
  685. PAL_HANDLE handle = thread_create(shim_ipc_helper_prepare, new);
  686. if (!handle) {
  687. int ret = -PAL_ERRNO; /* put_thread() may overwrite errno */
  688. ipc_helper_thread = NULL;
  689. ipc_helper_state = HELPER_NOTALIVE;
  690. put_thread(new);
  691. return ret;
  692. }
  693. new->pal_handle = handle;
  694. return 0;
  695. }
/* On success, the reference to ipc helper thread is returned with refcount
 * incremented. It is the responsibility of caller to wait for ipc helper's
 * exit and then release the final reference to free related resources (it is
 * problematic for the thread itself to release its own resources e.g. stack).
 */
struct shim_thread* terminate_ipc_helper(void) {
    /* unlocked fast-path read of ipc_helper_state; the authoritative
     * check/update happens under ipc_helper_lock below */
    if (ipc_helper_state != HELPER_ALIVE)
        return NULL;

    /* NOTE: Graphene doesn't have an abstraction of a queue of pending signals
     * between communicating processes (instead all communication is done over
     * streams). Thus, app code like this (found in e.g. Lmbench's bw_unix):
     *     kill(child, SIGKILL);
     *     exit(0);
     * results in a data race between the SIGKILL message sent over IPC stream
     * and the parent process exiting. In the worst case, the parent will exit
     * before the SIGKILL message goes through the host-OS stream, the host OS
     * will close the stream, and the message will never be seen by child. To
     * prevent such cases, we simply wait for a bit before exiting.
     * */
    debug("Waiting for 0.5s for all in-flight IPC messages to reach their destinations\n");
    DkThreadDelayExecution(500);

    lock(&ipc_helper_lock);
    struct shim_thread* ret = ipc_helper_thread;
    if (ret)
        get_thread(ret);
    /* flipping the state makes the helper's main loop exit on next wakeup */
    ipc_helper_state = HELPER_NOTALIVE;
    unlock(&ipc_helper_lock);

    /* force wake up of ipc helper thread so that it exits */
    set_event(&install_new_event, 1);
    return ret;
}
  726. }