Prechádzať zdrojové kódy

Save to disk the RDPF pairs and triples created during preprocessing

Ian Goldberg 1 rok pred
rodič
commit
ee74b05694
2 zmenil súbory, kde vykonal 67 pridanie a 19 odobranie
  1. 59 15
      preproc.cpp
  2. 8 4
      rdpf.hpp

+ 59 - 15
preproc.cpp

@@ -5,23 +5,57 @@
 #include "preproc.hpp"
 #include "rdpf.hpp"
 
+// Keep track of open files that coroutines might be writing into
+class Openfiles {
+    std::vector<std::ofstream> files;
+
+public:
+    class Handle {
+        Openfiles &parent;
+        size_t idx;
+    public:
+        Handle(Openfiles &parent, size_t idx) :
+            parent(parent), idx(idx) {}
+
+        // Retrieve the ofstream from this Handle
+        std::ofstream &os() const { return parent.files[idx]; }
+    };
+
+    Handle open(const char *prefix, unsigned player,
+        unsigned thread_num, nbits_t depth = 0);
+
+    void closeall();
+};
+
 // Open a file for writing with name the given prefix, and ".pX.tY"
 // suffix, where X is the (one-digit) player number and Y is the thread
-// number
-static std::ofstream openfile(const char *prefix, unsigned player,
-    unsigned thread_num)
+// number.  If depth D is given, use "D.pX.tY" as the suffix.
+Openfiles::Handle Openfiles::open(const char *prefix, unsigned player,
+    unsigned thread_num, nbits_t depth)
 {
     std::string filename(prefix);
     char suffix[20];
-    sprintf(suffix, ".p%d.t%u", player%10, thread_num);
+    if (depth > 0) {
+        sprintf(suffix, "%02d.p%d.t%u", depth, player%10, thread_num);
+    } else {
+        sprintf(suffix, ".p%d.t%u", player%10, thread_num);
+    }
     filename.append(suffix);
-    std::ofstream f;
-    f.open(filename);
+    std::ofstream &f = files.emplace_back(filename);
     if (f.fail()) {
         std::cerr << "Failed to open " << filename << "\n";
         exit(1);
     }
-    return f;
+    return Handle(*this, files.size()-1);
+}
+
+// Close all the open files
+void Openfiles::closeall()
+{
+    for (auto& f: files) {
+        f.close();
+    }
+    files.clear();
 }
 
 // The server-to-computational-peer protocol for sending precomputed
@@ -46,6 +80,7 @@ void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args)
     for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
         boost::asio::post(pool, [&mpcio, thread_num] {
             MPCTIO tio(mpcio, thread_num);
+            Openfiles ofiles;
             std::vector<coro_t> coroutines;
             while(1) {
                 unsigned char type = 0;
@@ -55,31 +90,31 @@ void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args)
                 tio.recv_server(&num, 4);
                 if (type == 0x80) {
                     // Multiplication triples
-                    std::ofstream tripfile = openfile("triples",
+                    auto tripfile = ofiles.open("triples",
                         mpcio.player, thread_num);
 
                     MultTriple T;
                     for (unsigned int i=0; i<num; ++i) {
                         T = tio.triple();
-                        tripfile.write((const char *)&T, sizeof(T));
+                        tripfile.os().write((const char *)&T, sizeof(T));
                     }
-                    tripfile.close();
                 } else if (type == 0x81) {
                     // Multiplication half triples
-                    std::ofstream halffile = openfile("halves",
+                    auto halffile = ofiles.open("halves",
                         mpcio.player, thread_num);
 
                     HalfTriple H;
                     for (unsigned int i=0; i<num; ++i) {
                         H = tio.halftriple();
-                        halffile.write((const char *)&H, sizeof(H));
+                        halffile.os().write((const char *)&H, sizeof(H));
                     }
-                    halffile.close();
                 } else if (type >= 0x01 && type <= 0x30) {
                     // RAM DPFs
+                    auto tripfile = ofiles.open("rdpf",
+                        mpcio.player, thread_num, type);
                     for (unsigned int i=0; i<num; ++i) {
                         coroutines.emplace_back(
-                            [&](yield_t &yield) {
+                            [&, tripfile, type](yield_t &yield) {
                                 RDPFTriple rdpftrip(tio, yield, type);
                                 printf("usi0 = %016lx\n", rdpftrip.dpf[0].unit_sum_inverse);
                                 printf("sxr0 = %016lx\n", rdpftrip.dpf[0].scaled_xor.xshare);
@@ -89,11 +124,13 @@ void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args)
                                 printf("sxr2 = %016lx\n", rdpftrip.dpf[2].scaled_xor.xshare);
                                 tio.iostream_server() <<
                                     rdpftrip.dpf[(mpcio.player == 0) ? 1 : 2];
+                                tripfile.os() << rdpftrip;
                             });
                     }
                 }
             }
             run_coroutines(tio, coroutines);
+            ofiles.closeall();
         });
     }
     pool.join();
