#include #include "Enclave_u.h" #include "Untrusted.hpp" #include "net.hpp" // The command type byte values #define COMMAND_EPOCH 0x00 #define COMMAND_MESSAGE 0x01 #define COMMAND_CHUNK 0x02 #define VERBOSE_NET // #define DEBUG_NET_CLIENTS #define PROFILE_NET_CLIENTS #define CEILDIV(x,y) (((x)+(y)-1)/(y)) NetIO *g_netio = NULL; size_t client_count = 0; NodeIO::NodeIO(tcp::socket &&socket, nodenum_t nodenum) : sock(std::move(socket)), node_num(nodenum), msgsize_inflight(0), chunksize_inflight(0), recv_msgsize_inflight(0), recv_chunksize_inflight(0), bytes_sent(0) { } uint8_t *NodeIO::request_frame() { if (frames_available.empty()) { // Allocate a new frame. Note that this memory will (at this // time) never get deallocated. In theory, we could deallocate // it in return_frame, but if a certain number of frames were // allocated here, it means we had that much data in flight // (queued but not accepted for sending by the OS), and we're // likely to need that much again. Subsequent messages will // _reuse_ the allocated data, though, so the used memory won't // grow forever, and will be limited to the amount of in-flight // data needed. return new uint8_t[FRAME_SIZE]; } // Copy the pointer to the frame out of the deque and remove it from // the deque. Note this is _not_ taking the address of the element // *in* the deque (and then popping it, which would invalidate that // pointer). frame_deque_lock.lock(); uint8_t *frame = frames_available.back(); frames_available.pop_back(); frame_deque_lock.unlock(); return frame; } void NodeIO::return_frame(uint8_t *frame) { if (!frame) return; // We push the frame back on to the end of the deque so that it will // be the next one used. This may lead to better cache behaviour? 
frame_deque_lock.lock(); frames_available.push_back(frame); frame_deque_lock.unlock(); } void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len) { commands_deque_lock.lock(); commands_inflight.push_back({header, data, len}); if (commands_inflight.size() == 1) { async_send_commands(); } commands_deque_lock.unlock(); } void NodeIO::async_send_commands() { std::vector tosend; CommandTuple *commandp = &(commands_inflight.front()); tosend.push_back(boost::asio::buffer(&(std::get<0>(*commandp)), 5)); if (std::get<1>(*commandp) != NULL && std::get<2>(*commandp) > 0) { tosend.push_back(boost::asio::buffer(std::get<1>(*commandp), std::get<2>(*commandp))); } boost::asio::async_write(sock, tosend, [this, commandp](boost::system::error_code, std::size_t){ // When the write completes, pop the command from the deque // (which should now be in the front) commands_deque_lock.lock(); assert(!commands_inflight.empty() && &(commands_inflight.front()) == commandp); bytes_sent = bytes_sent + 5 + std::get<2>(*commandp); uint8_t *data = std::get<1>(*commandp); commands_inflight.pop_front(); if (commands_inflight.size() > 0) { async_send_commands(); } // And return the frame return_frame(data); commands_deque_lock.unlock(); }); } void NodeIO::send_epoch(uint32_t epoch_num) { uint64_t header = (uint64_t(epoch_num) << 8) + COMMAND_EPOCH; send_header_data(header, NULL, 0); } void NodeIO::send_message_header(uint32_t tot_message_len) { uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE; send_header_data(header, NULL, 0); // If we're sending a new message header, we have to have finished // sending the previous message. 
assert(chunksize_inflight == msgsize_inflight); msgsize_inflight = tot_message_len; chunksize_inflight = 0; #ifdef TRACE_SOCKIO struct timeval now; gettimeofday(&now, NULL); printf("%lu.%06lu: RTE queueing %u bytes to %s\n", now.tv_sec, now.tv_usec, msgsize_inflight, g_netio->config().nodes[node_num].name.c_str()); if (msgsize_inflight == 0) { printf("%lu.%06lu: RTE queued %u bytes to %s\n", now.tv_sec, now.tv_usec, msgsize_inflight, g_netio->config().nodes[node_num].name.c_str()); } #endif } bool NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len) { assert(chunk_len <= FRAME_SIZE); uint64_t header = (uint64_t(chunk_len) << 8) + COMMAND_CHUNK; send_header_data(header, data, chunk_len); chunksize_inflight += chunk_len; assert(chunksize_inflight <= msgsize_inflight); #ifdef TRACE_SOCKIO if (msgsize_inflight == chunksize_inflight) { struct timeval now; gettimeofday(&now, NULL); printf("%lu.%06lu: RTE queued %u bytes to %s\n", now.tv_sec, now.tv_usec, msgsize_inflight, g_netio->config().nodes[node_num].name.c_str()); } #endif return (chunksize_inflight < msgsize_inflight); } void NodeIO::recv_commands( std::function error_cb, std::function epoch_cb) { // Asynchronously read the header receive_header = 0; boost::asio::async_read(sock, boost::asio::buffer(&receive_header, 5), [this, error_cb, epoch_cb] (boost::system::error_code ec, std::size_t) { if (ec) { error_cb(ec); return; } if ((receive_header & 0xff) == COMMAND_EPOCH) { epoch_cb(uint32_t(receive_header >> 8)); recv_commands(error_cb, epoch_cb); } else if ((receive_header & 0xff) == COMMAND_MESSAGE) { assert(recv_msgsize_inflight == recv_chunksize_inflight); recv_msgsize_inflight = uint32_t(receive_header >> 8); recv_chunksize_inflight = 0; #ifdef TRACE_SOCKIO struct timeval now; gettimeofday(&now, NULL); printf("%lu.%06lu: RTE receiving %u bytes from %s\n", now.tv_sec, now.tv_usec, recv_msgsize_inflight, g_netio->config().nodes[node_num].name.c_str()); if (recv_msgsize_inflight == 0) { printf("%lu.%06lu: RTE 
received %u bytes from %s\n", now.tv_sec, now.tv_usec, recv_msgsize_inflight, g_netio->config().nodes[node_num].name.c_str()); } #endif if (ecall_message(node_num, recv_msgsize_inflight)) { recv_commands(error_cb, epoch_cb); } else { printf("ecall_message failed\n"); } } else if ((receive_header & 0xff) == COMMAND_CHUNK) { uint32_t this_chunk_size = uint32_t(receive_header >> 8); assert(recv_chunksize_inflight + this_chunk_size <= recv_msgsize_inflight); recv_chunksize_inflight += this_chunk_size; boost::asio::async_read(sock, boost::asio::buffer( receive_frame, this_chunk_size), [this, error_cb, epoch_cb, this_chunk_size] (boost::system::error_code ecc, std::size_t) { if (ecc) { error_cb(ecc); return; } #ifdef TRACE_SOCKIO if (recv_msgsize_inflight == recv_chunksize_inflight) { struct timeval now; gettimeofday(&now, NULL); printf("%lu.%06lu: RTE received %u bytes from %s\n", now.tv_sec, now.tv_usec, recv_msgsize_inflight, g_netio->config().nodes[node_num].name.c_str()); } #endif if (ecall_chunk(node_num, receive_frame, this_chunk_size)) { recv_commands(error_cb, epoch_cb); } else { printf("ecall_chunk failed\n"); } }); } else { error_cb(boost::system::errc::make_error_code( boost::system::errc::errc_t::invalid_argument)); } }); } uint64_t NodeIO::reset_bytes_sent() { uint64_t b_sent = bytes_sent; bytes_sent = 0; return b_sent; } uint64_t NetIO::reset_bytes_sent() { uint64_t total=0; for(size_t i = 0; i 0 && elapsedus > 500000) { printf("%lu.%06lu: End ingestion of %lu messages\n", last_ing.tv_sec, last_ing.tv_usec, num_ing); num_ing = 0; } if (num_ing == 0) { printf("%lu.%06lu: Begin ingestion\n", now.tv_sec, now.tv_usec); } #endif bool ret; //Ingest the message_bundle if(conf.private_routing) { ret = ecall_ingest_msgbundle(c_simid, msgbundle, conf.m_priv_out); } else { ret = ecall_ingest_msgbundle(c_simid, msgbundle, conf.m_pub_out); } free(msgbundle); #ifdef TRACE_SOCKIO gettimeofday(&last_ing, NULL); ++num_ing; #endif // Continue to async receive client message 
bundles if(ret) { ing_receive_msgbundle(csocket, c_simid); } }); } /* Handle new client connections. New clients always send an authentication message. For ingestion this is then followed by their msg_bundles every epoch. */ void NetIO::ing_authenticate_new_client(tcp::socket* csocket, const boost::system::error_code& error) { if(error) { printf("Accept handler failed\n"); return; } #ifdef DEBUG_NET_CLIENTS printf("Accept handler success\n"); #endif unsigned char* auth_message = (unsigned char*) malloc(auth_size); boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size), [this, csocket, auth_message] (boost::system::error_code ec, std::size_t) { if (ec) { if(ec == boost::asio::error::eof) { // Client connection terminated so we delete this socket delete(csocket); } else { printf("Error ing_auth_new_client : %s\n", ec.message().c_str()); } free(auth_message); return; } else { clientid_t c_simid = *((clientid_t *)(auth_message)); // Read the authentication token unsigned char *auth_ptr = auth_message + sizeof(clientid_t); bool ret = ecall_authenticate(c_simid, auth_ptr); free(auth_message); // Receive client message bundles on this socket // for client sim_id c_simid if(ret) { client_count++; ing_receive_msgbundle(csocket, c_simid); } else{ printf("Client <-> Ingestion authentication failed\n"); delete(csocket); } } }); ing_start_accept(); } #ifdef TRACE_SOCKIO static size_t stg_clients_connected = 0; static size_t stg_clients_authenticated = 0; #endif /* Handle new client connections. New clients always send an authentication message. For storage this is then followed by the storage servers sending them their mailbox every epoch. 
*/ void NetIO::stg_authenticate_new_client(tcp::socket* csocket, const boost::system::error_code& error) { if(error) { printf("Accept handler failed\n"); return; } #ifdef DEBUG_NET_CLIENTS printf("Accept handler success\n"); #endif unsigned char* auth_message = (unsigned char*) malloc(auth_size); boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size), [this, csocket, auth_message] (boost::system::error_code ec, std::size_t) { if (ec) { if(ec == boost::asio::error::eof) { // Client connection terminated so we delete this socket delete(csocket); } else { printf("Error stg_auth_new_client: %s\n", ec.message().c_str()); } free(auth_message); return; } else { #ifdef TRACE_SOCKIO ++stg_clients_connected; if (stg_clients_connected % 1000 == 0) { struct timeval now; gettimeofday(&now, NULL); printf("%lu.%06lu: STG %lu clients connected\n", now.tv_sec, now.tv_usec, stg_clients_connected); } #endif clientid_t c_simid = *((clientid_t *)(auth_message)); // Read the authentication token unsigned char *auth_ptr = auth_message + sizeof(clientid_t); bool ret = ecall_storage_authenticate(c_simid, auth_ptr); free(auth_message); // If the auth is successful, store this socket into // a client socket array at the local_c_simid index // for storage servers to send clients their mailbox periodically. 
if(ret) { uint32_t lcid = c_simid / num_stg_nodes; client_sockets[lcid] = csocket; #ifdef TRACE_SOCKIO ++stg_clients_authenticated; if (stg_clients_authenticated % 1000 == 0) { struct timeval now; gettimeofday(&now, NULL); printf("%lu.%06lu: STG %lu clients authenticated\n", now.tv_sec, now.tv_usec, stg_clients_authenticated); } #endif } else{ printf("Client <-> Storage authentication failed\n"); delete (csocket); } } }); stg_start_accept(); } /* Asynchronously accept new client connections */ void NetIO::ing_start_accept() { tcp::socket *csocket = new tcp::socket(io_context()); #ifdef DEBUG_NET_CLIENTS std::cout << "Accepting on " << myconf.clistenhost << ":" << myconf.clistenport << "\n"; #endif ingestion_acceptor->async_accept(*csocket, boost::bind(&NetIO::ing_authenticate_new_client, this, csocket, boost::asio::placeholders::error)); } void NetIO::stg_start_accept() { tcp::socket *csocket = new tcp::socket(io_context()); #ifdef DEBUG_NET_CLIENTS std::cout << "Accepting on " << myconf.slistenhost << ":" << myconf.slistenport << "\n"; #endif storage_acceptor->async_accept(*csocket, boost::bind(&NetIO::stg_authenticate_new_client, this, csocket, boost::asio::placeholders::error)); } void NetIO::send_client_mailbox() { #ifdef PROFILE_NET_CLIENTS struct timespec tp; clock_gettime(CLOCK_REALTIME_COARSE, &tp); unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000; #endif #ifdef TRACE_SOCKIO size_t clients_without_sockets = 0; size_t mailboxes_queued = 0; #endif // Send each client their tokens and mailboxes for the next epoch for(uint32_t lcid = 0; lcid < num_clients_per_stg; lcid++) { unsigned char *tkn_ptr = epoch_tokens + lcid * token_bundle_size; unsigned char *buf_ptr = epoch_mailboxes + lcid * mailbox_size; if(client_sockets[lcid]!=nullptr) { std::vector tosend = { boost::asio::buffer(tkn_ptr, token_bundle_size), boost::asio::buffer(buf_ptr, mailbox_size) }; boost::asio::async_write(*(client_sockets[lcid]), tosend, [this, lcid](boost::system::error_code 
ec, std::size_t){ if (ec) { if(ec == boost::asio::error::eof) { // Client connection terminated so we delete this socket delete(client_sockets[lcid]); printf("Client socket terminated!\n"); } else { printf("Error send_client_mailbox tokens: %s\n", ec.message().c_str()); } return; } }); #ifdef TRACE_SOCKIO ++mailboxes_queued; } else { ++clients_without_sockets; #endif } } #ifdef TRACE_SOCKIO struct timeval now; gettimeofday(&now, NULL); printf("%lu.%06lu: STG queued %lu mailboxes; %lu clients without sockets\n", now.tv_sec, now.tv_usec, mailboxes_queued, clients_without_sockets); #endif #ifdef PROFILE_NET_CLIENTS clock_gettime(CLOCK_REALTIME_COARSE, &tp); unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000; unsigned long diff = end - start; printf("send_client_mailbox time: %lu.%06lu s\n", diff/1000000, diff%1000000); #endif } NetIO::NetIO(boost::asio::io_context &io_context, const Config &config) : context(io_context), conf(config), myconf(config.nodes[config.my_node_num]) { num_nodes = nodenum_t(conf.nodes.size()); nodeios.resize(num_nodes); me = conf.my_node_num; #ifdef TRACE_SOCKIO last_ing = {0, 0}; num_ing = 0; #endif // Node number n will accept connections from nodes 0, ..., n-1 and // make connections to nodes n+1, ..., num_nodes-1. This is all // single threaded, but it doesn't deadlock because node 0 isn't // waiting for any incoming connections, so it immediately makes // outgoing connections. When it connects to node 1, that node // accepts its (only) incoming connection, and then starts making // its outgoing connections, etc. 
tcp::resolver resolver(io_context); tcp::acceptor acceptor(io_context, resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint()); for(size_t i=0; i= num_nodes) { std::cerr << "Received bad node number\n"; } else { nodeios[node_num].emplace(std::move(nodesock), node_num); #ifdef VERBOSE_NET std::cerr << "Received connection from " << config.nodes[node_num].name << "\n"; #endif } } for(size_t i=me+1; i( new tcp::acceptor(io_context, resolver.resolve(this->myconf.slistenhost, this->myconf.slistenport)->endpoint())); stg_start_accept(); } if(myconf.roles & ROLE_INGESTION) { ingestion_acceptor = std::shared_ptr( new tcp::acceptor(io_context, resolver.resolve(this->myconf.clistenhost, this->myconf.clistenport)->endpoint())); ing_start_accept(); } } void NetIO::recv_commands( std::function error_cb, std::function epoch_cb) { for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) { if (node_num == me) continue; NodeIO &n = node(node_num); n.recv_commands(error_cb, epoch_cb); } } void NetIO::close() { for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) { if (node_num == me) continue; NodeIO &n = node(node_num); n.close(); } } /* The enclave calls this to inform the untrusted app that there's a new * messaage to send. The return value is the frame the enclave should * use to store the first (encrypted) chunk of this message. */ uint8_t *ocall_message(nodenum_t node_num, uint32_t message_len) { assert(g_netio != NULL); NodeIO &node = g_netio->node(node_num); node.send_message_header(message_len); return node.request_frame(); } /* The enclave calls this to inform the untrusted app that there's a new * chunk to send. The return value is the frame the enclave should use * to store the next (encrypted) chunk of this message, or NULL if this * was the last chunk. 
*/
// The chunk in chunkdata is queued on node_num's connection by
// send_chunk(); a fresh frame is requested only while that message still
// has bytes outstanding.
uint8_t *ocall_chunk(nodenum_t node_num, uint8_t *chunkdata,
    uint32_t chunklen)
{
    assert(g_netio != NULL);
    NodeIO &peer = g_netio->node(node_num);

    const bool last_chunk = !peer.send_chunk(chunkdata, chunklen);
    if (last_chunk) {
        return NULL;
    }
    return peer.request_frame();
}