浏览代码

Changes to client for server-driven epochs. Clients do all read and writes asynchronously using boost::async_read/write. Clients use tokens received by the storage server for sending message bundles

Sajin Sasy 1 年之前
父节点
当前提交
c411e07295
共有 2 个文件被更改,包括 347 次插入151 次删除
  1. 324 140
      Client/clients.cpp
  2. 23 11
      Client/clients.hpp

+ 324 - 140
Client/clients.cpp

@@ -6,14 +6,22 @@
 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
 #include "boost/property_tree/ptree.hpp"
 #include "boost/property_tree/ptree.hpp"
 #include "boost/property_tree/json_parser.hpp"
 #include "boost/property_tree/json_parser.hpp"
+#include <boost/thread.hpp>
 #include <boost/asio.hpp>
 #include <boost/asio.hpp>
-#include <thread>
 #include "gcm.h"
 #include "gcm.h"
 #include "sgx_tcrypto.h"
 #include "sgx_tcrypto.h"
 #include "clients.hpp"
 #include "clients.hpp"
 
 
 #define CEILDIV(x,y) (((x)+(y)-1)/(y))
 #define CEILDIV(x,y) (((x)+(y)-1)/(y))
 
 
+Config config;
+Client *clients;
+aes_key ESK, TSK;
+std::vector<NodeConfig> ingestion_nodes, storage_nodes;
+std::vector<uint16_t> storage_map;
+std::vector<uint16_t> ingestion_map;
+unsigned long setup_time;
+
 // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
 // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
 // into a host part "127.0.0.1" and a port part "12000".
 // into a host part "127.0.0.1" and a port part "12000".
 static bool split_host_port(std::string &host, std::string &port,
 static bool split_host_port(std::string &host, std::string &port,
@@ -123,12 +131,12 @@ void displayEncMessageBundle(unsigned char *bundle, uint16_t priv_out,
 
 
 static inline uint32_t encMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
 static inline uint32_t encMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
 {
 {
-    return(SGX_AESGCM_IV_SIZE + (priv_out * msg_size) + SGX_AESGCM_MAC_SIZE);
+    return(SGX_AESGCM_IV_SIZE + (priv_out * (msg_size + TOKEN_SIZE)) + SGX_AESGCM_MAC_SIZE);
 }
 }
 
 
 static inline uint32_t ptMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
 static inline uint32_t ptMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
 {
 {
-    return((priv_out * msg_size));
+    return(priv_out * (msg_size + TOKEN_SIZE));
 }
 }
 
 
 bool config_parse(Config &config, const std::string configstr,
 bool config_parse(Config &config, const std::string configstr,
@@ -194,6 +202,9 @@ bool config_parse(Config &config, const std::string configstr,
                     } else if (!nentry.first.compare("clisten")) {
                     } else if (!nentry.first.compare("clisten")) {
                         ret &= split_host_port(nc.clistenhost, nc.clistenport,
                         ret &= split_host_port(nc.clistenhost, nc.clistenport,
                             nentry.second.get_value<std::string>());
                             nentry.second.get_value<std::string>());
+                    } else if (!nentry.first.compare("slisten")) {
+                        ret &= split_host_port(nc.slistenhost, nc.slistenport,
+                            nentry.second.get_value<std::string>());
                     } else if (!nentry.first.compare("roles")) {
                     } else if (!nentry.first.compare("roles")) {
                         nc.roles = nentry.second.get_value<std::uint8_t>();
                         nc.roles = nentry.second.get_value<std::uint8_t>();
                     } else {
                     } else {
@@ -203,7 +214,8 @@ bool config_parse(Config &config, const std::string configstr,
                     }
                     }
                 }
                 }
                 if(nc.roles & ROLE_INGESTION) {
                 if(nc.roles & ROLE_INGESTION) {
-                    ingestion_nodes.push_back(std::move(nc));
+                    ingestion_nodes.push_back(nc);
+                    ingestion_map.push_back(node_num);
                 }
                 }
                 if(nc.roles & ROLE_STORAGE) {
                 if(nc.roles & ROLE_STORAGE) {
                     storage_nodes.push_back(std::move(nc));
                     storage_nodes.push_back(std::move(nc));
@@ -235,11 +247,11 @@ static void usage(const char *argv0)
 
 
 /*
 /*
 
 
-    Generate EMK (Encryption master Secret Key) and TMK (Token master Secret Key)
+    Generate ESK (Encryption Secret Key) and TSK (Token Secret Key)
 
 
 */
 */
 int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
 int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
-   aes_key &EMK, aes_key &TMK )
+   aes_key &ESK, aes_key &TSK )
 {
 {
     unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
     unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
     unsigned char iv[SGX_AESGCM_IV_SIZE];
     unsigned char iv[SGX_AESGCM_IV_SIZE];
@@ -249,28 +261,28 @@ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
     memcpy(iv, "Encryption", sizeof("Encryption"));
     memcpy(iv, "Encryption", sizeof("Encryption"));
 
 
     if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
     if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
-            master_secret, iv, SGX_AESGCM_IV_SIZE, EMK, mac)) {
+            master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) {
         printf("Client: generateMasterKeys FAIL\n");
         printf("Client: generateMasterKeys FAIL\n");
         return -1;
         return -1;
     }
     }
 
 
     printf("\n\nEncryption Master Key: ");
     printf("\n\nEncryption Master Key: ");
     for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
     for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
-        printf("%x", EMK[i]);
+        printf("%x", ESK[i]);
     }
     }
     printf("\n");
     printf("\n");
 
 
     memset(iv, 0, SGX_AESGCM_IV_SIZE);
     memset(iv, 0, SGX_AESGCM_IV_SIZE);
     memcpy(iv, "Token", sizeof("Token"));
     memcpy(iv, "Token", sizeof("Token"));
     if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
     if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
