Prechádzať zdrojové kódy

[LibOS] Fixing a bug freeing IPC port objects

Chia-Che Tsai 5 rokov pred
rodič
commit
ac3a6a0021

+ 1 - 1
LibOS/shim/include/shim_ipc.h

@@ -101,7 +101,7 @@ struct shim_ipc_port {
 
     port_fini           fini[MAX_IPC_PORT_FINI_CB];
 
-    bool                update, recent, deleted;
+    bool                update, recent;
     struct {
         unsigned int    type;
         IDTYPE          vmid;

+ 7 - 13
LibOS/shim/src/ipc/shim_ipc.c

@@ -371,20 +371,11 @@ int send_ipc_message (struct shim_ipc_msg * msg, struct shim_ipc_port * port)
     debug("send ipc message to port %p (handle %p)\n", port,
           port->pal_handle);
 
-    PAL_HANDLE pal_handle = port->pal_handle;
-
-    /* Read memory barrier needed here to ensure pal_handle is alive
-     * if port->deleted is not true. */
-    rmb();
-
-    if (port->deleted)
-        return -ECONNRESET;
-
-    int ret = DkStreamWrite(pal_handle, 0, msg->size, msg, NULL);
+    int ret = DkStreamWrite(port->pal_handle, 0, msg->size, msg, NULL);
 
     if (ret == 0 && PAL_NATIVE_ERRNO) {
         debug("port %p (handle %p) is removed at sending\n", port,
-              pal_handle);
+              port->pal_handle);
 
         del_ipc_port_fini(port, -ECHILD);
         return -PAL_ERRNO;
@@ -396,9 +387,12 @@ int send_ipc_message (struct shim_ipc_msg * msg, struct shim_ipc_port * port)
 int close_ipc_message_duplex (struct shim_ipc_msg_obj * msg,
                               struct shim_ipc_port * port)
 {
-    if (port && !list_empty(msg, list)) {
+    if (port) {
+        // Check if the message is pending on the port for response. If so,
+        // remove the message from the list.
         lock(port->msgs_lock);
-        listp_del_init(msg, &port->msgs, list);
+        if (!list_empty(msg, list))
+            listp_del_init(msg, &port->msgs, list);
         unlock(port->msgs_lock);
     }
 

+ 89 - 59
LibOS/shim/src/ipc/shim_ipc_helper.c

@@ -185,6 +185,17 @@ static void __get_ipc_port (struct shim_ipc_port * pobj)
 #endif
 }
 
+static void __free_ipc_port (struct shim_ipc_port * pobj)
+{
+    if (pobj->pal_handle) {
+        DkObjectClose(pobj->pal_handle);
+        pobj->pal_handle = NULL;
+    }
+
+    destroy_lock(pobj->msgs_lock);
+    free_mem_obj_to_mgr(port_mgr, pobj);
+}
+
 static void __put_ipc_port (struct shim_ipc_port * pobj)
 {
     int ref_count = REF_DEC(pobj->ref_count);
@@ -194,16 +205,8 @@ static void __put_ipc_port (struct shim_ipc_port * pobj)
           pobj->pal_handle, ref_count);
 #endif
 
-    if (!ref_count) {
-        if (pobj->pal_handle) {
-            DkObjectClose(pobj->pal_handle);
-            pobj->pal_handle = NULL;
-        }
-
-        destroy_lock(pobj->msgs_lock);
-
-        free_mem_obj_to_mgr(port_mgr, pobj);
-    }
+    if (!ref_count)
+        __free_ipc_port(pobj);
 }
 
 /* This should be called with the ipc_helper_lock held */
@@ -359,6 +362,8 @@ void add_ipc_port_by_id (IDTYPE vmid, PAL_HANDLE hdl, int type,
     }
 
     bool need_restart = __add_ipc_port(port, vmid, type, fini);
+    assert(!list_empty(port, list));
+    assert(!vmid || !list_empty(port, hlist));
 
     if (portptr)
         *portptr = port;
@@ -377,12 +382,12 @@ static bool __del_ipc_port (struct shim_ipc_port * port, int type)
     debug("deleting port %p (handle %p) for process %u\n",
           port, port->pal_handle, port->info.vmid);
 
+    __get_ipc_port(port); // Prevent the object from being freed during deletion
+    assert(!list_empty(port, list)); // Never delete a port twice
+
     bool need_restart = false;
     type = type ? (type & port->info.type) : port->info.type;
 
-    port->deleted = true; /* prevent further usage of the port */
-    wmb(); /* commit the state to the memory */
-
     if ((type & IPC_PORT_KEEPALIVE) ^
         (port->info.type & IPC_PORT_KEEPALIVE))
         need_restart = true;
@@ -395,14 +400,16 @@ static bool __del_ipc_port (struct shim_ipc_port * port, int type)
         goto out;
     }
 
+    // Prevent further usage of the PAL handle
+    DkStreamDelete(port->pal_handle, 0);
+
     if (port->info.type & IPC_PORT_IFPOLL)
         need_restart = true;
 
-    if (!list_empty(port, list)) {
-        listp_del_init(port, &pobj_list, list);
-        port->info.type &= IPC_PORT_IFPOLL;
-        __put_ipc_port(port);
-    }
+    // Officially delete the port
+    listp_del_init(port, &pobj_list, list);
+    port->info.type &= IPC_PORT_IFPOLL;
+    __put_ipc_port(port);
 
     if (!list_empty(port, hlist)) {
         // Re-fetch head pointer
@@ -411,14 +418,36 @@ static bool __del_ipc_port (struct shim_ipc_port * port, int type)
         __put_ipc_port(port);
     }
 
+    // Need to check if there are any pending messages on the port, which means
+    // some threads might be blocking for responses.
+    lock(port->msgs_lock);
+    struct shim_ipc_msg_obj * msg, * n;
+    listp_for_each_entry_safe(msg, n, &port->msgs, list) {
+        listp_del_init(msg, &port->msgs, list);
+        msg->retval = -ECONNRESET;
+        if (msg->thread) {
+            debug("wake up thread %d\n", msg->thread->tid);
+            thread_wakeup(msg->thread);
+        }
+    }
+    unlock(port->msgs_lock);
+
 out:
     port->update = true;
+    __put_ipc_port(port); // Free the object if ref_count is 0
     return need_restart;
 }
 
 void del_ipc_port (struct shim_ipc_port * port, int type)
 {
     lock(ipc_helper_lock);
+
+    // If the port is already deleted, don't delete it again.
+    if (list_empty(port, list)) {
+        unlock(ipc_helper_lock);
+        return;
+    }
+
     bool need_restart = __del_ipc_port(port, type);
 
     if (need_restart)
@@ -436,17 +465,19 @@ void del_ipc_port_by_id (IDTYPE vmid, int type)
     lock(ipc_helper_lock);
 
     listp_for_each_entry_safe(port, n, head, hlist) {
+        if (list_empty(port, list))
+            continue;
+
         debug("port %p (handle %p) for process %u in list %p\n",
               port, port->pal_handle, port->info.vmid, head);
 
-        if (port->info.vmid == vmid) {
-            if (__del_ipc_port(port, type))
-                need_restart = true;
-        }
+        if (port->info.vmid == vmid && __del_ipc_port(port, type))
+            need_restart = true;
     }
 
     if (need_restart)
         restart_ipc_helper(false);
+
     unlock(ipc_helper_lock);
 }
 
