12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155 |
- #include <iostream>
- #include <functional>
- #include "../App/appconfig.hpp"
- // The next line suppresses a deprecation warning within boost
- #define BOOST_BIND_GLOBAL_PLACEHOLDERS
- #include "boost/property_tree/ptree.hpp"
- #include "boost/property_tree/json_parser.hpp"
- #include <boost/thread.hpp>
- #include <boost/asio.hpp>
- #include "gcm.h"
- #include "sgx_tcrypto.h"
- #include "clients.hpp"
- #include <cstdlib>
- #define CEILDIV(x,y) (((x)+(y)-1)/(y)) // integer division of x by y rounded up (non-negative x, positive y)
- Config config; // run parameters; filled in by config_parse() when called from main()
- Client *clients; // array of config.user_count simulated clients, allocated in main()
- aes_key ESK, TSK; // encryption / token master keys, derived by generateMasterKeys()
- std::vector<NodeConfig> ingestion_nodes, storage_nodes; // nodes whose roles include ROLE_INGESTION / ROLE_STORAGE
- std::vector<uint16_t> storage_map; // storage_map[k] = position of storage_nodes[k] in the config "nodes" list
- std::vector<uint16_t> ingestion_map; // ingestion_map[k] = position of ingestion_nodes[k] in the config "nodes" list
- unsigned long setup_time; // NOTE(review): not referenced in the visible part of this file
- uint16_t nthreads = 1; // number of worker threads; overridden by the -t option in main()
- bool private_routing; // copy of config.private_routing, set in main()
// Break a "host:port" string such as "127.0.0.1:12000" into its two
// halves, using the LAST colon as the separator.  On success host and
// port are overwritten and true is returned.  If the input contains no
// colon, a diagnostic goes to stderr, the outputs are left untouched and
// false is returned.
static bool split_host_port(std::string &host, std::string &port,
    const std::string &hostport)
{
    const size_t sep = hostport.rfind(':');
    if (sep != std::string::npos) {
        host = hostport.substr(0, sep);
        port = hostport.substr(sep + 1);
        return true;
    }

    std::cerr << "Cannot parse \"" << hostport << "\" as host:port\n";
    return false;
}
// Decode one hexadecimal digit (either case) into its numeric value
// 0..15, stored in val.  Returns true on success; on any other
// character returns false and leaves val unmodified.
static inline bool hextoval(unsigned char &val, char hex)
{
    const unsigned char digit = (unsigned char)hex;

    if (hex >= '0' && hex <= '9') {
        val = (unsigned char)(digit - '0');
        return true;
    }
    if (hex >= 'a' && hex <= 'f') {
        val = (unsigned char)(digit - 'a' + 10);
        return true;
    }
    if (hex >= 'A' && hex <= 'F') {
        val = (unsigned char)(digit - 'A' + 10);
        return true;
    }
    return false;
}
- // Convert a 2*len hex character string into a len-byte buffer. Return
- // true on success, false on failure.
- static bool hextobuf(unsigned char *buf, const char *str, size_t len)
- {
- if (strlen(str) != 2*len) {
- std::cerr << "Hex string was not the expected size\n";
- return false;
- }
- for (size_t i=0;i<len;++i) {
- unsigned char hi, lo;
- if (!hextoval(hi, str[2*i]) || !hextoval(lo, str[2*i+1])) {
- std::cerr << "Cannot parse string as hex\n";
- return false;
- }
- buf[i] = (unsigned char)((hi << 4) + lo);
- }
- return true;
- }
- void displayMessage(unsigned char *msg, uint16_t msg_size,
- uint32_t client)
- {
- std::stringstream outbuf;
- clientid_t sid, rid;
- uint32_t prio;
- unsigned char *ptr = msg;
- rid = *((clientid_t*) ptr);
- ptr+=sizeof(rid);
- if (!private_routing) {
- prio = *((uint32_t*) ptr);
- ptr+=sizeof(prio);
- }
- sid = *((clientid_t*) ptr);
- ptr+=sizeof(sid);
- if (private_routing) {
- outbuf << std::hex
- << "Cli: "
- << std::setfill('0') << std::setw(8) << client
- << ", Recv: "
- << std::setfill('0') << std::setw(8) << rid
- << ", Send: "
- << std::setfill('0') << std::setw(8) << sid
- << "\n";
- } else {
- outbuf << std::hex
- << "Cli: "
- << std::setfill('0') << std::setw(8) << client
- << ", Recv: "
- << std::setfill('0') << std::setw(8) << rid
- << ", Prio: "
- << std::setfill('0') << std::setw(8) << prio
- << ", Send: "
- << std::setfill('0') << std::setw(8) << sid
- << "\n";
- }
- unsigned char *end = msg + msg_size;
- while (ptr < end) {
- size_t remain = end-ptr;
- if (remain > 16) {
- remain = 16;
- }
- char row[34 + 17 + 2];
- memset(row, ' ', sizeof(row)-1);
- row[sizeof(row)-2] = '\n';
- row[sizeof(row)-1] = '\0';
- size_t hexoffset = 0;
- size_t charoffset = 34;
- for (size_t i=0; i<remain; ++i) {
- char hex[3];
- sprintf(hex, "%02x", ptr[i]);
- memcpy(row+hexoffset, hex, 2);
- if (ptr[i] >= ' ' && ptr[i] <= '~') {
- row[charoffset] = ptr[i];
- } else {
- row[charoffset] = '.';
- }
- hexoffset += 2;
- charoffset += 1;
- if (i == 7) {
- hexoffset += 1;
- charoffset += 1;
- }
- }
- outbuf << row;
- ptr += remain;
- }
- outbuf << "\n";
- std::cout << outbuf.str();
- }
- void displayPtMessageBundle(unsigned char *bundle, uint16_t num_out,
- uint16_t msg_size, uint32_t client)
- {
- unsigned char *ptr = bundle;
- for(int i=0; i<num_out; i++) {
- displayMessage(ptr, msg_size, client);
- ptr+=msg_size;
- }
- printf("\n");
- }
- static inline uint32_t encPubMsgBundleSize(uint16_t pub_out, uint16_t msg_size)
- {
- return SGX_AESGCM_IV_SIZE + (uint32_t(pub_out) * msg_size)
- + SGX_AESGCM_MAC_SIZE;
- }
// Bytes in a plaintext public-routing message bundle: pub_out messages
// of msg_size bytes each.
static inline uint32_t ptPubMsgBundleSize(uint16_t pub_out, uint16_t msg_size)
{
    const uint32_t count = pub_out;
    return count * msg_size;
}
- static inline uint32_t encMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
- {
- return SGX_AESGCM_IV_SIZE + (uint32_t(priv_out) * (msg_size + TOKEN_SIZE))
- + SGX_AESGCM_MAC_SIZE;
- }
- static inline uint32_t ptMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
- {
- return uint32_t(priv_out) * (msg_size + TOKEN_SIZE);
- }
- static inline uint32_t encMailboxSize(uint16_t priv_in, uint16_t msg_size)
- {
- return SGX_AESGCM_IV_SIZE + (uint32_t(priv_in) * msg_size)
- + SGX_AESGCM_MAC_SIZE;
- }
// Bytes in a plaintext mailbox: priv_in messages of msg_size bytes each.
static inline uint32_t ptMailboxSize(uint16_t priv_in, uint16_t msg_size)
{
    const uint32_t slots = priv_in;
    return slots * msg_size;
}
- bool config_parse(Config &config, const std::string configstr,
- std::vector<NodeConfig> &ingestion_nodes,
- std::vector<NodeConfig> &storage_nodes,
- std::vector<uint16_t> &storage_map)
- {
- bool found_params = false;
- bool ret = true;
- std::istringstream configstream(configstr);
- boost::property_tree::ptree conftree;
- read_json(configstream, conftree);
- uint16_t node_num = 0;
- for (auto & entry : conftree) {
- if (!entry.first.compare("params")) {
- for (auto & pentry : entry.second) {
- if (!pentry.first.compare("msg_size")) {
- config.msg_size = pentry.second.get_value<uint16_t>();
- } else if (!pentry.first.compare("user_count")) {
- config.user_count = pentry.second.get_value<uint32_t>();
- } else if (!pentry.first.compare("priv_out")) {
- config.m_priv_out = pentry.second.get_value<uint8_t>();
- } else if (!pentry.first.compare("priv_in")) {
- config.m_priv_in = pentry.second.get_value<uint8_t>();
- } else if (!pentry.first.compare("pub_out")) {
- config.m_pub_out = pentry.second.get_value<uint8_t>();
- } else if (!pentry.first.compare("pub_in")) {
- config.m_pub_in = pentry.second.get_value<uint8_t>();
- // A stub hardcoded shared secret to derive various
- // keys for client <-> server communications and tokens
- // In reality, this would be a key exchange
- } else if (!pentry.first.compare("master_secret")) {
- std::string hex_key = pentry.second.data();
- memcpy(config.master_secret, hex_key.c_str(),
- SGX_AESGCM_KEY_SIZE);
- } else if (!pentry.first.compare("private_routing")) {
- config.private_routing = pentry.second.get_value<bool>();
- } else {
- std::cerr << "Unknown field in params: " <<
- pentry.first << "\n";
- ret = false;
- }
- }
- found_params = true;
- } else if (!entry.first.compare("nodes")) {
- for (auto & node : entry.second) {
- NodeConfig nc;
- // All nodes need to be assigned their role in manifest.yaml
- nc.roles = 0;
- for (auto & nentry : node.second) {
- if (!nentry.first.compare("name")) {
- nc.name = nentry.second.get_value<std::string>();
- } else if (!nentry.first.compare("pubkey")) {
- ret &= hextobuf((unsigned char *)&nc.pubkey,
- nentry.second.get_value<std::string>().c_str(),
- sizeof(nc.pubkey));
- } else if (!nentry.first.compare("weight")) {
- nc.weight = nentry.second.get_value<std::uint8_t>();
- } else if (!nentry.first.compare("listen")) {
- ret &= split_host_port(nc.listenhost, nc.listenport,
- nentry.second.get_value<std::string>());
- } else if (!nentry.first.compare("clisten")) {
- ret &= split_host_port(nc.clistenhost, nc.clistenport,
- nentry.second.get_value<std::string>());
- } else if (!nentry.first.compare("slisten")) {
- ret &= split_host_port(nc.slistenhost, nc.slistenport,
- nentry.second.get_value<std::string>());
- } else if (!nentry.first.compare("roles")) {
- nc.roles = nentry.second.get_value<std::uint8_t>();
- } else {
- std::cerr << "Unknown field in host config: " <<
- nentry.first << "\n";
- ret = false;
- }
- }
- if(nc.roles & ROLE_INGESTION) {
- ingestion_nodes.push_back(nc);
- ingestion_map.push_back(node_num);
- }
- if(nc.roles & ROLE_STORAGE) {
- storage_nodes.push_back(std::move(nc));
- storage_map.push_back(node_num);
- }
- node_num++;
- }
- } else {
- std::cerr << "Unknown key in config: " <<
- entry.first << "\n";
- ret = false;
- }
- }
- if (!found_params) {
- std::cerr << "Could not find params in config\n";
- ret = false;
- }
- return ret;
- }
// Print the command-line synopsis (prefixed with the program name
// argv0) to stderr and terminate the process with exit status 1.
// Does not return.
static void usage(const char *argv0)
{
    static const char synopsis[] = "%s [-t nthreads] < config.json\n";
    fprintf(stderr, synopsis, argv0);
    exit(1);
}
- /*
- Generate ESK (Encryption Secret Key) and TSK (Token Secret Key)
- */
- int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
- aes_key &ESK, aes_key &TSK )
- {
- unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
- unsigned char iv[SGX_AESGCM_IV_SIZE];
- unsigned char mac[SGX_AESGCM_MAC_SIZE];
- memset(iv, 0, SGX_AESGCM_IV_SIZE);
- memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
- memcpy(iv, "Encryption", sizeof("Encryption"));
- if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
- master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) {
- printf("Client: generateMasterKeys FAIL\n");
- return -1;
- }
- printf("\n\nEncryption Master Key: ");
- for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
- printf("%x", ESK[i]);
- }
- printf("\n");
- memset(iv, 0, SGX_AESGCM_IV_SIZE);
- memcpy(iv, "Token", sizeof("Token"));
- if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
- master_secret, iv, SGX_AESGCM_IV_SIZE, TSK, mac)) {
- printf("generateMasterKeys failed\n");
- return -1;
- }
- printf("Token Master Key: ");
- for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
- printf("%x", TSK[i]);
- }
- printf("\n\n");
- return 1;
- }
- /*
- Takes the client_number, the master aes_key for generating client keys
- for encrypted communication with ingestion (client_ing_key) and
- storage servers (client_stg_key)
- */
- int generateClientKeys(clientid_t client_number, aes_key &ESK,
- aes_key &client_ing_key, aes_key &client_stg_key)
- {
- unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
- unsigned char iv[SGX_AESGCM_IV_SIZE];
- unsigned char tag[SGX_AESGCM_MAC_SIZE];
- memset(iv, 0, SGX_AESGCM_IV_SIZE);
- memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
- memset(tag, 0, SGX_AESGCM_KEY_SIZE);
- memcpy(iv, &client_number, sizeof(client_number));
- /*
- printf("Client Key: (before Gen) ");
- for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
- printf("%x", client_ing_key[i]);
- }
- printf("\n");
- */
- if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
- iv, SGX_AESGCM_IV_SIZE, client_ing_key, tag)) {
- printf("generateClientKeys failed\n");
- return -1;
- }
- memset(iv, 0, SGX_AESGCM_IV_SIZE);
- memcpy(iv, &client_number, sizeof(client_number));
- memcpy(iv +sizeof(client_number), "STG", sizeof("STG"));
- if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
- iv, SGX_AESGCM_IV_SIZE, client_stg_key, tag)) {
- printf("generateClientKeys failed\n");
- return -1;
- }
- /*
- printf("Client %d Ingestion: (after Gen) ", client_number);
- for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
- printf("%x", client_ing_key[i]);
- }
- printf("\n");
- */
- return 1;
- }
- void Client::initClient(clientid_t cid, uint16_t stg_id,
- aes_key ikey, aes_key skey)
- {
- uint16_t num_storage_nodes = storage_nodes.size();
- sim_id = cid;
- id = stg_id << DEST_UID_BITS;
- id += (cid/num_storage_nodes);
- token_list = new token[config.m_priv_out];
- memcpy(ing_key, ikey, SGX_AESGCM_KEY_SIZE);
- memcpy(stg_key, skey, SGX_AESGCM_KEY_SIZE);
- }
- void Client::initializeStgSocket(boost::asio::io_context &ioc,
- NodeConfig &stg_server, ip_addr *curr_ip, uint16_t &port_no)
- {
- boost::system::error_code err;
- while(1) {
- #ifdef VERBOSE_CLIENT
- std::cerr << "Connecting to " << stg_server.name << "...\n";
- std::cout << stg_server.slistenhost << ":" << stg_server.slistenport;
- #endif
- std::string ip_str = curr_ip->ip_str();
- boost::asio::ip::address ip_address =
- boost::asio::ip::address::from_string(ip_str, err);
- if(err) {
- printf("initializeStgSocket::Invalid IP address\n");
- }
- storage_sock = new boost::asio::ip::tcp::socket(ioc);
- storage_sock->open(boost::asio::ip::tcp::v4(), err);
- while(1) {
- boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
- storage_sock->bind(ep, err);
- if (!err) {
- break;
- } else {
- printf("STG: Error %s. (%s:%d)\n", err.message().c_str(),
- (curr_ip->ip_str()).c_str(), port_no);
- port_no++;
- if(port_no >= PORT_END) {
- port_no = PORT_START;
- curr_ip->increment(nthreads);
- }
- }
- }
- boost::asio::ip::address stg_ip =
- boost::asio::ip::address::from_string(stg_server.slistenhost, err);
- boost::asio::ip::tcp::endpoint
- stg_ep(stg_ip, std::stoi(stg_server.slistenport));
- // just for printing
- // boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
- storage_sock->connect(stg_ep, err);
- if (!err) {
- break;
- }
- std::cerr <<"STG: Connection from " <<
- curr_ip->ip_str() << ":" << port_no <<
- " to " << stg_server.name << " refused, will retry.\n";
- #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
- int sleep_delay = rand() % 100000;
- usleep(sleep_delay);
- #else
- usleep(1000000);
- #endif
- delete storage_sock;
- storage_sock = nullptr;
- }
- }
- void Client::initializeIngSocket(boost::asio::io_context &ioc,
- NodeConfig &ing_server, ip_addr *curr_ip, uint16_t &port_no)
- {
- boost::system::error_code err;
- boost::asio::ip::tcp::resolver resolver(ioc);
- while(1) {
- #ifdef VERBOSE_CLIENT
- std::cerr << "Connecting to " << ing_server.name << "...\n";
- std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
- #endif
- std::string ip_str = curr_ip->ip_str();
- boost::asio::ip::address ip_address =
- boost::asio::ip::address::from_string(ip_str, err);
- if(err) {
- printf("initializeIngSocket::Invalid IP address\n");
- }
- ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
- ingestion_sock->open(boost::asio::ip::tcp::v4(), err);
- while(1) {
- boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
- ingestion_sock->bind(ep, err);
- if (!err) {
- break;
- } else {
- printf("ING: Error %s. (%s:%d)\n", err.message().c_str(),
- (curr_ip->ip_str()).c_str(), port_no);
- port_no++;
- if(port_no >= PORT_END) {
- port_no = PORT_START;
- curr_ip->increment(nthreads);
- }
- }
- }
- // just for printing
- boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
- boost::asio::ip::address ing_ip =
- boost::asio::ip::address::from_string(ing_server.clistenhost, err);
- boost::asio::ip::tcp::endpoint
- ing_ep(ing_ip, std::stoi(ing_server.clistenport));
- //std::cout << "Ing endpoint:" << ing_ep << "\n";
- //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n";
- ingestion_sock->connect(ing_ep, err);
- /*
- boost::asio::connect(*ingestion_sock,
- resolver.resolve(ing_server.clistenhost,
- ing_server.clistenport), err);
- */
- if (!err) {
- //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n";
- break;
- }
- std::cerr << "ING: Connection from " <<
- curr_ip->ip_str() << ":" << port_no <<
- " to " << ing_server.name << " refused, will retry.\n";
- #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
- int sleep_delay = rand() % 100000;
- usleep(sleep_delay);
- #else
- usleep(1000000);
- #endif
- delete ingestion_sock;
- ingestion_sock = nullptr;
- }
- }
- /*
- Populates the buffer pt_msgbundle with a valid message pt_msgbundle.
- Assumes that it is supplied with a pt_msgbundle buffer of the correct length
- num_out is either priv_out or pub_out, depending on whether we're
- doing private or public routing
- Correct length for pt_msgbundle = (num_out)*(msg_size) +
- (only for private routing) (num_out)*TOKEN_SIZE
- */
- void Client::generateMessageBundle(uint8_t num_out, uint32_t msg_size,
- unsigned char *pt_msgbundle)
- {
- unsigned char *ptr = pt_msgbundle;
- // Setup message pt_msgbundle
- for(uint32_t i = 0; i < num_out; i++) {
- // For benchmarking, each client just sends messages to
- // themselves, so the destination and source ids are the same.
- // Receiver id
- uint32_t rid = id;
- #if 0
- uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
- if (!private_routing) {
- // If we're testing public routing, have each user send a
- // message to the user with the same local id, but on
- // storage server 0.
- rid &= dest_uid_mask;
- }
- #endif
- unsigned char *start_ptr = ptr;
- memcpy(ptr, &rid, sizeof(rid));
- ptr += sizeof(rid);
- // Priority (for public routing only)
- if (!private_routing) {
- uint32_t priority = 0;
- #ifdef SHOW_RECEIVED_MESSAGES
- // If we're testing public routing, set the priority so that
- // messages to different users will have the highest
- // priority messages sent from users at different servers
- uint32_t id_low_bits = id &
- ((1 << DEST_STORAGE_NODE_BITS) - 1);
- priority = id ^ (id_low_bits << DEST_UID_BITS);
- #endif
- memcpy(ptr, &priority, sizeof(priority));
- ptr += sizeof(priority);
- }
- // Sender id is our id
- memcpy(ptr, &id, sizeof(id));
- ptr += sizeof(id);
- uint32_t remaining_message_size = start_ptr + msg_size - ptr;
- memset(ptr, 0, remaining_message_size);
- #ifdef SHOW_RECEIVED_MESSAGES
- snprintf((char *)ptr, remaining_message_size, "From %08x to %08x",
- id, rid);
- #endif
- ptr+=(remaining_message_size);
- }
- if(private_routing) {
- // Add the tokens for this msgbundle
- memcpy(ptr, token_list, config.m_priv_out * TOKEN_SIZE);
- }
- }
- bool Client::encryptMessageBundle(uint32_t enc_bundle_size,
- unsigned char *pt_msgbundle, unsigned char *enc_msgbundle)
- {
- // Encrypt the pt_msgbundle
- unsigned char *pt_msgbundle_start = pt_msgbundle;
- unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE;
- unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE;
- size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE -
- SGX_AESGCM_IV_SIZE;
- if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt,
- NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start,
- enc_tag)) {
- printf("Client: encryptMessageBundle FAIL\n");
- return 0;
- }
- // Copy IV into the bundle
- memcpy(enc_msgbundle, ing_iv, SGX_AESGCM_IV_SIZE);
- // Update IV
- uint64_t *iv_ctr = (uint64_t*) ing_iv;
- (*iv_ctr)+=1;
- return 1;
- }
#ifdef TRACE_SOCKIO
// Rate-limited tracer for bursts of socket events.
//
// Each call to log() counts one event.  The first event of a burst
// prints a "begin" line.  When an event arrives more than half a second
// after the previous one, an "end" line for the previous burst (stamped
// with the time of its last event and its item count) is printed first
// and a new burst begins.  Lines are tagged with the id of the thread
// that constructed the logger.
class LimitLogger {
    std::string label;
    std::string thrid;
    struct timeval last_log;
    size_t num_items;

public:
    LimitLogger(const char *_label): label(_label), last_log({0,0}),
            num_items(0) {
        std::stringstream idtext;
        idtext << boost::this_thread::get_id();
        thrid = idtext.str();
    }

