Prechádzať zdrojové kódy

Add a flag to specify the number of threads the computational peers will use

The computational peers set up that many sockets between them so that
the matching threads on the two peers can directly communicate.
Ian Goldberg 2 rokov pred
rodič
commit
426344a717
3 zmenil súbory, kde vykonal 78 pridanie a 26 odobranie
  1. 34 9
      mpcio.cpp
  2. 14 6
      mpcio.hpp
  3. 30 11
      oblivds.cpp

+ 34 - 9
mpcio.cpp

@@ -12,7 +12,8 @@ static const unsigned short port_p2_p1 = 2117;
 void mpcio_setup_computational(unsigned player,
     boost::asio::io_context &io_context,
     const char *p0addr,  // can be NULL when player=0
-    tcp::socket &peersock, tcp::socket &serversock)
+    int num_threads,
+    std::deque<tcp::socket> &peersocks, tcp::socket &serversock)
 {
     if (player == 0) {
         // Listen for connections from P1 and from P2
@@ -21,21 +22,45 @@ void mpcio_setup_computational(unsigned player,
         tcp::acceptor acceptor_p2(io_context,
             tcp::endpoint(tcp::v4(), port_p2_p0));
 
-        peersock = acceptor_p1.accept();
+        for (int i=0;i<num_threads;++i) {
+            peersocks.emplace_back(io_context);
+        }
+        for (int i=0;i<num_threads;++i) {
+            tcp::socket peersock = acceptor_p1.accept();
+            // Read 2 bytes from the socket, which will be the thread
+            // number
+            unsigned short thread_num;
+            boost::asio::read(peersock,
+                boost::asio::buffer(&thread_num, sizeof(thread_num)));
+            if (thread_num >= num_threads) {
+                std::cerr << "Received bad thread number from peer\n";
+            } else {
+                peersocks[thread_num] = std::move(peersock);
+            }
+        }
         serversock = acceptor_p2.accept();
     } else if (player == 1) {
-        // Listen for connections from P2, make a connection to P0
+        // Listen for connections from P2, make num_threads connections to P0
         tcp::acceptor acceptor_p2(io_context,
             tcp::endpoint(tcp::v4(), port_p2_p1));
 
         tcp::resolver resolver(io_context);
         boost::system::error_code err;
-        while(1) {
-            boost::asio::connect(peersock,
-                resolver.resolve(p0addr, std::to_string(port_p1_p0)), err);
-            if (!err) break;
-            std::cerr << "Connection to p0 refused, will retry.\n";
-            sleep(1);
+        peersocks.clear();
+        for (unsigned short thread_num = 0; thread_num < num_threads; ++thread_num) {
+            tcp::socket peersock(io_context);
+            while(1) {
+                boost::asio::connect(peersock,
+                    resolver.resolve(p0addr, std::to_string(port_p1_p0)), err);
+                if (!err) break;
+                std::cerr << "Connection to p0 refused, will retry.\n";
+                sleep(1);
+            }
+            // Write 2 bytes to the socket indicating which thread
+            // number this socket is for
+            boost::asio::write(peersock,
+                boost::asio::buffer(&thread_num, sizeof(thread_num)));
+            peersocks.push_back(std::move(peersock));
         }
         serversock = acceptor_p2.accept();
     } else {

+ 14 - 6
mpcio.hpp

@@ -5,6 +5,7 @@
 #include <fstream>
 #include <tuple>
 #include <vector>
+#include <deque>
 #include <queue>
 #include <string>
 #include <cstdint>
@@ -182,15 +183,21 @@ public:
 
 struct MPCIO {
     int player;
-    MPCSingleIO peerio;
+    // We use a deque here instead of a vector because you can't have a
+    // vector of a type without a copy constructor (tcp::socket is the
+    // culprit), but you can have a deque of those for some reason.
+    std::deque<MPCSingleIO> peerios;
     MPCSingleIO serverio;
     PreCompStorage<MultTriple> triples;
 
     MPCIO(unsigned player, bool preprocessing,
-            tcp::socket &&peersock, tcp::socket &&serversock) :
-        player(player),
-        peerio(std::move(peersock)), serverio(std::move(serversock)),
-        triples(player, preprocessing, "triples") {}
+            std::deque<tcp::socket> &peersocks, tcp::socket &&serversock) :
+        player(player), serverio(std::move(serversock)),
+        triples(player, preprocessing, "triples") {
+        for (auto &&sock : peersocks) {
+            peerios.emplace_back(std::move(sock));
+        }
+    }
 };
 
 // A class to represent all of the server party's IO, either to
@@ -215,7 +222,8 @@ struct MPCServerIO {
 void mpcio_setup_computational(unsigned player,
     boost::asio::io_context &io_context,
     const char *p0addr,  // can be NULL when player=0
-    tcp::socket &peersock, tcp::socket &serversock);
+    int num_threads,
+    std::deque<tcp::socket> &peersocks, tcp::socket &serversock);
 
 // Server calls this version with player=2
 

+ 30 - 11
oblivds.cpp

@@ -1,11 +1,14 @@
 #include <iostream>
+#include <deque>
 
 #include "mpcio.hpp"
 #include "preproc.hpp"
 
 static void usage(const char *progname)
 {
-    std::cerr << "Usage: " << progname << " [-p] player_num player_addrs args ...\n";
+    std::cerr << "Usage: " << progname << " [-p] [-t num] player_num player_addrs args ...\n";
+    std::cerr << "-p: preprocessing mode\n";
+    std::cerr << "-t num: use num threads for the computational players\n";
     std::cerr << "player_num = 0 or 1 for the computational players\n";
     std::cerr << "player_num = 2 for the server player\n";
     std::cerr << "player_addrs is omitted for player 0\n";
@@ -15,13 +18,14 @@ static void usage(const char *progname)
 }
 
 static void comp_player_main(boost::asio::io_context &io_context,
-    unsigned player, bool preprocessing, const char *p0addr, char **args)
+    unsigned player, bool preprocessing, int num_threads, const char *p0addr,
+    char **args)
 {
-    tcp::socket peersock(io_context), serversock(io_context);
-    mpcio_setup_computational(player, io_context,
-        p0addr, peersock, serversock);
-    MPCIO mpcio(player, preprocessing, std::move(peersock),
-        std::move(serversock));
+    tcp::socket serversock(io_context);
+    std::deque<tcp::socket> peersocks;
+    mpcio_setup_computational(player, io_context, p0addr, num_threads,
+        peersocks, serversock);
+    MPCIO mpcio(player, preprocessing, peersocks, std::move(serversock));
 
     // Queue up the work to be done
     boost::asio::post(io_context, [&]{
@@ -64,11 +68,25 @@ int main(int argc, char **argv)
     char **args = argv+1; // Skip argv[0] (the program name)
     bool preprocessing = false;
     unsigned player = 0;
+    int num_threads = 1;
     const char *p0addr = NULL;
     const char *p1addr = NULL;
-    if (argc > 1 && !strcmp("-p", *args)) {
-        preprocessing = true;
-        ++args;
+    // Get the options
+    while (*args && *args[0] == '-') {
+        if (!strcmp("-p", *args)) {
+            preprocessing = true;
+            ++args;
+        } else if (!strcmp("-t", *args)) {
+            if (args[1]) {
+                num_threads = atoi(args[1]);
+                if (num_threads < 1) {
+                    usage(argv[0]);
+                }
+                args += 2;
+            } else {
+                usage(argv[0]);
+            }
+        }
     }
     if (*args == NULL) {
         // No arguments?
@@ -100,6 +118,7 @@ int main(int argc, char **argv)
     /*
     std::cout << "Preprocessing = " <<
             (preprocessing ? "true" : "false") << "\n";
+    std::cout << "Thread count = " << num_threads << "\n";
     std::cout << "Player = " << player << "\n";
     if (p0addr) {
         std::cout << "Player 0 addr = " << p0addr << "\n";
@@ -118,7 +137,7 @@ int main(int argc, char **argv)
     boost::asio::io_context io_context;
 
     if (player < 2) {
-        comp_player_main(io_context, player, preprocessing, p0addr, args);
+        comp_player_main(io_context, player, preprocessing, num_threads, p0addr, args);
     } else {
         server_player_main(io_context, preprocessing, p0addr, p1addr, args);
     }