@@ -456,6 +487,13 @@ void del_ipc_port_fini (struct shim_ipc_port * port, unsigned int exitcode)
     int nfini = 0;
     assert(REF_GET(port->ref_count) > 0);
     lock(ipc_helper_lock);
+
+    // If the port is already deleted, don't delete it again.
+    if (list_empty(port, list)) {
+        unlock(ipc_helper_lock);
+        return;
+    }
+
     IDTYPE vmid = port->info.vmid;
     for (int i = 0 ; i < MAX_IPC_PORT_FINI_CB ; i++)
         if (port->fini[i]) {
@@ -463,44 +501,22 @@ void del_ipc_port_fini (struct shim_ipc_port * port, unsigned int exitcode)
             port->fini[i] = NULL;
         }
 
-    __get_ipc_port(port);
-
     bool need_restart = __del_ipc_port(port, 0);
-    unlock(ipc_helper_lock);
-
-    if (nfini) {
-        for (int i = 0 ; i < nfini ; i++)
-            (fini[i])(port, vmid, exitcode);
-    }
-
-    lock(port->msgs_lock);
-
-    if (!list_empty(port, list)) {
-        struct shim_ipc_msg_obj * msg, * n;
-
-        listp_for_each_entry_safe(msg, n, &port->msgs, list) {
-            listp_del_init(msg, &port->msgs, list);
-            msg->retval = -ECONNRESET;
-            if (msg->thread) {
-                debug("wake up thread %d\n", msg->thread->tid);
-                thread_wakeup(msg->thread);
-            }
-        }
-    }
-
-    put_ipc_port(port);
-    assert(REF_GET(port->ref_count) > 0);
 
     if (need_restart)
         restart_ipc_helper(false);