    void log() {
        struct timeval now;
        gettimeofday(&now, NULL);

        // Microseconds since the previous event
        const long gap_us = (now.tv_sec - last_log.tv_sec) * 1000000
            + (now.tv_usec - last_log.tv_usec);
        const bool burst_over = (num_items > 0) && (gap_us > 500000);

        if (burst_over) {
            printf("%lu.%06lu: Thread %s end %s of %lu items\n",
                last_log.tv_sec, last_log.tv_usec,
                thrid.c_str(), label.c_str(),
                num_items);
            num_items = 0;
        }
        if (num_items == 0) {
            printf("%lu.%06lu: Thread %s begin %s\n", now.tv_sec,
                now.tv_usec, thrid.c_str(), label.c_str());
        }

        gettimeofday(&last_log, NULL);
        ++num_items;
    }
};

// One logger per thread for each kind of traced event
static thread_local LimitLogger
    recvlogger("recv"), queuelogger("queue"), sentlogger("sent");
#endif
- void Client::sendMessageBundle()
- {
- uint16_t priv_out = config.m_priv_out;
- uint16_t pub_out = config.m_pub_out;
- uint16_t msg_size = config.msg_size;
- uint32_t send_pt_msgbundle_size, send_enc_msgbundle_size;
- if(private_routing) {
- send_pt_msgbundle_size = ptMsgBundleSize(priv_out, msg_size);
- send_enc_msgbundle_size = encMsgBundleSize(priv_out, msg_size);
- } else {
- send_pt_msgbundle_size = ptPubMsgBundleSize(pub_out, msg_size);
- send_enc_msgbundle_size = encPubMsgBundleSize(pub_out, msg_size);
- }
- unsigned char *send_pt_msgbundle =
- (unsigned char*) malloc (send_pt_msgbundle_size);
- unsigned char *send_enc_msgbundle =
- (unsigned char*) malloc (send_enc_msgbundle_size);
- if(private_routing) {
- generateMessageBundle(priv_out, msg_size, send_pt_msgbundle);
- } else {
- generateMessageBundle(pub_out, msg_size, send_pt_msgbundle);
- }
- encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle,
- send_enc_msgbundle);
- #ifdef VERBOSE_CLIENT
- displayPtMessageBundle(send_pt_msgbundle, priv_out, msg_size);
- #endif
- free(send_pt_msgbundle);
- #ifdef TRACE_SOCKIO
- queuelogger.log();
- #endif
- boost::asio::async_write(*ingestion_sock,
- boost::asio::buffer(send_enc_msgbundle, send_enc_msgbundle_size),
- [this, send_enc_msgbundle] (boost::system::error_code ecc, std::size_t) {
- #ifdef TRACE_SOCKIO
- sentlogger.log();
- #endif
- #ifdef VERBOSE_CLIENT
- if(sim_id==0){
- printf("TEST: Client 0 send their msgbundle\n");
- }
- #endif
- free(send_enc_msgbundle);
- if (ecc) {
- if(ecc == boost::asio::error::eof) {
- delete storage_sock;
- storage_sock = nullptr;
- } else {
- printf("Client: boost async_write failed for sending "
- "message bundle\n");
- printf("Error %s\n", ecc.message().c_str());
- }
- return;
- }
- });
- }
- int Client::sendIngAuthMessage(unsigned long epoch_no)
- {
- uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) +
- SGX_AESGCM_KEY_SIZE;
- unsigned char *auth_message = (unsigned char*) malloc(auth_size);
- unsigned char *am_ptr = auth_message;
- memcpy(am_ptr, &sim_id, sizeof(sim_id));
- am_ptr+=sizeof(sim_id);
- memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
- am_ptr+=sizeof(unsigned long);
- unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
- unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
- unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
- memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
- if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE,
- NULL, 0, ing_key, epoch_iv, SGX_AESGCM_IV_SIZE,
- am_ptr, tag)) {
- printf("sendIngAuthMessage failed\n");
- return -1;
- }
- #ifdef VERBOSE_CLIENT
- printf("Client %d auth_message: \n", id);
- for(int i=0; i<auth_size; i++) {
- printf("%x", auth_message[i]);
- }
- printf("\n");
- #endif
- /*
- if(sim_id%7919==0) {
- printf("Client %d auth_message: \n", sim_id);
- for(int i=0; i<TOKEN_SIZE; i++) {
- printf("%x", am_ptr[i]);
- }
- printf("\n");
- }
- */
- boost::asio::write(*ingestion_sock,
- boost::asio::buffer(auth_message, auth_size));
- return 1;
- }
- int Client::sendStgAuthMessage(unsigned long epoch_no)
- {
- uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) +
- SGX_AESGCM_KEY_SIZE;
- unsigned char *auth_message = (unsigned char*) malloc(auth_size);
- unsigned char *am_ptr = auth_message;
- memcpy(am_ptr, &sim_id, sizeof(sim_id));
- am_ptr+=sizeof(sim_id);
- memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
- am_ptr+=sizeof(unsigned long);
- unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
- unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
- unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
- memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
- if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE,
- NULL, 0, stg_key, epoch_iv, SGX_AESGCM_IV_SIZE,
- am_ptr, tag)) {
- printf("sendStgAuthMessage failed\n");
- return -1;
- }
- #ifdef VERBOSE_CLIENT
- printf("Client %d auth_message: \n", id);
- for(int i=0; i<auth_size; i++) {
- printf("%x", auth_message[i]);
- }
- printf("\n");
- #endif
- boost::asio::async_write(*storage_sock,
- boost::asio::buffer(auth_message, auth_size),
- [this, auth_message] (boost::system::error_code ecc, std::size_t) {
- free(auth_message);
- if (ecc) {
- if(ecc == boost::asio::error::eof) {
- delete storage_sock;
- storage_sock = nullptr;
- } else {
- printf("Error %s\n", ecc.message().c_str());
- }
- printf("Client::sendStgAuthMessage boost async_write failed\n");
- return;
- }
- });
- return 1;
- }
- void Client::setup_client(boost::asio::io_context &io_context,
- uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id,
- ip_addr *curr_ip, uint16_t &port_no)
- {
- // Set up the client's
- // (i) client_id
- // (ii) symmetric keys shared with their ingestion and storage server
- // (iii) sockets to their ingestion and storage server
- aes_key client_ing_key;
- aes_key client_stg_key;
- int ret = generateClientKeys(sim_id, ESK, client_ing_key, client_stg_key);
- initClient(sim_id, stg_node_id, client_ing_key, client_stg_key);
- initializeStgSocket(io_context, storage_nodes[stg_node_id], curr_ip, port_no);
- port_no++;
- initializeIngSocket(io_context, ingestion_nodes[ing_node_id], curr_ip, port_no);
- port_no++;
- // Authenticate clients to their ingestion and storage servers
- struct timespec ep;
- clock_gettime(CLOCK_REALTIME_COARSE, &ep);
- unsigned long time_in_us = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
- unsigned long epoch_no = CEILDIV(time_in_us, 5000000);
- sendStgAuthMessage(epoch_no);
- sendIngAuthMessage(epoch_no);
- epoch_process();
- }
- void generateClients(boost::asio::io_context &io_context,
- uint32_t cstart, uint32_t cstop, uint8_t thread_no)
- {
- uint32_t num_clients_total = config.user_count;
- uint16_t num_stg_nodes = storage_nodes.size();
- uint16_t num_ing_nodes = ingestion_nodes.size();
- uint16_t port_no = PORT_START;
- ip_addr curr_ip;
- curr_ip.ip1 = 127;
- curr_ip.ip2 = 1 + thread_no;
- curr_ip.ip3 = 0;
- curr_ip.ip4 = 0;
- for(uint32_t i=cstart; i<cstop; i++) {
- // Compute client's ip and port
- #ifdef CLIENT_UNIQUE_IP
- if(port_no>=PORT_END) {
- port_no = PORT_START;
- }
- curr_ip.increment(nthreads);
- #else
- if(port_no>=PORT_END) {
- port_no = PORT_START;
- curr_ip.increment(nthreads);
- }
- #endif
- uint16_t ing_no = i % num_ing_nodes;
- uint16_t stg_no = i % num_stg_nodes;
- uint16_t stg_node_id = storage_map[stg_no];
- uint16_t ing_node_id = ingestion_map[ing_no];
- clients[i].setup_client(io_context, i, ing_node_id, stg_node_id,
- &curr_ip, port_no);
- }
- printf("Done with all client_setup calls. Thread_no = %d\n", thread_no);
- }
- /*
- Epochs are server driven.
- In a single epoch, each client waits to receive from their storage server
- (i) a token bundle for this epoch and (ii) their messages from the last epoch
- The client then sends their messages for this epoch to their ingestion servers
- using the tokens they received in this epoch
- */
- void Client::epoch_process() {
- uint32_t pt_token_size = uint32_t(config.m_priv_out) * TOKEN_SIZE;
- uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE
- + SGX_AESGCM_MAC_SIZE;
- unsigned char *enc_tokens = nullptr;
- uint16_t num_in = config.m_pub_in;
- std::vector<boost::asio::mutable_buffer> toreceive;
- if (private_routing) {
- enc_tokens = (unsigned char*) malloc (token_bundle_size);
- toreceive.push_back(boost::asio::buffer(enc_tokens,
- token_bundle_size));
- num_in = config.m_priv_in;
- }
- uint16_t msg_size = config.msg_size;
- uint32_t recv_pt_mailbox_size = ptMailboxSize(num_in, msg_size);
- uint32_t recv_enc_mailbox_size = encMailboxSize(num_in, msg_size);
- unsigned char *recv_pt_mailbox =
- (unsigned char*) malloc (recv_pt_mailbox_size);
- unsigned char *recv_enc_mailbox =
- (unsigned char*) malloc (recv_enc_mailbox_size);
- toreceive.push_back(boost::asio::buffer(recv_enc_mailbox,
- recv_enc_mailbox_size));
- // Async read the encrypted tokens (for private routing only) and
- // encrypted mailbox (both private and public routing) for this
- // epoch
- boost::asio::async_read(*storage_sock, toreceive,
- [this, enc_tokens, token_bundle_size, pt_token_size, num_in,
- recv_pt_mailbox, recv_enc_mailbox, recv_pt_mailbox_size]
- (boost::system::error_code ec, std::size_t) {
- if (ec) {
- if(ec == boost::asio::error::eof) {
- delete storage_sock;
- storage_sock = nullptr;
- } else {
- printf("Error %s\n", ec.message().c_str());
- printf("Client::epoch_process boost "
- "async_read_tokens failed\n");
- }
- free(enc_tokens);
- free(recv_pt_mailbox);
- free(recv_enc_mailbox);
- return;
- }
- #ifdef TRACE_SOCKIO
- recvlogger.log();
- #endif
- #ifdef VERBOSE_CLIENT
- if(sim_id == 0) {
- printf("TEST: Client 0: Encrypted token bundle received:\n");
- for(uint32_t i = 0; i < token_bundle_size; i++) {
- printf("%02x", enc_tokens[i]);
- }
- printf("\n");
- printf("TEST: Client 0: Encrypted msgbundle received\n");
- }
- #endif
- if (private_routing) {
- // Decrypt the token bundle
- unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE;
- unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE +
- pt_token_size;
- int decrypted_bytes = gcm_decrypt(enc_tkn_ptr, pt_token_size,
- NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key),
- enc_tokens, SGX_AESGCM_IV_SIZE,
- (unsigned char*) (this->token_list));
- if (decrypted_bytes != pt_token_size) {
- printf("Client::epoch_process gcm_decrypt tokens failed. "
- "decrypted_bytes = %d\n", decrypted_bytes);
- }
- free(enc_tokens);
- /*
- unsigned char *tkn_ptr = (unsigned char*) this->token_list;
- if(sim_id==0) {
- printf("TEST: Client 0: Decrypted client tokens:\n");
- for(int i = 0; i < pt_token_size; i++) {
- printf("%02x", tkn_ptr[i]);
- }
- printf("\n");
- }
- */
- }
- // Do whatever processing with the received messages here
- // but for the benchmark, we just ignore the received
- // messages (unless we want to print them)
- #ifdef SHOW_RECEIVED_MESSAGES
- unsigned char *recv_enc_mailbox_ptr =
- recv_enc_mailbox + SGX_AESGCM_IV_SIZE;
- unsigned char *recv_enc_mailbox_tag =
- recv_enc_mailbox + SGX_AESGCM_IV_SIZE + recv_pt_mailbox_size;
- int decrypted_bytes = gcm_decrypt(recv_enc_mailbox_ptr,
- recv_pt_mailbox_size, NULL, 0, recv_enc_mailbox_tag,
- (unsigned char*) &(this->stg_key), recv_enc_mailbox,
- SGX_AESGCM_IV_SIZE, recv_pt_mailbox);
- if (decrypted_bytes != recv_pt_mailbox_size) {
- printf("Client::epoch_process gcm_decrypt mailbox failed. "
- "decrypted_bytes = %d\n", decrypted_bytes);
- }
- if (sim_id < 32) {
- displayPtMessageBundle(recv_pt_mailbox, num_in,
- config.msg_size, id);
- }
- #endif
- free(recv_enc_mailbox);
- free(recv_pt_mailbox);
- // Send this epoch's message bundle
- sendMessageBundle();
- epoch_process();
- });
- }
- void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads)
- {
- std::vector<boost::thread> threads;
- uint32_t num_clients_total = config.user_count;
- size_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
- // Generate all the clients for the experiment
- for(int i=0; i<nthreads; i++) {
- uint32_t cstart, cstop;
- cstart = i * clients_per_thread;
- cstop = (i==(nthreads-1))? num_clients_total: (i+1) * clients_per_thread;
- #ifdef VERBOSE_CLIENT
- printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
- #endif
- threads.emplace_back(boost::thread(generateClients,
- boost::ref(io_context), cstart, cstop, i));
- }
- for(int i=0; i<nthreads; i++) {
- threads[i].join();
- }
- }
- int main(int argc, char **argv)
- {
- // Unbuffer stdout
- setbuf(stdout, NULL);
- const char *progname = argv[0];
- ++argv;
- // Parse options
- while (*argv && (*argv)[0] == '-') {
- if (!strcmp(*argv, "-t")) {
- if (argv[1] == NULL) {
- usage(progname);
- }
- nthreads = uint16_t(atoi(argv[1]));
- argv += 2;
- } else {
- usage(progname);
- }
- }
- // Read the config.json from the first line of stdin. We have to do
- // this before outputting anything to avoid potential deadlock with
- // the launch program.
- std::string configstr;
- std::getline(std::cin, configstr);
- boost::asio::io_context io_context;
- if (!config_parse(config, configstr, ingestion_nodes,
- storage_nodes, storage_map)) {
- exit(1);
- }
- private_routing = config.private_routing;
- clients = new Client[config.user_count];
- #ifdef VERBOSE_CLIENT
- printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
- ingestion_nodes.size(), storage_nodes.size());
- #endif
- generateMasterKeys(config.master_secret, ESK, TSK);
- // Queue up the actual work
- boost::asio::post(io_context, [&]{
- initializeClients(io_context, nthreads);
- });
- // Start background threads; one thread will perform the work and the
- // others will execute the async_write/async_read handlers
- std::vector<boost::thread> threads;
- for (int i=0; i<nthreads; i++) {
- threads.emplace_back([&]{io_context.run();});
- }
- io_context.run();
- for (int i=0; i<nthreads; i++) {
- threads[i].join();
- }
- delete [] clients;
- }
|