-            master_secret, iv, SGX_AESGCM_IV_SIZE, TMK, mac)) {
+            master_secret, iv, SGX_AESGCM_IV_SIZE, TSK, mac)) {
         printf("generateMasterKeys failed\n");
         printf("generateMasterKeys failed\n");
         return -1;
         return -1;
     }
     }
 
 
     printf("Token Master Key: ");
     printf("Token Master Key: ");
     for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
     for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
-        printf("%x", TMK[i]);
+        printf("%x", TSK[i]);
     }
     }
     printf("\n\n");
     printf("\n\n");
 
 
@@ -282,7 +294,7 @@ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
     for encrypted communication with ingestion (client_ing_key) and
     for encrypted communication with ingestion (client_ing_key) and
     storage servers (client_stg_key)
     storage servers (client_stg_key)
 */
 */
-int generateClientKeys(clientid_t client_number, aes_key &EMK,
+int generateClientKeys(clientid_t client_number, aes_key &ESK,
     aes_key &client_ing_key, aes_key &client_stg_key)
     aes_key &client_ing_key, aes_key &client_stg_key)
 {
 {
     unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
     unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
@@ -301,7 +313,7 @@ int generateClientKeys(clientid_t client_number, aes_key &EMK,
     printf("\n");
     printf("\n");
     */
     */
 
 
-    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, EMK,
+    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
             iv, SGX_AESGCM_IV_SIZE, client_ing_key, tag)) {
             iv, SGX_AESGCM_IV_SIZE, client_ing_key, tag)) {
         printf("generateClientKeys failed\n");
         printf("generateClientKeys failed\n");
         return -1;
         return -1;
@@ -310,7 +322,7 @@ int generateClientKeys(clientid_t client_number, aes_key &EMK,
     memset(iv, 0, SGX_AESGCM_IV_SIZE);
     memset(iv, 0, SGX_AESGCM_IV_SIZE);
     memcpy(iv, &client_number, sizeof(client_number));
     memcpy(iv, &client_number, sizeof(client_number));
     memcpy(iv +sizeof(client_number), "STG", sizeof("STG"));
     memcpy(iv +sizeof(client_number), "STG", sizeof("STG"));
-    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, EMK,
+    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
             iv, SGX_AESGCM_IV_SIZE, client_stg_key, tag)) {
             iv, SGX_AESGCM_IV_SIZE, client_stg_key, tag)) {
         printf("generateClientKeys failed\n");
         printf("generateClientKeys failed\n");
         return -1;
         return -1;
@@ -330,20 +342,43 @@ int generateClientKeys(clientid_t client_number, aes_key &EMK,
 }
 }
 
 
 
 
