// clients.cpp
  1. #include <iostream>
  2. #include <functional>
  3. #include "../App/appconfig.hpp"
  4. // The next line suppresses a deprecation warning within boost
  5. #define BOOST_BIND_GLOBAL_PLACEHOLDERS
  6. #include "boost/property_tree/ptree.hpp"
  7. #include "boost/property_tree/json_parser.hpp"
  8. #include <boost/thread.hpp>
  9. #include <boost/asio.hpp>
  10. #include "gcm.h"
  11. #include "sgx_tcrypto.h"
  12. #include "clients.hpp"
  13. #include <cstdlib>
  14. #define CEILDIV(x,y) (((x)+(y)-1)/(y))
  15. Config config;
  16. Client *clients;
  17. aes_key ESK, TSK;
  18. std::vector<NodeConfig> ingestion_nodes, storage_nodes;
  19. std::vector<uint16_t> storage_map;
  20. std::vector<uint16_t> ingestion_map;
  21. unsigned long setup_time;
  22. uint16_t nthreads = 1;
  23. bool token_channel;
  24. // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
  25. // into a host part "127.0.0.1" and a port part "12000".
  26. static bool split_host_port(std::string &host, std::string &port,
  27. const std::string &hostport)
  28. {
  29. size_t colon = hostport.find_last_of(':');
  30. if (colon == std::string::npos) {
  31. std::cerr << "Cannot parse \"" << hostport << "\" as host:port\n";
  32. return false;
  33. }
  34. host = hostport.substr(0, colon);
  35. port = hostport.substr(colon+1);
  36. return true;
  37. }
  38. // Convert a single hex character into its value from 0 to 15. Return
  39. // true on success, false if it wasn't a hex character.
  40. static inline bool hextoval(unsigned char &val, char hex)
  41. {
  42. if (hex >= '0' && hex <= '9') {
  43. val = ((unsigned char)hex)-'0';
  44. } else if (hex >= 'a' && hex <= 'f') {
  45. val = ((unsigned char)hex)-'a'+10;
  46. } else if (hex >= 'A' && hex <= 'F') {
  47. val = ((unsigned char)hex)-'A'+10;
  48. } else {
  49. return false;
  50. }
  51. return true;
  52. }
  53. // Convert a 2*len hex character string into a len-byte buffer. Return
  54. // true on success, false on failure.
  55. static bool hextobuf(unsigned char *buf, const char *str, size_t len)
  56. {
  57. if (strlen(str) != 2*len) {
  58. std::cerr << "Hex string was not the expected size\n";
  59. return false;
  60. }
  61. for (size_t i=0;i<len;++i) {
  62. unsigned char hi, lo;
  63. if (!hextoval(hi, str[2*i]) || !hextoval(lo, str[2*i+1])) {
  64. std::cerr << "Cannot parse string as hex\n";
  65. return false;
  66. }
  67. buf[i] = (unsigned char)((hi << 4) + lo);
  68. }
  69. return true;
  70. }
  71. void displayMessage(unsigned char *msg, uint16_t msg_size,
  72. uint32_t client)
  73. {
  74. std::stringstream outbuf;
  75. clientid_t sid, rid;
  76. uint32_t prio;
  77. unsigned char *ptr = msg;
  78. rid = *((clientid_t*) ptr);
  79. ptr+=sizeof(rid);
  80. if (!token_channel) {
  81. prio = *((uint32_t*) ptr);
  82. ptr+=sizeof(prio);
  83. }
  84. sid = *((clientid_t*) ptr);
  85. ptr+=sizeof(sid);
  86. if (token_channel) {
  87. outbuf << std::hex
  88. << "Cli: "
  89. << std::setfill('0') << std::setw(8) << client
  90. << ", Recv: "
  91. << std::setfill('0') << std::setw(8) << rid
  92. << ", Send: "
  93. << std::setfill('0') << std::setw(8) << sid
  94. << "\n";
  95. } else {
  96. outbuf << std::hex
  97. << "Cli: "
  98. << std::setfill('0') << std::setw(8) << client
  99. << ", Recv: "
  100. << std::setfill('0') << std::setw(8) << rid
  101. << ", Prio: "
  102. << std::setfill('0') << std::setw(8) << prio
  103. << ", Send: "
  104. << std::setfill('0') << std::setw(8) << sid
  105. << "\n";
  106. }
  107. unsigned char *end = msg + msg_size;
  108. while (ptr < end) {
  109. size_t remain = end-ptr;
  110. if (remain > 16) {
  111. remain = 16;
  112. }
  113. char row[34 + 17 + 2];
  114. memset(row, ' ', sizeof(row)-1);
  115. row[sizeof(row)-2] = '\n';
  116. row[sizeof(row)-1] = '\0';
  117. size_t hexoffset = 0;
  118. size_t charoffset = 34;
  119. for (size_t i=0; i<remain; ++i) {
  120. char hex[3];
  121. sprintf(hex, "%02x", ptr[i]);
  122. memcpy(row+hexoffset, hex, 2);
  123. if (ptr[i] >= ' ' && ptr[i] <= '~') {
  124. row[charoffset] = ptr[i];
  125. } else {
  126. row[charoffset] = '.';
  127. }
  128. hexoffset += 2;
  129. charoffset += 1;
  130. if (i == 7) {
  131. hexoffset += 1;
  132. charoffset += 1;
  133. }
  134. }
  135. outbuf << row;
  136. ptr += remain;
  137. }
  138. outbuf << "\n";
  139. std::cout << outbuf.str();
  140. }
  141. void displayPtMessageBundle(unsigned char *bundle, uint16_t num_out,
  142. uint16_t msg_size, uint32_t client)
  143. {
  144. unsigned char *ptr = bundle;
  145. for(int i=0; i<num_out; i++) {
  146. displayMessage(ptr, msg_size, client);
  147. ptr+=msg_size;
  148. }
  149. printf("\n");
  150. }
  151. static inline uint32_t encPubMsgBundleSize(uint16_t id_out, uint16_t msg_size)
  152. {
  153. return SGX_AESGCM_IV_SIZE + (uint32_t(id_out) * msg_size)
  154. + SGX_AESGCM_MAC_SIZE;
  155. }
  156. static inline uint32_t ptPubMsgBundleSize(uint16_t id_out, uint16_t msg_size)
  157. {
  158. return uint32_t(id_out) * msg_size;
  159. }
  160. static inline uint32_t encMsgBundleSize(uint16_t token_out, uint16_t msg_size)
  161. {
  162. return SGX_AESGCM_IV_SIZE + (uint32_t(token_out) * (msg_size + TOKEN_SIZE))
  163. + SGX_AESGCM_MAC_SIZE;
  164. }
  165. static inline uint32_t ptMsgBundleSize(uint16_t token_out, uint16_t msg_size)
  166. {
  167. return uint32_t(token_out) * (msg_size + TOKEN_SIZE);
  168. }
  169. static inline uint32_t encMailboxSize(uint16_t token_in, uint16_t msg_size)
  170. {
  171. return SGX_AESGCM_IV_SIZE + (uint32_t(token_in) * msg_size)
  172. + SGX_AESGCM_MAC_SIZE;
  173. }
  174. static inline uint32_t ptMailboxSize(uint16_t token_in, uint16_t msg_size)
  175. {
  176. return uint32_t(token_in) * msg_size;
  177. }
  178. bool config_parse(Config &config, const std::string configstr,
  179. std::vector<NodeConfig> &ingestion_nodes,
  180. std::vector<NodeConfig> &storage_nodes,
  181. std::vector<uint16_t> &storage_map)
  182. {
  183. bool found_params = false;
  184. bool ret = true;
  185. std::istringstream configstream(configstr);
  186. boost::property_tree::ptree conftree;
  187. read_json(configstream, conftree);
  188. uint16_t node_num = 0;
  189. for (auto & entry : conftree) {
  190. if (!entry.first.compare("params")) {
  191. for (auto & pentry : entry.second) {
  192. if (!pentry.first.compare("msg_size")) {
  193. config.msg_size = pentry.second.get_value<uint16_t>();
  194. } else if (!pentry.first.compare("user_count")) {
  195. config.user_count = pentry.second.get_value<uint32_t>();
  196. } else if (!pentry.first.compare("token_out")) {
  197. config.m_token_out = pentry.second.get_value<uint8_t>();
  198. } else if (!pentry.first.compare("token_in")) {
  199. config.m_token_in = pentry.second.get_value<uint8_t>();
  200. } else if (!pentry.first.compare("id_out")) {
  201. config.m_id_out = pentry.second.get_value<uint8_t>();
  202. } else if (!pentry.first.compare("id_in")) {
  203. config.m_id_in = pentry.second.get_value<uint8_t>();
  204. // A stub hardcoded shared secret to derive various
  205. // keys for client <-> server communications and tokens
  206. // In reality, this would be a key exchange
  207. } else if (!pentry.first.compare("master_secret")) {
  208. std::string hex_key = pentry.second.data();
  209. memcpy(config.master_secret, hex_key.c_str(),
  210. SGX_AESGCM_KEY_SIZE);
  211. } else if (!pentry.first.compare("token_channel")) {
  212. config.token_channel = pentry.second.get_value<bool>();
  213. } else {
  214. std::cerr << "Unknown field in params: " <<
  215. pentry.first << "\n";
  216. ret = false;
  217. }
  218. }
  219. found_params = true;
  220. } else if (!entry.first.compare("nodes")) {
  221. for (auto & node : entry.second) {
  222. NodeConfig nc;
  223. // All nodes need to be assigned their role in manifest.yaml
  224. nc.roles = 0;
  225. for (auto & nentry : node.second) {
  226. if (!nentry.first.compare("name")) {
  227. nc.name = nentry.second.get_value<std::string>();
  228. } else if (!nentry.first.compare("pubkey")) {
  229. ret &= hextobuf((unsigned char *)&nc.pubkey,
  230. nentry.second.get_value<std::string>().c_str(),
  231. sizeof(nc.pubkey));
  232. } else if (!nentry.first.compare("weight")) {
  233. nc.weight = nentry.second.get_value<std::uint8_t>();
  234. } else if (!nentry.first.compare("listen")) {
  235. ret &= split_host_port(nc.listenhost, nc.listenport,
  236. nentry.second.get_value<std::string>());
  237. } else if (!nentry.first.compare("clisten")) {
  238. ret &= split_host_port(nc.clistenhost, nc.clistenport,
  239. nentry.second.get_value<std::string>());
  240. } else if (!nentry.first.compare("slisten")) {
  241. ret &= split_host_port(nc.slistenhost, nc.slistenport,
  242. nentry.second.get_value<std::string>());
  243. } else if (!nentry.first.compare("roles")) {
  244. nc.roles = nentry.second.get_value<std::uint8_t>();
  245. } else {
  246. std::cerr << "Unknown field in host config: " <<
  247. nentry.first << "\n";
  248. ret = false;
  249. }
  250. }
  251. if(nc.roles & ROLE_INGESTION) {
  252. ingestion_nodes.push_back(nc);
  253. ingestion_map.push_back(node_num);
  254. }
  255. if(nc.roles & ROLE_STORAGE) {
  256. storage_nodes.push_back(std::move(nc));
  257. storage_map.push_back(node_num);
  258. }
  259. node_num++;
  260. }
  261. } else {
  262. std::cerr << "Unknown key in config: " <<
  263. entry.first << "\n";
  264. ret = false;
  265. }
  266. }
  267. if (!found_params) {
  268. std::cerr << "Could not find params in config\n";
  269. ret = false;
  270. }
  271. return ret;
  272. }
  273. static void usage(const char *argv0)
  274. {
  275. fprintf(stderr, "%s [-t nthreads] < config.json\n",
  276. argv0);
  277. exit(1);
  278. }
  279. /*
  280. Generate ESK (Encryption Secret Key) and TSK (Token Secret Key)
  281. */
  282. int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
  283. aes_key &ESK, aes_key &TSK )
  284. {
  285. unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
  286. unsigned char iv[SGX_AESGCM_IV_SIZE];
  287. unsigned char mac[SGX_AESGCM_MAC_SIZE];
  288. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  289. memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
  290. memcpy(iv, "Encryption", sizeof("Encryption"));
  291. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
  292. master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) {
  293. printf("Client: generateMasterKeys FAIL\n");
  294. return -1;
  295. }
  296. printf("\n\nEncryption Master Key: ");
  297. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  298. printf("%x", ESK[i]);
  299. }
  300. printf("\n");
  301. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  302. memcpy(iv, "Token", sizeof("Token"));
  303. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
  304. master_secret, iv, SGX_AESGCM_IV_SIZE, TSK, mac)) {
  305. printf("generateMasterKeys failed\n");
  306. return -1;
  307. }
  308. printf("Token Master Key: ");
  309. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  310. printf("%x", TSK[i]);
  311. }
  312. printf("\n\n");
  313. return 1;
  314. }
  315. /*
  316. Takes the client_number, the master aes_key for generating client keys
  317. for encrypted communication with ingestion (client_ing_key) and
  318. storage servers (client_stg_key)
  319. */
  320. int generateClientKeys(clientid_t client_number, aes_key &ESK,
  321. aes_key &client_ing_key, aes_key &client_stg_key)
  322. {
  323. unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
  324. unsigned char iv[SGX_AESGCM_IV_SIZE];
  325. unsigned char tag[SGX_AESGCM_MAC_SIZE];
  326. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  327. memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
  328. memset(tag, 0, SGX_AESGCM_KEY_SIZE);
  329. memcpy(iv, &client_number, sizeof(client_number));
  330. /*
  331. printf("Client Key: (before Gen) ");
  332. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  333. printf("%x", client_ing_key[i]);
  334. }
  335. printf("\n");
  336. */
  337. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
  338. iv, SGX_AESGCM_IV_SIZE, client_ing_key, tag)) {
  339. printf("generateClientKeys failed\n");
  340. return -1;
  341. }
  342. memset(iv, 0, SGX_AESGCM_IV_SIZE);
  343. memcpy(iv, &client_number, sizeof(client_number));
  344. memcpy(iv +sizeof(client_number), "STG", sizeof("STG"));
  345. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
  346. iv, SGX_AESGCM_IV_SIZE, client_stg_key, tag)) {
  347. printf("generateClientKeys failed\n");
  348. return -1;
  349. }
  350. /*
  351. printf("Client %d Ingestion: (after Gen) ", client_number);
  352. for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
  353. printf("%x", client_ing_key[i]);
  354. }
  355. printf("\n");
  356. */
  357. return 1;
  358. }
  359. void Client::initClient(clientid_t cid, uint16_t stg_id,
  360. aes_key ikey, aes_key skey)
  361. {
  362. uint16_t num_storage_nodes = storage_nodes.size();
  363. sim_id = cid;
  364. id = stg_id << DEST_UID_BITS;
  365. id += (cid/num_storage_nodes);
  366. token_list = new token[config.m_token_out];
  367. memcpy(ing_key, ikey, SGX_AESGCM_KEY_SIZE);
  368. memcpy(stg_key, skey, SGX_AESGCM_KEY_SIZE);
  369. }
  370. void Client::initializeStgSocket(boost::asio::io_context &ioc,
  371. NodeConfig &stg_server, ip_addr *curr_ip, uint16_t &port_no)
  372. {
  373. boost::system::error_code err;
  374. while(1) {
  375. #ifdef VERBOSE_CLIENT
  376. std::cerr << "Connecting to " << stg_server.name << "...\n";
  377. std::cout << stg_server.slistenhost << ":" << stg_server.slistenport;
  378. #endif
  379. std::string ip_str = curr_ip->ip_str();
  380. boost::asio::ip::address ip_address =
  381. boost::asio::ip::address::from_string(ip_str, err);
  382. if(err) {
  383. printf("initializeStgSocket::Invalid IP address\n");
  384. }
  385. storage_sock = new boost::asio::ip::tcp::socket(ioc);
  386. storage_sock->open(boost::asio::ip::tcp::v4(), err);
  387. while(1) {
  388. boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  389. storage_sock->bind(ep, err);
  390. if (!err) {
  391. break;
  392. } else {
  393. printf("STG: Error %s. (%s:%d)\n", err.message().c_str(),
  394. (curr_ip->ip_str()).c_str(), port_no);
  395. port_no++;
  396. if(port_no >= PORT_END) {
  397. port_no = PORT_START;
  398. curr_ip->increment(nthreads);
  399. }
  400. }
  401. }
  402. boost::asio::ip::address stg_ip =
  403. boost::asio::ip::address::from_string(stg_server.slistenhost, err);
  404. boost::asio::ip::tcp::endpoint
  405. stg_ep(stg_ip, std::stoi(stg_server.slistenport));
  406. // just for printing
  407. // boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  408. storage_sock->connect(stg_ep, err);
  409. if (!err) {
  410. break;
  411. }
  412. std::cerr <<"STG: Connection from " <<
  413. curr_ip->ip_str() << ":" << port_no <<
  414. " to " << stg_server.name << " refused, will retry.\n";
  415. #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
  416. int sleep_delay = rand() % 100000;
  417. usleep(sleep_delay);
  418. #else
  419. usleep(1000000);
  420. #endif
  421. delete storage_sock;
  422. storage_sock = nullptr;
  423. }
  424. }
  425. void Client::initializeIngSocket(boost::asio::io_context &ioc,
  426. NodeConfig &ing_server, ip_addr *curr_ip, uint16_t &port_no)
  427. {
  428. boost::system::error_code err;
  429. boost::asio::ip::tcp::resolver resolver(ioc);
  430. while(1) {
  431. #ifdef VERBOSE_CLIENT
  432. std::cerr << "Connecting to " << ing_server.name << "...\n";
  433. std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
  434. #endif
  435. std::string ip_str = curr_ip->ip_str();
  436. boost::asio::ip::address ip_address =
  437. boost::asio::ip::address::from_string(ip_str, err);
  438. if(err) {
  439. printf("initializeIngSocket::Invalid IP address\n");
  440. }
  441. ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
  442. ingestion_sock->open(boost::asio::ip::tcp::v4(), err);
  443. while(1) {
  444. boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  445. ingestion_sock->bind(ep, err);
  446. if (!err) {
  447. break;
  448. } else {
  449. printf("ING: Error %s. (%s:%d)\n", err.message().c_str(),
  450. (curr_ip->ip_str()).c_str(), port_no);
  451. port_no++;
  452. if(port_no >= PORT_END) {
  453. port_no = PORT_START;
  454. curr_ip->increment(nthreads);
  455. }
  456. }
  457. }
  458. // just for printing
  459. boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
  460. boost::asio::ip::address ing_ip =
  461. boost::asio::ip::address::from_string(ing_server.clistenhost, err);
  462. boost::asio::ip::tcp::endpoint
  463. ing_ep(ing_ip, std::stoi(ing_server.clistenport));
  464. //std::cout << "Ing endpoint:" << ing_ep << "\n";
  465. //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n";
  466. ingestion_sock->connect(ing_ep, err);
  467. /*
  468. boost::asio::connect(*ingestion_sock,
  469. resolver.resolve(ing_server.clistenhost,
  470. ing_server.clistenport), err);
  471. */
  472. if (!err) {
  473. //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n";
  474. break;
  475. }
  476. std::cerr << "ING: Connection from " <<
  477. curr_ip->ip_str() << ":" << port_no <<
  478. " to " << ing_server.name << " refused, will retry.\n";
  479. #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
  480. int sleep_delay = rand() % 100000;
  481. usleep(sleep_delay);
  482. #else
  483. usleep(1000000);
  484. #endif
  485. delete ingestion_sock;
  486. ingestion_sock = nullptr;
  487. }
  488. }
  489. /*
  490. Populates the buffer pt_msgbundle with a valid message pt_msgbundle.
  491. Assumes that it is supplied with a pt_msgbundle buffer of the correct length
  492. num_out is either token_out or id_out, depending on whether we're
  493. doing token channel or ID channel routing
  494. Correct length for pt_msgbundle = (num_out)*(msg_size) +
  495. (only for token channel routing) (num_out)*TOKEN_SIZE
  496. */
  497. void Client::generateMessageBundle(uint8_t num_out, uint32_t msg_size,
  498. unsigned char *pt_msgbundle)
  499. {
  500. unsigned char *ptr = pt_msgbundle;
  501. // Setup message pt_msgbundle
  502. for(uint32_t i = 0; i < num_out; i++) {
  503. // For benchmarking, each client just sends messages to
  504. // themselves, so the destination and source ids are the same.
  505. // Receiver id
  506. uint32_t rid = id;
  507. #if 0
  508. uint32_t dest_uid_mask = (1 << DEST_UID_BITS) - 1;
  509. if (!token_channel) {
  510. // If we're testing ID channel routing, have each user send a
  511. // message to the user with the same local id, but on
  512. // storage server 0.
  513. rid &= dest_uid_mask;
  514. }
  515. #endif
  516. unsigned char *start_ptr = ptr;
  517. memcpy(ptr, &rid, sizeof(rid));
  518. ptr += sizeof(rid);
  519. // Priority (for ID channel routing only)
  520. if (!token_channel) {
  521. uint32_t priority = 0;
  522. #ifdef SHOW_RECEIVED_MESSAGES
  523. // If we're testing ID channel routing, set the priority so that
  524. // messages to different users will have the highest
  525. // priority messages sent from users at different servers
  526. uint32_t id_low_bits = id &
  527. ((1 << DEST_STORAGE_NODE_BITS) - 1);
  528. priority = id ^ (id_low_bits << DEST_UID_BITS);
  529. #endif
  530. memcpy(ptr, &priority, sizeof(priority));
  531. ptr += sizeof(priority);
  532. }
  533. // Sender id is our id
  534. uint32_t sid = id;
  535. memcpy(ptr, &sid, sizeof(sid));
  536. ptr += sizeof(sid);
  537. uint32_t remaining_message_size = start_ptr + msg_size - ptr;
  538. memset(ptr, 0, remaining_message_size);
  539. #ifdef SHOW_RECEIVED_MESSAGES
  540. snprintf((char *)ptr, remaining_message_size, "From %08x to %08x",
  541. id, rid);
  542. #endif
  543. ptr+=(remaining_message_size);
  544. }
  545. if(token_channel) {
  546. // Add the tokens for this msgbundle
  547. memcpy(ptr, token_list, config.m_token_out * TOKEN_SIZE);
  548. }
  549. }
  550. bool Client::encryptMessageBundle(uint32_t enc_bundle_size,
  551. unsigned char *pt_msgbundle, unsigned char *enc_msgbundle)
  552. {
  553. // Encrypt the pt_msgbundle
  554. unsigned char *pt_msgbundle_start = pt_msgbundle;
  555. unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE;
  556. unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE;
  557. size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE -
  558. SGX_AESGCM_IV_SIZE;
  559. if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt,
  560. NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start,
  561. enc_tag)) {
  562. printf("Client: encryptMessageBundle FAIL\n");
  563. return 0;
  564. }
  565. // Copy IV into the bundle
  566. memcpy(enc_msgbundle, ing_iv, SGX_AESGCM_IV_SIZE);
  567. // Update IV
  568. uint64_t *iv_ctr = (uint64_t*) ing_iv;
  569. (*iv_ctr)+=1;
  570. return 1;
  571. }
  572. #ifdef TRACE_SOCKIO
  573. class LimitLogger {
  574. std::string label;
  575. std::string thrid;
  576. struct timeval last_log;
  577. size_t num_items;
  578. public:
  579. LimitLogger(const char *_label): label(_label), last_log({0,0}),
  580. num_items(0) {
  581. std::stringstream ss;
  582. ss << boost::this_thread::get_id();
  583. thrid = ss.str();
  584. }
  585. void log() {
  586. struct timeval now;
  587. gettimeofday(&now, NULL);
  588. long elapsedus = (now.tv_sec - last_log.tv_sec) * 1000000
  589. + (now.tv_usec - last_log.tv_usec);
  590. if (num_items > 0 && elapsedus > 500000) {
  591. printf("%lu.%06lu: Thread %s end %s of %lu items\n",
  592. last_log.tv_sec, last_log.tv_usec,
  593. thrid.c_str(), label.c_str(),
  594. num_items);
  595. num_items = 0;
  596. }
  597. if (num_items == 0) {
  598. printf("%lu.%06lu: Thread %s begin %s\n", now.tv_sec,
  599. now.tv_usec, thrid.c_str(), label.c_str());
  600. }
  601. gettimeofday(&last_log, NULL);
  602. ++num_items;
  603. }
  604. };
  605. static thread_local LimitLogger
  606. recvlogger("recv"), queuelogger("queue"), sentlogger("sent");
  607. #endif
  608. void Client::sendMessageBundle()
  609. {
  610. uint16_t token_out = config.m_token_out;
  611. uint16_t id_out = config.m_id_out;
  612. uint16_t msg_size = config.msg_size;
  613. uint32_t send_pt_msgbundle_size, send_enc_msgbundle_size;
  614. if(token_channel) {
  615. send_pt_msgbundle_size = ptMsgBundleSize(token_out, msg_size);
  616. send_enc_msgbundle_size = encMsgBundleSize(token_out, msg_size);
  617. } else {
  618. send_pt_msgbundle_size = ptPubMsgBundleSize(id_out, msg_size);
  619. send_enc_msgbundle_size = encPubMsgBundleSize(id_out, msg_size);
  620. }
  621. unsigned char *send_pt_msgbundle =
  622. (unsigned char*) malloc (send_pt_msgbundle_size);
  623. unsigned char *send_enc_msgbundle =
  624. (unsigned char*) malloc (send_enc_msgbundle_size);
  625. if(token_channel) {
  626. generateMessageBundle(token_out, msg_size, send_pt_msgbundle);
  627. } else {
  628. generateMessageBundle(id_out, msg_size, send_pt_msgbundle);
  629. }
  630. encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle,
  631. send_enc_msgbundle);
  632. #ifdef VERBOSE_CLIENT
  633. displayPtMessageBundle(send_pt_msgbundle, token_out, msg_size);
  634. #endif
  635. free(send_pt_msgbundle);
  636. #ifdef TRACE_SOCKIO
  637. queuelogger.log();
  638. #endif
  639. boost::asio::async_write(*ingestion_sock,
  640. boost::asio::buffer(send_enc_msgbundle, send_enc_msgbundle_size),
  641. [this, send_enc_msgbundle] (boost::system::error_code ecc, std::size_t) {
  642. #ifdef TRACE_SOCKIO
  643. sentlogger.log();
  644. #endif
  645. #ifdef VERBOSE_CLIENT
  646. if(sim_id==0){
  647. printf("TEST: Client 0 send their msgbundle\n");
  648. }
  649. #endif
  650. free(send_enc_msgbundle);
  651. if (ecc) {
  652. if(ecc == boost::asio::error::eof) {
  653. delete storage_sock;
  654. storage_sock = nullptr;
  655. } else {
  656. printf("Client: boost async_write failed for sending "
  657. "message bundle\n");
  658. printf("Error %s\n", ecc.message().c_str());
  659. }
  660. return;
  661. }
  662. });
  663. }
  664. int Client::sendIngAuthMessage(unsigned long epoch_no)
  665. {
  666. uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) +
  667. SGX_AESGCM_KEY_SIZE;
  668. unsigned char *auth_message = (unsigned char*) malloc(auth_size);
  669. unsigned char *am_ptr = auth_message;
  670. memcpy(am_ptr, &sim_id, sizeof(sim_id));
  671. am_ptr+=sizeof(sim_id);
  672. memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
  673. am_ptr+=sizeof(unsigned long);
  674. unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
  675. unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
  676. unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
  677. memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
  678. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE,
  679. NULL, 0, ing_key, epoch_iv, SGX_AESGCM_IV_SIZE,
  680. am_ptr, tag)) {
  681. printf("sendIngAuthMessage failed\n");
  682. return -1;
  683. }
  684. #ifdef VERBOSE_CLIENT
  685. printf("Client %d auth_message: \n", id);
  686. for(int i=0; i<auth_size; i++) {
  687. printf("%x", auth_message[i]);
  688. }
  689. printf("\n");
  690. #endif
  691. /*
  692. if(sim_id%7919==0) {
  693. printf("Client %d auth_message: \n", sim_id);
  694. for(int i=0; i<TOKEN_SIZE; i++) {
  695. printf("%x", am_ptr[i]);
  696. }
  697. printf("\n");
  698. }
  699. */
  700. boost::asio::write(*ingestion_sock,
  701. boost::asio::buffer(auth_message, auth_size));
  702. return 1;
  703. }
  704. int Client::sendStgAuthMessage(unsigned long epoch_no)
  705. {
  706. uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) +
  707. SGX_AESGCM_KEY_SIZE;
  708. unsigned char *auth_message = (unsigned char*) malloc(auth_size);
  709. unsigned char *am_ptr = auth_message;
  710. memcpy(am_ptr, &sim_id, sizeof(sim_id));
  711. am_ptr+=sizeof(sim_id);
  712. memcpy(am_ptr, &epoch_no, sizeof(unsigned long));
  713. am_ptr+=sizeof(unsigned long);
  714. unsigned char zeroes[SGX_AESGCM_KEY_SIZE] = {0};
  715. unsigned char tag[SGX_AESGCM_MAC_SIZE] = {0};
  716. unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
  717. memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
  718. if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE,
  719. NULL, 0, stg_key, epoch_iv, SGX_AESGCM_IV_SIZE,
  720. am_ptr, tag)) {
  721. printf("sendStgAuthMessage failed\n");
  722. return -1;
  723. }
  724. #ifdef VERBOSE_CLIENT
  725. printf("Client %d auth_message: \n", id);
  726. for(int i=0; i<auth_size; i++) {
  727. printf("%x", auth_message[i]);
  728. }
  729. printf("\n");
  730. #endif
  731. boost::asio::async_write(*storage_sock,
  732. boost::asio::buffer(auth_message, auth_size),
  733. [this, auth_message] (boost::system::error_code ecc, std::size_t) {
  734. free(auth_message);
  735. if (ecc) {
  736. if(ecc == boost::asio::error::eof) {
  737. delete storage_sock;
  738. storage_sock = nullptr;
  739. } else {
  740. printf("Error %s\n", ecc.message().c_str());
  741. }
  742. printf("Client::sendStgAuthMessage boost async_write failed\n");
  743. return;
  744. }
  745. });
  746. return 1;
  747. }
  748. void Client::setup_client(boost::asio::io_context &io_context,
  749. uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id,
  750. ip_addr *curr_ip, uint16_t &port_no)
  751. {
  752. // Set up the client's
  753. // (i) client_id
  754. // (ii) symmetric keys shared with their ingestion and storage server
  755. // (iii) sockets to their ingestion and storage server
  756. aes_key client_ing_key;
  757. aes_key client_stg_key;
  758. int ret = generateClientKeys(sim_id, ESK, client_ing_key, client_stg_key);
  759. initClient(sim_id, stg_node_id, client_ing_key, client_stg_key);
  760. initializeStgSocket(io_context, storage_nodes[stg_node_id], curr_ip, port_no);
  761. port_no++;
  762. initializeIngSocket(io_context, ingestion_nodes[ing_node_id], curr_ip, port_no);
  763. port_no++;
  764. // Authenticate clients to their ingestion and storage servers
  765. struct timespec ep;
  766. clock_gettime(CLOCK_REALTIME_COARSE, &ep);
  767. unsigned long time_in_us = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
  768. unsigned long epoch_no = CEILDIV(time_in_us, 5000000);
  769. sendStgAuthMessage(epoch_no);
  770. sendIngAuthMessage(epoch_no);
  771. epoch_process();
  772. }
  773. void generateClients(boost::asio::io_context &io_context,
  774. uint32_t cstart, uint32_t cstop, uint8_t thread_no)
  775. {
  776. uint32_t num_clients_total = config.user_count;
  777. uint16_t num_stg_nodes = storage_nodes.size();
  778. uint16_t num_ing_nodes = ingestion_nodes.size();
  779. uint16_t port_no = PORT_START;
  780. ip_addr curr_ip;
  781. curr_ip.ip1 = 127;
  782. curr_ip.ip2 = 1 + thread_no;
  783. curr_ip.ip3 = 0;
  784. curr_ip.ip4 = 0;
  785. for(uint32_t i=cstart; i<cstop; i++) {
  786. // Compute client's ip and port
  787. #ifdef CLIENT_UNIQUE_IP
  788. if(port_no>=PORT_END) {
  789. port_no = PORT_START;
  790. }
  791. curr_ip.increment(nthreads);
  792. #else
  793. if(port_no>=PORT_END) {
  794. port_no = PORT_START;
  795. curr_ip.increment(nthreads);
  796. }
  797. #endif
  798. uint16_t ing_no = i % num_ing_nodes;
  799. uint16_t stg_no = i % num_stg_nodes;
  800. uint16_t stg_node_id = storage_map[stg_no];
  801. uint16_t ing_node_id = ingestion_map[ing_no];
  802. clients[i].setup_client(io_context, i, ing_node_id, stg_node_id,
  803. &curr_ip, port_no);
  804. }
  805. printf("Done with all client_setup calls. Thread_no = %d\n", thread_no);
  806. }
  807. /*
  808. Epochs are server driven.
  809. In a single epoch, each client waits to receive from their storage server
  810. (i) a token bundle for this epoch and (ii) their messages from the last epoch
  811. The client then sends their messages for this epoch to their ingestion servers
  812. using the tokens they received in this epoch
  813. */
  814. void Client::epoch_process() {
  815. uint32_t pt_token_size = uint32_t(config.m_token_out) * TOKEN_SIZE;
  816. uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE
  817. + SGX_AESGCM_MAC_SIZE;
  818. unsigned char *enc_tokens = nullptr;
  819. uint16_t num_in = config.m_id_in;
  820. std::vector<boost::asio::mutable_buffer> toreceive;
  821. if (token_channel) {
  822. enc_tokens = (unsigned char*) malloc (token_bundle_size);
  823. toreceive.push_back(boost::asio::buffer(enc_tokens,
  824. token_bundle_size));
  825. num_in = config.m_token_in;
  826. }
  827. uint16_t msg_size = config.msg_size;
  828. uint32_t recv_pt_mailbox_size = ptMailboxSize(num_in, msg_size);
  829. uint32_t recv_enc_mailbox_size = encMailboxSize(num_in, msg_size);
  830. unsigned char *recv_pt_mailbox =
  831. (unsigned char*) malloc (recv_pt_mailbox_size);
  832. unsigned char *recv_enc_mailbox =
  833. (unsigned char*) malloc (recv_enc_mailbox_size);
  834. toreceive.push_back(boost::asio::buffer(recv_enc_mailbox,
  835. recv_enc_mailbox_size));
  836. // Async read the encrypted tokens (for token channel routing only) and
  837. // encrypted mailbox (both token and ID channels) for this
  838. // epoch
  839. boost::asio::async_read(*storage_sock, toreceive,
  840. [this, enc_tokens, token_bundle_size, pt_token_size, num_in,
  841. recv_pt_mailbox, recv_enc_mailbox, recv_pt_mailbox_size]
  842. (boost::system::error_code ec, std::size_t) {
  843. if (ec) {
  844. if(ec == boost::asio::error::eof) {
  845. delete storage_sock;
  846. storage_sock = nullptr;
  847. } else {
  848. printf("Error %s\n", ec.message().c_str());
  849. printf("Client::epoch_process boost "
  850. "async_read_tokens failed\n");
  851. }
  852. free(enc_tokens);
  853. free(recv_pt_mailbox);
  854. free(recv_enc_mailbox);
  855. return;
  856. }
  857. #ifdef TRACE_SOCKIO
  858. recvlogger.log();
  859. #endif
  860. #ifdef VERBOSE_CLIENT
  861. if(sim_id == 0) {
  862. printf("TEST: Client 0: Encrypted token bundle received:\n");
  863. for(uint32_t i = 0; i < token_bundle_size; i++) {
  864. printf("%02x", enc_tokens[i]);
  865. }
  866. printf("\n");
  867. printf("TEST: Client 0: Encrypted msgbundle received\n");
  868. }
  869. #endif
  870. if (token_channel) {
  871. // Decrypt the token bundle
  872. unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE;
  873. unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE +
  874. pt_token_size;
  875. int decrypted_bytes = gcm_decrypt(enc_tkn_ptr, pt_token_size,
  876. NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key),
  877. enc_tokens, SGX_AESGCM_IV_SIZE,
  878. (unsigned char*) (this->token_list));
  879. if (decrypted_bytes != pt_token_size) {
  880. printf("Client::epoch_process gcm_decrypt tokens failed. "
  881. "decrypted_bytes = %d\n", decrypted_bytes);
  882. }
  883. free(enc_tokens);
  884. /*
  885. unsigned char *tkn_ptr = (unsigned char*) this->token_list;
  886. if(sim_id==0) {
  887. printf("TEST: Client 0: Decrypted client tokens:\n");
  888. for(int i = 0; i < pt_token_size; i++) {
  889. printf("%02x", tkn_ptr[i]);
  890. }
  891. printf("\n");
  892. }
  893. */
  894. }
  895. // Do whatever processing with the received messages here
  896. // but for the benchmark, we just ignore the received
  897. // messages (unless we want to print them)
  898. #ifdef SHOW_RECEIVED_MESSAGES
  899. unsigned char *recv_enc_mailbox_ptr =
  900. recv_enc_mailbox + SGX_AESGCM_IV_SIZE;
  901. unsigned char *recv_enc_mailbox_tag =
  902. recv_enc_mailbox + SGX_AESGCM_IV_SIZE + recv_pt_mailbox_size;
  903. int decrypted_bytes = gcm_decrypt(recv_enc_mailbox_ptr,
  904. recv_pt_mailbox_size, NULL, 0, recv_enc_mailbox_tag,
  905. (unsigned char*) &(this->stg_key), recv_enc_mailbox,
  906. SGX_AESGCM_IV_SIZE, recv_pt_mailbox);
  907. if (decrypted_bytes != recv_pt_mailbox_size) {
  908. printf("Client::epoch_process gcm_decrypt mailbox failed. "
  909. "decrypted_bytes = %d\n", decrypted_bytes);
  910. }
  911. if (sim_id < 32) {
  912. displayPtMessageBundle(recv_pt_mailbox, num_in,
  913. config.msg_size, id);
  914. }
  915. #endif
  916. free(recv_enc_mailbox);
  917. free(recv_pt_mailbox);
  918. // Send this epoch's message bundle
  919. sendMessageBundle();
  920. epoch_process();
  921. });
  922. }
  923. void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads)
  924. {
  925. std::vector<boost::thread> threads;
  926. uint32_t num_clients_total = config.user_count;
  927. size_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
  928. // Generate all the clients for the experiment
  929. for(int i=0; i<nthreads; i++) {
  930. uint32_t cstart, cstop;
  931. cstart = i * clients_per_thread;
  932. cstop = (i==(nthreads-1))? num_clients_total: (i+1) * clients_per_thread;
  933. #ifdef VERBOSE_CLIENT
  934. printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
  935. #endif
  936. threads.emplace_back(boost::thread(generateClients,
  937. boost::ref(io_context), cstart, cstop, i));
  938. }
  939. for(int i=0; i<nthreads; i++) {
  940. threads[i].join();
  941. }
  942. }
  943. int main(int argc, char **argv)
  944. {
  945. // Unbuffer stdout
  946. setbuf(stdout, NULL);
  947. const char *progname = argv[0];
  948. ++argv;
  949. // Parse options
  950. while (*argv && (*argv)[0] == '-') {
  951. if (!strcmp(*argv, "-t")) {
  952. if (argv[1] == NULL) {
  953. usage(progname);
  954. }
  955. nthreads = uint16_t(atoi(argv[1]));
  956. argv += 2;
  957. } else {
  958. usage(progname);
  959. }
  960. }
  961. // Read the config.json from the first line of stdin. We have to do
  962. // this before outputting anything to avoid potential deadlock with
  963. // the launch program.
  964. std::string configstr;
  965. std::getline(std::cin, configstr);
  966. boost::asio::io_context io_context;
  967. if (!config_parse(config, configstr, ingestion_nodes,
  968. storage_nodes, storage_map)) {
  969. exit(1);
  970. }
  971. token_channel = config.token_channel;
  972. clients = new Client[config.user_count];
  973. #ifdef VERBOSE_CLIENT
  974. printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
  975. ingestion_nodes.size(), storage_nodes.size());
  976. #endif
  977. generateMasterKeys(config.master_secret, ESK, TSK);
  978. // Queue up the actual work
  979. boost::asio::post(io_context, [&]{
  980. initializeClients(io_context, nthreads);
  981. });
  982. // Start background threads; one thread will perform the work and the
  983. // others will execute the async_write/async_read handlers
  984. std::vector<boost::thread> threads;
  985. for (int i=0; i<nthreads; i++) {
  986. threads.emplace_back([&]{io_context.run();});
  987. }
  988. io_context.run();
  989. for (int i=0; i<nthreads; i++) {
  990. threads[i].join();
  991. }
  992. delete [] clients;
  993. }