Browse Source

Parallel StreamEval

The precomputed versions are faster, but AES-expanding a compressed DPF
is slower with multiple threads?
Ian Goldberg 1 year ago
parent
commit
b899b24b6d
1 changed files with 59 additions and 42 deletions
  1. 59 42
      online.cpp

+ 59 - 42
online.cpp

@@ -299,6 +299,40 @@ static void rdpf_timing(MPCIO &mpcio,
     pool.join();
 }
 
+static value_t parallel_streameval_rdpf(MPCIO &mpcio, const RDPF &dpf,
+    address_t start, int num_threads)
+{
+    RegXS scaled_xor[num_threads];
+    boost::asio::thread_pool pool(num_threads);
+    address_t totsize = (address_t(1)<<dpf.depth());
+    address_t threadstart = start;
+    address_t threadchunk = totsize / num_threads;
+    address_t threadextra = totsize % num_threads;
+    for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
+        address_t threadsize = threadchunk + (address_t(thread_num) < threadextra);
+        boost::asio::post(pool,
+            [&mpcio, &dpf, &scaled_xor, thread_num, threadstart, threadsize] {
+                MPCTIO tio(mpcio, thread_num);
+//printf("Thread %d from %X for %X\n", thread_num, threadstart, threadsize);
+                auto ev = StreamEval(dpf, threadstart, 0, tio.aes_ops());
+                for (address_t x=0;x<threadsize;++x) {
+//if (x%0x10000 == 0) printf("%d", thread_num);
+                    DPFnode leaf = ev.next();
+                    RegXS sx = dpf.scaled_xs(leaf);
+                    scaled_xor[thread_num] ^= sx;
+                }
+//printf("Thread %d complete\n", thread_num);
+            });
+        threadstart = (threadstart + threadsize) % totsize;
+    }
+    pool.join();
+    RegXS res;
+    for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
+        res ^= scaled_xor[thread_num];
+    }
+    return res.xshare;
+}
+
 static void rdpfeval_timing(MPCIO &mpcio,
     const PRACOptions &opts, char **args)
 {
@@ -310,52 +344,35 @@ static void rdpfeval_timing(MPCIO &mpcio,
         ++args;
     }
     if (*args) {
-        start = atoi(*args);
+        start = strtoull(*args, NULL, 16);
         ++args;
     }
 
     int num_threads = opts.num_threads;
-    boost::asio::thread_pool pool(num_threads);
-    for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
-        boost::asio::post(pool, [&mpcio, thread_num, depth, start] {
-            MPCTIO tio(mpcio, thread_num);
-            run_coroutines(tio, [&tio, depth, start] (yield_t &yield) {
-                size_t &aes_ops = tio.aes_ops();
-                if (tio.player() == 2) {
-                    RDPFPair dp = tio.rdpfpair(yield, depth);
-                    for (int i=0;i<2;++i) {
-                        RDPF &dpf = dp.dpf[i];
-                        RegXS scaled_xor;
-                        auto ev = StreamEval(dpf, start, 0, aes_ops, false);
-                        for (address_t x=0;x<(address_t(1)<<depth);++x) {
-                            DPFnode leaf = ev.next();
-                            RegXS sx = dpf.scaled_xs(leaf);
-                            scaled_xor ^= sx;
-                        }
-                        printf("%016lx\n%016lx\n", scaled_xor.xshare,
-                            dpf.scaled_xor.xshare);
-                        printf("\n");
-                    }
-                } else {
-                    RDPFTriple dt = tio.rdpftriple(yield, depth);
-                    for (int i=0;i<3;++i) {
-                        RDPF &dpf = dt.dpf[i];
-                        RegXS scaled_xor;
-                        auto ev = StreamEval(dpf, start, 0, aes_ops, false);
-                        for (address_t x=0;x<(address_t(1)<<depth);++x) {
-                            DPFnode leaf = ev.next();
-                            RegXS sx = dpf.scaled_xs(leaf);
-                            scaled_xor ^= sx;
-                        }
-                        printf("%016lx\n%016lx\n", scaled_xor.xshare,
-                            dpf.scaled_xor.xshare);
-                        printf("\n");
-                    }
-                }
-            });
-        });
-    }
-    pool.join();
+    MPCTIO tio(mpcio, 0);
+    run_coroutines(tio, [&mpcio, &tio, depth, start, num_threads] (yield_t &yield) {
+        if (tio.player() == 2) {
+            RDPFPair dp = tio.rdpfpair(yield, depth);
+            for (int i=0;i<2;++i) {
+                RDPF &dpf = dp.dpf[i];
+                value_t scaled_xor =
+                    parallel_streameval_rdpf(mpcio, dpf, start, num_threads);
+                printf("%016lx\n%016lx\n", scaled_xor,
+                    dpf.scaled_xor.xshare);
+                printf("\n");
+            }
+        } else {
+            RDPFTriple dt = tio.rdpftriple(yield, depth);
+            for (int i=0;i<3;++i) {
+                RDPF &dpf = dt.dpf[i];
+                value_t scaled_xor =
+                    parallel_streameval_rdpf(mpcio, dpf, start, num_threads);
+                printf("%016lx\n%016lx\n", scaled_xor,
+                    dpf.scaled_xor.xshare);
+                printf("\n");
+            }
+        }
+    });
 }
 
 static void tupleeval_timing(MPCIO &mpcio,