#include #include "types.hpp" #include "coroutine.hpp" #include "preproc.hpp" #include "rdpf.hpp" // Keep track of open files that coroutines might be writing into class Openfiles { std::vector files; public: class Handle { Openfiles &parent; size_t idx; public: Handle(Openfiles &parent, size_t idx) : parent(parent), idx(idx) {} // Retrieve the ofstream from this Handle std::ofstream &os() const { return parent.files[idx]; } }; Handle open(const char *prefix, unsigned player, unsigned thread_num, nbits_t depth = 0); void closeall(); }; // Open a file for writing with name the given prefix, and ".pX.tY" // suffix, where X is the (one-digit) player number and Y is the thread // number. If depth D is given, use "D.pX.tY" as the suffix. Openfiles::Handle Openfiles::open(const char *prefix, unsigned player, unsigned thread_num, nbits_t depth) { std::string filename(prefix); char suffix[20]; if (depth > 0) { sprintf(suffix, "%02d.p%d.t%u", depth, player%10, thread_num); } else { sprintf(suffix, ".p%d.t%u", player%10, thread_num); } filename.append(suffix); std::ofstream &f = files.emplace_back(filename); if (f.fail()) { std::cerr << "Failed to open " << filename << "\n"; exit(1); } return Handle(*this, files.size()-1); } // Close all the open files void Openfiles::closeall() { for (auto& f: files) { f.close(); } files.clear(); } // The server-to-computational-peer protocol for sending precomputed // data is: // // One byte: type // 0x80: Multiplication triple // 0x81: Multiplication half-triple // 0x01 to 0x30: RAM DPF of that depth // 0x40: Comparison DPF // 0x00: End of preprocessing // // Four bytes: number of objects of that type (not sent for type == 0x00) // // Then that number of objects // // Repeat the whole thing until type == 0x00 is received void preprocessing_comp(MPCIO &mpcio, const PRACOptions &opts, char **args) { int num_threads = opts.num_threads; boost::asio::thread_pool pool(num_threads); for (int thread_num = 0; thread_num < num_threads; ++thread_num) { boost::asio::post(pool, [&mpcio, &opts, thread_num] { MPCTIO tio(mpcio, thread_num); Openfiles ofiles; std::vector coroutines; while(1) { unsigned char type = 0; unsigned int num = 0; size_t res = tio.recv_server(&type, 1); if (res < 1 || type == 0) break; tio.recv_server(&num, 4); if (type == 0x80) { // Multiplication triples auto tripfile = ofiles.open("triples", mpcio.player, thread_num); MultTriple T; for (unsigned int i=0; i= 0x01 && type <= 0x30) { // RAM DPFs auto tripfile = ofiles.open("rdpf", mpcio.player, thread_num, type); for (unsigned int i=0; i coroutines; if (*threadargs && threadargs[0][0] == 'T') { // Per-thread initialization. The args look like: // T0 t:50 h:10 T1 t:20 h:30 T2 h:20 // Skip to the arg marking our thread char us[20]; sprintf(us, "T%u", thread_num); while (*threadargs && strcmp(*threadargs, us)) { ++threadargs; } // Now skip to the next arg if there is one if (*threadargs) { ++threadargs; } } // Stop scanning for args when we get to the end or when we // get to another per-thread initialization marker while (*threadargs && threadargs[0][0] != 'T') { char *arg = strdup(*threadargs); char *colon = strchr(arg, ':'); if (!colon) { std::cerr << "Args must be type:num\n"; ++threadargs; free(arg); continue; } unsigned num = atoi(colon+1); *colon = '\0'; char *type = arg; if (!strcmp(type, "t")) { unsigned char typetag = 0x80; stio.queue_p0(&typetag, 1); stio.queue_p0(&num, 4); stio.queue_p1(&typetag, 1); stio.queue_p1(&num, 4); for (unsigned int i=0; i 48) { std::cerr << "Invalid DPF depth\n"; } else { unsigned char typetag = depth; stio.queue_p0(&typetag, 1); stio.queue_p0(&num, 4); stio.queue_p1(&typetag, 1); stio.queue_p1(&num, 4); auto pairfile = ofiles.open("rdpf", mpcsrvio.player, thread_num, depth); for (unsigned int i=0; i> rdpfpair.dpf[0]; stio.iostream_p1() >> rdpfpair.dpf[1]; printf("usi0 = %016lx\n", rdpfpair.dpf[0].unit_sum_inverse); printf("sxr0 = %016lx\n", rdpfpair.dpf[0].scaled_xor.xshare); printf("dep0 = %d\n", rdpfpair.dpf[0].depth()); printf("usi1 = %016lx\n", rdpfpair.dpf[1].unit_sum_inverse); printf("sxr1 = %016lx\n", rdpfpair.dpf[1].scaled_xor.xshare); printf("dep1 = %d\n", rdpfpair.dpf[1].depth()); if (opts.expand_rdpfs) { printf("Expanding\n"); rdpfpair.dpf[0].expand(stio.aes_ops()); rdpfpair.dpf[1].expand(stio.aes_ops()); printf("Expanded\n"); } pairfile.os() << rdpfpair; }); } } } free(arg); ++threadargs; } // That's all unsigned char typetag = 0x00; stio.queue_p0(&typetag, 1); stio.queue_p1(&typetag, 1); run_coroutines(stio, coroutines); ofiles.closeall(); }); } pool.join(); }