@@ -106,6 +143,7 @@ void preprocessing_server(MPCServerIO &mpcsrvio, int num_threads, char **args)
         boost::asio::post(pool, [&mpcsrvio, thread_num, args] {
             char **threadargs = args;
             MPCTIO stio(mpcsrvio, thread_num);
+            Openfiles ofiles;
             std::vector<coro_t> coroutines;
             if (*threadargs && threadargs[0][0] == 'T') {
                 // Per-thread initialization.  The args look like:
@@ -167,17 +205,22 @@ void preprocessing_server(MPCServerIO &mpcsrvio, int num_threads, char **args)
                         stio.queue_p1(&typetag, 1);
                         stio.queue_p1(&num, 4);
 
+                        auto pairfile = ofiles.open("rdpf",
+                            mpcsrvio.player, thread_num, depth);
                         for (unsigned int i=0; i<num; ++i) {
                             coroutines.emplace_back(
-                                [&](yield_t &yield) {
+                                [&, pairfile, depth](yield_t &yield) {
                                     RDPFTriple rdpftrip(stio, yield, depth);
                                     RDPFPair rdpfpair;
                                     stio.iostream_p0() >> rdpfpair.dpf[0];
                                     stio.iostream_p1() >> rdpfpair.dpf[1];
                                 printf("usi0 = %016lx\n", rdpfpair.dpf[0].unit_sum_inverse);
                                 printf("sxr0 = %016lx\n", rdpfpair.dpf[0].scaled_xor.xshare);
+                                printf("dep0 = %d\n", rdpfpair.dpf[0].depth());
                                 printf("usi1 = %016lx\n", rdpfpair.dpf[1].unit_sum_inverse);
                                 printf("sxr1 = %016lx\n", rdpfpair.dpf[1].scaled_xor.xshare);
+                                printf("dep1 = %d\n", rdpfpair.dpf[1].depth());
+                                    pairfile.os() << rdpfpair;
                                 });
                         }
                     }
@@ -190,6 +233,7 @@ void preprocessing_server(MPCServerIO &mpcsrvio, int num_threads, char **args)
             stio.queue_p0(&typetag, 1);
             stio.queue_p1(&typetag, 1);
             run_coroutines(stio, coroutines);
+            ofiles.closeall();
         });
     }
     pool.join();

+ 8 - 4
rdpf.hpp

@@ -126,8 +126,9 @@ T& operator<<(T &os, const RDPFTriple &rdpftrip)
 {
     os << rdpftrip.dpf[0] << rdpftrip.dpf[1] << rdpftrip.dpf[2];
     nbits_t depth = rdpftrip.dpf[0].depth();
-    os.write(&rdpftrip.as_target.ashare, BITBYTES(depth));
-    os.write(&rdpftrip.xs_target.xshare, BITBYTES(depth));
+    os.write((const char *)&rdpftrip.as_target.ashare, BITBYTES(depth));
+    os.write((const char *)&rdpftrip.xs_target.xshare, BITBYTES(depth));
+    return os;
 }
 
 template <typename T>
@@ -136,9 +137,10 @@ T& operator>>(T &is, RDPFTriple &rdpftrip)
     is >> rdpftrip.dpf[0] >> rdpftrip.dpf[1] >> rdpftrip.dpf[2];
     nbits_t depth = rdpftrip.dpf[0].depth();
     rdpftrip.as_target.ashare = 0;
-    is.read(&rdpftrip.as_target.ashare, BITBYTES(depth));
+    is.read((char *)&rdpftrip.as_target.ashare, BITBYTES(depth));
     rdpftrip.xs_target.xshare = 0;
-    is.read(&rdpftrip.xs_target.xshare, BITBYTES(depth));
+    is.read((char *)&rdpftrip.xs_target.xshare, BITBYTES(depth));
+    return is;
 }
 
 struct RDPFPair {
@@ -151,12 +153,14 @@ template <typename T>
 T& operator<<(T &os, const RDPFPair &rdpfpair)
 {
     os << rdpfpair.dpf[0] << rdpfpair.dpf[1];
+    return os;
 }
 
 template <typename T>
 T& operator>>(T &is, RDPFPair &rdpfpair)
 {
     is >> rdpfpair.dpf[0] >> rdpfpair.dpf[1];
+    return is;
 }
 
 #endif