Browse Source

Add a timing test for DPF evalution

Useful for comparing time to expand a DPF vs time to load it from disk.

At least on this dev machine, loading it from disk is much faster.

Also add better option parsing, and add a "-c" preprocessing option
which says to store the DPFs compressed (the default is now to store
them expanded).
Ian Goldberg 1 year ago
parent
commit
11c05623ec
7 changed files with 126 additions and 41 deletions
  1. 3 5
      mpcio.cpp
  2. 66 7
      online.cpp
  3. 2 1
      online.hpp
  4. 13 0
      options.hpp
  5. 25 21
      prac.cpp
  6. 14 5
      preproc.cpp
  7. 3 2
      preproc.hpp

+ 3 - 5
mpcio.cpp

@@ -25,11 +25,9 @@ void PreCompStorage<T,N>::init(unsigned player, bool preprocessing,
     }
     filename.append(suffix);
     storage.open(filename);
-    // It's OK if files for not every depth exist
-    if (!depth && storage.fail()) {
-        std::cerr << "Failed to open " << filename << "\n";
-        exit(1);
-    }
+    // It's OK if not every file exists; so don't worry about checking
+    // for errors here.  We'll report an error in get() if we actually
+    // try to use a value for which we don't have a precomputed file.
     count = 0;
 }
 

+ 66 - 7
online.cpp

@@ -5,7 +5,7 @@
 #include "rdpf.hpp"
 
 
-static void online_test(MPCIO &mpcio, int num_threads, char **args)
+static void online_test(MPCIO &mpcio, const PRACOptions &opts, char **args)
 {
     nbits_t nbits = VALUE_BITS;
 
@@ -106,7 +106,7 @@ static void online_test(MPCIO &mpcio, int num_threads, char **args)
     delete[] A;
 }
 
-static void lamport_test(MPCIO &mpcio, int num_threads, char **args)
+static void lamport_test(MPCIO &mpcio, const PRACOptions &opts, char **args)
 {
     // Create a bunch of threads and send a bunch of data to the other
     // peer, and receive their data.  If an arg is specified, repeat
@@ -129,6 +129,7 @@ static void lamport_test(MPCIO &mpcio, int num_threads, char **args)
         ++args;
     }
 
+    int num_threads = opts.num_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, thread_num, niters, chunksize, numchunks] {
@@ -150,7 +151,7 @@ static void lamport_test(MPCIO &mpcio, int num_threads, char **args)
     pool.join();
 }
 
-static void rdpf_test(MPCIO &mpcio, int num_threads, char **args)
+static void rdpf_test(MPCIO &mpcio, const PRACOptions &opts, char **args)
 {
     nbits_t depth=6;
 
@@ -159,6 +160,7 @@ static void rdpf_test(MPCIO &mpcio, int num_threads, char **args)
         ++args;
     }
 
+    int num_threads = opts.num_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, thread_num, depth] {
@@ -230,20 +232,77 @@ static void rdpf_test(MPCIO &mpcio, int num_threads, char **args)
     pool.join();
 }
 
