| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 | 
							- #include <condition_variable>
 
- #include <mutex>
 
- #include <stdlib.h>
 
- #include "Untrusted.hpp"
 
- #include "start.hpp"
 
// Number of epochs run by the route_clients test (default: 4)
int num_epochs = 4;

// Epoch wait time, in seconds (default: 5)
int epoch_wait_time = 5;

// Number of Waksman Networks to precompute before the first epoch.
// The default of 12 is 3 per private_route for 4 epochs.
int num_WN_to_precompute = 12;

// The WN precomputation is always run in the foreground for now
// TODO: Later fix this to a command line param
static constexpr bool FOREGROUND_PRECOMPUTE = true;

// Integer division of x by y, rounded up (for non-negative operands)
#define CEILDIV(x,y) (((x)+(y)-1)/(y))

// #define DEBUG_PUB_TIMES
 
- class Epoch {
 
-     NetIO &netio;
 
-     uint32_t epoch_num;
 
-     std::mutex m;
 
-     std::condition_variable cv;
 
-     bool epoch_complete;
 
- #ifdef DEBUG_PUB_TIMES
 
-     unsigned long lt = 0;
 
- #endif
 
-     void round_cb(uint32_t round_num) {
 
-         struct timeval now;
 
-         gettimeofday(&now, NULL);
 
-         if (round_num) {
 
-             printf("%lu.%06lu: Round %u complete\n", now.tv_sec,
 
-                 now.tv_usec, round_num);
 
- #ifdef DEBUG_PUB_TIMES
 
-             struct timespec tp;
 
-             clock_gettime(CLOCK_REALTIME_COARSE, &tp);
 
-             unsigned long time = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
-             if(lt == 0) {
 
-                 printf("Time now = %lu\n", time);
 
-                 lt = time;
 
-             } else {
 
-                 printf("Time since last round_cb = %lu.%lu\n", (time-lt)/1000000, (time-lt)%1000000);
 
-             }
 
- #endif
 
-             boost::asio::post(netio.io_context(), [this]{
 
-                 proceed();
 
-             });
 
-         } else {
 
-             printf("%lu.%06lu: Epoch %u complete\n", now.tv_sec,
 
-                 now.tv_usec, epoch_num);
 
-             {
 
-                 std::lock_guard lk(m);
 
-                 epoch_complete = true;
 
-             }
 
- #ifdef DEBUG_PUB_TIMES
 
-                 struct timespec tp;
 
-                 clock_gettime(CLOCK_REALTIME_COARSE, &tp);
 
-                 unsigned long time = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
-                 printf("Epoch end. Time since last round_cb = %lu.%lu\n", (time-lt)/1000000, (time-lt)%1000000);
 
-                 lt = 0;
 
- #endif
 
-             const NodeConfig &my_conf = netio.myconfig();
 
-             if(my_conf.roles & ROLE_STORAGE) {
 
-                 boost::asio::post(netio.io_context(), [this]{
 
-                     netio.send_client_mailbox();
 
-                 });
 
-             }
 
-             cv.notify_one();
 
-         }
 
-     }
 
- public:
 
-     Epoch(NetIO &netio_obj, uint32_t ep_num):
 
-         netio(netio_obj), epoch_num(ep_num),
 
-         epoch_complete(false) {}
 
-     void proceed() {
 
-         ecall_routing_proceed([this](uint32_t round_num) {
 
-             round_cb(round_num);
 
-         });
 
-     }
 
-     void wait() {
 
-         std::unique_lock lk(m);
 
-         cv.wait(lk, [this]{ return epoch_complete; });
 
-     }
 
- };
 
