start.cpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. #include <condition_variable>
  2. #include <mutex>
  3. #include <stdlib.h>
  4. #include "Untrusted.hpp"
  5. #include "start.hpp"
  6. // Default 4 epochs
  7. int num_epochs = 4;
  8. // Default epoch_duration of 5 seconds
  9. int epoch_duration = 5;
  10. // Default of 12 Waksman Networks (3 per private_route for 4 epochs)
  11. int num_WN_to_precompute = 12;
  12. // We'll always run the WN precomputation in the foreground
  13. // TODO: Later fix this to a command line param
  14. bool FOREGROUND_PRECOMPUTE = true;
  15. #define CEILDIV(x,y) (((x)+(y)-1)/(y))
  16. class Epoch {
  17. NetIO &netio;
  18. uint32_t epoch_num;
  19. std::mutex m;
  20. std::condition_variable cv;
  21. bool epoch_complete;
  22. void round_cb(uint32_t round_num) {
  23. if (round_num) {
  24. printf("Round %u complete\n", round_num);
  25. boost::asio::post(netio.io_context(), [this]{
  26. proceed();
  27. });
  28. } else {
  29. printf("Epoch %u complete\n", epoch_num);
  30. {
  31. std::lock_guard lk(m);
  32. epoch_complete = true;
  33. }
  34. const NodeConfig &my_conf = netio.myconfig();
  35. if(my_conf.roles & ROLE_STORAGE) {
  36. boost::asio::post(netio.io_context(), [this]{
  37. netio.send_client_mailbox();
  38. });
  39. }
  40. cv.notify_one();
  41. }
  42. }
  43. public:
  44. Epoch(NetIO &netio_obj, uint32_t ep_num):
  45. netio(netio_obj), epoch_num(ep_num),
  46. epoch_complete(false) {}
  47. void proceed() {
  48. ecall_routing_proceed([this](uint32_t round_num) {
  49. round_cb(round_num);
  50. });
  51. }
  52. void wait() {
  53. std::unique_lock lk(m);
  54. cv.wait(lk, [this]{ return epoch_complete; });
  55. }
  56. };
  57. static void epoch(NetIO &netio, char **args) {
  58. static uint32_t epoch_num = 1;
  59. uint16_t num_nodes = netio.num_nodes;
  60. uint32_t num_tokens[num_nodes];
  61. uint32_t tot_tokens = 0;
  62. for (nodenum_t j=0;j<num_nodes;++j) {
  63. num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
  64. tot_tokens += num_tokens[j];
  65. }
  66. const Config &config = netio.config();
  67. uint16_t msg_size = config.msg_size;
  68. nodenum_t my_node_num = config.my_node_num;
  69. uint8_t my_roles = config.nodes[my_node_num].roles;
  70. if (my_roles & ROLE_INGESTION) {
  71. uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
  72. uint8_t *nextmsg = msgs;
  73. uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
  74. uint32_t rem_tokens = tot_tokens;
  75. while (rem_tokens > 0) {
  76. // Pick a random remaining token
  77. uint32_t r = uint32_t(lrand48()) % rem_tokens;
  78. for (nodenum_t j=0;j<num_nodes;++j) {
  79. if (r < num_tokens[j]) {
  80. // Use a token from node j
  81. *((uint32_t*)nextmsg) =
  82. (j << DEST_UID_BITS) +
  83. ((rem_tokens-1) & dest_uid_mask);
  84. // Put a bunch of copies of r as the message body
  85. for (uint16_t i=1;i<msg_size/4;++i) {
  86. ((uint32_t*)nextmsg)[i] = r;
  87. }
  88. num_tokens[j] -= 1;
  89. rem_tokens -= 1;
  90. nextmsg += msg_size;
  91. } else {
  92. r -= num_tokens[j];
  93. }
  94. }
  95. }
  96. /*
  97. for (uint32_t i=0;i<tot_tokens;++i) {
  98. for(uint16_t j=0;j<msg_size/4;++j) {
  99. printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
  100. }
  101. printf("\n");
  102. }
  103. */
  104. if (!ecall_ingest_raw(msgs, tot_tokens)) {
  105. printf("Ingestion failed\n");
  106. return;
  107. }
  108. }
  109. Epoch epoch(netio, epoch_num);
  110. epoch.proceed();
  111. epoch.wait();
  112. // Launch threads to refill the precomputed Waksman networks we
  113. // used, but just let them run in the background.
  114. size_t num_sizes = ecall_precompute_sort(-1);
  115. std::vector<boost::thread> ts;
  116. for (int i=0;i<int(num_sizes);++i) {
  117. if(!FOREGROUND_PRECOMPUTE) {
  118. boost::thread t([i] {
  119. ecall_precompute_sort(i);
  120. });
  121. }
  122. else {
  123. ts.emplace_back([i] {
  124. ecall_precompute_sort(i);
  125. });
  126. }
  127. }
  128. if(FOREGROUND_PRECOMPUTE) {
  129. for (auto& t: ts) {
  130. t.join();
  131. }
  132. }
  133. ++epoch_num;
  134. }
  135. static void epoch_clients(NetIO &netio) {
  136. static uint32_t epoch_num = 1;
  137. Epoch epoch(netio, epoch_num);
  138. epoch.proceed();
  139. epoch.wait();
  140. // Launch threads to refill the precomputed Waksman networks we
  141. // used, but just let them run in the background.
  142. size_t num_sizes = ecall_precompute_sort(-1);
  143. for (int i=0;i<int(num_sizes);++i) {
  144. boost::thread t([i] {
  145. ecall_precompute_sort(i);
  146. });
  147. t.detach();
  148. }
  149. ++epoch_num;
  150. }
  151. static void route_clients_test(NetIO &netio)
  152. {
  153. // Default epoch_interval is 5 sec
  154. size_t epoch_interval = epoch_duration * 1000000;
  155. printf("Epoch duration = %d\n", epoch_duration);
  156. // Sleep one epoch_interval for clients to connect
  157. usleep(epoch_interval);
  158. // Precompute some WaksmanNetworks
  159. size_t num_sizes = ecall_precompute_sort(-2);
  160. // Setting num_WN_per_size for background computation mode
  161. printf("Precompute num_sizes = %ld\n", num_sizes);
  162. int num_WN_per_size = int(CEILDIV(num_WN_to_precompute, num_sizes));
  163. if(num_WN_per_size <2) {
  164. num_WN_per_size = 2;
  165. }
  166. std::vector<boost::thread> ts;
  167. for (int i=0;i<int(num_sizes);++i) {
  168. for (int j=0; j<num_WN_per_size; ++j) {
  169. ts.emplace_back([i] {
  170. ecall_precompute_sort(i);
  171. });
  172. }
  173. }
  174. for (auto& t: ts) {
  175. t.join();
  176. }
  177. // Run epoch
  178. for (int i=1; i<=num_epochs; ++i) {
  179. struct timespec tp;
  180. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  181. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  182. epoch_clients(netio);
  183. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  184. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  185. unsigned long diff = end - start;
  186. printf("client_count = %ld\n", client_count);
  187. printf("bytes_sent = %ld\n", netio.reset_bytes_sent());
  188. printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000);
  189. // Sleep for the rest of the epoch interval
  190. if (diff < epoch_interval) {
  191. usleep(epoch_interval - (useconds_t) diff);
  192. }
  193. }
  194. netio.close();
  195. exit(0);
  196. }
  197. static void route_test(NetIO &netio, char **args)
  198. {
  199. // Count the number of arguments
  200. size_t nargs = 0;
  201. while (args[nargs]) {
  202. ++nargs;
  203. }
  204. uint16_t num_nodes = netio.num_nodes;
  205. size_t sq_nodes = num_nodes;
  206. sq_nodes *= sq_nodes;
  207. if (nargs != sq_nodes) {
  208. printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
  209. return;
  210. }
  211. // The arguments are num_nodes sets of num_nodes values. The jth
  212. // value in the ith set is the number of private routing tokens
  213. // ingestion node i holds for storage node j.
  214. // We are node i = netio.me, so ignore the other sets of values.
  215. // Precompute some WaksmanNetworks
  216. const Config &config = netio.config();
  217. size_t num_sizes = ecall_precompute_sort(-2);
  218. for (int i=0;i<int(num_sizes);++i) {
  219. std::vector<boost::thread> ts;
  220. for (int j=0; j<config.nthreads; ++j) {
  221. ts.emplace_back([i] {
  222. ecall_precompute_sort(i);
  223. });
  224. }
  225. for (auto& t: ts) {
  226. t.join();
  227. }
  228. }
  229. // The epoch interval, in microseconds
  230. uint32_t epoch_interval_us = 1000000;
  231. // Run 10 epochs
  232. for (int i=0; i<10; ++i) {
  233. struct timespec tp;
  234. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  235. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  236. epoch(netio, args);
  237. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  238. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  239. unsigned long diff = end - start;
  240. printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  241. // Sleep for the rest of the epoch interval
  242. if (diff < epoch_interval_us) {
  243. usleep(epoch_interval_us - (useconds_t)diff);
  244. }
  245. }
  246. netio.close();
  247. }
  248. // Once all the networking is set up, start doing whatever we were asked
  249. // to do on the command line
  250. void start(NetIO &netio, char **args)
  251. {
  252. if (*args && !strcmp(*args, "route")) {
  253. ++args;
  254. route_test(netio, args);
  255. return;
  256. }
  257. if (*args && !strcmp(*args, "route_clients")) {
  258. ++args;
  259. printf("num_epochs = %d, epoch_duration = %d, num_WN_to_precompute = %d\n",
  260. num_epochs, epoch_duration, num_WN_to_precompute);
  261. route_clients_test(netio);
  262. return;
  263. }
  264. }