#include #include #include #include "Untrusted.hpp" #include "start.hpp" // Default 4 epochs int num_epochs = 4; // Default epoch_duration of 5 seconds int epoch_duration = 5; // Default of 12 Waksman Networks (3 per private_route for 4 epochs) int num_WN_to_precompute = 12; // We'll always run the WN precomputation in the foreground // TODO: Later fix this to a command line param bool FOREGROUND_PRECOMPUTE = true; #define CEILDIV(x,y) (((x)+(y)-1)/(y)) // #define DEBUG_PUB_TIMES class Epoch { NetIO &netio; uint32_t epoch_num; std::mutex m; std::condition_variable cv; bool epoch_complete; #ifdef DEBUG_PUB_TIMES unsigned long lt = 0; #endif void round_cb(uint32_t round_num) { struct timeval now; gettimeofday(&now, NULL); if (round_num) { printf("%lu.%06lu: Round %u complete\n", now.tv_sec, now.tv_usec, round_num); #ifdef DEBUG_PUB_TIMES struct timespec tp; clock_gettime(CLOCK_REALTIME_COARSE, &tp); unsigned long time = tp.tv_sec * 1000000 + tp.tv_nsec/1000; if(lt == 0) { printf("Time now = %lu\n", time); lt = time; } else { printf("Time since last round_cb = %lu.%lu\n", (time-lt)/1000000, (time-lt)%1000000); } #endif boost::asio::post(netio.io_context(), [this]{ proceed(); }); } else { printf("%lu.%06lu: Epoch %u complete\n", now.tv_sec, now.tv_usec, epoch_num); { std::lock_guard lk(m); epoch_complete = true; } #ifdef DEBUG_PUB_TIMES struct timespec tp; clock_gettime(CLOCK_REALTIME_COARSE, &tp); unsigned long time = tp.tv_sec * 1000000 + tp.tv_nsec/1000; printf("Epoch end. Time since last round_cb = %lu.%lu\n", (time-lt)/1000000, (time-lt)%1000000); lt = 0; #endif const NodeConfig &my_conf = netio.myconfig(); if(my_conf.roles & ROLE_STORAGE) { boost::asio::post(netio.io_context(), [this]{ netio.send_client_mailbox(); }); } cv.notify_one(); } } public: Epoch(NetIO &netio_obj, uint32_t ep_num): netio(netio_obj), epoch_num(ep_num), epoch_complete(false) {} void proceed() { ecall_routing_proceed([this](uint32_t round_num) { round_cb(round_num); }); } void wait() { std::unique_lock lk(m); cv.wait(lk, [this]{ return epoch_complete; }); } }; static void epoch(NetIO &netio, char **args) { static uint32_t epoch_num = 1; uint16_t num_nodes = netio.num_nodes; uint32_t num_tokens[num_nodes]; uint32_t tot_tokens = 0; for (nodenum_t j=0;j 0) { // Pick a random remaining token uint32_t r = uint32_t(lrand48()) % rem_tokens; for (nodenum_t j=0;j ts; for (int i=0;i ts; for (int i=0;i 0) { usleep((useconds_t) remaining_us); } // Run epoch for (int i=1; i<=num_epochs; ++i) { struct timeval epoch_start; gettimeofday(&epoch_start, NULL); printf("%ld.%06ld: Epoch %d start\n", epoch_start.tv_sec, epoch_start.tv_usec, i); struct timespec tp; clock_gettime(CLOCK_REALTIME_COARSE, &tp); unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000; printf("Epoch start time = %lu\n", start); unsigned long end = epoch_clients(netio); clock_gettime(CLOCK_REALTIME_COARSE, &tp); unsigned long end_post_pc = tp.tv_sec * 1000000 + tp.tv_nsec/1000; unsigned long diff_post_pc = end_post_pc - end; //printf("Epoch end_post_pc time = %lu\n", end_post_pc); //printf("Epoch diff_post_pc time = %lu\n", diff_post_pc); unsigned long diff = end - start; printf("client_count = %ld\n", client_count); printf("bytes_sent = %ld\n", netio.reset_bytes_sent()); printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000); struct timeval now; gettimeofday(&now, NULL); remaining_us = epoch_interval - diff_post_pc; // Sleep for the rest of the epoch interval printf("%lu.%06lu: Sleeping for %ld us\n", now.tv_sec, now.tv_usec, remaining_us); if (remaining_us > 0) { usleep((useconds_t)remaining_us); } } netio.close(); exit(0); } static void route_test(NetIO &netio, char **args) { // Count the number of arguments size_t nargs = 0; while (args[nargs]) { ++nargs; } uint16_t num_nodes = netio.num_nodes; size_t sq_nodes = num_nodes; sq_nodes *= sq_nodes; if (nargs != sq_nodes) { printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs); return; } // The arguments are num_nodes sets of num_nodes values. The jth // value in the ith set is the number of private routing tokens // ingestion node i holds for storage node j. // We are node i = netio.me, so ignore the other sets of values. // Precompute some WaksmanNetworks const Config &config = netio.config(); size_t num_sizes = ecall_precompute_sort(-2); for (int i=0;i ts; for (int j=0; j