#include #include #include "../App/appconfig.hpp" // The next line suppresses a deprecation warning within boost #define BOOST_BIND_GLOBAL_PLACEHOLDERS #include "boost/property_tree/ptree.hpp" #include "boost/property_tree/json_parser.hpp" #include #include #include "gcm.h" #include "sgx_tcrypto.h" #include "clients.hpp" #include #define CEILDIV(x,y) (((x)+(y)-1)/(y)) Config config; Client *clients; aes_key ESK, TSK; std::vector ingestion_nodes, storage_nodes; std::vector storage_map; std::vector ingestion_map; unsigned long setup_time; uint16_t nthreads = 1; bool private_routing; // Split a hostport string like "127.0.0.1:12000" at the rightmost colon // into a host part "127.0.0.1" and a port part "12000". static bool split_host_port(std::string &host, std::string &port, const std::string &hostport) { size_t colon = hostport.find_last_of(':'); if (colon == std::string::npos) { std::cerr << "Cannot parse \"" << hostport << "\" as host:port\n"; return false; } host = hostport.substr(0, colon); port = hostport.substr(colon+1); return true; } // Convert a single hex character into its value from 0 to 15. Return // true on success, false if it wasn't a hex character. static inline bool hextoval(unsigned char &val, char hex) { if (hex >= '0' && hex <= '9') { val = ((unsigned char)hex)-'0'; } else if (hex >= 'a' && hex <= 'f') { val = ((unsigned char)hex)-'a'+10; } else if (hex >= 'A' && hex <= 'F') { val = ((unsigned char)hex)-'A'+10; } else { return false; } return true; } // Convert a 2*len hex character string into a len-byte buffer. Return // true on success, false on failure. static bool hextobuf(unsigned char *buf, const char *str, size_t len) { if (strlen(str) != 2*len) { std::cerr << "Hex string was not the expected size\n"; return false; } for (size_t i=0;i &ingestion_nodes, std::vector &storage_nodes, std::vector &storage_map) { bool found_params = false; bool ret = true; std::istringstream configstream(configstr); boost::property_tree::ptree conftree; read_json(configstream, conftree); uint16_t node_num = 0; for (auto & entry : conftree) { if (!entry.first.compare("params")) { for (auto & pentry : entry.second) { if (!pentry.first.compare("msg_size")) { config.msg_size = pentry.second.get_value(); } else if (!pentry.first.compare("user_count")) { config.user_count = pentry.second.get_value(); } else if (!pentry.first.compare("priv_out")) { config.m_priv_out = pentry.second.get_value(); } else if (!pentry.first.compare("priv_in")) { config.m_priv_in = pentry.second.get_value(); } else if (!pentry.first.compare("pub_out")) { config.m_pub_out = pentry.second.get_value(); } else if (!pentry.first.compare("pub_in")) { config.m_pub_in = pentry.second.get_value(); // A hardcoded shared secret to derive various // keys for client -> server communications and tokens } else if (!pentry.first.compare("master_secret")) { std::string hex_key = pentry.second.data(); memcpy(config.master_secret, hex_key.c_str(), SGX_AESGCM_KEY_SIZE); } else if (!pentry.first.compare("private_routing")) { config.private_routing = pentry.second.get_value(); } else { std::cerr << "Unknown field in params: " << pentry.first << "\n"; ret = false; } } found_params = true; } else if (!entry.first.compare("nodes")) { for (auto & node : entry.second) { NodeConfig nc; // All nodes need to be assigned their role in manifest.yaml nc.roles = 0; for (auto & nentry : node.second) { if (!nentry.first.compare("name")) { nc.name = nentry.second.get_value(); } else if (!nentry.first.compare("pubkey")) { ret &= hextobuf((unsigned char *)&nc.pubkey, nentry.second.get_value().c_str(), sizeof(nc.pubkey)); } else if (!nentry.first.compare("weight")) { nc.weight = nentry.second.get_value(); } else if (!nentry.first.compare("listen")) { ret &= split_host_port(nc.listenhost, nc.listenport, nentry.second.get_value()); } else if (!nentry.first.compare("clisten")) { ret &= split_host_port(nc.clistenhost, nc.clistenport, nentry.second.get_value()); } else if (!nentry.first.compare("slisten")) { ret &= split_host_port(nc.slistenhost, nc.slistenport, nentry.second.get_value()); } else if (!nentry.first.compare("roles")) { nc.roles = nentry.second.get_value(); } else { std::cerr << "Unknown field in host config: " << nentry.first << "\n"; ret = false; } } if(nc.roles & ROLE_INGESTION) { ingestion_nodes.push_back(nc); ingestion_map.push_back(node_num); } if(nc.roles & ROLE_STORAGE) { storage_nodes.push_back(std::move(nc)); storage_map.push_back(node_num); } node_num++; } } else { std::cerr << "Unknown key in config: " << entry.first << "\n"; ret = false; } } if (!found_params) { std::cerr << "Could not find params in config\n"; ret = false; } return ret; } static void usage(const char *argv0) { fprintf(stderr, "%s [-t nthreads] < config.json\n", argv0); exit(1); } /* Generate ESK (Encryption Secret Key) and TSK (Token Secret Key) */ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret, aes_key &ESK, aes_key &TSK ) { unsigned char zeroes[SGX_AESGCM_KEY_SIZE]; unsigned char iv[SGX_AESGCM_IV_SIZE]; unsigned char mac[SGX_AESGCM_MAC_SIZE]; memset(iv, 0, SGX_AESGCM_IV_SIZE); memset(zeroes, 0, SGX_AESGCM_KEY_SIZE); memcpy(iv, "Encryption", sizeof("Encryption")); if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) { printf("Client: generateMasterKeys FAIL\n"); return -1; } printf("\n\nEncryption Master Key: "); for(int i=0;iip_str(); boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip_str, err); if(err) { printf("initializeStgSocket::Invalid IP address\n"); } storage_sock = new boost::asio::ip::tcp::socket(ioc); storage_sock->open(boost::asio::ip::tcp::v4(), err); while(1) { boost::asio::ip::tcp::endpoint ep(ip_address, port_no); storage_sock->bind(ep, err); if(!err) break; else { printf("STG: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no); port_no++; if(port_no >= PORT_END) { port_no = PORT_START; curr_ip->increment(nthreads); } } } boost::asio::ip::address stg_ip = boost::asio::ip::address::from_string(stg_server.slistenhost, err); boost::asio::ip::tcp::endpoint stg_ep(stg_ip, std::stoi(stg_server.slistenport)); // just for printing // boost::asio::ip::tcp::endpoint ep(ip_address, port_no); storage_sock->connect(stg_ep, err); if (!err) { break; } std::cerr <<"STG: Connection to " << stg_server.name << " refused, will , epoch_noretry.\n"; std::cerr << curr_ip->ip_str() << ":" << port_no << "\n"; #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME int sleep_delay = rand() % 100000; usleep(sleep_delay); #else usleep(1000000); #endif delete(storage_sock); } } void Client::initializeIngSocket(boost::asio::io_context &ioc, NodeConfig &ing_server, ip_addr *curr_ip, uint16_t &port_no) { boost::system::error_code err; boost::asio::ip::tcp::resolver resolver(ioc); while(1) { #ifdef VERBOSE_CLIENT std::cerr << "Connecting to " << ing_server.name << "...\n"; std::cout << ing_server.clistenhost << ":" << ing_server.clistenport; #endif std::string ip_str = curr_ip->ip_str(); boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip_str, err); if(err) { printf("initializeIngSocket::Invalid IP address\n"); } ingestion_sock = new boost::asio::ip::tcp::socket(ioc); ingestion_sock->open(boost::asio::ip::tcp::v4(), err); while(1) { boost::asio::ip::tcp::endpoint ep(ip_address, port_no); ingestion_sock->bind(ep, err); if(!err) break; else { printf("ING: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no); port_no++; if(port_no >= PORT_END) { port_no = PORT_START; curr_ip->increment(nthreads); } } } // just for printing boost::asio::ip::tcp::endpoint ep(ip_address, port_no); boost::asio::ip::address ing_ip = boost::asio::ip::address::from_string(ing_server.clistenhost, err); boost::asio::ip::tcp::endpoint ing_ep(ing_ip, std::stoi(ing_server.clistenport)); //std::cout << "Ing endpoint:" << ing_ep << "\n"; //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n"; ingestion_sock->connect(ing_ep, err); /* boost::asio::connect(*ingestion_sock, resolver.resolve(ing_server.clistenhost, ing_server.clistenport), err); */ if (!err) { //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n"; break; } std::cerr << "ING: Connection to " << ing_server.name << " refused, will , epoch_noretry.\n"; std::cerr << curr_ip->ip_str() << ":" << port_no << "\n"; #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME int sleep_delay = rand() % 100000; usleep(sleep_delay); #else usleep(1000000); #endif delete(ingestion_sock); } } /* Populates the buffer pt_msgbundle with a valid message pt_msgbundle. Assumes that it is supplied with a pt_msgbundle buffer of the correct length Correct length for pt_msgbundle = 8 + (priv_out)*(msg_size) + 16 bytes */ void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size, unsigned char *pt_msgbundle) { unsigned char *ptr = pt_msgbundle; // Setup message pt_msgbundle for(uint32_t i = 0; i < priv_out; i++) { memcpy(ptr, &id, sizeof(id)); ptr+=(sizeof(id)); memcpy(ptr, &id, sizeof(id)); ptr+=(sizeof(id)); uint32_t remaining_message_size = msg_size - (sizeof(id)*2); memset(ptr, 0, remaining_message_size); ptr+=(remaining_message_size); } if(private_routing) { // Add the tokens for this msgbundle memcpy(ptr, token_list, config.m_priv_out * TOKEN_SIZE); } } bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_msgbundle, unsigned char *enc_msgbundle) { // Encrypt the pt_msgbundle unsigned char *pt_msgbundle_start = pt_msgbundle; unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE; unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE; size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE - SGX_AESGCM_IV_SIZE; if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt, NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start, enc_tag)) { printf("Client: encryptMessageBundle FAIL\n"); return 0; } // Copy IV into the bundle memcpy(enc_msgbundle, ing_iv, SGX_AESGCM_IV_SIZE); // Update IV uint64_t *iv_ctr = (uint64_t*) ing_iv; (*iv_ctr)+=1; return 1; } void Client::sendMessageBundle() { uint16_t priv_out = config.m_priv_out; uint16_t pub_out = config.m_pub_out; uint16_t msg_size = config.msg_size; uint32_t send_pt_msgbundle_size, send_enc_msgbundle_size; if(private_routing) { send_pt_msgbundle_size = ptMsgBundleSize(priv_out, msg_size); send_enc_msgbundle_size = encMsgBundleSize(priv_out, msg_size); } else { send_pt_msgbundle_size = ptPubMsgBundleSize(pub_out, msg_size); send_enc_msgbundle_size = encPubMsgBundleSize(pub_out, msg_size); } unsigned char *send_pt_msgbundle = (unsigned char*) malloc (send_pt_msgbundle_size); unsigned char *send_enc_msgbundle = (unsigned char*) malloc (send_enc_msgbundle_size); if(private_routing) { generateMessageBundle(priv_out, msg_size, send_pt_msgbundle); } else { generateMessageBundle(pub_out, msg_size, send_pt_msgbundle); } encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle, send_enc_msgbundle); #ifdef VERBOSE_CLIENT displayPtMessageBundle(send_pt_msgbundle, priv_out, msg_size); #endif boost::asio::async_write(*ingestion_sock, boost::asio::buffer(send_enc_msgbundle, send_enc_msgbundle_size), [this, send_enc_msgbundle] (boost::system::error_code ecc, std::size_t) { #ifdef VERBOSE_CLIENT if(sim_id==0){ printf("TEST: Client 0 send their msgbundle\n"); } #endif if (ecc) { if(ecc == boost::asio::error::eof) { delete(storage_sock); } else { printf("Client: boost async_write failed for sending message bundle\n"); printf("Error %s\n", ecc.message().c_str()); } return; } free(send_enc_msgbundle); }); free(send_pt_msgbundle); } int Client::sendIngAuthMessage(unsigned long epoch_no) { uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE; unsigned char *auth_message = (unsigned char*) malloc(auth_size); unsigned char *am_ptr = auth_message; memcpy(am_ptr, &sim_id, sizeof(sim_id)); am_ptr+=sizeof(sim_id); memcpy(am_ptr, &epoch_no, sizeof(unsigned long)); am_ptr+=sizeof(unsigned long); unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0}; unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0}; unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0}; memcpy(epoch_iv, &epoch_no, sizeof(epoch_no)); if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ing_key, epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) { printf("generateClientKeys failed\n"); return -1; } #ifdef VERBOSE_CLIENT printf("Client %d auth_message: \n", id); for(int i=0; i=PORT_END) { port_no = PORT_START; } curr_ip.increment(nthreads); #else if(port_no>=PORT_END) { port_no = PORT_START; curr_ip.increment(nthreads); } #endif uint16_t ing_no = i % num_ing_nodes; uint16_t stg_no = i % num_stg_nodes; uint16_t stg_node_id = storage_map[stg_no]; uint16_t ing_node_id = ingestion_map[ing_no]; clients[i].setup_client(io_context, i, ing_node_id, stg_node_id, &curr_ip, port_no); } printf("Done with all client_setup calls. Thread_no = %d\n", thread_no); } /* Epochs are server driven. In a single epoch, each client waits to receive from their storage server (i) a token bundle for this epoch and (ii) their messages from the last epoch The client then sends their messages for this epoch to their ingestion servers using the tokens they received in this epoch */ void Client::epoch_process() { uint32_t pt_token_size = (config.m_priv_out * SGX_AESGCM_KEY_SIZE); uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE + SGX_AESGCM_MAC_SIZE; unsigned char *enc_tokens = (unsigned char*) malloc (token_bundle_size); if(private_routing) { //Async read the encrypted tokens for this epoch boost::asio::async_read(*storage_sock, boost::asio::buffer(enc_tokens, token_bundle_size), [this, enc_tokens, token_bundle_size, pt_token_size] (boost::system::error_code ec, std::size_t) { if (ec) { if(ec == boost::asio::error::eof) { delete(storage_sock); } else { printf("Error %s\n", ec.message().c_str()); printf("Client::epoch_process boost async_read_tokens failed\n"); } return; } #ifdef VERBOSE_CLIENT if(sim_id == 0) { printf("TEST: Client 0: Encrypted token bundle received:\n"); for(uint32_t i = 0; i < token_bundle_size; i++) { printf("%x", enc_tokens[i]); } printf("\n"); } #endif // Decrypt the token bundle unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE; unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE + pt_token_size; int decrypted_bytes = gcm_decrypt(enc_tkn_ptr, pt_token_size, NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key), enc_tokens, SGX_AESGCM_IV_SIZE, (unsigned char*) (this->token_list)); if(decrypted_bytes != pt_token_size) { printf("Client::epoch_process gcm_decrypt tokens failed. decrypted_bytes = %d \n", decrypted_bytes); } free(enc_tokens); /* unsigned char *tkn_ptr = (unsigned char*) this->token_list; if(sim_id==0) { printf("TEST: Client 0: Decrypted client tokens:\n"); for(int i = 0; i < 2 * SGX_AESGCM_KEY_SIZE; i++) { printf("%x", tkn_ptr[i]); } printf("\n"); } */ // Async read the messages recieved in the last epoch uint16_t priv_in = config.m_priv_in; uint16_t msg_size = config.msg_size; uint32_t recv_pt_mailbox_size = ptMailboxSize(priv_in, msg_size); uint32_t recv_enc_mailbox_size = encMailboxSize(priv_in, msg_size); unsigned char *recv_pt_mailbox = (unsigned char*) malloc (recv_pt_mailbox_size); unsigned char *recv_enc_mailbox = (unsigned char*) malloc (recv_enc_mailbox_size); boost::asio::async_read(*storage_sock, boost::asio::buffer(recv_enc_mailbox, recv_enc_mailbox_size), [this, recv_pt_mailbox, recv_enc_mailbox] (boost::system::error_code ecc, std::size_t) { if (ecc) { if(ecc == boost::asio::error::eof) { delete(storage_sock); } else { printf("Client: boost async_read failed for recieving msg_bundle\n"); printf("Error %s\n", ecc.message().c_str()); } return; } #ifdef VERBOSE_CLIENT if(sim_id == 0) { printf("TEST: Client 0: Encrypted msgbundle received\n"); } #endif // Do whatever processing with the received messages here free(recv_enc_mailbox); free(recv_pt_mailbox); // Send this epoch's message bundle sendMessageBundle(); epoch_process(); }); }); } else { // Async read the messages recieved in the last epoch uint16_t pub_in = config.m_pub_in; uint16_t msg_size = config.msg_size; uint32_t recv_pt_mailbox_size = ptMailboxSize(pub_in, msg_size); uint32_t recv_enc_mailbox_size = encMailboxSize(pub_in, msg_size); unsigned char *recv_pt_mailbox = (unsigned char*) malloc (recv_pt_mailbox_size); unsigned char *recv_enc_mailbox = (unsigned char*) malloc (recv_enc_mailbox_size); boost::asio::async_read(*storage_sock, boost::asio::buffer(recv_enc_mailbox, recv_enc_mailbox_size), [this, recv_pt_mailbox, recv_enc_mailbox] (boost::system::error_code ecc, std::size_t) { if (ecc) { if(ecc == boost::asio::error::eof) { delete(storage_sock); } else { printf("Error %s\n", ecc.message().c_str()); } printf("Client: boost async_read failed for recieving msg_bundle\n"); return; } // Do whatever processing with the received messages here free(recv_enc_mailbox); free(recv_pt_mailbox); // Send this epoch's message bundle sendMessageBundle(); epoch_process(); }); } } void client_epoch_process(uint32_t cstart, uint32_t cstop) { for(uint32_t i=cstart; i threads; uint32_t num_clients_total = config.user_count; size_t clients_per_thread = CEILDIV(num_clients_total, nthreads); // Generate all the clients for the experiment for(int i=0; i threads; for(int i=0; i