| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 | #include <iostream>#include "Enclave_u.h"#include "Untrusted.hpp"#include "net.hpp"// The command type byte values#define COMMAND_EPOCH 0x00#define COMMAND_MESSAGE 0x01#define COMMAND_CHUNK 0x02#define VERBOSE_NET// #define DEBUG_NET_CLIENTS#define CEILDIV(x,y) (((x)+(y)-1)/(y))NetIO *g_netio = NULL;size_t client_count = 0;NodeIO::NodeIO(tcp::socket &&socket, nodenum_t nodenum) :    sock(std::move(socket)), node_num(nodenum), msgsize_inflight(0),    chunksize_inflight(0), recv_msgsize_inflight(0),    recv_chunksize_inflight(0), bytes_sent(0){}uint8_t *NodeIO::request_frame(){    if (frames_available.empty()) {        // Allocate a new frame.  Note that this memory will (at this        // time) never get deallocated.  In theory, we could deallocate        // it in return_frame, but if a certain number of frames were        // allocated here, it means we had that much data in flight        // (queued but not accepted for sending by the OS), and we're        // likely to need that much again.  Subsequent messages will        // _reuse_ the allocated data, though, so the used memory won't        // grow forever, and will be limited to the amount of in-flight        // data needed.        return new uint8_t[FRAME_SIZE];    }    // Copy the pointer to the frame out of the deque and remove it from    // the deque.  Note this is _not_ taking the address of the element    // *in* the deque (and then popping it, which would invalidate that    // pointer).    frame_deque_lock.lock();    uint8_t *frame = frames_available.back();    frames_available.pop_back();    frame_deque_lock.unlock();    return frame;}void NodeIO::return_frame(uint8_t *frame){    if (!frame) return;    // We push the frame back on to the end of the deque so that it will    // be the next one used.  This may lead to better cache behaviour?    frame_deque_lock.lock();    frames_available.push_back(frame);    frame_deque_lock.unlock();}void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len){    commands_deque_lock.lock();    commands_inflight.push_back({header, data, len});    if (commands_inflight.size() == 1) {        async_send_commands();    }    commands_deque_lock.unlock();}void NodeIO::async_send_commands(){    std::vector<boost::asio::const_buffer> tosend;    CommandTuple *commandp = &(commands_inflight.front());    tosend.push_back(boost::asio::buffer(&(std::get<0>(*commandp)), 5));    if (std::get<1>(*commandp) != NULL && std::get<2>(*commandp) > 0) {        tosend.push_back(boost::asio::buffer(std::get<1>(*commandp),            std::get<2>(*commandp)));    }    boost::asio::async_write(sock, tosend,        [this, commandp](boost::system::error_code, std::size_t){            // When the write completes, pop the command from the deque            // (which should now be in the front)            commands_deque_lock.lock();            assert(!commands_inflight.empty() &&                &(commands_inflight.front()) == commandp);            bytes_sent = bytes_sent + 5 + std::get<2>(*commandp);            uint8_t *data = std::get<1>(*commandp);            commands_inflight.pop_front();            if (commands_inflight.size() > 0) {                async_send_commands();            }            // And return the frame            return_frame(data);            commands_deque_lock.unlock();        });}void NodeIO::send_epoch(uint32_t epoch_num){    uint64_t header = (uint64_t(epoch_num) << 8) + COMMAND_EPOCH;    send_header_data(header, NULL, 0);}void NodeIO::send_message_header(uint32_t tot_message_len){    uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE;    send_header_data(header, NULL, 0);    // If we're sending a new message header, we have to have finished    // sending the previous message.    assert(chunksize_inflight == msgsize_inflight);    msgsize_inflight = tot_message_len;    chunksize_inflight = 0;}bool NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len){    assert(chunk_len <= FRAME_SIZE);    uint64_t header = (uint64_t(chunk_len) << 8) + COMMAND_CHUNK;    send_header_data(header, data, chunk_len);    chunksize_inflight += chunk_len;    assert(chunksize_inflight <= msgsize_inflight);    return (chunksize_inflight < msgsize_inflight);}void NodeIO::recv_commands(    std::function<void(boost::system::error_code)> error_cb,    std::function<void(uint32_t)> epoch_cb){    // Asynchronously read the header    receive_header = 0;    boost::asio::async_read(sock, boost::asio::buffer(&receive_header, 5),        [this, error_cb, epoch_cb]        (boost::system::error_code ec, std::size_t) {            if (ec) {                error_cb(ec);                return;            }            if ((receive_header & 0xff) == COMMAND_EPOCH) {                epoch_cb(uint32_t(receive_header >> 8));                recv_commands(error_cb, epoch_cb);            } else if ((receive_header & 0xff) == COMMAND_MESSAGE) {                assert(recv_msgsize_inflight == recv_chunksize_inflight);                recv_msgsize_inflight = uint32_t(receive_header >> 8);                recv_chunksize_inflight = 0;                if (ecall_message(node_num, recv_msgsize_inflight)) {                    recv_commands(error_cb, epoch_cb);                } else {                    printf("ecall_message failed\n");                }            } else if ((receive_header & 0xff) == COMMAND_CHUNK) {                uint32_t this_chunk_size = uint32_t(receive_header >> 8);                assert(recv_chunksize_inflight + this_chunk_size <=                    recv_msgsize_inflight);                recv_chunksize_inflight += this_chunk_size;                boost::asio::async_read(sock, boost::asio::buffer(                    receive_frame, this_chunk_size),                    [this, error_cb, epoch_cb, this_chunk_size]                    (boost::system::error_code ecc, std::size_t) {                        if (ecc) {                            error_cb(ecc);                            return;                        }                        if (ecall_chunk(node_num, receive_frame,                                this_chunk_size)) {                            recv_commands(error_cb, epoch_cb);                        } else {                            printf("ecall_chunk failed\n");                        }                    });            } else {                error_cb(boost::system::errc::make_error_code(                    boost::system::errc::errc_t::invalid_argument));            }        });}uint64_t NodeIO::reset_bytes_sent(){    uint64_t b_sent = bytes_sent;    bytes_sent = 0;    return b_sent;}uint64_t NetIO::reset_bytes_sent(){    uint64_t total=0;    for(size_t i = 0; i<nodeios.size(); i++) {        if(nodeios[i].has_value()) {            total+=((nodeios[i].value()).reset_bytes_sent());        }    }    return total;}/*    Receive clients dropped off messages, i.e. a CLIENT_MESSAGE_BUNDLE*/void NetIO::ing_receive_msgbundle(tcp::socket* csocket, clientid_t c_simid){    unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);    boost::asio::async_read(*csocket, boost::asio::buffer(msgbundle, msgbundle_size),        [this, csocket, msgbundle, c_simid]        (boost::system::error_code ec, std::size_t) {        if (ec) {            if(ec == boost::asio::error::eof) {                // Client connection terminated so we delete this socket                delete(csocket);            }            else {                printf("Error ing_receive_msgbundle : %s\n", ec.message().c_str());            }            return;        }        bool ret;        //Ingest the message_bundle        if(conf.private_routing) {            ret = ecall_ingest_msgbundle(c_simid, msgbundle, conf.m_priv_out);        } else {            ret = ecall_ingest_msgbundle(c_simid, msgbundle, conf.m_pub_out);        }        free(msgbundle);        // Continue to async receive client message bundles        ing_receive_msgbundle(csocket, c_simid);    });}/*    Handle new client connections.    New clients always send an authentication message.    For ingestion this is then followed by their msg_bundles every epoch.*/void NetIO::ing_authenticate_new_client(tcp::socket* csocket,    const boost::system::error_code& error){    if(error) {        printf("Accept handler failed\n");        return;    }#ifdef DEBUG_NET_CLIENTS        printf("Accept handler success\n");#endif    unsigned char* auth_message = (unsigned char*) malloc(auth_size);    boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size),        [this, csocket, auth_message]        (boost::system::error_code ec, std::size_t) {        if (ec) {            if(ec == boost::asio::error::eof) {                // Client connection terminated so we delete this socket                delete(csocket);            } else {                printf("Error ing_auth_new_client : %s\n", ec.message().c_str());            }            return;        }        else {            clientid_t c_simid = *((clientid_t *)(auth_message));            // Read the authentication token            unsigned char *auth_ptr = auth_message + sizeof(clientid_t);            bool ret = ecall_authenticate(c_simid, auth_ptr);            free(auth_message);            // Receive client message bundles on this socket            // for client sim_id c_simid            if(ret) {                client_count++;                ing_receive_msgbundle(csocket, c_simid);            } else{                printf("Client <-> Ingestion authentication failed\n");                delete(csocket);            }        }    });    ing_start_accept();}/*    Handle new client connections.    New clients always send an authentication message.    For storage this is then followed by the storage servers sending them    their mailbox every epoch.*/void NetIO::stg_authenticate_new_client(tcp::socket* csocket,    const boost::system::error_code& error){    if(error) {        printf("Accept handler failed\n");        return;    }#ifdef DEBUG_NET_CLIENTS        printf("Accept handler success\n");#endif    unsigned char* auth_message = (unsigned char*) malloc(auth_size);    boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size),        [this, csocket, auth_message]        (boost::system::error_code ec, std::size_t) {        if (ec) {            if(ec == boost::asio::error::eof) {                // Client connection terminated so we delete this socket                delete(csocket);            } else {                printf("Error stg_auth_new_client: %s\n", ec.message().c_str());            }            return;        }        else {            clientid_t c_simid = *((clientid_t *)(auth_message));            // Read the authentication token            unsigned char *auth_ptr = auth_message + sizeof(clientid_t);            bool ret = ecall_storage_authenticate(c_simid, auth_ptr);            free(auth_message);            // If the auth is successful, store this socket into            // a client socket array at the local_c_simid index            // for storage servers to send clients their mailbox periodically.            if(ret) {                uint32_t lcid = c_simid / num_stg_nodes;                client_sockets[lcid] = csocket;            }            else{                printf("Client <-> Storage authentication failed\n");                delete (csocket);            }        }    });    stg_start_accept();}/*  Asynchronously accept new client connections*/void NetIO::ing_start_accept(){    tcp::socket *csocket = new tcp::socket(io_context());#ifdef DEBUG_NET_CLIENTS    std::cout << "Accepting on " << myconf.clistenhost << ":" << myconf.clistenport << "\n";#endif    ingestion_acceptor->async_accept(*csocket,        boost::bind(&NetIO::ing_authenticate_new_client, this, csocket,        boost::asio::placeholders::error));}void NetIO::stg_start_accept(){    tcp::socket *csocket = new tcp::socket(io_context());#ifdef DEBUG_NET_CLIENTS    std::cout << "Accepting on " << myconf.slistenhost << ":" << myconf.slistenport << "\n";#endif    storage_acceptor->async_accept(*csocket,        boost::bind(&NetIO::stg_authenticate_new_client, this, csocket,        boost::asio::placeholders::error));}void NetIO::send_client_mailbox(){    // Send each client their tokens for the next epoch    for(uint32_t lcid = 0; lcid < num_clients_per_stg; lcid++)    {        unsigned char *tkn_ptr = epoch_tokens + lcid * token_bundle_size;        unsigned char *buf_ptr = epoch_mailboxes + lcid * mailbox_size;        if(client_sockets[lcid]!=nullptr) {            boost::asio::async_write(*(client_sockets[lcid]),                boost::asio::buffer(tkn_ptr, token_bundle_size),                [this, lcid, buf_ptr](boost::system::error_code ec, std::size_t){                if (ec) {                    if(ec == boost::asio::error::eof) {                        // Client connection terminated so we delete this socket                        delete(client_sockets[lcid]);                        printf("Client socket terminated!\n");                    } else {                        printf("Error send_client_mailbox tokens: %s\n", ec.message().c_str());                    }                    return;                }                boost::asio::async_write(*(client_sockets[lcid]),                    boost::asio::buffer(buf_ptr, mailbox_size),                    [this, lcid](boost::system::error_code ecc, std::size_t){                    //printf("NetIO::send_client_mailbox, Client %d messages was sent\n", lcid);                    if (ecc) {                        if(ecc == boost::asio::error::eof) {                            // Client connection terminated so we delete this socket                            delete(client_sockets[lcid]);                            printf("Client socket terminated!\n");                        } else {                            printf("Error send_client_mailbox mailbox (lcid = %d): %s\n",                                lcid, ecc.message().c_str());                        }                        return;                    }                });            });        }        /*        else {            printf("Client did not have a socket!\n");        }        */    }}NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)    : context(io_context), conf(config),      myconf(config.nodes[config.my_node_num]){    num_nodes = nodenum_t(conf.nodes.size());    nodeios.resize(num_nodes);    me = conf.my_node_num;    // Node number n will accept connections from nodes 0, ..., n-1 and    // make connections to nodes n+1, ..., num_nodes-1.  This is all    // single threaded, but it doesn't deadlock because node 0 isn't    // waiting for any incoming connections, so it immediately makes    // outgoing connections.  When it connects to node 1, that node    // accepts its (only) incoming connection, and then starts making    // its outgoing connections, etc.    tcp::resolver resolver(io_context);    tcp::acceptor acceptor(io_context,        resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint());    for(size_t i=0; i<me; ++i) {#ifdef VERBOSE_NET        std::cerr << "Accepting number " << i << "\n";#endif        tcp::socket nodesock = acceptor.accept();#ifdef VERBOSE_NET        std::cerr << "Accepted number " << i << "\n";#endif        // Read 2 bytes from the socket, which will be the        // connecting node's node number        unsigned short node_num;        boost::asio::read(nodesock,            boost::asio::buffer(&node_num, sizeof(node_num)));        if (node_num >= num_nodes) {            std::cerr << "Received bad node number\n";        } else {            nodeios[node_num].emplace(std::move(nodesock), node_num);#ifdef VERBOSE_NET            std::cerr << "Received connection from " <<                config.nodes[node_num].name << "\n";#endif        }    }    for(size_t i=me+1; i<num_nodes; ++i) {        boost::system::error_code err;        tcp::socket nodesock(io_context);        while(1) {#ifdef VERBOSE_NET            std::cerr << "Connecting to " << config.nodes[i].name << "...\n";#endif            boost::asio::connect(nodesock,                resolver.resolve(config.nodes[i].listenhost,                    config.nodes[i].listenport), err);            if (!err) break;            std::cerr << "Connection to " << config.nodes[i].name <<                " refused, will retry.\n";            sleep(1);        }        // Write 2 bytes to the socket to tell the peer node our node        // number        nodenum_t node_num = (nodenum_t)me;        boost::asio::write(nodesock,            boost::asio::buffer(&node_num, sizeof(node_num)));        nodeios[i].emplace(std::move(nodesock), i);#ifdef VERBOSE_NET        std::cerr << "Connected to " << config.nodes[i].name << "\n";#endif    }    auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;    uint16_t priv_out, priv_in, pub_in;    if(config.private_routing) {        priv_out = conf.m_priv_out;        priv_in = conf.m_priv_in;        msgbundle_size = SGX_AESGCM_IV_SIZE            + (conf.m_priv_out * (conf.msg_size + TOKEN_SIZE))            + SGX_AESGCM_MAC_SIZE;        token_bundle_size = ((priv_out * TOKEN_SIZE)            + SGX_AESGCM_IV_SIZE + SGX_AESGCM_MAC_SIZE);        mailbox_size = (priv_in * conf.msg_size) + SGX_AESGCM_IV_SIZE            + SGX_AESGCM_MAC_SIZE;    } else {        pub_in = conf.m_pub_in;        msgbundle_size = SGX_AESGCM_IV_SIZE            + (conf.m_pub_out * conf.msg_size)            + SGX_AESGCM_MAC_SIZE;        mailbox_size = (pub_in * conf.msg_size) + SGX_AESGCM_IV_SIZE            + SGX_AESGCM_MAC_SIZE;    }    if(myconf.roles & ROLE_STORAGE) {        // Setup the client sockets        // Compute no_of_clients per storage_server        uint32_t num_users = config.user_count;        NodeConfig nc;        num_stg_nodes = 0;        for (nodenum_t i=0; i<num_nodes; ++i) {            nc = conf.nodes[i];            if(nc.roles & ROLE_STORAGE) {                num_stg_nodes++;            }        }        num_clients_per_stg = CEILDIV(num_users, num_stg_nodes);        for(uint32_t i = 0; i<num_clients_per_stg; i++) {            client_sockets.emplace_back(nullptr);        }        uint32_t epoch_mailboxes_size = num_clients_per_stg * mailbox_size;        uint32_t epoch_tokens_size = num_clients_per_stg * token_bundle_size;        epoch_mailboxes = (unsigned char *) malloc(epoch_mailboxes_size);        epoch_tokens = (unsigned char *) malloc (epoch_tokens_size);        ecall_supply_storage_buffers(epoch_mailboxes, epoch_mailboxes_size,            epoch_tokens, epoch_tokens_size);        storage_acceptor = std::shared_ptr<tcp::acceptor>(            new tcp::acceptor(io_context,                resolver.resolve(this->myconf.slistenhost,                this->myconf.slistenport)->endpoint()));        stg_start_accept();    }    if(myconf.roles & ROLE_INGESTION) {        ingestion_acceptor = std::shared_ptr<tcp::acceptor>(            new tcp::acceptor(io_context,                resolver.resolve(this->myconf.clistenhost,                this->myconf.clistenport)->endpoint()));        ing_start_accept();    }}void NetIO::recv_commands(    std::function<void(boost::system::error_code)> error_cb,    std::function<void(uint32_t)> epoch_cb){    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {        if (node_num == me) continue;        NodeIO &n = node(node_num);        n.recv_commands(error_cb, epoch_cb);    }}void NetIO::close(){    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {        if (node_num == me) continue;        NodeIO &n = node(node_num);        n.close();    }}/* The enclave calls this to inform the untrusted app that there's a new * messaage to send. The return value is the frame the enclave should * use to store the first (encrypted) chunk of this message. */uint8_t *ocall_message(nodenum_t node_num, uint32_t message_len){    assert(g_netio != NULL);    NodeIO &node = g_netio->node(node_num);    node.send_message_header(message_len);    return node.request_frame();}/* The enclave calls this to inform the untrusted app that there's a new * chunk to send.  The return value is the frame the enclave should use * to store the next (encrypted) chunk of this message, or NULL if this * was the last chunk. */uint8_t *ocall_chunk(nodenum_t node_num, uint8_t *chunkdata,    uint32_t chunklen){    assert(g_netio != NULL);    NodeIO &node = g_netio->node(node_num);    bool morechunks = node.send_chunk(chunkdata, chunklen);    if (morechunks) {        return node.request_frame();    }    return NULL;}
 |