#include #include #include "../App/appconfig.hpp" // The next line suppresses a deprecation warning within boost #define BOOST_BIND_GLOBAL_PLACEHOLDERS #include "boost/property_tree/ptree.hpp" #include "boost/property_tree/json_parser.hpp" #include #include #include "gcm.h" #include "sgx_tcrypto.h" #include "clients.hpp" #include #define CEILDIV(x,y) (((x)+(y)-1)/(y)) Config config; Client *clients; aes_key ESK, TSK; std::vector ingestion_nodes, storage_nodes; std::vector storage_map; std::vector ingestion_map; unsigned long setup_time; uint16_t nthreads = 1; bool token_channel; // Split a hostport string like "127.0.0.1:12000" at the rightmost colon // into a host part "127.0.0.1" and a port part "12000". static bool split_host_port(std::string &host, std::string &port, const std::string &hostport) { size_t colon = hostport.find_last_of(':'); if (colon == std::string::npos) { std::cerr << "Cannot parse \"" << hostport << "\" as host:port\n"; return false; } host = hostport.substr(0, colon); port = hostport.substr(colon+1); return true; } // Convert a single hex character into its value from 0 to 15. Return // true on success, false if it wasn't a hex character. static inline bool hextoval(unsigned char &val, char hex) { if (hex >= '0' && hex <= '9') { val = ((unsigned char)hex)-'0'; } else if (hex >= 'a' && hex <= 'f') { val = ((unsigned char)hex)-'a'+10; } else if (hex >= 'A' && hex <= 'F') { val = ((unsigned char)hex)-'A'+10; } else { return false; } return true; } // Convert a 2*len hex character string into a len-byte buffer. Return // true on success, false on failure. static bool hextobuf(unsigned char *buf, const char *str, size_t len) { if (strlen(str) != 2*len) { std::cerr << "Hex string was not the expected size\n"; return false; } for (size_t i=0;i 16) { remain = 16; } char row[34 + 17 + 2]; memset(row, ' ', sizeof(row)-1); row[sizeof(row)-2] = '\n'; row[sizeof(row)-1] = '\0'; size_t hexoffset = 0; size_t charoffset = 34; for (size_t i=0; i= ' ' && ptr[i] <= '~') { row[charoffset] = ptr[i]; } else { row[charoffset] = '.'; } hexoffset += 2; charoffset += 1; if (i == 7) { hexoffset += 1; charoffset += 1; } } outbuf << row; ptr += remain; } outbuf << "\n"; std::cout << outbuf.str(); } void displayPtMessageBundle(unsigned char *bundle, uint16_t num_out, uint16_t msg_size, uint32_t client) { unsigned char *ptr = bundle; for(int i=0; i &ingestion_nodes, std::vector &storage_nodes, std::vector &storage_map) { bool found_params = false; bool ret = true; std::istringstream configstream(configstr); boost::property_tree::ptree conftree; read_json(configstream, conftree); uint16_t node_num = 0; for (auto & entry : conftree) { if (!entry.first.compare("params")) { for (auto & pentry : entry.second) { if (!pentry.first.compare("msg_size")) { config.msg_size = pentry.second.get_value(); } else if (!pentry.first.compare("user_count")) { config.user_count = pentry.second.get_value(); } else if (!pentry.first.compare("token_out")) { config.m_token_out = pentry.second.get_value(); } else if (!pentry.first.compare("token_in")) { config.m_token_in = pentry.second.get_value(); } else if (!pentry.first.compare("id_out")) { config.m_id_out = pentry.second.get_value(); } else if (!pentry.first.compare("id_in")) { config.m_id_in = pentry.second.get_value(); // A stub hardcoded shared secret to derive various // keys for client <-> server communications and tokens // In reality, this would be a key exchange } else if (!pentry.first.compare("master_secret")) { std::string hex_key = pentry.second.data(); memcpy(config.master_secret, hex_key.c_str(), SGX_AESGCM_KEY_SIZE); } else if (!pentry.first.compare("token_channel")) { config.token_channel = pentry.second.get_value(); } else { std::cerr << "Unknown field in params: " << pentry.first << "\n"; ret = false; } } found_params = true; } else if (!entry.first.compare("nodes")) { for (auto & node : entry.second) { NodeConfig nc; // All nodes need to be assigned their role in manifest.yaml nc.roles = 0; for (auto & nentry : node.second) { if (!nentry.first.compare("name")) { nc.name = nentry.second.get_value(); } else if (!nentry.first.compare("pubkey")) { ret &= hextobuf((unsigned char *)&nc.pubkey, nentry.second.get_value().c_str(), sizeof(nc.pubkey)); } else if (!nentry.first.compare("weight")) { nc.weight = nentry.second.get_value(); } else if (!nentry.first.compare("listen")) { ret &= split_host_port(nc.listenhost, nc.listenport, nentry.second.get_value()); } else if (!nentry.first.compare("clisten")) { ret &= split_host_port(nc.clistenhost, nc.clistenport, nentry.second.get_value()); } else if (!nentry.first.compare("slisten")) { ret &= split_host_port(nc.slistenhost, nc.slistenport, nentry.second.get_value()); } else if (!nentry.first.compare("roles")) { nc.roles = nentry.second.get_value(); } else { std::cerr << "Unknown field in host config: " << nentry.first << "\n"; ret = false; } } if(nc.roles & ROLE_INGESTION) { ingestion_nodes.push_back(nc); ingestion_map.push_back(node_num); } if(nc.roles & ROLE_STORAGE) { storage_nodes.push_back(std::move(nc)); storage_map.push_back(node_num); } node_num++; } } else { std::cerr << "Unknown key in config: " << entry.first << "\n"; ret = false; } } if (!found_params) { std::cerr << "Could not find params in config\n"; ret = false; } return ret; } static void usage(const char *argv0) { fprintf(stderr, "%s [-t nthreads] < config.json\n", argv0); exit(1); } /* Generate ESK (Encryption Secret Key) and TSK (Token Secret Key) */ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret, aes_key &ESK, aes_key &TSK ) { unsigned char zeroes[SGX_AESGCM_KEY_SIZE]; unsigned char iv[SGX_AESGCM_IV_SIZE]; unsigned char mac[SGX_AESGCM_MAC_SIZE]; memset(iv, 0, SGX_AESGCM_IV_SIZE); memset(zeroes, 0, SGX_AESGCM_KEY_SIZE); memcpy(iv, "Encryption", sizeof("Encryption")); if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) { printf("Client: generateMasterKeys FAIL\n"); return -1; } printf("\n\nEncryption Master Key: "); for(int i=0;iip_str(); boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip_str, err); if(err) { printf("initializeStgSocket::Invalid IP address\n"); } storage_sock = new boost::asio::ip::tcp::socket(ioc); storage_sock->open(boost::asio::ip::tcp::v4(), err); while(1) { boost::asio::ip::tcp::endpoint ep(ip_address, port_no); storage_sock->bind(ep, err); if (!err) { break; } else { printf("STG: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no); port_no++; if(port_no >= PORT_END) { port_no = PORT_START; curr_ip->increment(nthreads); } } } boost::asio::ip::address stg_ip = boost::asio::ip::address::from_string(stg_server.slistenhost, err); boost::asio::ip::tcp::endpoint stg_ep(stg_ip, std::stoi(stg_server.slistenport)); // just for printing // boost::asio::ip::tcp::endpoint ep(ip_address, port_no); storage_sock->connect(stg_ep, err); if (!err) { break; } std::cerr <<"STG: Connection from " << curr_ip->ip_str() << ":" << port_no << " to " << stg_server.name << " refused, will retry.\n"; #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME int sleep_delay = rand() % 100000; usleep(sleep_delay); #else usleep(1000000); #endif delete storage_sock; storage_sock = nullptr; } } void Client::initializeIngSocket(boost::asio::io_context &ioc, NodeConfig &ing_server, ip_addr *curr_ip, uint16_t &port_no) { boost::system::error_code err; boost::asio::ip::tcp::resolver resolver(ioc); while(1) { #ifdef VERBOSE_CLIENT std::cerr << "Connecting to " << ing_server.name << "...\n"; std::cout << ing_server.clistenhost << ":" << ing_server.clistenport; #endif std::string ip_str = curr_ip->ip_str(); boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip_str, err); if(err) { printf("initializeIngSocket::Invalid IP address\n"); } ingestion_sock = new boost::asio::ip::tcp::socket(ioc); ingestion_sock->open(boost::asio::ip::tcp::v4(), err); while(1) { boost::asio::ip::tcp::endpoint ep(ip_address, port_no); ingestion_sock->bind(ep, err); if (!err) { break; } else { printf("ING: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no); port_no++; if(port_no >= PORT_END) { port_no = PORT_START; curr_ip->increment(nthreads); } } } // just for printing boost::asio::ip::tcp::endpoint ep(ip_address, port_no); boost::asio::ip::address ing_ip = boost::asio::ip::address::from_string(ing_server.clistenhost, err); boost::asio::ip::tcp::endpoint ing_ep(ing_ip, std::stoi(ing_server.clistenport)); //std::cout << "Ing endpoint:" << ing_ep << "\n"; //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n"; ingestion_sock->connect(ing_ep, err); /* boost::asio::connect(*ingestion_sock, resolver.resolve(ing_server.clistenhost, ing_server.clistenport), err); */ if (!err) { //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n"; break; } std::cerr << "ING: Connection from " << curr_ip->ip_str() << ":" << port_no << " to " << ing_server.name << " refused, will retry.\n"; #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME int sleep_delay = rand() % 100000; usleep(sleep_delay); #else usleep(1000000); #endif delete ingestion_sock; ingestion_sock = nullptr; } } /* Populates the buffer pt_msgbundle with a valid message pt_msgbundle. Assumes that it is supplied with a pt_msgbundle buffer of the correct length num_out is either token_out or id_out, depending on whether we're doing token channel or ID channel routing Correct length for pt_msgbundle = (num_out)*(msg_size) + (only for token channel routing) (num_out)*TOKEN_SIZE */ void Client::generateMessageBundle(uint8_t num_out, uint32_t msg_size, unsigned char *pt_msgbundle) { unsigned char *ptr = pt_msgbundle; // Setup message pt_msgbundle for(uint32_t i = 0; i < num_out; i++) { // For benchmarking, each client just sends messages to // themselves, so the destination and source ids are the same. // Receiver id uint32_t rid = id; #if 0 uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1; if (!token_channel) { // If we're testing ID channel routing, have each user send a // message to the user with the same local id, but on // storage server 0. rid &= dest_uid_mask; } #endif unsigned char *start_ptr = ptr; memcpy(ptr, &rid, sizeof(rid)); ptr += sizeof(rid); // Priority (for ID channel routing only) if (!token_channel) { uint32_t priority = 0; #ifdef SHOW_RECEIVED_MESSAGES // If we're testing ID channel routing, set the priority so that // messages to different users will have the highest // priority messages sent from users at different servers uint32_t id_low_bits = id & ((1 << DEST_STORAGE_NODE_BITS) - 1); priority = id ^ (id_low_bits << DEST_UID_BITS); #endif memcpy(ptr, &priority, sizeof(priority)); ptr += sizeof(priority); } // Sender id is our id uint32_t sid = id; memcpy(ptr, &sid, sizeof(sid)); ptr += sizeof(sid); uint32_t remaining_message_size = start_ptr + msg_size - ptr; memset(ptr, 0, remaining_message_size); #ifdef SHOW_RECEIVED_MESSAGES snprintf((char *)ptr, remaining_message_size, "From %08x to %08x", id, rid); #endif ptr+=(remaining_message_size); } if(token_channel) { // Add the tokens for this msgbundle memcpy(ptr, token_list, config.m_token_out * TOKEN_SIZE); } } bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_msgbundle, unsigned char *enc_msgbundle) { // Encrypt the pt_msgbundle unsigned char *pt_msgbundle_start = pt_msgbundle; unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE; unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE; size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE - SGX_AESGCM_IV_SIZE; if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt, NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start, enc_tag)) { printf("Client: encryptMessageBundle FAIL\n"); return 0; } // Copy IV into the bundle memcpy(enc_msgbundle, ing_iv, SGX_AESGCM_IV_SIZE); // Update IV uint64_t *iv_ctr = (uint64_t*) ing_iv; (*iv_ctr)+=1; return 1; } #ifdef TRACE_SOCKIO class LimitLogger { std::string label; std::string thrid; struct timeval last_log; size_t num_items; public: LimitLogger(const char *_label): label(_label), last_log({0,0}), num_items(0) { std::stringstream ss; ss << boost::this_thread::get_id(); thrid = ss.str(); } void log() { struct timeval now; gettimeofday(&now, NULL); long elapsedus = (now.tv_sec - last_log.tv_sec) * 1000000 + (now.tv_usec - last_log.tv_usec); if (num_items > 0 && elapsedus > 500000) { printf("%lu.%06lu: Thread %s end %s of %lu items\n", last_log.tv_sec, last_log.tv_usec, thrid.c_str(), label.c_str(), num_items); num_items = 0; } if (num_items == 0) { printf("%lu.%06lu: Thread %s begin %s\n", now.tv_sec, now.tv_usec, thrid.c_str(), label.c_str()); } gettimeofday(&last_log, NULL); ++num_items; } }; static thread_local LimitLogger recvlogger("recv"), queuelogger("queue"), sentlogger("sent"); #endif void Client::sendMessageBundle() { uint16_t token_out = config.m_token_out; uint16_t id_out = config.m_id_out; uint16_t msg_size = config.msg_size; uint32_t send_pt_msgbundle_size, send_enc_msgbundle_size; if(token_channel) { send_pt_msgbundle_size = ptMsgBundleSize(token_out, msg_size); send_enc_msgbundle_size = encMsgBundleSize(token_out, msg_size); } else { send_pt_msgbundle_size = ptPubMsgBundleSize(id_out, msg_size); send_enc_msgbundle_size = encPubMsgBundleSize(id_out, msg_size); } unsigned char *send_pt_msgbundle = (unsigned char*) malloc (send_pt_msgbundle_size); unsigned char *send_enc_msgbundle = (unsigned char*) malloc (send_enc_msgbundle_size); if(token_channel) { generateMessageBundle(token_out, msg_size, send_pt_msgbundle); } else { generateMessageBundle(id_out, msg_size, send_pt_msgbundle); } encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle, send_enc_msgbundle); #ifdef VERBOSE_CLIENT displayPtMessageBundle(send_pt_msgbundle, token_out, msg_size); #endif free(send_pt_msgbundle); #ifdef TRACE_SOCKIO queuelogger.log(); #endif boost::asio::async_write(*ingestion_sock, boost::asio::buffer(send_enc_msgbundle, send_enc_msgbundle_size), [this, send_enc_msgbundle] (boost::system::error_code ecc, std::size_t) { #ifdef TRACE_SOCKIO sentlogger.log(); #endif #ifdef VERBOSE_CLIENT if(sim_id==0){ printf("TEST: Client 0 send their msgbundle\n"); } #endif free(send_enc_msgbundle); if (ecc) { if(ecc == boost::asio::error::eof) { delete storage_sock; storage_sock = nullptr; } else { printf("Client: boost async_write failed for sending " "message bundle\n"); printf("Error %s\n", ecc.message().c_str()); } return; } }); } int Client::sendIngAuthMessage(unsigned long epoch_no) { uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE; unsigned char *auth_message = (unsigned char*) malloc(auth_size); unsigned char *am_ptr = auth_message; memcpy(am_ptr, &sim_id, sizeof(sim_id)); am_ptr+=sizeof(sim_id); memcpy(am_ptr, &epoch_no, sizeof(unsigned long)); am_ptr+=sizeof(unsigned long); unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0}; unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0}; unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0}; memcpy(epoch_iv, &epoch_no, sizeof(epoch_no)); if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ing_key, epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) { printf("sendIngAuthMessage failed\n"); return -1; } #ifdef VERBOSE_CLIENT printf("Client %d auth_message: \n", id); for(int i=0; i=PORT_END) { port_no = PORT_START; } curr_ip.increment(nthreads); #else if(port_no>=PORT_END) { port_no = PORT_START; curr_ip.increment(nthreads); } #endif uint16_t ing_no = i % num_ing_nodes; uint16_t stg_no = i % num_stg_nodes; uint16_t stg_node_id = storage_map[stg_no]; uint16_t ing_node_id = ingestion_map[ing_no]; clients[i].setup_client(io_context, i, ing_node_id, stg_node_id, &curr_ip, port_no); } printf("Done with all client_setup calls. Thread_no = %d\n", thread_no); } /* Epochs are server driven. In a single epoch, each client waits to receive from their storage server (i) a token bundle for this epoch and (ii) their messages from the last epoch The client then sends their messages for this epoch to their ingestion servers using the tokens they received in this epoch */ void Client::epoch_process() { uint32_t pt_token_size = uint32_t(config.m_token_out) * TOKEN_SIZE; uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE + SGX_AESGCM_MAC_SIZE; unsigned char *enc_tokens = nullptr; uint16_t num_in = config.m_id_in; std::vector toreceive; if (token_channel) { enc_tokens = (unsigned char*) malloc (token_bundle_size); toreceive.push_back(boost::asio::buffer(enc_tokens, token_bundle_size)); num_in = config.m_token_in; } uint16_t msg_size = config.msg_size; uint32_t recv_pt_mailbox_size = ptMailboxSize(num_in, msg_size); uint32_t recv_enc_mailbox_size = encMailboxSize(num_in, msg_size); unsigned char *recv_pt_mailbox = (unsigned char*) malloc (recv_pt_mailbox_size); unsigned char *recv_enc_mailbox = (unsigned char*) malloc (recv_enc_mailbox_size); toreceive.push_back(boost::asio::buffer(recv_enc_mailbox, recv_enc_mailbox_size)); // Async read the encrypted tokens (for token channel routing only) and // encrypted mailbox (both token and ID channels) for this // epoch boost::asio::async_read(*storage_sock, toreceive, [this, enc_tokens, token_bundle_size, pt_token_size, num_in, recv_pt_mailbox, recv_enc_mailbox, recv_pt_mailbox_size] (boost::system::error_code ec, std::size_t) { if (ec) { if(ec == boost::asio::error::eof) { delete storage_sock; storage_sock = nullptr; } else { printf("Error %s\n", ec.message().c_str()); printf("Client::epoch_process boost " "async_read_tokens failed\n"); } free(enc_tokens); free(recv_pt_mailbox); free(recv_enc_mailbox); return; } #ifdef TRACE_SOCKIO recvlogger.log(); #endif #ifdef VERBOSE_CLIENT if(sim_id == 0) { printf("TEST: Client 0: Encrypted token bundle received:\n"); for(uint32_t i = 0; i < token_bundle_size; i++) { printf("%02x", enc_tokens[i]); } printf("\n"); printf("TEST: Client 0: Encrypted msgbundle received\n"); } #endif if (token_channel) { // Decrypt the token bundle unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE; unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE + pt_token_size; int decrypted_bytes = gcm_decrypt(enc_tkn_ptr, pt_token_size, NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key), enc_tokens, SGX_AESGCM_IV_SIZE, (unsigned char*) (this->token_list)); if (decrypted_bytes != pt_token_size) { printf("Client::epoch_process gcm_decrypt tokens failed. " "decrypted_bytes = %d\n", decrypted_bytes); } free(enc_tokens); /* unsigned char *tkn_ptr = (unsigned char*) this->token_list; if(sim_id==0) { printf("TEST: Client 0: Decrypted client tokens:\n"); for(int i = 0; i < pt_token_size; i++) { printf("%02x", tkn_ptr[i]); } printf("\n"); } */ } // Do whatever processing with the received messages here // but for the benchmark, we just ignore the received // messages (unless we want to print them) #ifdef SHOW_RECEIVED_MESSAGES unsigned char *recv_enc_mailbox_ptr = recv_enc_mailbox + SGX_AESGCM_IV_SIZE; unsigned char *recv_enc_mailbox_tag = recv_enc_mailbox + SGX_AESGCM_IV_SIZE + recv_pt_mailbox_size; int decrypted_bytes = gcm_decrypt(recv_enc_mailbox_ptr, recv_pt_mailbox_size, NULL, 0, recv_enc_mailbox_tag, (unsigned char*) &(this->stg_key), recv_enc_mailbox, SGX_AESGCM_IV_SIZE, recv_pt_mailbox); if (decrypted_bytes != recv_pt_mailbox_size) { printf("Client::epoch_process gcm_decrypt mailbox failed. " "decrypted_bytes = %d\n", decrypted_bytes); } if (sim_id < 32) { displayPtMessageBundle(recv_pt_mailbox, num_in, config.msg_size, id); } #endif free(recv_enc_mailbox); free(recv_pt_mailbox); // Send this epoch's message bundle sendMessageBundle(); epoch_process(); }); } void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads) { std::vector threads; uint32_t num_clients_total = config.user_count; size_t clients_per_thread = CEILDIV(num_clients_total, nthreads); // Generate all the clients for the experiment for(int i=0; i threads; for (int i=0; i