-void Client::initClient(clientid_t cid, aes_key ikey, aes_key skey,
-    uint16_t num_storage_nodes, std::vector<uint16_t> &storage_map)
+void Client::initClient(clientid_t cid, uint16_t stg_id,
+    aes_key ikey, aes_key skey)
 {
 {
+    uint16_t num_storage_nodes = storage_nodes.size();
     sim_id = cid;
     sim_id = cid;
-    uint16_t stg_no = cid % num_storage_nodes;
-    uint16_t stg_id = storage_map[stg_no];
     id = stg_id << DEST_UID_BITS;
     id = stg_id << DEST_UID_BITS;
     id += (cid/num_storage_nodes);
     id += (cid/num_storage_nodes);
 
 
+    token_list = new token[config.m_priv_out];
     memcpy(ing_key, ikey, SGX_AESGCM_KEY_SIZE);
     memcpy(ing_key, ikey, SGX_AESGCM_KEY_SIZE);
     memcpy(stg_key, skey, SGX_AESGCM_KEY_SIZE);
     memcpy(stg_key, skey, SGX_AESGCM_KEY_SIZE);
 }
 }
 
 
-void Client::initializeSocket(boost::asio::io_context &ioc,
+void Client::initializeStgSocket(boost::asio::io_context &ioc,
+    NodeConfig &stg_server)
+{
+
+    boost::system::error_code err;
+    boost::asio::ip::tcp::resolver resolver(ioc);
+
+    while(1) {
+#ifdef VERBOSE_CLIENT
+        std::cerr << "Connecting to " << stg_server.name << "...\n";
+        std::cout << stg_server.slistenhost << ":" << stg_server.slistenport;
+#endif
+        storage_sock = new boost::asio::ip::tcp::socket(ioc);
+        boost::asio::connect(*storage_sock,
+            resolver.resolve(stg_server.slistenhost,
+                stg_server.slistenport), err);
+        if (!err) break;
+        std::cerr << "Connection to " << stg_server.name <<
+            " refused, will , epoch_noretry.\n";
+        sleep(1);
+    }
+}
+
+void Client::initializeIngSocket(boost::asio::io_context &ioc,
     NodeConfig &ing_server)
     NodeConfig &ing_server)
 {
 {
 
 
@@ -351,11 +386,10 @@ void Client::initializeSocket(boost::asio::io_context &ioc,
     boost::asio::ip::tcp::resolver resolver(ioc);
     boost::asio::ip::tcp::resolver resolver(ioc);
 
 
     while(1) {
     while(1) {
-#ifdef VERBOSE_NET
+#ifdef VERBOSE_CLIENT
         std::cerr << "Connecting to " << ing_server.name << "...\n";
         std::cerr << "Connecting to " << ing_server.name << "...\n";
         std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
         std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
 #endif
 #endif
-        // ingestion_sock needs io_context
         ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
         ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
         boost::asio::connect(*ingestion_sock,
         boost::asio::connect(*ingestion_sock,
             resolver.resolve(ing_server.clistenhost,
             resolver.resolve(ing_server.clistenhost,
@@ -392,6 +426,9 @@ void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
         memset(ptr, 0, remaining_message_size);
         memset(ptr, 0, remaining_message_size);
         ptr+=(remaining_message_size);
         ptr+=(remaining_message_size);
     }
     }
+
+    // Add the tokens for this msgbundle
+    memcpy(ptr, token_list, config.m_priv_out * TOKEN_SIZE);
 }
 }
 
 
 
 
@@ -418,29 +455,44 @@ bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_ms
     return 1;
     return 1;
 }
 }
 
 
-/*
 
 
-      Assumes pt_msgbundle is a buffer of size messageBundleSize(priv_out, msg_size)
-*/
-
-void Client::sendMessageBundle(uint16_t priv_out, uint16_t msg_size,
-    unsigned char *pt_msgbundle, unsigned char *enc_msgbundle)
+void Client::sendMessageBundle()
 {
 {
-    uint32_t enc_bundle_size = encMsgBundleSize(priv_out, msg_size);
 
 
-    generateMessageBundle(priv_out, msg_size, pt_msgbundle);
+    uint16_t priv_out = config.m_priv_out;
+    uint16_t msg_size = config.msg_size;
+    uint32_t send_pt_msgbundle_size = ptMsgBundleSize(priv_out, msg_size);
+    uint32_t send_enc_msgbundle_size = encMsgBundleSize(priv_out, msg_size);
+    unsigned char *send_pt_msgbundle = (unsigned char*) malloc (send_pt_msgbundle_size);
+    unsigned char *send_enc_msgbundle = (unsigned char*) malloc (send_enc_msgbundle_size);
+
+    generateMessageBundle(priv_out, msg_size, send_pt_msgbundle);
 
 
-    encryptMessageBundle(enc_bundle_size, pt_msgbundle, enc_msgbundle);
+    encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle, send_enc_msgbundle);
 
 
 #ifdef VERBOSE_CLIENT
 #ifdef VERBOSE_CLIENT
-    displayPtMessageBundle(pt_msgbundle, priv_out, msg_size);
+    displayPtMessageBundle(send_pt_msgbundle, priv_out, msg_size);
 #endif
 #endif
 
 
-    boost::asio::write(*ingestion_sock,
-        boost::asio::buffer(enc_msgbundle, enc_bundle_size));
+    boost::asio::async_write(*ingestion_sock,
+        boost::asio::buffer(send_enc_msgbundle, send_enc_msgbundle_size),
+        [this] (boost::system::error_code ecc, std::size_t) {
+
+        if (ecc) {
+            if(ecc == boost::asio::error::eof) {
+                delete(storage_sock);
+            }
+            else {
+                printf("Error %s\n", ecc.message().c_str());
+            }
+            printf("Client: boost async_write failed for sending message bundle\n");
+            return;
+        }
+
+    });
 }
 }
 
 
-int Client::sendAuthMessage(unsigned long epoch_no)
+int Client::sendIngAuthMessage(unsigned long epoch_no)
 {
 {
     uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
     uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
     unsigned char *auth_message = (unsigned char*) malloc(auth_size);
     unsigned char *auth_message = (unsigned char*) malloc(auth_size);
@@ -477,80 +529,258 @@ int Client::sendAuthMessage(unsigned long epoch_no)
     return 1;
     return 1;
 }
 }
 
 
-void generateClients(boost::asio::io_context &io_context,
-    uint32_t cstart, uint32_t cstop, Client* &clients,
-    aes_key &EMK, Config &config, std::vector<NodeConfig> &ingestion_nodes,
-    std::vector<NodeConfig> &storage_nodes, std::vector<uint16_t> &storage_map,
-    uint32_t num_clients_total, uint32_t clients_per_ing,
-    uint32_t ing_with_additional)
+int Client::sendStgAuthMessage(unsigned long epoch_no)
 {
 {
+    uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
+    unsigned char *auth_message = (unsigned char*) malloc(auth_size);
+    unsigned char *am_ptr = auth_message;
+
+    memcpy(am_ptr, &sim_id, sizeof(sim_id));
+    am_ptr+=sizeof(sim_id);
+
+    memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
+    am_ptr+=sizeof(unsigned long);
+
+    unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
+    unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
+    unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
+    memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
+
+    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, stg_key,
+            epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) {
+        printf("generateClientKeys failed\n");
+        return -1;
+    }
+
+#ifdef VERBOSE_CLIENT
+    printf("Client %d auth_message: \n", id);
+    for(int i=0; i<auth_size; i++) {
+        printf("%x", auth_message[i]);
+    }
+    printf("\n");
+#endif
+
+    boost::asio::async_write(*storage_sock,
+        boost::asio::buffer(auth_message, auth_size),
+        [this] (boost::system::error_code ecc, std::size_t) {
+
+        if (ecc) {
+            if(ecc == boost::asio::error::eof) {
+                delete(storage_sock);
+            }
+            else {
+                printf("Error %s\n", ecc.message().c_str());
+            }
+            printf("Client::sendStgAuthMessage boost async_write failed\n");
+            return;
+        }
+
+    });
+
+    return 1;
+}
+
+void Client::setup_client(boost::asio::io_context &io_context,
+    uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id)
+{
+    // Setup the client's
+    // (i) client_id
+    // (ii) symmetric keys shared with their ingestion and storage server
+    // (iii) sockets to their ingestion and storage server
     aes_key client_ing_key;
     aes_key client_ing_key;
     aes_key client_stg_key;
     aes_key client_stg_key;
+    int ret = generateClientKeys(sim_id, ESK, client_ing_key, client_stg_key);
+
+    initClient(sim_id, stg_node_id, client_ing_key, client_stg_key);
+
+    initializeStgSocket(io_context, storage_nodes[stg_node_id]);
+
+    initializeIngSocket(io_context, ingestion_nodes[ing_node_id]);
+
+    // Authenticate clients to their ingestion and storage servers
+    struct timespec ep;
+    clock_gettime(CLOCK_REALTIME_COARSE, &ep);
+    unsigned long time_in_ns = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
+    unsigned long epoch_no = CEILDIV(time_in_ns, EPOCH_INTERVAL);
+    sendStgAuthMessage(epoch_no);
+    sendIngAuthMessage(epoch_no);
+}
+
+void generateClients(boost::asio::io_context &io_context,
+    uint32_t cstart, uint32_t cstop)
+{
+    uint32_t num_clients_total = config.user_count;
     uint16_t num_stg_nodes = storage_nodes.size();
     uint16_t num_stg_nodes = storage_nodes.size();
-    uint16_t *stg_map = new uint16_t[num_stg_nodes];
+    uint16_t num_ing_nodes = ingestion_nodes.size();
+    uint32_t clients_per_ing = CEILDIV(num_clients_total, num_ing_nodes);
+    uint16_t ing_with_additional = num_clients_total % num_ing_nodes;
 
 
     for(uint32_t i=cstart; i<cstop; i++) {
     for(uint32_t i=cstart; i<cstop; i++) {
-        uint16_t ing_node_this_client = i/clients_per_ing;
-        if(ing_node_this_client > ing_with_additional && ing_with_additional!=0) {
+        uint16_t ing_no = i/clients_per_ing;
+        if(ing_no > ing_with_additional && ing_with_additional!=0) {
             uint16_t leftover = num_clients_total - (ing_with_additional * clients_per_ing);
             uint16_t leftover = num_clients_total - (ing_with_additional * clients_per_ing);
-            ing_node_this_client = ing_with_additional + (leftover / (clients_per_ing-1));
+            ing_no = ing_with_additional + (leftover / (clients_per_ing-1));
+        }
+
+        uint16_t stg_no = i % num_stg_nodes;
+        uint16_t stg_node_id = storage_map[stg_no];
+        uint16_t ing_node_id = ingestion_map[ing_no];
+        clients[i].setup_client(io_context, i, ing_node_id, stg_node_id);
+    }
+}
+
+/*
+
+Epochs are server driven.
+In a single epoch, each client waits to receive from their storage server
+(i) a token bundle for this epoch and (ii) their messages from the last epoch
+
+The client then sends their messages for this epoch to their ingestion servers
+using the tokens they received in this epoch
+
+*/
+
+void Client::epoch_process() {
+
+    uint32_t pt_token_size = (config.m_priv_out * SGX_AESGCM_KEY_SIZE);
+    uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE
+        + SGX_AESGCM_MAC_SIZE;
+    unsigned char *enc_tokens = (unsigned char*) malloc (token_bundle_size);
+
+    //Async read the encrypted tokens for this epoch
+    boost::asio::async_read(*storage_sock, boost::asio::buffer(enc_tokens, token_bundle_size),
+        [this, enc_tokens, token_bundle_size, pt_token_size]
+        (boost::system::error_code ec, std::size_t) {
+
+        if (ec) {
+            if(ec == boost::asio::error::eof) {
+                delete(storage_sock);
+            }
+            else {
+                printf("Error %s\n", ec.message().c_str());
+            }
+            printf("Client::epoch_process boost async_read_tokens failed\n");
+            return;
         }
         }
 
 
-        int ret = generateClientKeys(i, EMK, client_ing_key, client_stg_key);
+        /*
+        if(sim_id == 0) {
+            printf("TEST: Client 0: Encrypted token bundle received:\n");
+            for(uint32_t i = 0; i < token_bundle_size; i++) {
+                printf("%x", enc_tokens[i]);
+            }
+            printf("\n");
+        }
+        */
 
 
-        clients[i].initClient(i, client_ing_key, client_stg_key, num_stg_nodes, storage_map);
+        // Decrypt the token bundle
+        unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE;
+        unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE + pt_token_size;
 
 
-        clients[i].initializeSocket(io_context, ingestion_nodes[ing_node_this_client]);
+        int decrypted_bytes =  gcm_decrypt(enc_tkn_ptr, pt_token_size,
+                NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key),
+                enc_tokens, SGX_AESGCM_IV_SIZE, (unsigned char*) (this->token_list));
+        if(decrypted_bytes != pt_token_size) {
+            printf("Client::epoch_process gcm_decrypt tokens failed \n");
+        }
 
 
+        unsigned char *tkn_ptr = (unsigned char*) this->token_list;
+        free(enc_tokens);
         /*
         /*
-        // Test that the keys generated match those generated within
-        // enclave config
-        unsigned char *ckey;
-        ckey = clients[i].getKey();
-        printf("Client %d, id = %d, key: ", i, clients[i].getid());
-        for(int j=0;j<SGX_AESGCM_KEY_SIZE;j++) {
-            printf("%x", ckey[j]);
+        if(sim_id==0) {
+            printf("TEST: Client 0: Decrypted client tokens:\n");
+            for(int i = 0; i < 2 * SGX_AESGCM_KEY_SIZE; i++) {
+                printf("%x", tkn_ptr[i]);
+            }
+            printf("\n");
         }
         }
-        printf("\n\n");
         */
         */
-    }
 
 
-    struct timespec ep;
-    clock_gettime(CLOCK_REALTIME_COARSE, &ep);
-    unsigned long ep_time = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
-    unsigned long epoch_no = CEILDIV(ep_time, EPOCH_INTERVAL);
+        // Async read the messages recieved in the last epoch
+        uint16_t priv_in = config.m_priv_in;
+        uint16_t msg_size = config.msg_size;
+        uint32_t recv_pt_msgbundle_size = ptMsgBundleSize(priv_in, msg_size);
+        uint32_t recv_enc_msgbundle_size = encMsgBundleSize(priv_in, msg_size);
+        unsigned char *recv_pt_msgbundle = (unsigned char*) malloc (recv_pt_msgbundle_size);
+        unsigned char *recv_enc_msgbundle = (unsigned char*) malloc (recv_enc_msgbundle_size);
+
+        boost::asio::async_read(*storage_sock,
+            boost::asio::buffer(recv_enc_msgbundle, recv_enc_msgbundle_size),
+            [this, recv_pt_msgbundle, recv_enc_msgbundle]
+            (boost::system::error_code ecc, std::size_t) {
+
+            if (ecc) {
+                if(ecc == boost::asio::error::eof) {
+                    delete(storage_sock);
+                }
+                else {
+                    printf("Error %s\n", ecc.message().c_str());
+                }
+                printf("Client: boost async_read failed for recieving msg_bundle\n");
+                return;
+            }
+
+            // Do whatever processing with the received messages here
+            free(recv_enc_msgbundle);
+            free(recv_pt_msgbundle);
+
+            // Send this epoch's message bundle
+            sendMessageBundle();
+        });
 
 
+        epoch_process();
+    });
+
+}
+
+void client_epoch_process(uint32_t cstart, uint32_t cstop)
+{
     for(uint32_t i=cstart; i<cstop; i++) {
     for(uint32_t i=cstart; i<cstop; i++) {
-        clients[i].sendAuthMessage(epoch_no);
+        clients[i].epoch_process();
     }
     }
 }
 }
 
 
