#include "mpcio.hpp"
#include "rdpf.hpp"
#include "bitutils.hpp"

// T is the type being stored
// N is a type whose "name" static member is a string naming the type
// so that we can report something useful to the user if they try
// to read a type that we don't have any more values for
template<typename T, typename N>
PreCompStorage<T,N>::PreCompStorage(unsigned player, bool preprocessing,
        const char *filenameprefix, unsigned thread_num) :
    name(N::name), depth(0)
{
    init(player, preprocessing, filenameprefix, thread_num);
}

template<typename T, typename N>
void PreCompStorage<T,N>::init(unsigned player, bool preprocessing,
    const char *filenameprefix, unsigned thread_num, nbits_t depth)
{
    if (preprocessing) return;
    std::string filename(filenameprefix);
    char suffix[20];
    if (depth) {
        this->depth = depth;
        sprintf(suffix, "%02d.p%d.t%u", depth, player%10, thread_num);
    } else {
        sprintf(suffix, ".p%d.t%u", player%10, thread_num);
    }
    filename.append(suffix);
    storage.open(filename);
    // It's OK if not every file exists; so don't worry about checking
    // for errors here.  We'll report an error in get() if we actually
    // try to use a value for which we don't have a precomputed file.
    count = 0;
}

template<typename T, typename N>
void PreCompStorage<T,N>::get(T& nextval)
{
    storage >> nextval;
    if (!storage.good()) {
        std::cerr << "Failed to read precomputed value from " << name;
        if (depth) {
            std::cerr << (int)depth;
        }
        std::cerr << " storage\n";
        exit(1);
    }
    ++count;
}
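
// A minimal usage sketch (the "MultTripleName" tag struct below is
// illustrative, not necessarily the name used in the headers; the
// real tag types live alongside the type definitions):
//
//   struct MultTripleName { static constexpr const char *name = "triples"; };
//   PreCompStorage<MultTriple, MultTripleName> store(0, false, "triples", 0);
//   MultTriple t;
//   store.get(t);   // reads the next value from triples.p0.t0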

void MPCSingleIO::async_send_from_msgqueue()
{
#ifdef SEND_LAMPORT_CLOCKS
    std::vector<boost::asio::const_buffer> tosend;
    tosend.push_back(boost::asio::buffer(messagequeue.front().header));
    tosend.push_back(boost::asio::buffer(messagequeue.front().message));
#endif
    boost::asio::async_write(sock,
#ifdef SEND_LAMPORT_CLOCKS
        tosend,
#else
        boost::asio::buffer(messagequeue.front()),
#endif
        [&](boost::system::error_code ec, std::size_t amt){
            messagequeuelock.lock();
            messagequeue.pop();
            if (messagequeue.size() > 0) {
                async_send_from_msgqueue();
            }
            messagequeuelock.unlock();
        });
}

size_t MPCSingleIO::queue(const void *data, size_t len, lamport_t lamport)
{
    // Is this a new message?
    size_t newmsg = 0;

    dataqueue.append((const char *)data, len);

    // If this is the first queue() since the last explicit send(),
    // which we'll know because message_lamport will be nullopt, set
    // message_lamport to the current Lamport clock.  Note that the
    // boolean test tests whether message_lamport is nullopt, not
    // whether its value is zero.
    if (!message_lamport) {
        message_lamport = lamport;
        newmsg = 1;
    }

    // If we already have some full packets worth of data, may as
    // well send it.
    if (dataqueue.size() > 28800) {
        send(true);
    }

    return newmsg;
}
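
// Typical usage (a sketch; io, share_a, share_b, and lamport are
// placeholders): callers batch several queue() calls and then make
// one explicit send() at the end of a protocol round:
//
//   io.queue(&share_a, sizeof(share_a), lamport);
//   io.queue(&share_b, sizeof(share_b), lamport);
//   io.send();   // flush the batch as one message
//
// The 28800-byte threshold above just bounds how much data can
// accumulate in dataqueue before an implicit send(true) flushes it.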

void MPCSingleIO::send(bool implicit_send)
{
    size_t thissize = dataqueue.size();
    // Ignore spurious calls to send(), except for resetting
    // message_lamport if this was an explicit send().
    if (thissize == 0) {
#ifdef SEND_LAMPORT_CLOCKS
        // If this was an explicit send(), reset the message_lamport so
        // that it gets updated at the next queue().
        if (!implicit_send) {
            message_lamport.reset();
        }
#endif
        return;
    }

#ifdef RECORD_IOTRACE
    iotrace.push_back(thissize);
#endif

    messagequeuelock.lock();
    // Move the current message to send into the message queue (this
    // moves a pointer to the data, not copying the data itself)
#ifdef SEND_LAMPORT_CLOCKS
    messagequeue.emplace(std::move(dataqueue),
        message_lamport.value());
    // If this was an explicit send(), reset the message_lamport so
    // that it gets updated at the next queue().
    if (!implicit_send) {
        message_lamport.reset();
    }
#else
    messagequeue.emplace(std::move(dataqueue));
#endif

    // If this is now the first thing in the message queue, launch
    // an async_write to write it
    if (messagequeue.size() == 1) {
        async_send_from_msgqueue();
    }
    messagequeuelock.unlock();
}

size_t MPCSingleIO::recv(void *data, size_t len, lamport_t &lamport)
{
#ifdef SEND_LAMPORT_CLOCKS
    char *cdata = (char *)data;
    size_t res = 0;
    while (len > 0) {
        while (recvdataremain == 0) {
            // Read a new header
            char hdr[sizeof(uint32_t) + sizeof(lamport_t)];
            uint32_t datalen;
            lamport_t recv_lamport;
            boost::asio::read(sock, boost::asio::buffer(hdr, sizeof(hdr)));
            memmove(&datalen, hdr, sizeof(datalen));
            memmove(&recv_lamport, hdr+sizeof(datalen), sizeof(lamport_t));
            lamport_t new_lamport = recv_lamport + 1;
            if (lamport < new_lamport) {
                lamport = new_lamport;
            }
            if (datalen > 0) {
                recvdata.resize(datalen, '\0');
                boost::asio::read(sock, boost::asio::buffer(recvdata));
                recvdataremain = datalen;
            }
        }
        size_t amttoread = len;
        if (amttoread > recvdataremain) {
            amttoread = recvdataremain;
        }
        memmove(cdata, recvdata.data()+recvdata.size()-recvdataremain,
            amttoread);
        cdata += amttoread;
        len -= amttoread;
        recvdataremain -= amttoread;
        res += amttoread;
    }
#else
    size_t res = boost::asio::read(sock, boost::asio::buffer(data, len));
#endif
#ifdef RECORD_IOTRACE
    iotrace.push_back(-(ssize_t(res)));
#endif
    return res;
}
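
// Wire format note: when SEND_LAMPORT_CLOCKS is defined, each message
// on the wire is a fixed-size header (a uint32_t payload length
// followed by the sender's lamport_t clock value) and then the payload
// bytes; recv() above parses exactly that framing.  Without the
// define, the bytes are streamed unframed.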

#ifdef RECORD_IOTRACE
void MPCSingleIO::dumptrace(std::ostream &os, const char *label)
{
    if (label) {
        os << label << " ";
    }
    os << "IO trace:";
    for (auto& s: iotrace) {
        os << " " << s;
    }
    os << "\n";
}
#endif

void MPCIO::reset_stats()
{
    msgs_sent.clear();
    msg_bytes_sent.clear();
    aes_ops.clear();
    for (size_t i=0; i<num_threads; ++i) {
        msgs_sent.push_back(0);
        msg_bytes_sent.push_back(0);
        aes_ops.push_back(0);
    }
    steady_start = boost::chrono::steady_clock::now();
    cpu_start = boost::chrono::process_cpu_clock::now();
}

void MPCIO::dump_stats(std::ostream &os)
{
    size_t tot_msgs_sent = 0;
    size_t tot_msg_bytes_sent = 0;
    size_t tot_aes_ops = 0;
    for (auto& n : msgs_sent) {
        tot_msgs_sent += n;
    }
    for (auto& n : msg_bytes_sent) {
        tot_msg_bytes_sent += n;
    }
    for (auto& n : aes_ops) {
        tot_aes_ops += n;
    }
    auto steady_elapsed =
        boost::chrono::steady_clock::now() - steady_start;
    auto cpu_elapsed =
        boost::chrono::process_cpu_clock::now() - cpu_start;

    os << tot_msgs_sent << " messages sent\n";
    os << tot_msg_bytes_sent << " message bytes sent\n";
    os << tot_aes_ops << " local AES operations\n";
    os << lamport << " Lamport clock (latencies)\n";
    os << boost::chrono::duration_cast
        <boost::chrono::milliseconds>(steady_elapsed) <<
        " wall clock time\n";
    os << cpu_elapsed << " {real;user;system}\n";
}

MPCPeerIO::MPCPeerIO(unsigned player, bool preprocessing,
        std::deque<tcp::socket> &peersocks,
        std::deque<tcp::socket> &serversocks) :
    MPCIO(player, preprocessing, peersocks.size())
{
    unsigned num_threads = unsigned(peersocks.size());
    for (unsigned i=0; i<num_threads; ++i) {
        triples.emplace_back(player, preprocessing, "triples", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        halftriples.emplace_back(player, preprocessing, "halves", i);
    }
    rdpftriples.resize(num_threads);
    for (unsigned i=0; i<num_threads; ++i) {
        for (unsigned depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpftriples[i][depth-1].init(player, preprocessing,
                "rdpf", i, depth);
        }
    }
    for (auto &&sock : peersocks) {
        peerios.emplace_back(std::move(sock));
    }
    for (auto &&sock : serversocks) {
        serverios.emplace_back(std::move(sock));
    }
}

void MPCPeerIO::dump_precomp_stats(std::ostream &os)
{
    for (size_t i=0; i<triples.size(); ++i) {
        if (i > 0) {
            os << " ";
        }
        os << "T" << i << " t:" << triples[i].get_stats() <<
            " h:" << halftriples[i].get_stats();
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            size_t cnt = rdpftriples[i][depth-1].get_stats();
            if (cnt > 0) {
                os << " r" << int(depth) << ":" << cnt;
            }
        }
    }
    os << "\n";
}

void MPCPeerIO::reset_precomp_stats()
{
    for (size_t i=0; i<triples.size(); ++i) {
        triples[i].reset_stats();
        halftriples[i].reset_stats();
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpftriples[i][depth-1].reset_stats();
        }
    }
}

void MPCPeerIO::dump_stats(std::ostream &os)
{
    MPCIO::dump_stats(os);
    os << "Precomputed values used: ";
    dump_precomp_stats(os);
}

MPCServerIO::MPCServerIO(bool preprocessing,
        std::deque<tcp::socket> &p0socks,
        std::deque<tcp::socket> &p1socks) :
    MPCIO(2, preprocessing, p0socks.size())
{
    rdpfpairs.resize(num_threads);
    for (unsigned i=0; i<num_threads; ++i) {
        for (unsigned depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpfpairs[i][depth-1].init(player, preprocessing,
                "rdpf", i, depth);
        }
    }
    for (auto &&sock : p0socks) {
        p0ios.emplace_back(std::move(sock));
    }
    for (auto &&sock : p1socks) {
        p1ios.emplace_back(std::move(sock));
    }
}

void MPCServerIO::dump_precomp_stats(std::ostream &os)
{
    for (size_t i=0; i<rdpfpairs.size(); ++i) {
        if (i > 0) {
            os << " ";
        }
        os << "T" << i;
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            size_t cnt = rdpfpairs[i][depth-1].get_stats();
            if (cnt > 0) {
                os << " r" << int(depth) << ":" << cnt;
            }
        }
    }
    os << "\n";
}

void MPCServerIO::reset_precomp_stats()
{
    for (size_t i=0; i<rdpfpairs.size(); ++i) {
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpfpairs[i][depth-1].reset_stats();
        }
    }
}

void MPCServerIO::dump_stats(std::ostream &os)
{
    MPCIO::dump_stats(os);
    os << "Precomputed values used: ";
    dump_precomp_stats(os);
}

MPCTIO::MPCTIO(MPCIO &mpcio, int thread_num) :
    thread_num(thread_num), thread_lamport(mpcio.lamport),
    mpcio(mpcio)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        peer_iostream.emplace(mpcpio.peerios[thread_num],
            thread_lamport, mpcpio.msgs_sent[thread_num],
            mpcpio.msg_bytes_sent[thread_num]);
        server_iostream.emplace(mpcpio.serverios[thread_num],
            thread_lamport, mpcpio.msgs_sent[thread_num],
            mpcpio.msg_bytes_sent[thread_num]);
    } else {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        p0_iostream.emplace(mpcsrvio.p0ios[thread_num],
            thread_lamport, mpcsrvio.msgs_sent[thread_num],
            mpcsrvio.msg_bytes_sent[thread_num]);
        p1_iostream.emplace(mpcsrvio.p1ios[thread_num],
            thread_lamport, mpcsrvio.msgs_sent[thread_num],
            mpcsrvio.msg_bytes_sent[thread_num]);
    }
}

// Sync our per-thread lamport clock with the master one in the
// mpcio.  You only need to call this explicitly if your MPCTIO
// outlives your thread (in which case call it after the join), or
// if your threads do interthread communication amongst themselves
// (in which case call it in the sending thread before the send, and
// call it in the receiving thread after the receive).
void MPCTIO::sync_lamport()
{
    // Update the mpcio Lamport time to be max of the thread Lamport
    // time and what we thought it was before.  We use this
    // compare_exchange construction in order to atomically
    // do the comparison, computation, and replacement
    lamport_t old_lamport = mpcio.lamport;
    lamport_t new_lamport = thread_lamport;
    do {
        if (new_lamport < old_lamport) {
            new_lamport = old_lamport;
        }
    // The next line atomically checks if lamport still has
    // the value old_lamport; if so, it changes its value to
    // new_lamport and returns true (ending the loop).  If
    // not, it sets old_lamport to the current value of
    // lamport, and returns false (continuing the loop so
    // that new_lamport can be recomputed based on this new
    // value).
    } while (!mpcio.lamport.compare_exchange_weak(
        old_lamport, new_lamport));
    thread_lamport = new_lamport;
}
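
// For example (a sketch; tio_a, tio_b, and shared_queue are
// placeholders), if thread A hands data to thread B through some
// channel other than these sockets, the Lamport clocks should be
// synced around the handoff as described above:
//
//   // in thread A:
//   tio_a.sync_lamport();    // publish A's clock before the handoff
//   shared_queue.push(item);
//
//   // in thread B:
//   auto item = shared_queue.pop();
//   tio_b.sync_lamport();    // pick up any clock advance from A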

// Queue up data to the peer or to the server
void MPCTIO::queue_peer(const void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        size_t newmsg = mpcpio.peerios[thread_num].queue(data, len, thread_lamport);
        mpcpio.msgs_sent[thread_num] += newmsg;
        mpcpio.msg_bytes_sent[thread_num] += len;
    }
}

void MPCTIO::queue_server(const void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        size_t newmsg = mpcpio.serverios[thread_num].queue(data, len, thread_lamport);
        mpcpio.msgs_sent[thread_num] += newmsg;
        mpcpio.msg_bytes_sent[thread_num] += len;
    }
}

// Receive data from the peer or from the server
size_t MPCTIO::recv_peer(void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        return mpcpio.peerios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

size_t MPCTIO::recv_server(void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        return mpcpio.serverios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

// Queue up data to p0 or p1
void MPCTIO::queue_p0(const void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        size_t newmsg = mpcsrvio.p0ios[thread_num].queue(data, len, thread_lamport);
        mpcsrvio.msgs_sent[thread_num] += newmsg;
        mpcsrvio.msg_bytes_sent[thread_num] += len;
    }
}

void MPCTIO::queue_p1(const void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        size_t newmsg = mpcsrvio.p1ios[thread_num].queue(data, len, thread_lamport);
        mpcsrvio.msgs_sent[thread_num] += newmsg;
        mpcsrvio.msg_bytes_sent[thread_num] += len;
    }
}

// Receive data from p0 or p1
size_t MPCTIO::recv_p0(void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        return mpcsrvio.p0ios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

size_t MPCTIO::recv_p1(void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        return mpcsrvio.p1ios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

// Send all queued data for this thread
void MPCTIO::send()
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        mpcpio.peerios[thread_num].send();
        mpcpio.serverios[thread_num].send();
    } else {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        mpcsrvio.p0ios[thread_num].send();
        mpcsrvio.p1ios[thread_num].send();
    }
}

// Functions to get precomputed values.  If we're in the online
// phase, get them from PreCompStorage.  If we're in the
// preprocessing phase, read them from the server.
MultTriple MPCTIO::triple()
{
    MultTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.preprocessing) {
            recv_server(&val, sizeof(val));
        } else {
            mpcpio.triples[thread_num].get(val);
        }
    } else if (mpcio.preprocessing) {
        // Create triples (X0,Y0,Z0),(X1,Y1,Z1) such that
        // (X0*Y1 + Y0*X1) = (Z0+Z1)
        value_t X0, Y0, Z0, X1, Y1, Z1;
        arc4random_buf(&X0, sizeof(X0));
        arc4random_buf(&Y0, sizeof(Y0));
        arc4random_buf(&Z0, sizeof(Z0));
        arc4random_buf(&X1, sizeof(X1));
        arc4random_buf(&Y1, sizeof(Y1));
        Z1 = X0 * Y1 + X1 * Y0 - Z0;
        MultTriple T0, T1;
        T0 = std::make_tuple(X0, Y0, Z0);
        T1 = std::make_tuple(X1, Y1, Z1);
        queue_p0(&T0, sizeof(T0));
        queue_p1(&T1, sizeof(T1));
    }
    return val;
}
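
// Sanity check on the construction above: Z1 is set to
// X0*Y1 + X1*Y0 - Z0, so X0*Y1 + Y0*X1 = Z0 + Z1 holds identically
// (with value_t's natural wraparound arithmetic), which is exactly
// the invariant the comment states.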

HalfTriple MPCTIO::halftriple()
{
    HalfTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.preprocessing) {
            recv_server(&val, sizeof(val));
        } else {
            mpcpio.halftriples[thread_num].get(val);
        }
    } else if (mpcio.preprocessing) {
        // Create half-triples (X0,Z0),(Y1,Z1) such that
        // X0*Y1 = Z0 + Z1
        value_t X0, Z0, Y1, Z1;
        arc4random_buf(&X0, sizeof(X0));
        arc4random_buf(&Z0, sizeof(Z0));
        arc4random_buf(&Y1, sizeof(Y1));
        Z1 = X0 * Y1 - Z0;
        HalfTriple H0, H1;
        H0 = std::make_tuple(X0, Z0);
        H1 = std::make_tuple(Y1, Z1);
        queue_p0(&H0, sizeof(H0));
        queue_p1(&H1, sizeof(H1));
    }
    return val;
}

SelectTriple MPCTIO::selecttriple()
{
    SelectTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.preprocessing) {
            uint8_t Xbyte;
            recv_server(&Xbyte, sizeof(Xbyte));
            val.X = Xbyte & 1;
            recv_server(&val.Y, sizeof(val.Y));
            recv_server(&val.Z, sizeof(val.Z));
        } else {
            std::cerr << "Attempted to read SelectTriple in online phase\n";
        }
    } else if (mpcio.preprocessing) {
        // Create triples (X0,Y0,Z0),(X1,Y1,Z1) such that
        // (X0*Y1 ^ Y0*X1) = (Z0^Z1)
        bit_t X0, X1;
        DPFnode Y0, Z0, Y1, Z1;
        X0 = arc4random() & 1;
        arc4random_buf(&Y0, sizeof(Y0));
        arc4random_buf(&Z0, sizeof(Z0));
        X1 = arc4random() & 1;
        arc4random_buf(&Y1, sizeof(Y1));
        DPFnode X0ext, X1ext;
        // Sign-extend X0 and X1 (so that 0 -> 0000...0 and
        // 1 -> 1111...1)
        X0ext = if128_mask[X0];
        X1ext = if128_mask[X1];
        Z1 = ((X0ext & Y1) ^ (X1ext & Y0)) ^ Z0;
        queue_p0(&X0, sizeof(X0));
        queue_p0(&Y0, sizeof(Y0));
        queue_p0(&Z0, sizeof(Z0));
        queue_p1(&X1, sizeof(X1));
        queue_p1(&Y1, sizeof(Y1));
        queue_p1(&Z1, sizeof(Z1));
    }
    return val;
}
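
// Worked example of the masking trick above: if X0 = 1 then
// X0ext = 111...1, so (X0ext & Y1) = Y1; if X0 = 0 then
// (X0ext & Y1) = 0.  Either way (X0ext & Y1) equals the bitwise
// product X0*Y1, so Z1 = (X0*Y1 ^ X1*Y0) ^ Z0 satisfies the stated
// invariant (X0*Y1 ^ Y0*X1) = (Z0 ^ Z1).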

RDPFTriple MPCTIO::rdpftriple(nbits_t depth)
{
    RDPFTriple val;
    // Only the computational players (0 and 1) hold RDPF triples;
    // player 2 (the server) uses rdpfpair() below instead, so the
    // cast to MPCPeerIO here is only valid for player < 2.
    if (!mpcio.preprocessing && mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        mpcpio.rdpftriples[thread_num][depth-1].get(val);
    }
    return val;
}

RDPFPair MPCTIO::rdpfpair(nbits_t depth)
{
    RDPFPair val;
    if (!mpcio.preprocessing && mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        mpcsrvio.rdpfpairs[thread_num][depth-1].get(val);
    }
    return val;
}

// The port number for the P1 -> P0 connection
static const unsigned short port_p1_p0 = 2115;

// The port number for the P2 -> P0 connection
static const unsigned short port_p2_p0 = 2116;

// The port number for the P2 -> P1 connection
static const unsigned short port_p2_p1 = 2117;

void mpcio_setup_computational(unsigned player,
    boost::asio::io_context &io_context,
    const char *p0addr, // can be NULL when player=0
    int num_threads,
    std::deque<tcp::socket> &peersocks,
    std::deque<tcp::socket> &serversocks)
{
    if (player == 0) {
        // Listen for connections from P1 and from P2
        tcp::acceptor acceptor_p1(io_context,
            tcp::endpoint(tcp::v4(), port_p1_p0));
        tcp::acceptor acceptor_p2(io_context,
            tcp::endpoint(tcp::v4(), port_p2_p0));

        peersocks.clear();
        serversocks.clear();
        for (int i=0;i<num_threads;++i) {
            peersocks.emplace_back(io_context);
            serversocks.emplace_back(io_context);
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket peersock = acceptor_p1.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number
            unsigned short thread_num;
            boost::asio::read(peersock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from peer\n";
            } else {
                peersocks[thread_num] = std::move(peersock);
            }
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket serversock = acceptor_p2.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number
            unsigned short thread_num;
            boost::asio::read(serversock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from server\n";
            } else {
                serversocks[thread_num] = std::move(serversock);
            }
        }
    } else if (player == 1) {
        // Listen for connections from P2, make num_threads connections to P0
        tcp::acceptor acceptor_p2(io_context,
            tcp::endpoint(tcp::v4(), port_p2_p1));

        tcp::resolver resolver(io_context);
        boost::system::error_code err;
        peersocks.clear();
        serversocks.clear();
        for (int i=0;i<num_threads;++i) {
            serversocks.emplace_back(io_context);
        }
        for (unsigned short thread_num = 0; thread_num < num_threads; ++thread_num) {
            tcp::socket peersock(io_context);
            while(1) {
                boost::asio::connect(peersock,
                    resolver.resolve(p0addr, std::to_string(port_p1_p0)), err);
                if (!err) break;
                std::cerr << "Connection to p0 refused, will retry.\n";
                sleep(1);
            }
            // Write 2 bytes to the socket indicating which thread
            // number this socket is for
            boost::asio::write(peersock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            peersocks.push_back(std::move(peersock));
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket serversock = acceptor_p2.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number
            unsigned short thread_num;
            boost::asio::read(serversock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from server\n";
            } else {
                serversocks[thread_num] = std::move(serversock);
            }
        }
    } else {
        std::cerr << "Invalid player number passed to mpcio_setup_computational\n";
    }
}

void mpcio_setup_server(boost::asio::io_context &io_context,
    const char *p0addr, const char *p1addr, int num_threads,
    std::deque<tcp::socket> &p0socks,
    std::deque<tcp::socket> &p1socks)
{
    // Make connections to P0 and P1
    tcp::resolver resolver(io_context);
    boost::system::error_code err;
    p0socks.clear();
    p1socks.clear();
    for (unsigned short thread_num = 0; thread_num < num_threads; ++thread_num) {
        tcp::socket p0sock(io_context);
        while(1) {
            boost::asio::connect(p0sock,
                resolver.resolve(p0addr, std::to_string(port_p2_p0)), err);
            if (!err) break;
            std::cerr << "Connection to p0 refused, will retry.\n";
            sleep(1);
        }
        // Write 2 bytes to the socket indicating which thread
        // number this socket is for
        boost::asio::write(p0sock,
            boost::asio::buffer(&thread_num, sizeof(thread_num)));
        p0socks.push_back(std::move(p0sock));
    }
    for (unsigned short thread_num = 0; thread_num < num_threads; ++thread_num) {
        tcp::socket p1sock(io_context);
        while(1) {
            boost::asio::connect(p1sock,
                resolver.resolve(p1addr, std::to_string(port_p2_p1)), err);
            if (!err) break;
            std::cerr << "Connection to p1 refused, will retry.\n";
            sleep(1);
        }
        // Write 2 bytes to the socket indicating which thread
        // number this socket is for
        boost::asio::write(p1sock,
            boost::asio::buffer(&thread_num, sizeof(thread_num)));
        p1socks.push_back(std::move(p1sock));
    }
}
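
// A minimal sketch of wiring the setup functions and IO classes
// together (argument parsing, thread launching, and error handling
// omitted; player, preprocessing, num_threads, p0addr, and p1addr are
// assumed to come from the command line):
//
//   boost::asio::io_context io_context;
//   std::deque<tcp::socket> socksA, socksB;
//   if (player < 2) {
//       mpcio_setup_computational(player, io_context, p0addr,
//           num_threads, socksA /*peer*/, socksB /*server*/);
//       MPCPeerIO mpcio(player, preprocessing, socksA, socksB);
//       // ... one MPCTIO(mpcio, i) per thread i ...
//   } else {
//       mpcio_setup_server(io_context, p0addr, p1addr,
//           num_threads, socksA /*p0*/, socksB /*p1*/);
//       MPCServerIO mpcio(preprocessing, socksA, socksB);
//       // ... one MPCTIO(mpcio, i) per thread i ...
//   }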