// net.cpp

#include <iostream>
#include <cassert>      // assert
#include <cstdio>       // printf
#include <cstdlib>      // malloc/free
#include <unistd.h>     // sleep
#include "Enclave_u.h"
#include "Untrusted.hpp"
#include "net.hpp"

// The command type byte values
#define COMMAND_EPOCH 0x00
#define COMMAND_MESSAGE 0x01
#define COMMAND_CHUNK 0x02
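
// Wire format between nodes (a little-endian host is assumed, since the
// code below sends the raw low 5 bytes of the header): every command is
// a 5-byte header consisting of one command byte followed by a 32-bit
// value (epoch number, total message length, or chunk length).  A
// COMMAND_CHUNK header is followed immediately by chunk_len bytes of
// (encrypted) chunk data; the other two commands carry no payload.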
#define VERBOSE_NET
// #define DEBUG_NET_CLIENTS

#define CEILDIV(x,y) (((x)+(y)-1)/(y))

NetIO *g_netio = NULL;

NodeIO::NodeIO(tcp::socket &&socket, nodenum_t nodenum) :
    sock(std::move(socket)), node_num(nodenum)
{
}
uint8_t *NodeIO::request_frame()
{
    if (frames_available.empty()) {
        // Allocate a new frame. Note that this memory will (at this
        // time) never get deallocated. In theory, we could deallocate
        // it in return_frame, but if a certain number of frames were
        // allocated here, it means we had that much data in flight
        // (queued but not accepted for sending by the OS), and we're
        // likely to need that much again. Subsequent messages will
        // _reuse_ the allocated data, though, so the used memory won't
        // grow forever, and will be limited to the amount of in-flight
        // data needed.
        return new uint8_t[FRAME_SIZE];
    }
    // Copy the pointer to the frame out of the deque and remove it from
    // the deque. Note this is _not_ taking the address of the element
    // *in* the deque (and then popping it, which would invalidate that
    // pointer).
    frame_deque_lock.lock();
    uint8_t *frame = frames_available.back();
    frames_available.pop_back();
    frame_deque_lock.unlock();
    return frame;
}
void NodeIO::return_frame(uint8_t *frame)
{
    if (!frame) return;
    // We push the frame back on to the end of the deque so that it will
    // be the next one used. This may lead to better cache behaviour?
    frame_deque_lock.lock();
    frames_available.push_back(frame);
    frame_deque_lock.unlock();
}
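
// Outgoing commands are queued in commands_inflight and written with a
// chain of async_write calls: send_header_data() starts the chain only
// when the queue was previously empty, and the completion handler in
// async_send_commands() pops the finished command, returns its frame,
// and starts the next write if more commands are queued.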
void NodeIO::send_header_data(uint64_t header, uint8_t *data, size_t len)
{
    commands_deque_lock.lock();
    commands_inflight.push_back({header, data, len});
    if (commands_inflight.size() == 1) {
        async_send_commands();
    }
    commands_deque_lock.unlock();
}
void NodeIO::async_send_commands()
{
    std::vector<boost::asio::const_buffer> tosend;
    CommandTuple *commandp = &(commands_inflight.front());
    tosend.push_back(boost::asio::buffer(&(std::get<0>(*commandp)), 5));
    if (std::get<1>(*commandp) != NULL && std::get<2>(*commandp) > 0) {
        tosend.push_back(boost::asio::buffer(std::get<1>(*commandp),
            std::get<2>(*commandp)));
    }
    boost::asio::async_write(sock, tosend,
        [this, commandp](boost::system::error_code, std::size_t){
            // When the write completes, pop the command from the deque
            // (which should now be in the front)
            commands_deque_lock.lock();
            assert(!commands_inflight.empty() &&
                &(commands_inflight.front()) == commandp);
            uint8_t *data = std::get<1>(*commandp);
            commands_inflight.pop_front();
            if (commands_inflight.size() > 0) {
                async_send_commands();
            }
            // And return the frame
            return_frame(data);
            commands_deque_lock.unlock();
        });
}
void NodeIO::send_epoch(uint32_t epoch_num)
{
    uint64_t header = (uint64_t(epoch_num) << 8) + COMMAND_EPOCH;
    send_header_data(header, NULL, 0);
}

void NodeIO::send_message_header(uint32_t tot_message_len)
{
    uint64_t header = (uint64_t(tot_message_len) << 8) + COMMAND_MESSAGE;
    send_header_data(header, NULL, 0);
    // If we're sending a new message header, we have to have finished
    // sending the previous message.
    assert(chunksize_inflight == msgsize_inflight);
    msgsize_inflight = tot_message_len;
    chunksize_inflight = 0;
}
bool NodeIO::send_chunk(uint8_t *data, uint32_t chunk_len)
{
    assert(chunk_len <= FRAME_SIZE);
    uint64_t header = (uint64_t(chunk_len) << 8) + COMMAND_CHUNK;
    send_header_data(header, data, chunk_len);
    chunksize_inflight += chunk_len;
    assert(chunksize_inflight <= msgsize_inflight);
    return (chunksize_inflight < msgsize_inflight);
}
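
// A complete outgoing message is therefore sent as one
// send_message_header(total_len) followed by repeated
// send_chunk(frame, len) calls until send_chunk returns false (see
// ocall_message and ocall_chunk at the bottom of this file).  The
// receiving side mirrors this: recv_commands() below reads one 5-byte
// header, dispatches on the command byte, and re-arms itself after each
// command has been handled.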
void NodeIO::recv_commands(
    std::function<void(boost::system::error_code)> error_cb,
    std::function<void(uint32_t)> epoch_cb)
{
    // Asynchronously read the header
    receive_header = 0;
    boost::asio::async_read(sock, boost::asio::buffer(&receive_header, 5),
        [this, error_cb, epoch_cb]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                error_cb(ec);
                return;
            }
            if ((receive_header & 0xff) == COMMAND_EPOCH) {
                epoch_cb(uint32_t(receive_header >> 8));
                recv_commands(error_cb, epoch_cb);
            } else if ((receive_header & 0xff) == COMMAND_MESSAGE) {
                assert(recv_msgsize_inflight == recv_chunksize_inflight);
                recv_msgsize_inflight = uint32_t(receive_header >> 8);
                recv_chunksize_inflight = 0;
                if (ecall_message(node_num, recv_msgsize_inflight)) {
                    recv_commands(error_cb, epoch_cb);
                } else {
                    printf("ecall_message failed\n");
                }
            } else if ((receive_header & 0xff) == COMMAND_CHUNK) {
                uint32_t this_chunk_size = uint32_t(receive_header >> 8);
                assert(recv_chunksize_inflight + this_chunk_size <=
                    recv_msgsize_inflight);
                recv_chunksize_inflight += this_chunk_size;
                boost::asio::async_read(sock, boost::asio::buffer(
                        receive_frame, this_chunk_size),
                    [this, error_cb, epoch_cb, this_chunk_size]
                    (boost::system::error_code ecc, std::size_t) {
                        if (ecc) {
                            error_cb(ecc);
                            return;
                        }
                        if (ecall_chunk(node_num, receive_frame,
                                this_chunk_size)) {
                            recv_commands(error_cb, epoch_cb);
                        } else {
                            printf("ecall_chunk failed\n");
                        }
                    });
            } else {
                error_cb(boost::system::errc::make_error_code(
                    boost::system::errc::errc_t::invalid_argument));
            }
        });
}
/*
Receive clients' dropped-off messages, i.e. a CLIENT_MESSAGE_BUNDLE
*/
void NetIO::ing_receive_msgbundle(tcp::socket* csocket, clientid_t c_simid)
{
    unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
    boost::asio::async_read(*csocket, boost::asio::buffer(msgbundle, msgbundle_size),
        [this, csocket, msgbundle, c_simid]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                // The read failed, so this bundle will not be ingested
                free(msgbundle);
                if (ec == boost::asio::error::eof) {
                    // Client connection terminated so we delete this socket
                    delete(csocket);
                } else {
                    printf("Error %s\n", ec.message().c_str());
                }
                return;
            }
            // Ingest the message_bundle
            bool ret = ecall_ingest_msgbundle(c_simid, msgbundle, apiparams.m_priv_out);
            free(msgbundle);
            // Continue to async receive client message bundles
            ing_receive_msgbundle(csocket, c_simid);
        });
}
/*
Handle new client connections.
New clients always send an authentication message.
For ingestion this is then followed by their msg_bundles every epoch.
*/
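// The authentication message is a fixed-size buffer of auth_size bytes
// (set in the NetIO constructor below): a clientid_t simulation id,
// followed by the authentication token (the remaining bytes), which is
// handed to ecall_authenticate.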
void NetIO::ing_authenticate_new_client(tcp::socket* csocket,
    const boost::system::error_code& error)
{
    if (error) {
        printf("Accept handler failed\n");
        return;
    }
#ifdef DEBUG_NET_CLIENTS
    printf("Accept handler success\n");
#endif
    unsigned char* auth_message = (unsigned char*) malloc(auth_size);
    boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size),
        [this, csocket, auth_message]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                // The read failed, so the authentication message is unused
                free(auth_message);
                if (ec == boost::asio::error::eof) {
                    // Client connection terminated so we delete this socket
                    delete(csocket);
                } else {
                    printf("Error %s\n", ec.message().c_str());
                }
                return;
            } else {
                clientid_t c_simid = *((clientid_t *)(auth_message));
                // Read the authentication token
                unsigned char *auth_ptr = auth_message + sizeof(clientid_t);
                bool ret = ecall_authenticate(c_simid, auth_ptr);
                free(auth_message);
                // Receive client message bundles on this socket
                // for client sim_id c_simid
                if (ret) {
                    ing_receive_msgbundle(csocket, c_simid);
                } else {
                    delete(csocket);
                }
            }
        });
    ing_start_accept();
}
/*
Handle new client connections.
New clients always send an authentication message.
For storage this is then followed by the storage servers sending them
their mailbox every epoch.
*/
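// After a successful storage authentication, the client's socket is
// stored in client_sockets (indexed by its local id, c_simid divided by
// num_stg_nodes) and the client is immediately sent its token bundle
// for the first epoch: token_bundle_size bytes taken from the
// epoch_tokens buffer (an IV, the per-channel keys, and a MAC; see the
// NetIO constructor below).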
void NetIO::stg_authenticate_new_client(tcp::socket* csocket,
    const boost::system::error_code& error)
{
    if (error) {
        printf("Accept handler failed\n");
        return;
    }
#ifdef DEBUG_NET_CLIENTS
    printf("Accept handler success\n");
#endif
    unsigned char* auth_message = (unsigned char*) malloc(auth_size);
    boost::asio::async_read(*csocket, boost::asio::buffer(auth_message, auth_size),
        [this, csocket, auth_message]
        (boost::system::error_code ec, std::size_t) {
            if (ec) {
                // The read failed, so the authentication message is unused
                free(auth_message);
                if (ec == boost::asio::error::eof) {
                    // Client connection terminated so we delete this socket
                    delete(csocket);
                } else {
                    printf("Error %s\n", ec.message().c_str());
                }
                return;
            } else {
                clientid_t c_simid = *((clientid_t *)(auth_message));
                // Read the authentication token
                unsigned char *auth_ptr = auth_message + sizeof(clientid_t);
                bool ret = ecall_storage_authenticate(c_simid, auth_ptr);
                free(auth_message);
                // If the auth is successful, store this socket into
                // a client socket array at the local_c_simid index
                // for storage servers to send clients their mailbox periodically.
                if (ret) {
                    uint32_t lcid = c_simid / num_stg_nodes;
                    client_sockets[lcid] = csocket;
                    // TODO: Send back this client's tokens for the first epoch
                    unsigned char *tkn_ptr = epoch_tokens + (token_bundle_size * lcid);
                    /*
                    if (c_simid == 3) {
                        printf("Just before async_write: Client tokens for c_simid 0:\n");
                        unsigned char *tmp_tkn_ptr = tkn_ptr + SGX_AESGCM_IV_SIZE;
                        for (uint32_t i = 0; i < 2 * SGX_AESGCM_KEY_SIZE; i++) {
                            printf("%x", tmp_tkn_ptr[i]);
                        }
                        printf("\n");
                    }
                    */
                    boost::asio::async_write(*(client_sockets[lcid]),
                        boost::asio::buffer(tkn_ptr, token_bundle_size),
                        [this](boost::system::error_code ecc, std::size_t){
                            if (ecc) {
                                if (ecc == boost::asio::error::eof) {
                                    // Client connection terminated so we delete this socket
                                    // delete(csocket);
                                    printf("Client socket terminated!\n");
                                } else {
                                    printf("Error %s\n", ecc.message().c_str());
                                }
                                return;
                            }
                        });
                } else {
                    printf("Net: Storage Authentication FAIL\n");
                }
            }
        });
    stg_start_accept();
}
/*
Asynchronously accept new client connections
*/
void NetIO::ing_start_accept()
{
    tcp::socket *csocket = new tcp::socket(io_context());
#ifdef DEBUG_NET_CLIENTS
    std::cout << "Accepting on " << myconf.clistenhost << ":" << myconf.clistenport << "\n";
#endif
    ingestion_acceptor->async_accept(*csocket,
        boost::bind(&NetIO::ing_authenticate_new_client, this, csocket,
            boost::asio::placeholders::error));
}

void NetIO::stg_start_accept()
{
    tcp::socket *csocket = new tcp::socket(io_context());
#ifdef DEBUG_NET_CLIENTS
    std::cout << "Accepting on " << myconf.slistenhost << ":" << myconf.slistenport << "\n";
#endif
    storage_acceptor->async_accept(*csocket,
        boost::bind(&NetIO::stg_authenticate_new_client, this, csocket,
            boost::asio::placeholders::error));
}
NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
    : context(io_context), conf(config),
      myconf(config.nodes[config.my_node_num])
{
    num_nodes = nodenum_t(conf.nodes.size());
    nodeios.resize(num_nodes);
    me = conf.my_node_num;
    // Node number n will accept connections from nodes 0, ..., n-1 and
    // make connections to nodes n+1, ..., num_nodes-1. This is all
    // single threaded, but it doesn't deadlock because node 0 isn't
    // waiting for any incoming connections, so it immediately makes
    // outgoing connections. When it connects to node 1, that node
    // accepts its (only) incoming connection, and then starts making
    // its outgoing connections, etc.
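    // For example, with three nodes: node 0 accepts nothing and
    // connects to nodes 1 and 2; node 1 first accepts the connection
    // from node 0, then connects to node 2; node 2 accepts the
    // connections from nodes 0 and 1 and makes none of its own.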
    tcp::resolver resolver(io_context);
    tcp::acceptor acceptor(io_context,
        resolver.resolve(myconf.listenhost, myconf.listenport)->endpoint());
    for (size_t i=0; i<me; ++i) {
#ifdef VERBOSE_NET
        std::cerr << "Accepting number " << i << "\n";
#endif
        tcp::socket nodesock = acceptor.accept();
#ifdef VERBOSE_NET
        std::cerr << "Accepted number " << i << "\n";
#endif
        // Read 2 bytes from the socket, which will be the
        // connecting node's node number
        unsigned short node_num;
        boost::asio::read(nodesock,
            boost::asio::buffer(&node_num, sizeof(node_num)));
        if (node_num >= num_nodes) {
            std::cerr << "Received bad node number\n";
        } else {
            nodeios[node_num].emplace(std::move(nodesock), node_num);
#ifdef VERBOSE_NET
            std::cerr << "Received connection from " <<
                config.nodes[node_num].name << "\n";
#endif
        }
    }
    for (size_t i=me+1; i<num_nodes; ++i) {
        boost::system::error_code err;
        tcp::socket nodesock(io_context);
        while (1) {
#ifdef VERBOSE_NET
            std::cerr << "Connecting to " << config.nodes[i].name << "...\n";
#endif
            boost::asio::connect(nodesock,
                resolver.resolve(config.nodes[i].listenhost,
                    config.nodes[i].listenport), err);
            if (!err) break;
            std::cerr << "Connection to " << config.nodes[i].name <<
                " refused, will retry.\n";
            sleep(1);
        }
        // Write 2 bytes to the socket to tell the peer node our node
        // number
        nodenum_t node_num = (nodenum_t)me;
        boost::asio::write(nodesock,
            boost::asio::buffer(&node_num, sizeof(node_num)));
        nodeios[i].emplace(std::move(nodesock), i);
#ifdef VERBOSE_NET
        std::cerr << "Connected to " << config.nodes[i].name << "\n";
#endif
    }
    auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
    msgbundle_size = SGX_AESGCM_IV_SIZE +
        (apiparams.m_priv_out * apiparams.msg_size) + SGX_AESGCM_MAC_SIZE;
    if (myconf.roles & ROLE_STORAGE) {
        // Set up the client sockets
        // Compute the number of clients per storage server
        uint32_t num_users = config.user_count;
        NodeConfig nc;
        num_stg_nodes = 0;
        for (nodenum_t i=0; i<num_nodes; ++i) {
            nc = conf.nodes[i];
            if (nc.roles & ROLE_STORAGE) {
                num_stg_nodes++;
            }
        }
        uint32_t num_clients_per_stg = CEILDIV(num_users, num_stg_nodes);
        client_sockets.resize(num_clients_per_stg);
        uint16_t num_priv_channels = config.m_priv_in;
        uint16_t msg_size = config.msg_size;
        uint32_t epoch_msgbundles_size = num_clients_per_stg *
            (SGX_AESGCM_IV_SIZE + msg_size * num_priv_channels + SGX_AESGCM_MAC_SIZE);
        token_bundle_size = ((num_priv_channels * SGX_AESGCM_KEY_SIZE) +
            SGX_AESGCM_IV_SIZE + SGX_AESGCM_MAC_SIZE);
        uint32_t epoch_tokens_size = num_clients_per_stg * token_bundle_size;
        epoch_msgbundles = (unsigned char *) malloc(epoch_msgbundles_size);
        epoch_tokens = (unsigned char *) malloc(epoch_tokens_size);
        ecall_supply_storage_buffers(epoch_msgbundles, epoch_msgbundles_size,
            epoch_tokens, epoch_tokens_size);
        storage_acceptor = std::shared_ptr<tcp::acceptor>(
            new tcp::acceptor(io_context,
                resolver.resolve(this->myconf.slistenhost,
                    this->myconf.slistenport)->endpoint()));
        stg_start_accept();
    }
    if (myconf.roles & ROLE_INGESTION) {
        ingestion_acceptor = std::shared_ptr<tcp::acceptor>(
            new tcp::acceptor(io_context,
                resolver.resolve(this->myconf.clistenhost,
                    this->myconf.clistenport)->endpoint()));
        ing_start_accept();
    }
}
void NetIO::recv_commands(
    std::function<void(boost::system::error_code)> error_cb,
    std::function<void(uint32_t)> epoch_cb)
{
    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
        if (node_num == me) continue;
        NodeIO &n = node(node_num);
        n.recv_commands(error_cb, epoch_cb);
    }
}

void NetIO::close()
{
    for (nodenum_t node_num = 0; node_num < num_nodes; ++node_num) {
        if (node_num == me) continue;
        NodeIO &n = node(node_num);
        n.close();
    }
}
/* The enclave calls this to inform the untrusted app that there's a new
 * message to send. The return value is the frame the enclave should
 * use to store the first (encrypted) chunk of this message. */
uint8_t *ocall_message(nodenum_t node_num, uint32_t message_len)
{
    assert(g_netio != NULL);
    NodeIO &node = g_netio->node(node_num);
    node.send_message_header(message_len);
    return node.request_frame();
}
/* The enclave calls this to inform the untrusted app that there's a new
 * chunk to send. The return value is the frame the enclave should use
 * to store the next (encrypted) chunk of this message, or NULL if this
 * was the last chunk. */
uint8_t *ocall_chunk(nodenum_t node_num, uint8_t *chunkdata,
    uint32_t chunklen)
{
    assert(g_netio != NULL);
    NodeIO &node = g_netio->node(node_num);
    bool morechunks = node.send_chunk(chunkdata, chunklen);
    if (morechunks) {
        return node.request_frame();
    }
    return NULL;
}
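
/* A minimal sketch of the enclave-side loop these two ocalls imply
 * (for illustration only; the real loop lives in the enclave code, and
 * fill_next_chunk is a hypothetical helper):
 *
 *   uint8_t *frame = ocall_message(node_num, total_len);
 *   while (frame != NULL) {
 *       uint32_t len = fill_next_chunk(frame);   // encrypt next chunk into frame
 *       frame = ocall_chunk(node_num, frame, len);
 *   }
 */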