-
-void sendMessageBundles(uint32_t cstart, uint32_t cstop, Client* &clients,
-    Config &config)
+void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads)
 {
 {
-    uint16_t priv_out = config.m_priv_out;
-    uint16_t msg_size = config.msg_size;
-    uint32_t pt_bundle_size = ptMsgBundleSize(priv_out, msg_size);
-    uint32_t enc_bundle_size = encMsgBundleSize(priv_out, msg_size);
-    unsigned char *pt_msgbundle = (unsigned char*) malloc (pt_bundle_size);
-    unsigned char *enc_msgbundle = (unsigned char*) malloc (enc_bundle_size);
+    std::vector<boost::thread> threads;
+    uint32_t num_clients_total = config.user_count;
+    size_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
 
 
-    for(uint32_t i=cstart; i<cstop; i++) {
-        clients[i].sendMessageBundle(priv_out, msg_size, pt_msgbundle, enc_msgbundle);
+    // Generate all the clients for the experiment
+    for(int i=0; i<nthreads; i++) {
+        uint32_t cstart, cstop;
+        cstart = i * clients_per_thread;
+        cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
+
+#ifdef VERBOSE_CLIENT
+        printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
+#endif
+
+        threads.emplace_back(boost::thread(generateClients,
+            boost::ref(io_context), cstart, cstop));
     }
     }
 
 
-    free(pt_msgbundle);
-    free(enc_msgbundle);
+    for(int i=0; i<nthreads; i++) {
+        threads[i].join();
+    }
 }
 }
