Browse Source

Synchronize client simulator and ingestion servers

Sajin Sasy 1 year ago
parent
commit
5c075a96a4
4 changed files with 33 additions and 35 deletions
  1. 10 10
      App/start.cpp
  2. 5 2
      Client/clientlaunch
  3. 16 23
      Client/clients.cpp
  4. 2 0
      Enclave/enclave_api.h

+ 10 - 10
App/start.cpp

@@ -139,10 +139,9 @@ static void epoch_clients(NetIO &netio) {
 
 static void route_clients_test(NetIO &netio)
 {
-
     // Precompute some WaksmanNetworks
     const Config &config = netio.config();
-    size_t num_sizes = ecall_precompute_sort(-1);
+    size_t num_sizes = ecall_precompute_sort(-2);
     for (int i=0;i<int(num_sizes);++i) {
         std::vector<boost::thread> ts;
         for (int j=0; j<config.nthreads; ++j) {
@@ -155,13 +154,12 @@ static void route_clients_test(NetIO &netio)
         }
     }
 
-    // The epoch interval, in microseconds
-    uint32_t epoch_interval_us = 1000000;
-    printf("Waiting on client's first messages\n");
-    sleep(3);
-
     // Run epoch
-    for (int i=0; i<1; ++i) {
+    for (int i=0; i<3; ++i) {
+        if(i==0) {
+            usleep(EPOCH_INTERVAL);
+        }
+
         struct timespec tp;
         clock_gettime(CLOCK_REALTIME_COARSE, &tp);
         unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
@@ -171,10 +169,12 @@ static void route_clients_test(NetIO &netio)
         clock_gettime(CLOCK_REALTIME_COARSE, &tp);
         unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
         unsigned long diff = end - start;
+
         printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
+
         // Sleep for the rest of the epoch interval
-        if (diff < epoch_interval_us) {
-            usleep(epoch_interval_us - (useconds_t)diff);
+        if (diff < EPOCH_INTERVAL) {
+            usleep(EPOCH_INTERVAL - (useconds_t) diff);
         }
     }
     netio.close();

+ 5 - 2
Client/clientlaunch

@@ -17,12 +17,15 @@ MANIFEST = "./../App/manifest.yaml"
 # The default pubkeys file
 PUBKEYS = "./../App/pubkeys.yaml"
 
-#The client binary
+# The client binary
 CLIENTS = "./clients"
 
+# Client thread allocation
+prefix = "numactl -C24-31 "
+
 def launch(config, cmd, threads):
     cmdline = ''
-    cmdline += CLIENTS + " -t " + str(threads) + ""
+    cmdline += prefix + CLIENTS + " -t " + str(threads) + ""
     proc = subprocess.Popen(shlex.split(cmdline) + cmd,
         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT, bufsize=0)

+ 16 - 23
Client/clients.cpp

@@ -497,12 +497,8 @@ void generateClients(boost::asio::io_context &io_context,
 
 
 void sendMessageBundles(uint32_t cstart, uint32_t cstop, Client* &clients,
-    Config &config, unsigned long &time_diff)
+    Config &config)
 {
-
-    struct timespec tp;
-    clock_gettime(CLOCK_REALTIME_COARSE, &tp);
-    unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
     uint16_t priv_out = config.m_priv_out;
     uint16_t msg_size = config.msg_size;
     uint32_t pt_bundle_size = ptMsgBundleSize(priv_out, msg_size);
@@ -516,10 +512,6 @@ void sendMessageBundles(uint32_t cstart, uint32_t cstop, Client* &clients,
 
     free(pt_msgbundle);
     free(enc_msgbundle);
-
-    clock_gettime(CLOCK_REALTIME_COARSE, &tp);
-    unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
-    time_diff = end - start;
 }
 /*
     Spin config.user_client actual clients. Each client:
@@ -561,9 +553,6 @@ int main(int argc, char **argv)
     std::string configstr;
     std::getline(std::cin, configstr);
 
-    // The epoch interval, in microseconds
-    uint32_t epoch_interval_us = 1000000;
-
     Config config;
     aes_key EMK, TMK;
     boost::asio::io_context io_context;
@@ -607,32 +596,36 @@ int main(int argc, char **argv)
     }
 
     // Multithreaded client message bundle generation and send
-    uint32_t epoch = 0;
-    while(epoch < 3) {
-        unsigned long thread_diff[nthreads] = {0};
-        unsigned long diff = 0;
+    uint32_t epoch = 1;
+    while(epoch <= 3) {
+        struct timespec tp;
+        clock_gettime(CLOCK_REALTIME_COARSE, &tp);
+        unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
         for(int i=0; i<nthreads; i++) {
             uint32_t cstart, cstop;
             cstart = i * clients_per_thread;
             cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
             threads[i] = std::thread(sendMessageBundles, cstart, cstop,
-                std::ref(clients), std::ref(config), std::ref(thread_diff[i]));
+                std::ref(clients), std::ref(config));
         }
         for(int i=0; i<nthreads; i++) {
             threads[i].join();
-            diff+=thread_diff[i];
         }
 
+        clock_gettime(CLOCK_REALTIME_COARSE, &tp);
+        unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
+        unsigned long time_diff = end - start;
+
         // Sleep for the rest of the epoch interval
-        printf("Done with submissions for 1 epoch \n");
-        if (diff < epoch_interval_us) {
-            printf("diff = %ld\n", diff);
-            usleep(epoch_interval_us - (useconds_t)diff);
+        printf("Done with submissions for Epoch %d\n", epoch);
+        if (time_diff < EPOCH_INTERVAL) {
+            unsigned long time_to_sleep_in_us = (useconds_t) EPOCH_INTERVAL - (useconds_t) time_diff;
+            //printf("tts_us = %ld\n", time_to_sleep_in_us);
+            usleep(time_to_sleep_in_us);
         }
         epoch++;
     }
 
-    sleep(10000);
     delete [] clients;
 }

+ 2 - 0
Enclave/enclave_api.h

@@ -38,4 +38,6 @@ struct EnclaveAPINodeConfig {
 #define DEST_STORAGE_NODE_BITS 10
 #define DEST_UID_BITS 22
 
+// Epoch timing configuration (in us)
+#define EPOCH_INTERVAL 2000000
 #endif