Browse Source

Start on the oblivds framework code

Ian Goldberg 1 year ago
commit
f9824f70d7
6 changed files with 536 additions and 0 deletions
  1. 12 0
      Makefile
  2. 67 0
      mpcio.cpp
  3. 220 0
      mpcio.hpp
  4. 127 0
      oblivds.cpp
  5. 101 0
      preproc.cpp
  6. 9 0
      preproc.hpp

+ 12 - 0
Makefile

@@ -0,0 +1,12 @@
all: oblivds

CXXFLAGS=-std=c++17 -Wall -ggdb
LDFLAGS=-ggdb
LDLIBS=-lbsd -lboost_system -lboost_context -lboost_thread -lpthread

# Link with $(CXX) (not a hard-coded g++) so that overriding the
# compiler (e.g. make CXX=clang++) affects both compile and link steps.
oblivds: oblivds.o mpcio.o preproc.o
	$(CXX) $(LDFLAGS) -o $@ $^ $(LDLIBS)

# Header dependencies for the implicit .cpp -> .o rules
oblivds.o: preproc.hpp mpcio.hpp
mpcio.o: mpcio.hpp
preproc.o: preproc.hpp mpcio.hpp

+ 67 - 0
mpcio.cpp

@@ -0,0 +1,67 @@
#include "mpcio.hpp"

// TCP ports for the three pairwise connections among the parties.
// For each connection, the lower-numbered party listens on the port
// and the higher-numbered party connects to it (see mpcio.hpp).

// The port number for the P1 -> P0 connection
static const unsigned short port_p1_p0 = 2115;

// The port number for the P2 -> P0 connection
static const unsigned short port_p2_p0 = 2116;

// The port number for the P2 -> P1 connection
static const unsigned short port_p2_p1 = 2117;
+
+void mpcio_setup_computational(unsigned player,
+    boost::asio::io_context &io_context,
+    const char *p0addr,  // can be NULL when player=0
+    tcp::socket &peersock, tcp::socket &serversock)
+{
+    if (player == 0) {
+        // Listen for connections from P1 and from P2
+        tcp::acceptor acceptor_p1(io_context,
+            tcp::endpoint(tcp::v4(), port_p1_p0));
+        tcp::acceptor acceptor_p2(io_context,
+            tcp::endpoint(tcp::v4(), port_p2_p0));
+
+        peersock = acceptor_p1.accept();
+        serversock = acceptor_p2.accept();
+    } else if (player == 1) {
+        // Listen for connections from P2, make a connection to P0
+        tcp::acceptor acceptor_p2(io_context,
+            tcp::endpoint(tcp::v4(), port_p2_p1));
+
+        tcp::resolver resolver(io_context);
+        boost::system::error_code err;
+        while(1) {
+            boost::asio::connect(peersock,
+                resolver.resolve(p0addr, std::to_string(port_p1_p0)), err);
+            if (!err) break;
+            std::cerr << "Connection to p0 refused, will retry.\n";
+            sleep(1);
+        }
+        serversock = acceptor_p2.accept();
+    } else {
+        std::cerr << "Invalid player number passed to mpcio_setup_computational\n";
+    }
+}
+
+void mpcio_setup_server(boost::asio::io_context &io_context,
+    const char *p0addr, const char *p1addr,
+    tcp::socket &p0sock, tcp::socket &p1sock)
+{
+    // Make connections to P0 and P1
+    tcp::resolver resolver(io_context);
+    boost::system::error_code err;
+    while(1) {
+        boost::asio::connect(p0sock,
+            resolver.resolve(p0addr, std::to_string(port_p2_p0)), err);
+        if (!err) break;
+        std::cerr << "Connection to p0 refused, will retry.\n";
+        sleep(1);
+    }
+    while(1) {
+        boost::asio::connect(p1sock,
+            resolver.resolve(p1addr, std::to_string(port_p2_p1)), err);
+        if (!err) break;
+        std::cerr << "Connection to p1 refused, will retry.\n";
+        sleep(1);
+    }
+}

+ 220 - 0
mpcio.hpp

