#include #include "Enclave_u.h" #include "Untrusted.hpp" #include "net.hpp" // The command type byte values #define COMMAND_EPOCH 0x00 #define COMMAND_MESSAGE 0x01 #define COMMAND_CHUNK 0x02 NetIO *g_netio = NULL; NodeIO::NodeIO(tcp::socket &&socket, nodenum_t nodenum) : sock(std::move(socket)), node_num(nodenum) { } uint8_t *NodeIO::request_frame() { if (frames_available.empty()) { // Allocate a new frame. Note that this memory will (at this // time) never get deallocated. In theory, we could deallocate // it in return_frame, but if a certain number of frames were // allocated here, it means we had that much data in flight // (queued but not accepted for sending by the OS), and we're // likely to need that much again. Subsequent messages will // _reuse_ the allocated data, though, so the used memory won't // grow forever, and will be limited to the amount of in-flight // data needed. return new uint8_t[FRAME_SIZE]; } // Copy the pointer to the frame out of the deque and remove it from // the deque. Note this is _not_ taking the address of the element // *in* the deque (and then popping it, which would invalidate that // pointer). frame_deque_lock.lock(); uint8_t *frame = frames_available.back(); frames_available.pop_back(); frame_deque_lock.unlock(); return frame; } void NodeIO::return_frame(uint8_t *frame) { if (!frame) return; // We push the frame back on to the end of the deque so that it will // be the next one used. This may lead to better cache behaviour? frame_deque_lock.lock(); frames_available.push_back(frame); frame_deque_lock.unlock(); } void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len) { commands_deque_lock.lock(); commands_inflight.push_back({header, data, len}); if (commands_inflight.size() == 1) { async_send_commands(); } commands_deque_lock.unlock(); } void NodeIO::async_send_commands() { std::vector tosend; CommandTuple *commandp = &(commands_inflight.front()); tosend.push_back(boost::asio::buffer(&(std::get<0>(*commandp)), 5)); if (std::get<1>(*commandp) != NULL && std::get<2>(*commandp) > 0) { tosend.push_back(boost::asio::buffer(std::get<1>(*commandp), std::get<2>(*commandp))); } boost::asio::async_write(sock, tosend, [this, commandp](boost::system::error_code, std::size_t){ // When the write completes, pop the command from the deque // (which should now be in the front) commands_deque_lock.lock(); assert(!commands_inflight.empty() && &(commands_inflight.front()) == commandp); uint8_t *data = std::get<1>(*commandp); commands_inflight.pop_front(); if (commands_inflight.size() > 0) { async_send_commands(); } // And return the frame return_frame(data); commands_deque_lock.unlock(); }); } void NodeIO::send_epoch(uint32_t epoch_num) { uint64_t header = (uint64_t(epoch_num) << 8) + COMMAND_EPOCH; send_header_data(header, NULL, 0); } void NodeIO::send_message_header(uint32_t tot_message_len) { uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE; send_header_data(header, NULL, 0); // If we're sending a new message header, we have to have finished // sending the previous message. 
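/*
 * Illustrative sketch (these helpers are not used elsewhere in this
 * file): every command on the wire is a 5-byte header consisting of a
 * one-byte command type followed by a 32-bit value, packed into the
 * low 40 bits of a uint64_t as (value << 8) | command.  Writing the
 * first 5 bytes of that uint64_t with boost::asio::buffer(&header, 5)
 * assumes a little-endian host.  The send and receive paths below
 * build and parse headers inline; these helpers just spell out the
 * encoding for reference.
 */
static inline uint64_t pack_header(uint8_t command, uint32_t value)
{
    return (uint64_t(value) << 8) | uint64_t(command);
}

static inline void unpack_header(uint64_t header, uint8_t &command,
    uint32_t &value)
{
    command = uint8_t(header & 0xff);
    value = uint32_t(header >> 8);
}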
void NodeIO::send_message_header(uint32_t tot_message_len)
{
    uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE;
    send_header_data(header, NULL, 0);

    // If we're sending a new message header, we have to have finished
    // sending the previous message.
    assert(chunksize_inflight == msgsize_inflight);
    msgsize_inflight = tot_message_len;
    chunksize_inflight = 0;
}

bool NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len)
{
    assert(chunk_len <= FRAME_SIZE);
    uint64_t header = (uint64_t(chunk_len) << 8) + COMMAND_CHUNK;
    send_header_data(header, data, chunk_len);
    chunksize_inflight += chunk_len;
    assert(chunksize_inflight <= msgsize_inflight);
    // Return true if more chunks of this message are still to come
    return (chunksize_inflight < msgsize_inflight);
}

void NodeIO::recv_commands(
    std::function<void(boost::system::error_code)> error_cb,
    std::function<void(uint32_t)> epoch_cb)
{
    // Asynchronously read the 5-byte header
    receive_header = 0;
    boost::asio::async_read(sock, boost::asio::buffer(&receive_header, 5),
        [this, error_cb, epoch_cb]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                error_cb(ec);
                return;
            }
            if ((receive_header & 0xff) == COMMAND_EPOCH) {
                epoch_cb(uint32_t(receive_header >> 8));
                recv_commands(error_cb, epoch_cb);
            } else if ((receive_header & 0xff) == COMMAND_MESSAGE) {
                assert(recv_msgsize_inflight == recv_chunksize_inflight);
                recv_msgsize_inflight = uint32_t(receive_header >> 8);
                recv_chunksize_inflight = 0;
                if (ecall_message(node_num, recv_msgsize_inflight)) {
                    recv_commands(error_cb, epoch_cb);
                } else {
                    printf("ecall_message failed\n");
                }
            } else if ((receive_header & 0xff) == COMMAND_CHUNK) {
                uint32_t this_chunk_size = uint32_t(receive_header >> 8);
                assert(recv_chunksize_inflight + this_chunk_size <=
                    recv_msgsize_inflight);
                recv_chunksize_inflight += this_chunk_size;
                boost::asio::async_read(sock, boost::asio::buffer(
                    receive_frame, this_chunk_size),
                    [this, error_cb, epoch_cb, this_chunk_size]
                    (boost::system::error_code ecc, std::size_t) {
                        if (ecc) {
                            error_cb(ecc);
                            return;
                        }
                        if (ecall_chunk(node_num, receive_frame,
                                this_chunk_size)) {
                            recv_commands(error_cb, epoch_cb);
                        } else {
                            printf("ecall_chunk failed\n");
                        }
                    });
            } else {
                error_cb(boost::system::errc::make_error_code(
                    boost::system::errc::errc_t::invalid_argument));
            }
        });
}
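/*
 * Illustrative sketch (not used elsewhere in this file): the sending
 * side that recv_commands() above reassembles.  A caller holding a
 * complete (already encrypted) message in a contiguous buffer could
 * stream it as one MESSAGE header followed by FRAME_SIZE-sized CHUNKs
 * as below.  In this application it is the enclave that drives the
 * same sequence, via ocall_message() and ocall_chunk() at the end of
 * this file, writing each chunk directly into a frame it was handed.
 */
static void send_whole_message(NodeIO &peer, const uint8_t *msg,
    uint32_t msg_len)
{
    peer.send_message_header(msg_len);
    uint32_t offset = 0;
    while (offset < msg_len) {
        uint32_t chunk_len = msg_len - offset;
        if (chunk_len > FRAME_SIZE) {
            chunk_len = FRAME_SIZE;
        }
        // The frame is handed to send_chunk, which returns it to the
        // frame pool once the asynchronous write completes
        uint8_t *frame = peer.request_frame();
        memcpy(frame, msg + offset, chunk_len);
        peer.send_chunk(frame, chunk_len);
        offset += chunk_len;
    }
}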
#ifdef VERBOSE_NET
// Debug display helpers (VERBOSE_NET only); the exact output format
// here is illustrative
void displayMessage(unsigned char *msg, uint16_t msg_size)
{
    clientid_t sid, rid;
    unsigned char *ptr = msg;
    sid = *((clientid_t*) ptr);
    ptr += sizeof(sid);
    rid = *((clientid_t*) ptr);
    uint16_t srpair_size = sizeof(sid)*2;
    printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid);
    printf("Message: ");
    // Print the message body that follows the sender/receiver pair
    for (int j = srpair_size; j < msg_size; j++) {
        printf("%02x", msg[j]);
    }
    printf("\n");
}

// Print each of the m_priv_out fixed-size messages in a bundle
void displayMessageBundle(unsigned char *msgbundle, uint32_t m_priv_out,
    uint16_t msg_size)
{
    for (uint32_t i = 0; i < m_priv_out; i++) {
        displayMessage(msgbundle + (size_t(i) * msg_size), msg_size);
    }
}
#endif

/* Handle a client connection accepted by start_accept() */
void NetIO::handle_async_clients(std::shared_ptr<tcp::socket> csocket,
    const boost::system::error_code& error, size_t auth_size,
    size_t msgbundle_size)
{
    if (!error) {
#ifdef VERBOSE_NET
        printf("Accept handler success\n");
#endif
        // Read the header (1 uint64_t) from the socket and extract the
        // client ID
        uint64_t header;
        clientid_t cid;
        boost::asio::read(*csocket,
            boost::asio::buffer(&header, sizeof(uint64_t)));

        if ((header & 0xff) == CLIENT_AUTHENTICATE) {
            // Read the authentication token (it is only consumed from
            // the socket here; it is not processed further yet)
            unsigned char *authtoken = (unsigned char*) malloc(auth_size);
            boost::asio::read(*csocket,
                boost::asio::buffer(authtoken, auth_size));
            free(authtoken);
        } else if ((header & 0xff) == CLIENT_MESSAGE_BUNDLE) {
            unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
            cid = (clientid_t)(header >> 8);

            // Read the message bundle
            boost::asio::read(*csocket,
                boost::asio::buffer(msgbundle, msgbundle_size));
#ifdef VERBOSE_NET
            displayMessageBundle(msgbundle, apiparams.m_priv_out,
                apiparams.msg_size);
#endif
            // Ingest the message bundle
            bool ret = ecall_ingest_msgbundle(cid, msgbundle,
                apiparams.m_priv_out);
            if (!ret) {
                printf("ecall_ingest_msgbundle failed\n");
            }
            free(msgbundle);
        }

        /* This should read a MESSAGES_DROP_OFF packet of fixed length
           from a client, and send it over to ingestion processing:
             - Decrypt the packet with the correct key for that client id
             - Verify the private channel token
             - Buffer the message for routing */
        start_accept(auth_size, msgbundle_size);
    } else {
        printf("Accept handler failed\n");
    }
}

/* Asynchronously accept client connections */
void NetIO::start_accept(size_t auth_size, size_t msgbundle_size)
{
    std::shared_ptr<tcp::socket> csocket(new tcp::socket(io_context_));
#ifdef VERBOSE_NET
    std::cout << "Accepting on " << myconf.clistenhost << ":" <<
        myconf.clistenport << "\n";
#endif
    client_acceptor->async_accept(*csocket,
        boost::bind(&NetIO::handle_async_clients, this, csocket,
            boost::asio::placeholders::error, auth_size, msgbundle_size));
}

NetIO::NetIO(boost::asio::io_context &io_context, const Config &config) :
    io_context_(io_context), conf(config),
    myconf(config.nodes[config.my_node_num])
{
    num_nodes = nodenum_t(conf.nodes.size());
    nodeios.resize(num_nodes);
    me = conf.my_node_num;

    // Node number n will accept connections from nodes 0, ..., n-1 and
    // make connections to nodes n+1, ..., num_nodes-1.  This is all
    // single threaded, but it doesn't deadlock because node 0 isn't
    // waiting for any incoming connections, so it immediately makes
    // outgoing connections.  When it connects to node 1, that node
    // accepts its (only) incoming connection, and then starts making
    // its outgoing connections, etc.
    tcp::resolver resolver(io_context);
    tcp::acceptor acceptor(io_context,
        resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint());

    // Accept one connection from each lower-numbered node; the
    // connecting node identifies itself by sending its node number
    for (size_t i = 0; i < me; ++i) {
        tcp::socket nodesock = acceptor.accept();
        nodenum_t node_num = 0;
        boost::asio::read(nodesock,
            boost::asio::buffer(&node_num, sizeof(node_num)));
        if (node_num >= num_nodes) {
            std::cerr << "Received bad node number\n";
        } else {
            nodeios[node_num].emplace(std::move(nodesock), node_num);
#ifdef VERBOSE_NET
            std::cerr << "Received connection from " <<
                config.nodes[node_num].name << "\n";
#endif
        }
    }

    // Connect to each higher-numbered node, sending our own node
    // number so that the peer knows who connected
    for (size_t i = me + 1; i < num_nodes; ++i) {
        tcp::socket nodesock(io_context);
        boost::asio::connect(nodesock,
            resolver.resolve(conf.nodes[i].listenhost,
                conf.nodes[i].listenport));
        boost::asio::write(nodesock,
            boost::asio::buffer(&me, sizeof(me)));
        nodeios[i].emplace(std::move(nodesock), nodenum_t(i));
    }

    // If this node is configured with a client listening address (the
    // criterion assumed here), also accept connections from clients
    if (!myconf.clistenhost.empty()) {
        client_acceptor = std::unique_ptr<tcp::acceptor>(
            new tcp::acceptor(io_context,
                resolver.resolve(this->myconf.clistenhost,
                    this->myconf.clistenport)->endpoint()));

        size_t auth_size, msgbundle_size;
        auth_size = SGX_AESGCM_MAC_SIZE;
        msgbundle_size = (apiparams.m_priv_out * apiparams.msg_size) +
            SGX_AESGCM_MAC_SIZE;
        start_accept(auth_size, msgbundle_size);
    }
}

void NetIO::recv_commands(
    std::function<void(boost::system::error_code)> error_cb,
    std::function<void(uint32_t)> epoch_cb)
{
    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
        if (node_num == me) continue;
        NodeIO &n = node(node_num);
        n.recv_commands(error_cb, epoch_cb);
    }
}

void NetIO::close()
{
    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
        if (node_num == me) continue;
        NodeIO &n = node(node_num);
        n.close();
    }
}

/* The enclave calls this to inform the untrusted app that there's a new
 * message to send.  The return value is the frame the enclave should
 * use to store the first (encrypted) chunk of this message. */
uint8_t *ocall_message(nodenum_t node_num, uint32_t message_len)
{
    assert(g_netio != NULL);
    NodeIO &node = g_netio->node(node_num);
    node.send_message_header(message_len);
    return node.request_frame();
}

/* The enclave calls this to inform the untrusted app that there's a new
 * chunk to send.  The return value is the frame the enclave should use
 * to store the next (encrypted) chunk of this message, or NULL if this
 * was the last chunk. */
uint8_t *ocall_chunk(nodenum_t node_num, uint8_t *chunkdata,
    uint32_t chunklen)
{
    assert(g_netio != NULL);
    NodeIO &node = g_netio->node(node_num);
    bool morechunks = node.send_chunk(chunkdata, chunklen);
    if (morechunks) {
        return node.request_frame();
    }
    return NULL;
}
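/*
 * Illustrative sketch (not used by this file): how an application
 * might wire the pieces together.  The Config is assumed to have been
 * populated elsewhere, and the enclave setup and epoch scheduling are
 * elided; this only shows the NetIO lifecycle around the ocalls above.
 */
static void run_netio_example(boost::asio::io_context &io_context,
    const Config &config)
{
    NetIO netio(io_context, config);
    g_netio = &netio;

    // Start the persistent receive loops on every peer connection
    netio.recv_commands(
        [](boost::system::error_code ec) {
            std::cerr << "receive error: " << ec.message() << "\n";
        },
        [](uint32_t epoch_num) {
            printf("peer announced epoch %u\n", epoch_num);
        });

    // Drive all queued asynchronous sends and receives; this blocks
    // until the connections are closed or an error stops the work
    io_context.run();

    netio.close();
    g_netio = NULL;
}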