-/*
-    Spin config.user_client actual clients. Each client:
-    1) Retrieve messages and tokens from their storage server
-    2) Send all their messages to the ingestion server
-    3) Wait for a predetermined EPOCH_DURATION time
-    4) Repeat from 1)
-*/
 
 
+void run_epochs(int nthreads) {
+    size_t num_clients_total = config.user_count;
+    size_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
+    std::vector<boost::thread> threads;
+
+    for(int i=0; i<nthreads; i++) {
+        uint32_t cstart, cstop;
+        cstart = i * clients_per_thread;
+        cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
+        threads.emplace_back(boost::thread(client_epoch_process,
+            cstart, cstop));
+    }
+    for(int i=0; i<nthreads; i++) {
+        threads[i].join();
+    }
+}
 
 
 int main(int argc, char **argv)
 int main(int argc, char **argv)
 {
 {
@@ -559,8 +789,6 @@ int main(int argc, char **argv)
 
 
     uint16_t nthreads = 1;
     uint16_t nthreads = 1;
     const char *progname = argv[0];
     const char *progname = argv[0];
-    std::vector<NodeConfig> ingestion_nodes, storage_nodes;
-    std::vector<uint16_t> storage_map;
     ++argv;
     ++argv;
 
 
     // Parse options
     // Parse options
@@ -576,85 +804,41 @@ int main(int argc, char **argv)
         }
         }
     }
     }
 
 