- static void epoch(NetIO &netio, char **args) {
 
-     static uint32_t epoch_num = 1;
 
-     uint16_t num_nodes = netio.num_nodes;
 
-     uint32_t num_tokens[num_nodes];
 
-     uint32_t tot_tokens = 0;
 
-     for (nodenum_t j=0;j<num_nodes;++j) {
 
-         num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
 
-         tot_tokens += num_tokens[j];
 
-     }
 
-     const Config &config = netio.config();
 
-     uint16_t msg_size = config.msg_size;
 
-     nodenum_t my_node_num = config.my_node_num;
 
-     uint8_t my_roles = config.nodes[my_node_num].roles;
 
-     if (my_roles & ROLE_INGESTION) {
 
-         uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
 
-         uint8_t *nextmsg = msgs;
 
-         uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
 
-         uint32_t rem_tokens = tot_tokens;
 
-         while (rem_tokens > 0) {
 
-             // Pick a random remaining token
 
-             uint32_t r = uint32_t(lrand48()) % rem_tokens;
 
-             for (nodenum_t j=0;j<num_nodes;++j) {
 
-                 if (r < num_tokens[j]) {
 
-                     // Use a token from node j
 
-                     *((uint32_t*)nextmsg) =
 
-                         (j << DEST_UID_BITS) +
 
-                             ((rem_tokens-1) & dest_uid_mask);
 
-                     // Put a bunch of copies of r as the message body
 
-                     for (uint16_t i=1;i<msg_size/4;++i) {
 
-                         ((uint32_t*)nextmsg)[i] = r;
 
-                     }
 
-                     num_tokens[j] -= 1;
 
-                     rem_tokens -= 1;
 
-                     nextmsg += msg_size;
 
-                 } else {
 
-                     r -= num_tokens[j];
 
-                 }
 
-             }
 
-         }
 
-         /*
 
-         for (uint32_t i=0;i<tot_tokens;++i) {
 
-             for(uint16_t j=0;j<msg_size/4;++j) {
 
-                 printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
 
-             }
 
-             printf("\n");
 
-         }
 
-         */
 
-         if (!ecall_ingest_raw(msgs, tot_tokens)) {
 
-             printf("Ingestion failed\n");
 
-             return;
 
-         }
 
-     }
 
-     Epoch epoch(netio, epoch_num);
 
-     epoch.proceed();
 
-     epoch.wait();
 
-     // Launch threads to refill the precomputed Waksman networks we
 
-     // used, but just let them run in the background.
 
-     size_t num_sizes = ecall_precompute_sort(-1);
 
-     for (int i=0;i<int(num_sizes);++i) {
 
-         boost::thread t([i] {
 
-             ecall_precompute_sort(i);
 
-         });
 
-         t.detach();
 
-     }
 
-     ++epoch_num;
 
- }
 
- static unsigned long epoch_clients(NetIO &netio) {
 
-     static uint32_t epoch_num = 1;
 
-     Epoch epoch(netio, epoch_num);
 
-     epoch.proceed();
 
-     epoch.wait();
 
-     struct timespec tp;
 
-     clock_gettime(CLOCK_REALTIME_COARSE, &tp);
 
-     unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
-     printf("Epoch end time = %lu\n", end);
 
-     if (FOREGROUND_PRECOMPUTE) {
 
-         struct timeval now;
 
-         gettimeofday(&now, NULL);
 
-         printf("%lu.%06lu: Begin Waksman networks precompute\n",
 
-             now.tv_sec, now.tv_usec);
 
-     }
 
-     // Launch threads to refill the precomputed Waksman networks we used.
 
-     size_t num_sizes = ecall_precompute_sort(-1);
 
-     std::vector<boost::thread> ts;
 
-     for (int i=0;i<int(num_sizes);++i) {
 
-         if(!FOREGROUND_PRECOMPUTE) {
 
-             boost::thread t([i] {
 
-                 ecall_precompute_sort(i);
 
-             });
 
-             t.detach();
 
-         }
 
-         else {
 
-             ts.emplace_back([i] {
 
-                 ecall_precompute_sort(i);
 
-             });
 
-         }
 
-     }
 
-     if(FOREGROUND_PRECOMPUTE) {
 
-         for (auto& t: ts) {
 
-             t.join();
 
-         }
 
-         struct timeval now;
 
-         gettimeofday(&now, NULL);
 
-         printf("%lu.%06lu: End Waksman networks precompute\n",
 
-             now.tv_sec, now.tv_usec);
 
-     }
 
-     ++epoch_num;
 
-     return end;
 
- }
 
- static void route_clients_test(NetIO &netio)
 
