start.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
#include <stdlib.h>

#include <chrono>
#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "Untrusted.hpp"
#include "start.hpp"
  6. // Default 4 epochs
  7. int num_epochs = 4;
  8. // Default epoch_wait_time of 5 seconds
  9. int epoch_wait_time = 5;
  10. // Default of 12 Waksman Networks (3 per token channel route for 4 epochs)
  11. int num_WN_to_precompute = 12;
  12. // We'll always run the WN precomputation in the foreground
  13. // TODO: Later fix this to a command line param
  14. static const bool FOREGROUND_PRECOMPUTE = true;
  15. #define CEILDIV(x,y) (((x)+(y)-1)/(y))
  16. // #define DEBUG_PUB_TIMES
  17. class Epoch {
  18. NetIO &netio;
  19. uint32_t epoch_num;
  20. std::mutex m;
  21. std::condition_variable cv;
  22. bool epoch_complete;
  23. #ifdef DEBUG_PUB_TIMES
  24. unsigned long lt = 0;
  25. #endif
  26. void round_cb(uint32_t round_num) {
  27. struct timeval now;
  28. gettimeofday(&now, NULL);
  29. if (round_num) {
  30. printf("%lu.%06lu: Round %u complete\n", now.tv_sec,
  31. now.tv_usec, round_num);
  32. #ifdef DEBUG_PUB_TIMES
  33. struct timespec tp;
  34. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  35. unsigned long time = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  36. if(lt == 0) {
  37. printf("Time now = %lu\n", time);
  38. lt = time;
  39. } else {
  40. printf("Time since last round_cb = %lu.%lu\n", (time-lt)/1000000, (time-lt)%1000000);
  41. }
  42. #endif
  43. boost::asio::post(netio.io_context(), [this]{
  44. proceed();
  45. });
  46. } else {
  47. printf("%lu.%06lu: Epoch %u complete\n", now.tv_sec,
  48. now.tv_usec, epoch_num);
  49. {
  50. std::lock_guard lk(m);
  51. epoch_complete = true;
  52. }
  53. #ifdef DEBUG_PUB_TIMES
  54. struct timespec tp;
  55. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  56. unsigned long time = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  57. printf("Epoch end. Time since last round_cb = %lu.%lu\n", (time-lt)/1000000, (time-lt)%1000000);
  58. lt = 0;
  59. #endif
  60. const NodeConfig &my_conf = netio.myconfig();
  61. if(my_conf.roles & ROLE_STORAGE) {
  62. boost::asio::post(netio.io_context(), [this]{
  63. netio.send_client_mailbox();
  64. });
  65. }
  66. cv.notify_one();
  67. }
  68. }
  69. public:
  70. Epoch(NetIO &netio_obj, uint32_t ep_num):
  71. netio(netio_obj), epoch_num(ep_num),
  72. epoch_complete(false) {}
  73. void proceed() {
  74. ecall_routing_proceed([this](uint32_t round_num) {
  75. round_cb(round_num);
  76. });
  77. }
  78. void wait() {
  79. std::unique_lock lk(m);
  80. cv.wait(lk, [this]{ return epoch_complete; });
  81. }
  82. };
  83. static void epoch(NetIO &netio, char **args) {
  84. static uint32_t epoch_num = 1;
  85. uint16_t num_nodes = netio.num_nodes;
  86. uint32_t num_tokens[num_nodes];
  87. uint32_t tot_tokens = 0;
  88. for (nodenum_t j=0;j<num_nodes;++j) {
  89. num_tokens[j] = atoi(args[netio.me*num_nodes+j]);
  90. tot_tokens += num_tokens[j];
  91. }
  92. const Config &config = netio.config();
  93. uint16_t msg_size = config.msg_size;
  94. nodenum_t my_node_num = config.my_node_num;
  95. uint8_t my_roles = config.nodes[my_node_num].roles;
  96. if (my_roles & ROLE_INGESTION) {
  97. uint8_t *msgs = new uint8_t[tot_tokens * msg_size];
  98. uint8_t *nextmsg = msgs;
  99. uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
  100. uint32_t rem_tokens = tot_tokens;
  101. while (rem_tokens > 0) {
  102. // Pick a random remaining token
  103. uint32_t r = uint32_t(lrand48()) % rem_tokens;
  104. for (nodenum_t j=0;j<num_nodes;++j) {
  105. if (r < num_tokens[j]) {
  106. // Use a token from node j
  107. *((uint32_t*)nextmsg) =
  108. (j << DEST_UID_BITS) +
  109. ((rem_tokens-1) & dest_uid_mask);
  110. // Put a bunch of copies of r as the message body
  111. for (uint16_t i=1;i<msg_size/4;++i) {
  112. ((uint32_t*)nextmsg)[i] = r;
  113. }
  114. num_tokens[j] -= 1;
  115. rem_tokens -= 1;
  116. nextmsg += msg_size;
  117. } else {
  118. r -= num_tokens[j];
  119. }
  120. }
  121. }
  122. /*
  123. for (uint32_t i=0;i<tot_tokens;++i) {
  124. for(uint16_t j=0;j<msg_size/4;++j) {
  125. printf("%08x ", ((uint32_t*)msgs)[i*msg_size/4+j]);
  126. }
  127. printf("\n");
  128. }
  129. */
  130. if (!ecall_ingest_raw(msgs, tot_tokens)) {
  131. printf("Ingestion failed\n");
  132. return;
  133. }
  134. }
  135. Epoch epoch(netio, epoch_num);
  136. epoch.proceed();
  137. epoch.wait();
  138. // Launch threads to refill the precomputed Waksman networks we
  139. // used, but just let them run in the background.
  140. size_t num_sizes = ecall_precompute_sort(-1);
  141. for (int i=0;i<int(num_sizes);++i) {
  142. boost::thread t([i] {
  143. ecall_precompute_sort(i);
  144. });
  145. t.detach();
  146. }
  147. ++epoch_num;
  148. }
  149. static unsigned long epoch_clients(NetIO &netio) {
  150. static uint32_t epoch_num = 1;
  151. Epoch epoch(netio, epoch_num);
  152. epoch.proceed();
  153. epoch.wait();
  154. struct timespec tp;
  155. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  156. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  157. printf("Epoch end time = %lu\n", end);
  158. if (FOREGROUND_PRECOMPUTE) {
  159. struct timeval now;
  160. gettimeofday(&now, NULL);
  161. printf("%lu.%06lu: Begin Waksman networks precompute\n",
  162. now.tv_sec, now.tv_usec);
  163. }
  164. // Launch threads to refill the precomputed Waksman networks we used.
  165. size_t num_sizes = ecall_precompute_sort(-1);
  166. std::vector<boost::thread> ts;
  167. for (int i=0;i<int(num_sizes);++i) {
  168. if(!FOREGROUND_PRECOMPUTE) {
  169. boost::thread t([i] {
  170. ecall_precompute_sort(i);
  171. });
  172. t.detach();
  173. }
  174. else {
  175. ts.emplace_back([i] {
  176. ecall_precompute_sort(i);
  177. });
  178. }
  179. }
  180. if(FOREGROUND_PRECOMPUTE) {
  181. for (auto& t: ts) {
  182. t.join();
  183. }
  184. struct timeval now;
  185. gettimeofday(&now, NULL);
  186. printf("%lu.%06lu: End Waksman networks precompute\n",
  187. now.tv_sec, now.tv_usec);
  188. }
  189. ++epoch_num;
  190. return end;
  191. }
  192. static void route_clients_test(NetIO &netio)
  193. {
  194. // Default epoch_interval is 5 sec
  195. unsigned long epoch_interval = epoch_wait_time * 1000000;
  196. printf("Epoch wait time = %d\n", epoch_wait_time);
  197. struct timeval exp_start;
  198. gettimeofday(&exp_start, NULL);
  199. // Precompute some WaksmanNetworks
  200. size_t num_sizes = ecall_precompute_sort(-2);
  201. // Setting num_WN_per_size for background computation mode
  202. printf("Precompute num_sizes = %ld\n", num_sizes);
  203. int num_WN_per_size = int(CEILDIV(num_WN_to_precompute, num_sizes));
  204. if(num_WN_per_size <2) {
  205. num_WN_per_size = 2;
  206. }
  207. printf("%lu.%06lu: Begin Waksman networks precompute\n",
  208. exp_start.tv_sec, exp_start.tv_usec);
  209. std::vector<boost::thread> ts;
  210. for (int i=0;i<int(num_sizes);++i) {
  211. for (int j=0; j<num_WN_per_size; ++j) {
  212. ts.emplace_back([i] {
  213. ecall_precompute_sort(i);
  214. });
  215. }
  216. }
  217. for (auto& t: ts) {
  218. t.join();
  219. }
  220. struct timeval exp_now;
  221. gettimeofday(&exp_now, NULL);
  222. printf("%lu.%06lu: End Waksman networks precompute\n",
  223. exp_now.tv_sec, exp_now.tv_usec);
  224. // Sleep one epoch_interval for clients to connect
  225. long remaining_us =
  226. epoch_interval
  227. - (exp_now.tv_sec - exp_start.tv_sec) * 1000000
  228. - (exp_now.tv_usec - exp_start.tv_usec);
  229. if (remaining_us > 0) {
  230. usleep((useconds_t) remaining_us);
  231. }
  232. // Run epoch
  233. for (int i=1; i<=num_epochs; ++i) {
  234. struct timeval epoch_start;
  235. gettimeofday(&epoch_start, NULL);
  236. printf("%ld.%06ld: Epoch %d start\n", epoch_start.tv_sec,
  237. epoch_start.tv_usec, i);
  238. struct timespec tp;
  239. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  240. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  241. printf("Epoch start time = %lu\n", start);
  242. unsigned long end = epoch_clients(netio);
  243. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  244. unsigned long end_post_pc = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  245. unsigned long diff_post_pc = end_post_pc - end;
  246. //printf("Epoch end_post_pc time = %lu\n", end_post_pc);
  247. //printf("Epoch diff_post_pc time = %lu\n", diff_post_pc);
  248. unsigned long diff = end - start;
  249. printf("client_count = %ld\n", client_count);
  250. printf("bytes_sent = %ld\n", netio.reset_bytes_sent());
  251. printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000);
  252. struct timeval now;
  253. gettimeofday(&now, NULL);
  254. remaining_us = epoch_interval - diff_post_pc;
  255. // Sleep for the rest of the epoch interval
  256. printf("%lu.%06lu: Sleeping for %ld us\n", now.tv_sec,
  257. now.tv_usec, remaining_us);
  258. if (remaining_us > 0) {
  259. usleep((useconds_t)remaining_us);
  260. }
  261. }
  262. netio.close();
  263. exit(0);
  264. }
  265. static void route_test(NetIO &netio, char **args)
  266. {
  267. // Count the number of arguments
  268. size_t nargs = 0;
  269. while (args[nargs]) {
  270. ++nargs;
  271. }
  272. uint16_t num_nodes = netio.num_nodes;
  273. size_t sq_nodes = num_nodes;
  274. sq_nodes *= sq_nodes;
  275. if (nargs != sq_nodes) {
  276. printf("Expecting %lu arguments, found %lu\n", sq_nodes, nargs);
  277. return;
  278. }
  279. // The arguments are num_nodes sets of num_nodes values. The jth
  280. // value in the ith set is the number of token channel routing tokens
  281. // ingestion node i holds for storage node j.
  282. // We are node i = netio.me, so ignore the other sets of values.
  283. // Precompute some WaksmanNetworks
  284. const Config &config = netio.config();
  285. size_t num_sizes = ecall_precompute_sort(-2);
  286. for (int i=0;i<int(num_sizes);++i) {
  287. std::vector<boost::thread> ts;
  288. for (int j=0; j<config.nthreads; ++j) {
  289. ts.emplace_back([i] {
  290. ecall_precompute_sort(i);
  291. });
  292. }
  293. for (auto& t: ts) {
  294. t.join();
  295. }
  296. }
  297. // The epoch interval, in microseconds
  298. uint32_t epoch_interval_us = 1000000;
  299. // Run 10 epochs
  300. for (int i=0; i<10; ++i) {
  301. struct timespec tp;
  302. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  303. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  304. epoch(netio, args);
  305. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  306. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  307. unsigned long diff = end - start;
  308. printf("Epoch time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  309. // Sleep for the rest of the epoch interval
  310. if (diff < epoch_interval_us) {
  311. usleep(epoch_interval_us - (useconds_t)diff);
  312. }
  313. }
  314. netio.close();
  315. }
  316. // Once all the networking is set up, start doing whatever we were asked
  317. // to do on the command line
  318. void start(NetIO &netio, char **args)
  319. {
  320. if (*args && !strcmp(*args, "route")) {
  321. ++args;
  322. route_test(netio, args);
  323. return;
  324. }
  325. if (*args && !strcmp(*args, "route_clients")) {
  326. ++args;
  327. printf("num_epochs = %d, epoch_wait_time = %d, num_WN_to_precompute = %d\n",
  328. num_epochs, epoch_wait_time, num_WN_to_precompute);
  329. route_clients_test(netio);
  330. return;
  331. }
  332. }