Browse Source

Multithreading the client simulator

Sajin Sasy 1 year ago
parent
commit
5b359918c2
1 changed files with 90 additions and 43 deletions
  1. 90 43
      Client/clients.cpp

+ 90 - 43
Client/clients.cpp

@@ -1,4 +1,5 @@
 #include <iostream>
+#include <functional>
 #include "../App/appconfig.hpp"
 
 // The next line suppresses a deprecation warning within boost
@@ -6,7 +7,7 @@
 #include "boost/property_tree/ptree.hpp"
 #include "boost/property_tree/json_parser.hpp"
 #include <boost/asio.hpp>
-#include <boost/thread.hpp>
+#include <thread>
 #include "gcm.h"
 #include "sgx_tcrypto.h"
 #include "clients.hpp"
@@ -191,9 +192,10 @@ bool config_parse(Config &config, const std::string configstr,
                         ret = false;
                     }
                 }
-                if(nc.roles == ROLE_INGESTION) {
+                if(nc.roles & ROLE_INGESTION) {
                     ingestion_nodes.push_back(std::move(nc));
-                } else if(nc.roles == ROLE_STORAGE) {
+                }
+                if(nc.roles & ROLE_STORAGE) {
                     storage_nodes.push_back(std::move(nc));
                 }
             }
@@ -415,6 +417,60 @@ void Client::sendMessageBundle(uint16_t priv_out, uint16_t msg_size,
 }
 
 