@@ -0,0 +1,220 @@
+#ifndef __MCPIO_HPP__
+#define __MCPIO_HPP__
+
+#include <iostream>
+#include <fstream>
+#include <tuple>
+#include <vector>
+#include <queue>
+#include <string>
+#include <cstdint>
+
+#include <boost/asio.hpp>
+#include <boost/coroutine2/all.hpp>
+#include <boost/thread.hpp>
+
+using boost::asio::ip::tcp;
+
+// Classes to represent stored precomputed data (e.g., multiplicative triples)
+
+typedef std::tuple<uint64_t, uint64_t, uint64_t> MultTriple;
+
// A typed stream of precomputed values read from the local file
// "<filenameprefix>.pX" for player X.  In preprocessing mode there is
// nothing to read, so no file is opened.
template<typename T>
class PreCompStorage {
public:
    PreCompStorage(unsigned player, bool preprocessing,
        const char *filenameprefix);
    // Read the next precomputed value into nextval; exits the process
    // if the storage runs out (or was never opened)
    void get(T& nextval);
private:
    std::ifstream storage;
};

template<typename T>
PreCompStorage<T>::PreCompStorage(unsigned player, bool preprocessing,
        const char *filenameprefix) {
    if (preprocessing) return;
    // Build "<prefix>.p<player>".  std::to_string replaces the old
    // sprintf into a fixed 4-byte buffer, which forced the player
    // number to be truncated modulo 10.
    std::string filename(filenameprefix);
    filename.append(".p");
    filename.append(std::to_string(player));
    // Open in binary mode: the file holds raw structs, not text
    storage.open(filename, std::ios::binary);
    if (storage.fail()) {
        std::cerr << "Failed to open " << filename << "\n";
        exit(1);
    }
}

