#include <sys/time.h>      // getrusage
#include <sys/resource.h>  // getrusage

#include "mpcio.hpp"
#include "rdpf.hpp"
#include "cdpf.hpp"
#include "bitutils.hpp"
#include "coroutine.hpp"

// T is the type being stored
// N is a type whose "name" static member is a string naming the type,
// so that we can report something useful to the user if they try
// to read a type for which we have no more precomputed values
template<typename T, typename N>
PreCompStorage<T,N>::PreCompStorage(unsigned player, ProcessingMode mode,
        const char *filenameprefix, unsigned thread_num) :
        name(N::name), depth(0)
{
    init(player, mode, filenameprefix, thread_num);
}

template<typename T, typename N>
void PreCompStorage<T,N>::init(unsigned player, ProcessingMode mode,
        const char *filenameprefix, unsigned thread_num, nbits_t depth)
{
    if (mode != MODE_ONLINE) return;
    std::string filename(filenameprefix);
    char suffix[20];
    if (depth) {
        this->depth = depth;
        sprintf(suffix, "%02d.p%d.t%u", depth, player%10, thread_num);
    } else {
        sprintf(suffix, ".p%d.t%u", player%10, thread_num);
    }
    filename.append(suffix);
    storage.open(filename);
    // It's OK if not every file exists, so don't bother checking for
    // errors here.  We'll report an error in get() if we actually try
    // to use a value for which we don't have a precomputed file.
    count = 0;
}

template<typename T, typename N>
void PreCompStorage<T,N>::get(T& nextval)
{
    storage >> nextval;
    if (!storage.good()) {
        std::cerr << "Failed to read precomputed value from " << name;
        if (depth) {
            std::cerr << (int)depth;
        }
        std::cerr << " storage\n";
        exit(1);
    }
    ++count;
}

void MPCSingleIO::async_send_from_msgqueue()
{
#ifdef SEND_LAMPORT_CLOCKS
    std::vector<boost::asio::const_buffer> tosend;
    tosend.push_back(boost::asio::buffer(messagequeue.front().header));
    tosend.push_back(boost::asio::buffer(messagequeue.front().message));
#endif
    boost::asio::async_write(sock,
#ifdef SEND_LAMPORT_CLOCKS
        tosend,
#else
        boost::asio::buffer(messagequeue.front()),
#endif
        [&](boost::system::error_code ec, std::size_t amt){
            messagequeuelock.lock();
            messagequeue.pop();
            if (messagequeue.size() > 0) {
                async_send_from_msgqueue();
            }
            messagequeuelock.unlock();
        });
}

size_t MPCSingleIO::queue(const void *data, size_t len, lamport_t lamport)
{
    // Is this a new message?
    size_t newmsg = 0;

    dataqueue.append((const char *)data, len);

    // If this is the first queue() since the last explicit send(),
    // which we'll know because message_lamport will be nullopt, set
    // message_lamport to the current Lamport clock.  Note that the
    // boolean test checks whether message_lamport is nullopt, not
    // whether its value is zero.
    if (!message_lamport) {
        message_lamport = lamport;
        newmsg = 1;
    }

#ifdef VERBOSE_COMMS
    printf("Queue %s.%d len=%lu lamp=%u: ", dest.c_str(), thread_num,
        len, message_lamport.value());
    for (size_t i=0;i<len;++i) {
        printf("%02x", ((const unsigned char*)data)[i]);
    }
    printf("\n");
#endif

    // If we already have some full packets' worth of data queued up,
    // we may as well send it now.
    if (dataqueue.size() > 28800) {
        send(true);
    }

    return newmsg;
}
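// Illustrative sketch (not part of the build): how a caller can lean on
// queue()'s batching.  Repeated small queue() calls only append to
// dataqueue; nothing touches the network until an explicit send(), or
// until the 28800-byte threshold above (presumably about twenty
// 1440-byte TCP payloads) triggers an implicit one.  MPCSingleIO and
// lamport_t are from this file; the loop bound is made up.
#if 0
static void example_batched_queue(MPCSingleIO &io, lamport_t lamport)
{
    for (uint64_t v = 0; v < 100; ++v) {
        // Almost every call returns 0 (not a new message); only the
        // first call since the last explicit send() returns 1
        io.queue(&v, sizeof(v), lamport);
    }
    io.send();  // flush the single coalesced message
}
#endif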
void MPCSingleIO::send(bool implicit_send)
{
    size_t thissize = dataqueue.size();
    // Ignore spurious calls to send(), except for resetting
    // message_lamport if this was an explicit send().
    if (thissize == 0) {
#ifdef SEND_LAMPORT_CLOCKS
        // If this was an explicit send(), reset the message_lamport so
        // that it gets updated at the next queue().
        if (!implicit_send) {
            message_lamport.reset();
        }
#endif
        return;
    }

#ifdef RECORD_IOTRACE
    iotrace.push_back(thissize);
#endif

    messagequeuelock.lock();

    // Move the current message to send into the message queue (this
    // moves a pointer to the data, not copying the data itself)
#ifdef SEND_LAMPORT_CLOCKS
    messagequeue.emplace(std::move(dataqueue),
        message_lamport.value());
    // If this was an explicit send(), reset the message_lamport so
    // that it gets updated at the next queue().
    if (!implicit_send) {
        message_lamport.reset();
    }
#else
    messagequeue.emplace(std::move(dataqueue));
#endif

    // If this is now the first thing in the message queue, launch
    // an async_write to write it
    if (messagequeue.size() == 1) {
        async_send_from_msgqueue();
    }
    messagequeuelock.unlock();
}

size_t MPCSingleIO::recv(void *data, size_t len, lamport_t &lamport)
{
#ifdef VERBOSE_COMMS
    size_t orig_len = len;
    printf("Recv %s.%d len=%lu lamp=%u ", dest.c_str(), thread_num,
        len, lamport);
#endif

#ifdef SEND_LAMPORT_CLOCKS
    char *cdata = (char *)data;
    size_t res = 0;
    while (len > 0) {
        while (recvdataremain == 0) {
            // Read a new header
            char hdr[sizeof(uint32_t) + sizeof(lamport_t)];
            uint32_t datalen;
            lamport_t recv_lamport;
            boost::asio::read(sock, boost::asio::buffer(hdr, sizeof(hdr)));
            memmove(&datalen, hdr, sizeof(datalen));
            memmove(&recv_lamport, hdr+sizeof(datalen), sizeof(lamport_t));
            lamport_t new_lamport = recv_lamport + 1;
            if (lamport < new_lamport) {
                lamport = new_lamport;
            }
            if (datalen > 0) {
                recvdata.resize(datalen, '\0');
                boost::asio::read(sock, boost::asio::buffer(recvdata));
                recvdataremain = datalen;
            }
        }
        size_t amttoread = len;
        if (amttoread > recvdataremain) {
            amttoread = recvdataremain;
        }
        memmove(cdata, recvdata.data()+recvdata.size()-recvdataremain,
            amttoread);
        cdata += amttoread;
        len -= amttoread;
        recvdataremain -= amttoread;
        res += amttoread;
    }
#else
    size_t res = boost::asio::read(sock, boost::asio::buffer(data, len));
#endif

#ifdef VERBOSE_COMMS
    printf("nlamp=%u: ", lamport);
    for (size_t i=0;i<orig_len;++i) {
        printf("%02x", ((const unsigned char*)data)[i]);
    }
    printf("\n");
#endif

#ifdef RECORD_IOTRACE
    iotrace.push_back(-(ssize_t(res)));
#endif
    return res;
}

void MPCIO::reset_stats()
{
    msgs_sent.clear();
    msg_bytes_sent.clear();
    aes_ops.clear();
    for (size_t i=0; i<num_threads; ++i) {
        msgs_sent.push_back(0);
        msg_bytes_sent.push_back(0);
        aes_ops.push_back(0);
    }
    steady_start = boost::chrono::steady_clock::now();
    cpu_start = boost::chrono::process_cpu_clock::now();
}

// Report the maximum memory usage of this process
void MPCIO::dump_memusage(std::ostream &os)
{
    struct rusage ru;
    getrusage(RUSAGE_SELF, &ru);
    os << ru.ru_maxrss << " KiB max memory used\n";
}

void MPCIO::dump_stats(std::ostream &os)
{
    size_t tot_msgs_sent = 0;
    size_t tot_msg_bytes_sent = 0;
    size_t tot_aes_ops = 0;
    for (auto& n : msgs_sent) { tot_msgs_sent += n; }
    for (auto& n : msg_bytes_sent) { tot_msg_bytes_sent += n; }
    for (auto& n : aes_ops) { tot_aes_ops += n; }
    auto steady_elapsed =
        boost::chrono::steady_clock::now() - steady_start;
    auto cpu_elapsed =
        boost::chrono::process_cpu_clock::now() - cpu_start;

    os << tot_msgs_sent << " messages sent\n";
    os << tot_msg_bytes_sent << " message bytes sent\n";
    os << lamport << " Lamport clock (latencies)\n";
    os << tot_aes_ops << " local AES operations\n";
    os << boost::chrono::duration_cast
        <boost::chrono::milliseconds>(steady_elapsed) <<
        " wall clock time\n";
    os << cpu_elapsed << " {real;user;system}\n";
    dump_memusage(os);
}

MPCPeerIO::MPCPeerIO(unsigned player, ProcessingMode mode,
        std::deque<tcp::socket> &peersocks,
        std::deque<tcp::socket> &serversocks) :
    MPCIO(player, mode, peersocks.size())
{
    unsigned num_threads = unsigned(peersocks.size());
    for (unsigned i=0; i<num_threads; ++i) {
        triples.emplace_back(player, mode, "triples", i);
    }
    for (unsigned i=0; i<num_threads; ++i) {
        halftriples.emplace_back(player, mode, "halves", i);
    }
    rdpftriples.resize(num_threads);
    for (unsigned i=0; i<num_threads; ++i) {
        for (unsigned depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpftriples[i][depth-1].init(player, mode, "rdpf", i, depth);
        }
    }
    for (unsigned i=0; i<num_threads; ++i) {
        cdpfs.emplace_back(player, mode, "cdpf", i);
    }
    unsigned i = 0;
    for (auto &&sock : peersocks) {
        peerios.emplace_back(std::move(sock), "peer", i);
        ++i;
    }
    i = 0;
    for (auto &&sock : serversocks) {
        serverios.emplace_back(std::move(sock), "srv", i);
        ++i;
    }
}

void MPCPeerIO::dump_precomp_stats(std::ostream &os)
{
    for (size_t i=0; i<triples.size(); ++i) {
        if (i > 0) {
            os << " ";
        }
        os << "T" << i << " t:" << triples[i].get_stats() <<
            " h:" << halftriples[i].get_stats();
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            size_t cnt = rdpftriples[i][depth-1].get_stats();
            if (cnt > 0) {
                os << " r" << int(depth) << ":" << cnt;
            }
        }
        size_t ccnt = cdpfs[i].get_stats();
        if (ccnt > 0) {
            os << " c:" << ccnt;
        }
    }
    os << "\n";
}

void MPCPeerIO::reset_precomp_stats()
{
    for (size_t i=0; i<triples.size(); ++i) {
        triples[i].reset_stats();
        halftriples[i].reset_stats();
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpftriples[i][depth-1].reset_stats();
        }
        cdpfs[i].reset_stats();
    }
}

void MPCPeerIO::dump_stats(std::ostream &os)
{
    MPCIO::dump_stats(os);
    os << "Precomputed values used: ";
    dump_precomp_stats(os);
}

MPCServerIO::MPCServerIO(ProcessingMode mode,
        std::deque<tcp::socket> &p0socks,
        std::deque<tcp::socket> &p1socks) :
    MPCIO(2, mode, p0socks.size())
{
    rdpfpairs.resize(num_threads);
    for (unsigned i=0; i<num_threads; ++i) {
        for (unsigned depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpfpairs[i][depth-1].init(player, mode, "rdpf", i, depth);
        }
    }
    unsigned i = 0;
    for (auto &&sock : p0socks) {
        p0ios.emplace_back(std::move(sock), "p0", i);
        ++i;
    }
    i = 0;
    for (auto &&sock : p1socks) {
        p1ios.emplace_back(std::move(sock), "p1", i);
        ++i;
    }
}

void MPCServerIO::dump_precomp_stats(std::ostream &os)
{
    for (size_t i=0; i<rdpfpairs.size(); ++i) {
        if (i > 0) {
            os << " ";
        }
        os << "T" << i;
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            size_t cnt = rdpfpairs[i][depth-1].get_stats();
            if (cnt > 0) {
                os << " r" << int(depth) << ":" << cnt;
            }
        }
    }
    os << "\n";
}

void MPCServerIO::reset_precomp_stats()
{
    for (size_t i=0; i<rdpfpairs.size(); ++i) {
        for (nbits_t depth=1; depth<=ADDRESS_MAX_BITS; ++depth) {
            rdpfpairs[i][depth-1].reset_stats();
        }
    }
}

void MPCServerIO::dump_stats(std::ostream &os)
{
    MPCIO::dump_stats(os);
    os << "Precomputed values used: ";
    dump_precomp_stats(os);
}

MPCTIO::MPCTIO(MPCIO &mpcio, int thread_num) :
    thread_num(thread_num), thread_lamport(mpcio.lamport),
    mpcio(mpcio)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        peer_iostream.emplace(mpcpio.peerios[thread_num],
            thread_lamport, mpcpio.msgs_sent[thread_num],
            mpcpio.msg_bytes_sent[thread_num]);
        server_iostream.emplace(mpcpio.serverios[thread_num],
            thread_lamport, mpcpio.msgs_sent[thread_num],
            mpcpio.msg_bytes_sent[thread_num]);
    } else {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        p0_iostream.emplace(mpcsrvio.p0ios[thread_num],
            thread_lamport, mpcsrvio.msgs_sent[thread_num],
            mpcsrvio.msg_bytes_sent[thread_num]);
        p1_iostream.emplace(mpcsrvio.p1ios[thread_num],
            thread_lamport, mpcsrvio.msgs_sent[thread_num],
            mpcsrvio.msg_bytes_sent[thread_num]);
    }
}
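// Illustrative sketch (not part of the build): the intended
// one-MPCTIO-per-thread usage of the constructor above.  Each worker
// wraps the shared MPCPeerIO in its own MPCTIO, which binds that
// thread's sockets and message counters.  run_computation is a
// hypothetical worker body; <thread> and <vector> would be needed.
#if 0
static void example_per_thread_tios(MPCPeerIO &mpcpio, int num_threads)
{
    std::vector<std::thread> workers;
    for (int i = 0; i < num_threads; ++i) {
        workers.emplace_back([&mpcpio, i] {
            MPCTIO tio(mpcpio, i);
            // run_computation(tio);   // hypothetical protocol work
            tio.send();                // flush any queued data
        });
    }
    for (auto &w : workers) {
        w.join();
    }
}
#endif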
// Sync our per-thread Lamport clock with the master one in the
// mpcio.  You only need to call this explicitly if your MPCTIO
// outlives your thread (in which case call it after the join), or
// if your threads do interthread communication amongst themselves
// (in which case call it in the sending thread before the send, and
// call it in the receiving thread after the receive).
void MPCTIO::sync_lamport()
{
    // Update the mpcio Lamport time to be the max of the thread
    // Lamport time and what we thought it was before.  We use this
    // compare_exchange construction in order to atomically do the
    // comparison, computation, and replacement.
    lamport_t old_lamport = mpcio.lamport;
    lamport_t new_lamport = thread_lamport;
    do {
        if (new_lamport < old_lamport) {
            new_lamport = old_lamport;
        }
    // The next line atomically checks if lamport still has
    // the value old_lamport; if so, it changes its value to
    // new_lamport and returns true (ending the loop).  If
    // not, it sets old_lamport to the current value of
    // lamport, and returns false (continuing the loop so
    // that new_lamport can be recomputed based on this new
    // value).
    } while (!mpcio.lamport.compare_exchange_weak(
        old_lamport, new_lamport));
    thread_lamport = new_lamport;
}

// Queue up data to the peer or to the server

void MPCTIO::queue_peer(const void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        size_t newmsg = mpcpio.peerios[thread_num].queue(data, len,
            thread_lamport);
        mpcpio.msgs_sent[thread_num] += newmsg;
        mpcpio.msg_bytes_sent[thread_num] += len;
    }
}

void MPCTIO::queue_server(const void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        size_t newmsg = mpcpio.serverios[thread_num].queue(data, len,
            thread_lamport);
        mpcpio.msgs_sent[thread_num] += newmsg;
        mpcpio.msg_bytes_sent[thread_num] += len;
    }
}

// Receive data from the peer or from the server

size_t MPCTIO::recv_peer(void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        return mpcpio.peerios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

size_t MPCTIO::recv_server(void *data, size_t len)
{
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        return mpcpio.serverios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

// Queue up data to p0 or p1

void MPCTIO::queue_p0(const void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        size_t newmsg = mpcsrvio.p0ios[thread_num].queue(data, len,
            thread_lamport);
        mpcsrvio.msgs_sent[thread_num] += newmsg;
        mpcsrvio.msg_bytes_sent[thread_num] += len;
    }
}

void MPCTIO::queue_p1(const void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        size_t newmsg = mpcsrvio.p1ios[thread_num].queue(data, len,
            thread_lamport);
        mpcsrvio.msgs_sent[thread_num] += newmsg;
        mpcsrvio.msg_bytes_sent[thread_num] += len;
    }
}

// Receive data from p0 or p1

size_t MPCTIO::recv_p0(void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        return mpcsrvio.p0ios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

size_t MPCTIO::recv_p1(void *data, size_t len)
{
    if (mpcio.player == 2) {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        return mpcsrvio.p1ios[thread_num].recv(data, len, thread_lamport);
    }
    return 0;
}

// Send all queued data for this thread

void MPCTIO::send()
{
#ifdef VERBOSE_COMMS
    printf("Thread %u sending round %lu\n", thread_num, ++round_num);
#endif
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        mpcpio.peerios[thread_num].send();
        mpcpio.serverios[thread_num].send();
    } else {
        MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
        mpcsrvio.p0ios[thread_num].send();
        mpcsrvio.p1ios[thread_num].send();
    }
}
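// Illustrative sketch (not part of the build): the lock-free
// maximum-update idiom that sync_lamport() above uses, reduced to its
// essentials.  On failure, compare_exchange_weak reloads `cur` with the
// current value of `clock`, so each retry compares against fresh state.
// atomic_max is a hypothetical free function, not part of this file.
#if 0
#include <atomic>

static void atomic_max(std::atomic<lamport_t> &clock, lamport_t candidate)
{
    lamport_t cur = clock.load();
    while (cur < candidate &&
            !clock.compare_exchange_weak(cur, candidate)) {
        // cur has been refreshed; loop until we either install
        // candidate or observe a value at least as large
    }
}
#endif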
// Functions to get precomputed values.  If we're in the online
// phase, get them from PreCompStorage.  If we're in the
// preprocessing or online-only phase, read them from the server.

MultTriple MPCTIO::triple(yield_t &yield)
{
    MultTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            recv_server(&val, sizeof(val));
            mpcpio.triples[thread_num].inc();
        } else {
            mpcpio.triples[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        // Create triples (X0,Y0,Z0),(X1,Y1,Z1) such that
        // (X0*Y1 + Y0*X1) = (Z0+Z1)
        value_t X0, Y0, Z0, X1, Y1, Z1;
        arc4random_buf(&X0, sizeof(X0));
        arc4random_buf(&Y0, sizeof(Y0));
        arc4random_buf(&Z0, sizeof(Z0));
        arc4random_buf(&X1, sizeof(X1));
        arc4random_buf(&Y1, sizeof(Y1));
        Z1 = X0 * Y1 + X1 * Y0 - Z0;
        MultTriple T0, T1;
        T0 = std::make_tuple(X0, Y0, Z0);
        T1 = std::make_tuple(X1, Y1, Z1);
        queue_p0(&T0, sizeof(T0));
        queue_p1(&T1, sizeof(T1));
        yield();
    }
    return val;
}

HalfTriple MPCTIO::halftriple(yield_t &yield)
{
    HalfTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            recv_server(&val, sizeof(val));
            mpcpio.halftriples[thread_num].inc();
        } else {
            mpcpio.halftriples[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        // Create half-triples (X0,Z0),(Y1,Z1) such that
        // X0*Y1 = Z0 + Z1
        value_t X0, Z0, Y1, Z1;
        arc4random_buf(&X0, sizeof(X0));
        arc4random_buf(&Z0, sizeof(Z0));
        arc4random_buf(&Y1, sizeof(Y1));
        Z1 = X0 * Y1 - Z0;
        HalfTriple H0, H1;
        H0 = std::make_tuple(X0, Z0);
        H1 = std::make_tuple(Y1, Z1);
        queue_p0(&H0, sizeof(H0));
        queue_p1(&H1, sizeof(H1));
        yield();
    }
    return val;
}

SelectTriple MPCTIO::selecttriple(yield_t &yield)
{
    SelectTriple val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            uint8_t Xbyte;
            yield();
            recv_server(&Xbyte, sizeof(Xbyte));
            val.X = Xbyte & 1;
            recv_server(&val.Y, sizeof(val.Y));
            recv_server(&val.Z, sizeof(val.Z));
        } else {
            std::cerr << "Attempted to read SelectTriple in online phase\n";
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        // Create triples (X0,Y0,Z0),(X1,Y1,Z1) such that
        // (X0*Y1 ^ Y0*X1) = (Z0^Z1)
        bit_t X0, X1;
        DPFnode Y0, Z0, Y1, Z1;
        X0 = arc4random() & 1;
        arc4random_buf(&Y0, sizeof(Y0));
        arc4random_buf(&Z0, sizeof(Z0));
        X1 = arc4random() & 1;
        arc4random_buf(&Y1, sizeof(Y1));
        DPFnode X0ext, X1ext;
        // Sign-extend X0 and X1 (so that 0 -> 0000...0 and
        // 1 -> 1111...1)
        X0ext = if128_mask[X0];
        X1ext = if128_mask[X1];
        Z1 = ((X0ext & Y1) ^ (X1ext & Y0)) ^ Z0;
        queue_p0(&X0, sizeof(X0));
        queue_p0(&Y0, sizeof(Y0));
        queue_p0(&Z0, sizeof(Z0));
        queue_p1(&X1, sizeof(X1));
        queue_p1(&Y1, sizeof(Y1));
        queue_p1(&Z1, sizeof(Z1));
        yield();
    }
    return val;
}

// Only computational peers call this; the server should be calling
// rdpfpair() at the same time
RDPFTriple MPCTIO::rdpftriple(yield_t &yield, nbits_t depth,
    bool keep_expansion)
{
    assert(mpcio.player < 2);
    RDPFTriple val;
    MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
    if (mpcio.mode == MODE_ONLINE) {
        mpcpio.rdpftriples[thread_num][depth-1].get(val);
    } else {
        val = RDPFTriple(*this, yield, depth, keep_expansion);
        iostream_server() <<
            val.dpf[(mpcio.player == 0) ? 1 : 2];
        mpcpio.rdpftriples[thread_num][depth-1].inc();
        yield();
    }
    return val;
}
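// Illustrative sketch (not part of the build): a self-check of the
// multiplication-triple identity established in triple() above.  Since
// value_t arithmetic wraps modulo a power of 2, choosing
// Z1 = X0*Y1 + X1*Y0 - Z0 makes X0*Y1 + Y0*X1 == Z0 + Z1 hold exactly.
// example_triple_identity is a hypothetical test helper.
#if 0
static bool example_triple_identity()
{
    value_t X0, Y0, Z0, X1, Y1;
    arc4random_buf(&X0, sizeof(X0));
    arc4random_buf(&Y0, sizeof(Y0));
    arc4random_buf(&Z0, sizeof(Z0));
    arc4random_buf(&X1, sizeof(X1));
    arc4random_buf(&Y1, sizeof(Y1));
    value_t Z1 = X0 * Y1 + X1 * Y0 - Z0;
    return X0 * Y1 + Y0 * X1 == Z0 + Z1;  // always true
}
#endif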
// Only the server calls this; the computational peers should be
// calling rdpftriple() at the same time
RDPFPair MPCTIO::rdpfpair(yield_t &yield, nbits_t depth)
{
    assert(mpcio.player == 2);
    RDPFPair val;
    MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
    if (mpcio.mode == MODE_ONLINE) {
        mpcsrvio.rdpfpairs[thread_num][depth-1].get(val);
    } else {
        RDPFTriple trip(*this, yield, depth, true);
        yield();
        iostream_p0() >> val.dpf[0];
        iostream_p1() >> val.dpf[1];
        mpcsrvio.rdpfpairs[thread_num][depth-1].inc();
    }
    return val;
}

CDPF MPCTIO::cdpf(yield_t &yield)
{
    CDPF val;
    if (mpcio.player < 2) {
        MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
        if (mpcpio.mode != MODE_ONLINE) {
            yield();
            iostream_server() >> val;
            mpcpio.cdpfs[thread_num].inc();
        } else {
            mpcpio.cdpfs[thread_num].get(val);
        }
    } else if (mpcio.mode != MODE_ONLINE) {
        auto [ cdpf0, cdpf1 ] = CDPF::generate(aes_ops());
        iostream_p0() << cdpf0;
        iostream_p1() << cdpf1;
        yield();
    }
    return val;
}

// The port number for the P1 -> P0 connection
static const unsigned short port_p1_p0 = 2115;

// The port number for the P2 -> P0 connection
static const unsigned short port_p2_p0 = 2116;

// The port number for the P2 -> P1 connection
static const unsigned short port_p2_p1 = 2117;

void mpcio_setup_computational(unsigned player,
    boost::asio::io_context &io_context,
    const char *p0addr,  // can be NULL when player=0
    int num_threads,
    std::deque<tcp::socket> &peersocks,
    std::deque<tcp::socket> &serversocks)
{
    if (player == 0) {
        // Listen for connections from P1 and from P2
        tcp::acceptor acceptor_p1(io_context,
            tcp::endpoint(tcp::v4(), port_p1_p0));
        tcp::acceptor acceptor_p2(io_context,
            tcp::endpoint(tcp::v4(), port_p2_p0));

        peersocks.clear();
        serversocks.clear();
        for (int i=0;i<num_threads;++i) {
            peersocks.emplace_back(io_context);
        }
        for (int i=0;i<num_threads;++i) {
            serversocks.emplace_back(io_context);
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket peersock = acceptor_p1.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number this socket is for
            unsigned short thread_num;
            boost::asio::read(peersock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from peer\n";
            } else {
                peersocks[thread_num] = std::move(peersock);
            }
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket serversock = acceptor_p2.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number this socket is for
            unsigned short thread_num;
            boost::asio::read(serversock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from server\n";
            } else {
                serversocks[thread_num] = std::move(serversock);
            }
        }
    } else if (player == 1) {
        // Listen for connections from P2, and make num_threads
        // connections to P0
        tcp::acceptor acceptor_p2(io_context,
            tcp::endpoint(tcp::v4(), port_p2_p1));

        tcp::resolver resolver(io_context);
        boost::system::error_code err;
        peersocks.clear();
        serversocks.clear();
        for (int i=0;i<num_threads;++i) {
            serversocks.emplace_back(io_context);
        }
        for (unsigned short thread_num = 0; thread_num < num_threads;
                ++thread_num) {
            tcp::socket peersock(io_context);
            while(1) {
                boost::asio::connect(peersock,
                    resolver.resolve(p0addr, std::to_string(port_p1_p0)),
                    err);
                if (!err) break;
                std::cerr << "Connection to p0 refused, will retry.\n";
                sleep(1);
            }
            // Write 2 bytes to the socket indicating which thread
            // number this socket is for
            boost::asio::write(peersock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            peersocks.push_back(std::move(peersock));
        }
        for (int i=0;i<num_threads;++i) {
            tcp::socket serversock = acceptor_p2.accept();
            // Read 2 bytes from the socket, which will be the thread
            // number this socket is for
            unsigned short thread_num;
            boost::asio::read(serversock,
                boost::asio::buffer(&thread_num, sizeof(thread_num)));
            if (thread_num >= num_threads) {
                std::cerr << "Received bad thread number from server\n";
            } else {
                serversocks[thread_num] = std::move(serversock);
            }
        }
    } else {
        std::cerr << "Invalid player number passed to mpcio_setup_computational\n";
    }
}
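// Illustrative sketch (not part of the build): how a computational
// party might drive the setup function above.  The hostname and thread
// count are hypothetical; player 0 passes NULL for p0addr since it
// only listens.
#if 0
static void example_setup_player1()
{
    boost::asio::io_context io_context;
    std::deque<tcp::socket> peersocks, serversocks;
    mpcio_setup_computational(1, io_context, "p0.example.org", 4,
        peersocks, serversocks);
    MPCPeerIO mpcpio(1, MODE_ONLINE, peersocks, serversocks);
    // ... create one MPCTIO per thread and run the protocol ...
}
#endif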
void mpcio_setup_server(boost::asio::io_context &io_context,
    const char *p0addr, const char *p1addr, int num_threads,
    std::deque<tcp::socket> &p0socks,
    std::deque<tcp::socket> &p1socks)
{
    // Make connections to P0 and P1
    tcp::resolver resolver(io_context);
    boost::system::error_code err;
    p0socks.clear();
    p1socks.clear();
    for (unsigned short thread_num = 0; thread_num < num_threads;
            ++thread_num) {
        tcp::socket p0sock(io_context);
        while(1) {
            boost::asio::connect(p0sock,
                resolver.resolve(p0addr, std::to_string(port_p2_p0)), err);
            if (!err) break;
            std::cerr << "Connection to p0 refused, will retry.\n";
            sleep(1);
        }
        // Write 2 bytes to the socket indicating which thread
        // number this socket is for
        boost::asio::write(p0sock,
            boost::asio::buffer(&thread_num, sizeof(thread_num)));
        p0socks.push_back(std::move(p0sock));
    }
    for (unsigned short thread_num = 0; thread_num < num_threads;
            ++thread_num) {
        tcp::socket p1sock(io_context);
        while(1) {
            boost::asio::connect(p1sock,
                resolver.resolve(p1addr, std::to_string(port_p2_p1)), err);
            if (!err) break;
            std::cerr << "Connection to p1 refused, will retry.\n";
            sleep(1);
        }
        // Write 2 bytes to the socket indicating which thread
        // number this socket is for
        boost::asio::write(p1sock,
            boost::asio::buffer(&thread_num, sizeof(thread_num)));
        p1socks.push_back(std::move(p1sock));
    }
}
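// Illustrative sketch (not part of the build): the server-side (P2)
// counterpart to the setup example above.  Hostnames and the thread
// count are hypothetical and must match the computational parties'.
#if 0
static void example_setup_server()
{
    boost::asio::io_context io_context;
    std::deque<tcp::socket> p0socks, p1socks;
    mpcio_setup_server(io_context, "p0.example.org", "p1.example.org",
        4, p0socks, p1socks);
    MPCServerIO mpcsrvio(MODE_ONLINE, p0socks, p1socks);
    // ... create one MPCTIO per thread and run the preprocessing ...
}
#endif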