
Parallel RDPF evaluator

Also works for RDPFPair and RDPFTriple
Ian Goldberg 2 years ago
commit 94fcf8f8b0
4 changed files with 250 additions and 1 deletion:
  1. online.cpp  (+132, -0)
  2. rdpf.cpp    (+1, -1)
  3. rdpf.hpp    (+54, -0)
  4. rdpf.tcc    (+63, -0)

online.cpp  (+132, -0)

@@ -378,6 +378,68 @@ static void rdpfeval_timing(MPCIO &mpcio,
     });
 }
 
+static void par_rdpfeval_timing(MPCIO &mpcio,
+    const PRACOptions &opts, char **args)
+{
+    nbits_t depth=6;
+    address_t start=0;
+
+    if (*args) {
+        depth = atoi(*args);
+        ++args;
+    }
+    if (*args) {
+        start = strtoull(*args, NULL, 16);
+        ++args;
+    }
+
+    int num_threads = opts.num_threads;
+    MPCTIO tio(mpcio, 0, num_threads);
+    run_coroutines(tio, [&tio, depth, start, num_threads] (yield_t &yield) {
+        if (tio.player() == 2) {
+            RDPFPair dp = tio.rdpfpair(yield, depth);
+            for (int i=0;i<2;++i) {
+                RDPF &dpf = dp.dpf[i];
+                nbits_t depth = dpf.depth();
+                auto pe = ParallelEval(dpf, start, 0,
+                    address_t(1)<<depth, num_threads, tio.aes_ops());
+                RegXS result, init;
+                result = pe.reduce(init, [&dpf] (const ParallelEval<RDPF> &pe,
+                    int thread_num, address_t i, const RDPF::node &leaf) {
+                    return dpf.scaled_xs(leaf);
+                },
+                [] (const ParallelEval<RDPF> &pe, RegXS &accum,
+                    const RegXS &value) {
+                    accum ^= value;
+                });
+                printf("%016lx\n%016lx\n", result.xshare,
+                    dpf.scaled_xor.xshare);
+                printf("\n");
+            }
+        } else {
+            RDPFTriple dt = tio.rdpftriple(yield, depth);
+            for (int i=0;i<3;++i) {
+                RDPF &dpf = dt.dpf[i];
+                nbits_t depth = dpf.depth();
+                auto pe = ParallelEval(dpf, start, 0,
+                    address_t(1)<<depth, num_threads, tio.aes_ops());
+                RegXS result, init;
+                result = pe.reduce(init, [&dpf] (const ParallelEval<RDPF> &pe,
+                    int thread_num, address_t i, const RDPF::node &leaf) {
+                    return dpf.scaled_xs(leaf);
+                },
+                [] (const ParallelEval<RDPF> &pe, RegXS &accum,
+                    const RegXS &value) {
+                    accum ^= value;
+                });
+                printf("%016lx\n%016lx\n", result.xshare,
+                    dpf.scaled_xor.xshare);
+                printf("\n");
+            }
+        }
+    });
+}
+
 static void tupleeval_timing(MPCIO &mpcio,
     const PRACOptions &opts, char **args)
 {
@@ -440,6 +502,70 @@ static void tupleeval_timing(MPCIO &mpcio,
     });
 }
 
+static void par_tupleeval_timing(MPCIO &mpcio,
+    const PRACOptions &opts, char **args)
+{
+    nbits_t depth=6;
+    address_t start=0;
+
+    if (*args) {
+        depth = atoi(*args);
+        ++args;
+    }
+    if (*args) {
+        start = atoi(*args);
+        ++args;
+    }
+
+    int num_threads = opts.num_threads;
+    MPCTIO tio(mpcio, 0, num_threads);
+    run_coroutines(tio, [&tio, depth, start, num_threads] (yield_t &yield) {
+        size_t &aes_ops = tio.aes_ops();
+        if (tio.player() == 2) {
+            RDPFPair dp = tio.rdpfpair(yield, depth);
+            auto pe = ParallelEval(dp, start, 0, address_t(1)<<depth,
+                num_threads, aes_ops);
+            using V = std::tuple<RegXS,RegXS>;
+            V result, init;
+            result = pe.reduce(init, [&dp] (const ParallelEval<RDPFPair> &pe,
+                int thread_num, address_t i, const RDPFPair::node &leaf) {
+                return dp.scaled<RegXS>(leaf);
+            },
+            [] (const ParallelEval<RDPFPair> &pe, V &accum, const V &value) {
+                accum += value;
+            });
+            printf("%016lx\n%016lx\n", std::get<0>(result).xshare,
+                dp.dpf[0].scaled_xor.xshare);
+            printf("\n");
+            printf("%016lx\n%016lx\n", std::get<1>(result).xshare,
+                dp.dpf[1].scaled_xor.xshare);
+            printf("\n");
+        } else {
+            RDPFTriple dt = tio.rdpftriple(yield, depth);
+            auto pe = ParallelEval(dt, start, 0, address_t(1)<<depth,
+                num_threads, aes_ops);
+            using V = std::tuple<RegXS,RegXS,RegXS>;
+            V result, init;
+            result = pe.reduce(init, [&dt] (const ParallelEval<RDPFTriple> &pe,
+                int thread_num, address_t i, const RDPFTriple::node &leaf) {
+                return dt.scaled<RegXS>(leaf);
+            },
+            [] (const ParallelEval<RDPFTriple> &pe, V &accum, const V &value) {
+                accum += value;
+            });
+            printf("%016lx\n%016lx\n", std::get<0>(result).xshare,
+                dt.dpf[0].scaled_xor.xshare);
+            printf("\n");
+            printf("%016lx\n%016lx\n", std::get<1>(result).xshare,
+                dt.dpf[1].scaled_xor.xshare);
+            printf("\n");
+            printf("%016lx\n%016lx\n", std::get<2>(result).xshare,
+                dt.dpf[2].scaled_xor.xshare);
+            printf("\n");
+        }
+    });
+}
+
 // T is RegAS or RegXS for additive or XOR shared database respectively
 template <typename T>
 static void duoram_test(MPCIO &mpcio,
@@ -866,9 +992,15 @@ void online_main(MPCIO &mpcio, const PRACOptions &opts, char **args)
     } else if (!strcmp(*args, "evaltime")) {
         ++args;
         rdpfeval_timing(mpcio, opts, args);
+    } else if (!strcmp(*args, "parevaltime")) {
+        ++args;
+        par_rdpfeval_timing(mpcio, opts, args);
     } else if (!strcmp(*args, "tupletime")) {
         ++args;
         tupleeval_timing(mpcio, opts, args);
+    } else if (!strcmp(*args, "partupletime")) {
+        ++args;
+        par_tupleeval_timing(mpcio, opts, args);
     } else if (!strcmp(*args, "duotest")) {
         ++args;
         if (opts.use_xor_db) {

rdpf.cpp  (+1, -1)

@@ -264,7 +264,7 @@ DPFnode RDPF::leaf(address_t input, size_t &aes_ops) const
 // Expand the DPF if it's not already expanded
 //
 // This routine is slightly more efficient than repeatedly calling
-// Eval::next(), but it uses a lot more memory.
+// StreamEval::next(), but it uses a lot more memory.
 void RDPF::expand(size_t &aes_ops)
 {
     nbits_t depth = this->depth();
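
To make the tradeoff in the comment above concrete, here is a minimal sketch (not part of this commit) of the two ways to walk every leaf of an RDPF. It assumes an RDPF dpf, its depth in depth, and a size_t counter aes_ops are already in scope; expand(), leaf(), and StreamEval follow the signatures visible in this diff, while the loops themselves are illustrative.

    // Option 1: expand the whole DPF once, then look leaves up directly.
    // Cheaper per leaf, but stores all 2^depth expanded nodes.
    dpf.expand(aes_ops);
    for (address_t x = 0; x < (address_t(1)<<depth); ++x) {
        DPFnode leaf = dpf.leaf(x, aes_ops);
        // ... use leaf ...
    }

    // Option 2: stream the leaves one at a time with StreamEval,
    // keeping only a small amount of per-evaluator state.
    auto ev = StreamEval(dpf, 0, 0, aes_ops);
    for (address_t x = 0; x < (address_t(1)<<depth); ++x) {
        DPFnode leaf = ev.next();
        // ... use leaf ...
    }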

rdpf.hpp  (+54, -0)

@@ -249,6 +249,60 @@ public:
     typename T::node next();
 };
 
+// Parallel evaluation.  This class launches a number of threads, each
+// running a StreamEval to evaluate a chunk of the RDPF (or RDPFPair or
+// RDPFTriple); it accumulates the results within each chunk, and then
+// accumulates all the chunks together.  T can be RDPF, RDPFPair, or
+// RDPFTriple.
+template <typename T>
+struct ParallelEval {
+    const T &rdpf;
+    address_t start;
+    address_t xor_offset;
+    address_t num_evals;
+    int num_threads;
+    size_t &aes_ops;
+
+    // Create a parallel evaluator that will evaluate the given rdpf at
+    // DPF(start XOR xor_offset)
+    // DPF((start+1) XOR xor_offset)
+    // DPF((start+2) XOR xor_offset)
+    // ...
+    // DPF((start+num_evals-1) XOR xor_offset)
+    // where all indices are taken mod 2^depth, and accumulate the
+    // results into a single answer.
+    ParallelEval(const T &rdpf, address_t start,
+        address_t xor_offset, address_t num_evals,
+        int num_threads, size_t &aes_ops) :
+        rdpf(rdpf), start(start), xor_offset(xor_offset),
+        num_evals(num_evals), num_threads(num_threads),
+        aes_ops(aes_ops) {}
+
+    // Run the parallel evaluator.  The type V is the type of the
+    // accumulator; init should be the "zero" value of the accumulator.
+    // The type W (process) is a lambda type with the signature
+    // (const ParallelEval &, int, address_t, const T::node &) -> V
+    // which will be called like this for each i from 0 to num_evals-1,
+    // across num_threads threads:
+    // value_i = process(*this, t, i, DPF((start+i) XOR xor_offset))
+    // t is the thread number (0 <= t < num_threads).
+    // The type X (accumulate) is a lambda type with the signature
+    // (const ParallelEval &, V &, const V &)
+    // which will be called to combine the num_evals values produced by
+    // process,
+    // first accumulating the values within each thread (starting with
+    // the init value), and then accumulating the totals from each
+    // thread together (again starting with the init value):
+    //
+    // total = init
+    // for each thread t:
+    //     accum_t = init
+    //     for each value_i generated by thread t:
+    //         accumulate(*this, accum_t, value_i)
+    //     accumulate(*this, total, accum_t)
+    template <typename V, typename W, typename X>
+    inline V reduce(V init, W process, X accumulate);
+};
+
 #include "rdpf.tcc"
 
 #endif
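
As a compact usage illustration of the declaration above (a sketch following the pattern in the online.cpp hunk earlier in this commit; an RDPF dpf, an MPCTIO tio, and an int num_threads are assumed to be in scope): evaluate all 2^depth leaves across num_threads threads and XOR together this party's scaled leaf shares, which should reproduce the precomputed dpf.scaled_xor share.

    nbits_t depth = dpf.depth();
    auto pe = ParallelEval(dpf, 0, 0, address_t(1)<<depth,
        num_threads, tio.aes_ops());
    RegXS init;   // the "zero" value of the accumulator
    RegXS result = pe.reduce(init,
        [&dpf] (const ParallelEval<RDPF> &pe, int thread_num,
                address_t i, const RDPF::node &leaf) {
            // per-leaf value: this party's XOR share of the scaled leaf
            return dpf.scaled_xs(leaf);
        },
        [] (const ParallelEval<RDPF> &pe, RegXS &accum, const RegXS &value) {
            // combine per-leaf values within a thread, then per-thread totals
            accum ^= value;
        });
    // result.xshare should now equal dpf.scaled_xor.xshare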

rdpf.tcc  (+63, -0)

@@ -102,6 +102,69 @@ typename T::node StreamEval<T>::next()
     return leaf;
 }
 
+// Run the parallel evaluator.  The type V is the type of the
+// accumulator; init should be the "zero" value of the accumulator.
+// The type W (process) is a lambda type with the signature
+// (const ParallelEval &, int, address_t, T::node) -> V
+// which will be called like this for each i from 0 to num_evals-1,
+// across num_threads threads:
+// value_i = process(*this, t, i, DPF((start+i) XOR xor_offset))
+// t is the thread number (0 <= t < num_threads).
+// The type X (accumulate) is a lambda type with the signature
+// (const ParallelEval &, V &, const V &)
+// which will be called to combine the num_evals values produced by
+// process,
+// first accumulating the values within each thread (starting with
+// the init value), and then accumulating the totals from each
+// thread together (again starting with the init value):
+//
+// total = init
+// for each thread t:
+//     accum_t = init
+//     for each value_i generated by thread t:
+//         accumulate(*this, accum_t, value_i)
+//     accumulate(*this, total, accum_t)
+template <typename T> template <typename V, typename W, typename X>
+inline V ParallelEval<T>::reduce(V init, W process, X accumulate)
+{
+    size_t thread_aes_ops[num_threads];
+    V accums[num_threads];
+    boost::asio::thread_pool pool(num_threads);
+    address_t threadstart = start;
+    address_t threadchunk = num_evals / num_threads;
+    address_t threadextra = num_evals % num_threads;
+    nbits_t depth = rdpf.depth();
+    address_t indexmask = (depth < ADDRESS_MAX_BITS ?
+        ((address_t(1)<<depth)-1) : ~0);
+    for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
+        address_t threadsize = threadchunk + (address_t(thread_num) < threadextra);
+        boost::asio::post(pool,
+            [this, &init, &thread_aes_ops, &accums, &process,
+                    &accumulate, thread_num, threadstart, threadsize,
+                    indexmask] {
+                size_t local_aes_ops = 0;
+                auto ev = StreamEval(rdpf, (start+threadstart)&indexmask,
+                    xor_offset, local_aes_ops);
+                V accum = init;
+                for (address_t x=0;x<threadsize;++x) {
+                    typename T::node leaf = ev.next();
+                    V value = process(*this, thread_num,
+                        (threadstart+x)&indexmask, leaf);
+                    accumulate(*this, accum, value);
+                }
+                accums[thread_num] = accum;
+                thread_aes_ops[thread_num] = local_aes_ops;
+            });
+        threadstart = (threadstart + threadsize) & indexmask;
+    }
+    pool.join();
+    V total = init;
+    for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
+        accumulate(*this, total, accums[thread_num]);
+        aes_ops += thread_aes_ops[thread_num];
+    }
+    return total;
+}
+
 // Additive share of the scaling value M_as such that the high words
 // of the leaf values for P0 and P1 add to M_as * e_{target}
 template <>
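
As a worked example of the chunking in reduce() above (illustrative numbers, not from the commit): with num_evals = 1<<20 = 1048576 and num_threads = 5, threadchunk = 209715 and threadextra = 1, so thread 0 evaluates 209716 consecutive leaves and threads 1 through 4 evaluate 209715 each (209716 + 4*209715 = 1048576). Each thread runs its own StreamEval over its chunk with a private AES counter and writes only its own slot of accums and thread_aes_ops, so no locking is needed during evaluation; the per-thread accumulators and AES counts are folded into total and the shared aes_ops reference only after pool.join().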