-void online_main(MPCIO &mpcio, int num_threads, char **args)
+static void rdpf_timing(MPCIO &mpcio, const PRACOptions &opts, char **args)
+{
+    nbits_t depth=6;
+
+    if (*args) {
+        depth = atoi(*args);
+        ++args;
+    }
+
+    int num_threads = opts.num_threads;
+    boost::asio::thread_pool pool(num_threads);
+    for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
+        boost::asio::post(pool, [&mpcio, thread_num, depth] {
+            MPCTIO tio(mpcio, thread_num);
+            size_t &op_counter = tio.aes_ops();
+            if (mpcio.player == 2) {
+                RDPFPair dp = tio.rdpfpair(depth);
+                for (int i=0;i<2;++i) {
+                    RDPF &dpf = dp.dpf[i];
+                    dpf.expand(op_counter);
+                    RegXS scaled_xor;
+                    scaled_xor.xshare = 0;
+                    for (address_t x=0;x<(address_t(1)<<depth);++x) {
+                        DPFnode leaf = dpf.leaf(x, op_counter);
+                        RegXS sx = dpf.scaled_xs(leaf);
+                        scaled_xor ^= sx;
+                    }
+                    printf("%016lx\n%016lx\n", scaled_xor.xshare,
+                        dpf.scaled_xor.xshare);
+                    printf("\n");
+                }
+            } else {
+                RDPFTriple dt = tio.rdpftriple(depth);
+                for (int i=0;i<3;++i) {
+                    RDPF &dpf = dt.dpf[i];
+                    dpf.expand(op_counter);
+                    RegXS scaled_xor;
+                    scaled_xor.xshare = 0;
+                    for (address_t x=0;x<(address_t(1)<<depth);++x) {
+                        DPFnode leaf = dpf.leaf(x, op_counter);
+                        RegXS sx = dpf.scaled_xs(leaf);
+                        scaled_xor ^= sx;
+                    }
+                    printf("%016lx\n%016lx\n", scaled_xor.xshare,
+                        dpf.scaled_xor.xshare);
+                    printf("\n");
+                }
+            }
+            tio.send();
+        });
+    }
+    pool.join();
+}
+
+void online_main(MPCIO &mpcio, const PRACOptions &opts, char **args)
 {
     if (!*args) {
         std::cerr << "Mode is required as the first argument when not preprocessing.\n";
         return;
     } else if (!strcmp(*args, "test")) {
         ++args;
-        online_test(mpcio, num_threads, args);
+        online_test(mpcio, opts, args);
     } else if (!strcmp(*args, "lamporttest")) {
         ++args;
-        lamport_test(mpcio, num_threads, args);
+        lamport_test(mpcio, opts, args);
     } else if (!strcmp(*args, "rdpftest")) {
         ++args;
-        rdpf_test(mpcio, num_threads, args);
+        rdpf_test(mpcio, opts, args);
+    } else if (!strcmp(*args, "rdpftime")) {
+        ++args;
+        rdpf_timing(mpcio, opts, args);
     } else {
         std::cerr << "Unknown mode " << *args << "\n";
     }

+ 2 - 1
online.hpp

@@ -2,7 +2,8 @@
 #define __ONLINE_HPP__
 
 #include "mpcio.hpp"
+#include "options.hpp"
 
-void online_main(MPCIO &mpcio, int num_threads, char **args);
+void online_main(MPCIO &mpcio, const PRACOptions &opts, char **args);
 
 #endif

+ 13 - 0
options.hpp

@@ -0,0 +1,13 @@
+#ifndef __OPTIONS_HPP__
+#define __OPTIONS_HPP__
+
+struct PRACOptions {
+    bool preprocessing;
+    int num_threads;
+    bool expand_rdpfs;
+
+    PRACOptions() : preprocessing(false), num_threads(1),
+        expand_rdpfs(true) {}
+};
+
+#endif

+ 25 - 21
prac.cpp

@@ -4,12 +4,14 @@
 #include "mpcio.hpp"
 #include "preproc.hpp"
 #include "online.hpp"
+#include "options.hpp"
 
 static void usage(const char *progname)
 {
     std::cerr << "Usage: " << progname << " [-p] [-t num] player_num player_addrs args ...\n";
     std::cerr << "-p: preprocessing mode\n";
     std::cerr << "-t num: use num threads\n";
+    std::cerr << "-c: store DPFs compressed (default is expanded)\n";
     std::cerr << "player_num = 0 or 1 for the computational players\n";
     std::cerr << "player_num = 2 for the server player\n";
     std::cerr << "player_addrs is omitted for player 0\n";
@@ -19,20 +21,20 @@ static void usage(const char *progname)
 }
 
 static void comp_player_main(boost::asio::io_context &io_context,
-    unsigned player, bool preprocessing, int num_threads, const char *p0addr,
+    unsigned player, const PRACOptions &opts, const char *p0addr,
     char **args)
 {
     std::deque<tcp::socket> peersocks, serversocks;
-    mpcio_setup_computational(player, io_context, p0addr, num_threads,
-        peersocks, serversocks);
-    MPCPeerIO mpcio(player, preprocessing, peersocks, serversocks);
+    mpcio_setup_computational(player, io_context, p0addr,
+        opts.num_threads, peersocks, serversocks);
+    MPCPeerIO mpcio(player, opts.preprocessing, peersocks, serversocks);
 
     // Queue up the work to be done
     boost::asio::post(io_context, [&]{
-        if (preprocessing) {
-            preprocessing_comp(mpcio, num_threads, args);
+        if (opts.preprocessing) {
+            preprocessing_comp(mpcio, opts, args);
         } else {
-            online_main(mpcio, num_threads, args);
+            online_main(mpcio, opts, args);
         }
     });
 
@@ -45,20 +47,20 @@ static void comp_player_main(boost::asio::io_context &io_context,
 }
 
 static void server_player_main(boost::asio::io_context &io_context,
-    bool preprocessing, int num_threads, const char *p0addr,
+    const PRACOptions &opts, const char *p0addr,
     const char *p1addr, char **args)
 {
     std::deque<tcp::socket> p0socks, p1socks;
-    mpcio_setup_server(io_context, p0addr, p1addr, num_threads,
-        p0socks, p1socks);
-    MPCServerIO mpcserverio(preprocessing, p0socks, p1socks);
+    mpcio_setup_server(io_context, p0addr, p1addr,
+        opts.num_threads, p0socks, p1socks);
+    MPCServerIO mpcserverio(opts.preprocessing, p0socks, p1socks);
 
     // Queue up the work to be done
     boost::asio::post(io_context, [&]{
-        if (preprocessing) {
-            preprocessing_server(mpcserverio, num_threads, args);
+        if (opts.preprocessing) {
+            preprocessing_server(mpcserverio, opts, args);
         } else {
-            online_main(mpcserverio, num_threads, args);
+            online_main(mpcserverio, opts, args);
         }
     });
 
@@ -73,26 +75,28 @@ static void server_player_main(boost::asio::io_context &io_context,
 int main(int argc, char **argv)
 {
     char **args = argv+1; // Skip argv[0] (the program name)
-    bool preprocessing = false;
+    PRACOptions opts;
     unsigned player = 0;
-    int num_threads = 1;
     const char *p0addr = NULL;
     const char *p1addr = NULL;
     // Get the options
     while (*args && *args[0] == '-') {
         if (!strcmp("-p", *args)) {
-            preprocessing = true;
+            opts.preprocessing = true;
             ++args;
         } else if (!strcmp("-t", *args)) {
             if (args[1]) {
-                num_threads = atoi(args[1]);
-                if (num_threads < 1) {
+                opts.num_threads = atoi(args[1]);
+                if (opts.num_threads < 1) {
                     usage(argv[0]);
                 }
                 args += 2;
             } else {
                 usage(argv[0]);
             }
+        } else if (!strcmp("-c", *args)) {
+            opts.expand_rdpfs = false;
+            ++args;
         }
     }
     if (*args == NULL) {
@@ -144,9 +148,9 @@ int main(int argc, char **argv)
     boost::asio::io_context io_context;
 
     if (player < 2) {
-        comp_player_main(io_context, player, preprocessing, num_threads, p0addr, args);
+        comp_player_main(io_context, player, opts, p0addr, args);
     } else {
-        server_player_main(io_context, preprocessing, num_threads, p0addr, p1addr, args);
+        server_player_main(io_context, opts, p0addr, p1addr, args);
     }
 
     return 0;

+ 14 - 5
preproc.cpp

@@ -74,11 +74,12 @@ void Openfiles::closeall()
 //
 // Repeat the whole thing until type == 0x00 is received
 
-void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args)
+void preprocessing_comp(MPCIO &mpcio, const PRACOptions &opts, char **args)
 {
+    int num_threads = opts.num_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
-        boost::asio::post(pool, [&mpcio, thread_num] {
+        boost::asio::post(pool, [&mpcio, &opts, thread_num] {
             MPCTIO tio(mpcio, thread_num);
             Openfiles ofiles;
             std::vector<coro_t> coroutines;
@@ -115,7 +116,8 @@ void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args)
                     for (unsigned int i=0; i<num; ++i) {
                         coroutines.emplace_back(
                             [&, tripfile, type](yield_t &yield) {
-                                RDPFTriple rdpftrip(tio, yield, type);
+                                RDPFTriple rdpftrip(tio, yield, type,
+                                    opts.expand_rdpfs);
                                 printf("usi0 = %016lx\n", rdpftrip.dpf[0].unit_sum_inverse);
                                 printf("sxr0 = %016lx\n", rdpftrip.dpf[0].scaled_xor.xshare);
                                 printf("usi1 = %016lx\n", rdpftrip.dpf[1].unit_sum_inverse);
@@ -136,11 +138,12 @@ void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args)
     pool.join();
 }
 
-void preprocessing_server(MPCServerIO &mpcsrvio, int num_threads, char **args)
+void preprocessing_server(MPCServerIO &mpcsrvio, const PRACOptions &opts, char **args)
 {
+    int num_threads = opts.num_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
-        boost::asio::post(pool, [&mpcsrvio, thread_num, args] {
+        boost::asio::post(pool, [&mpcsrvio, &opts, thread_num, args] {
             char **threadargs = args;
             MPCTIO stio(mpcsrvio, thread_num);
             Openfiles ofiles;
@@ -220,6 +223,12 @@ void preprocessing_server(MPCServerIO &mpcsrvio, int num_threads, char **args)
                                 printf("usi1 = %016lx\n", rdpfpair.dpf[1].unit_sum_inverse);
                                 printf("sxr1 = %016lx\n", rdpfpair.dpf[1].scaled_xor.xshare);
                                 printf("dep1 = %d\n", rdpfpair.dpf[1].depth());
+                                    if (opts.expand_rdpfs) {
+                                        printf("Expanding\n");
+                                        rdpfpair.dpf[0].expand(stio.aes_ops());
+                                        rdpfpair.dpf[1].expand(stio.aes_ops());
+                                        printf("Expanded\n");
+                                    }
                                     pairfile.os() << rdpfpair;
                                 });
                         }

+ 3 - 2
preproc.hpp

@@ -2,8 +2,9 @@
 #define __PREPROC_HPP__
 
 #include "mpcio.hpp"
+#include "options.hpp"
 
-void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args);
-void preprocessing_server(MPCServerIO &mpcio, int num_threads, char **args);
+void preprocessing_comp(MPCIO &mpcio, const PRACOptions &opts, char **args);
+void preprocessing_server(MPCServerIO &mpcio, const PRACOptions &opts, char **args);
 
 #endif