Przeglądaj źródła

Be able to run multiple epochs

Ian Goldberg 1 rok temu
rodzic
commit
57c1d8a441
2 zmienionych plików z 98 dodań i 35 usunięć
  1. 96 33
      App/start.cpp
  2. 2 2
      Enclave/route.cpp

+ 96 - 33
App/start.cpp

@@ -1,31 +1,56 @@
+#include <condition_variable>
+#include <mutex>
 #include <stdlib.h>
 
 #include "Untrusted.hpp"
 #include "start.hpp"
 
-static void route_test(NetIO &netio, char **args)
-{
-    // Count the number of arguments
-    size_t nargs = 0;
-    while (args[nargs]) {
-        ++nargs;
+class Epoch {
+    boost::asio::io_context &io_context;
+    uint32_t epoch_num;
+    std::mutex m;
+    std::condition_variable cv;
+    bool epoch_complete;
+
+    void round_cb(uint32_t round_num) {
+        if (round_num) {
+            printf("Round %u complete\n", round_num);
+            boost::asio::post(io_context, [this]{
+                proceed();
+            });
+        } else {
+            printf("Epoch %u complete\n", epoch_num);
+            {
+                std::lock_guard lk(m);
+                epoch_complete = true;
+            }
+            cv.notify_one();
+        }
     }
 
-    uint16_t num_nodes = netio.num_nodes;
-    size_t sq_nodes = num_nodes;
-    sq_nodes *= sq_nodes;
+public:
+    Epoch(boost::asio::io_context &context, uint32_t ep_num):
+        io_context(context), epoch_num(ep_num),
+        epoch_complete(false) {}
 
-    if (nargs != sq_nodes) {
-        printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
-        return;
+    void proceed() {
+        ecall_routing_proceed([this](uint32_t round_num) {
+            round_cb(round_num);
+        });
     }
 
-    // The arguments are num_nodes sets of num_nodes values.  The jth
-    // value in the ith set is the number of private routing tokens
-    // ingestion node i holds for storage node j.
+    void wait() {
+        std::unique_lock lk(m);
+        cv.wait(lk, [this]{ return epoch_complete; });
+    }
 
-    // We are node i = netio.me, so ignore the other sets of values.
+};
+
+static void epoch(NetIO &netio, char **args) {
 
+    static uint32_t epoch_num = 1;
+
+    uint16_t num_nodes = netio.num_nodes;
     uint32_t num_tokens[num_nodes];
     uint32_t tot_tokens = 0;
     for (nodenum_t j=0;j<num_nodes;++j) {
@@ -71,7 +96,51 @@ static void route_test(NetIO &netio, char **args)
     }
     */
 
+    if (!ecall_ingest_raw(msgs, tot_tokens)) {
+        printf("Ingestion failed\n");
+        return;
+    }
+
+    Epoch epoch(netio.io_context(), epoch_num);
+    epoch.proceed();
+    epoch.wait();
+    // Launch threads to refill the precomputed Waksman networks we
+    // used, but just let them run in the background.
+    size_t num_sizes = ecall_precompute_sort(-1);
+    for (int i=0;i<int(num_sizes);++i) {
+        boost::thread t([i] {
+            ecall_precompute_sort(i);
+        });
+        t.detach();
+    }
+    ++epoch_num;
+}
+
+static void route_test(NetIO &netio, char **args)
+{
+    // Count the number of arguments
+    size_t nargs = 0;
+    while (args[nargs]) {
+        ++nargs;
+    }
+
+    uint16_t num_nodes = netio.num_nodes;
+    size_t sq_nodes = num_nodes;
+    sq_nodes *= sq_nodes;
+
+    if (nargs != sq_nodes) {
+        printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
+        return;
+    }
+
+    // The arguments are num_nodes sets of num_nodes values.  The jth
+    // value in the ith set is the number of private routing tokens
+    // ingestion node i holds for storage node j.
+
+    // We are node i = netio.me, so ignore the other sets of values.
+
     // Precompute some WaksmanNetworks
+    const Config &config = netio.config();
     size_t num_sizes = ecall_precompute_sort(-1);
     for (int i=0;i<int(num_sizes);++i) {
         std::vector<boost::thread> ts;
@@ -85,24 +154,18 @@ static void route_test(NetIO &netio, char **args)
         }
     }
 
-    if (!ecall_ingest_raw(msgs, tot_tokens)) {
-        printf("Ingestion failed\n");
-        return;
+    // Run 10 epochs
+    for (int i=0; i<10; ++i) {
+        struct timespec tp;
+        clock_gettime(CLOCK_REALTIME_COARSE, &tp);
+        unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
+        epoch(netio, args);
+        clock_gettime(CLOCK_REALTIME_COARSE, &tp);
+        unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
+        unsigned long diff = end - start;
+        printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
     }
-
-    ecall_routing_proceed([&](uint32_t round_num) {
-        printf("Round %u complete\n", round_num);
-        boost::asio::post(netio.io_context(), [&]{
-            ecall_routing_proceed([&](uint32_t round_num2) {
-                printf("Round %u complete\n", round_num2);
-                boost::asio::post(netio.io_context(), []{
-                    ecall_routing_proceed([](uint32_t round_num3) {
-                        printf("Round %u complete\n", round_num3);
-                    });
-                });
-            });
-        });
-    });
+    netio.close();
 }
 
 // Once all the networking is set up, start doing whatever we were asked

+ 2 - 2
Enclave/route.cpp

@@ -734,11 +734,11 @@ void ecall_routing_proceed(void *cbpointer)
 
             // We're done
             route_state.step = ROUTE_NOT_STARTED;
-            ocall_routing_round_complete(cbpointer, 3);
+            ocall_routing_round_complete(cbpointer, 0);
         } else {
             // We're done
             route_state.step = ROUTE_NOT_STARTED;
-            ocall_routing_round_complete(cbpointer, 3);
+            ocall_routing_round_complete(cbpointer, 0);
         }
     }
 }