clients.cpp 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979
  1. #include <iostream>
  2. #include <functional>
  3. #include "../App/appconfig.hpp"
  4. // The next line suppresses a deprecation warning within boost
  5. #define BOOST_BIND_GLOBAL_PLACEHOLDERS
  6. #include "boost/property_tree/ptree.hpp"
  7. #include "boost/property_tree/json_parser.hpp"
  8. #include <boost/thread.hpp>
  9. #include <boost/asio.hpp>
  10. #include "gcm.h"
  11. #include "sgx_tcrypto.h"
  12. #include "clients.hpp"
  13. #include <cstdlib>
  14. #define CEILDIV(x,y) (((x)+(y)-1)/(y))
  15. Config config;
  16. Client *clients;
  17. aes_key ESK, TSK;
  18. std::vector<NodeConfig> ingestion_nodes, storage_nodes;
  19. std::vector<uint16_t> storage_map;
  20. std::vector<uint16_t> ingestion_map;
  21. unsigned long setup_time;
  22. uint16_t nthreads = 1;
  23. // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
  24. // into a host part "127.0.0.1" and a port part "12000".
  25. static bool split_host_port(std::string &host, std::string &port,
  26. const std::string &hostport)
  27. {
  28. size_t colon = hostport.find_last_of(':');
  29. if (colon == std::string::npos) {
  30. std::cerr << "Cannot parse \"" << hostport << "\" as host:port\n";
  31. return false;
  32. }
  33. host = hostport.substr(0, colon);
  34. port = hostport.substr(colon+1);
  35. return true;
  36. }
  37. // Convert a single hex character into its value from 0 to 15. Return
  38. // true on success, false if it wasn't a hex character.
  39. static inline bool hextoval(unsigned char &val, char hex)
  40. {
  41. if (hex >= '0' && hex <= '9') {
  42. val = ((unsigned char)hex)-'0';
  43. } else if (hex >= 'a' && hex <= 'f') {
  44. val = ((unsigned char)hex)-'a'+10;
  45. } else if (hex >= 'A' && hex <= 'F') {
  46. val = ((unsigned char)hex)-'A'+10;
  47. } else {
  48. return false;
  49. }
  50. return true;
  51. }
  52. // Convert a 2*len hex character string into a len-byte buffer. Return
  53. // true on success, false on failure.
  54. static bool hextobuf(unsigned char *buf, const char *str, size_t len)
  55. {
  56. if (strlen(str) != 2*len) {
  57. std::cerr << "Hex string was not the expected size\n";
  58. return false;
  59. }
  60. for (size_t i=0;i<len;++i) {
  61. unsigned char hi, lo;
  62. if (!hextoval(hi, str[2*i]) || !hextoval(lo, str[2*i+1])) {
  63. std::cerr << "Cannot parse string as hex\n";
  64. return false;
  65. }
  66. buf[i] = (unsigned char)((hi << 4) + lo);
  67. }
  68. return true;
  69. }
  70. void displayMessage(unsigned char *msg, uint16_t msg_size)
  71. {
  72. clientid_t sid, rid;
  73. unsigned char *ptr = msg;
  74. sid = *((clientid_t*) ptr);
  75. ptr+=sizeof(sid);
  76. rid = *((clientid_t*) ptr);
  77. ptr+=sizeof(sid);
  78. printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid );
  79. printf("Message: ");
  80. for(int j = 0; j<msg_size - sizeof(sid)*2; j++) {
  81. printf("%x", (*ptr));
  82. ptr++;
  83. }
  84. printf("\n");
  85. }
  86. void displayPtMessageBundle(unsigned char *bundle, uint16_t priv_out,
  87. uint16_t msg_size)
  88. {
  89. unsigned char *ptr = bundle;
  90. for(int i=0; i<priv_out; i++) {
  91. displayMessage(ptr, msg_size);
  92. printf("\n");
  93. ptr+=msg_size;
  94. }
  95. printf("\n");
  96. }
  97. void displayEncMessageBundle(unsigned char *bundle, uint16_t priv_out,
  98. uint16_t msg_size)
  99. {
  100. unsigned char *ptr = bundle;
  101. uint64_t header = *((uint64_t*) ptr);
  102. ptr+=sizeof(uint64_t);
  103. printf("IV: ");
  104. for(int i=0; i<SGX_AESGCM_IV_SIZE; i++) {
  105. printf("%x", ptr[i]);
  106. }
  107. printf("\n");
  108. ptr+= SGX_AESGCM_IV_SIZE;
  109. for(int i=0; i<priv_out; i++) {
  110. displayMessage(ptr, msg_size);
  111. ptr+=msg_size;
  112. }
  113. printf("MAC: ");
  114. for(int i=0; i<SGX_AESGCM_MAC_SIZE; i++) {
  115. printf("%x", ptr[i]);
  116. }
  117. printf("\n");
  118. }
  119. static inline uint32_t encMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
  120. {
  121. return(SGX_AESGCM_IV_SIZE + (priv_out * (msg_size + TOKEN_SIZE)) + SGX_AESGCM_MAC_SIZE);
  122. }
  123. static inline uint32_t ptMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
  124. {
  125. return(priv_out * (msg_size + TOKEN_SIZE));
  126. }
  127. static inline uint32_t encMailboxSize(uint16_t priv_in, uint16_t msg_size)
  128. {
  129. return(SGX_AESGCM_IV_SIZE + (priv_in * msg_size) + SGX_AESGCM_MAC_SIZE);
  130. }
  131. static inline uint32_t ptMailboxSize(uint16_t priv_in, uint16_t msg_size)
  132. {
  133. return(priv_in * msg_size);
  134. }
  135. bool config_parse(Config &config, const std::string configstr,
  136. std::vector<NodeConfig> &ingestion_nodes,
  137. std::vector<NodeConfig> &storage_nodes,
  138. std::vector<uint16_t> &storage_map)
  139. {
  140. bool found_params = false;
  141. bool ret = true;
  142. std::istringstream configstream(configstr);
  143. boost::property_tree::ptree conftree;
  144. read_json(configstream, conftree);
  145. uint16_t node_num = 0;
  146. for (auto & entry : conftree) {
  147. if (!entry.first.compare("params")) {
  148. for (auto & pentry : entry.second) {
  149. if (!pentry.first.compare("msg_size")) {
  150. config.msg_size = pentry.second.get_value<uint16_t>();
  151. } else if (!pentry.first.compare("user_count")) {
  152. config.user_count = pentry.second.get_value<uint32_t>();
  153. } else if (!pentry.first.compare("priv_out")) {
  154. config.m_priv_out = pentry.second.get_value<uint8_t>();
  155. } else if (!pentry.first.compare("priv_in")) {
  156. config.m_priv_in = pentry.second.get_value<uint8_t>();
  157. } else if (!pentry.first.compare("pub_out")) {
  158. config.m_pub_out = pentry.second.get_value<uint8_t>();
  159. } else if (!pentry.first.compare("pub_in")) {
  160. config.m_pub_in = pentry.second.get_value<uint8_t>();
  161. // A hardcoded shared secret to derive various
  162. // keys for client -> server communications and tokens
  163. } else if (!pentry.first.compare("master_secret")) {
  164. std::string hex_key = pentry.second.data();
  165. memcpy(config.master_secret, hex_key.c_str(), SGX_AESGCM_KEY_SIZE);
  166. } else {
  167. std::cerr << "Unknown field in params: " <<
  168. pentry.first << "\n";
  169. ret = false;
  170. }
  171. }
  172. found_params = true;
  173. } else if (!entry.first.compare("nodes")) {
  174. for (auto & node : entry.second) {
  175. NodeConfig nc;
  176. // All nodes need to be assigned their role in manifest.yaml
  177. nc.roles = 0;
  178. for (auto & nentry : node.second) {
  179. if (!nentry.first.compare("name")) {
  180. nc.name = nentry.second.get_value<std::string>();
  181. } else if (!nentry.first.compare("pubkey")) {
  182. ret &= hextobuf((unsigned char *)&nc.pubkey,
  183. nentry.second.get_value<std::string>().c_str(),
  184. sizeof(nc.pubkey));
  185. } else if (!nentry.first.compare("weight")) {
  186. nc.weight = nentry.second.get_value<std::uint8_t>();
  187. } else if (!nentry.first.compare("listen")) {
  188. ret &= split_host_port(nc.listenhost, nc.listenport,
  189. nentry.second.get_value<std::string>());
  190. } else if (!nentry.first.compare("clisten")) {
  191. ret &= split_host_port(nc.clistenhost, nc.clistenport,
  192. nentry.second.get_value<std::string>());
  193. } else if (!nentry.first.compare("slisten")) {
  194. ret &= split_host_port(nc.slistenhost, nc.slistenport,
  195. nentry.second.get_value<std::string>());
  196. } else if (!nentry.first.compare("roles")) {
  197. nc.roles = nentry.second.get_value<std::uint8_t>();
  198. } else {
  199. std::cerr << "Unknown field in host config: " <<
  200. nentry.first << "\n";
  201. ret = false;
  202. }
  203. }
  204. if(nc.roles & ROLE_INGESTION) {
  205. ingestion_nodes.push_back(nc);
  206. ingestion_map.push_back(node_num);
  207. }
  208. if(nc.roles & ROLE_STORAGE) {
  209. storage_nodes.push_back(std::move(nc));
  210. storage_map.push_back(node_num);
  211. }
  212. node_num++;
  213. }
  214. } else {
  215. std::cerr << "Unknown key in config: " <<
  216. entry.first << "\n";
  217. ret = false;
  218. }
  219. }
  220. if (!found_params) {
  221. std::cerr << "Could not find params in config\n";
  222. ret = false;
  223. }
  224. return ret;
  225. }
  226. static void usage(const char *argv0)
  227. {
  228. fprintf(stderr, "%s [-t nthreads] < config.json\n",
  229. argv0);
  230. exit(1);
  231. }
  232. /*
  233. Generate ESK (Encryption Secret Key) and TSK (Token Secret Key)
  234. */
  235. int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
  236. aes_key &ESK, aes_key &TSK )
  237. {
  238. unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
  239. unsigned char iv[SGX_AESGCM_IV_SIZE];
  240. unsigned char mac[SGX_AESGCM_MAC_SIZE];
  241. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  242. memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
  243. memcpy(iv, "Encryption", sizeof("Encryption"));
  244. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
  245. master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) {
  246. printf("Client: generateMasterKeys FAIL\n");
  247. return -1;
  248. }
  249. printf("\n\nEncryption Master Key: ");
  250. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  251. printf("%x", ESK[i]);
  252. }
  253. printf("\n");
  254. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  255. memcpy(iv, "Token", sizeof("Token"));
  256. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
  257. master_secret, iv, SGX_AESGCM_IV_SIZE, TSK, mac)) {
  258. printf("generateMasterKeys failed\n");
  259. return -1;
  260. }
  261. printf("Token Master Key: ");
  262. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  263. printf("%x", TSK[i]);
  264. }
  265. printf("\n\n");
  266. return 1;
  267. }
  268. /*
  269. Takes the client_number, the master aes_key for generating client keys
  270. for encrypted communication with ingestion (client_ing_key) and
  271. storage servers (client_stg_key)
  272. */
  273. int generateClientKeys(clientid_t client_number, aes_key &ESK,
  274. aes_key &client_ing_key, aes_key &client_stg_key)
  275. {
  276. unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
  277. unsigned char iv[SGX_AESGCM_IV_SIZE];
  278. unsigned char tag[SGX_AESGCM_MAC_SIZE];
  279. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  280. memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
  281. memset(tag, 0, SGX_AESGCM_KEY_SIZE);
  282. memcpy(iv, &client_number, sizeof(client_number));
  283. /*
  284. printf("Client Key: (before Gen) ");
  285. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  286. printf("%x", client_ing_key[i]);
  287. }
  288. printf("\n");
  289. */
  290. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
  291. iv, SGX_AESGCM_IV_SIZE, client_ing_key, tag)) {
  292. printf("generateClientKeys failed\n");
  293. return -1;
  294. }
  295. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  296. memcpy(iv, &client_number, sizeof(client_number));
  297. memcpy(iv +sizeof(client_number), "STG", sizeof("STG"));
  298. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
  299. iv, SGX_AESGCM_IV_SIZE, client_stg_key, tag)) {
  300. printf("generateClientKeys failed\n");
  301. return -1;
  302. }
  303. /*
  304. printf("Client %d Ingestion: (after Gen) ", client_number);
  305. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  306. printf("%x", client_ing_key[i]);
  307. }
  308. printf("\n");
  309. */
  310. return 1;
  311. }
  312. void Client::initClient(clientid_t cid, uint16_t stg_id,
  313. aes_key ikey, aes_key skey)
  314. {
  315. uint16_t num_storage_nodes = storage_nodes.size();
  316. sim_id = cid;
  317. id = stg_id << DEST_UID_BITS;
  318. id += (cid/num_storage_nodes);
  319. token_list = new token[config.m_priv_out];
  320. memcpy(ing_key, ikey, SGX_AESGCM_KEY_SIZE);
  321. memcpy(stg_key, skey, SGX_AESGCM_KEY_SIZE);
  322. }
  323. void Client::initializeStgSocket(boost::asio::io_context &ioc,
  324. NodeConfig &stg_server, ip_addr *curr_ip, uint16_t &port_no)
  325. {
  326. boost::system::error_code err;
  327. while(1) {
  328. #ifdef VERBOSE_CLIENT
  329. std::cerr << "Connecting to " << stg_server.name << "...\n";
  330. std::cout << stg_server.slistenhost << ":" << stg_server.slistenport;
  331. #endif
  332. std::string ip_str = curr_ip->ip_str();
  333. boost::asio::ip::address ip_address =
  334. boost::asio::ip::address::from_string(ip_str, err);
  335. if(err) {
  336. printf("initializeStgSocket::Invalid IP address\n");
  337. }
  338. storage_sock = new boost::asio::ip::tcp::socket(ioc);
  339. storage_sock->open(boost::asio::ip::tcp::v4(), err);
  340. while(1) {
  341. boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  342. storage_sock->bind(ep, err);
  343. if(!err) break;
  344. else {
  345. printf("STG: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no);
  346. port_no++;
  347. if(port_no >= PORT_END) {
  348. port_no = PORT_START;
  349. curr_ip->increment(nthreads);
  350. }
  351. }
  352. }
  353. boost::asio::ip::address stg_ip = boost::asio::ip::address::from_string(stg_server.slistenhost, err);
  354. boost::asio::ip::tcp::endpoint stg_ep(stg_ip, std::stoi(stg_server.slistenport));
  355. // just for printing
  356. // boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  357. storage_sock->connect(stg_ep, err);
  358. if (!err) {
  359. break;
  360. }
  361. std::cerr <<"STG: Connection to " << stg_server.name <<
  362. " refused, will , epoch_noretry.\n";
  363. std::cerr << curr_ip->ip_str() << ":" << port_no << "\n";
  364. #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
  365. int sleep_delay = rand() % 100000;
  366. usleep(sleep_delay);
  367. #else
  368. usleep(1000000);
  369. #endif
  370. delete(storage_sock);
  371. }
  372. }
  373. void Client::initializeIngSocket(boost::asio::io_context &ioc,
  374. NodeConfig &ing_server, ip_addr *curr_ip, uint16_t &port_no)
  375. {
  376. boost::system::error_code err;
  377. boost::asio::ip::tcp::resolver resolver(ioc);
  378. while(1) {
  379. #ifdef VERBOSE_CLIENT
  380. std::cerr << "Connecting to " << ing_server.name << "...\n";
  381. std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
  382. #endif
  383. std::string ip_str = curr_ip->ip_str();
  384. boost::asio::ip::address ip_address =
  385. boost::asio::ip::address::from_string(ip_str, err);
  386. if(err) {
  387. printf("initializeIngSocket::Invalid IP address\n");
  388. }
  389. ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
  390. ingestion_sock->open(boost::asio::ip::tcp::v4(), err);
  391. while(1) {
  392. boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  393. ingestion_sock->bind(ep, err);
  394. if(!err) break;
  395. else {
  396. printf("ING: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no);
  397. port_no++;
  398. if(port_no >= PORT_END) {
  399. port_no = PORT_START;
  400. curr_ip->increment(nthreads);
  401. }
  402. }
  403. }
  404. // just for printing
  405. boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  406. boost::asio::ip::address ing_ip = boost::asio::ip::address::from_string(ing_server.clistenhost, err);
  407. boost::asio::ip::tcp::endpoint ing_ep(ing_ip, std::stoi(ing_server.clistenport));
  408. //std::cout << "Ing endpoint:" << ing_ep << "\n";
  409. //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n";
  410. ingestion_sock->connect(ing_ep, err);
  411. /*
  412. boost::asio::connect(*ingestion_sock,
  413. resolver.resolve(ing_server.clistenhost,
  414. ing_server.clistenport), err);
  415. */
  416. if (!err) {
  417. //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n";
  418. break;
  419. }
  420. std::cerr << "ING: Connection to " << ing_server.name <<
  421. " refused, will , epoch_noretry.\n";
  422. std::cerr << curr_ip->ip_str() << ":" << port_no << "\n";
  423. #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
  424. int sleep_delay = rand() % 100000;
  425. usleep(sleep_delay);
  426. #else
  427. usleep(1000000);
  428. #endif
  429. delete(ingestion_sock);
  430. }
  431. }
  432. /*
  433. Populates the buffer pt_msgbundle with a valid message pt_msgbundle.
  434. Assumes that it is supplied with a pt_msgbundle buffer of the correct length
  435. Correct length for pt_msgbundle = 8 + (priv_out)*(msg_size) + 16 bytes
  436. */
  437. void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
  438. unsigned char *pt_msgbundle)
  439. {
  440. unsigned char *ptr = pt_msgbundle;
  441. // Setup message pt_msgbundle
  442. for(uint32_t i = 0; i < priv_out; i++) {
  443. memcpy(ptr, &id, sizeof(id));
  444. ptr+=(sizeof(id));
  445. memcpy(ptr, &id, sizeof(id));
  446. ptr+=(sizeof(id));
  447. uint32_t remaining_message_size = msg_size - (sizeof(id)*2);
  448. memset(ptr, 0, remaining_message_size);
  449. ptr+=(remaining_message_size);
  450. }
  451. // Add the tokens for this msgbundle
  452. memcpy(ptr, token_list, config.m_priv_out * TOKEN_SIZE);
  453. }
  454. bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_msgbundle,
  455. unsigned char *enc_msgbundle)
  456. {
  457. // Encrypt the pt_msgbundle
  458. unsigned char *pt_msgbundle_start = pt_msgbundle;
  459. unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE;
  460. unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE;
  461. size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE - SGX_AESGCM_IV_SIZE;
  462. if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt,
  463. NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start, enc_tag)) {
  464. printf("Client: encryptMessageBundle FAIL\n");
  465. return 0;
  466. }
  467. // Copy IV into the bundle
  468. memcpy(enc_msgbundle, ing_iv, SGX_AESGCM_IV_SIZE);
  469. // Update IV
  470. uint64_t *iv_ctr = (uint64_t*) ing_iv;
  471. (*iv_ctr)+=1;
  472. return 1;
  473. }
  474. void Client::sendMessageBundle()
  475. {
  476. uint16_t priv_out = config.m_priv_out;
  477. uint16_t msg_size = config.msg_size;
  478. uint32_t send_pt_msgbundle_size = ptMsgBundleSize(priv_out, msg_size);
  479. uint32_t send_enc_msgbundle_size = encMsgBundleSize(priv_out, msg_size);
  480. unsigned char *send_pt_msgbundle = (unsigned char*) malloc (send_pt_msgbundle_size);
  481. unsigned char *send_enc_msgbundle = (unsigned char*) malloc (send_enc_msgbundle_size);
  482. generateMessageBundle(priv_out, msg_size, send_pt_msgbundle);
  483. encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle, send_enc_msgbundle);
  484. #ifdef VERBOSE_CLIENT
  485. displayPtMessageBundle(send_pt_msgbundle, priv_out, msg_size);
  486. #endif
  487. boost::asio::async_write(*ingestion_sock,
  488. boost::asio::buffer(send_enc_msgbundle, send_enc_msgbundle_size),
  489. [this, send_enc_msgbundle] (boost::system::error_code ecc, std::size_t) {
  490. #ifdef VERBOSE_CLIENT
  491. if(sim_id==0){
  492. printf("TEST: Client 0 send their msgbundle\n");
  493. }
  494. #endif
  495. if (ecc) {
  496. if(ecc == boost::asio::error::eof) {
  497. delete(storage_sock);
  498. }
  499. else {
  500. printf("Error %s\n", ecc.message().c_str());
  501. }
  502. printf("Client: boost async_write failed for sending message bundle\n");
  503. return;
  504. }
  505. free(send_enc_msgbundle);
  506. });
  507. free(send_pt_msgbundle);
  508. }
  509. int Client::sendIngAuthMessage(unsigned long epoch_no)
  510. {
  511. uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
  512. unsigned char *auth_message = (unsigned char*) malloc(auth_size);
  513. unsigned char *am_ptr = auth_message;
  514. memcpy(am_ptr, &sim_id, sizeof(sim_id));
  515. am_ptr+=sizeof(sim_id);
  516. memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
  517. am_ptr+=sizeof(unsigned long);
  518. unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
  519. unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
  520. unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
  521. memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
  522. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ing_key,
  523. epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) {
  524. printf("generateClientKeys failed\n");
  525. return -1;
  526. }
  527. #ifdef VERBOSE_CLIENT
  528. printf("Client %d auth_message: \n", id);
  529. for(int i=0; i<auth_size; i++) {
  530. printf("%x", auth_message[i]);
  531. }
  532. printf("\n");
  533. #endif
  534. /*
  535. if(sim_id%7919==0) {
  536. printf("Client %d auth_message: \n", sim_id);
  537. for(int i=0; i<TOKEN_SIZE; i++) {
  538. printf("%x", am_ptr[i]);
  539. }
  540. printf("\n");
  541. }
  542. */
  543. boost::asio::write(*ingestion_sock,
  544. boost::asio::buffer(auth_message, auth_size));
  545. return 1;
  546. }
  547. int Client::sendStgAuthMessage(unsigned long epoch_no)
  548. {
  549. uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
  550. unsigned char *auth_message = (unsigned char*) malloc(auth_size);
  551. unsigned char *am_ptr = auth_message;
  552. memcpy(am_ptr, &sim_id, sizeof(sim_id));
  553. am_ptr+=sizeof(sim_id);
  554. memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
  555. am_ptr+=sizeof(unsigned long);
  556. unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
  557. unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
  558. unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
  559. memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
  560. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, stg_key,
  561. epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) {
  562. printf("generateClientKeys failed\n");
  563. return -1;
  564. }
  565. #ifdef VERBOSE_CLIENT
  566. printf("Client %d auth_message: \n", id);
  567. for(int i=0; i<auth_size; i++) {
  568. printf("%x", auth_message[i]);
  569. }
  570. printf("\n");
  571. #endif
  572. boost::asio::async_write(*storage_sock,
  573. boost::asio::buffer(auth_message, auth_size),
  574. [this] (boost::system::error_code ecc, std::size_t) {
  575. if (ecc) {
  576. if(ecc == boost::asio::error::eof) {
  577. delete(storage_sock);
  578. }
  579. else {
  580. printf("Error %s\n", ecc.message().c_str());
  581. }
  582. printf("Client::sendStgAuthMessage boost async_write failed\n");
  583. return;
  584. }
  585. });
  586. return 1;
  587. }
  588. void Client::setup_client(boost::asio::io_context &io_context,
  589. uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id,
  590. ip_addr *curr_ip, uint16_t &port_no)
  591. {
  592. // Setup the client's
  593. // (i) client_id
  594. // (ii) symmetric keys shared with their ingestion and storage server
  595. // (iii) sockets to their ingestion and storage server
  596. aes_key client_ing_key;
  597. aes_key client_stg_key;
  598. int ret = generateClientKeys(sim_id, ESK, client_ing_key, client_stg_key);
  599. initClient(sim_id, stg_node_id, client_ing_key, client_stg_key);
  600. initializeStgSocket(io_context, storage_nodes[stg_node_id], curr_ip, port_no);
  601. port_no++;
  602. initializeIngSocket(io_context, ingestion_nodes[ing_node_id], curr_ip, port_no);
  603. port_no++;
  604. // Authenticate clients to their ingestion and storage servers
  605. struct timespec ep;
  606. clock_gettime(CLOCK_REALTIME_COARSE, &ep);
  607. unsigned long time_in_ns = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
  608. unsigned long epoch_no = CEILDIV(time_in_ns, EPOCH_INTERVAL);
  609. sendStgAuthMessage(epoch_no);
  610. sendIngAuthMessage(epoch_no);
  611. epoch_process();
  612. }
  613. void generateClients(boost::asio::io_context &io_context,
  614. uint32_t cstart, uint32_t cstop, uint8_t thread_no)
  615. {
  616. uint32_t num_clients_total = config.user_count;
  617. uint16_t num_stg_nodes = storage_nodes.size();
  618. uint16_t num_ing_nodes = ingestion_nodes.size();
  619. uint16_t port_no = PORT_START;
  620. ip_addr curr_ip;
  621. curr_ip.ip1 = 127;
  622. curr_ip.ip2 = thread_no;
  623. curr_ip.ip3 = 0;
  624. curr_ip.ip4 = 0;
  625. for(uint32_t i=cstart; i<cstop; i++) {
  626. // Compute client's ip and port
  627. #ifdef CLIENT_UNIQUE_IP
  628. if(port_no>=PORT_END) {
  629. port_no = PORT_START;
  630. }
  631. curr_ip.increment(nthreads);
  632. #else
  633. if(port_no>=PORT_END) {
  634. port_no = PORT_START;
  635. curr_ip.increment(nthreads);
  636. }
  637. #endif
  638. uint16_t ing_no = i % num_ing_nodes;
  639. uint16_t stg_no = i % num_stg_nodes;
  640. uint16_t stg_node_id = storage_map[stg_no];
  641. uint16_t ing_node_id = ingestion_map[ing_no];
  642. clients[i].setup_client(io_context, i, ing_node_id, stg_node_id,
  643. &curr_ip, port_no);
  644. }
  645. }
  646. /*
  647. Epochs are server driven.
  648. In a single epoch, each client waits to receive from their storage server
  649. (i) a token bundle for this epoch and (ii) their messages from the last epoch
  650. The client then sends their messages for this epoch to their ingestion servers
  651. using the tokens they received in this epoch
  652. */
  653. void Client::epoch_process() {
  654. uint32_t pt_token_size = (config.m_priv_out * SGX_AESGCM_KEY_SIZE);
  655. uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE
  656. + SGX_AESGCM_MAC_SIZE;
  657. unsigned char *enc_tokens = (unsigned char*) malloc (token_bundle_size);
  658. //Async read the encrypted tokens for this epoch
  659. boost::asio::async_read(*storage_sock, boost::asio::buffer(enc_tokens, token_bundle_size),
  660. [this, enc_tokens, token_bundle_size, pt_token_size]
  661. (boost::system::error_code ec, std::size_t) {
  662. if (ec) {
  663. if(ec == boost::asio::error::eof) {
  664. delete(storage_sock);
  665. }
  666. else {
  667. printf("Error %s\n", ec.message().c_str());
  668. printf("Client::epoch_process boost async_read_tokens failed\n");
  669. }
  670. return;
  671. }
  672. #ifdef VERBOSE_CLIENT
  673. if(sim_id == 0) {
  674. printf("TEST: Client 0: Encrypted token bundle received:\n");
  675. for(uint32_t i = 0; i < token_bundle_size; i++) {
  676. printf("%x", enc_tokens[i]);
  677. }
  678. printf("\n");
  679. }
  680. #endif
  681. // Decrypt the token bundle
  682. unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE;
  683. unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE + pt_token_size;
  684. int decrypted_bytes = gcm_decrypt(enc_tkn_ptr, pt_token_size,
  685. NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key),
  686. enc_tokens, SGX_AESGCM_IV_SIZE, (unsigned char*) (this->token_list));
  687. if(decrypted_bytes != pt_token_size) {
  688. printf("Client::epoch_process gcm_decrypt tokens failed. decrypted_bytes = %d \n", decrypted_bytes);
  689. }
  690. free(enc_tokens);
  691. /*
  692. unsigned char *tkn_ptr = (unsigned char*) this->token_list;
  693. if(sim_id==0) {
  694. printf("TEST: Client 0: Decrypted client tokens:\n");
  695. for(int i = 0; i < 2 * SGX_AESGCM_KEY_SIZE; i++) {
  696. printf("%x", tkn_ptr[i]);
  697. }
  698. printf("\n");
  699. }
  700. */
  701. // Async read the messages recieved in the last epoch
  702. uint16_t priv_in = config.m_priv_in;
  703. uint16_t msg_size = config.msg_size;
  704. uint32_t recv_pt_mailbox_size = ptMailboxSize(priv_in, msg_size);
  705. uint32_t recv_enc_mailbox_size = encMailboxSize(priv_in, msg_size);
  706. unsigned char *recv_pt_mailbox = (unsigned char*) malloc (recv_pt_mailbox_size);
  707. unsigned char *recv_enc_mailbox = (unsigned char*) malloc (recv_enc_mailbox_size);
  708. boost::asio::async_read(*storage_sock,
  709. boost::asio::buffer(recv_enc_mailbox, recv_enc_mailbox_size),
  710. [this, recv_pt_mailbox, recv_enc_mailbox]
  711. (boost::system::error_code ecc, std::size_t) {
  712. if (ecc) {
  713. if(ecc == boost::asio::error::eof) {
  714. delete(storage_sock);
  715. }
  716. else {
  717. printf("Error %s\n", ecc.message().c_str());
  718. }
  719. printf("Client: boost async_read failed for recieving msg_bundle\n");
  720. return;
  721. }
  722. #ifdef VERBOSE_CLIENT
  723. if(sim_id == 0) {
  724. printf("TEST: Client 0: Encrypted msgbundle received\n");
  725. }
  726. #endif
  727. // Do whatever processing with the received messages here
  728. free(recv_enc_mailbox);
  729. free(recv_pt_mailbox);
  730. // Send this epoch's message bundle
  731. sendMessageBundle();
  732. epoch_process();
  733. });
  734. });
  735. }
  736. void client_epoch_process(uint32_t cstart, uint32_t cstop)
  737. {
  738. for(uint32_t i=cstart; i<cstop; i++) {
  739. clients[i].epoch_process();
  740. }
  741. }
  742. void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads)
  743. {
  744. std::vector<boost::thread> threads;
  745. uint32_t num_clients_total = config.user_count;
  746. size_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
  747. // Generate all the clients for the experiment
  748. for(int i=0; i<nthreads; i++) {
  749. uint32_t cstart, cstop;
  750. cstart = i * clients_per_thread;
  751. cstop = (i==(nthreads-1))? num_clients_total: (i+1) * clients_per_thread;
  752. #ifdef VERBOSE_CLIENT
  753. printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
  754. #endif
  755. threads.emplace_back(boost::thread(generateClients,
  756. boost::ref(io_context), cstart, cstop, i));
  757. }
  758. for(int i=0; i<nthreads; i++) {
  759. threads[i].join();
  760. }
  761. }
  762. void run_epochs(int nthreads) {
  763. size_t num_clients_total = config.user_count;
  764. size_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
  765. std::vector<boost::thread> threads;
  766. for(int i=0; i<nthreads; i++) {
  767. uint32_t cstart, cstop;
  768. cstart = i * clients_per_thread;
  769. cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
  770. threads.emplace_back(boost::thread(client_epoch_process,
  771. cstart, cstop));
  772. }
  773. for(int i=0; i<nthreads; i++) {
  774. threads[i].join();
  775. }
  776. }
  777. int main(int argc, char **argv)
  778. {
  779. // Unbuffer stdout
  780. setbuf(stdout, NULL);
  781. const char *progname = argv[0];
  782. ++argv;
  783. // Parse options
  784. while (*argv && (*argv)[0] == '-') {
  785. if (!strcmp(*argv, "-t")) {
  786. if (argv[1] == NULL) {
  787. usage(progname);
  788. }
  789. nthreads = uint16_t(atoi(argv[1]));
  790. argv += 2;
  791. } else {
  792. usage(progname);
  793. }
  794. }
  795. // Read the config.json from the first line of stdin. We have to do
  796. // this before outputting anything to avoid potential deadlock with
  797. // the launch program.
  798. std::string configstr;
  799. std::getline(std::cin, configstr);
  800. boost::asio::io_context io_context;
  801. if (!config_parse(config, configstr, ingestion_nodes,
  802. storage_nodes, storage_map)) {
  803. exit(1);
  804. }
  805. clients = new Client[config.user_count];
  806. #ifdef VERBOSE_CLIENT
  807. printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
  808. ingestion_nodes.size(), storage_nodes.size());
  809. #endif
  810. generateMasterKeys(config.master_secret, ESK, TSK);
  811. // Queue up the actual work
  812. boost::asio::post(io_context, [&]{
  813. initializeClients(io_context, nthreads);
  814. });
  815. // Start another thread; one will perform the work and the other
  816. // will execute the async_write handlers
  817. boost::thread t([&]{io_context.run();});
  818. io_context.run();
  819. t.join();
  820. delete [] clients;
  821. }