Browse files

Duoram reads and updates are now multithreaded

Ian Goldberg 1 year ago
parent
commit
b6abfb1b85
3 changed files with 113 additions and 91 deletions
  1. duoram.tcc   +51 -29
  2. mpcio.hpp    +7  -0
  3. online.cpp   +55 -62

+ 51 - 29
duoram.tcc

@@ -326,17 +326,22 @@ Duoram<T>::Shape::MemRefS<U>::operator T()
         auto indshift = combine(indoffset, peerindoffset, shape.addr_size);
 
         // Evaluate the DPFs and compute the dotproducts
-        StreamEval ev(dp, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+        ParallelEval pe(dp, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+            shape.shape_size, shape.tio.cpu_nthreads(),
             shape.tio.aes_ops());
-        for (size_t i=0; i<shape.shape_size; ++i) {
-            auto L = ev.next();
+        T init;
+        res = pe.reduce(init, [&dp, &shape] (const ParallelEval<RDPFPair> &pe,
+            int thread_num, address_t i, const RDPFPair::node &leaf) {
             // The values from the two DPFs
-            auto [V0, V1] = dp.unit<T>(L);
+            auto [V0, V1] = dp.unit<T>(leaf);
             // References to the appropriate cells in our database, our
             // blind, and our copy of the peer's blinded database
             auto [DB, BL, PBD] = shape.get_comp(i);
-            res += (DB + PBD) * V0.share() - BL * (V1-V0).share();
-        }
+            return (DB + PBD) * V0.share() - BL * (V1-V0).share();
+        },
+        [] (const ParallelEval<RDPFPair> &pe, T &accum, const T &value) {
+            accum += value;
+        });
 
         shape.yield();
 
@@ -359,32 +364,35 @@ Duoram<T>::Shape::MemRefS<U>::operator T()
         auto indshift = combine(p0indoffset, p1indoffset, shape.addr_size);
 
         // Evaluate the DPFs to compute the cancellation terms
-        T gamma0, gamma1;
-        StreamEval ev(dp, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+        std::tuple<T,T> init, gamma;
+        ParallelEval pe(dp, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+            shape.shape_size, shape.tio.cpu_nthreads(),
             shape.tio.aes_ops());
-        for (size_t i=0; i<shape.shape_size; ++i) {
-            auto L = ev.next();
-
+        gamma = pe.reduce(init, [&dp, &shape] (const ParallelEval<RDPFPair> &pe,
+            int thread_num, address_t i, const RDPFPair::node &leaf) {
             // The values from the two DPFs
-            auto [V0, V1] = dp.unit<T>(L);
+            auto [V0, V1] = dp.unit<T>(leaf);
 
             // shape.get_server(i) returns a pair of references to the
             // appropriate cells in the two blinded databases
             auto [BL0, BL1] = shape.get_server(i);
-            gamma0 -= BL0 * V1.share();
-            gamma1 -= BL1 * V0.share();
-        }
+            return std::make_tuple(-BL0 * V1.share(), -BL1 * V0.share());
+        },
+        [] (const ParallelEval<RDPFPair> &pe, std::tuple<T,T> &accum,
+            const std::tuple<T,T> &value) {
+            accum += value;
+        });
 
         // Choose a random blinding factor
         T rho;
         rho.randomize();
 
-        gamma0 += rho;
-        gamma1 -= rho;
+        std::get<0>(gamma) += rho;
+        std::get<1>(gamma) -= rho;
 
         // Send the cancellation terms to the computational players
-        shape.tio.iostream_p0() << gamma0;
-        shape.tio.iostream_p1() << gamma1;
+        shape.tio.iostream_p0() << std::get<0>(gamma);
+        shape.tio.iostream_p1() << std::get<1>(gamma);
 
         shape.yield();
     }
@@ -431,12 +439,15 @@ typename Duoram<T>::Shape::MemRefS<U>
         auto Mshift = combine(Moffset, peerMoffset);
 
         // Evaluate the DPFs and add them to the database
-        StreamEval ev(dt, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+        ParallelEval pe(dt, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+            shape.shape_size, shape.tio.cpu_nthreads(),
             shape.tio.aes_ops());
-        for (size_t i=0; i<shape.shape_size; ++i) {
-            auto L = ev.next();
+        int init = 0;
+        pe.reduce(init, [&dt, &shape, &Mshift, player]
+            (const ParallelEval<RDPFTriple> &pe, int thread_num,
+            address_t i, const RDPFTriple::node &leaf) {
             // The values from the three DPFs
-            auto [V0, V1, V2] = dt.scaled<T>(L) + dt.unit<T>(L) * Mshift;
+            auto [V0, V1, V2] = dt.scaled<T>(leaf) + dt.unit<T>(leaf) * Mshift;
             // References to the appropriate cells in our database, our
             // blind, and our copy of the peer's blinded database
             auto [DB, BL, PBD] = shape.get_comp(i);
@@ -448,7 +459,11 @@ typename Duoram<T>::Shape::MemRefS<U>
                 BL -= V2;
                 PBD += V1-V0;
             }
-        }
+            return 0;
+        },
+        // We don't need to return a value
+        [] (const ParallelEval<RDPFTriple> &pe, int &accum, int value) {
+        });
     } else {
         // The server does this
 
@@ -466,17 +481,24 @@ typename Duoram<T>::Shape::MemRefS<U>
         auto Mshift = combine(p0Moffset, p1Moffset);
 
         // Evaluate the DPFs and subtract them from the blinds
-        StreamEval ev(dp, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+        ParallelEval pe(dp, IfRegAS<U>(indshift), IfRegXS<U>(indshift),
+            shape.shape_size, shape.tio.cpu_nthreads(),
             shape.tio.aes_ops());
-        for (size_t i=0; i<shape.shape_size; ++i) {
-            auto L = ev.next();
+        int init = 0;
+        pe.reduce(init, [&dp, &shape, &Mshift]
+            (const ParallelEval<RDPFPair> &pe, int thread_num,
+            address_t i, const RDPFPair::node &leaf) {
             // The values from the two DPFs
-            auto V = dp.scaled<T>(L) + dp.unit<T>(L) * Mshift;
+            auto V = dp.scaled<T>(leaf) + dp.unit<T>(leaf) * Mshift;
             // shape.get_server(i) returns a pair of references to the
             // appropriate cells in the two blinded databases, so we can
             // subtract the pair directly.
             shape.get_server(i) -= V;
-        }
+            return 0;
+        },
+        // We don't need to return a value
+        [] (const ParallelEval<RDPFPair> &pe, int &accum, int value) {
+        });
     }
     return *this;
 }
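
The three hunks above all follow the same pattern: the sequential StreamEval loop becomes a ParallelEval constructed with the number of leaves and the CPU thread count, followed by a reduce() call that takes a per-leaf lambda and an accumulation lambda. Below is a minimal, self-contained sketch of that map-reduce pattern in plain C++ (std::thread only, no project classes); the names parallel_reduce, process, and accumulate are illustrative and are not the repository's ParallelEval API.

    // Sketch of the map-reduce pattern used by the reduce() calls above.
    // Assumes nthreads >= 1 and that init is the identity for accumulate.
    #include <algorithm>
    #include <cstddef>
    #include <thread>
    #include <vector>

    template <typename V, typename Process, typename Accumulate>
    V parallel_reduce(std::size_t num_evals, int nthreads, V init,
            Process process, Accumulate accumulate)
    {
        // One partial accumulator per thread
        std::vector<V> partial(nthreads, init);
        std::vector<std::thread> workers;
        std::size_t chunk = (num_evals + nthreads - 1) / nthreads;
        for (int t = 0; t < nthreads; ++t) {
            workers.emplace_back([&, t] {
                std::size_t lo = std::size_t(t) * chunk;
                std::size_t hi = std::min(num_evals, lo + chunk);
                // process(t, i) plays the role of the per-leaf lambda above
                for (std::size_t i = lo; i < hi; ++i) {
                    accumulate(partial[t], process(t, i));
                }
            });
        }
        for (auto &w : workers) w.join();
        // Fold the per-thread partials, as the accumulation lambda does above
        V res = init;
        for (const V &p : partial) accumulate(res, p);
        return res;
    }

    // Example: sum of squares of 0..999 over 4 threads
    // long total = parallel_reduce<long>(1000, 4, 0,
    //     [](int, std::size_t i) { return long(i) * long(i); },
    //     [](long &acc, long v) { acc += v; });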

+ 7 - 0
mpcio.hpp

@@ -381,6 +381,13 @@ public:
     inline bool is_server() { return mpcio.player == 2; }
     inline size_t& aes_ops() { return mpcio.aes_ops[thread_num]; }
     inline size_t msgs_sent() { return mpcio.msgs_sent[thread_num]; }
+    inline int cpu_nthreads(int nthreads=0) {
+        int res = local_cpu_nthreads;
+        if (nthreads > 0) {
+            local_cpu_nthreads = nthreads;
+        }
+        return res;
+    }
     inline int comm_nthreads(int nthreads=0) {
         int res = communication_nthreads;
         if (nthreads > 0) {
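
The new cpu_nthreads() accessor mirrors the existing comm_nthreads() just below it: called with no argument it returns the current CPU thread count; called with a positive argument it installs the new count and returns the previous one. A short usage sketch (tio is an MPCTIO set up elsewhere; the surrounding code is hypothetical):

    int cur = tio.cpu_nthreads();   // query without changing the setting
    int old = tio.cpu_nthreads(8);  // use 8 CPU threads from now on; old keeps the prior value
    // ... parallel work, e.g. the ParallelEval calls in duoram.tcc above ...
    tio.cpu_nthreads(old);          // restore the previous setting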

+ 55 - 62
online.cpp

@@ -584,69 +584,62 @@ static void duoram_test(MPCIO &mpcio,
     }
     share &= ((address_t(1)<<depth)-1);
 
-    int num_threads = opts.num_threads;
-    boost::asio::thread_pool pool(num_threads);
-    for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
-        boost::asio::post(pool, [&mpcio, thread_num, depth, share] {
-            MPCTIO tio(mpcio, thread_num);
-            run_coroutines(tio, [&tio, depth, share] (yield_t &yield) {
-                size_t size = size_t(1)<<depth;
-                // size_t &aes_ops = tio.aes_ops();
-                Duoram<T> oram(tio.player(), size);
-                auto A = oram.flat(tio, yield);
-                RegAS aidx;
-                aidx.ashare = share;
-                T M;
-                if (tio.player() == 0) {
-                    M.set(0xbabb0000);
-                } else {
-                    M.set(0x0000a66e);
-                }
-                RegXS xidx;
-                xidx.xshare = share;
-                T N;
-                if (tio.player() == 0) {
-                    N.set(0xdead0000);
-                } else {
-                    N.set(0x0000beef);
-                }
-                // Writing and reading with additively shared indices
-                printf("Updating\n");
-                A[aidx] += M;
-                printf("Reading\n");
-                T Aa = A[aidx];
-                // Writing and reading with XOR shared indices
-                printf("Updating\n");
-                A[xidx] += N;
-                printf("Reading\n");
-                T Ax = A[xidx];
-                T Ae;
-                // Writing and reading with explicit indices
-                if (depth > 2) {
-                    A[5] += Aa;
-                    Ae = A[6];
-                }
-                if (depth <= 10) {
-                    oram.dump();
-                    auto check = A.reconstruct();
-                    if (tio.player() == 0) {
-                        for (address_t i=0;i<size;++i) {
-                            printf("%04x %016lx\n", i, check[i].share());
-                        }
-                    }
-                }
-                auto checkread = A.reconstruct(Aa);
-                auto checkreade = A.reconstruct(Ae);
-                auto checkreadx = A.reconstruct(Ax);
-                if (tio.player() == 0) {
-                    printf("Read AS value = %016lx\n", checkread.share());
-                    printf("Read AX value = %016lx\n", checkreadx.share());
-                    printf("Read Ex value = %016lx\n", checkreade.share());
+    MPCTIO tio(mpcio, 0, opts.num_threads);
+    run_coroutines(tio, [&tio, depth, share] (yield_t &yield) {
+        size_t size = size_t(1)<<depth;
+        // size_t &aes_ops = tio.aes_ops();
+        Duoram<T> oram(tio.player(), size);
+        auto A = oram.flat(tio, yield);
+        RegAS aidx;
+        aidx.ashare = share;
+        T M;
+        if (tio.player() == 0) {
+            M.set(0xbabb0000);
+        } else {
+            M.set(0x0000a66e);
+        }
+        RegXS xidx;
+        xidx.xshare = share;
+        T N;
+        if (tio.player() == 0) {
+            N.set(0xdead0000);
+        } else {
+            N.set(0x0000beef);
+        }
+        // Writing and reading with additively shared indices
+        printf("Updating\n");
+        A[aidx] += M;
+        printf("Reading\n");
+        T Aa = A[aidx];
+        // Writing and reading with XOR shared indices
+        printf("Updating\n");
+        A[xidx] += N;
+        printf("Reading\n");
+        T Ax = A[xidx];
+        T Ae;
+        // Writing and reading with explicit indices
+        if (depth > 2) {
+            A[5] += Aa;
+            Ae = A[6];
+        }
+        if (depth <= 10) {
+            oram.dump();
+            auto check = A.reconstruct();
+            if (tio.player() == 0) {
+                for (address_t i=0;i<size;++i) {
+                    printf("%04x %016lx\n", i, check[i].share());
                 }
-            });
-        });
-    }
-    pool.join();
+            }
+        }
+        auto checkread = A.reconstruct(Aa);
+        auto checkreade = A.reconstruct(Ae);
+        auto checkreadx = A.reconstruct(Ax);
+        if (tio.player() == 0) {
+            printf("Read AS value = %016lx\n", checkread.share());
+            printf("Read AX value = %016lx\n", checkreadx.share());
+            printf("Read Ex value = %016lx\n", checkreade.share());
+        }
+    });
 }
 
 static void cdpf_test(MPCIO &mpcio,
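
The duoram_test change above shifts where the parallelism lives: before, opts.num_threads spawned that many independent copies of the test over a boost::asio::thread_pool; now a single test runs and the thread count is handed to the MPCTIO constructor, so the threads are spent inside each ORAM read and update. A rough trace of how the count flows after this commit (inferred from this diff alone, not from the full headers):

    // opts.num_threads
    //   -> MPCTIO tio(mpcio, 0, opts.num_threads);             // online.cpp (above)
    //   -> tio.cpu_nthreads()                                   // mpcio.hpp accessor (above)
    //   -> ParallelEval pe(..., shape.tio.cpu_nthreads(), ...)  // duoram.tcc call sites
    //   -> pe.reduce(init, per_leaf, accumulate)                // splits the DPF evaluation over CPU threads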