template<typename T>
void PreCompStorage<T>::get(T& nextval) {
    storage.read((char *)&nextval, sizeof(T));
    // gcount() is signed (std::streamsize); cast sizeof to match to
    // avoid a signed/unsigned comparison
    if (storage.gcount() != (std::streamsize)sizeof(T)) {
        std::cerr << "Failed to read precomputed value from storage\n";
        exit(1);
    }
}
+
+// A class to wrap a socket to another MPC party.  This wrapping allows
+// us to do some useful logging, and perform async_writes transparently
+// to the application.
+
+class MPCSingleIO {
+    tcp::socket sock;
+    size_t totread, totwritten;
+    std::vector<ssize_t> iotrace;
+
+    // To avoid blocking if both we and our peer are trying to send
+    // something very large, and neither side is receiving, we will send
+    // with async_write.  But this has a number of implications:
+    // - The data to be sent has to be copied into this MPCSingleIO,
+    //   since asio::buffer pointers are not guaranteed to remain valid
+    //   after the end of the coroutine that created them
+    // - We have to keep a queue of messages to be sent, in case
+    //   coroutines call send() before the previous message has finished
+    //   being sent
+    // - This queue may be accessed from the async_write thread as well
+    //   as the work thread that uses this MPCSingleIO directly (there
+    //   should be only one of the latter), so we need some locking
+
+    // This is where we accumulate data passed in queue()
+    std::string dataqueue;
+
+    // When send() is called, the above dataqueue is appended to this
+    // messagequeue, and the dataqueue is reset.  If messagequeue was
+    // empty before this append, launch async_write to write the first
+    // thing in the messagequeue.  When async_write completes, it will
+    // delete the first thing in the messagequeue, and see if there are
+    // any more elements.  If so, it will start another async_write.
+    // The invariant is that there is an async_write currently running
+    // iff messagequeue is nonempty.
+    std::queue<std::string> messagequeue;
+
+    // Never touch the above messagequeue without holding this lock (you
+    // _can_ touch the strings it contains, though, if you looked one up
+    // while holding the lock).
+    boost::mutex messagequeuelock;
+
+    // Asynchronously send the first message from the message queue.
+    // * The messagequeuelock must be held when this is called! *
+    // This method may be called from either thread (the work thread or
+    // the async_write handler thread).
+    void async_send_from_msgqueue() {
+        boost::asio::async_write(sock,
+            boost::asio::buffer(messagequeue.front()),
+            [&](boost::system::error_code ec, std::size_t amt){
+                messagequeuelock.lock();
+                messagequeue.pop();
+                if (messagequeue.size() > 0) {
+                    async_send_from_msgqueue();
+                }
+                messagequeuelock.unlock();
+            });
+    }
+
+public:
+    MPCSingleIO(tcp::socket &&sock) :
+        sock(std::move(sock)), totread(0), totwritten(0) {}
+
+    void queue(const void *data, size_t len) {
+        dataqueue.append((const char *)data, len);
+    }
+
+    void send() {
+        size_t thissize = dataqueue.size();
+        // Ignore spurious calls to send()
+        if (thissize == 0) return;
+
+        iotrace.push_back(thissize);
+
+        messagequeuelock.lock();
+        // Move the current message to send into the message queue (this
+        // moves a pointer to the data, not copying the data itself)
+        messagequeue.emplace(std::move(dataqueue));
+        // If this is now the first thing in the message queue, launch
+        // an async_write to write it
+        if (messagequeue.size() == 1) {
+            async_send_from_msgqueue();
+        }
+        messagequeuelock.unlock();
+    }
+
+    size_t recv(const std::vector<boost::asio::mutable_buffer>& buffers) {
+        size_t res = boost::asio::read(sock, buffers);
+        iotrace.push_back(-(ssize_t(res)));
+        return res;
+    }
+
+    size_t recv(const boost::asio::mutable_buffer& buffer) {
+        size_t res = boost::asio::read(sock, buffer);
+        iotrace.push_back(-(ssize_t(res)));
+        return res;
+    }
+
+    size_t recv(void *data, size_t len) {
+        size_t res = boost::asio::read(sock, boost::asio::buffer(data, len));
+        iotrace.push_back(-(ssize_t(res)));
+        return res;
+    }
+
+    void dumptrace(std::ostream &os, const char *label = NULL) {
+        if (label) {
+            os << label << " ";
+        }
+        os << "IO trace:";
+        for (auto& s: iotrace) {
+            os << " " << s;
+        }
+        os << "\n";
+    }
+
+    void resettrace() {
+        iotrace.clear();
+    }
+};
+
+// A class to represent all of a computation party's IO, either to other
+// parties or to local storage
+
+struct MPCIO {
+    int player;
+    MPCSingleIO peerio;
+    MPCSingleIO serverio;
+    PreCompStorage<MultTriple> triples;
+
+    MPCIO(unsigned player, bool preprocessing,
+            tcp::socket &&peersock, tcp::socket &&serversock) :
+        player(player),
+        peerio(std::move(peersock)), serverio(std::move(serversock)),
+        triples(player, preprocessing, "triples") {}
+};
+
+// A class to represent all of the server party's IO, either to
+// computational parties or to local storage
+
struct MPCServerIO {
    // Connections to the two computational parties
    MPCSingleIO p0io;
    MPCSingleIO p1io;

    // preprocessing is currently unused here (MPCIO uses the
    // corresponding flag to decide whether to open precomputed-data
    // files); accepted for symmetry with MPCIO and future use
    MPCServerIO(bool preprocessing, tcp::socket &&p0sock,
            tcp::socket &&p1sock) :
        p0io(std::move(p0sock)), p1io(std::move(p1sock)) {}
};
+
+// Set up the socket connections between the two computational parties
+// (P0 and P1) and the server party (P2).  For each connection, the
+// lower-numbered party does the accept() and the higher-numbered party
+// does the connect().
+
+// Computational parties call this version with player=0 or 1
+
+void mpcio_setup_computational(unsigned player,
+    boost::asio::io_context &io_context,
+    const char *p0addr,  // can be NULL when player=0
+    tcp::socket &peersock, tcp::socket &serversock);
+
+// Server calls this version with player=2
+
+void mpcio_setup_server(boost::asio::io_context &io_context,
+    const char *p0addr, const char *p1addr,
+    tcp::socket &p0sock, tcp::socket &p1sock);
+
+#endif

+ 127 - 0
oblivds.cpp