-    unlock(port->msgs_lock);
+
+    unlock(ipc_helper_lock);
+
+    for (int i = 0 ; i < nfini ; i++)
+        (fini[i])(port, vmid, exitcode);
 }
 
 static struct shim_ipc_port * __lookup_ipc_port (IDTYPE vmid, int type)
 {
     LISTP_TYPE(shim_ipc_port) * head = &ipc_port_pool[PID_HASH(vmid)];
     struct shim_ipc_port * tmp;
-    
+
     listp_for_each_entry(tmp, head, hlist)
         if (tmp->info.vmid == vmid && (!type || tmp->info.type & type)) {
             debug("found port %p (handle %p) for process %u (type %04x)\n",
@@ -516,18 +532,34 @@ struct shim_ipc_port * lookup_ipc_port (IDTYPE vmid, int type)
 {
     lock(ipc_helper_lock);
     struct shim_ipc_port * port = __lookup_ipc_port(vmid, type);
+    if (port) {
+        assert(!list_empty(port, list));
+        assert(!vmid || !list_empty(port, hlist));
+    }
     unlock(ipc_helper_lock);
     return port;
 }
 
 void get_ipc_port (struct shim_ipc_port * port)
 {
+    // No need to grab ipc_helper_lock because __get_ipc_port() is atomic.
     __get_ipc_port(port);
 }
 
 void put_ipc_port (struct shim_ipc_port * port)
 {
-    __put_ipc_port(port);
+    int ref_count = REF_DEC(port->ref_count);
+
+#ifdef DEBUG_REF
+    debug("put ipc port %p (handle %p, ref_count = %d)\n", port,
+          port->pal_handle, ref_count);
+#endif
+
+    if (!ref_count) {
+        lock(ipc_helper_lock); // Need to grab the lock
+        __free_ipc_port(port);
+        unlock(ipc_helper_lock);
+    }
 }
 
 void del_all_ipc_ports (int type)
@@ -537,9 +569,10 @@ void del_all_ipc_ports (int type)
 
     lock(ipc_helper_lock);
 
-    listp_for_each_entry_safe(pobj, n, &pobj_list, list)
-        if (pobj->pal_handle && __del_ipc_port(pobj, type))
+    listp_for_each_entry_safe(pobj, n, &pobj_list, list) {
+        if (__del_ipc_port(pobj, type))
             need_restart = true;
+    }
 
     if (need_restart)
         restart_ipc_helper(false);
@@ -559,9 +592,7 @@ int broadcast_ipc (struct shim_ipc_msg * msg, struct shim_ipc_port ** exclude,
             return 0;
 
         debug("send to broadcast stream\n");
-        get_ipc_port(broadcast_port);
         int ret = send_ipc_message(msg, broadcast_port);
-        put_ipc_port(broadcast_port);
         if (!ret)
             return 0;
     }
@@ -694,8 +725,6 @@ int receive_ipc_message (struct shim_ipc_port * port, unsigned long seq,
     int expected_size;
     int bytes = 0, ret = 0;
 
-    get_ipc_port(port);
-
     do {
         expected_size = IPC_MSG_MINIMAL_SIZE;
         while (bytes < expected_size) {
@@ -773,7 +802,6 @@ next:
     if (msgptr)
         *msgptr = NULL;
 
-    put_ipc_port(port);
     return ret;
 }
 
@@ -914,6 +942,8 @@ update_list:
         for (int i = 0 ; i < port_num ; i++) {
             struct shim_ipc_port * pobj = local_pobjs[i];
 
+            // If the port is removed from the list or intended to be deleted,
+            // remove the port from the polling array
             if (list_empty(pobj, list)) {
                 if (polled == pobj->pal_handle) {
                     polled = NULL;
@@ -1004,7 +1034,7 @@ end:
     barrier();
     if (ipc_helper_state == HELPER_HANDEDOVER) {
         debug("ipc helper thread is the last thread, process exiting\n");
-        shim_clean();
+        shim_terminate(); // Same as shim_clean(), but this is the official termination function
     }
 
     lock(ipc_helper_lock);

+ 0 - 2
LibOS/shim/src/ipc/shim_ipc_pid.c

@@ -158,8 +158,6 @@ int ipc_pid_kill_callback (IPC_CALLBACK_ARGS)
             break;
     }
 
-    assert(ret != -ESRCH);
-
     SAVE_PROFILE_INTERVAL(ipc_pid_kill_callback);
     return ret < 0 ? ret : RESPONSE_CALLBACK;
 }