mpcio.hpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. #ifndef __MCPIO_HPP__
  2. #define __MCPIO_HPP__
  3. #include <iostream>
  4. #include <fstream>
  5. #include <tuple>
  6. #include <vector>
  7. #include <deque>
  8. #include <queue>
  9. #include <string>
  10. #include <cstdint>
  11. #include <boost/asio.hpp>
  12. #include <boost/coroutine2/all.hpp>
  13. #include <boost/thread.hpp>
  14. using boost::asio::ip::tcp;
  15. // Classes to represent stored precomputed data (e.g., multiplicative triples)
  16. typedef std::tuple<uint64_t, uint64_t, uint64_t> MultTriple;
  17. template<typename T>
  18. class PreCompStorage {
  19. public:
  20. PreCompStorage(unsigned player, bool preprocessing,
  21. const char *filenameprefix);
  22. void get(T& nextval);
  23. private:
  24. std::ifstream storage;
  25. };
  26. template<typename T>
  27. PreCompStorage<T>::PreCompStorage(unsigned player, bool preprocessing,
  28. const char *filenameprefix) {
  29. if (preprocessing) return;
  30. std::string filename(filenameprefix);
  31. char suffix[4];
  32. sprintf(suffix, ".p%d", player%10);
  33. filename.append(suffix);
  34. storage.open(filename);
  35. if (storage.fail()) {
  36. std::cerr << "Failed to open " << filename << "\n";
  37. exit(1);
  38. }
  39. }
  40. template<typename T>
  41. void PreCompStorage<T>::get(T& nextval) {
  42. storage.read((char *)&nextval, sizeof(T));
  43. if (storage.gcount() != sizeof(T)) {
  44. std::cerr << "Failed to read precomputed value from storage\n";
  45. exit(1);
  46. }
  47. }
  48. // A class to wrap a socket to another MPC party. This wrapping allows
  49. // us to do some useful logging, and perform async_writes transparently
  50. // to the application.
  51. class MPCSingleIO {
  52. tcp::socket sock;
  53. size_t totread, totwritten;
  54. std::vector<ssize_t> iotrace;
  55. // To avoid blocking if both we and our peer are trying to send
  56. // something very large, and neither side is receiving, we will send
  57. // with async_write. But this has a number of implications:
  58. // - The data to be sent has to be copied into this MPCSingleIO,
  59. // since asio::buffer pointers are not guaranteed to remain valid
  60. // after the end of the coroutine that created them
  61. // - We have to keep a queue of messages to be sent, in case
  62. // coroutines call send() before the previous message has finished
  63. // being sent
  64. // - This queue may be accessed from the async_write thread as well
  65. // as the work thread that uses this MPCSingleIO directly (there
  66. // should be only one of the latter), so we need some locking
  67. // This is where we accumulate data passed in queue()
  68. std::string dataqueue;
  69. // When send() is called, the above dataqueue is appended to this
  70. // messagequeue, and the dataqueue is reset. If messagequeue was
  71. // empty before this append, launch async_write to write the first
  72. // thing in the messagequeue. When async_write completes, it will
  73. // delete the first thing in the messagequeue, and see if there are
  74. // any more elements. If so, it will start another async_write.
  75. // The invariant is that there is an async_write currently running
  76. // iff messagequeue is nonempty.
  77. std::queue<std::string> messagequeue;
  78. // Never touch the above messagequeue without holding this lock (you
  79. // _can_ touch the strings it contains, though, if you looked one up
  80. // while holding the lock).
  81. boost::mutex messagequeuelock;
  82. // Asynchronously send the first message from the message queue.
  83. // * The messagequeuelock must be held when this is called! *
  84. // This method may be called from either thread (the work thread or
  85. // the async_write handler thread).
  86. void async_send_from_msgqueue() {
  87. boost::asio::async_write(sock,
  88. boost::asio::buffer(messagequeue.front()),
  89. [&](boost::system::error_code ec, std::size_t amt){
  90. messagequeuelock.lock();
  91. messagequeue.pop();
  92. if (messagequeue.size() > 0) {
  93. async_send_from_msgqueue();
  94. }
  95. messagequeuelock.unlock();
  96. });
  97. }
  98. public:
  99. MPCSingleIO(tcp::socket &&sock) :
  100. sock(std::move(sock)), totread(0), totwritten(0) {}
  101. void queue(const void *data, size_t len) {
  102. dataqueue.append((const char *)data, len);
  103. // If we already have some full packets worth of data, may as
  104. // well send it.
  105. if (dataqueue.size() > 28800) {
  106. send();
  107. }
  108. }
  109. void send() {
  110. size_t thissize = dataqueue.size();
  111. // Ignore spurious calls to send()
  112. if (thissize == 0) return;
  113. iotrace.push_back(thissize);
  114. messagequeuelock.lock();
  115. // Move the current message to send into the message queue (this
  116. // moves a pointer to the data, not copying the data itself)
  117. messagequeue.emplace(std::move(dataqueue));
  118. // If this is now the first thing in the message queue, launch
  119. // an async_write to write it
  120. if (messagequeue.size() == 1) {
  121. async_send_from_msgqueue();
  122. }
  123. messagequeuelock.unlock();
  124. }
  125. size_t recv(const std::vector<boost::asio::mutable_buffer>& buffers) {
  126. size_t res = boost::asio::read(sock, buffers);
  127. iotrace.push_back(-(ssize_t(res)));
  128. return res;
  129. }
  130. size_t recv(const boost::asio::mutable_buffer& buffer) {
  131. size_t res = boost::asio::read(sock, buffer);
  132. iotrace.push_back(-(ssize_t(res)));
  133. return res;
  134. }
  135. size_t recv(void *data, size_t len) {
  136. size_t res = boost::asio::read(sock, boost::asio::buffer(data, len));
  137. iotrace.push_back(-(ssize_t(res)));
  138. return res;
  139. }
  140. void dumptrace(std::ostream &os, const char *label = NULL) {
  141. if (label) {
  142. os << label << " ";
  143. }
  144. os << "IO trace:";
  145. for (auto& s: iotrace) {
  146. os << " " << s;
  147. }
  148. os << "\n";
  149. }
  150. void resettrace() {
  151. iotrace.clear();
  152. }
  153. };
  154. // A class to represent all of a computation party's IO, either to other
  155. // parties or to local storage
  156. struct MPCIO {
  157. int player;
  158. // We use a deque here instead of a vector because you can't have a
  159. // vector of a type without a copy constructor (tcp::socket is the
  160. // culprit), but you can have a deque of those for some reason.
  161. std::deque<MPCSingleIO> peerios;
  162. MPCSingleIO serverio;
  163. PreCompStorage<MultTriple> triples;
  164. MPCIO(unsigned player, bool preprocessing,
  165. std::deque<tcp::socket> &peersocks, tcp::socket &&serversock) :
  166. player(player), serverio(std::move(serversock)),
  167. triples(player, preprocessing, "triples") {
  168. for (auto &&sock : peersocks) {
  169. peerios.emplace_back(std::move(sock));
  170. }
  171. }
  172. };
  173. // A class to represent all of the server party's IO, either to
  174. // computational parties or to local storage
  175. struct MPCServerIO {
  176. MPCSingleIO p0io;
  177. MPCSingleIO p1io;
  178. MPCServerIO(bool preprocessing, tcp::socket &&p0sock,
  179. tcp::socket &&p1sock) :
  180. p0io(std::move(p0sock)), p1io(std::move(p1sock)) {}
  181. };
  182. // Set up the socket connections between the two computational parties
  183. // (P0 and P1) and the server party (P2). For each connection, the
  184. // lower-numbered party does the accept() and the higher-numbered party
  185. // does the connect().
  186. // Computational parties call this version with player=0 or 1
  187. void mpcio_setup_computational(unsigned player,
  188. boost::asio::io_context &io_context,
  189. const char *p0addr, // can be NULL when player=0
  190. int num_threads,
  191. std::deque<tcp::socket> &peersocks, tcp::socket &serversock);
  192. // Server calls this version with player=2
  193. void mpcio_setup_server(boost::asio::io_context &io_context,
  194. const char *p0addr, const char *p1addr,
  195. tcp::socket &p0sock, tcp::socket &p1sock);
  196. #endif