@@ -0,0 +1,127 @@
+#include <iostream>
+
+#include "mpcio.hpp"
+#include "preproc.hpp"
+
// Print the command-line synopsis to stderr and terminate with a
// failure status.
static void usage(const char *progname)
{
    std::cerr
        << "Usage: " << progname << " [-p] player_num player_addrs args ...\n"
        << "player_num = 0 or 1 for the computational players\n"
        << "player_num = 2 for the server player\n"
        << "player_addrs is omitted for player 0\n"
        << "player_addrs is p0's hostname for player 1\n"
        << "player_addrs is p0's hostname followed by p1's hostname for player 2\n";
    exit(1);
}
+
+static void comp_player_main(boost::asio::io_context &io_context,
+    unsigned player, bool preprocessing, const char *p0addr, char **args)
+{
+    tcp::socket peersock(io_context), serversock(io_context);
+    mpcio_setup_computational(player, io_context,
+        p0addr, peersock, serversock);
+    MPCIO mpcio(player, preprocessing, std::move(peersock),
+        std::move(serversock));
+
+    // Queue up the work to be done
+    boost::asio::post(io_context, [&]{
+        if (preprocessing) {
+            preprocessing_comp(mpcio, args);
+        }
+    });
+
+    // Start another thread; one will perform the work and the other
+    // will execute the async_write handlers
+    boost::thread t([&]{io_context.run();});
+    io_context.run();
+    t.join();
+}
+
+static void server_player_main(boost::asio::io_context &io_context,
+    bool preprocessing, const char *p0addr, const char *p1addr, char **args)
+{
+    tcp::socket p0sock(io_context), p1sock(io_context);
+    mpcio_setup_server(io_context, p0addr, p1addr, p0sock, p1sock);
+    MPCServerIO mpcserverio(preprocessing, std::move(p0sock),
+        std::move(p1sock));
+
+    // Queue up the work to be done
+    boost::asio::post(io_context, [&]{
+        if (preprocessing) {
+            preprocessing_server(mpcserverio, args);
+        }
+    });
+
+    // Start another thread; one will perform the work and the other
+    // will execute the async_write handlers
+    boost::thread t([&]{io_context.run();});
+    io_context.run();
+    t.join();
+}
+
+int main(int argc, char **argv)
+{
+    char **args = argv+1; // Skip argv[0] (the program name)
+    bool preprocessing = false;
+    unsigned player = 0;
+    const char *p0addr = NULL;
+    const char *p1addr = NULL;
+    if (argc > 1 && !strcmp("-p", *args)) {
+        preprocessing = true;
+        ++args;
+    }
+    if (*args == NULL) {
+        // No arguments?
+        usage(argv[0]);
+    } else {
+        player = atoi(*args);
+        ++args;
+    }
+    if (player > 2) {
+        usage(argv[0]);
+    }
+    if (player > 0) {
+        if (*args == NULL) {
+            usage(argv[0]);
+        } else {
+            p0addr = *args;
+            ++args;
+        }
+    }
+    if (player > 1) {
+        if (*args == NULL) {
+            usage(argv[0]);
+        } else {
+            p1addr = *args;
+            ++args;
+        }
+    }
+
+    /*
+    std::cout << "Preprocessing = " <<
+            (preprocessing ? "true" : "false") << "\n";
+    std::cout << "Player = " << player << "\n";
+    if (p0addr) {
+        std::cout << "Player 0 addr = " << p0addr << "\n";
+    }
+    if (p1addr) {
+        std::cout << "Player 1 addr = " << p1addr << "\n";
+    }
+    std::cout << "Args =";
+    for (char **a = args; *a; ++a) {
+        std::cout << " " << *a;
+    }
+    std::cout << "\n";
+    */
+
+    // Make the network connections
+    boost::asio::io_context io_context;
+
+    if (player < 2) {
+        comp_player_main(io_context, player, preprocessing, p0addr, args);
+    } else {
+        server_player_main(io_context, preprocessing, p0addr, p1addr, args);
+    }
+
+    return 0;
+}

+ 101 - 0
preproc.cpp

