123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- #include <condition_variable>
- #include <mutex>
- #include <stdlib.h>
- #include "Untrusted.hpp"
- #include "start.hpp"
// Tunables for the route_clients test.  They are plain (non-const)
// globals so that code outside this file can overwrite the defaults
// before start() is called.

// How many epochs route_clients_test() runs (default: 4)
int num_epochs{4};

// Length of one epoch, in seconds (default: 5)
int epoch_duration{5};

// How many Waksman Networks to precompute up front; the default of 12
// corresponds to 3 per private_route for each of 4 epochs
int num_WN_to_precompute{12};

// When true, the Waksman Network precomputation is joined (run in the
// foreground) instead of being left running in the background.
// TODO: Later fix this to a command line param
bool FOREGROUND_PRECOMPUTE{true};

// Integer division of x by y, rounding up.  Both arguments are
// evaluated more than once, so do not pass expressions with side
// effects.
#define CEILDIV(x,y) (((x)+(y)-1)/(y))
- class Epoch {
- NetIO &netio;
- uint32_t epoch_num;
- std::mutex m;
- std::condition_variable cv;
- bool epoch_complete;
- void round_cb(uint32_t round_num) {
- if (round_num) {
- printf("Round %u complete\n", round_num);
- boost::asio::post(netio.io_context(), [this]{
- proceed();
- });
- } else {
- printf("Epoch %u complete\n", epoch_num);
- {
- std::lock_guard lk(m);
- epoch_complete = true;
- }
- const NodeConfig &my_conf = netio.myconfig();
- if(my_conf.roles & ROLE_STORAGE) {
- boost::asio::post(netio.io_context(), [this]{
- netio.send_client_mailbox();
- });
- }
- cv.notify_one();
- }
- }
- public:
- Epoch(NetIO &netio_obj, uint32_t ep_num):
- netio(netio_obj), epoch_num(ep_num),
- epoch_complete(false) {}
- void proceed() {
- ecall_routing_proceed([this](uint32_t round_num) {
- round_cb(round_num);
- });
- }
- void wait() {
- std::unique_lock lk(m);
- cv.wait(lk, [this]{ return epoch_complete; });
- }
- };
- static void epoch(NetIO &netio, char **args) {
- static uint32_t epoch_num = 1;
- uint16_t num_nodes = netio.num_nodes;
- uint32_t num_tokens[num_nodes];
- uint32_t tot_tokens = 0;
- for (nodenum_t j=0;j<num_nodes;++j) {
- num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
- tot_tokens += num_tokens[j];
- }
- const Config &config = netio.config();
- uint16_t msg_size = config.msg_size;
- nodenum_t my_node_num = config.my_node_num;
- uint8_t my_roles = config.nodes[my_node_num].roles;
- if (my_roles & ROLE_INGESTION) {
- uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
- uint8_t *nextmsg = msgs;
- uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
- uint32_t rem_tokens = tot_tokens;
- while (rem_tokens > 0) {
- // Pick a random remaining token
- uint32_t r = uint32_t(lrand48()) % rem_tokens;
- for (nodenum_t j=0;j<num_nodes;++j) {
- if (r < num_tokens[j]) {
- // Use a token from node j
- *((uint32_t*)nextmsg) =
- (j << DEST_UID_BITS) +
- ((rem_tokens-1) & dest_uid_mask);
- // Put a bunch of copies of r as the message body
- for (uint16_t i=1;i<msg_size/4;++i) {
- ((uint32_t*)nextmsg)[i] = r;
- }
- num_tokens[j] -= 1;
- rem_tokens -= 1;
- nextmsg += msg_size;
- } else {
- r -= num_tokens[j];
- }
- }
- }
- /*
- for (uint32_t i=0;i<tot_tokens;++i) {
- for(uint16_t j=0;j<msg_size/4;++j) {
- printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
- }
- printf("\n");
- }
- */
- if (!ecall_ingest_raw(msgs, tot_tokens)) {
- printf("Ingestion failed\n");
- return;
- }
- }
- Epoch epoch(netio, epoch_num);
- epoch.proceed();
- epoch.wait();
- // Launch threads to refill the precomputed Waksman networks we
- // used, but just let them run in the background.
- size_t num_sizes = ecall_precompute_sort(-1);
- std::vector<boost::thread> ts;
- for (int i=0;i<int(num_sizes);++i) {
- if(!FOREGROUND_PRECOMPUTE) {
- boost::thread t([i] {
- ecall_precompute_sort(i);
- });
- }
- else {
- ts.emplace_back([i] {
- ecall_precompute_sort(i);
- });
- }
- }
- if(FOREGROUND_PRECOMPUTE) {
- for (auto& t: ts) {
- t.join();
- }
- }
- ++epoch_num;
- }
- static void epoch_clients(NetIO &netio) {
- static uint32_t epoch_num = 1;
- Epoch epoch(netio, epoch_num);
- epoch.proceed();
- epoch.wait();
- // Launch threads to refill the precomputed Waksman networks we
- // used, but just let them run in the background.
- size_t num_sizes = ecall_precompute_sort(-1);
- for (int i=0;i<int(num_sizes);++i) {
- boost::thread t([i] {
- ecall_precompute_sort(i);
- });
- t.detach();
- }
- ++epoch_num;
- }
- static void route_clients_test(NetIO &netio)
- {
- // Default epoch_interval is 5 sec
- size_t epoch_interval = epoch_duration * 1000000;
- printf("Epoch duration = %d\n", epoch_duration);
- // Sleep one epoch_interval for clients to connect
- usleep(epoch_interval);
- // Precompute some WaksmanNetworks
- size_t num_sizes = ecall_precompute_sort(-2);
- /*
- // Setting num_WN_per_size for background computation mode
- printf("Precompute num_sizes = %ld\n", num_sizes);
- int num_WN_per_size = int(CEILDIV(num_WN_to_precompute, num_sizes));
- if(num_WN_per_size <2) {
- num_WN_per_size = 2;
- }
- */
- std::vector<boost::thread> ts;
- for (int i=0;i<int(num_sizes);++i) {
- for (int j=0; j<num_WN_per_size; ++j) {
- ts.emplace_back([i] {
- ecall_precompute_sort(i);
- });
- }
- }
- for (auto& t: ts) {
- t.join();
- }
- // Run epoch
- for (int i=1; i<=num_epochs; ++i) {
- struct timespec tp;
- clock_gettime(CLOCK_REALTIME_COARSE, &tp);
- unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
- epoch_clients(netio);
- clock_gettime(CLOCK_REALTIME_COARSE, &tp);
- unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
- unsigned long diff = end - start;
- printf("client_count = %ld\n", client_count);
- printf("bytes_sent = %ld\n", netio.reset_bytes_sent());
- printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000);
- // Sleep for the rest of the epoch interval
- if (diff < epoch_interval) {
- usleep(epoch_interval - (useconds_t) diff);
- }
- }
- netio.close();
- exit(0);
- }
- static void route_test(NetIO &netio, char **args)
- {
- // Count the number of arguments
- size_t nargs = 0;
- while (args[nargs]) {
- ++nargs;
- }
- uint16_t num_nodes = netio.num_nodes;
- size_t sq_nodes = num_nodes;
- sq_nodes *= sq_nodes;
- if (nargs != sq_nodes) {
- printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
- return;
- }
- // The arguments are num_nodes sets of num_nodes values. The jth
- // value in the ith set is the number of private routing tokens
- // ingestion node i holds for storage node j.
- // We are node i = netio.me, so ignore the other sets of values.
- // Precompute some WaksmanNetworks
- const Config &config = netio.config();
- size_t num_sizes = ecall_precompute_sort(-2);
- for (int i=0;i<int(num_sizes);++i) {
- std::vector<boost::thread> ts;
- for (int j=0; j<config.nthreads; ++j) {
- ts.emplace_back([i] {
- ecall_precompute_sort(i);
- });
- }
- for (auto& t: ts) {
- t.join();
- }
- }
- // The epoch interval, in microseconds
- uint32_t epoch_interval_us = 1000000;
- // Run 10 epochs
- for (int i=0; i<10; ++i) {
- struct timespec tp;
- clock_gettime(CLOCK_REALTIME_COARSE, &tp);
- unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
- epoch(netio, args);
- clock_gettime(CLOCK_REALTIME_COARSE, &tp);
- unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
- unsigned long diff = end - start;
- printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
- // Sleep for the rest of the epoch interval
- if (diff < epoch_interval_us) {
- usleep(epoch_interval_us - (useconds_t)diff);
- }
- }
- netio.close();
- }
- // Once all the networking is set up, start doing whatever we were asked
- // to do on the command line
- void start(NetIO &netio, char **args)
- {
- if (*args && !strcmp(*args, "route")) {
- ++args;
- route_test(netio, args);
- return;
- }
- if (*args && !strcmp(*args, "route_clients")) {
- ++args;
- printf("num_epochs = %d, epoch_duration = %d, num_WN_to_precompute = %d\n",
- num_epochs, epoch_duration, num_WN_to_precompute);
- route_clients_test(netio);
- return;
- }
- }
|