// start.cpp
#include <stdlib.h>

#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <vector>

#include "Untrusted.hpp"
#include "start.hpp"
  6. class Epoch {
  7. NetIO &netio;
  8. uint32_t epoch_num;
  9. std::mutex m;
  10. std::condition_variable cv;
  11. bool epoch_complete;
  12. void round_cb(uint32_t round_num) {
  13. if (round_num) {
  14. printf("Round %u complete\n", round_num);
  15. boost::asio::post(netio.io_context(), [this]{
  16. proceed();
  17. });
  18. } else {
  19. printf("Epoch %u complete\n", epoch_num);
  20. {
  21. std::lock_guard lk(m);
  22. epoch_complete = true;
  23. }
  24. const NodeConfig &my_conf = netio.myconfig();
  25. if(my_conf.roles & ROLE_STORAGE) {
  26. boost::asio::post(netio.io_context(), [this]{
  27. netio.send_client_mailbox();
  28. });
  29. }
  30. cv.notify_one();
  31. }
  32. }
  33. public:
  34. Epoch(NetIO &netio_obj, uint32_t ep_num):
  35. netio(netio_obj), epoch_num(ep_num),
  36. epoch_complete(false) {}
  37. void proceed() {
  38. ecall_routing_proceed([this](uint32_t round_num) {
  39. round_cb(round_num);
  40. });
  41. }
  42. void wait() {
  43. std::unique_lock lk(m);
  44. cv.wait(lk, [this]{ return epoch_complete; });
  45. }
  46. };
  47. static void epoch(NetIO &netio, char **args) {
  48. static uint32_t epoch_num = 1;
  49. uint16_t num_nodes = netio.num_nodes;
  50. uint32_t num_tokens[num_nodes];
  51. uint32_t tot_tokens = 0;
  52. for (nodenum_t j=0;j<num_nodes;++j) {
  53. num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
  54. tot_tokens += num_tokens[j];
  55. }
  56. const Config &config = netio.config();
  57. uint16_t msg_size = config.msg_size;
  58. nodenum_t my_node_num = config.my_node_num;
  59. uint8_t my_roles = config.nodes[my_node_num].roles;
  60. if (my_roles & ROLE_INGESTION) {
  61. uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
  62. uint8_t *nextmsg = msgs;
  63. uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
  64. uint32_t rem_tokens = tot_tokens;
  65. while (rem_tokens > 0) {
  66. // Pick a random remaining token
  67. uint32_t r = uint32_t(lrand48()) % rem_tokens;
  68. for (nodenum_t j=0;j<num_nodes;++j) {
  69. if (r < num_tokens[j]) {
  70. // Use a token from node j
  71. *((uint32_t*)nextmsg) =
  72. (j << DEST_UID_BITS) +
  73. ((rem_tokens-1) & dest_uid_mask);
  74. // Put a bunch of copies of r as the message body
  75. for (uint16_t i=1;i<msg_size/4;++i) {
  76. ((uint32_t*)nextmsg)[i] = r;
  77. }
  78. num_tokens[j] -= 1;
  79. rem_tokens -= 1;
  80. nextmsg += msg_size;
  81. } else {
  82. r -= num_tokens[j];
  83. }
  84. }
  85. }
  86. /*
  87. for (uint32_t i=0;i<tot_tokens;++i) {
  88. for(uint16_t j=0;j<msg_size/4;++j) {
  89. printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
  90. }
  91. printf("\n");
  92. }
  93. */
  94. if (!ecall_ingest_raw(msgs, tot_tokens)) {
  95. printf("Ingestion failed\n");
  96. return;
  97. }
  98. }
  99. Epoch epoch(netio, epoch_num);
  100. epoch.proceed();
  101. epoch.wait();
  102. // Launch threads to refill the precomputed Waksman networks we
  103. // used, but just let them run in the background.
  104. size_t num_sizes = ecall_precompute_sort(-1);
  105. for (int i=0;i<int(num_sizes);++i) {
  106. boost::thread t([i] {
  107. ecall_precompute_sort(i);
  108. });
  109. t.detach();
  110. }
  111. ++epoch_num;
  112. }
  113. static void epoch_clients(NetIO &netio) {
  114. static uint32_t epoch_num = 1;
  115. Epoch epoch(netio, epoch_num);
  116. epoch.proceed();
  117. epoch.wait();
  118. // Launch threads to refill the precomputed Waksman networks we
  119. // used, but just let them run in the background.
  120. size_t num_sizes = ecall_precompute_sort(-1);
  121. for (int i=0;i<int(num_sizes);++i) {
  122. boost::thread t([i] {
  123. ecall_precompute_sort(i);
  124. });
  125. t.detach();
  126. }
  127. ++epoch_num;
  128. }
  129. static void route_clients_test(NetIO &netio)
  130. {
  131. // Precompute some WaksmanNetworks
  132. const Config &config = netio.config();
  133. size_t num_sizes = ecall_precompute_sort(-2);
  134. for (int i=0;i<int(num_sizes);++i) {
  135. std::vector<boost::thread> ts;
  136. for (int j=0; j<config.nthreads; ++j) {
  137. ts.emplace_back([i] {
  138. ecall_precompute_sort(i);
  139. });
  140. }
  141. for (auto& t: ts) {
  142. t.join();
  143. }
  144. }
  145. // Run epoch
  146. for (int i=1; i<10; ++i) {
  147. struct timespec tp;
  148. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  149. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  150. epoch_clients(netio);
  151. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  152. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  153. unsigned long diff = end - start;
  154. printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  155. // Sleep for the rest of the epoch interval
  156. if (diff < EPOCH_INTERVAL) {
  157. usleep(EPOCH_INTERVAL - (useconds_t) diff);
  158. }
  159. }
  160. netio.close();
  161. }
  162. static void route_test(NetIO &netio, char **args)
  163. {
  164. // Count the number of arguments
  165. size_t nargs = 0;
  166. while (args[nargs]) {
  167. ++nargs;
  168. }
  169. uint16_t num_nodes = netio.num_nodes;
  170. size_t sq_nodes = num_nodes;
  171. sq_nodes *= sq_nodes;
  172. if (nargs != sq_nodes) {
  173. printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
  174. return;
  175. }
  176. // The arguments are num_nodes sets of num_nodes values. The jth
  177. // value in the ith set is the number of private routing tokens
  178. // ingestion node i holds for storage node j.
  179. // We are node i = netio.me, so ignore the other sets of values.
  180. // Precompute some WaksmanNetworks
  181. const Config &config = netio.config();
  182. size_t num_sizes = ecall_precompute_sort(-2);
  183. for (int i=0;i<int(num_sizes);++i) {
  184. std::vector<boost::thread> ts;
  185. for (int j=0; j<config.nthreads; ++j) {
  186. ts.emplace_back([i] {
  187. ecall_precompute_sort(i);
  188. });
  189. }
  190. for (auto& t: ts) {
  191. t.join();
  192. }
  193. }
  194. // The epoch interval, in microseconds
  195. uint32_t epoch_interval_us = 1000000;
  196. // Run 10 epochs
  197. for (int i=0; i<10; ++i) {
  198. struct timespec tp;
  199. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  200. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  201. epoch(netio, args);
  202. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  203. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  204. unsigned long diff = end - start;
  205. printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  206. // Sleep for the rest of the epoch interval
  207. if (diff < epoch_interval_us) {
  208. usleep(epoch_interval_us - (useconds_t)diff);
  209. }
  210. }
  211. netio.close();
  212. }
  213. // Once all the networking is set up, start doing whatever we were asked
  214. // to do on the command line
  215. void start(NetIO &netio, char **args)
  216. {
  217. if (*args && !strcmp(*args, "route")) {
  218. ++args;
  219. route_test(netio, args);
  220. return;
  221. }
  222. if (*args && !strcmp(*args, "route_clients")) {
  223. ++args;
  224. route_clients_test(netio);
  225. return;
  226. }
  227. }