#include #include "Enclave_u.h" #include "Untrusted.hpp" #include "net.hpp" // The command type byte values #define COMMAND_EPOCH 0x00 #define COMMAND_MESSAGE 0x01 #define COMMAND_CHUNK 0x02 NetIO *g_netio = NULL; NodeIO::NodeIO(tcp::socket &&socket, nodenum_t nodenum) : sock(std::move(socket)), node_num(nodenum) { } uint8_t *NodeIO::request_frame() { if (frames_available.empty()) { // Allocate a new frame. Note that this memory will (at this // time) never get deallocated. In theory, we could deallocate // it in return_frame, but if a certain number of frames were // allocated here, it means we had that much data in flight // (queued but not accepted for sending by the OS), and we're // likely to need that much again. Subsequent messages will // _reuse_ the allocated data, though, so the used memory won't // grow forever, and will be limited to the amount of in-flight // data needed. return new uint8_t[FRAME_SIZE]; } // Copy the pointer to the frame out of the deque and remove it from // the deque. Note this is _not_ taking the address of the element // *in* the deque (and then popping it, which would invalidate that // pointer). frame_deque_lock.lock(); uint8_t *frame = frames_available.back(); frames_available.pop_back(); frame_deque_lock.unlock(); return frame; } void NodeIO::return_frame(uint8_t *frame) { if (!frame) return; // We push the frame back on to the end of the deque so that it will // be the next one used. This may lead to better cache behaviour? frame_deque_lock.lock(); frames_available.push_back(frame); frame_deque_lock.unlock(); } void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len) { commands_deque_lock.lock(); commands_inflight.push_back({header, data, len}); if (commands_inflight.size() == 1) { async_send_commands(); } commands_deque_lock.unlock(); } void NodeIO::async_send_commands() { std::vector tosend; CommandTuple *commandp = &(commands_inflight.front()); tosend.push_back(boost::asio::buffer(&(std::get<0>(*commandp)), 5)); if (std::get<1>(*commandp) != NULL && std::get<2>(*commandp) > 0) { tosend.push_back(boost::asio::buffer(std::get<1>(*commandp), std::get<2>(*commandp))); } boost::asio::async_write(sock, tosend, [this, commandp](boost::system::error_code, std::size_t){ // When the write completes, pop the command from the deque // (which should now be in the front) commands_deque_lock.lock(); assert(!commands_inflight.empty() && &(commands_inflight.front()) == commandp); uint8_t *data = std::get<1>(*commandp); commands_inflight.pop_front(); if (commands_inflight.size() > 0) { async_send_commands(); } // And return the frame return_frame(data); commands_deque_lock.unlock(); }); } void NodeIO::send_epoch(uint32_t epoch_num) { uint64_t header = (uint64_t(epoch_num) << 8) + COMMAND_EPOCH; send_header_data(header, NULL, 0); } void NodeIO::send_message_header(uint32_t tot_message_len) { uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE; send_header_data(header, NULL, 0); // If we're sending a new message header, we have to have finished // sending the previous message. assert(chunksize_inflight == msgsize_inflight); msgsize_inflight = tot_message_len; chunksize_inflight = 0; } bool NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len) { assert(chunk_len <= FRAME_SIZE); uint64_t header = (uint64_t(chunk_len) << 8) + COMMAND_CHUNK; send_header_data(header, data, chunk_len); chunksize_inflight += chunk_len; assert(chunksize_inflight <= msgsize_inflight); return (chunksize_inflight < msgsize_inflight); } void NodeIO::recv_commands( std::function error_cb, std::function epoch_cb) { // Asynchronously read the header receive_header = 0; boost::asio::async_read(sock, boost::asio::buffer(&receive_header, 5), [this, error_cb, epoch_cb] (boost::system::error_code ec, std::size_t) { if (ec) { error_cb(ec); return; } if ((receive_header & 0xff) == COMMAND_EPOCH) { epoch_cb(uint32_t(receive_header >> 8)); recv_commands(error_cb, epoch_cb); } else if ((receive_header & 0xff) == COMMAND_MESSAGE) { assert(recv_msgsize_inflight == recv_chunksize_inflight); recv_msgsize_inflight = uint32_t(receive_header >> 8); recv_chunksize_inflight = 0; if (ecall_message(node_num, recv_msgsize_inflight)) { recv_commands(error_cb, epoch_cb); } else { printf("ecall_message failed\n"); } } else if ((receive_header & 0xff) == COMMAND_CHUNK) { uint32_t this_chunk_size = uint32_t(receive_header >> 8); assert(recv_chunksize_inflight + this_chunk_size <= recv_msgsize_inflight); recv_chunksize_inflight += this_chunk_size; boost::asio::async_read(sock, boost::asio::buffer( receive_frame, this_chunk_size), [this, error_cb, epoch_cb, this_chunk_size] (boost::system::error_code ecc, std::size_t) { if (ecc) { error_cb(ecc); return; } if (ecall_chunk(node_num, receive_frame, this_chunk_size)) { recv_commands(error_cb, epoch_cb); } else { printf("ecall_chunk failed\n"); } }); } else { error_cb(boost::system::errc::make_error_code( boost::system::errc::errc_t::invalid_argument)); } }); } void accept_handler(const boost::system::error_code& error) { if(!error) { printf("Accept handler success\n"); // Read 2 bytes from the socket, which will be the // connecting node's node number //unsigned short node_num; //boost::asio::read(acc_sock, // boost::asio::buffer(&node_num, sizeof(node_num))); //printf("node_num received = %d\n", node_num); } else { } } /* void start_accept(boost::asio::io_context& io_context, const NodeConfig& myconf); void handle_accept(boost::asio::io_context& io_context, const NodeConfig &myconf, tcp_connection::pointer new_connection, const boost::system::error_code& error) { if (!error) { new_connection->start(); } start_accept(io_context, myconf); } void start_accept(boost::asio::io_context& io_context, const NodeConfig& myconf) { tcp::resolver resolver(io_context); tcp::acceptor async_acceptor(io_context, resolver.resolve(myconf.clistenhost, myconf.clistenport)->endpoint()); tcp_connection::pointer new_connection = tcp_connection::create(io_context); async_acceptor.async_accept(new_connection->socket(), boost::bind(&handle_accept, io_context, myconf, new_connection, boost::asio::placeholders::error)); } */ NetIO::NetIO(boost::asio::io_context &io_context, const Config &config) : conf(config), myconf(config.nodes[config.my_node_num]) { num_nodes = nodenum_t(conf.nodes.size()); nodeios.resize(num_nodes); me = conf.my_node_num; // Node number n will accept connections from nodes 0, ..., n-1 and // make connections to nodes n+1, ..., num_nodes-1. This is all // single threaded, but it doesn't deadlock because node 0 isn't // waiting for any incoming connections, so it immediately makes // outgoing connections. When it connects to node 1, that node // accepts its (only) incoming connection, and then starts making // its outgoing connections, etc. tcp::resolver resolver(io_context); tcp::acceptor acceptor(io_context, resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint()); for(size_t i=0; i= num_nodes) { std::cerr << "Received bad node number\n"; } else { nodeios[node_num].emplace(std::move(nodesock), node_num); #ifdef VERBOSE_NET std::cerr << "Received connection from " << config.nodes[node_num].name << "\n"; #endif } } for(size_t i=me+1; iendpoint()); std::shared_ptr new_socket(new tcp::socket(io_context)); std::cout << myconf.clistenhost << ":" << myconf.clistenport; async_acceptor.async_accept(*new_socket, accept_handler); } /* if(me==0) { start_accept(io_context, myconf); } */ } void NetIO::recv_commands( std::function error_cb, std::function epoch_cb) { for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) { if (node_num == me) continue; NodeIO &n = node(node_num); n.recv_commands(error_cb, epoch_cb); } } void NetIO::close() { for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) { if (node_num == me) continue; NodeIO &n = node(node_num); n.close(); } } /* The enclave calls this to inform the untrusted app that there's a new * messaage to send. The return value is the frame the enclave should * use to store the first (encrypted) chunk of this message. */ uint8_t *ocall_message(nodenum_t node_num, uint32_t message_len) { assert(g_netio != NULL); NodeIO &node = g_netio->node(node_num); node.send_message_header(message_len); return node.request_frame(); } /* The enclave calls this to inform the untrusted app that there's a new * chunk to send. The return value is the frame the enclave should use * to store the next (encrypted) chunk of this message, or NULL if this * was the last chunk. */ uint8_t *ocall_chunk(nodenum_t node_num, uint8_t *chunkdata, uint32_t chunklen) { assert(g_netio != NULL); NodeIO &node = g_netio->node(node_num); bool morechunks = node.send_chunk(chunkdata, chunklen); if (morechunks) { return node.request_frame(); } return NULL; }