-    printf("nthreads = %d\n", nthreads);
     // Read the config.json from the first line of stdin.  We have to do
     // Read the config.json from the first line of stdin.  We have to do
     // this before outputting anything to avoid potential deadlock with
     // this before outputting anything to avoid potential deadlock with
     // the launch program.
     // the launch program.
     std::string configstr;
     std::string configstr;
     std::getline(std::cin, configstr);
     std::getline(std::cin, configstr);
 
 
-    Config config;
-    aes_key EMK, TMK;
     boost::asio::io_context io_context;
     boost::asio::io_context io_context;
-    boost::asio::ip::tcp::resolver resolver(io_context);
 
 
     if (!config_parse(config, configstr, ingestion_nodes,
     if (!config_parse(config, configstr, ingestion_nodes,
         storage_nodes, storage_map)) {
         storage_nodes, storage_map)) {
        exit(1);
        exit(1);
     }
     }
 
 
-    Client *clients = new Client[config.user_count];
+    clients = new Client[config.user_count];
+#ifdef VERBOSE_CLIENT
     printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
     printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
         ingestion_nodes.size(), storage_nodes.size());
         ingestion_nodes.size(), storage_nodes.size());
+#endif
 
 
-    generateMasterKeys(config.master_secret, EMK, TMK);
-
-    uint32_t num_clients_total = config.user_count;
-    uint16_t num_ing_nodes = ingestion_nodes.size();
-    uint32_t clients_per_ing = CEILDIV(num_clients_total, num_ing_nodes);
-    uint32_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
-    uint16_t ing_with_additional = num_clients_total % num_ing_nodes;
+    generateMasterKeys(config.master_secret, ESK, TSK);
 
 
-    std::thread threads[nthreads];
 
 
-    // Generate all the clients for the experiment
-    for(int i=0; i<nthreads; i++) {
-        uint32_t cstart, cstop;
-        cstart = i * clients_per_thread;
-        cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
-        printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
-        threads[i] = std::thread(generateClients, std::ref(io_context),
-            cstart, cstop, std::ref(clients), std::ref(EMK), std::ref(config),
-            std::ref(ingestion_nodes), std::ref(storage_nodes),
-            std::ref(storage_map), num_clients_total,
-            clients_per_ing, ing_with_additional);
-    }
-
-    for(int i=0; i<nthreads; i++) {
-        threads[i].join();
-    }
+    // Queue up the actual work
+    boost::asio::post(io_context, [&]{
 
 
-    // Multithreaded client message bundle generation and send
-    uint32_t epoch = 1;
-    while(epoch <= 3) {
-        struct timespec tp;
-        clock_gettime(CLOCK_REALTIME_COARSE, &tp);
-        unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
+        initializeClients(io_context, nthreads);
 
 
-        for(int i=0; i<nthreads; i++) {
-            uint32_t cstart, cstop;
-            cstart = i * clients_per_thread;
-            cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
-            threads[i] = std::thread(sendMessageBundles, cstart, cstop,
-                std::ref(clients), std::ref(config));
-        }
-        for(int i=0; i<nthreads; i++) {
-            threads[i].join();
-        }
-
-        clock_gettime(CLOCK_REALTIME_COARSE, &tp);
-        unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
-        unsigned long time_diff = end - start;
+        run_epochs(nthreads);
+    });
 
 
-        // Sleep for the rest of the epoch interval
-        printf("Done with submissions for Epoch %d\n", epoch);
-        if (time_diff < EPOCH_INTERVAL) {
-            unsigned long time_to_sleep_in_us = (useconds_t) EPOCH_INTERVAL - (useconds_t) time_diff;
-            //printf("tts_us = %ld\n", time_to_sleep_in_us);
-            usleep(time_to_sleep_in_us);
-        }
-        epoch++;
-    }
+    // Start another thread; one will perform the work and the other
+    // will execute the async_write handlers
+    boost::thread t([&]{io_context.run();});
+    io_context.run();
+    t.join();
 
 
     delete [] clients;
     delete [] clients;
 }
 }

