123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518 |
- #include <vector>
- #include "types.hpp"
- #include "coroutine.hpp"
- #include "preproc.hpp"
- #include "rdpf.hpp"
- #include "cdpf.hpp"
- // Keep track of open files that coroutines might be writing into
- class Openfiles {
- bool append_mode;
- std::vector<std::ofstream> files;
- public:
- Openfiles(bool append_mode = false) : append_mode(append_mode) {}
- class Handle {
- Openfiles &parent;
- size_t idx;
- public:
- Handle(Openfiles &parent, size_t idx) :
- parent(parent), idx(idx) {}
- // Retrieve the ofstream from this Handle
- std::ofstream &os() const { return parent.files[idx]; }
- };
- Handle open(const char *prefix, unsigned player,
- unsigned thread_num, nbits_t depth = 0);
- void closeall();
- };
- // Open a file for writing with name the given prefix, and ".pX.tY"
- // suffix, where X is the (one-digit) player number and Y is the thread
- // number. If depth D is given, use "D.pX.tY" as the suffix.
- Openfiles::Handle Openfiles::open(const char *prefix, unsigned player,
- unsigned thread_num, nbits_t depth)
- {
- std::string filename(prefix);
- char suffix[20];
- if (depth > 0) {
- sprintf(suffix, "%02d.p%d.t%u", depth, player%10, thread_num);
- } else {
- sprintf(suffix, ".p%d.t%u", player%10, thread_num);
- }
- filename.append(suffix);
- std::ofstream &f = files.emplace_back(filename,
- append_mode ? std::ios_base::app : std::ios_base::out);
- if (f.fail()) {
- std::cerr << "Failed to open " << filename << "\n";
- exit(1);
- }
- return Handle(*this, files.size()-1);
- }
- // Close all the open files
- void Openfiles::closeall()
- {
- for (auto& f: files) {
- f.close();
- }
- files.clear();
- }
- // The server-to-computational-peer protocol for sending precomputed
- // data is:
- //
- // One byte: type
- // 0x01 to 0x30: RAM DPF of that depth
- // 0x40: Comparison DPF
- // 0x80: Multiplication triple
- // 0x81: Multiplication half-triple
- // 0x82: AND triple
- // 0x83: Select triple
- // 0x8e: Counter (for testing)
- // 0x00: End of preprocessing
- //
- // One byte: subtype (not sent for type == 0x00)
- // For RAM DPFs, the subtype is the width (0x01 to 0x05), OR'd with
- // 0x80 if it is an incremental RDPF
- // Otherwise, it is 0x00
- //
- // Four bytes: number of objects of that type (not sent for type == 0x00)
- //
- // Then that number of objects
- //
- // Repeat the whole thing until type == 0x00 is received
- void preprocessing_comp(MPCIO &mpcio, const PRACOptions &opts, char **args)
- {
- int num_threads = opts.num_comm_threads;
- boost::asio::thread_pool pool(num_threads);
- for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
- boost::asio::post(pool, [&mpcio, &opts, thread_num] {
- MPCTIO tio(mpcio, thread_num, opts.num_cpu_threads);
- Openfiles ofiles(opts.append_to_files);
- std::vector<coro_t> coroutines;
- while(1) {
- unsigned char type = 0;
- unsigned char subtype = 0;
- unsigned int num = 0;
- size_t res = tio.recv_server(&type, 1);
- if (res < 1 || type == 0) break;
- tio.recv_server(&subtype, 1);
- tio.recv_server(&num, 4);
- if (type == 0x80) {
- // Multiplication triples
- auto tripfile = ofiles.open("mults",
- mpcio.player, thread_num);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&tio, tripfile](yield_t &yield) {
- yield();
- MultTriple T = tio.multtriple(yield);
- tripfile.os() << T;
- });
- }
- } else if (type == 0x81) {
- // Multiplication half triples
- auto halffile = ofiles.open("halves",
- mpcio.player, thread_num);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&tio, halffile](yield_t &yield) {
- yield();
- HalfTriple H = tio.halftriple(yield);
- halffile.os() << H;
- });
- }
- } else if (type == 0x82) {
- // AND triples
- auto andfile = ofiles.open("ands",
- mpcio.player, thread_num);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&tio, andfile](yield_t &yield) {
- yield();
- AndTriple A = tio.andtriple(yield);
- andfile.os() << A;
- });
- }
- } else if (type == 0x83) {
- // Select triples
- auto selfile = ofiles.open("selects",
- mpcio.player, thread_num);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&tio, selfile](yield_t &yield) {
- yield();
- SelectTriple<value_t> S =
- tio.valselecttriple(yield);
- selfile.os() << S;
- });
- }
- } else if (type >= 0x01 && type <= 0x30) {
- // RAM DPFs
- bool incremental = false;
- if (subtype >= 0x80) {
- incremental = true;
- subtype -= 0x80;
- }
- assert(subtype >= 0x01 && subtype <= 0x05);
- char prefix[12];
- strcpy(prefix, incremental ? "irdpf" : "rdpf");
- if (subtype > 1) {
- sprintf(prefix+strlen(prefix), "%d_", subtype);
- }
- auto tripfile = ofiles.open(prefix,
- mpcio.player, thread_num, type);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&tio, &opts, incremental, tripfile, type,
- subtype](yield_t &yield) {
- yield();
- switch(subtype) {
- case 1: {
- RDPFTriple<1> rdpftrip =
- tio.rdpftriple<1>(yield, type,
- incremental, opts.expand_rdpfs);
- tripfile.os() << rdpftrip;
- break;
- }
- case 2: {
- RDPFTriple<2> rdpftrip =
- tio.rdpftriple<2>(yield, type,
- incremental, opts.expand_rdpfs);
- tripfile.os() << rdpftrip;
- break;
- }
- case 3: {
- RDPFTriple<3> rdpftrip =
- tio.rdpftriple<3>(yield, type,
- incremental, opts.expand_rdpfs);
- tripfile.os() << rdpftrip;
- break;
- }
- case 4: {
- RDPFTriple<4> rdpftrip =
- tio.rdpftriple<4>(yield, type,
- incremental, opts.expand_rdpfs);
- tripfile.os() << rdpftrip;
- break;
- }
- case 5: {
- RDPFTriple<5> rdpftrip =
- tio.rdpftriple<5>(yield, type,
- incremental, opts.expand_rdpfs);
- tripfile.os() << rdpftrip;
- break;
- }
- }
- });
- }
- } else if (type == 0x40) {
- // Comparison DPFs
- auto cdpffile = ofiles.open("cdpf",
- mpcio.player, thread_num);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&tio, cdpffile](yield_t &yield) {
- yield();
- CDPF C = tio.cdpf(yield);
- cdpffile.os() << C;
- });
- }
- } else if (type == 0x8e) {
- coroutines.emplace_back(
- [&tio, num](yield_t &yield) {
- yield();
- unsigned int istart = 0x31415080;
- for (unsigned int i=istart; i<istart+num; ++i) {
- tio.queue_peer(&i, sizeof(i));
- tio.queue_server(&i, sizeof(i));
- yield();
- unsigned int peeri, srvi;
- tio.recv_peer(&peeri, sizeof(peeri));
- tio.recv_server(&srvi, sizeof(srvi));
- if (peeri != i || srvi != i) {
- printf("Incorrect counter received: "
- "peer=%08x srv=%08x\n", peeri,
- srvi);
- }
- }
- });
- }
- }
- run_coroutines(tio, coroutines);
- ofiles.closeall();
- });
- }
- pool.join();
- }
- void preprocessing_server(MPCServerIO &mpcsrvio, const PRACOptions &opts, char **args)
- {
- int num_threads = opts.num_comm_threads;
- boost::asio::thread_pool pool(num_threads);
- for (int thread_num = 0; thread_num < num_threads; ++thread_num) {
- boost::asio::post(pool, [&mpcsrvio, &opts, thread_num, args] {
- char **threadargs = args;
- MPCTIO stio(mpcsrvio, thread_num, opts.num_cpu_threads);
- Openfiles ofiles(opts.append_to_files);
- std::vector<coro_t> coroutines;
- if (*threadargs && threadargs[0][0] == 'T') {
- // Per-thread initialization. The args look like:
- // T0 t:50 h:10 T1 t:20 h:30 T2 h:20
- // Skip to the arg marking our thread
- char us[20];
- sprintf(us, "T%u", thread_num);
- while (*threadargs && strcmp(*threadargs, us)) {
- ++threadargs;
- }
- // Now skip to the next arg if there is one
- if (*threadargs) {
- ++threadargs;
- }
- }
- // Stop scanning for args when we get to the end or when we
- // get to another per-thread initialization marker
- while (*threadargs && threadargs[0][0] != 'T') {
- char *arg = strdup(*threadargs);
- char *colon = strchr(arg, ':');
- if (!colon) {
- std::cerr << "Args must be type:num\n";
- ++threadargs;
- free(arg);
- continue;
- }
- unsigned num = atoi(colon+1);
- *colon = '\0';
- char *type = arg;
- if (!strcmp(type, "m")) {
- unsigned char typetag = 0x80;
- unsigned char subtypetag = 0x00;
- stio.queue_p0(&typetag, 1);
- stio.queue_p0(&subtypetag, 1);
- stio.queue_p0(&num, 4);
- stio.queue_p1(&typetag, 1);
- stio.queue_p1(&subtypetag, 1);
- stio.queue_p1(&num, 4);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&stio](yield_t &yield) {
- yield();
- stio.multtriple(yield);
- });
- }
- } else if (!strcmp(type, "h")) {
- unsigned char typetag = 0x81;
- unsigned char subtypetag = 0x00;
- stio.queue_p0(&typetag, 1);
- stio.queue_p0(&subtypetag, 1);
- stio.queue_p0(&num, 4);
- stio.queue_p1(&typetag, 1);
- stio.queue_p1(&subtypetag, 1);
- stio.queue_p1(&num, 4);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&stio](yield_t &yield) {
- yield();
- stio.halftriple(yield);
- });
- }
- } else if (!strcmp(type, "a")) {
- unsigned char typetag = 0x82;
- unsigned char subtypetag = 0x00;
- stio.queue_p0(&typetag, 1);
- stio.queue_p0(&subtypetag, 1);
- stio.queue_p0(&num, 4);
- stio.queue_p1(&typetag, 1);
- stio.queue_p1(&subtypetag, 1);
- stio.queue_p1(&num, 4);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&stio](yield_t &yield) {
- yield();
- stio.andtriple(yield);
- });
- }
- } else if (!strcmp(type, "s")) {
- unsigned char typetag = 0x83;
- unsigned char subtypetag = 0x00;
- stio.queue_p0(&typetag, 1);
- stio.queue_p0(&subtypetag, 1);
- stio.queue_p0(&num, 4);
- stio.queue_p1(&typetag, 1);
- stio.queue_p1(&subtypetag, 1);
- stio.queue_p1(&num, 4);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&stio](yield_t &yield) {
- yield();
- stio.valselecttriple(yield);
- });
- }
- } else if (type[0] == 'r' || type[0] == 'i') {
- bool incremental = (type[0] == 'i');
- char *widthstr = strchr(type, '.');
- unsigned char width = 1;
- if (widthstr) {
- *widthstr = '\0';
- ++widthstr;
- width = atoi(widthstr);
- }
- int depth = atoi(type+1);
- if (depth < 1 || depth > 48) {
- std::cerr << "Invalid DPF depth\n";
- } else {
- unsigned char typetag = depth;
- unsigned char subtypetag = width;
- if (incremental) {
- subtypetag += 0x80;
- }
- stio.queue_p0(&typetag, 1);
- stio.queue_p0(&subtypetag, 1);
- stio.queue_p0(&num, 4);
- stio.queue_p1(&typetag, 1);
- stio.queue_p1(&subtypetag, 1);
- stio.queue_p1(&num, 4);
- char prefix[12];
- strcpy(prefix, incremental ? "irdpf" : "rdpf");
- if (width > 1) {
- sprintf(prefix+strlen(prefix), "%d_", width);
- }
- auto pairfile = ofiles.open(prefix,
- mpcsrvio.player, thread_num, depth);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&stio, &opts, pairfile, depth,
- incremental, width](yield_t &yield) {
- yield();
- switch (width) {
- case 1: {
- RDPFPair<1> rdpfpair =
- stio.rdpfpair<1>(yield, depth, incremental);
- if (opts.expand_rdpfs) {
- rdpfpair.dpf[0].expand(stio.aes_ops());
- rdpfpair.dpf[1].expand(stio.aes_ops());
- }
- pairfile.os() << rdpfpair;
- break;
- }
- case 2: {
- RDPFPair<2> rdpfpair =
- stio.rdpfpair<2>(yield, depth, incremental);
- if (opts.expand_rdpfs) {
- rdpfpair.dpf[0].expand(stio.aes_ops());
- rdpfpair.dpf[1].expand(stio.aes_ops());
- }
- pairfile.os() << rdpfpair;
- break;
- }
- case 3: {
- RDPFPair<3> rdpfpair =
- stio.rdpfpair<3>(yield, depth, incremental);
- if (opts.expand_rdpfs) {
- rdpfpair.dpf[0].expand(stio.aes_ops());
- rdpfpair.dpf[1].expand(stio.aes_ops());
- }
- pairfile.os() << rdpfpair;
- break;
- }
- case 4: {
- RDPFPair<4> rdpfpair =
- stio.rdpfpair<4>(yield, depth, incremental);
- if (opts.expand_rdpfs) {
- rdpfpair.dpf[0].expand(stio.aes_ops());
- rdpfpair.dpf[1].expand(stio.aes_ops());
- }
- pairfile.os() << rdpfpair;
- break;
- }
- case 5: {
- RDPFPair<5> rdpfpair =
- stio.rdpfpair<5>(yield, depth, incremental);
- if (opts.expand_rdpfs) {
- rdpfpair.dpf[0].expand(stio.aes_ops());
- rdpfpair.dpf[1].expand(stio.aes_ops());
- }
- pairfile.os() << rdpfpair;
- break;
- }
- }
- });
- }
- }
- } else if (!strcmp(type, "c")) {
- unsigned char typetag = 0x40;
- unsigned char subtypetag = 0x00;
- stio.queue_p0(&typetag, 1);
- stio.queue_p0(&subtypetag, 1);
- stio.queue_p0(&num, 4);
- stio.queue_p1(&typetag, 1);
- stio.queue_p1(&subtypetag, 1);
- stio.queue_p1(&num, 4);
- for (unsigned int i=0; i<num; ++i) {
- coroutines.emplace_back(
- [&stio](yield_t &yield) {
- yield();
- stio.cdpf(yield);
- });
- }
- } else if (!strcmp(type, "k")) {
- unsigned char typetag = 0x8e;
- unsigned char subtypetag = 0x00;
- stio.queue_p0(&typetag, 1);
- stio.queue_p0(&subtypetag, 1);
- stio.queue_p0(&num, 4);
- stio.queue_p1(&typetag, 1);
- stio.queue_p1(&subtypetag, 1);
- stio.queue_p1(&num, 4);
- coroutines.emplace_back(
- [&stio, num] (yield_t &yield) {
- unsigned int istart = 0x31415080;
- yield();
- for (unsigned int i=istart; i<istart+num; ++i) {
- stio.queue_p0(&i, sizeof(i));
- stio.queue_p1(&i, sizeof(i));
- yield();
- unsigned int p0i, p1i;
- stio.recv_p0(&p0i, sizeof(p0i));
- stio.recv_p1(&p1i, sizeof(p1i));
- if (p0i != i || p1i != i) {
- printf("Incorrect counter received: "
- "p0=%08x p1=%08x\n", p0i,
- p1i);
- }
- }
- });
- }
- free(arg);
- ++threadargs;
- }
- // That's all
- unsigned char typetag = 0x00;
- stio.queue_p0(&typetag, 1);
- stio.queue_p1(&typetag, 1);
- run_coroutines(stio, coroutines);
- ofiles.closeall();
- });
- }
- pool.join();
- }
|