+void generateClients(boost::asio::io_context &io_context,
+    uint32_t cstart, uint32_t cstop, Client* &clients,
+    aes_key &EMK, Config &config, std::vector<NodeConfig> &ingestion_nodes,
+    uint32_t num_clients_total, uint32_t clients_per_ing,
+    uint32_t ing_with_additional)
+{
+    aes_key client_key;
+
+    for(uint32_t i=cstart; i<cstop; i++) {
+        uint16_t ing_node_this_client = i/clients_per_ing;
+        if(ing_node_this_client > ing_with_additional && ing_with_additional!=0) {
+            uint16_t leftover = num_clients_total - (ing_with_additional * clients_per_ing);
+            ing_node_this_client = ing_with_additional + (leftover / (clients_per_ing-1));
+        }
+
+        int ret = generateClientEncryptionKey(i, EMK, client_key);
+        clients[i].initClient(i, client_key);
+
+        clients[i].initializeSocket(io_context, ingestion_nodes[ing_node_this_client]);
+        //clients[i].sendAuthMessage();
+
+        /*
+        // Test that the keys generated match those generated within
+        // enclave config
+        unsigned char *ckey;
+        ckey = clients[i].getKey();
+        printf("Client %d, id = %d, key: ", i, clients[i].getid());
+        for(int j=0;j<SGX_AESGCM_KEY_SIZE;j++) {
+            printf("%x", ckey[j]);
+        }
+        printf("\n\n");
+        */
+    }
+}
+
+
+void sendMessageBundles(uint32_t cstart, uint32_t cstop, Client* &clients,
+    Config &config)
+{
+    uint16_t priv_out = config.m_priv_out;
+    uint16_t msg_size = config.msg_size;
+    uint32_t pt_bundle_size = ptMsgBundleSize(priv_out, msg_size);
+    uint32_t enc_bundle_size = encMsgBundleSize(priv_out, msg_size);
+    unsigned char *pt_msgbundle = (unsigned char*) malloc (pt_bundle_size);
+    unsigned char *enc_msgbundle = (unsigned char*) malloc (enc_bundle_size);
+
+    for(uint32_t i=cstart; i<cstop; i++) {
+        clients[i].sendMessageBundle(priv_out, msg_size, pt_msgbundle, enc_msgbundle);
+        sleep(2);
+    }
+
+    free(pt_msgbundle);
+    free(enc_msgbundle);
+}
 /*
     Spin config.user_client actual clients. Each client:
     1) Retrieve messages and tokens from their storage server
@@ -454,7 +510,7 @@ int main(int argc, char **argv)
     std::getline(std::cin, configstr);
 
     Config config;
-    aes_key EMK, TMK, client_key;
+    aes_key EMK, TMK;
     boost::asio::io_context io_context;
     boost::asio::ip::tcp::resolver resolver(io_context);
 
@@ -471,51 +527,42 @@ int main(int argc, char **argv)
     uint32_t num_clients_total = config.user_count;
     uint16_t num_ing_nodes = ingestion_nodes.size();
     uint32_t clients_per_ing = CEILDIV(num_clients_total, num_ing_nodes);
+    uint32_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
     uint16_t ing_with_additional = num_clients_total % num_ing_nodes;
-    uint16_t priv_out = config.m_priv_out;
-    uint16_t msg_size = config.msg_size;
 
-    uint32_t pt_bundle_size = ptMsgBundleSize(priv_out, msg_size);
-    uint32_t enc_bundle_size = encMsgBundleSize(priv_out, msg_size);
-    unsigned char *pt_msgbundle = (unsigned char*) malloc (pt_bundle_size);
-    unsigned char *enc_msgbundle = (unsigned char*) malloc (enc_bundle_size);
+    std::thread threads[nthreads];
 
-    uint64_t epoch = 1;
-    while(epoch<2) {
-
-        for(uint32_t i=0; i<num_clients_total; i++) {
-            if(epoch==1) {
-                uint16_t ing_node_this_client = i/clients_per_ing;
-                if(ing_node_this_client > ing_with_additional && ing_with_additional!=0) {
-                    uint16_t leftover = num_clients_total - (ing_with_additional * clients_per_ing);
-                    ing_node_this_client = ing_with_additional + (leftover / (clients_per_ing-1));
-                }
+    // Generate all the clients for the experiment
 
-                int ret = generateClientEncryptionKey(i, EMK, client_key);
-                clients[i].initClient(i, client_key);
-
-                clients[i].initializeSocket(io_context, ingestion_nodes[ing_node_this_client]);
-                //clients[i].sendAuthMessage();
-
-                /*
-                // Test that the keys generated match those generated within
-                // enclave config
-                unsigned char *ckey;
-                ckey = clients[i].getKey();
-                printf("Client %d, id = %d, key: ", i, clients[i].getid());
-                for(int j=0;j<SGX_AESGCM_KEY_SIZE;j++) {
-                    printf("%x", ckey[j]);
-                }
-                printf("\n\n");
-                */
-            }
+    for(int i=0; i<nthreads; i++) {
+        uint32_t cstart, cstop;
+        cstart = i * clients_per_thread;
+        cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
+        printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
+        threads[i] = std::thread(generateClients, std::ref(io_context),
+            cstart, cstop, std::ref(clients), std::ref(EMK), std::ref(config),
+            std::ref(ingestion_nodes), num_clients_total,
+            clients_per_ing, ing_with_additional);
+    }
 
-            clients[i].sendMessageBundle(priv_out, msg_size, pt_msgbundle, enc_msgbundle);
-        }
-        epoch++;
-        sleep(1);
+    for(int i=0; i<nthreads; i++) {
+        threads[i].join();
     }
 
-    free(pt_msgbundle);
+    // Multithreaded client message bundle generation and send
+    //uint32_t epoch = 0;
+    //while(epoch < 3) {
+        for(int i=0; i<nthreads; i++) {
+            uint32_t cstart, cstop;
+            cstart = i * clients_per_thread;
+            cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
+            threads[i] = std::thread(sendMessageBundles, cstart, cstop,
+                std::ref(clients), std::ref(config));
+        }
+        for(int i=0; i<nthreads; i++) {
+            threads[i].join();
+        }
+        //epoch++;
+    //}
     delete [] clients;
 }