+ 23 - 11
Client/clients.hpp

@@ -45,32 +45,44 @@ private:
     // so they set and increment the IV
     // so they set and increment the IV
     unsigned char ing_iv[SGX_AESGCM_IV_SIZE] = {0};
     unsigned char ing_iv[SGX_AESGCM_IV_SIZE] = {0};
 
 
+    token *token_list;
+
     boost::asio::ip::tcp::socket *ingestion_sock = NULL;
     boost::asio::ip::tcp::socket *ingestion_sock = NULL;
+    boost::asio::ip::tcp::socket *storage_sock = NULL;
+
 
 
     void generateAuthenticationMessage();
     void generateAuthenticationMessage();
 
 
+    int sendIngAuthMessage(unsigned long epoch_no);
+    int sendStgAuthMessage(unsigned long epoch_no);
+
     void generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
     void generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
         unsigned char *pt_msgbundle);
         unsigned char *pt_msgbundle);
 
 
     bool encryptMessageBundle(uint32_t bundle_size, unsigned char *pt_msgbundle,
     bool encryptMessageBundle(uint32_t bundle_size, unsigned char *pt_msgbundle,
         unsigned char* enc_msgbundle);
         unsigned char* enc_msgbundle);
 
 
-public:
+    void sendMessageBundle();
 
 
-    Client () {}
+    void initializeIngSocket(boost::asio::io_context &ioc, NodeConfig &ing_server);
+    void initializeStgSocket(boost::asio::io_context &ioc, NodeConfig &ing_server);
 
 
-    void initClient(clientid_t cid, aes_key ikey, aes_key skey,
-        uint16_t num_storage_nodes, std::vector<uint16_t> &storage_map);
+    void initClient(clientid_t cid, uint16_t stg_id, aes_key ikey, aes_key skey);
 
 
-    bool socketReady(){
-        return(ingestion_sock!=NULL);
-    }
 
 
-    void initializeSocket(boost::asio::io_context &ioc, NodeConfig &ing_server);
+public:
+
+    Client() {}
+
+    ~Client() {
+        free(token_list);
+        delete(ingestion_sock);
+        delete(storage_sock);
+    }
 
 
-    int sendAuthMessage(unsigned long epoch_no);
+    void setup_client(boost::asio::io_context &ioc, uint32_t sim_id,
+        uint16_t ing_node_id, uint16_t stg_node_id);
 
 
-    void sendMessageBundle(uint16_t priv_out, uint16_t msg_size,
-         unsigned char *pt_msgbundle, unsigned char *enc_msgbundle);
+    void epoch_process();
 
 
 };
 };