start.cpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. #include <condition_variable>
  2. #include <mutex>
  3. #include <stdlib.h>
  4. #include "Untrusted.hpp"
  5. #include "start.hpp"
  6. // Default 4 epochs
  7. int num_epochs = 4;
  8. // Default epoch_duration of 5 seconds
  9. int epoch_duration = 5;
  10. // Default of 12 Waksman Networks (3 per private_route for 4 epochs)
  11. int num_WN_to_precompute = 12;
  12. // We'll always run the WN precomputation in the foreground
  13. // TODO: Later fix this to a command line param
  14. bool FOREGROUND_PRECOMPUTE = true;
  15. #define CEILDIV(x,y) (((x)+(y)-1)/(y))
  16. class Epoch {
  17. NetIO &netio;
  18. uint32_t epoch_num;
  19. std::mutex m;
  20. std::condition_variable cv;
  21. bool epoch_complete;
  22. void round_cb(uint32_t round_num) {
  23. if (round_num) {
  24. printf("Round %u complete\n", round_num);
  25. boost::asio::post(netio.io_context(), [this]{
  26. proceed();
  27. });
  28. } else {
  29. printf("Epoch %u complete\n", epoch_num);
  30. {
  31. std::lock_guard lk(m);
  32. epoch_complete = true;
  33. }
  34. const NodeConfig &my_conf = netio.myconfig();
  35. if(my_conf.roles & ROLE_STORAGE) {
  36. boost::asio::post(netio.io_context(), [this]{
  37. netio.send_client_mailbox();
  38. });
  39. }
  40. cv.notify_one();
  41. }
  42. }
  43. public:
  44. Epoch(NetIO &netio_obj, uint32_t ep_num):
  45. netio(netio_obj), epoch_num(ep_num),
  46. epoch_complete(false) {}
  47. void proceed() {
  48. ecall_routing_proceed([this](uint32_t round_num) {
  49. round_cb(round_num);
  50. });
  51. }
  52. void wait() {
  53. std::unique_lock lk(m);
  54. cv.wait(lk, [this]{ return epoch_complete; });
  55. }
  56. };
  57. static void epoch(NetIO &netio, char **args) {
  58. static uint32_t epoch_num = 1;
  59. uint16_t num_nodes = netio.num_nodes;
  60. uint32_t num_tokens[num_nodes];
  61. uint32_t tot_tokens = 0;
  62. for (nodenum_t j=0;j<num_nodes;++j) {
  63. num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
  64. tot_tokens += num_tokens[j];
  65. }
  66. const Config &config = netio.config();
  67. uint16_t msg_size = config.msg_size;
  68. nodenum_t my_node_num = config.my_node_num;
  69. uint8_t my_roles = config.nodes[my_node_num].roles;
  70. if (my_roles & ROLE_INGESTION) {
  71. uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
  72. uint8_t *nextmsg = msgs;
  73. uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
  74. uint32_t rem_tokens = tot_tokens;
  75. while (rem_tokens > 0) {
  76. // Pick a random remaining token
  77. uint32_t r = uint32_t(lrand48()) % rem_tokens;
  78. for (nodenum_t j=0;j<num_nodes;++j) {
  79. if (r < num_tokens[j]) {
  80. // Use a token from node j
  81. *((uint32_t*)nextmsg) =
  82. (j << DEST_UID_BITS) +
  83. ((rem_tokens-1) & dest_uid_mask);
  84. // Put a bunch of copies of r as the message body
  85. for (uint16_t i=1;i<msg_size/4;++i) {
  86. ((uint32_t*)nextmsg)[i] = r;
  87. }
  88. num_tokens[j] -= 1;
  89. rem_tokens -= 1;
  90. nextmsg += msg_size;
  91. } else {
  92. r -= num_tokens[j];
  93. }
  94. }
  95. }
  96. /*
  97. for (uint32_t i=0;i<tot_tokens;++i) {
  98. for(uint16_t j=0;j<msg_size/4;++j) {
  99. printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
  100. }
  101. printf("\n");
  102. }
  103. */
  104. if (!ecall_ingest_raw(msgs, tot_tokens)) {
  105. printf("Ingestion failed\n");
  106. return;
  107. }
  108. }
  109. Epoch epoch(netio, epoch_num);
  110. epoch.proceed();
  111. epoch.wait();
  112. // Launch threads to refill the precomputed Waksman networks we
  113. // used, but just let them run in the background.
  114. size_t num_sizes = ecall_precompute_sort(-1);
  115. std::vector<boost::thread> ts;
  116. for (int i=0;i<int(num_sizes);++i) {
  117. if(!FOREGROUND_PRECOMPUTE) {
  118. boost::thread t([i] {
  119. ecall_precompute_sort(i);
  120. });
  121. }
  122. else {
  123. ts.emplace_back([i] {
  124. ecall_precompute_sort(i);
  125. });
  126. }
  127. }
  128. if(FOREGROUND_PRECOMPUTE) {
  129. for (auto& t: ts) {
  130. t.join();
  131. }
  132. }
  133. ++epoch_num;
  134. }
  135. static void epoch_clients(NetIO &netio) {
  136. static uint32_t epoch_num = 1;
  137. Epoch epoch(netio, epoch_num);
  138. epoch.proceed();
  139. epoch.wait();
  140. // Launch threads to refill the precomputed Waksman networks we
  141. // used, but just let them run in the background.
  142. size_t num_sizes = ecall_precompute_sort(-1);
  143. for (int i=0;i<int(num_sizes);++i) {
  144. boost::thread t([i] {
  145. ecall_precompute_sort(i);
  146. });
  147. t.detach();
  148. }
  149. ++epoch_num;
  150. }
  151. static void route_clients_test(NetIO &netio)
  152. {
  153. // Default epoch_interval is 5 sec
  154. size_t epoch_interval = epoch_duration * 1000000;
  155. printf("Epoch duration = %d\n", epoch_duration);
  156. // Sleep one epoch_interval for clients to connect
  157. usleep(epoch_interval);
  158. // Precompute some WaksmanNetworks
  159. size_t num_sizes = ecall_precompute_sort(-2);
  160. /*
  161. // Setting num_WN_per_size for background computation mode
  162. printf("Precompute num_sizes = %ld\n", num_sizes);
  163. int num_WN_per_size = int(CEILDIV(num_WN_to_precompute, num_sizes));
  164. if(num_WN_per_size <2) {
  165. num_WN_per_size = 2;
  166. }
  167. */
  168. std::vector<boost::thread> ts;
  169. for (int i=0;i<int(num_sizes);++i) {
  170. for (int j=0; j<num_WN_per_size; ++j) {
  171. ts.emplace_back([i] {
  172. ecall_precompute_sort(i);
  173. });
  174. }
  175. }
  176. for (auto& t: ts) {
  177. t.join();
  178. }
  179. // Run epoch
  180. for (int i=1; i<=num_epochs; ++i) {
  181. struct timespec tp;
  182. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  183. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  184. epoch_clients(netio);
  185. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  186. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  187. unsigned long diff = end - start;
  188. printf("client_count = %ld\n", client_count);
  189. printf("bytes_sent = %ld\n", netio.reset_bytes_sent());
  190. printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000);
  191. // Sleep for the rest of the epoch interval
  192. if (diff < epoch_interval) {
  193. usleep(epoch_interval - (useconds_t) diff);
  194. }
  195. }
  196. netio.close();
  197. exit(0);
  198. }
  199. static void route_test(NetIO &netio, char **args)
  200. {
  201. // Count the number of arguments
  202. size_t nargs = 0;
  203. while (args[nargs]) {
  204. ++nargs;
  205. }
  206. uint16_t num_nodes = netio.num_nodes;
  207. size_t sq_nodes = num_nodes;
  208. sq_nodes *= sq_nodes;
  209. if (nargs != sq_nodes) {
  210. printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
  211. return;
  212. }
  213. // The arguments are num_nodes sets of num_nodes values. The jth
  214. // value in the ith set is the number of private routing tokens
  215. // ingestion node i holds for storage node j.
  216. // We are node i = netio.me, so ignore the other sets of values.
  217. // Precompute some WaksmanNetworks
  218. const Config &config = netio.config();
  219. size_t num_sizes = ecall_precompute_sort(-2);
  220. for (int i=0;i<int(num_sizes);++i) {
  221. std::vector<boost::thread> ts;
  222. for (int j=0; j<config.nthreads; ++j) {
  223. ts.emplace_back([i] {
  224. ecall_precompute_sort(i);
  225. });
  226. }
  227. for (auto& t: ts) {
  228. t.join();
  229. }
  230. }
  231. // The epoch interval, in microseconds
  232. uint32_t epoch_interval_us = 1000000;
  233. // Run 10 epochs
  234. for (int i=0; i<10; ++i) {
  235. struct timespec tp;
  236. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  237. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  238. epoch(netio, args);
  239. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  240. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  241. unsigned long diff = end - start;
  242. printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  243. // Sleep for the rest of the epoch interval
  244. if (diff < epoch_interval_us) {
  245. usleep(epoch_interval_us - (useconds_t)diff);
  246. }
  247. }
  248. netio.close();
  249. }
  250. // Once all the networking is set up, start doing whatever we were asked
  251. // to do on the command line
  252. void start(NetIO &netio, char **args)
  253. {
  254. if (*args && !strcmp(*args, "route")) {
  255. ++args;
  256. route_test(netio, args);
  257. return;
  258. }
  259. if (*args && !strcmp(*args, "route_clients")) {
  260. ++args;
  261. printf("num_epochs = %d, epoch_duration = %d, num_WN_to_precompute = %d\n",
  262. num_epochs, epoch_duration, num_WN_to_precompute);
  263. route_clients_test(netio);
  264. return;
  265. }
  266. }