start.cpp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
#include <stdlib.h>
#include <string.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>
#include "Untrusted.hpp"
#include "start.hpp"
  6. class Epoch {
  7. boost::asio::io_context &io_context;
  8. uint32_t epoch_num;
  9. std::mutex m;
  10. std::condition_variable cv;
  11. bool epoch_complete;
  12. void round_cb(uint32_t round_num) {
  13. if (round_num) {
  14. printf("Round %u complete\n", round_num);
  15. boost::asio::post(io_context, [this]{
  16. proceed();
  17. });
  18. } else {
  19. printf("Epoch %u complete\n", epoch_num);
  20. {
  21. std::lock_guard lk(m);
  22. epoch_complete = true;
  23. }
  24. cv.notify_one();
  25. }
  26. }
  27. public:
  28. Epoch(boost::asio::io_context &context, uint32_t ep_num):
  29. io_context(context), epoch_num(ep_num),
  30. epoch_complete(false) {}
  31. void proceed() {
  32. ecall_routing_proceed([this](uint32_t round_num) {
  33. round_cb(round_num);
  34. });
  35. }
  36. void wait() {
  37. std::unique_lock lk(m);
  38. cv.wait(lk, [this]{ return epoch_complete; });
  39. }
  40. };
  41. static void epoch(NetIO &netio, char **args) {
  42. static uint32_t epoch_num = 1;
  43. uint16_t num_nodes = netio.num_nodes;
  44. uint32_t num_tokens[num_nodes];
  45. uint32_t tot_tokens = 0;
  46. for (nodenum_t j=0;j<num_nodes;++j) {
  47. num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
  48. tot_tokens += num_tokens[j];
  49. }
  50. const Config &config = netio.config();
  51. uint16_t msg_size = config.msg_size;
  52. nodenum_t my_node_num = config.my_node_num;
  53. uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
  54. uint8_t *nextmsg = msgs;
  55. uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
  56. uint32_t rem_tokens = tot_tokens;
  57. while (rem_tokens > 0) {
  58. // Pick a random remaining token
  59. uint32_t r = uint32_t(lrand48()) % rem_tokens;
  60. for (nodenum_t j=0;j<num_nodes;++j) {
  61. if (r < num_tokens[j]) {
  62. // Use a token from node j
  63. *((uint32_t*)nextmsg) =
  64. (j << DEST_UID_BITS) +
  65. (((r<<8)+(my_node_num&0xff)) & dest_uid_mask);
  66. // Put a bunch of copies of r as the message body
  67. for (uint16_t i=1;i<msg_size/4;++i) {
  68. ((uint32_t*)nextmsg)[i] = r;
  69. }
  70. num_tokens[j] -= 1;
  71. rem_tokens -= 1;
  72. nextmsg += msg_size;
  73. } else {
  74. r -= num_tokens[j];
  75. }
  76. }
  77. }
  78. /*
  79. for (uint32_t i=0;i<tot_tokens;++i) {
  80. for(uint16_t j=0;j<msg_size/4;++j) {
  81. printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
  82. }
  83. printf("\n");
  84. }
  85. */
  86. if (!ecall_ingest_raw(msgs, tot_tokens)) {
  87. printf("Ingestion failed\n");
  88. return;
  89. }
  90. Epoch epoch(netio.io_context(), epoch_num);
  91. epoch.proceed();
  92. epoch.wait();
  93. // Launch threads to refill the precomputed Waksman networks we
  94. // used, but just let them run in the background.
  95. size_t num_sizes = ecall_precompute_sort(-1);
  96. for (int i=0;i<int(num_sizes);++i) {
  97. boost::thread t([i] {
  98. ecall_precompute_sort(i);
  99. });
  100. t.detach();
  101. }
  102. ++epoch_num;
  103. }
  104. static void route_test(NetIO &netio, char **args)
  105. {
  106. // Count the number of arguments
  107. size_t nargs = 0;
  108. while (args[nargs]) {
  109. ++nargs;
  110. }
  111. uint16_t num_nodes = netio.num_nodes;
  112. size_t sq_nodes = num_nodes;
  113. sq_nodes *= sq_nodes;
  114. if (nargs != sq_nodes) {
  115. printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
  116. return;
  117. }
  118. // The arguments are num_nodes sets of num_nodes values. The jth
  119. // value in the ith set is the number of private routing tokens
  120. // ingestion node i holds for storage node j.
  121. // We are node i = netio.me, so ignore the other sets of values.
  122. // Precompute some WaksmanNetworks
  123. const Config &config = netio.config();
  124. size_t num_sizes = ecall_precompute_sort(-1);
  125. for (int i=0;i<int(num_sizes);++i) {
  126. std::vector<boost::thread> ts;
  127. for (int j=0; j<config.nthreads; ++j) {
  128. ts.emplace_back([i] {
  129. ecall_precompute_sort(i);
  130. });
  131. }
  132. for (auto& t: ts) {
  133. t.join();
  134. }
  135. }
  136. // Run 10 epochs
  137. for (int i=0; i<10; ++i) {
  138. struct timespec tp;
  139. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  140. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  141. epoch(netio, args);
  142. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  143. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  144. unsigned long diff = end - start;
  145. printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  146. }
  147. netio.close();
  148. }
  149. // Once all the networking is set up, start doing whatever we were asked
  150. // to do on the command line
  151. void start(NetIO &netio, char **args)
  152. {
  153. if (*args && !strcmp(*args, "route")) {
  154. ++args;
  155. route_test(netio, args);
  156. return;
  157. }
  158. }