Browse Source

Cleanly separate communication threads from CPU threads in the CLI

Now "-C num" is always communication threads, and "-t num" is always CPU
threads.
Ian Goldberg 5 months ago
parent
commit
de44157928
9 changed files with 58 additions and 58 deletions
  1. 2 2
      avl.cpp
  2. 1 1
      bst.cpp
  3. 1 1
      cell.cpp
  4. 1 1
      heap.cpp
  5. 2 2
      heapsampler.cpp
  6. 25 24
      online.cpp
  7. 5 3
      options.hpp
  8. 17 6
      prac.cpp
  9. 4 18
      preproc.cpp

+ 2 - 2
avl.cpp

@@ -1813,7 +1813,7 @@ void avl(MPCIO &mpcio,
     size_t init_size = (size_t(1)<<(depth));
     size_t oram_size = init_size + n_inserts;
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, &mpcio, depth, oram_size, init_size, n_inserts, n_deletes, run_sanity, optimized] (yield_t &yield) {
         //printf("ORAM init_size = %ld, oram_size = %ld\n", init_size, oram_size);
         std::cout << "\n===== SETUP =====\n";
@@ -1891,7 +1891,7 @@ void avl_tests(MPCIO &mpcio,
     nbits_t depth=4;
     size_t items = (size_t(1)<<depth)-1;
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, depth, items] (yield_t &yield) {
         size_t size = size_t(1)<<depth;
         bool player0 = tio.player()==0;

+ 1 - 1
bst.cpp

@@ -764,7 +764,7 @@ void bst(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, depth] (yield_t &yield) {
         size_t size = size_t(1)<<depth;
         BST tree(tio.player(), size);

+ 1 - 1
cell.cpp

@@ -26,7 +26,7 @@ void cell(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, depth] (yield_t &yield) {
         size_t size = size_t(1)<<depth;
         Duoram<Cell> oram(tio.player(), size);

+ 1 - 1
heap.cpp

@@ -698,7 +698,7 @@ RegAS MinHeap::extract_min(MPCTIO &tio, yield_t & yield, int is_optimized) {
 void Heap(MPCIO & mpcio,  const PRACOptions & opts, char ** args) {
 
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
 
     int nargs = 0;
 

+ 2 - 2
heapsampler.cpp

@@ -135,7 +135,7 @@ void heapsampler_test(MPCIO &mpcio, const PRACOptions &opts, char **args)
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&mpcio, &tio, n, k] (yield_t &yield) {
 
         std::cout << "\n===== STREAMING =====\n";
@@ -204,7 +204,7 @@ void weighted_coin_test(MPCIO &mpcio, const PRACOptions &opts, char **args)
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&mpcio, &tio, iters, m, k] (yield_t &yield) {
 
         size_t heads = 0, tails = 0;

+ 25 - 24
online.cpp

@@ -188,7 +188,7 @@ static void lamport_test(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_comm_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, thread_num, niters, chunksize, numchunks] {
@@ -226,7 +226,7 @@ static void rdpf_test(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, depth, num_iters, incremental] (yield_t &yield) {
         size_t &aes_ops = tio.aes_ops();
         nbits_t min_level = incremental ? 1 : depth;
@@ -341,7 +341,7 @@ static void rdpf_timing(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_comm_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, thread_num, depth] {
@@ -385,10 +385,11 @@ static void rdpf_timing(MPCIO &mpcio,
     pool.join();
 }
 
-static value_t parallel_streameval_rdpf(MPCIO &mpcio, const RDPF<1> &dpf,
+static value_t parallel_streameval_rdpf(MPCTIO &tio, const RDPF<1> &dpf,
     address_t start, int num_threads)
 {
     RDPF<1>::RegXSW scaled_xor[num_threads];
+    size_t aes_ops[num_threads];
     boost::asio::thread_pool pool(num_threads);
     address_t totsize = (address_t(1)<<dpf.depth());
     address_t threadstart = start;
@@ -397,8 +398,7 @@ static value_t parallel_streameval_rdpf(MPCIO &mpcio, const RDPF<1> &dpf,
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         address_t threadsize = threadchunk + (address_t(thread_num) < threadextra);
         boost::asio::post(pool,
-            [&mpcio, &dpf, &scaled_xor, thread_num, threadstart, threadsize] {
-                MPCTIO tio(mpcio, thread_num);
+            [&tio, &dpf, &scaled_xor, &aes_ops, thread_num, threadstart, threadsize] {
 //printf("Thread %d from %X for %X\n", thread_num, threadstart, threadsize);
                 RDPF<1>::RegXSW local_xor;
                 size_t local_aes_ops = 0;
@@ -409,7 +409,7 @@ static value_t parallel_streameval_rdpf(MPCIO &mpcio, const RDPF<1> &dpf,
                     local_xor ^= dpf.scaled_xs(leaf);
                 }
                 scaled_xor[thread_num] = local_xor;
-                tio.aes_ops() += local_aes_ops;
+                aes_ops[thread_num] = local_aes_ops;
 //printf("Thread %d complete\n", thread_num);
             });
         threadstart = (threadstart + threadsize) % totsize;
@@ -418,6 +418,7 @@ static value_t parallel_streameval_rdpf(MPCIO &mpcio, const RDPF<1> &dpf,
     RDPF<1>::RegXSW res;
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         res ^= scaled_xor[thread_num];
+        tio.aes_ops() += aes_ops[thread_num];
     }
     return res[0].xshare;
 }
@@ -437,15 +438,15 @@ static void rdpfeval_timing(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_cpu_threads;
     MPCTIO tio(mpcio, 0, num_threads);
-    run_coroutines(tio, [&mpcio, &tio, depth, start, num_threads] (yield_t &yield) {
+    run_coroutines(tio, [&tio, depth, start, num_threads] (yield_t &yield) {
         if (tio.player() == 2) {
             RDPFPair<1> dp = tio.rdpfpair(yield, depth);
             for (int i=0;i<2;++i) {
                 RDPF<1> &dpf = dp.dpf[i];
                 value_t scaled_xor =
-                    parallel_streameval_rdpf(mpcio, dpf, start, num_threads);
+                    parallel_streameval_rdpf(tio, dpf, start, num_threads);
                 printf("%016lx\n%016lx\n", scaled_xor,
                     dpf.li[0].scaled_xor[0].xshare);
                 printf("\n");
@@ -455,7 +456,7 @@ static void rdpfeval_timing(MPCIO &mpcio,
             for (int i=0;i<3;++i) {
                 RDPF<1> &dpf = dt.dpf[i];
                 value_t scaled_xor =
-                    parallel_streameval_rdpf(mpcio, dpf, start, num_threads);
+                    parallel_streameval_rdpf(tio, dpf, start, num_threads);
                 printf("%016lx\n%016lx\n", scaled_xor,
                     dpf.li[0].scaled_xor[0].xshare);
                 printf("\n");
@@ -479,7 +480,7 @@ static void par_rdpfeval_timing(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_cpu_threads;
     MPCTIO tio(mpcio, 0, num_threads);
     run_coroutines(tio, [&tio, depth, start, num_threads] (yield_t &yield) {
         if (tio.player() == 2) {
@@ -533,7 +534,7 @@ static void tupleeval_timing(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_cpu_threads;
     MPCTIO tio(mpcio, 0, num_threads);
     run_coroutines(tio, [&tio, depth, start] (yield_t &yield) {
         size_t &aes_ops = tio.aes_ops();
@@ -595,7 +596,7 @@ static void par_tupleeval_timing(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_cpu_threads;
     MPCTIO tio(mpcio, 0, num_threads);
     run_coroutines(tio, [&tio, depth, start, num_threads] (yield_t &yield) {
         size_t &aes_ops = tio.aes_ops();
@@ -663,7 +664,7 @@ static void duoram_test(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, depth, share, len] (yield_t &yield) {
         // size_t &aes_ops = tio.aes_ops();
         Duoram<T> oram(tio.player(), len);
@@ -781,7 +782,7 @@ static void duoram(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&mpcio, &tio, depth, items] (yield_t &yield) {
         size_t size = size_t(1)<<depth;
         address_t mask = (depth < ADDRESS_MAX_BITS ?
@@ -912,7 +913,7 @@ static void read_test(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&mpcio, &tio, depth, items] (yield_t &yield) {
         size_t size = size_t(1)<<depth;
         Duoram<T> oram(tio.player(), size);
@@ -951,7 +952,7 @@ static void cdpf_test(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_comm_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, thread_num, query, target, iters] {
@@ -1087,7 +1088,7 @@ static void compare_test(MPCIO &mpcio,
         ++args;
     }
 
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_comm_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, thread_num, target, x] {
@@ -1135,7 +1136,7 @@ static void sort_test(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, depth, len] (yield_t &yield) {
         address_t size = address_t(1)<<depth;
         // size_t &aes_ops = tio.aes_ops();
@@ -1194,7 +1195,7 @@ static void pad_test(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&mpcio, &tio, depth, len] (yield_t &yield) {
         int player = tio.player();
         Duoram<RegAS> oram(player, len);
@@ -1282,7 +1283,7 @@ static void bsearch_test(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&tio, &mpcio, depth, len, iters, target, is_presorted] (yield_t &yield) {
         RegAS tshare;
         std::cout << "\n===== SETUP =====\n";
@@ -1425,7 +1426,7 @@ static void related(MPCIO &mpcio,
     }
     assert(layer < depth);
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&mpcio, &tio, depth, layer] (yield_t &yield) {
         size_t size = size_t(1)<<(depth+1);
         Duoram<T> oram(tio.player(), size);
@@ -1524,7 +1525,7 @@ static void path(MPCIO &mpcio,
         ++args;
     }
 
-    MPCTIO tio(mpcio, 0, opts.num_threads);
+    MPCTIO tio(mpcio, 0, opts.num_cpu_threads);
     run_coroutines(tio, [&mpcio, &tio, depth, target_node] (yield_t &yield) {
         size_t size = size_t(1)<<(depth+1);
         Duoram<T> oram(tio.player(), size);

+ 5 - 3
options.hpp

@@ -5,13 +5,15 @@
 
 struct PRACOptions {
     ProcessingMode mode;
-    int num_threads;
+    int num_comm_threads;
+    int num_cpu_threads;
     bool expand_rdpfs;
     bool use_xor_db;
     bool append_to_files;
 
-    PRACOptions() : mode(MODE_ONLINE), num_threads(1),
-        expand_rdpfs(false), use_xor_db(false), append_to_files(false) {}
+    PRACOptions() : mode(MODE_ONLINE), num_comm_threads(1),
+        num_cpu_threads(1), expand_rdpfs(false), use_xor_db(false),
+        append_to_files(false) {}
 };
 
 #endif

+ 17 - 6
prac.cpp

@@ -8,11 +8,12 @@
 
 static void usage(const char *progname)
 {
-    std::cerr << "Usage: " << progname << " [-p | -a | -o] [-t num] [-e] [-x] player_num player_addrs args ...\n";
+    std::cerr << "Usage: " << progname << " [-p | -a | -o] [-C num] [-t num] [-e] [-x] player_num player_addrs args ...\n";
     std::cerr << "-p: preprocessing mode\n";
     std::cerr << "-a: append to files in preprocessing mode (implies -p)\n";
     std::cerr << "-o: online-only mode\n";
-    std::cerr << "-t num: use num threads\n";
+    std::cerr << "-C num: use num communication threads\n";
+    std::cerr << "-t num: use num CPU threads per communication thread\n";
     std::cerr << "-e: store DPFs expanded (default is compressed)\n";
     std::cerr << "-x: use XOR-shared database (default is additive)\n";
     std::cerr << "player_num = 0 or 1 for the computational players\n";
@@ -29,7 +30,7 @@ static void comp_player_main(boost::asio::io_context &io_context,
 {
     std::deque<tcp::socket> peersocks, serversocks;
     mpcio_setup_computational(player, io_context, p0addr,
-        opts.num_threads, peersocks, serversocks);
+        opts.num_comm_threads, peersocks, serversocks);
     MPCPeerIO mpcio(player, opts.mode, peersocks, serversocks);
 
     // Queue up the work to be done
@@ -55,7 +56,7 @@ static void server_player_main(boost::asio::io_context &io_context,
 {
     std::deque<tcp::socket> p0socks, p1socks;
     mpcio_setup_server(io_context, p0addr, p1addr,
-        opts.num_threads, p0socks, p1socks);
+        opts.num_comm_threads, p0socks, p1socks);
     MPCServerIO mpcserverio(opts.mode, p0socks, p1socks);
 
     // Queue up the work to be done
@@ -94,10 +95,20 @@ int main(int argc, char **argv)
         } else if (!strcmp("-o", *args)) {
             opts.mode = MODE_ONLINEONLY;
             ++args;
+        } else if (!strcmp("-C", *args)) {
+            if (args[1]) {
+                opts.num_comm_threads = atoi(args[1]);
+                if (opts.num_comm_threads < 1) {
+                    usage(argv[0]);
+                }
+                args += 2;
+            } else {
+                usage(argv[0]);
+            }
         } else if (!strcmp("-t", *args)) {
             if (args[1]) {
-                opts.num_threads = atoi(args[1]);
-                if (opts.num_threads < 1) {
+                opts.num_cpu_threads = atoi(args[1]);
+                if (opts.num_cpu_threads < 1) {
                     usage(argv[0]);
                 }
                 args += 2;

+ 4 - 18
preproc.cpp

@@ -74,7 +74,6 @@ void Openfiles::closeall()
 //   0x82: AND triple
 //   0x83: Select triple
 //   0x8e: Counter (for testing)
-//   0x8f: Set number of CPU threads for this communication thread
 //   0x00: End of preprocessing
 //
 // One byte: subtype (not sent for type == 0x00)
@@ -90,11 +89,11 @@ void Openfiles::closeall()
 
 void preprocessing_comp(MPCIO &mpcio, const PRACOptions &opts, char **args)
 {
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_comm_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, &opts, thread_num] {
-            MPCTIO tio(mpcio, thread_num);
+            MPCTIO tio(mpcio, thread_num, opts.num_cpu_threads);
             Openfiles ofiles(opts.append_to_files);
             std::vector<coro_t> coroutines;
             while(1) {
@@ -249,8 +248,6 @@ void preprocessing_comp(MPCIO &mpcio, const PRACOptions &opts, char **args)
                                 }
                             }
                         });
-                } else if (type == 0x8f) {
-                    tio.cpu_nthreads(num);
                 }
             }
             run_coroutines(tio, coroutines);
@@ -262,12 +259,12 @@ void preprocessing_comp(MPCIO &mpcio, const PRACOptions &opts, char **args)
 
 void preprocessing_server(MPCServerIO &mpcsrvio, const PRACOptions &opts, char **args)
 {
-    int num_threads = opts.num_threads;
+    int num_threads = opts.num_comm_threads;
     boost::asio::thread_pool pool(num_threads);
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcsrvio, &opts, thread_num, args] {
             char **threadargs = args;
-            MPCTIO stio(mpcsrvio, thread_num);
+            MPCTIO stio(mpcsrvio, thread_num, opts.num_cpu_threads);
             Openfiles ofiles(opts.append_to_files);
             std::vector<coro_t> coroutines;
             if (*threadargs && threadargs[0][0] == 'T') {
@@ -505,17 +502,6 @@ void preprocessing_server(MPCServerIO &mpcsrvio, const PRACOptions &opts, char *
                             }
                         });
 
-                } else if (!strcmp(type, "p")) {
-                    unsigned char typetag = 0x8f;
-                    unsigned char subtypetag = 0x00;
-                    stio.queue_p0(&typetag, 1);
-                    stio.queue_p0(&subtypetag, 1);
-                    stio.queue_p0(&num, 4);
-                    stio.queue_p1(&typetag, 1);
-                    stio.queue_p1(&subtypetag, 1);
-                    stio.queue_p1(&num, 4);
-
-                    stio.cpu_nthreads(num);
 		}
                 free(arg);
                 ++threadargs;