#include <stdio.h>

#include "Enclave_u.h"
#include "Untrusted.hpp"
#include "net.hpp"

// The command type byte values
#define COMMAND_EPOCH   0x00
#define COMMAND_MESSAGE 0x01
#define COMMAND_CHUNK   0x02

#define VERBOSE_NET
// #define DEBUG_NET_CLIENTS

#define CEILDIV(x,y) (((x)+(y)-1)/(y))

NetIO *g_netio = NULL;

NodeIO::NodeIO(tcp::socket &&socket, nodenum_t nodenum) :
    sock(std::move(socket)), node_num(nodenum)
{
}

uint8_t *NodeIO::request_frame()
{
    frame_deque_lock.lock();
    if (frames_available.empty()) {
        frame_deque_lock.unlock();
        // Allocate a new frame.  Note that this memory will (at this
        // time) never get deallocated.  In theory, we could deallocate
        // it in return_frame, but if a certain number of frames were
        // allocated here, it means we had that much data in flight
        // (queued but not accepted for sending by the OS), and we're
        // likely to need that much again.  Subsequent messages will
        // _reuse_ the allocated data, though, so the used memory won't
        // grow forever, and will be limited to the amount of in-flight
        // data needed.
        return new uint8_t[FRAME_SIZE];
    }
    // Copy the pointer to the frame out of the deque and remove it from
    // the deque.  Note this is _not_ taking the address of the element
    // *in* the deque (and then popping it, which would invalidate that
    // pointer).
    uint8_t *frame = frames_available.back();
    frames_available.pop_back();
    frame_deque_lock.unlock();
    return frame;
}

void NodeIO::return_frame(uint8_t *frame)
{
    if (!frame) return;
    // We push the frame back on to the end of the deque so that it will
    // be the next one used.  This may lead to better cache behaviour?
    frame_deque_lock.lock();
    frames_available.push_back(frame);
    frame_deque_lock.unlock();
}

void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len)
{
    commands_deque_lock.lock();
    commands_inflight.push_back({header, data, len});
    if (commands_inflight.size() == 1) {
        async_send_commands();
    }
    commands_deque_lock.unlock();
}

void NodeIO::async_send_commands()
{
    std::vector<boost::asio::const_buffer> tosend;
    CommandTuple *commandp = &(commands_inflight.front());
    tosend.push_back(boost::asio::buffer(&(std::get<0>(*commandp)), 5));
    if (std::get<1>(*commandp) != NULL && std::get<2>(*commandp) > 0) {
        tosend.push_back(boost::asio::buffer(std::get<1>(*commandp),
            std::get<2>(*commandp)));
    }
    boost::asio::async_write(sock, tosend,
        [this, commandp](boost::system::error_code, std::size_t){
            // When the write completes, pop the command from the deque
            // (it should now be at the front)
            commands_deque_lock.lock();
            assert(!commands_inflight.empty() &&
                &(commands_inflight.front()) == commandp);
            uint8_t *data = std::get<1>(*commandp);
            commands_inflight.pop_front();
            if (commands_inflight.size() > 0) {
                async_send_commands();
            }
            // And return the frame
            return_frame(data);
            commands_deque_lock.unlock();
        });
}

void NodeIO::send_epoch(uint32_t epoch_num)
{
    uint64_t header = (uint64_t(epoch_num) << 8) + COMMAND_EPOCH;
    send_header_data(header, NULL, 0);
}
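/* A sketch of the wire format implemented by send_header_data above and
 * parsed by recv_commands below (this just restates what that code
 * does; the byte values assume a little-endian host, as the code itself
 * does).  Every command is a 5-byte header whose low byte is the
 * command type and whose upper four bytes are a uint32 payload; a CHUNK
 * header is followed by that many bytes of chunk data, while EPOCH and
 * MESSAGE headers stand alone.  For example, announcing epoch 42:
 *
 *     uint64_t header = (uint64_t(42) << 8) + COMMAND_EPOCH;
 *     // The 5 bytes sent on the wire: 0x00 0x2a 0x00 0x00 0x00
 */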
void NodeIO::send_message_header(uint32_t tot_message_len)
{
    uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE;
    send_header_data(header, NULL, 0);
    // If we're sending a new message header, we have to have finished
    // sending the previous message.
    assert(chunksize_inflight == msgsize_inflight);
    msgsize_inflight = tot_message_len;
    chunksize_inflight = 0;
}

bool NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len)
{
    assert(chunk_len <= FRAME_SIZE);
    uint64_t header = (uint64_t(chunk_len) << 8) + COMMAND_CHUNK;
    send_header_data(header, data, chunk_len);
    chunksize_inflight += chunk_len;
    assert(chunksize_inflight <= msgsize_inflight);
    return (chunksize_inflight < msgsize_inflight);
}

void NodeIO::recv_commands(
    std::function<void(boost::system::error_code)> error_cb,
    std::function<void(uint32_t)> epoch_cb)
{
    // Asynchronously read the header
    receive_header = 0;
    boost::asio::async_read(sock, boost::asio::buffer(&receive_header, 5),
        [this, error_cb, epoch_cb]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                error_cb(ec);
                return;
            }
            if ((receive_header & 0xff) == COMMAND_EPOCH) {
                epoch_cb(uint32_t(receive_header >> 8));
                recv_commands(error_cb, epoch_cb);
            } else if ((receive_header & 0xff) == COMMAND_MESSAGE) {
                assert(recv_msgsize_inflight == recv_chunksize_inflight);
                recv_msgsize_inflight = uint32_t(receive_header >> 8);
                recv_chunksize_inflight = 0;
                if (ecall_message(node_num, recv_msgsize_inflight)) {
                    recv_commands(error_cb, epoch_cb);
                } else {
                    printf("ecall_message failed\n");
                }
            } else if ((receive_header & 0xff) == COMMAND_CHUNK) {
                uint32_t this_chunk_size = uint32_t(receive_header >> 8);
                assert(recv_chunksize_inflight + this_chunk_size <=
                    recv_msgsize_inflight);
                recv_chunksize_inflight += this_chunk_size;
                boost::asio::async_read(sock, boost::asio::buffer(
                    receive_frame, this_chunk_size),
                    [this, error_cb, epoch_cb, this_chunk_size]
                    (boost::system::error_code ecc, std::size_t) {
                        if (ecc) {
                            error_cb(ecc);
                            return;
                        }
                        if (ecall_chunk(node_num, receive_frame,
                                this_chunk_size)) {
                            recv_commands(error_cb, epoch_cb);
                        } else {
                            printf("ecall_chunk failed\n");
                        }
                    });
            } else {
                error_cb(boost::system::errc::make_error_code(
                    boost::system::errc::errc_t::invalid_argument));
            }
        });
}

/* Receive clients' dropped-off messages, i.e., a CLIENT_MESSAGE_BUNDLE */
void NetIO::ing_receive_msgbundle(tcp::socket* csocket, clientid_t c_simid)
{
    unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
    boost::asio::async_read(*csocket,
        boost::asio::buffer(msgbundle, msgbundle_size),
        [this, csocket, msgbundle, c_simid]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                if (ec == boost::asio::error::eof) {
                    // Client connection terminated, so we delete this socket
                    delete csocket;
                } else {
                    printf("Error ing_receive_msgbundle: %s\n",
                        ec.message().c_str());
                }
                free(msgbundle);
                return;
            }
            // Ingest the message bundle
            bool ret = ecall_ingest_msgbundle(c_simid, msgbundle,
                conf.m_priv_out);
            if (!ret) {
                printf("ecall_ingest_msgbundle failed\n");
            }
            free(msgbundle);
            // Continue to asynchronously receive client message bundles
            ing_receive_msgbundle(csocket, c_simid);
        });
}
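/* A hedged sketch of the byte stream a client sends to an ingestion
 * node, as implied by the reads above (the client-side code is not part
 * of this file, and client_sock, my_simid, my_token, and bundle are
 * hypothetical names).  The client authenticates once, then sends one
 * fixed-size message bundle per epoch; the sizes must match this
 * object's auth_size and msgbundle_size:
 *
 *     std::vector<unsigned char> auth(auth_size);
 *     memcpy(auth.data(), &my_simid, sizeof(clientid_t));
 *     memcpy(auth.data() + sizeof(clientid_t), my_token,
 *         auth_size - sizeof(clientid_t));
 *     boost::asio::write(client_sock, boost::asio::buffer(auth));
 *     // Then, once per epoch:
 *     boost::asio::write(client_sock,
 *         boost::asio::buffer(bundle, msgbundle_size));
 */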
/* Handle new client connections.  New clients always send an
   authentication message.  For ingestion, this is then followed by
   their msg_bundles every epoch. */
void NetIO::ing_authenticate_new_client(tcp::socket* csocket,
    const boost::system::error_code& error)
{
    if (error) {
        printf("Accept handler failed\n");
        return;
    }
#ifdef DEBUG_NET_CLIENTS
    printf("Accept handler success\n");
#endif

    unsigned char* auth_message = (unsigned char*) malloc(auth_size);
    boost::asio::async_read(*csocket,
        boost::asio::buffer(auth_message, auth_size),
        [this, csocket, auth_message]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                if (ec == boost::asio::error::eof) {
                    // Client connection terminated, so we delete this socket
                    delete csocket;
                } else {
                    printf("Error ing_auth_new_client: %s\n",
                        ec.message().c_str());
                }
                free(auth_message);
                return;
            } else {
                clientid_t c_simid = *((clientid_t *)(auth_message));
                // Read the authentication token
                unsigned char *auth_ptr = auth_message + sizeof(clientid_t);
                bool ret = ecall_authenticate(c_simid, auth_ptr);
                free(auth_message);

                // Receive client message bundles on this socket
                // for client sim_id c_simid
                if (ret) {
                    ing_receive_msgbundle(csocket, c_simid);
                } else {
                    printf("Client <-> Ingestion authentication failed\n");
                    delete csocket;
                }
            }
        });

    ing_start_accept();
}
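/* A hedged sketch of the client-to-storage-node mapping that the lcid
 * computation in stg_authenticate_new_client below appears to assume:
 * client sim ids are distributed round-robin across the storage nodes,
 * so a client's local index on its storage node is its sim id divided
 * by the number of storage nodes.  For example, with num_stg_nodes == 4:
 *
 *     c_simid:                   0 1 2 3 4 5 6 7 ...
 *     storage node (simid % 4):  0 1 2 3 0 1 2 3 ...
 *     lcid         (simid / 4):  0 0 0 0 1 1 1 1 ...
 */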
/* Handle new client connections.  New clients always send an
   authentication message.  For storage, this is then followed by the
   storage servers sending them their mailbox every epoch. */
void NetIO::stg_authenticate_new_client(tcp::socket* csocket,
    const boost::system::error_code& error)
{
    if (error) {
        printf("Accept handler failed\n");
        return;
    }
#ifdef DEBUG_NET_CLIENTS
    printf("Accept handler success\n");
#endif

    unsigned char* auth_message = (unsigned char*) malloc(auth_size);
    boost::asio::async_read(*csocket,
        boost::asio::buffer(auth_message, auth_size),
        [this, csocket, auth_message]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                if (ec == boost::asio::error::eof) {
                    // Client connection terminated, so we delete this socket
                    delete csocket;
                } else {
                    printf("Error stg_auth_new_client: %s\n",
                        ec.message().c_str());
                }
                free(auth_message);
                return;
            } else {
                clientid_t c_simid = *((clientid_t *)(auth_message));
                // Read the authentication token
                unsigned char *auth_ptr = auth_message + sizeof(clientid_t);
                bool ret = ecall_storage_authenticate(c_simid, auth_ptr);
                free(auth_message);

                // If the auth is successful, store this socket into the
                // client socket array at the local c_simid index, so that
                // the storage server can periodically send this client
                // its mailbox.
                if (ret) {
                    uint32_t lcid = c_simid / num_stg_nodes;
                    client_sockets[lcid] = csocket;
                } else {
                    printf("Client <-> Storage authentication failed\n");
                    delete csocket;
                }
            }
        });

    stg_start_accept();
}

/* Asynchronously accept new client connections */
void NetIO::ing_start_accept()
{
    tcp::socket *csocket = new tcp::socket(io_context());
#ifdef DEBUG_NET_CLIENTS
    std::cout << "Accepting on " << myconf.clistenhost << ":"
        << myconf.clistenport << "\n";
#endif
    ingestion_acceptor->async_accept(*csocket,
        boost::bind(&NetIO::ing_authenticate_new_client, this, csocket,
            boost::asio::placeholders::error));
}

void NetIO::stg_start_accept()
{
    tcp::socket *csocket = new tcp::socket(io_context());
#ifdef DEBUG_NET_CLIENTS
    std::cout << "Accepting on " << myconf.slistenhost << ":"
        << myconf.slistenport << "\n";
#endif
    storage_acceptor->async_accept(*csocket,
        boost::bind(&NetIO::stg_authenticate_new_client, this, csocket,
            boost::asio::placeholders::error));
}

void NetIO::send_client_mailbox()
{
    // Send each client their tokens for the next epoch
    for (uint32_t lcid = 0; lcid < num_clients_per_stg; lcid++) {
        unsigned char *tkn_ptr = epoch_tokens + lcid * token_bundle_size;
        unsigned char *buf_ptr = epoch_mailboxes + lcid * mailbox_size;
        if (client_sockets[lcid] != nullptr) {
            boost::asio::async_write(*(client_sockets[lcid]),
                boost::asio::buffer(tkn_ptr, token_bundle_size),
                [this, lcid, buf_ptr](boost::system::error_code ec,
                        std::size_t) {
                    if (ec) {
                        if (ec == boost::asio::error::eof) {
                            // Client connection terminated, so we delete
                            // this socket
                            delete client_sockets[lcid];
                            printf("Client socket terminated!\n");
                        } else {
                            printf("Error send_client_mailbox tokens: %s\n",
                                ec.message().c_str());
                        }
                        return;
                    }
                    boost::asio::async_write(*(client_sockets[lcid]),
                        boost::asio::buffer(buf_ptr, mailbox_size),
                        [this, lcid](boost::system::error_code ecc,
                                std::size_t) {
                            //printf("NetIO::send_client_mailbox, Client %d messages was sent\n", lcid);
                            if (ecc) {
                                if (ecc == boost::asio::error::eof) {
                                    // Client connection terminated, so we
                                    // delete this socket
                                    delete client_sockets[lcid];
                                    printf("Client socket terminated!\n");
                                } else {
                                    printf("Error send_client_mailbox "
                                        "mailbox (lcid = %u): %s\n",
                                        lcid, ecc.message().c_str());
                                }
                                return;
                            }
                        });
                });
        }
        /* else {
            printf("Client did not have a socket!\n");
        } */
    }
}
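/* A hedged sketch of the reads that match send_client_mailbox above
 * (client-side code, not part of this file; client_sock is a
 * hypothetical connected socket).  Each epoch a connected client
 * receives its token bundle immediately followed by its mailbox:
 *
 *     std::vector<uint8_t> tokens(token_bundle_size);
 *     std::vector<uint8_t> mailbox(mailbox_size);
 *     boost::asio::read(client_sock, boost::asio::buffer(tokens));
 *     boost::asio::read(client_sock, boost::asio::buffer(mailbox));
 */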
NetIO::NetIO(boost::asio::io_context &io_context, const Config &config) :
    context(io_context), conf(config),
    myconf(config.nodes[config.my_node_num])
{
    num_nodes = nodenum_t(conf.nodes.size());
    nodeios.resize(num_nodes);
    me = conf.my_node_num;

    // Node number n will accept connections from nodes 0, ..., n-1 and
    // make connections to nodes n+1, ..., num_nodes-1.  This is all
    // single threaded, but it doesn't deadlock because node 0 isn't
    // waiting for any incoming connections, so it immediately makes
    // outgoing connections.  When it connects to node 1, that node
    // accepts its (only) incoming connection, and then starts making
    // its outgoing connections, etc.
    tcp::resolver resolver(io_context);
    tcp::acceptor acceptor(io_context,
        resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint());

    // Accept the connections from the lower-numbered nodes, each of
    // which starts by sending us its node number
    for (size_t i = 0; i < me; ++i) {
        tcp::socket nodesock = acceptor.accept();
        nodenum_t node_num = 0;
        boost::asio::read(nodesock,
            boost::asio::buffer(&node_num, sizeof(node_num)));
        if (node_num >= num_nodes) {
            std::cerr << "Received bad node number\n";
        } else {
            nodeios[node_num].emplace(std::move(nodesock), node_num);
#ifdef VERBOSE_NET
            std::cerr << "Received connection from "
                << config.nodes[node_num].name << "\n";
#endif
        }
    }

    // Make the connections to the higher-numbered nodes, sending each
    // our own node number once connected
    for (size_t i = me + 1; i < num_nodes; ++i) {
        tcp::socket nodesock(io_context);
        boost::asio::connect(nodesock,
            resolver.resolve(conf.nodes[i].listenhost,
                conf.nodes[i].listenport));
        nodenum_t node_num = nodenum_t(me);
        boost::asio::write(nodesock,
            boost::asio::buffer(&node_num, sizeof(node_num)));
        nodeios[i].emplace(std::move(nodesock), nodenum_t(i));
#ifdef VERBOSE_NET
        std::cerr << "Connected to " << conf.nodes[i].name << "\n";
#endif
    }

    if (myconf.roles & ROLE_STORAGE) {
        storage_acceptor = std::shared_ptr<tcp::acceptor>(
            new tcp::acceptor(io_context,
                resolver.resolve(this->myconf.slistenhost,
                    this->myconf.slistenport)->endpoint()));
        stg_start_accept();
    }
    if (myconf.roles & ROLE_INGESTION) {
        ingestion_acceptor = std::shared_ptr<tcp::acceptor>(
            new tcp::acceptor(io_context,
                resolver.resolve(this->myconf.clistenhost,
                    this->myconf.clistenport)->endpoint()));
        ing_start_accept();
    }
}

void NetIO::recv_commands(
    std::function<void(boost::system::error_code)> error_cb,
    std::function<void(uint32_t)> epoch_cb)
{
    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
        if (node_num == me) continue;
        NodeIO &n = node(node_num);
        n.recv_commands(error_cb, epoch_cb);
    }
}

void NetIO::close()
{
    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
        if (node_num == me) continue;
        NodeIO &n = node(node_num);
        n.close();
    }
}

/* The enclave calls this to inform the untrusted app that there's a new
 * message to send.  The return value is the frame the enclave should
 * use to store the first (encrypted) chunk of this message. */
uint8_t *ocall_message(nodenum_t node_num, uint32_t message_len)
{
    assert(g_netio != NULL);
    NodeIO &node = g_netio->node(node_num);
    node.send_message_header(message_len);
    return node.request_frame();
}

/* The enclave calls this to inform the untrusted app that there's a new
 * chunk to send.  The return value is the frame the enclave should use
 * to store the next (encrypted) chunk of this message, or NULL if this
 * was the last chunk. */
uint8_t *ocall_chunk(nodenum_t node_num, uint8_t *chunkdata,
    uint32_t chunklen)
{
    assert(g_netio != NULL);
    NodeIO &node = g_netio->node(node_num);
    bool morechunks = node.send_chunk(chunkdata, chunklen);
    if (morechunks) {
        return node.request_frame();
    }
    return NULL;
}
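/* A hedged sketch of the sending handshake that ocall_message and
 * ocall_chunk above imply (the enclave side is not part of this file;
 * encrypt_next_chunk_into is a hypothetical stand-in for however the
 * enclave fills a frame).  To send a message of tot_len bytes to node
 * n, the enclave does, in effect:
 *
 *     uint8_t *frame = ocall_message(n, tot_len);
 *     uint32_t sent = 0;
 *     while (frame != NULL) {
 *         uint32_t chunklen = std::min(tot_len - sent,
 *             uint32_t(FRAME_SIZE));
 *         encrypt_next_chunk_into(frame, chunklen);
 *         frame = ocall_chunk(n, frame, chunklen); // NULL after last chunk
 *         sent += chunklen;
 *     }
 *
 * Each frame handed out by request_frame is returned to that node's
 * pool by the async_write completion handler once its chunk has been
 * fully sent.
 */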