@@ -0,0 +1,101 @@
+#include <bsd/stdlib.h> // arc4random_buf
+
+#include "preproc.hpp"
+
// Open a file for writing with name the given prefix, and ".pX" suffix,
// where X is the player number.  Exits the process if the file cannot
// be opened.
static std::ofstream openfile(const char *prefix, unsigned player)
{
    // Build "<prefix>.p<player>".  std::to_string replaces the old
    // sprintf into a fixed 4-byte buffer, which forced the player
    // number to be truncated modulo 10.
    std::string filename(prefix);
    filename.append(".p");
    filename.append(std::to_string(player));
    // Open in binary mode: raw precomputed structs get written here
    std::ofstream f;
    f.open(filename, std::ios::binary);
    if (f.fail()) {
        std::cerr << "Failed to open " << filename << "\n";
        exit(1);
    }
    return f;
}
+
+// The server-to-computational-peer protocol for sending precomputed
+// data is:
+//
+// One byte: type
+//   0x80: Multiplication triple
+//   0x81: Multiplication half-triple
+//   0x01 to 0x40: DPF of that depth
+//   0x00: End of preprocessing
+//
+// Four bytes: number of objects of that type (not sent for type == 0x00)
+//
+// Then that number of objects
+//
+// Repeat the whole thing until type == 0x00 is received
+
+void preprocessing_comp(MPCIO &mpcio, char **args)
+{
+    while(1) {
+        unsigned char type = 0;
+        unsigned int num = 0;
+        size_t res = mpcio.serverio.recv(&type, 1);
+        if (res < 1 || type == 0) break;
+        mpcio.serverio.recv(&num, 4);
+        if (type == 0x80) {
+            // Multiplication triples
+            std::ofstream tripfile = openfile("triples", mpcio.player);
+            
+            MultTriple T;
+            for (unsigned int i=0; i<num; ++i) {
+                res = mpcio.serverio.recv(&T, sizeof(T));
+                if (res < sizeof(T)) break;
+                tripfile.write((const char *)&T, sizeof(T));
+            }
+            tripfile.close();
+        }
+    }
+}
+
+void preprocessing_server(MPCServerIO &mpcsrvio, char **args)
+{
+    unsigned int numtriples = 100;
+    if (*args) {
+        numtriples = atoi(*args);
+        ++args;
+    }
+    unsigned char type = 0x80;
+    mpcsrvio.p0io.queue(&type, 1);
+    mpcsrvio.p0io.queue(&numtriples, 4);
+    mpcsrvio.p1io.queue(&type, 1);
+    mpcsrvio.p1io.queue(&numtriples, 4);
+
+    for (unsigned int i=0; i<numtriples; ++i) {
+        uint64_t X0, Y0, Z0, X1, Y1, Z1;
+        arc4random_buf(&X0, sizeof(X0));
+        arc4random_buf(&Y0, sizeof(Y0));
+        arc4random_buf(&Z0, sizeof(Z0));
+        arc4random_buf(&X1, sizeof(X1));
+        arc4random_buf(&Y1, sizeof(Y1));
+        Z1 = X0 * Y1 + X1 * Y0 - Z0;
+        MultTriple T0, T1;
+        T0 = std::make_tuple(X0, Y0, Z0);
+        T1 = std::make_tuple(X1, Y1, Z1);
+        mpcsrvio.p0io.queue(&T0, sizeof(T0));
+        mpcsrvio.p1io.queue(&T1, sizeof(T1));
+        // Flush the queue every so often
+        if ((i % 1000) == 999) {
+            mpcsrvio.p0io.send();
+            mpcsrvio.p1io.send();
+        }
+    }
+    mpcsrvio.p0io.send();
+    mpcsrvio.p1io.send();
+
+    // That's all
+    type = 0x00;
+    mpcsrvio.p0io.queue(&type, 1);
+    mpcsrvio.p1io.queue(&type, 1);
+    mpcsrvio.p0io.send();
+    mpcsrvio.p1io.send();
+}

+ 9 - 0
preproc.hpp

@@ -0,0 +1,9 @@
+#ifndef __PREPROC_HPP__
+#define __PREPROC_HPP__
+
+#include "mpcio.hpp"
+
+void preprocessing_comp(MPCIO &mpcio, char **args);
+void preprocessing_server(MPCServerIO &mpcio, char **args);
+
+#endif