start.cpp 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. #include <condition_variable>
  2. #include <mutex>
  3. #include <stdlib.h>
  4. #include "Untrusted.hpp"
  5. #include "start.hpp"
  6. // epoch_duration is set to 5 seconds by default
  7. int epoch_duration = 5;
  8. class Epoch {
  9. NetIO &netio;
  10. uint32_t epoch_num;
  11. std::mutex m;
  12. std::condition_variable cv;
  13. bool epoch_complete;
  14. void round_cb(uint32_t round_num) {
  15. if (round_num) {
  16. printf("Round %u complete\n", round_num);
  17. boost::asio::post(netio.io_context(), [this]{
  18. proceed();
  19. });
  20. } else {
  21. printf("Epoch %u complete\n", epoch_num);
  22. {
  23. std::lock_guard lk(m);
  24. epoch_complete = true;
  25. }
  26. const NodeConfig &my_conf = netio.myconfig();
  27. if(my_conf.roles & ROLE_STORAGE) {
  28. boost::asio::post(netio.io_context(), [this]{
  29. netio.send_client_mailbox();
  30. });
  31. }
  32. cv.notify_one();
  33. }
  34. }
  35. public:
  36. Epoch(NetIO &netio_obj, uint32_t ep_num):
  37. netio(netio_obj), epoch_num(ep_num),
  38. epoch_complete(false) {}
  39. void proceed() {
  40. ecall_routing_proceed([this](uint32_t round_num) {
  41. round_cb(round_num);
  42. });
  43. }
  44. void wait() {
  45. std::unique_lock lk(m);
  46. cv.wait(lk, [this]{ return epoch_complete; });
  47. }
  48. };
  49. static void epoch(NetIO &netio, char **args) {
  50. static uint32_t epoch_num = 1;
  51. uint16_t num_nodes = netio.num_nodes;
  52. uint32_t num_tokens[num_nodes];
  53. uint32_t tot_tokens = 0;
  54. for (nodenum_t j=0;j<num_nodes;++j) {
  55. num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
  56. tot_tokens += num_tokens[j];
  57. }
  58. const Config &config = netio.config();
  59. uint16_t msg_size = config.msg_size;
  60. nodenum_t my_node_num = config.my_node_num;
  61. uint8_t my_roles = config.nodes[my_node_num].roles;
  62. if (my_roles & ROLE_INGESTION) {
  63. uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
  64. uint8_t *nextmsg = msgs;
  65. uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
  66. uint32_t rem_tokens = tot_tokens;
  67. while (rem_tokens > 0) {
  68. // Pick a random remaining token
  69. uint32_t r = uint32_t(lrand48()) % rem_tokens;
  70. for (nodenum_t j=0;j<num_nodes;++j) {
  71. if (r < num_tokens[j]) {
  72. // Use a token from node j
  73. *((uint32_t*)nextmsg) =
  74. (j << DEST_UID_BITS) +
  75. ((rem_tokens-1) & dest_uid_mask);
  76. // Put a bunch of copies of r as the message body
  77. for (uint16_t i=1;i<msg_size/4;++i) {
  78. ((uint32_t*)nextmsg)[i] = r;
  79. }
  80. num_tokens[j] -= 1;
  81. rem_tokens -= 1;
  82. nextmsg += msg_size;
  83. } else {
  84. r -= num_tokens[j];
  85. }
  86. }
  87. }
  88. /*
  89. for (uint32_t i=0;i<tot_tokens;++i) {
  90. for(uint16_t j=0;j<msg_size/4;++j) {
  91. printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
  92. }
  93. printf("\n");
  94. }
  95. */
  96. if (!ecall_ingest_raw(msgs, tot_tokens)) {
  97. printf("Ingestion failed\n");
  98. return;
  99. }
  100. }
  101. Epoch epoch(netio, epoch_num);
  102. epoch.proceed();
  103. epoch.wait();
  104. // Launch threads to refill the precomputed Waksman networks we
  105. // used, but just let them run in the background.
  106. size_t num_sizes = ecall_precompute_sort(-1);
  107. for (int i=0;i<int(num_sizes);++i) {
  108. boost::thread t([i] {
  109. ecall_precompute_sort(i);
  110. });
  111. t.detach();
  112. }
  113. ++epoch_num;
  114. }
  115. static void epoch_clients(NetIO &netio) {
  116. static uint32_t epoch_num = 1;
  117. Epoch epoch(netio, epoch_num);
  118. epoch.proceed();
  119. epoch.wait();
  120. // Launch threads to refill the precomputed Waksman networks we
  121. // used, but just let them run in the background.
  122. size_t num_sizes = ecall_precompute_sort(-1);
  123. for (int i=0;i<int(num_sizes);++i) {
  124. boost::thread t([i] {
  125. ecall_precompute_sort(i);
  126. });
  127. t.detach();
  128. }
  129. ++epoch_num;
  130. }
  131. static void route_clients_test(NetIO &netio)
  132. {
  133. // Default epoch_interval is 5 sec
  134. size_t epoch_interval = epoch_duration * 1000000;
  135. printf("Epoch duration = %d\n", epoch_duration);
  136. // Sleep two epoch_interval for clients to connect
  137. usleep(2 * epoch_interval);
  138. // Precompute some WaksmanNetworks
  139. const Config &config = netio.config();
  140. size_t num_sizes = ecall_precompute_sort(-2);
  141. for (int i=0;i<int(num_sizes);++i) {
  142. std::vector<boost::thread> ts;
  143. for (int j=0; j<config.nthreads; ++j) {
  144. ts.emplace_back([i] {
  145. ecall_precompute_sort(i);
  146. });
  147. }
  148. for (auto& t: ts) {
  149. t.join();
  150. }
  151. }
  152. // Run epoch
  153. for (int i=1; i<=10; ++i) {
  154. struct timespec tp;
  155. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  156. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  157. epoch_clients(netio);
  158. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  159. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  160. unsigned long diff = end - start;
  161. //printf("client_count = %ld\n", client_count);
  162. printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000);
  163. // Sleep for the rest of the epoch interval
  164. if (diff < epoch_interval) {
  165. usleep(epoch_interval - (useconds_t) diff);
  166. }
  167. }
  168. netio.close();
  169. exit(0);
  170. }
  171. static void route_test(NetIO &netio, char **args)
  172. {
  173. // Count the number of arguments
  174. size_t nargs = 0;
  175. while (args[nargs]) {
  176. ++nargs;
  177. }
  178. uint16_t num_nodes = netio.num_nodes;
  179. size_t sq_nodes = num_nodes;
  180. sq_nodes *= sq_nodes;
  181. if (nargs != sq_nodes) {
  182. printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
  183. return;
  184. }
  185. // The arguments are num_nodes sets of num_nodes values. The jth
  186. // value in the ith set is the number of private routing tokens
  187. // ingestion node i holds for storage node j.
  188. // We are node i = netio.me, so ignore the other sets of values.
  189. // Precompute some WaksmanNetworks
  190. const Config &config = netio.config();
  191. size_t num_sizes = ecall_precompute_sort(-2);
  192. for (int i=0;i<int(num_sizes);++i) {
  193. std::vector<boost::thread> ts;
  194. for (int j=0; j<config.nthreads; ++j) {
  195. ts.emplace_back([i] {
  196. ecall_precompute_sort(i);
  197. });
  198. }
  199. for (auto& t: ts) {
  200. t.join();
  201. }
  202. }
  203. // The epoch interval, in microseconds
  204. uint32_t epoch_interval_us = 1000000;
  205. // Run 10 epochs
  206. for (int i=0; i<10; ++i) {
  207. struct timespec tp;
  208. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  209. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  210. epoch(netio, args);
  211. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  212. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  213. unsigned long diff = end - start;
  214. printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  215. // Sleep for the rest of the epoch interval
  216. if (diff < epoch_interval_us) {
  217. usleep(epoch_interval_us - (useconds_t)diff);
  218. }
  219. }
  220. netio.close();
  221. }
  222. // Once all the networking is set up, start doing whatever we were asked
  223. // to do on the command line
  224. void start(NetIO &netio, char **args)
  225. {
  226. if (*args && !strcmp(*args, "route")) {
  227. ++args;
  228. route_test(netio, args);
  229. return;
  230. }
  231. if (*args && !strcmp(*args, "route_clients")) {
  232. ++args;
  233. route_clients_test(netio);
  234. return;
  235. }
  236. }