/* Copyright (C) 2014 Stony Brook University
This file is part of Graphene Library OS.
Graphene Library OS is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License
as published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
Graphene Library OS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see . */
/*
* shim_ipc.c
*
* This file contains code to maintain generic bookkeeping of IPC: operations
* on shim_ipc_msg (one-way IPC messages), shim_ipc_msg_duplex (IPC messages
* with acknowledgement), shim_ipc_info (IPC ports of process), shim_process.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define IPC_INFO_MGR_ALLOC 32
#define PAGE_SIZE allocsize
#define OBJ_TYPE struct shim_ipc_info
#include "memmgr.h"
static MEM_MGR ipc_info_mgr;
struct shim_lock ipc_info_lock;
struct shim_process cur_process;
#define CLIENT_HASH_BITLEN 6
#define CLIENT_HASH_NUM (1 << CLIENT_HASH_BITLEN)
#define CLIENT_HASH_MASK (CLIENT_HASH_NUM - 1)
#define CLIENT_HASH(vmid) ((vmid)&CLIENT_HASH_MASK)
DEFINE_LISTP(shim_ipc_info);
static LISTP_TYPE(shim_ipc_info) info_hlist[CLIENT_HASH_NUM];
DEFINE_PROFILE_CATEGORY(ipc, );
DEFINE_PROFILE_OCCURENCE(syscall_use_ipc, ipc);
int init_ipc_ports(void);
int init_ns_pid(void);
int init_ns_sysv(void);
int init_ipc(void) {
int ret = 0;
create_lock(&ipc_info_lock);
if (!(ipc_info_mgr = create_mem_mgr(init_align_up(IPC_INFO_MGR_ALLOC))))
return -ENOMEM;
if ((ret = init_ipc_ports()) < 0)
return ret;
if ((ret = init_ns_pid()) < 0)
return ret;
if ((ret = init_ns_sysv()) < 0)
return ret;
return 0;
}
int prepare_ns_leaders(void) {
int ret = 0;
if ((ret = prepare_pid_leader()) < 0)
return ret;
if ((ret = prepare_sysv_leader()) < 0)
return ret;
return 0;
}
static struct shim_ipc_info* __create_ipc_info(IDTYPE vmid, const char* uri, size_t len) {
struct shim_ipc_info* info =
get_mem_obj_from_mgr_enlarge(ipc_info_mgr, size_align_up(IPC_INFO_MGR_ALLOC));
if (!info)
return NULL;
memset(info, 0, sizeof(struct shim_ipc_info));
info->vmid = vmid;
if (uri)
qstrsetstr(&info->uri, uri, len);
REF_SET(info->ref_count, 1);
INIT_LIST_HEAD(info, hlist);
return info;
}
static void __free_ipc_info(struct shim_ipc_info* info) {
if (info->pal_handle) {
DkObjectClose(info->pal_handle);
info->pal_handle = NULL;
}
if (info->port)
put_ipc_port(info->port);
qstrfree(&info->uri);
free_mem_obj_to_mgr(ipc_info_mgr, info);
}
static void __get_ipc_info(struct shim_ipc_info* info) {
REF_INC(info->ref_count);
}
static void __put_ipc_info(struct shim_ipc_info* info) {
int ref_count = REF_DEC(info->ref_count);
if (!ref_count)
__free_ipc_info(info);
}
void get_ipc_info(struct shim_ipc_info* info) {
/* no need to grab ipc_info_lock because __get_ipc_info() is atomic */
__get_ipc_info(info);
}
void put_ipc_info(struct shim_ipc_info* info) {
/* this is atomic so we don't grab lock in common case of ref_count > 0 */
int ref_count = REF_DEC(info->ref_count);
if (!ref_count) {
lock(&ipc_info_lock);
__free_ipc_info(info);
unlock(&ipc_info_lock);
}
}
struct shim_ipc_info* create_ipc_info(IDTYPE vmid, const char* uri, size_t len) {
lock(&ipc_info_lock);
struct shim_ipc_info* info = __create_ipc_info(vmid, uri, len);
unlock(&ipc_info_lock);
return info;
}
struct shim_ipc_info* create_ipc_info_in_list(IDTYPE vmid, const char* uri, size_t len) {
assert(vmid);
struct shim_ipc_info* info;
lock(&ipc_info_lock);
/* check if info with this vmid & uri already exists and return it */
LISTP_TYPE(shim_ipc_info)* info_bucket = &info_hlist[CLIENT_HASH(vmid)];
LISTP_FOR_EACH_ENTRY(info, info_bucket, hlist) {
if (info->vmid == vmid && !qstrcmpstr(&info->uri, uri, len)) {
get_ipc_info(info);
unlock(&ipc_info_lock);
return info;
}
}
/* otherwise create new info and return it */
info = __create_ipc_info(vmid, uri, len);
if (info) {
LISTP_ADD(info, info_bucket, hlist);
get_ipc_info(info);
}
unlock(&ipc_info_lock);
return info;
}
void put_ipc_info_in_list(struct shim_ipc_info* info) {
LISTP_TYPE(shim_ipc_info)* info_bucket = &info_hlist[CLIENT_HASH(info->vmid)];
lock(&ipc_info_lock);
__put_ipc_info(info);
if (REF_GET(info->ref_count) == 1) {
LISTP_DEL_INIT(info, info_bucket, hlist);
__put_ipc_info(info);
}
unlock(&ipc_info_lock);
}
struct shim_ipc_info* lookup_ipc_info(IDTYPE vmid) {
assert(vmid);
lock(&ipc_info_lock);
struct shim_ipc_info* info;
LISTP_TYPE(shim_ipc_info)* info_bucket = &info_hlist[CLIENT_HASH(vmid)];
LISTP_FOR_EACH_ENTRY(info, info_bucket, hlist) {
if (info->vmid == vmid && !qstrempty(&info->uri)) {
__get_ipc_info(info);
unlock(&ipc_info_lock);
return info;
}
}
unlock(&ipc_info_lock);
return NULL;
}
struct shim_process* create_process(bool dup_cur_process) {
struct shim_process* new_process = calloc(1, sizeof(struct shim_process));
if (!new_process)
return NULL;
lock(&cur_process.lock);
/* current process must have been initialized with info on its own IPC info */
assert(cur_process.self);
assert(cur_process.self->pal_handle && !qstrempty(&cur_process.self->uri));
if (dup_cur_process) {
/* execve case, new process assumes identity of current process and thus has
* - same vmid as current process
* - same self IPC info as current process
* - same parent IPC info as current process
*/
new_process->vmid = cur_process.vmid;
new_process->self = create_ipc_info(
cur_process.self->vmid, qstrgetstr(&cur_process.self->uri), cur_process.self->uri.len);
new_process->self->pal_handle = cur_process.self->pal_handle;
if (!new_process->self) {
unlock(&cur_process.lock);
return NULL;
}
/* there is a corner case of execve in very first process; such process does
* not have parent process, so cannot copy parent IPC info */
if (cur_process.parent) {
new_process->parent =
create_ipc_info(cur_process.parent->vmid, qstrgetstr(&cur_process.parent->uri),
cur_process.parent->uri.len);
new_process->parent->pal_handle = cur_process.parent->pal_handle;
}
} else {
/* fork/clone case, new process has new identity but inherits parent */
new_process->vmid = 0;
new_process->self = NULL;
new_process->parent = create_ipc_info(
cur_process.self->vmid, qstrgetstr(&cur_process.self->uri), cur_process.self->uri.len);
}
if (cur_process.parent && !new_process->parent) {
if (new_process->self)
put_ipc_info(new_process->self);
unlock(&cur_process.lock);
return NULL;
}
/* new process inherits the same namespace leaders */
for (int i = 0; i < TOTAL_NS; i++) {
if (cur_process.ns[i]) {
new_process->ns[i] =
create_ipc_info(cur_process.ns[i]->vmid, qstrgetstr(&cur_process.ns[i]->uri),
cur_process.ns[i]->uri.len);
if (!new_process->ns[i]) {
if (new_process->self)
put_ipc_info(new_process->self);
if (new_process->parent)
put_ipc_info(new_process->parent);
for (int j = 0; j < i; j++) {
put_ipc_info(new_process->ns[j]);
}
unlock(&cur_process.lock);
return NULL;
}
}
}
unlock(&cur_process.lock);
return new_process;
}
void free_process(struct shim_process* process) {
if (process->self)
put_ipc_info(process->self);
if (process->parent)
put_ipc_info(process->parent);
for (int i = 0; i < TOTAL_NS; i++)
if (process->ns[i])
put_ipc_info(process->ns[i]);
free(process);
}
void init_ipc_msg(struct shim_ipc_msg* msg, int code, size_t size, IDTYPE dest) {
msg->code = code;
msg->size = get_ipc_msg_size(size);
msg->src = cur_process.vmid;
msg->dst = dest;
msg->seq = 0;
}
void init_ipc_msg_duplex(struct shim_ipc_msg_duplex* msg, int code, size_t size, IDTYPE dest) {
init_ipc_msg(&msg->msg, code, size, dest);
msg->thread = NULL;
INIT_LIST_HEAD(msg, list);
msg->retval = 0;
msg->private = NULL;
}
int send_ipc_message(struct shim_ipc_msg* msg, struct shim_ipc_port* port) {
assert(msg->size >= IPC_MSG_MINIMAL_SIZE);
msg->src = cur_process.vmid;
debug("Sending ipc message to port %p (handle %p)\n", port, port->pal_handle);
size_t total_bytes = msg->size;
size_t bytes = 0;
do {
size_t ret =
DkStreamWrite(port->pal_handle, 0, total_bytes - bytes, (void*)msg + bytes, NULL);
if (!ret) {
if (PAL_ERRNO == EINTR || PAL_ERRNO == EAGAIN || PAL_ERRNO == EWOULDBLOCK)
continue;
debug("Port %p (handle %p) was removed during sending\n", port, port->pal_handle);
del_ipc_port_fini(port, -ECHILD);
return -PAL_ERRNO;
}
bytes += ret;
} while (bytes < total_bytes);
return 0;
}
struct shim_ipc_msg_duplex* pop_ipc_msg_duplex(struct shim_ipc_port* port, unsigned long seq) {
struct shim_ipc_msg_duplex* found = NULL;
lock(&port->msgs_lock);
struct shim_ipc_msg_duplex* tmp;
LISTP_FOR_EACH_ENTRY(tmp, &port->msgs, list) {
if (tmp->msg.seq == seq) {
found = tmp;
LISTP_DEL_INIT(tmp, &port->msgs, list);
break;
}
}
unlock(&port->msgs_lock);
return found;
}
int send_ipc_message_duplex(struct shim_ipc_msg_duplex* msg, struct shim_ipc_port* port,
unsigned long* seq, void* private_data) {
int ret = 0;
struct shim_thread* thread = get_cur_thread();
assert(thread);
/* prepare thread which will send the message for waiting for response
* (this also acquires reference to the thread) */
if (!msg->thread)
thread_setwait(&msg->thread, thread);
static struct atomic_int ipc_seq_counter;
msg->msg.seq = atomic_inc_return(&ipc_seq_counter);
/* save the message to list of port msgs together with its private data */
lock(&port->msgs_lock);
msg->private = private_data;
LISTP_ADD_TAIL(msg, &port->msgs, list);
unlock(&port->msgs_lock);
ret = send_ipc_message(&msg->msg, port);
if (ret < 0)
goto out;
if (seq)
*seq = msg->msg.seq;
debug("Waiting for response (seq = %lu)\n", msg->msg.seq);
/* force thread which will send the message to wait for response;
* ignore unrelated interrupts but fail on actual errors */
do {
ret = thread_sleep(NO_TIMEOUT);
if (ret < 0 && ret != -EINTR && ret != -EAGAIN)
goto out;
} while (ret != 0);
debug("Finished waiting for response (seq = %lu, ret = %d)\n", msg->msg.seq, msg->retval);
ret = msg->retval;
out:
lock(&port->msgs_lock);
if (!LIST_EMPTY(msg, list))
LISTP_DEL_INIT(msg, &port->msgs, list);
unlock(&port->msgs_lock);
if (msg->thread) {
/* put reference to the thread acquired earlier */
put_thread(msg->thread);
msg->thread = NULL;
}
return ret;
}
/* must be called with cur_process.lock taken */
struct shim_ipc_info* create_ipc_info_cur_process(bool is_self_ipc_info) {
struct shim_ipc_info* info = create_ipc_info(cur_process.vmid, NULL, 0);
if (!info)
return NULL;
/* pipe for cur_process.self is of format "pipe:", others with random name */
char uri[PIPE_URI_SIZE];
if (create_pipe(NULL, uri, PIPE_URI_SIZE, &info->pal_handle, &info->uri, is_self_ipc_info) <
0) {
put_ipc_info(info);
return NULL;
}
add_ipc_port_by_id(cur_process.vmid, info->pal_handle, IPC_PORT_SERVER, NULL, &info->port);
return info;
}
int get_ipc_info_cur_process(struct shim_ipc_info** info) {
lock(&cur_process.lock);
if (!cur_process.self) {
cur_process.self = create_ipc_info_cur_process(true);
if (!cur_process.self) {
unlock(&cur_process.lock);
return -EACCES;
}
}
get_ipc_info(cur_process.self);
*info = cur_process.self;
unlock(&cur_process.lock);
return 0;
}
DEFINE_PROFILE_INTERVAL(ipc_checkpoint_send, ipc);
DEFINE_PROFILE_INTERVAL(ipc_checkpoint_callback, ipc);
/* Graphene's checkpoint() syscall broadcasts a msg to all processes
* asking to checkpoint their state and save in process-unique file in
* directory cpdir under session cpsession. */
int ipc_checkpoint_send(const char* cpdir, IDTYPE cpsession) {
BEGIN_PROFILE_INTERVAL();
int ret;
size_t len = strlen(cpdir);
size_t total_msg_size = get_ipc_msg_size(sizeof(struct shim_ipc_checkpoint) + len);
struct shim_ipc_msg* msg = __alloca(total_msg_size);
init_ipc_msg(msg, IPC_CHECKPOINT, total_msg_size, 0);
struct shim_ipc_checkpoint* msgin = (struct shim_ipc_checkpoint*)&msg->msg;
msgin->cpsession = cpsession;
memcpy(&msgin->cpdir, cpdir, len + 1);
debug("IPC broadcast to all: IPC_CHECKPOINT(%u, %s)\n", cpsession, cpdir);
/* broadcast to all including myself (so I can also checkpoint) */
ret = broadcast_ipc(msg, IPC_PORT_DIRCLD | IPC_PORT_DIRPRT,
/*exclude_port=*/NULL);
SAVE_PROFILE_INTERVAL(ipc_checkpoint_send);
return ret;
}
/* This process is asked to create a checkpoint, so it:
* - sends a Graphene-specific SIGCP signal to all its threads (for
* all to stop and join the checkpoint for consistent state),
* - broadcasts checkpoint msg further to other processes. */
int ipc_checkpoint_callback(struct shim_ipc_msg* msg, struct shim_ipc_port* port) {
BEGIN_PROFILE_INTERVAL();
int ret = 0;
struct shim_ipc_checkpoint* msgin = (struct shim_ipc_checkpoint*)msg->msg;
debug("IPC callback from %u: IPC_CHECKPOINT(%u, %s)\n", msg->src, msgin->cpsession,
msgin->cpdir);
ret = create_checkpoint(msgin->cpdir, &msgin->cpsession);
if (ret < 0)
goto out;
kill_all_threads(NULL, msgin->cpsession, SIGCP);
broadcast_ipc(msg, IPC_PORT_DIRCLD | IPC_PORT_DIRPRT, port);
out:
SAVE_PROFILE_INTERVAL(ipc_checkpoint_callback);
return ret;
}
BEGIN_CP_FUNC(ipc_info) {
assert(size == sizeof(struct shim_ipc_info));
struct shim_ipc_info* info = (struct shim_ipc_info*)obj;
struct shim_ipc_info* new_info = NULL;
ptr_t off = GET_FROM_CP_MAP(obj);
if (!off) {
off = ADD_CP_OFFSET(sizeof(struct shim_ipc_info));
ADD_TO_CP_MAP(obj, off);
new_info = (struct shim_ipc_info*)(base + off);
memcpy(new_info, info, sizeof(struct shim_ipc_info));
REF_SET(new_info->ref_count, 0);
/* call qstr-specific checkpointing function for new_info->uri */
DO_CP_IN_MEMBER(qstr, new_info, uri);
if (info->pal_handle) {
struct shim_palhdl_entry* entry;
/* call palhdl-specific checkpointing function to checkpoint
* info->pal_handle and return created object in entry */
DO_CP(palhdl, info->pal_handle, &entry);
/* info's PAL handle will be re-opened with new URI during
* palhdl restore (see checkpoint.c) */
entry->uri = &new_info->uri;
entry->phandle = &new_info->pal_handle;
}
} else {
/* already checkpointed */
new_info = (struct shim_ipc_info*)(base + off);
}
if (new_info && objp)
*objp = (void*)new_info;
}
END_CP_FUNC_NO_RS(ipc_info)
BEGIN_CP_FUNC(process) {
assert(size == sizeof(struct shim_process));
struct shim_process* process = (struct shim_process*)obj;
struct shim_process* new_process = NULL;
ptr_t off = GET_FROM_CP_MAP(obj);
if (!off) {
off = ADD_CP_OFFSET(sizeof(struct shim_process));
ADD_TO_CP_MAP(obj, off);
new_process = (struct shim_process*)(base + off);
memcpy(new_process, process, sizeof(struct shim_process));
/* call ipc_info-specific checkpointing functions
* for new_process's self, parent, and ns infos */
if (process->self)
DO_CP_MEMBER(ipc_info, process, new_process, self);
if (process->parent)
DO_CP_MEMBER(ipc_info, process, new_process, parent);
for (int i = 0; i < TOTAL_NS; i++)
if (process->ns[i])
DO_CP_MEMBER(ipc_info, process, new_process, ns[i]);
ADD_CP_FUNC_ENTRY(off);
} else {
/* already checkpointed */
new_process = (struct shim_process*)(base + off);
}
if (objp)
*objp = (void*)new_process;
}
END_CP_FUNC(process)
BEGIN_RS_FUNC(process) {
__UNUSED(offset);
struct shim_process* process = (void*)(base + GET_CP_FUNC_ENTRY());
/* process vmid = 0: fork/clone case, forces to pick up new host-OS vmid
* process vmid != 0: execve case, forces to re-use vmid of parent */
if (!process->vmid)
process->vmid = cur_process.vmid;
CP_REBASE(process->self);
CP_REBASE(process->parent);
CP_REBASE(process->ns);
if (process->self) {
process->self->vmid = process->vmid;
get_ipc_info(process->self);
}
if (process->parent)
get_ipc_info(process->parent);
for (int i = 0; i < TOTAL_NS; i++)
if (process->ns[i])
get_ipc_info(process->ns[i]);
memcpy(&cur_process, process, sizeof(struct shim_process));
create_lock(&cur_process.lock);
DEBUG_RS("vmid=%u,uri=%s,parent=%u(%s)", process->vmid,
process->self ? qstrgetstr(&process->self->uri) : "",
process->parent ? process->parent->vmid : 0,
process->parent ? qstrgetstr(&process->parent->uri) : "");
}
END_RS_FUNC(process)