// clients.cpp
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <unistd.h>

#include "../App/appconfig.hpp"
// The next line suppresses a deprecation warning within boost
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include "boost/property_tree/ptree.hpp"
#include "boost/property_tree/json_parser.hpp"
#include <boost/asio.hpp>
#include "gcm.h"
#include "sgx_tcrypto.h"
#include "clients.hpp"

#define CEILDIV(x,y) (((x)+(y)-1)/(y))
  14. // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
  15. // into a host part "127.0.0.1" and a port part "12000".
  16. static bool split_host_port(std::string &host, std::string &port,
  17. const std::string &hostport)
  18. {
  19. size_t colon = hostport.find_last_of(':');
  20. if (colon == std::string::npos) {
  21. std::cerr << "Cannot parse \"" << hostport << "\" as host:port\n";
  22. return false;
  23. }
  24. host = hostport.substr(0, colon);
  25. port = hostport.substr(colon+1);
  26. return true;
  27. }
  28. // Convert a single hex character into its value from 0 to 15. Return
  29. // true on success, false if it wasn't a hex character.
  30. static inline bool hextoval(unsigned char &val, char hex)
  31. {
  32. if (hex >= '0' && hex <= '9') {
  33. val = ((unsigned char)hex)-'0';
  34. } else if (hex >= 'a' && hex <= 'f') {
  35. val = ((unsigned char)hex)-'a'+10;
  36. } else if (hex >= 'A' && hex <= 'F') {
  37. val = ((unsigned char)hex)-'A'+10;
  38. } else {
  39. return false;
  40. }
  41. return true;
  42. }
  43. // Convert a 2*len hex character string into a len-byte buffer. Return
  44. // true on success, false on failure.
  45. static bool hextobuf(unsigned char *buf, const char *str, size_t len)
  46. {
  47. if (strlen(str) != 2*len) {
  48. std::cerr << "Hex string was not the expected size\n";
  49. return false;
  50. }
  51. for (size_t i=0;i<len;++i) {
  52. unsigned char hi, lo;
  53. if (!hextoval(hi, str[2*i]) || !hextoval(lo, str[2*i+1])) {
  54. std::cerr << "Cannot parse string as hex\n";
  55. return false;
  56. }
  57. buf[i] = (unsigned char)((hi << 4) + lo);
  58. }
  59. return true;
  60. }
  61. void displayMessage(unsigned char *msg, uint16_t msg_size)
  62. {
  63. clientid_t sid, rid;
  64. unsigned char *ptr = msg;
  65. sid = *((clientid_t*) ptr);
  66. ptr+=sizeof(sid);
  67. rid = *((clientid_t*) ptr);
  68. ptr+=sizeof(sid);
  69. printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid );
  70. printf("Message: ");
  71. for(int j = 0; j<msg_size - sizeof(sid)*2; j++) {
  72. printf("%x", (*ptr));
  73. ptr++;
  74. }
  75. printf("\n");
  76. }
  77. void displayPtMessageBundle(unsigned char *bundle, uint16_t priv_out,
  78. uint16_t msg_size)
  79. {
  80. unsigned char *ptr = bundle;
  81. for(int i=0; i<priv_out; i++) {
  82. displayMessage(ptr, msg_size);
  83. printf("\n");
  84. ptr+=msg_size;
  85. }
  86. printf("\n");
  87. }
  88. void displayEncMessageBundle(unsigned char *bundle, uint16_t priv_out,
  89. uint16_t msg_size)
  90. {
  91. unsigned char *ptr = bundle;
  92. uint64_t header = *((uint64_t*) ptr);
  93. ptr+=sizeof(uint64_t);
  94. printf("IV: ");
  95. for(int i=0; i<SGX_AESGCM_IV_SIZE; i++) {
  96. printf("%x", ptr[i]);
  97. }
  98. printf("\n");
  99. ptr+= SGX_AESGCM_IV_SIZE;
  100. for(int i=0; i<priv_out; i++) {
  101. displayMessage(ptr, msg_size);
  102. ptr+=msg_size;
  103. }
  104. printf("MAC: ");
  105. for(int i=0; i<SGX_AESGCM_MAC_SIZE; i++) {
  106. printf("%x", ptr[i]);
  107. }
  108. printf("\n");
  109. }
  110. static inline uint32_t encMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
  111. {
  112. return(SGX_AESGCM_IV_SIZE + (priv_out * msg_size) + SGX_AESGCM_MAC_SIZE);
  113. }
  114. static inline uint32_t ptMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
  115. {
  116. return((priv_out * msg_size));
  117. }
  118. bool config_parse(Config &config, const std::string configstr,
  119. std::vector<NodeConfig> &ingestion_nodes,
  120. std::vector<NodeConfig> &storage_nodes,
  121. std::vector<uint16_t> &storage_map)
  122. {
  123. bool found_params = false;
  124. bool ret = true;
  125. std::istringstream configstream(configstr);
  126. boost::property_tree::ptree conftree;
  127. read_json(configstream, conftree);
  128. uint16_t node_num = 0;
  129. for (auto & entry : conftree) {
  130. if (!entry.first.compare("params")) {
  131. for (auto & pentry : entry.second) {
  132. if (!pentry.first.compare("msg_size")) {
  133. config.msg_size = pentry.second.get_value<uint16_t>();
  134. } else if (!pentry.first.compare("user_count")) {
  135. config.user_count = pentry.second.get_value<uint32_t>();
  136. } else if (!pentry.first.compare("priv_out")) {
  137. config.m_priv_out = pentry.second.get_value<uint8_t>();
  138. } else if (!pentry.first.compare("priv_in")) {
  139. config.m_priv_in = pentry.second.get_value<uint8_t>();
  140. } else if (!pentry.first.compare("pub_out")) {
  141. config.m_pub_out = pentry.second.get_value<uint8_t>();
  142. } else if (!pentry.first.compare("pub_in")) {
  143. config.m_pub_in = pentry.second.get_value<uint8_t>();
  144. // A hardcoded shared secret to derive various
  145. // keys for client -> server communications and tokens
  146. } else if (!pentry.first.compare("master_secret")) {
  147. std::string hex_key = pentry.second.data();
  148. memcpy(config.master_secret, hex_key.c_str(), SGX_AESGCM_KEY_SIZE);
  149. } else {
  150. std::cerr << "Unknown field in params: " <<
  151. pentry.first << "\n";
  152. ret = false;
  153. }
  154. }
  155. found_params = true;
  156. } else if (!entry.first.compare("nodes")) {
  157. for (auto & node : entry.second) {
  158. NodeConfig nc;
  159. // All nodes need to be assigned their role in manifest.yaml
  160. nc.roles = 0;
  161. for (auto & nentry : node.second) {
  162. if (!nentry.first.compare("name")) {
  163. nc.name = nentry.second.get_value<std::string>();
  164. } else if (!nentry.first.compare("pubkey")) {
  165. ret &= hextobuf((unsigned char *)&nc.pubkey,
  166. nentry.second.get_value<std::string>().c_str(),
  167. sizeof(nc.pubkey));
  168. } else if (!nentry.first.compare("weight")) {
  169. nc.weight = nentry.second.get_value<std::uint8_t>();
  170. } else if (!nentry.first.compare("listen")) {
  171. ret &= split_host_port(nc.listenhost, nc.listenport,
  172. nentry.second.get_value<std::string>());
  173. } else if (!nentry.first.compare("clisten")) {
  174. ret &= split_host_port(nc.clistenhost, nc.clistenport,
  175. nentry.second.get_value<std::string>());
  176. } else if (!nentry.first.compare("roles")) {
  177. nc.roles = nentry.second.get_value<std::uint8_t>();
  178. } else {
  179. std::cerr << "Unknown field in host config: " <<
  180. nentry.first << "\n";
  181. ret = false;
  182. }
  183. }
  184. if(nc.roles & ROLE_INGESTION) {
  185. ingestion_nodes.push_back(std::move(nc));
  186. }
  187. if(nc.roles & ROLE_STORAGE) {
  188. storage_nodes.push_back(std::move(nc));
  189. storage_map.push_back(node_num);
  190. }
  191. node_num++;
  192. }
  193. } else {
  194. std::cerr << "Unknown key in config: " <<
  195. entry.first << "\n";
  196. ret = false;
  197. }
  198. }
  199. if (!found_params) {
  200. std::cerr << "Could not find params in config\n";
  201. ret = false;
  202. }
  203. return ret;
  204. }
  205. static void usage(const char *argv0)
  206. {
  207. fprintf(stderr, "%s [-t nthreads] < config.json\n",
  208. argv0);
  209. exit(1);
  210. }
  211. /*
  212. Generate EMK (Encryption master Secret Key) and TMK (Token master Secret Key)
  213. */
  214. int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
  215. aes_key &EMK, aes_key &TMK )
  216. {
  217. unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
  218. unsigned char iv[SGX_AESGCM_IV_SIZE];
  219. unsigned char mac[SGX_AESGCM_MAC_SIZE];
  220. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  221. memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
  222. memcpy(iv, "Encryption", sizeof("Encryption"));
  223. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
  224. master_secret, iv, SGX_AESGCM_IV_SIZE, EMK, mac)) {
  225. printf("Client: generateMasterKeys FAIL\n");
  226. return -1;
  227. }
  228. printf("\n\nEncryption Master Key: ");
  229. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  230. printf("%x", EMK[i]);
  231. }
  232. printf("\n");
  233. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  234. memcpy(iv, "Token", sizeof("Token"));
  235. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
  236. master_secret, iv, SGX_AESGCM_IV_SIZE, TMK, mac)) {
  237. printf("generateMasterKeys failed\n");
  238. return -1;
  239. }
  240. printf("Token Master Key: ");
  241. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  242. printf("%x", TMK[i]);
  243. }
  244. printf("\n\n");
  245. return 1;
  246. }
  247. /*
  248. Takes the client_number, the master aes_key for generating client keys
  249. for encrypted communication with ingestion (client_ing_key) and
  250. storage servers (client_stg_key)
  251. */
  252. int generateClientKeys(clientid_t client_number, aes_key &EMK,
  253. aes_key &client_ing_key, aes_key &client_stg_key)
  254. {
  255. unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
  256. unsigned char iv[SGX_AESGCM_IV_SIZE];
  257. unsigned char tag[SGX_AESGCM_MAC_SIZE];
  258. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  259. memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
  260. memset(tag, 0, SGX_AESGCM_KEY_SIZE);
  261. memcpy(iv, &client_number, sizeof(client_number));
  262. /*
  263. printf("Client Key: (before Gen) ");
  264. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  265. printf("%x", client_ing_key[i]);
  266. }
  267. printf("\n");
  268. */
  269. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, EMK,
  270. iv, SGX_AESGCM_IV_SIZE, client_ing_key, tag)) {
  271. printf("generateClientKeys failed\n");
  272. return -1;
  273. }
  274. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  275. memcpy(iv, &client_number, sizeof(client_number));
  276. memcpy(iv +sizeof(client_number), "STG", sizeof("STG"));
  277. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, EMK,
  278. iv, SGX_AESGCM_IV_SIZE, client_stg_key, tag)) {
  279. printf("generateClientKeys failed\n");
  280. return -1;
  281. }
  282. /*
  283. if(client_number % 10 == 0) {
  284. printf("Client %d Storage Key: (after Gen) ", client_number);
  285. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  286. printf("%x", client_stg_key[i]);
  287. }
  288. printf("\n");
  289. }
  290. */
  291. return 1;
  292. }
  293. void Client::initClient(clientid_t cid, aes_key ikey, aes_key skey,
  294. uint16_t num_storage_nodes, std::vector<uint16_t> &storage_map)
  295. {
  296. sim_id = cid;
  297. uint16_t stg_no = cid % num_storage_nodes;
  298. uint16_t stg_id = storage_map[stg_no];
  299. id = stg_id << DEST_UID_BITS;
  300. id += (cid/num_storage_nodes);
  301. memcpy(ing_key, ikey, SGX_AESGCM_KEY_SIZE);
  302. memcpy(stg_key, skey, SGX_AESGCM_KEY_SIZE);
  303. }
  304. void Client::initializeSocket(boost::asio::io_context &ioc,
  305. NodeConfig &ing_server)
  306. {
  307. boost::system::error_code err;
  308. boost::asio::ip::tcp::resolver resolver(ioc);
  309. while(1) {
  310. #ifdef VERBOSE_NET
  311. std::cerr << "Connecting to " << ing_server.name << "...\n";
  312. std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
  313. #endif
  314. // ingestion_sock needs io_context
  315. ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
  316. boost::asio::connect(*ingestion_sock,
  317. resolver.resolve(ing_server.clistenhost,
  318. ing_server.clistenport), err);
  319. if (!err) break;
  320. std::cerr << "Connection to " << ing_server.name <<
  321. " refused, will , epoch_noretry.\n";
  322. sleep(1);
  323. }
  324. }
  325. /*
  326. Populates the buffer pt_msgbundle with a valid message pt_msgbundle.
  327. Assumes that it is supplied with a pt_msgbundle buffer of the correct length
  328. Correct length for pt_msgbundle = 8 + (priv_out)*(msg_size) + 16 bytes
  329. */
  330. void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
  331. unsigned char *pt_msgbundle)
  332. {
  333. unsigned char *ptr = pt_msgbundle;
  334. // Setup message pt_msgbundle
  335. for(uint32_t i = 0; i < priv_out; i++) {
  336. memcpy(ptr, &id, sizeof(id));
  337. ptr+=(sizeof(id));
  338. memcpy(ptr, &id, sizeof(id));
  339. ptr+=(sizeof(id));
  340. uint32_t remaining_message_size = msg_size - (sizeof(id)*2);
  341. memset(ptr, 0, remaining_message_size);
  342. ptr+=(remaining_message_size);
  343. }
  344. }
  345. bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_msgbundle,
  346. unsigned char *enc_msgbundle)
  347. {
  348. // Encrypt the pt_msgbundle
  349. unsigned char *pt_msgbundle_start = pt_msgbundle;
  350. unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE;
  351. unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE;
  352. size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE - SGX_AESGCM_IV_SIZE;
  353. if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt,
  354. NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start, enc_tag)) {
  355. printf("Client: encryptMessageBundle FAIL\n");
  356. return 0;
  357. }
  358. // Copy IV into the bundle
  359. memcpy(enc_msgbundle, ing_iv, SGX_AESGCM_IV_SIZE);
  360. // Update IV
  361. uint64_t *iv_ctr = (uint64_t*) ing_iv;
  362. (*iv_ctr)+=1;
  363. return 1;
  364. }
  365. /*
  366. Assumes pt_msgbundle is a buffer of size messageBundleSize(priv_out, msg_size)
  367. */
  368. void Client::sendMessageBundle(uint16_t priv_out, uint16_t msg_size,
  369. unsigned char *pt_msgbundle, unsigned char *enc_msgbundle)
  370. {
  371. uint32_t enc_bundle_size = encMsgBundleSize(priv_out, msg_size);
  372. generateMessageBundle(priv_out, msg_size, pt_msgbundle);
  373. encryptMessageBundle(enc_bundle_size, pt_msgbundle, enc_msgbundle);
  374. #ifdef VERBOSE_CLIENT
  375. displayPtMessageBundle(pt_msgbundle, priv_out, msg_size);
  376. #endif
  377. boost::asio::write(*ingestion_sock,
  378. boost::asio::buffer(enc_msgbundle, enc_bundle_size));
  379. }
  380. int Client::sendAuthMessage(unsigned long epoch_no)
  381. {
  382. uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
  383. unsigned char *auth_message = (unsigned char*) malloc(auth_size);
  384. unsigned char *am_ptr = auth_message;
  385. memcpy(am_ptr, &sim_id, sizeof(sim_id));
  386. am_ptr+=sizeof(sim_id);
  387. memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
  388. am_ptr+=sizeof(unsigned long);
  389. unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
  390. unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
  391. unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
  392. memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
  393. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ing_key,
  394. epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) {
  395. printf("generateClientKeys failed\n");
  396. return -1;
  397. }
  398. #ifdef VERBOSE_CLIENT
  399. printf("Client %d auth_message: \n", id);
  400. for(int i=0; i<auth_size; i++) {
  401. printf("%x", auth_message[i]);
  402. }
  403. printf("\n");
  404. #endif
  405. boost::asio::write(*ingestion_sock,
  406. boost::asio::buffer(auth_message, auth_size));
  407. return 1;
  408. }
  409. void generateClients(boost::asio::io_context &io_context,
  410. uint32_t cstart, uint32_t cstop, Client* &clients,
  411. aes_key &EMK, Config &config, std::vector<NodeConfig> &ingestion_nodes,
  412. std::vector<NodeConfig> &storage_nodes, std::vector<uint16_t> &storage_map,
  413. uint32_t num_clients_total, uint32_t clients_per_ing,
  414. uint32_t ing_with_additional)
  415. {
  416. aes_key client_ing_key;
  417. aes_key client_stg_key;
  418. uint16_t num_stg_nodes = storage_nodes.size();
  419. uint16_t *stg_map = new uint16_t[num_stg_nodes];
  420. for(uint32_t i=cstart; i<cstop; i++) {
  421. uint16_t ing_node_this_client = i/clients_per_ing;
  422. if(ing_node_this_client > ing_with_additional && ing_with_additional!=0) {
  423. uint16_t leftover = num_clients_total - (ing_with_additional * clients_per_ing);
  424. ing_node_this_client = ing_with_additional + (leftover / (clients_per_ing-1));
  425. }
  426. int ret = generateClientKeys(i, EMK, client_ing_key, client_stg_key);
  427. clients[i].initClient(i, client_ing_key, client_stg_key, num_stg_nodes, storage_map);
  428. clients[i].initializeSocket(io_context, ingestion_nodes[ing_node_this_client]);
  429. /*
  430. // Test that the keys generated match those generated within
  431. // enclave config
  432. unsigned char *ckey;
  433. ckey = clients[i].getKey();
  434. printf("Client %d, id = %d, key: ", i, clients[i].getid());
  435. for(int j=0;j<SGX_AESGCM_KEY_SIZE;j++) {
  436. printf("%x", ckey[j]);
  437. }
  438. printf("\n\n");
  439. */
  440. }
  441. struct timespec ep;
  442. clock_gettime(CLOCK_REALTIME_COARSE, &ep);
  443. unsigned long ep_time = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
  444. unsigned long epoch_no = CEILDIV(ep_time, EPOCH_INTERVAL);
  445. for(uint32_t i=cstart; i<cstop; i++) {
  446. clients[i].sendAuthMessage(epoch_no);
  447. }
  448. }
  449. void sendMessageBundles(uint32_t cstart, uint32_t cstop, Client* &clients,
  450. Config &config)
  451. {
  452. uint16_t priv_out = config.m_priv_out;
  453. uint16_t msg_size = config.msg_size;
  454. uint32_t pt_bundle_size = ptMsgBundleSize(priv_out, msg_size);
  455. uint32_t enc_bundle_size = encMsgBundleSize(priv_out, msg_size);
  456. unsigned char *pt_msgbundle = (unsigned char*) malloc (pt_bundle_size);
  457. unsigned char *enc_msgbundle = (unsigned char*) malloc (enc_bundle_size);
  458. for(uint32_t i=cstart; i<cstop; i++) {
  459. clients[i].sendMessageBundle(priv_out, msg_size, pt_msgbundle, enc_msgbundle);
  460. }
  461. free(pt_msgbundle);
  462. free(enc_msgbundle);
  463. }
  464. /*
  465. Spin config.user_client actual clients. Each client:
  466. 1) Retrieve messages and tokens from their storage server
  467. 2) Send all their messages to the ingestion server
  468. 3) Wait for a predetermined EPOCH_DURATION time
  469. 4) Repeat from 1)
  470. */
  471. int main(int argc, char **argv)
  472. {
  473. // Unbuffer stdout
  474. setbuf(stdout, NULL);
  475. uint16_t nthreads = 1;
  476. const char *progname = argv[0];
  477. std::vector<NodeConfig> ingestion_nodes, storage_nodes;
  478. std::vector<uint16_t> storage_map;
  479. ++argv;
  480. // Parse options
  481. while (*argv && (*argv)[0] == '-') {
  482. if (!strcmp(*argv, "-t")) {
  483. if (argv[1] == NULL) {
  484. usage(progname);
  485. }
  486. nthreads = uint16_t(atoi(argv[1]));
  487. argv += 2;
  488. } else {
  489. usage(progname);
  490. }
  491. }
  492. printf("nthreads = %d\n", nthreads);
  493. // Read the config.json from the first line of stdin. We have to do
  494. // this before outputting anything to avoid potential deadlock with
  495. // the launch program.
  496. std::string configstr;
  497. std::getline(std::cin, configstr);
  498. Config config;
  499. aes_key EMK, TMK;
  500. boost::asio::io_context io_context;
  501. boost::asio::ip::tcp::resolver resolver(io_context);
  502. if (!config_parse(config, configstr, ingestion_nodes,
  503. storage_nodes, storage_map)) {
  504. exit(1);
  505. }
  506. Client *clients = new Client[config.user_count];
  507. printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
  508. ingestion_nodes.size(), storage_nodes.size());
  509. generateMasterKeys(config.master_secret, EMK, TMK);
  510. uint32_t num_clients_total = config.user_count;
  511. uint16_t num_ing_nodes = ingestion_nodes.size();
  512. uint32_t clients_per_ing = CEILDIV(num_clients_total, num_ing_nodes);
  513. uint32_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
  514. uint16_t ing_with_additional = num_clients_total % num_ing_nodes;
  515. std::thread threads[nthreads];
  516. // Generate all the clients for the experiment
  517. for(int i=0; i<nthreads; i++) {
  518. uint32_t cstart, cstop;
  519. cstart = i * clients_per_thread;
  520. cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
  521. printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
  522. threads[i] = std::thread(generateClients, std::ref(io_context),
  523. cstart, cstop, std::ref(clients), std::ref(EMK), std::ref(config),
  524. std::ref(ingestion_nodes), std::ref(storage_nodes),
  525. std::ref(storage_map), num_clients_total,
  526. clients_per_ing, ing_with_additional);
  527. }
  528. for(int i=0; i<nthreads; i++) {
  529. threads[i].join();
  530. }
  531. // Multithreaded client message bundle generation and send
  532. uint32_t epoch = 1;
  533. while(epoch <= 3) {
  534. struct timespec tp;
  535. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  536. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  537. for(int i=0; i<nthreads; i++) {
  538. uint32_t cstart, cstop;
  539. cstart = i * clients_per_thread;
  540. cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
  541. threads[i] = std::thread(sendMessageBundles, cstart, cstop,
  542. std::ref(clients), std::ref(config));
  543. }
  544. for(int i=0; i<nthreads; i++) {
  545. threads[i].join();
  546. }
  547. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  548. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  549. unsigned long time_diff = end - start;
  550. // Sleep for the rest of the epoch interval
  551. printf("Done with submissions for Epoch %d\n", epoch);
  552. if (time_diff < EPOCH_INTERVAL) {
  553. unsigned long time_to_sleep_in_us = (useconds_t) EPOCH_INTERVAL - (useconds_t) time_diff;
  554. //printf("tts_us = %ld\n", time_to_sleep_in_us);
  555. usleep(time_to_sleep_in_us);
  556. }
  557. epoch++;
  558. }
  559. delete [] clients;
  560. }