- {
 
-     // Default epoch_interval is 5 sec
 
-     unsigned long epoch_interval = epoch_wait_time * 1000000;
 
-     printf("Epoch wait time = %d\n", epoch_wait_time);
 
-     struct timeval exp_start;
 
-     gettimeofday(&exp_start, NULL);
 
-     // Precompute some WaksmanNetworks
 
-     size_t num_sizes = ecall_precompute_sort(-2);
 
-     // Setting num_WN_per_size for background computation mode
 
-     printf("Precompute num_sizes = %ld\n", num_sizes);
 
-     int num_WN_per_size = int(CEILDIV(num_WN_to_precompute, num_sizes));
 
-     if(num_WN_per_size <2) {
 
-         num_WN_per_size = 2;
 
-     }
 
-     printf("%lu.%06lu: Begin Waksman networks precompute\n",
 
-         exp_start.tv_sec, exp_start.tv_usec);
 
-     std::vector<boost::thread> ts;
 
-     for (int i=0;i<int(num_sizes);++i) {
 
-         for (int j=0; j<num_WN_per_size; ++j) {
 
-             ts.emplace_back([i] {
 
-                 ecall_precompute_sort(i);
 
-             });
 
-         }
 
-     }
 
-     for (auto& t: ts) {
 
-         t.join();
 
-     }
 
-     struct timeval exp_now;
 
-     gettimeofday(&exp_now, NULL);
 
-     printf("%lu.%06lu: End Waksman networks precompute\n",
 
-         exp_now.tv_sec, exp_now.tv_usec);
 
-     // Sleep one epoch_interval for clients to connect
 
-     long remaining_us =
 
-         epoch_interval
 
-         - (exp_now.tv_sec - exp_start.tv_sec) * 1000000
 
-         - (exp_now.tv_usec - exp_start.tv_usec);
 
-     if (remaining_us > 0) {
 
-         usleep((useconds_t) remaining_us);
 
-     }
 
-     // Run epoch
 
-     for (int i=1; i<=num_epochs; ++i) {
 
-         struct timeval epoch_start;
 
-         gettimeofday(&epoch_start, NULL);
 
-         printf("%ld.%06ld: Epoch %d start\n", epoch_start.tv_sec,
 
-             epoch_start.tv_usec, i);
 
-         struct timespec tp;
 
-         clock_gettime(CLOCK_REALTIME_COARSE, &tp);
 
-         unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
-         printf("Epoch start time = %lu\n", start);
 
-         unsigned long end = epoch_clients(netio);
 
-         clock_gettime(CLOCK_REALTIME_COARSE, &tp);
 
-         unsigned long end_post_pc = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
-         unsigned long diff_post_pc = end_post_pc - end;
 
-         //printf("Epoch end_post_pc time = %lu\n", end_post_pc);
 
-         //printf("Epoch diff_post_pc time = %lu\n", diff_post_pc);
 
-         unsigned long diff = end - start;
 
-         printf("client_count = %ld\n", client_count);
 
-         printf("bytes_sent = %ld\n", netio.reset_bytes_sent());
 
-         printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000);
 
-         struct timeval now;
 
-         gettimeofday(&now, NULL);
 
-         remaining_us = epoch_interval - diff_post_pc;
 
-         // Sleep for the rest of the epoch interval
 
-         printf("%lu.%06lu: Sleeping for %ld us\n", now.tv_sec,
 
-             now.tv_usec, remaining_us);
 
-         if (remaining_us > 0) {
 
-             usleep((useconds_t)remaining_us);
 
-         }
 
-     }
 
-     netio.close();
 
-     exit(0);
 
- }
 
- static void route_test(NetIO &netio, char **args)
 
- {
 
-     // Count the number of arguments
 
-     size_t nargs = 0;
 
-     while (args[nargs]) {
 
-         ++nargs;
 
-     }
 
-     uint16_t num_nodes = netio.num_nodes;
 
-     size_t sq_nodes = num_nodes;
 
-     sq_nodes *= sq_nodes;
 
-     if (nargs != sq_nodes) {
 
-         printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
 
-         return;
 
-     }
 
-     // The arguments are num_nodes sets of num_nodes values.  The jth
 
-     // value in the ith set is the number of private routing tokens
 
-     // ingestion node i holds for storage node j.
 
-     // We are node i = netio.me, so ignore the other sets of values.
 
-     // Precompute some WaksmanNetworks
 
-     const Config &config = netio.config();
 
-     size_t num_sizes = ecall_precompute_sort(-2);
 
-     for (int i=0;i<int(num_sizes);++i) {
 
-         std::vector<boost::thread> ts;
 
-         for (int j=0; j<config.nthreads; ++j) {
 
-             ts.emplace_back([i] {
 
-                 ecall_precompute_sort(i);
 
-             });
 
-         }
 
-         for (auto& t: ts) {
 
-             t.join();
 
-         }
 
-     }
 
-     // The epoch interval, in microseconds
 
-     uint32_t epoch_interval_us = 1000000;
 
-     // Run 10 epochs
 
-     for (int i=0; i<10; ++i) {
 
-         struct timespec tp;
 
-         clock_gettime(CLOCK_REALTIME_COARSE, &tp);
 
-         unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
-         epoch(netio, args);
 
-         clock_gettime(CLOCK_REALTIME_COARSE, &tp);
 
-         unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
 
-         unsigned long diff = end - start;
 
-         printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
 
-         // Sleep for the rest of the epoch interval
 
-         if (diff < epoch_interval_us) {
 
-             usleep(epoch_interval_us - (useconds_t)diff);
 
-         }
 
-     }
 
-     netio.close();
 
- }
 
- // Once all the networking is set up, start doing whatever we were asked
 
- // to do on the command line
 
- void start(NetIO &netio, char **args)
 
- {
 
-     if (*args && !strcmp(*args, "route")) {
 
-         ++args;
 
-         route_test(netio, args);
 
-         return;
 
-     }
 
-     if (*args && !strcmp(*args, "route_clients")) {
 
-         ++args;
 
-         printf("num_epochs = %d, epoch_wait_time = %d, num_WN_to_precompute = %d\n",
 
-             num_epochs, epoch_wait_time, num_WN_to_precompute);
 
-         route_clients_test(netio);
 
-         return;
 
-     }
 
- }
 
 
  |