#include #include #include "../App/appconfig.hpp" // The next line suppresses a deprecation warning within boost #define BOOST_BIND_GLOBAL_PLACEHOLDERS #include "boost/property_tree/ptree.hpp" #include "boost/property_tree/json_parser.hpp" #include #include #include "gcm.h" #include "sgx_tcrypto.h" #include "clients.hpp" #include #define CEILDIV(x,y) (((x)+(y)-1)/(y)) Config config; Client *clients; aes_key ESK, TSK; std::vector ingestion_nodes, storage_nodes; std::vector storage_map; std::vector ingestion_map; unsigned long setup_time; uint16_t nthreads = 1; bool private_routing; // Split a hostport string like "127.0.0.1:12000" at the rightmost colon // into a host part "127.0.0.1" and a port part "12000". static bool split_host_port(std::string &host, std::string &port, const std::string &hostport) { size_t colon = hostport.find_last_of(':'); if (colon == std::string::npos) { std::cerr << "Cannot parse \"" << hostport << "\" as host:port\n"; return false; } host = hostport.substr(0, colon); port = hostport.substr(colon+1); return true; } // Convert a single hex character into its value from 0 to 15. Return // true on success, false if it wasn't a hex character. static inline bool hextoval(unsigned char &val, char hex) { if (hex >= '0' && hex <= '9') { val = ((unsigned char)hex)-'0'; } else if (hex >= 'a' && hex <= 'f') { val = ((unsigned char)hex)-'a'+10; } else if (hex >= 'A' && hex <= 'F') { val = ((unsigned char)hex)-'A'+10; } else { return false; } return true; } // Convert a 2*len hex character string into a len-byte buffer. Return // true on success, false on failure. static bool hextobuf(unsigned char *buf, const char *str, size_t len) { if (strlen(str) != 2*len) { std::cerr << "Hex string was not the expected size\n"; return false; } for (size_t i=0;i &ingestion_nodes, std::vector &storage_nodes, std::vector &storage_map) { bool found_params = false; bool ret = true; std::istringstream configstream(configstr); boost::property_tree::ptree conftree; read_json(configstream, conftree); uint16_t node_num = 0; for (auto & entry : conftree) { if (!entry.first.compare("params")) { for (auto & pentry : entry.second) { if (!pentry.first.compare("msg_size")) { config.msg_size = pentry.second.get_value(); } else if (!pentry.first.compare("user_count")) { config.user_count = pentry.second.get_value(); } else if (!pentry.first.compare("priv_out")) { config.m_priv_out = pentry.second.get_value(); } else if (!pentry.first.compare("priv_in")) { config.m_priv_in = pentry.second.get_value(); } else if (!pentry.first.compare("pub_out")) { config.m_pub_out = pentry.second.get_value(); } else if (!pentry.first.compare("pub_in")) { config.m_pub_in = pentry.second.get_value(); // A stub hardcoded shared secret to derive various // keys for client <-> server communications and tokens // In reality, this would be a key exchange } else if (!pentry.first.compare("master_secret")) { std::string hex_key = pentry.second.data(); memcpy(config.master_secret, hex_key.c_str(), SGX_AESGCM_KEY_SIZE); } else if (!pentry.first.compare("private_routing")) { config.private_routing = pentry.second.get_value(); } else { std::cerr << "Unknown field in params: " << pentry.first << "\n"; ret = false; } } found_params = true; } else if (!entry.first.compare("nodes")) { for (auto & node : entry.second) { NodeConfig nc; // All nodes need to be assigned their role in manifest.yaml nc.roles = 0; for (auto & nentry : node.second) { if (!nentry.first.compare("name")) { nc.name = nentry.second.get_value(); } else if (!nentry.first.compare("pubkey")) { ret &= hextobuf((unsigned char *)&nc.pubkey, nentry.second.get_value().c_str(), sizeof(nc.pubkey)); } else if (!nentry.first.compare("weight")) { nc.weight = nentry.second.get_value(); } else if (!nentry.first.compare("listen")) { ret &= split_host_port(nc.listenhost, nc.listenport, nentry.second.get_value()); } else if (!nentry.first.compare("clisten")) { ret &= split_host_port(nc.clistenhost, nc.clistenport, nentry.second.get_value()); } else if (!nentry.first.compare("slisten")) { ret &= split_host_port(nc.slistenhost, nc.slistenport, nentry.second.get_value()); } else if (!nentry.first.compare("roles")) { nc.roles = nentry.second.get_value(); } else { std::cerr << "Unknown field in host config: " << nentry.first << "\n"; ret = false; } } if(nc.roles & ROLE_INGESTION) { ingestion_nodes.push_back(nc); ingestion_map.push_back(node_num); } if(nc.roles & ROLE_STORAGE) { storage_nodes.push_back(std::move(nc)); storage_map.push_back(node_num); } node_num++; } } else { std::cerr << "Unknown key in config: " << entry.first << "\n"; ret = false; } } if (!found_params) { std::cerr << "Could not find params in config\n"; ret = false; } return ret; } static void usage(const char *argv0) { fprintf(stderr, "%s [-t nthreads] < config.json\n", argv0); exit(1); } /* Generate ESK (Encryption Secret Key) and TSK (Token Secret Key) */ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret, aes_key &ESK, aes_key &TSK ) { unsigned char zeroes[SGX_AESGCM_KEY_SIZE]; unsigned char iv[SGX_AESGCM_IV_SIZE]; unsigned char mac[SGX_AESGCM_MAC_SIZE]; memset(iv, 0, SGX_AESGCM_IV_SIZE); memset(zeroes, 0, SGX_AESGCM_KEY_SIZE); memcpy(iv, "Encryption", sizeof("Encryption")); if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) { printf("Client: generateMasterKeys FAIL\n"); return -1; } printf("\n\nEncryption Master Key: "); for(int i=0;iip_str(); boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip_str, err); if(err) { printf("initializeStgSocket::Invalid IP address\n"); } storage_sock = new boost::asio::ip::tcp::socket(ioc); storage_sock->open(boost::asio::ip::tcp::v4(), err); while(1) { boost::asio::ip::tcp::endpoint ep(ip_address, port_no); storage_sock->bind(ep, err); if (!err) { break; } else { printf("STG: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no); port_no++; if(port_no >= PORT_END) { port_no = PORT_START; curr_ip->increment(nthreads); } } } boost::asio::ip::address stg_ip = boost::asio::ip::address::from_string(stg_server.slistenhost, err); boost::asio::ip::tcp::endpoint stg_ep(stg_ip, std::stoi(stg_server.slistenport)); // just for printing // boost::asio::ip::tcp::endpoint ep(ip_address, port_no); storage_sock->connect(stg_ep, err); if (!err) { break; } std::cerr <<"STG: Connection from " << curr_ip->ip_str() << ":" << port_no << " to " << stg_server.name << " refused, will retry.\n"; #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME int sleep_delay = rand() % 100000; usleep(sleep_delay); #else usleep(1000000); #endif delete storage_sock; storage_sock = nullptr; } } void Client::initializeIngSocket(boost::asio::io_context &ioc, NodeConfig &ing_server, ip_addr *curr_ip, uint16_t &port_no) { boost::system::error_code err; boost::asio::ip::tcp::resolver resolver(ioc); while(1) { #ifdef VERBOSE_CLIENT std::cerr << "Connecting to " << ing_server.name << "...\n"; std::cout << ing_server.clistenhost << ":" << ing_server.clistenport; #endif std::string ip_str = curr_ip->ip_str(); boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip_str, err); if(err) { printf("initializeIngSocket::Invalid IP address\n"); } ingestion_sock = new boost::asio::ip::tcp::socket(ioc); ingestion_sock->open(boost::asio::ip::tcp::v4(), err); while(1) { boost::asio::ip::tcp::endpoint ep(ip_address, port_no); ingestion_sock->bind(ep, err); if (!err) { break; } else { printf("ING: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no); port_no++; if(port_no >= PORT_END) { port_no = PORT_START; curr_ip->increment(nthreads); } } } // just for printing boost::asio::ip::tcp::endpoint ep(ip_address, port_no); boost::asio::ip::address ing_ip = boost::asio::ip::address::from_string(ing_server.clistenhost, err); boost::asio::ip::tcp::endpoint ing_ep(ing_ip, std::stoi(ing_server.clistenport)); //std::cout << "Ing endpoint:" << ing_ep << "\n"; //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n"; ingestion_sock->connect(ing_ep, err); /* boost::asio::connect(*ingestion_sock, resolver.resolve(ing_server.clistenhost, ing_server.clistenport), err); */ if (!err) { //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n"; break; } std::cerr << "ING: Connection from " << curr_ip->ip_str() << ":" << port_no << " to " << ing_server.name << " refused, will retry.\n"; #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME int sleep_delay = rand() % 100000; usleep(sleep_delay); #else usleep(1000000); #endif delete ingestion_sock; ingestion_sock = nullptr; } } /* Populates the buffer pt_msgbundle with a valid message pt_msgbundle. Assumes that it is supplied with a pt_msgbundle buffer of the correct length num_out is either priv_out or pub_out, depending on whether we're doing private or public routing Correct length for pt_msgbundle = (num_out)*(msg_size) + (only for private routing) (num_out)*TOKEN_SIZE */ void Client::generateMessageBundle(uint8_t num_out, uint32_t msg_size, unsigned char *pt_msgbundle) { unsigned char *ptr = pt_msgbundle; // Setup message pt_msgbundle for(uint32_t i = 0; i < num_out; i++) { // For benchmarking, each client just sends messages to // themselves, so the destination and source ids are the same. // Destination id unsigned char *start_ptr = ptr; memcpy(ptr, &id, sizeof(id)); ptr+=(sizeof(id)); // Priority (for public routing only) if (!private_routing) { memset(ptr, 0, sizeof(uint32_t)); ptr+=sizeof(uint32_t); } // Source id memcpy(ptr, &id, sizeof(id)); ptr+=(sizeof(id)); uint32_t remaining_message_size = start_ptr + msg_size - ptr; memset(ptr, 0, remaining_message_size); ptr+=(remaining_message_size); } if(private_routing) { // Add the tokens for this msgbundle memcpy(ptr, token_list, config.m_priv_out * TOKEN_SIZE); } } bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_msgbundle, unsigned char *enc_msgbundle) { // Encrypt the pt_msgbundle unsigned char *pt_msgbundle_start = pt_msgbundle; unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE; unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE; size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE - SGX_AESGCM_IV_SIZE; if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt, NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start, enc_tag)) { printf("Client: encryptMessageBundle FAIL\n"); return 0; } // Copy IV into the bundle memcpy(enc_msgbundle, ing_iv, SGX_AESGCM_IV_SIZE); // Update IV uint64_t *iv_ctr = (uint64_t*) ing_iv; (*iv_ctr)+=1; return 1; } #ifdef TRACE_SOCKIO class LimitLogger { std::string label; std::string thrid; struct timeval last_log; size_t num_items; public: LimitLogger(const char *_label): label(_label), last_log({0,0}), num_items(0) { std::stringstream ss; ss << boost::this_thread::get_id(); thrid = ss.str(); } void log() { struct timeval now; gettimeofday(&now, NULL); long elapsedus = (now.tv_sec - last_log.tv_sec) * 1000000 + (now.tv_usec - last_log.tv_usec); if (num_items > 0 && elapsedus > 500000) { printf("%lu.%06lu: Thread %s end %s of %lu items\n", last_log.tv_sec, last_log.tv_usec, thrid.c_str(), label.c_str(), num_items); num_items = 0; } if (num_items == 0) { printf("%lu.%06lu: Thread %s begin %s\n", now.tv_sec, now.tv_usec, thrid.c_str(), label.c_str()); } gettimeofday(&last_log, NULL); ++num_items; } }; static thread_local LimitLogger recvlogger("recv"), queuelogger("queue"), sentlogger("sent"); #endif void Client::sendMessageBundle() { uint16_t priv_out = config.m_priv_out; uint16_t pub_out = config.m_pub_out; uint16_t msg_size = config.msg_size; uint32_t send_pt_msgbundle_size, send_enc_msgbundle_size; if(private_routing) { send_pt_msgbundle_size = ptMsgBundleSize(priv_out, msg_size); send_enc_msgbundle_size = encMsgBundleSize(priv_out, msg_size); } else { send_pt_msgbundle_size = ptPubMsgBundleSize(pub_out, msg_size); send_enc_msgbundle_size = encPubMsgBundleSize(pub_out, msg_size); } unsigned char *send_pt_msgbundle = (unsigned char*) malloc (send_pt_msgbundle_size); unsigned char *send_enc_msgbundle = (unsigned char*) malloc (send_enc_msgbundle_size); if(private_routing) { generateMessageBundle(priv_out, msg_size, send_pt_msgbundle); } else { generateMessageBundle(pub_out, msg_size, send_pt_msgbundle); } encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle, send_enc_msgbundle); #ifdef VERBOSE_CLIENT displayPtMessageBundle(send_pt_msgbundle, priv_out, msg_size); #endif free(send_pt_msgbundle); #ifdef TRACE_SOCKIO queuelogger.log(); #endif boost::asio::async_write(*ingestion_sock, boost::asio::buffer(send_enc_msgbundle, send_enc_msgbundle_size), [this, send_enc_msgbundle] (boost::system::error_code ecc, std::size_t) { #ifdef TRACE_SOCKIO sentlogger.log(); #endif #ifdef VERBOSE_CLIENT if(sim_id==0){ printf("TEST: Client 0 send their msgbundle\n"); } #endif free(send_enc_msgbundle); if (ecc) { if(ecc == boost::asio::error::eof) { delete storage_sock; storage_sock = nullptr; } else { printf("Client: boost async_write failed for sending " "message bundle\n"); printf("Error %s\n", ecc.message().c_str()); } return; } }); } int Client::sendIngAuthMessage(unsigned long epoch_no) { uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE; unsigned char *auth_message = (unsigned char*) malloc(auth_size); unsigned char *am_ptr = auth_message; memcpy(am_ptr, &sim_id, sizeof(sim_id)); am_ptr+=sizeof(sim_id); memcpy(am_ptr, &epoch_no, sizeof(unsigned long)); am_ptr+=sizeof(unsigned long); unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0}; unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0}; unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0}; memcpy(epoch_iv, &epoch_no, sizeof(epoch_no)); if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ing_key, epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) { printf("sendIngAuthMessage failed\n"); return -1; } #ifdef VERBOSE_CLIENT printf("Client %d auth_message: \n", id); for(int i=0; i=PORT_END) { port_no = PORT_START; } curr_ip.increment(nthreads); #else if(port_no>=PORT_END) { port_no = PORT_START; curr_ip.increment(nthreads); } #endif uint16_t ing_no = i % num_ing_nodes; uint16_t stg_no = i % num_stg_nodes; uint16_t stg_node_id = storage_map[stg_no]; uint16_t ing_node_id = ingestion_map[ing_no]; clients[i].setup_client(io_context, i, ing_node_id, stg_node_id, &curr_ip, port_no); } printf("Done with all client_setup calls. Thread_no = %d\n", thread_no); } /* Epochs are server driven. In a single epoch, each client waits to receive from their storage server (i) a token bundle for this epoch and (ii) their messages from the last epoch The client then sends their messages for this epoch to their ingestion servers using the tokens they received in this epoch */ void Client::epoch_process() { uint32_t pt_token_size = uint32_t(config.m_priv_out) * TOKEN_SIZE; uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE + SGX_AESGCM_MAC_SIZE; unsigned char *enc_tokens = nullptr; uint16_t num_in = config.m_pub_in; std::vector toreceive; if (private_routing) { enc_tokens = (unsigned char*) malloc (token_bundle_size); toreceive.push_back(boost::asio::buffer(enc_tokens, token_bundle_size)); num_in = config.m_priv_in; } uint16_t msg_size = config.msg_size; uint32_t recv_pt_mailbox_size = ptMailboxSize(num_in, msg_size); uint32_t recv_enc_mailbox_size = encMailboxSize(num_in, msg_size); unsigned char *recv_pt_mailbox = (unsigned char*) malloc (recv_pt_mailbox_size); unsigned char *recv_enc_mailbox = (unsigned char*) malloc (recv_enc_mailbox_size); toreceive.push_back(boost::asio::buffer(recv_enc_mailbox, recv_enc_mailbox_size)); // Async read the encrypted tokens (for private routing only) and // encrypted mailbox (both private and public routing) for this // epoch boost::asio::async_read(*storage_sock, toreceive, [this, enc_tokens, token_bundle_size, pt_token_size, recv_pt_mailbox, recv_enc_mailbox] (boost::system::error_code ec, std::size_t) { if (ec) { if(ec == boost::asio::error::eof) { delete storage_sock; storage_sock = nullptr; } else { printf("Error %s\n", ec.message().c_str()); printf("Client::epoch_process boost " "async_read_tokens failed\n"); } free(enc_tokens); free(recv_pt_mailbox); free(recv_enc_mailbox); return; } #ifdef TRACE_SOCKIO recvlogger.log(); #endif #ifdef VERBOSE_CLIENT if(sim_id == 0) { printf("TEST: Client 0: Encrypted token bundle received:\n"); for(uint32_t i = 0; i < token_bundle_size; i++) { printf("%02x", enc_tokens[i]); } printf("\n"); printf("TEST: Client 0: Encrypted msgbundle received\n"); } #endif if (private_routing) { // Decrypt the token bundle unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE; unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE + pt_token_size; int decrypted_bytes = gcm_decrypt(enc_tkn_ptr, pt_token_size, NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key), enc_tokens, SGX_AESGCM_IV_SIZE, (unsigned char*) (this->token_list)); if(decrypted_bytes != pt_token_size) { printf("Client::epoch_process gcm_decrypt tokens failed. " "decrypted_bytes = %d\n", decrypted_bytes); } free(enc_tokens); /* unsigned char *tkn_ptr = (unsigned char*) this->token_list; if(sim_id==0) { printf("TEST: Client 0: Decrypted client tokens:\n"); for(int i = 0; i < pt_token_size; i++) { printf("%02x", tkn_ptr[i]); } printf("\n"); } */ } // Do whatever processing with the received messages here // but for the benchmark, we just ignore the received // messages free(recv_enc_mailbox); free(recv_pt_mailbox); // Send this epoch's message bundle sendMessageBundle(); epoch_process(); }); } void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads) { std::vector threads; uint32_t num_clients_total = config.user_count; size_t clients_per_thread = CEILDIV(num_clients_total, nthreads); // Generate all the clients for the experiment for(int i=0; i threads; for (int i=0; i