mpcio.cpp

#include <sys/time.h> // getrusage
#include <sys/resource.h> // getrusage

#include "mpcio.hpp"
#include "rdpf.hpp"
#include "cdpf.hpp"
#include "bitutils.hpp"
#include "coroutine.hpp"

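// Launch the async_write for the message at the front of the message
// queue. As seen at the call sites below, this is only ever invoked
// while messagequeuelock is held (by send() when the queue becomes
// nonempty, and by the completion handler when chaining the next
// message), so at most one async_write is outstanding at a time.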
void MPCSingleIO::async_send_from_msgqueue()
{
#ifdef SEND_LAMPORT_CLOCKS
    std::vector<boost::asio::const_buffer> tosend;
    tosend.push_back(boost::asio::buffer(messagequeue.front().header));
    tosend.push_back(boost::asio::buffer(messagequeue.front().message));
#endif
    boost::asio::async_write(sock,
#ifdef SEND_LAMPORT_CLOCKS
        tosend,
#else
        boost::asio::buffer(messagequeue.front()),
#endif
        [&](boost::system::error_code ec, std::size_t amt){
            messagequeuelock.lock();
            messagequeue.pop();
            if (messagequeue.size() > 0) {
                async_send_from_msgqueue();
            }
            messagequeuelock.unlock();
        });
}

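// Append data to the outgoing buffer for this connection. Returns 1
// if this call started a new message (so the caller can tally
// messages sent, as MPCTIO::queue_peer and friends do below), and 0
// if the data was merged into a message already being assembled.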
size_t MPCSingleIO::queue(const void *data, size_t len, lamport_t lamport)
{
    // Is this a new message?
    size_t newmsg = 0;

    dataqueue.append((const char *)data, len);

    // If this is the first queue() since the last explicit send(),
    // which we'll know because message_lamport will be nullopt, set
    // message_lamport to the current Lamport clock. Note that the
    // boolean test checks whether message_lamport is nullopt, not
    // whether its value is zero.
    if (!message_lamport) {
        message_lamport = lamport;
        newmsg = 1;
    }

#ifdef VERBOSE_COMMS
    printf("Queue %s.%d len=%lu lamp=%u: ", dest.c_str(), thread_num,
        len, message_lamport.value());
    for (size_t i=0;i<len;++i) {
        printf("%02x", ((const unsigned char*)data)[i]);
    }
    printf("\n");
#endif

    // If we already have some full packets' worth of data, we may as
    // well send it.
    if (dataqueue.size() > 28800) {
        send(true);
    }

    return newmsg;
}

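// Send the accumulated data. implicit_send is true when called from
// queue() above (the buffer simply grew large enough), and false for
// an explicit end-of-round send(); only an explicit send() resets
// message_lamport so that the next queue() starts a new message.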
void MPCSingleIO::send(bool implicit_send)
{
    size_t thissize = dataqueue.size();
    // Ignore spurious calls to send(), except for resetting
    // message_lamport if this was an explicit send().
    if (thissize == 0) {
#ifdef SEND_LAMPORT_CLOCKS
        // If this was an explicit send(), reset the message_lamport so
        // that it gets updated at the next queue().
        if (!implicit_send) {
            message_lamport.reset();
        }
#endif
        return;
    }

#ifdef RECORD_IOTRACE
    iotrace.push_back(thissize);
#endif

    messagequeuelock.lock();

    // Move the current message to send into the message queue (this
    // moves a pointer to the data, not copying the data itself)
#ifdef SEND_LAMPORT_CLOCKS
    messagequeue.emplace(std::move(dataqueue),
        message_lamport.value());
    // If this was an explicit send(), reset the message_lamport so
    // that it gets updated at the next queue().
    if (!implicit_send) {
        message_lamport.reset();
    }
#else
    messagequeue.emplace(std::move(dataqueue));
#endif

    // If this is now the first thing in the message queue, launch
    // an async_write to write it
    if (messagequeue.size() == 1) {
        async_send_from_msgqueue();
    }
    messagequeuelock.unlock();
}

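// Receive exactly len bytes. Under SEND_LAMPORT_CLOCKS, each message
// on the wire is framed as [uint32_t datalen][lamport_t clock][payload],
// and the receiver advances its Lamport clock to at least one more
// than the received clock; without that option, the stream is raw
// bytes and the caller's lamport value is left unchanged.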
size_t MPCSingleIO::recv(void *data, size_t len, lamport_t &lamport)
{
#ifdef VERBOSE_COMMS
    struct timeval tv;
    gettimeofday(&tv, NULL);
    size_t orig_len = len;
    printf("%lu.%06lu: Recv %s.%d len=%lu lamp=%u ", tv.tv_sec,
        tv.tv_usec, dest.c_str(), thread_num, len, lamport);
#endif

#ifdef SEND_LAMPORT_CLOCKS
    char *cdata = (char *)data;
    size_t res = 0;
    while (len > 0) {
        while (recvdataremain == 0) {
            // Read a new header
            char hdr[sizeof(uint32_t) + sizeof(lamport_t)];
            uint32_t datalen;
            lamport_t recv_lamport;
            boost::asio::read(sock, boost::asio::buffer(hdr, sizeof(hdr)));
            memmove(&datalen, hdr, sizeof(datalen));
            memmove(&recv_lamport, hdr+sizeof(datalen), sizeof(lamport_t));
            lamport_t new_lamport = recv_lamport + 1;
            if (lamport < new_lamport) {
                lamport = new_lamport;
            }
            if (datalen > 0) {
                recvdata.resize(datalen, '\0');
                boost::asio::read(sock, boost::asio::buffer(recvdata));
                recvdataremain = datalen;
            }
        }
        size_t amttoread = len;
        if (amttoread > recvdataremain) {
            amttoread = recvdataremain;
        }
        memmove(cdata, recvdata.data()+recvdata.size()-recvdataremain,
            amttoread);
        cdata += amttoread;
        len -= amttoread;
        recvdataremain -= amttoread;
        res += amttoread;
    }
#else
    size_t res = boost::asio::read(sock, boost::asio::buffer(data, len));
#endif

#ifdef VERBOSE_COMMS
    gettimeofday(&tv, NULL);
    printf("nlamp=%u %lu.%06lu: ", lamport, tv.tv_sec, tv.tv_usec);
    for (size_t i=0;i<orig_len;++i) {
        printf("%02x", ((const unsigned char*)data)[i]);
    }
    printf("\n");
#endif

#ifdef RECORD_IOTRACE
    iotrace.push_back(-(ssize_t(res)));
#endif

    return res;
}

#ifdef RECORD_IOTRACE
void MPCSingleIO::dumptrace(std::ostream &os, const char *label)
{
    if (label) {
        os << label << " ";
    }
    os << "IO trace:";
    for (auto& s: iotrace) {
        os << " " << s;
    }
    os << "\n";
}
#endif

void MPCIO::reset_stats()
{
    msgs_sent.clear();
    msg_bytes_sent.clear();
    aes_ops.clear();
    for (size_t i=0; i<num_threads; ++i) {
        msgs_sent.push_back(0);
        msg_bytes_sent.push_back(0);
        aes_ops.push_back(0);
    }
    steady_start = boost::chrono::steady_clock::now();
    cpu_start = boost::chrono::process_cpu_clock::now();
}

// Report the memory usage
void MPCIO::dump_memusage(std::ostream &os)
{
    struct rusage ru;
    getrusage(RUSAGE_SELF, &ru);
    os << "Mem: " << ru.ru_maxrss << " KiB\n";
}

void MPCIO::dump_stats(std::ostream &os)
{
    size_t tot_msgs_sent = 0;
    size_t tot_msg_bytes_sent = 0;
    size_t tot_aes_ops = 0;
    for (auto& n : msgs_sent) {
        tot_msgs_sent += n;
    }
    for (auto& n : msg_bytes_sent) {
        tot_msg_bytes_sent += n;
    }
    for (auto& n : aes_ops) {
        tot_aes_ops += n;
    }
    auto steady_elapsed =
        boost::chrono::steady_clock::now() - steady_start;
    auto cpu_elapsed =
        boost::chrono::process_cpu_clock::now() - cpu_start;

    os << tot_msgs_sent << " messages sent\n";
    os << tot_msg_bytes_sent << " message bytes sent\n";
    os << lamport << " Lamport clock (latencies)\n";
    os << tot_aes_ops << " local AES operations\n";
    os << boost::chrono::duration_cast
        <boost::chrono::milliseconds>(steady_elapsed) <<
        " wall clock time\n";
    os << cpu_elapsed << " {real;user;system}\n";
    dump_memusage(os);
}

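// The RDPF precomputed-value stores are indexed three ways: by WIDTH
// (the tuple index, so std::get<WIDTH-1> selects the store for that
// width, with WIDTH running from 1 to 5 below), by thread number (the
// vector index), and by DPF depth (the array index, for depths 1
// through ADDRESS_MAX_BITS).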
// TVA is a tuple of vectors of arrays of PreCompStorage
template <nbits_t WIDTH, typename TVA>
static void rdpfstorage_init(TVA &storage, unsigned player,
    ProcessingMode mode, unsigned num_threads, bool incremental)
{
    auto &VA = std::get<WIDTH-1>(storage);
    VA.resize(num_threads);
    char prefix[12];
    strcpy(prefix, incremental ? "irdpf" : "rdpf");
    if (WIDTH > 1) {
        sprintf(prefix+strlen(prefix), "%d_", WIDTH);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        for (unsigned depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            VA[i][depth-1].init(player, mode, prefix, i, depth, WIDTH);
        }
    }
}

// TVA is a tuple of vectors of arrays of PreCompStorage
template <nbits_t WIDTH, typename TVA>
static void rdpfstorage_dumpstats(std::ostream &os, TVA &storage,
    size_t thread_num, bool incremental)
{
    auto &VA = std::get<WIDTH-1>(storage);
    for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
        size_t cnt = VA[thread_num][depth-1].get_stats();
        if (cnt > 0) {
            os << (incremental ? " i" : " r") << int(depth);
            if (WIDTH > 1) {
                os << "." << int(WIDTH);
            }
            os << ":" << cnt;
        }
    }
}

// TVA is a tuple of vectors of arrays of PreCompStorage
template <nbits_t WIDTH, typename TVA>
static void rdpfstorage_resetstats(TVA &storage, size_t thread_num)
{
    auto &VA = std::get<WIDTH-1>(storage);
    for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
        VA[thread_num][depth-1].reset_stats();
    }
}

MPCPeerIO::MPCPeerIO(unsigned player, ProcessingMode mode,
        std::deque<tcp::socket> &peersocks,
        std::deque<tcp::socket> &serversocks) :
    MPCIO(player, mode, peersocks.size())
{
    unsigned num_threads = unsigned(peersocks.size());
    for (unsigned i=0; i<num_threads; ++i) {
        multtriples.emplace_back(player, mode, "mults", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        halftriples.emplace_back(player, mode, "halves", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        andtriples.emplace_back(player, mode, "ands", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        valselecttriples.emplace_back(player, mode, "selects", i);
    }
    rdpfstorage_init<1>(rdpftriples, player, mode, num_threads, false);
    rdpfstorage_init<2>(rdpftriples, player, mode, num_threads, false);
    rdpfstorage_init<3>(rdpftriples, player, mode, num_threads, false);
    rdpfstorage_init<4>(rdpftriples, player, mode, num_threads, false);
    rdpfstorage_init<5>(rdpftriples, player, mode, num_threads, false);
    rdpfstorage_init<1>(irdpftriples, player, mode, num_threads, true);
    rdpfstorage_init<2>(irdpftriples, player, mode, num_threads, true);
    rdpfstorage_init<3>(irdpftriples, player, mode, num_threads, true);
    rdpfstorage_init<4>(irdpftriples, player, mode, num_threads, true);
    rdpfstorage_init<5>(irdpftriples, player, mode, num_threads, true);
    for (unsigned i=0; i<num_threads; ++i) {
        cdpfs.emplace_back(player, mode, "cdpf", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        peerios.emplace_back(std::move(peersocks[i]), "peer", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        serverios.emplace_back(std::move(serversocks[i]), "srv", i);
    }
}

void MPCPeerIO::dump_precomp_stats(std::ostream &os)
{
    for (size_t i=0; i<multtriples.size(); ++i) {
        size_t cnt;
        if (i > 0) {
            os << " ";
        }
        os << "T" << i;
        cnt = multtriples[i].get_stats();
        if (cnt > 0) {
            os << " m:" << cnt;
        }
        cnt = halftriples[i].get_stats();
        if (cnt > 0) {
            os << " h:" << cnt;
        }
        cnt = andtriples[i].get_stats();
        if (cnt > 0) {
            os << " a:" << cnt;
        }
        cnt = valselecttriples[i].get_stats();
        if (cnt > 0) {
            os << " s:" << cnt;
        }
        rdpfstorage_dumpstats<1>(os, rdpftriples, i, false);
        rdpfstorage_dumpstats<2>(os, rdpftriples, i, false);
        rdpfstorage_dumpstats<3>(os, rdpftriples, i, false);
        rdpfstorage_dumpstats<4>(os, rdpftriples, i, false);
        rdpfstorage_dumpstats<5>(os, rdpftriples, i, false);
        rdpfstorage_dumpstats<1>(os, irdpftriples, i, true);
        rdpfstorage_dumpstats<2>(os, irdpftriples, i, true);
        rdpfstorage_dumpstats<3>(os, irdpftriples, i, true);
        rdpfstorage_dumpstats<4>(os, irdpftriples, i, true);
        rdpfstorage_dumpstats<5>(os, irdpftriples, i, true);
        cnt = cdpfs[i].get_stats();
        if (cnt > 0) {
            os << " c:" << cnt;
        }
    }
    os << "\n";
}

void MPCPeerIO::reset_precomp_stats()
{
    for (size_t i=0; i<multtriples.size(); ++i) {
        multtriples[i].reset_stats();
        halftriples[i].reset_stats();
        andtriples[i].reset_stats();
        valselecttriples[i].reset_stats();
        rdpfstorage_resetstats<1>(rdpftriples, i);
        rdpfstorage_resetstats<2>(rdpftriples, i);
        rdpfstorage_resetstats<3>(rdpftriples, i);
        rdpfstorage_resetstats<4>(rdpftriples, i);
        rdpfstorage_resetstats<5>(rdpftriples, i);
        rdpfstorage_resetstats<1>(irdpftriples, i);
        rdpfstorage_resetstats<2>(irdpftriples, i);
        rdpfstorage_resetstats<3>(irdpftriples, i);
        rdpfstorage_resetstats<4>(irdpftriples, i);
        rdpfstorage_resetstats<5>(irdpftriples, i);
    }
}

void MPCPeerIO::dump_stats(std::ostream &os)
{
    MPCIO::dump_stats(os);
    os << "Precomputed values used: ";
    dump_precomp_stats(os);
}

MPCServerIO::MPCServerIO(ProcessingMode mode,
        std::deque<tcp::socket> &p0socks,
        std::deque<tcp::socket> &p1socks) :
    MPCIO(2, mode, p0socks.size())
{
    rdpfstorage_init<1>(rdpfpairs, player, mode, num_threads, false);
    rdpfstorage_init<2>(rdpfpairs, player, mode, num_threads, false);
    rdpfstorage_init<3>(rdpfpairs, player, mode, num_threads, false);
    rdpfstorage_init<4>(rdpfpairs, player, mode, num_threads, false);
    rdpfstorage_init<5>(rdpfpairs, player, mode, num_threads, false);
    rdpfstorage_init<1>(irdpfpairs, player, mode, num_threads, true);
    rdpfstorage_init<2>(irdpfpairs, player, mode, num_threads, true);
    rdpfstorage_init<3>(irdpfpairs, player, mode, num_threads, true);
    rdpfstorage_init<4>(irdpfpairs, player, mode, num_threads, true);
    rdpfstorage_init<5>(irdpfpairs, player, mode, num_threads, true);
    for (unsigned i=0; i<num_threads; ++i) {
        p0ios.emplace_back(std::move(p0socks[i]), "p0", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        p1ios.emplace_back(std::move(p1socks[i]), "p1", i);
    }
}

void MPCServerIO::dump_precomp_stats(std::ostream &os)
{
    for (size_t i=0; i<std::get<0>(rdpfpairs).size(); ++i) {
        if (i > 0) {
            os << " ";
        }
        os << "T" << i;
        rdpfstorage_dumpstats<1>(os, rdpfpairs, i, false);
        rdpfstorage_dumpstats<2>(os, rdpfpairs, i, false);
        rdpfstorage_dumpstats<3>(os, rdpfpairs, i, false);
        rdpfstorage_dumpstats<4>(os, rdpfpairs, i, false);
        rdpfstorage_dumpstats<5>(os, rdpfpairs, i, false);
        rdpfstorage_dumpstats<1>(os, irdpfpairs, i, true);
        rdpfstorage_dumpstats<2>(os, irdpfpairs, i, true);
        rdpfstorage_dumpstats<3>(os, irdpfpairs, i, true);
        rdpfstorage_dumpstats<4>(os, irdpfpairs, i, true);
        rdpfstorage_dumpstats<5>(os, irdpfpairs, i, true);
    }
    os << "\n";
}

void MPCServerIO::reset_precomp_stats()
{
    for (size_t i=0; i<std::get<0>(rdpfpairs).size(); ++i) {
        rdpfstorage_resetstats<1>(rdpfpairs, i);
        rdpfstorage_resetstats<2>(rdpfpairs, i);
        rdpfstorage_resetstats<3>(rdpfpairs, i);
        rdpfstorage_resetstats<4>(rdpfpairs, i);
        rdpfstorage_resetstats<5>(rdpfpairs, i);
        rdpfstorage_resetstats<1>(irdpfpairs, i);
        rdpfstorage_resetstats<2>(irdpfpairs, i);
        rdpfstorage_resetstats<3>(irdpfpairs, i);
        rdpfstorage_resetstats<4>(irdpfpairs, i);
        rdpfstorage_resetstats<5>(irdpfpairs, i);
    }
}

void MPCServerIO::dump_stats(std::ostream &os)
{
    MPCIO::dump_stats(os);
    os << "Precomputed values used: ";
    dump_precomp_stats(os);
}

MPCTIO::MPCTIO(MPCIO &mpcio, int thread_num, int num_threads) :
    thread_num(thread_num), local_cpu_nthreads(num_threads),
    communication_nthreads(num_threads),
    thread_lamport(mpcio.lamport), mpcio(mpcio),
#ifdef VERBOSE_COMMS
    round_num(0),
#endif
    last_andtriple_bits_remaining(0),
    remaining_nodesselecttriples(0)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        peer_iostream.emplace(mpcpio.peerios[thread_num],
            thread_lamport, mpcpio.msgs_sent[thread_num],
            mpcpio.msg_bytes_sent[thread_num]);
        server_iostream.emplace(mpcpio.serverios[thread_num],
            thread_lamport, mpcpio.msgs_sent[thread_num],
            mpcpio.msg_bytes_sent[thread_num]);
    } else {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        p0_iostream.emplace(mpcsrvio.p0ios[thread_num],
            thread_lamport, mpcsrvio.msgs_sent[thread_num],
            mpcsrvio.msg_bytes_sent[thread_num]);
        p1_iostream.emplace(mpcsrvio.p1ios[thread_num],
            thread_lamport, mpcsrvio.msgs_sent[thread_num],
            mpcsrvio.msg_bytes_sent[thread_num]);
    }
}

// Sync our per-thread Lamport clock with the master one in the
// mpcio. You only need to call this explicitly if your MPCTIO
// outlives your thread (in which case call it after the join), or
// if your threads do interthread communication amongst themselves
// (in which case call it in the sending thread before the send, and
// call it in the receiving thread after the receive).
void MPCTIO::sync_lamport()
{
    // Update the mpcio Lamport time to be the max of the thread
    // Lamport time and what we thought it was before. We use this
    // compare_exchange construction in order to atomically do the
    // comparison, computation, and replacement.
    lamport_t old_lamport = mpcio.lamport;
    lamport_t new_lamport = thread_lamport;
    do {
        if (new_lamport < old_lamport) {
            new_lamport = old_lamport;
        }
        // The next line atomically checks if lamport still has
        // the value old_lamport; if so, it changes its value to
        // new_lamport and returns true (ending the loop). If
        // not, it sets old_lamport to the current value of
        // lamport, and returns false (continuing the loop so
        // that new_lamport can be recomputed based on this new
        // value).
    } while (!mpcio.lamport.compare_exchange_weak(
        old_lamport, new_lamport));
    thread_lamport = new_lamport;
}

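// A sketch of the interthread-communication pattern described above
// (tioA and tioB are hypothetical MPCTIOs owned by threads A and B):
//     A: tioA.sync_lamport();  // publish A's clock to the mpcio
//     A: hand data to B (by whatever interthread channel)
//     B: receive the data
//     B: tioB.sync_lamport();  // B's clock is now >= A's at the send
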
// Only call this if you can be sure that there are no outstanding
// messages in flight, you can call it on all existing MPCTIOs, and
// you really want to reset the Lamport clock in the middle of a
// run.
void MPCTIO::reset_lamport()
{
    // Reset both our own Lamport clock and the parent MPCIO's
    thread_lamport = 0;
    mpcio.lamport = 0;
}

// Queue up data to the peer or to the server
void MPCTIO::queue_peer(const void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        size_t newmsg = mpcpio.peerios[thread_num].queue(data, len, thread_lamport);
        mpcpio.msgs_sent[thread_num] += newmsg;
        mpcpio.msg_bytes_sent[thread_num] += len;
    }
}

void MPCTIO::queue_server(const void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        size_t newmsg = mpcpio.serverios[thread_num].queue(data, len, thread_lamport);
        mpcpio.msgs_sent[thread_num] += newmsg;
        mpcpio.msg_bytes_sent[thread_num] += len;
    }
}

// Receive data from the peer or from the server
size_t MPCTIO::recv_peer(void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        return mpcpio.peerios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

size_t MPCTIO::recv_server(void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        return mpcpio.serverios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

// Queue up data to p0 or p1
void MPCTIO::queue_p0(const void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        size_t newmsg = mpcsrvio.p0ios[thread_num].queue(data, len, thread_lamport);
        mpcsrvio.msgs_sent[thread_num] += newmsg;
        mpcsrvio.msg_bytes_sent[thread_num] += len;
    }
}

void MPCTIO::queue_p1(const void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        size_t newmsg = mpcsrvio.p1ios[thread_num].queue(data, len, thread_lamport);
        mpcsrvio.msgs_sent[thread_num] += newmsg;
        mpcsrvio.msg_bytes_sent[thread_num] += len;
    }
}

// Receive data from p0 or p1
size_t MPCTIO::recv_p0(void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        return mpcsrvio.p0ios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

size_t MPCTIO::recv_p1(void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        return mpcsrvio.p1ios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

// Send all queued data for this thread
void MPCTIO::send()
{
#ifdef VERBOSE_COMMS
    struct timeval tv;
    gettimeofday(&tv, NULL);
    printf("%lu.%06lu: Thread %d sending round %lu\n", tv.tv_sec,
        tv.tv_usec, thread_num, ++round_num);
#endif
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        mpcpio.peerios[thread_num].send();
        mpcpio.serverios[thread_num].send();
    } else {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        mpcsrvio.p0ios[thread_num].send();
        mpcsrvio.p1ios[thread_num].send();
    }
}

// Functions to get precomputed values. If we're in the online
// phase, get them from PreCompStorage. If we're in the
// preprocessing or online-only phase, read them from the server.
MultTriple MPCTIO::multtriple(yield_t &yield)
{
    MultTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            recv_server(&val, sizeof(val));
            mpcpio.multtriples[thread_num].inc();
        } else {
            mpcpio.multtriples[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        // Create multiplication triples (X0,Y0,Z0),(X1,Y1,Z1) such that
        // (X0*Y1 + Y0*X1) = (Z0+Z1)
        value_t X0, Y0, Z0, X1, Y1, Z1;
        arc4random_buf(&X0, sizeof(X0));
        arc4random_buf(&Y0, sizeof(Y0));
        arc4random_buf(&Z0, sizeof(Z0));
        arc4random_buf(&X1, sizeof(X1));
        arc4random_buf(&Y1, sizeof(Y1));
        Z1 = X0 * Y1 + X1 * Y0 - Z0;
        MultTriple T0, T1;
        T0 = std::make_tuple(X0, Y0, Z0);
        T1 = std::make_tuple(X1, Y1, Z1);
        queue_p0(&T0, sizeof(T0));
        queue_p1(&T1, sizeof(T1));
        yield();
    }
    return val;
}

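// (Note: the triple above gives the two players additive shares Z0
// and Z1 of the cross terms X0*Y1 + X1*Y0 of their respective random
// values; the multiplication protocols that consume these triples to
// multiply secret-shared values live elsewhere in this repository,
// not in this file.)
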
// When halftriple() is used internally to another preprocessing
// operation, don't tally it, so that it doesn't appear separately in
// the stats from the preprocessing operation that invoked it
HalfTriple MPCTIO::halftriple(yield_t &yield, bool tally)
{
    HalfTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            recv_server(&val, sizeof(val));
            if (tally) {
                mpcpio.halftriples[thread_num].inc();
            }
        } else {
            mpcpio.halftriples[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        // Create half-triples (X0,Z0),(Y1,Z1) such that
        // X0*Y1 = Z0 + Z1
        value_t X0, Z0, Y1, Z1;
        arc4random_buf(&X0, sizeof(X0));
        arc4random_buf(&Z0, sizeof(Z0));
        arc4random_buf(&Y1, sizeof(Y1));
        Z1 = X0 * Y1 - Z0;
        HalfTriple H0, H1;
        H0 = std::make_tuple(X0, Z0);
        H1 = std::make_tuple(Y1, Z1);
        queue_p0(&H0, sizeof(H0));
        queue_p1(&H1, sizeof(H1));
        yield();
    }
    return val;
}

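// (A half-triple is the one-directional analogue of a full triple:
// only the single cross term X0*Y1 is shared as Z0+Z1, for protocol
// steps in which just one player's value multiplies the other's.)
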
MultTriple MPCTIO::andtriple(yield_t &yield)
{
    AndTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            recv_server(&val, sizeof(val));
            mpcpio.andtriples[thread_num].inc();
        } else {
            mpcpio.andtriples[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        // Create AND triples (X0,Y0,Z0),(X1,Y1,Z1) such that
        // (X0&Y1 ^ Y0&X1) = (Z0^Z1)
        value_t X0, Y0, Z0, X1, Y1, Z1;
        arc4random_buf(&X0, sizeof(X0));
        arc4random_buf(&Y0, sizeof(Y0));
        arc4random_buf(&Z0, sizeof(Z0));
        arc4random_buf(&X1, sizeof(X1));
        arc4random_buf(&Y1, sizeof(Y1));
        Z1 = (X0 & Y1) ^ (X1 & Y0) ^ Z0;
        AndTriple T0, T1;
        T0 = std::make_tuple(X0, Y0, Z0);
        T1 = std::make_tuple(X1, Y1, Z1);
        queue_p0(&T0, sizeof(T0));
        queue_p1(&T1, sizeof(T1));
        yield();
    }
    return val;
}

void MPCTIO::request_nodeselecttriples(yield_t &yield, size_t num)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            for (size_t i=0; i<num; ++i) {
                SelectTriple<DPFnode> v;
                uint8_t Xbyte;
                recv_server(&Xbyte, sizeof(Xbyte));
                v.X = Xbyte & 1;
                recv_server(&v.Y, sizeof(v.Y));
                recv_server(&v.Z, sizeof(v.Z));
                queued_nodeselecttriples.push_back(v);
            }
            remaining_nodesselecttriples += num;
        } else {
            std::cerr << "Attempted to read SelectTriple<DPFnode> in online phase\n";
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        for (size_t i=0; i<num; ++i) {
            // Create triples (X0,Y0,Z0),(X1,Y1,Z1) such that
            // (X0*Y1 ^ Y0*X1) = (Z0^Z1)
            bit_t X0, X1;
            DPFnode Y0, Z0, Y1, Z1;
            X0 = arc4random() & 1;
            arc4random_buf(&Y0, sizeof(Y0));
            arc4random_buf(&Z0, sizeof(Z0));
            X1 = arc4random() & 1;
            arc4random_buf(&Y1, sizeof(Y1));
            DPFnode X0ext, X1ext;
            // Sign-extend X0 and X1 (so that 0 -> 0000...0 and
            // 1 -> 1111...1)
            X0ext = if128_mask[X0];
            X1ext = if128_mask[X1];
            Z1 = ((X0ext & Y1) ^ (X1ext & Y0)) ^ Z0;
            queue_p0(&X0, sizeof(X0));
            queue_p0(&Y0, sizeof(Y0));
            queue_p0(&Z0, sizeof(Z0));
            queue_p1(&X1, sizeof(X1));
            queue_p1(&Y1, sizeof(Y1));
            queue_p1(&Z1, sizeof(Z1));
        }
        yield();
        remaining_nodesselecttriples += num;
    }
}

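// Hand out one of the triples requested above. In the preprocessing
// phase, the computational players consume the locally queued copies;
// the server only decrements its count, since it already sent both
// halves when the request was made.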
SelectTriple<DPFnode> MPCTIO::nodeselecttriple(yield_t &yield)
{
    SelectTriple<DPFnode> val;
    if (remaining_nodesselecttriples == 0) {
        request_nodeselecttriples(yield, 1);
    }
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            val = queued_nodeselecttriples.front();
            queued_nodeselecttriples.pop_front();
            --remaining_nodesselecttriples;
        } else {
            std::cerr << "Attempted to read SelectTriple<DPFnode> in online phase\n";
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        --remaining_nodesselecttriples;
    }
    return val;
}

SelectTriple<value_t> MPCTIO::valselecttriple(yield_t &yield)
{
    SelectTriple<value_t> val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            uint8_t Xbyte;
            yield();
            recv_server(&Xbyte, sizeof(Xbyte));
            val.X = Xbyte & 1;
            recv_server(&val.Y, sizeof(val.Y));
            recv_server(&val.Z, sizeof(val.Z));
            mpcpio.valselecttriples[thread_num].inc();
        } else {
            mpcpio.valselecttriples[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        // Create triples (X0,Y0,Z0),(X1,Y1,Z1) such that
        // (X0*Y1 ^ Y0*X1) = (Z0^Z1)
        bit_t X0, X1;
        value_t Y0, Z0, Y1, Z1;
        X0 = arc4random() & 1;
        arc4random_buf(&Y0, sizeof(Y0));
        arc4random_buf(&Z0, sizeof(Z0));
        X1 = arc4random() & 1;
        arc4random_buf(&Y1, sizeof(Y1));
        value_t X0ext, X1ext;
        // Sign-extend X0 and X1 (so that 0 -> 0000...0 and
        // 1 -> 1111...1)
        X0ext = -value_t(X0);
        X1ext = -value_t(X1);
        Z1 = ((X0ext & Y1) ^ (X1ext & Y0)) ^ Z0;
        queue_p0(&X0, sizeof(X0));
        queue_p0(&Y0, sizeof(Y0));
        queue_p0(&Z0, sizeof(Z0));
        queue_p1(&X1, sizeof(X1));
        queue_p1(&Y1, sizeof(Y1));
        queue_p1(&Z1, sizeof(Z1));
        yield();
    }
    return val;
}

SelectTriple<bit_t> MPCTIO::bitselecttriple(yield_t &yield)
{
    // Do we need to fetch a new AND triple?
    if (last_andtriple_bits_remaining == 0) {
        last_andtriple = andtriple(yield);
        last_andtriple_bits_remaining = 8*sizeof(value_t);
    }
    --last_andtriple_bits_remaining;
    value_t mask = value_t(1) << last_andtriple_bits_remaining;
    SelectTriple<bit_t> val;
    val.X = !!(std::get<0>(last_andtriple) & mask);
    val.Y = !!(std::get<1>(last_andtriple) & mask);
    val.Z = !!(std::get<2>(last_andtriple) & mask);
    return val;
}

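// (Each cached AND triple is consumed one bit position at a time, so
// a single andtriple() fetch yields 8*sizeof(value_t) bit-select
// triples (64 if value_t is a 64-bit type) before another fetch is
// needed.)
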
CDPF MPCTIO::cdpf(yield_t &yield)
{
    CDPF val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            iostream_server() >> val;
            mpcpio.cdpfs[thread_num].inc();
        } else {
            mpcpio.cdpfs[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        auto [ cdpf0, cdpf1 ] = CDPF::generate(aes_ops());
        iostream_p0() << cdpf0;
        iostream_p1() << cdpf1;
        yield();
    }
    return val;
}

// The port number for the P1 -> P0 connection
static const unsigned short port_p1_p0 = 2115;

// The port number for the P2 -> P0 connection
static const unsigned short port_p2_p0 = 2116;

// The port number for the P2 -> P1 connection
static const unsigned short port_p2_p1 = 2117;

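// Connection topology: the lower-numbered party always listens and
// the higher-numbered party connects, so P0 accepts connections from
// P1 and P2, and P1 accepts connections from P2. Each pairing gets
// num_threads sockets, one per communication thread, and the
// connecting side tags each socket with a 2-byte thread number.
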
void mpcio_setup_computational(unsigned player,
    boost::asio::io_context &io_context,
    const char *p0addr, // can be NULL when player=0
    int num_threads,
    std::deque<tcp::socket> &peersocks,
    std::deque<tcp::socket> &serversocks)
{
    if (player == 0) {
        // Listen for connections from P1 and from P2
        tcp::acceptor acceptor_p1(io_context,
            tcp::endpoint(tcp::v4(), port_p1_p0));
        tcp::acceptor acceptor_p2(io_context,
            tcp::endpoint(tcp::v4(), port_p2_p0));

        peersocks.clear();
        serversocks.clear();
        for (int i=0;i<num_threads;++i) {
            peersocks.emplace_back(io_context);
            serversocks.emplace_back(io_context);
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket peersock = acceptor_p1.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number
            unsigned short thread_num;
            boost::asio::read(peersock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from peer\n";
            } else {
                peersocks[thread_num] = std::move(peersock);
            }
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket serversock = acceptor_p2.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number
            unsigned short thread_num;
            boost::asio::read(serversock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from server\n";
            } else {
                serversocks[thread_num] = std::move(serversock);
            }
        }
    } else if (player == 1) {
        // Listen for connections from P2, make num_threads connections to P0
        tcp::acceptor acceptor_p2(io_context,
            tcp::endpoint(tcp::v4(), port_p2_p1));

        tcp::resolver resolver(io_context);
        boost::system::error_code err;
        peersocks.clear();
        serversocks.clear();
        for (int i=0;i<num_threads;++i) {
            serversocks.emplace_back(io_context);
        }
        for (unsigned short thread_num = 0; thread_num < num_threads; ++thread_num) {
            tcp::socket peersock(io_context);
            while(1) {
                boost::asio::connect(peersock,
                    resolver.resolve(p0addr, std::to_string(port_p1_p0)), err);
                if (!err) break;
                std::cerr << "Connection to p0 refused, will retry.\n";
                sleep(1);
            }
            // Write 2 bytes to the socket indicating which thread
            // number this socket is for
            boost::asio::write(peersock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            peersocks.push_back(std::move(peersock));
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket serversock = acceptor_p2.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number
            unsigned short thread_num;
            boost::asio::read(serversock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from server\n";
            } else {
                serversocks[thread_num] = std::move(serversock);
            }
        }
    } else {
        std::cerr << "Invalid player number passed to mpcio_setup_computational\n";
    }
}

void mpcio_setup_server(boost::asio::io_context &io_context,
    const char *p0addr, const char *p1addr, int num_threads,
    std::deque<tcp::socket> &p0socks,
    std::deque<tcp::socket> &p1socks)
{
    // Make connections to P0 and P1
    tcp::resolver resolver(io_context);
    boost::system::error_code err;
    p0socks.clear();
    p1socks.clear();
    for (unsigned short thread_num = 0; thread_num < num_threads; ++thread_num) {
        tcp::socket p0sock(io_context);
        while(1) {
            boost::asio::connect(p0sock,
                resolver.resolve(p0addr, std::to_string(port_p2_p0)), err);
            if (!err) break;
            std::cerr << "Connection to p0 refused, will retry.\n";
            sleep(1);
        }
        // Write 2 bytes to the socket indicating which thread
        // number this socket is for
        boost::asio::write(p0sock,
            boost::asio::buffer(&thread_num, sizeof(thread_num)));
        p0socks.push_back(std::move(p0sock));
    }
    for (unsigned short thread_num = 0; thread_num < num_threads; ++thread_num) {
        tcp::socket p1sock(io_context);
        while(1) {
            boost::asio::connect(p1sock,
                resolver.resolve(p1addr, std::to_string(port_p2_p1)), err);
            if (!err) break;
            std::cerr << "Connection to p1 refused, will retry.\n";
            sleep(1);
        }
        // Write 2 bytes to the socket indicating which thread
        // number this socket is for
        boost::asio::write(p1sock,
            boost::asio::buffer(&thread_num, sizeof(thread_num)));
        p1socks.push_back(std::move(p1sock));
    }
}
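
// Typical setup (a sketch; the actual driver code lives elsewhere in
// this repository): each computational player calls
// mpcio_setup_computational() to obtain peersocks and serversocks,
// wraps them in an MPCPeerIO, and constructs one MPCTIO per thread;
// the server does the same with mpcio_setup_server() and MPCServerIO.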