net.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714
  1. #include <iostream>
  2. #include "Enclave_u.h"
  3. #include "Untrusted.hpp"
  4. #include "net.hpp"
  5. // The command type byte values
  6. #define COMMAND_EPOCH 0x00
  7. #define COMMAND_MESSAGE 0x01
  8. #define COMMAND_CHUNK 0x02
  9. #define VERBOSE_NET
  10. // #define DEBUG_NET_CLIENTS
  11. #define PROFILE_NET_CLIENTS
  12. #define CEILDIV(x,y) (((x)+(y)-1)/(y))
  13. NetIO *g_netio = NULL;
  14. size_t client_count = 0;
  15. NodeIO::NodeIO(tcp::socket &&socket, nodenum_t nodenum) :
  16. sock(std::move(socket)), node_num(nodenum), msgsize_inflight(0),
  17. chunksize_inflight(0), recv_msgsize_inflight(0),
  18. recv_chunksize_inflight(0), bytes_sent(0)
  19. {
  20. }
  21. uint8_t *NodeIO::request_frame()
  22. {
  23. if (frames_available.empty()) {
  24. // Allocate a new frame. Note that this memory will (at this
  25. // time) never get deallocated. In theory, we could deallocate
  26. // it in return_frame, but if a certain number of frames were
  27. // allocated here, it means we had that much data in flight
  28. // (queued but not accepted for sending by the OS), and we're
  29. // likely to need that much again. Subsequent messages will
  30. // _reuse_ the allocated data, though, so the used memory won't
  31. // grow forever, and will be limited to the amount of in-flight
  32. // data needed.
  33. return new uint8_t[FRAME_SIZE];
  34. }
  35. // Copy the pointer to the frame out of the deque and remove it from
  36. // the deque. Note this is _not_ taking the address of the element
  37. // *in* the deque (and then popping it, which would invalidate that
  38. // pointer).
  39. frame_deque_lock.lock();
  40. uint8_t *frame = frames_available.back();
  41. frames_available.pop_back();
  42. frame_deque_lock.unlock();
  43. return frame;
  44. }
  45. void NodeIO::return_frame(uint8_t *frame)
  46. {
  47. if (!frame) return;
  48. // We push the frame back on to the end of the deque so that it will
  49. // be the next one used. This may lead to better cache behaviour?
  50. frame_deque_lock.lock();
  51. frames_available.push_back(frame);
  52. frame_deque_lock.unlock();
  53. }
  54. void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len)
  55. {
  56. commands_deque_lock.lock();
  57. commands_inflight.push_back({header, data, len});
  58. if (commands_inflight.size() == 1) {
  59. async_send_commands();
  60. }
  61. commands_deque_lock.unlock();
  62. }
  63. void NodeIO::async_send_commands()
  64. {
  65. std::vector<boost::asio::const_buffer> tosend;
  66. CommandTuple *commandp = &(commands_inflight.front());
  67. tosend.push_back(boost::asio::buffer(&(std::get<0>(*commandp)), 5));
  68. if (std::get<1>(*commandp) != NULL && std::get<2>(*commandp) > 0) {
  69. tosend.push_back(boost::asio::buffer(std::get<1>(*commandp),
  70. std::get<2>(*commandp)));
  71. }
  72. boost::asio::async_write(sock, tosend,
  73. [this, commandp](boost::system::error_code, std::size_t){
  74. // When the write completes, pop the command from the deque
  75. // (which should now be in the front)
  76. commands_deque_lock.lock();
  77. assert(!commands_inflight.empty() &&
  78. &(commands_inflight.front()) == commandp);
  79. bytes_sent = bytes_sent + 5 + std::get<2>(*commandp);
  80. uint8_t *data = std::get<1>(*commandp);
  81. commands_inflight.pop_front();
  82. if (commands_inflight.size() > 0) {
  83. async_send_commands();
  84. }
  85. // And return the frame
  86. return_frame(data);
  87. commands_deque_lock.unlock();
  88. });
  89. }
  90. void NodeIO::send_epoch(uint32_t epoch_num)
  91. {
  92. uint64_t header = (uint64_t(epoch_num) << 8) + COMMAND_EPOCH;
  93. send_header_data(header, NULL, 0);
  94. }
  95. void NodeIO::send_message_header(uint32_t tot_message_len)
  96. {
  97. uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE;
  98. send_header_data(header, NULL, 0);
  99. // If we're sending a new message header, we have to have finished
  100. // sending the previous message.
  101. assert(chunksize_inflight == msgsize_inflight);
  102. msgsize_inflight = tot_message_len;
  103. chunksize_inflight = 0;
  104. #ifdef TRACE_SOCKIO
  105. struct timeval now;
  106. gettimeofday(&now, NULL);
  107. printf("%lu.%06lu: RTE queueing %u bytes to %s\n", now.tv_sec,
  108. now.tv_usec, msgsize_inflight,
  109. g_netio->config().nodes[node_num].name.c_str());
  110. if (msgsize_inflight == 0) {
  111. printf("%lu.%06lu: RTE queued %u bytes to %s\n", now.tv_sec,
  112. now.tv_usec, msgsize_inflight,
  113. g_netio->config().nodes[node_num].name.c_str());
  114. }
  115. #endif
  116. }
  117. bool NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len)
  118. {
  119. assert(chunk_len <= FRAME_SIZE);
  120. uint64_t header = (uint64_t(chunk_len) << 8) + COMMAND_CHUNK;
  121. send_header_data(header, data, chunk_len);
  122. chunksize_inflight += chunk_len;
  123. assert(chunksize_inflight <= msgsize_inflight);
  124. #ifdef TRACE_SOCKIO
  125. if (msgsize_inflight == chunksize_inflight) {
  126. struct timeval now;
  127. gettimeofday(&now, NULL);
  128. printf("%lu.%06lu: RTE queued %u bytes to %s\n", now.tv_sec,
  129. now.tv_usec, msgsize_inflight,
  130. g_netio->config().nodes[node_num].name.c_str());
  131. }
  132. #endif
  133. return (chunksize_inflight < msgsize_inflight);
  134. }
  135. void NodeIO::recv_commands(
  136. std::function<void(boost::system::error_code)> error_cb,
  137. std::function<void(uint32_t)> epoch_cb)
  138. {
  139. // Asynchronously read the header
  140. receive_header = 0;
  141. boost::asio::async_read(sock, boost::asio::buffer(&receive_header, 5),
  142. [this, error_cb, epoch_cb]
  143. (boost::system::error_code ec, std::size_t) {
  144. if (ec) {
  145. error_cb(ec);
  146. return;
  147. }
  148. if ((receive_header & 0xff) == COMMAND_EPOCH) {
  149. epoch_cb(uint32_t(receive_header >> 8));
  150. recv_commands(error_cb, epoch_cb);
  151. } else if ((receive_header & 0xff) == COMMAND_MESSAGE) {
  152. assert(recv_msgsize_inflight == recv_chunksize_inflight);
  153. recv_msgsize_inflight = uint32_t(receive_header >> 8);
  154. recv_chunksize_inflight = 0;
  155. #ifdef TRACE_SOCKIO
  156. struct timeval now;
  157. gettimeofday(&now, NULL);
  158. printf("%lu.%06lu: RTE receiving %u bytes from %s\n", now.tv_sec,
  159. now.tv_usec, recv_msgsize_inflight,
  160. g_netio->config().nodes[node_num].name.c_str());
  161. if (recv_msgsize_inflight == 0) {
  162. printf("%lu.%06lu: RTE received %u bytes from %s\n", now.tv_sec,
  163. now.tv_usec, recv_msgsize_inflight,
  164. g_netio->config().nodes[node_num].name.c_str());
  165. }
  166. #endif
  167. if (ecall_message(node_num, recv_msgsize_inflight)) {
  168. recv_commands(error_cb, epoch_cb);
  169. } else {
  170. printf("ecall_message failed\n");
  171. }
  172. } else if ((receive_header & 0xff) == COMMAND_CHUNK) {
  173. uint32_t this_chunk_size = uint32_t(receive_header >> 8);
  174. assert(recv_chunksize_inflight + this_chunk_size <=
  175. recv_msgsize_inflight);
  176. recv_chunksize_inflight += this_chunk_size;
  177. boost::asio::async_read(sock, boost::asio::buffer(
  178. receive_frame, this_chunk_size),
  179. [this, error_cb, epoch_cb, this_chunk_size]
  180. (boost::system::error_code ecc, std::size_t) {
  181. if (ecc) {
  182. error_cb(ecc);
  183. return;
  184. }
  185. #ifdef TRACE_SOCKIO
  186. if (recv_msgsize_inflight == recv_chunksize_inflight) {
  187. struct timeval now;
  188. gettimeofday(&now, NULL);
  189. printf("%lu.%06lu: RTE received %u bytes from %s\n", now.tv_sec,
  190. now.tv_usec, recv_msgsize_inflight,
  191. g_netio->config().nodes[node_num].name.c_str());
  192. }
  193. #endif
  194. if (ecall_chunk(node_num, receive_frame,
  195. this_chunk_size)) {
  196. recv_commands(error_cb, epoch_cb);
  197. } else {
  198. printf("ecall_chunk failed\n");
  199. }
  200. });
  201. } else {
  202. error_cb(boost::system::errc::make_error_code(
  203. boost::system::errc::errc_t::invalid_argument));
  204. }
  205. });
  206. }
  207. uint64_t NodeIO::reset_bytes_sent()
  208. {
  209. uint64_t b_sent = bytes_sent;
  210. bytes_sent = 0;
  211. return b_sent;
  212. }
  213. uint64_t NetIO::reset_bytes_sent()
  214. {
  215. uint64_t total=0;
  216. for(size_t i = 0; i<nodeios.size(); i++) {
  217. if(nodeios[i].has_value()) {
  218. total+=((nodeios[i].value()).reset_bytes_sent());
  219. }
  220. }
  221. return total;
  222. }
  223. /*
  224. Receive clients' dropped-off messages, i.e. a CLIENT_MESSAGE_BUNDLE
  225. */
  226. void NetIO::ing_receive_msgbundle(tcp::socket* csocket, clientid_t c_simid)
  227. {
  228. unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
  229. boost::asio::async_read(*csocket, boost::asio::buffer(msgbundle, msgbundle_size),
  230. [this, csocket, msgbundle, c_simid]
  231. (boost::system::error_code ec, std::size_t) {
  232. if (ec) {
  233. if(ec == boost::asio::error::eof) {
  234. // Client connection terminated so we delete this socket
  235. delete(csocket);
  236. } else {
  237. printf("Error ing_receive_msgbundle : %s\n", ec.message().c_str());
  238. }
  239. free(msgbundle);
  240. return;
  241. }
  242. #ifdef TRACE_SOCKIO
  243. struct timeval now;
  244. gettimeofday(&now, NULL);
  245. long elapsedus = (now.tv_sec - last_ing.tv_sec) * 1000000
  246. + (now.tv_usec - last_ing.tv_usec);
  247. if (num_ing > 0 && elapsedus > 500000) {
  248. printf("%lu.%06lu: End ingestion of %lu messages\n",
  249. last_ing.tv_sec, last_ing.tv_usec, num_ing);
  250. num_ing = 0;
  251. }
  252. if (num_ing == 0) {
  253. printf("%lu.%06lu: Begin ingestion\n", now.tv_sec,
  254. now.tv_usec);
  255. }
  256. #endif
  257. bool ret;
  258. //Ingest the message_bundle
  259. if(conf.private_routing) {
  260. ret = ecall_ingest_msgbundle(c_simid, msgbundle, conf.m_priv_out);
  261. } else {
  262. ret = ecall_ingest_msgbundle(c_simid, msgbundle, conf.m_pub_out);
  263. }
  264. free(msgbundle);
  265. #ifdef TRACE_SOCKIO
  266. gettimeofday(&last_ing, NULL);
  267. ++num_ing;
  268. #endif
  269. // Continue to async receive client message bundles
  270. if(ret) {
  271. ing_receive_msgbundle(csocket, c_simid);
  272. }
  273. });
  274. }
  275. /*
  276. Handle new client connections.
  277. New clients always send an authentication message.
  278. For ingestion this is then followed by their msg_bundles every epoch.
  279. */
  280. void NetIO::ing_authenticate_new_client(tcp::socket* csocket,
  281. const boost::system::error_code& error)
  282. {
  283. if(error) {
  284. printf("Accept handler failed\n");
  285. return;
  286. }
  287. #ifdef DEBUG_NET_CLIENTS
  288. printf("Accept handler success\n");
  289. #endif
  290. unsigned char* auth_message = (unsigned char*) malloc(auth_size);
  291. boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size),
  292. [this, csocket, auth_message]
  293. (boost::system::error_code ec, std::size_t) {
  294. if (ec) {
  295. if(ec == boost::asio::error::eof) {
  296. // Client connection terminated so we delete this socket
  297. delete(csocket);
  298. } else {
  299. printf("Error ing_auth_new_client : %s\n", ec.message().c_str());
  300. }
  301. free(auth_message);
  302. return;
  303. } else {
  304. clientid_t c_simid = *((clientid_t *)(auth_message));
  305. // Read the authentication token
  306. unsigned char *auth_ptr = auth_message + sizeof(clientid_t);
  307. bool ret = ecall_authenticate(c_simid, auth_ptr);
  308. free(auth_message);
  309. // Receive client message bundles on this socket
  310. // for client sim_id c_simid
  311. if(ret) {
  312. client_count++;
  313. ing_receive_msgbundle(csocket, c_simid);
  314. } else{
  315. printf("Client <-> Ingestion authentication failed\n");
  316. delete(csocket);
  317. }
  318. }
  319. });
  320. ing_start_accept();
  321. }
  322. #ifdef TRACE_SOCKIO
  323. static size_t stg_clients_connected = 0;
  324. static size_t stg_clients_authenticated = 0;
  325. #endif
  326. /*
  327. Handle new client connections.
  328. New clients always send an authentication message.
  329. For storage this is then followed by the storage servers sending them
  330. their mailbox every epoch.
  331. */
  332. void NetIO::stg_authenticate_new_client(tcp::socket* csocket,
  333. const boost::system::error_code& error)
  334. {
  335. if(error) {
  336. printf("Accept handler failed\n");
  337. return;
  338. }
  339. #ifdef DEBUG_NET_CLIENTS
  340. printf("Accept handler success\n");
  341. #endif
  342. unsigned char* auth_message = (unsigned char*) malloc(auth_size);
  343. boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size),
  344. [this, csocket, auth_message]
  345. (boost::system::error_code ec, std::size_t) {
  346. if (ec) {
  347. if(ec == boost::asio::error::eof) {
  348. // Client connection terminated so we delete this socket
  349. delete(csocket);
  350. } else {
  351. printf("Error stg_auth_new_client: %s\n", ec.message().c_str());
  352. }
  353. free(auth_message);
  354. return;
  355. }
  356. else {
  357. #ifdef TRACE_SOCKIO
  358. ++stg_clients_connected;
  359. if (stg_clients_connected % 1000 == 0) {
  360. struct timeval now;
  361. gettimeofday(&now, NULL);
  362. printf("%lu.%06lu: STG %lu clients connected\n",
  363. now.tv_sec, now.tv_usec, stg_clients_connected);
  364. }
  365. #endif
  366. clientid_t c_simid = *((clientid_t *)(auth_message));
  367. // Read the authentication token
  368. unsigned char *auth_ptr = auth_message + sizeof(clientid_t);
  369. bool ret = ecall_storage_authenticate(c_simid, auth_ptr);
  370. free(auth_message);
  371. // If the auth is successful, store this socket into
  372. // a client socket array at the local_c_simid index
  373. // for storage servers to send clients their mailbox periodically.
  374. if(ret) {
  375. uint32_t lcid = c_simid / num_stg_nodes;
  376. client_sockets[lcid] = csocket;
  377. #ifdef TRACE_SOCKIO
  378. ++stg_clients_authenticated;
  379. if (stg_clients_authenticated % 1000 == 0) {
  380. struct timeval now;
  381. gettimeofday(&now, NULL);
  382. printf("%lu.%06lu: STG %lu clients authenticated\n",
  383. now.tv_sec, now.tv_usec, stg_clients_authenticated);
  384. }
  385. #endif
  386. }
  387. else{
  388. printf("Client <-> Storage authentication failed\n");
  389. delete (csocket);
  390. }
  391. }
  392. });
  393. stg_start_accept();
  394. }
  395. /*
  396. Asynchronously accept new client connections
  397. */
  398. void NetIO::ing_start_accept()
  399. {
  400. tcp::socket *csocket = new tcp::socket(io_context());
  401. #ifdef DEBUG_NET_CLIENTS
  402. std::cout << "Accepting on " << myconf.clistenhost << ":" << myconf.clistenport << "\n";
  403. #endif
  404. ingestion_acceptor->async_accept(*csocket,
  405. boost::bind(&NetIO::ing_authenticate_new_client, this, csocket,
  406. boost::asio::placeholders::error));
  407. }
  408. void NetIO::stg_start_accept()
  409. {
  410. tcp::socket *csocket = new tcp::socket(io_context());
  411. #ifdef DEBUG_NET_CLIENTS
  412. std::cout << "Accepting on " << myconf.slistenhost << ":" << myconf.slistenport << "\n";
  413. #endif
  414. storage_acceptor->async_accept(*csocket,
  415. boost::bind(&NetIO::stg_authenticate_new_client, this, csocket,
  416. boost::asio::placeholders::error));
  417. }
  418. void NetIO::send_client_mailbox()
  419. {
  420. #ifdef PROFILE_NET_CLIENTS
  421. struct timespec tp;
  422. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  423. unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  424. #endif
  425. #ifdef TRACE_SOCKIO
  426. size_t clients_without_sockets = 0;
  427. size_t mailboxes_queued = 0;
  428. #endif
  429. // Send each client their tokens and mailboxes for the next epoch
  430. for(uint32_t lcid = 0; lcid < num_clients_per_stg; lcid++)
  431. {
  432. unsigned char *tkn_ptr = epoch_tokens + lcid * token_bundle_size;
  433. unsigned char *buf_ptr = epoch_mailboxes + lcid * mailbox_size;
  434. if(client_sockets[lcid]!=nullptr) {
  435. std::vector<boost::asio::const_buffer> tosend = {
  436. boost::asio::buffer(tkn_ptr, token_bundle_size),
  437. boost::asio::buffer(buf_ptr, mailbox_size)
  438. };
  439. boost::asio::async_write(*(client_sockets[lcid]), tosend,
  440. [this, lcid](boost::system::error_code ec, std::size_t){
  441. if (ec) {
  442. if(ec == boost::asio::error::eof) {
  443. // Client connection terminated so we delete this socket
  444. delete(client_sockets[lcid]);
  445. printf("Client socket terminated!\n");
  446. } else {
  447. printf("Error send_client_mailbox tokens: %s\n",
  448. ec.message().c_str());
  449. }
  450. return;
  451. }
  452. });
  453. #ifdef TRACE_SOCKIO
  454. ++mailboxes_queued;
  455. } else {
  456. ++clients_without_sockets;
  457. #endif
  458. }
  459. }
  460. #ifdef TRACE_SOCKIO
  461. struct timeval now;
  462. gettimeofday(&now, NULL);
  463. printf("%lu.%06lu: STG queued %lu mailboxes; %lu clients without sockets\n",
  464. now.tv_sec, now.tv_usec, mailboxes_queued,
  465. clients_without_sockets);
  466. #endif
  467. #ifdef PROFILE_NET_CLIENTS
  468. clock_gettime(CLOCK_REALTIME_COARSE, &tp);
  469. unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
  470. unsigned long diff = end - start;
  471. printf("send_client_mailbox time: %lu.%06lu s\n", diff/1000000, diff%1000000);
  472. #endif
  473. }
  474. NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
  475. : context(io_context), conf(config),
  476. myconf(config.nodes[config.my_node_num])
  477. {
  478. num_nodes = nodenum_t(conf.nodes.size());
  479. nodeios.resize(num_nodes);
  480. me = conf.my_node_num;
  481. #ifdef TRACE_SOCKIO
  482. last_ing = {0, 0};
  483. num_ing = 0;
  484. #endif
  485. // Node number n will accept connections from nodes 0, ..., n-1 and
  486. // make connections to nodes n+1, ..., num_nodes-1. This is all
  487. // single threaded, but it doesn't deadlock because node 0 isn't
  488. // waiting for any incoming connections, so it immediately makes
  489. // outgoing connections. When it connects to node 1, that node
  490. // accepts its (only) incoming connection, and then starts making
  491. // its outgoing connections, etc.
  492. tcp::resolver resolver(io_context);
  493. tcp::acceptor acceptor(io_context,
  494. resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint());
  495. for(size_t i=0; i<me; ++i) {
  496. #ifdef VERBOSE_NET
  497. std::cerr << "Accepting number " << i << "\n";
  498. #endif
  499. tcp::socket nodesock = acceptor.accept();
  500. #ifdef VERBOSE_NET
  501. std::cerr << "Accepted number " << i << "\n";
  502. #endif
  503. // Read 2 bytes from the socket, which will be the
  504. // connecting node's node number
  505. unsigned short node_num;
  506. boost::asio::read(nodesock,
  507. boost::asio::buffer(&node_num, sizeof(node_num)));
  508. if (node_num >= num_nodes) {
  509. std::cerr << "Received bad node number\n";
  510. } else {
  511. nodeios[node_num].emplace(std::move(nodesock), node_num);
  512. #ifdef VERBOSE_NET
  513. std::cerr << "Received connection from " <<
  514. config.nodes[node_num].name << "\n";
  515. #endif
  516. }
  517. }
  518. for(size_t i=me+1; i<num_nodes; ++i) {
  519. boost::system::error_code err;
  520. tcp::socket nodesock(io_context);
  521. while(1) {
  522. #ifdef VERBOSE_NET
  523. std::cerr << "Connecting to " << config.nodes[i].name << "...\n";
  524. #endif
  525. boost::asio::connect(nodesock,
  526. resolver.resolve(config.nodes[i].listenhost,
  527. config.nodes[i].listenport), err);
  528. if (!err) break;
  529. std::cerr << "Connection to " << config.nodes[i].name <<
  530. " refused, will retry.\n";
  531. sleep(1);
  532. }
  533. // Write 2 bytes to the socket to tell the peer node our node
  534. // number
  535. nodenum_t node_num = (nodenum_t)me;
  536. boost::asio::write(nodesock,
  537. boost::asio::buffer(&node_num, sizeof(node_num)));
  538. nodeios[i].emplace(std::move(nodesock), i);
  539. #ifdef VERBOSE_NET
  540. std::cerr << "Connected to " << config.nodes[i].name << "\n";
  541. #endif
  542. }
  543. auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
  544. uint16_t priv_out, priv_in, pub_in;
  545. if(config.private_routing) {
  546. priv_out = conf.m_priv_out;
  547. priv_in = conf.m_priv_in;
  548. msgbundle_size = SGX_AESGCM_IV_SIZE
  549. + (conf.m_priv_out * (conf.msg_size + TOKEN_SIZE))
  550. + SGX_AESGCM_MAC_SIZE;
  551. token_bundle_size = ((priv_out * TOKEN_SIZE)
  552. + SGX_AESGCM_IV_SIZE + SGX_AESGCM_MAC_SIZE);
  553. mailbox_size = (priv_in * conf.msg_size) + SGX_AESGCM_IV_SIZE
  554. + SGX_AESGCM_MAC_SIZE;
  555. } else {
  556. pub_in = conf.m_pub_in;
  557. msgbundle_size = SGX_AESGCM_IV_SIZE
  558. + (conf.m_pub_out * conf.msg_size)
  559. + SGX_AESGCM_MAC_SIZE;
  560. mailbox_size = (pub_in * conf.msg_size) + SGX_AESGCM_IV_SIZE
  561. + SGX_AESGCM_MAC_SIZE;
  562. }
  563. if(myconf.roles & ROLE_STORAGE) {
  564. // Set up the client sockets
  565. // Compute no_of_clients per storage_server
  566. uint32_t num_users = config.user_count;
  567. NodeConfig nc;
  568. num_stg_nodes = 0;
  569. for (nodenum_t i=0; i<num_nodes; ++i) {
  570. nc = conf.nodes[i];
  571. if(nc.roles & ROLE_STORAGE) {
  572. num_stg_nodes++;
  573. }
  574. }
  575. num_clients_per_stg = CEILDIV(num_users, num_stg_nodes);
  576. for(uint32_t i = 0; i<num_clients_per_stg; i++) {
  577. client_sockets.emplace_back(nullptr);
  578. }
  579. uint32_t epoch_mailboxes_size = num_clients_per_stg * mailbox_size;
  580. uint32_t epoch_tokens_size = num_clients_per_stg * token_bundle_size;
  581. epoch_mailboxes = (unsigned char *) malloc(epoch_mailboxes_size);
  582. epoch_tokens = (unsigned char *) malloc (epoch_tokens_size);
  583. ecall_supply_storage_buffers(epoch_mailboxes, epoch_mailboxes_size,
  584. epoch_tokens, epoch_tokens_size);
  585. storage_acceptor = std::shared_ptr<tcp::acceptor>(
  586. new tcp::acceptor(io_context,
  587. resolver.resolve(this->myconf.slistenhost,
  588. this->myconf.slistenport)->endpoint()));
  589. stg_start_accept();
  590. }
  591. if(myconf.roles & ROLE_INGESTION) {
  592. ingestion_acceptor = std::shared_ptr<tcp::acceptor>(
  593. new tcp::acceptor(io_context,
  594. resolver.resolve(this->myconf.clistenhost,
  595. this->myconf.clistenport)->endpoint()));
  596. ing_start_accept();
  597. }
  598. }
  599. void NetIO::recv_commands(
  600. std::function<void(boost::system::error_code)> error_cb,
  601. std::function<void(uint32_t)> epoch_cb)
  602. {
  603. for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
  604. if (node_num == me) continue;
  605. NodeIO &n = node(node_num);
  606. n.recv_commands(error_cb, epoch_cb);
  607. }
  608. }
  609. void NetIO::close()
  610. {
  611. for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
  612. if (node_num == me) continue;
  613. NodeIO &n = node(node_num);
  614. n.close();
  615. }
  616. }
  617. /* The enclave calls this to inform the untrusted app that there's a new
  618. * messaage to send. The return value is the frame the enclave should
  619. * use to store the first (encrypted) chunk of this message. */
  620. uint8_t *ocall_message(nodenum_t node_num, uint32_t message_len)
  621. {
  622. assert(g_netio != NULL);
  623. NodeIO &node = g_netio->node(node_num);
  624. node.send_message_header(message_len);
  625. return node.request_frame();
  626. }
  627. /* The enclave calls this to inform the untrusted app that there's a new
  628. * chunk to send. The return value is the frame the enclave should use
  629. * to store the next (encrypted) chunk of this message, or NULL if this
  630. * was the last chunk. */
  631. uint8_t *ocall_chunk(nodenum_t node_num, uint8_t *chunkdata,
  632. uint32_t chunklen)
  633. {
  634. assert(g_netio != NULL);
  635. NodeIO &node = g_netio->node(node_num);
  636. bool morechunks = node.send_chunk(chunkdata, chunklen);
  637. if (morechunks) {
  638. return node.request_frame();
  639. }
  640. return NULL;
  641. }