#include "heapsampler.hpp" // In each 64-bit RegAS in the heap, the top bit (of the reconstructed // value) is 0, the next 42 bits are the random tag, and the low 21 bits // are the element value. #define HEAPSAMPLE_TAG_BITS 42 #define HEAPSAMPLE_ELT_BITS 21 // Make the next random tag void HeapSampler::make_randtag(MPCTIO &tio, yield_t &yield) { // Make a uniformly random HEAPSAMPLE_TAG_BITS-bit tag. This needs // to be RegXS in order for the sum (XOR) of P0 and P1's independent // values to be uniform. RegXS tagx; tagx.randomize(HEAPSAMPLE_TAG_BITS); mpc_xs_to_as(tio, yield, randtag, tagx); } // Compute the heap size (the smallest power of two strictly greater // than k) needed to store k elements static size_t heapsize(size_t k) { size_t ret = 1; while (ret <= k) { ret <<= 1; } return ret; } // Return a random bit that reconstructs to 1 with probability k/m static RegBS weighted_coin(MPCTIO &tio, yield_t &yield, size_t k, size_t m) { RegAS limit; limit.ashare = size_t((__uint128_t(k)<<63)/m) * !tio.player(); RegXS randxs; randxs.randomize(63); RegAS randas; mpc_xs_to_as(tio, yield, randas, randxs); CDPF cdpf = tio.cdpf(yield); auto[lt, eq, gt] = cdpf.compare(tio, yield, randas-limit, tio.aes_ops()); return lt; } // Constructor for a HeapSampler that samples k items from a stream // of abritrary and unknown size, using O(k) memory HeapSampler::HeapSampler(MPCTIO &tio, yield_t &yield, size_t k) : k(k), m(0), heap(tio.player(), heapsize(k)) { run_coroutines(tio, [&tio, this](yield_t &yield) { heap.init(tio, yield); }, [&tio, this](yield_t &yield) { make_randtag(tio, yield); }); } // An element has arrived void HeapSampler::ingest(MPCTIO &tio, yield_t &yield, RegAS elt) { ++m; RegAS tagged_elt = (randtag << HEAPSAMPLE_ELT_BITS) + elt; RegAS elt_to_insert = tagged_elt; if (m > k) { RegAS extracted_elt; RegBS selection_bit; run_coroutines(tio, [&tio, this, &extracted_elt](yield_t &yield) { extracted_elt = heap.extract_min(tio, yield); }, [&tio, this, &selection_bit](yield_t &yield) { selection_bit = weighted_coin(tio, yield, k, m); }); mpc_select(tio, yield, elt_to_insert, selection_bit, extracted_elt, tagged_elt); } run_coroutines(tio, [&tio, this, elt_to_insert](yield_t &yield) { heap.insert_optimized(tio, yield, elt_to_insert); }, [&tio, this](yield_t &yield) { make_randtag(tio, yield); }); } // The stream has ended; output min(k,m) randomly sampled elements. // After calling this function, the HeapSampler is reset to its // initial m=0 state. std::vector HeapSampler::close(MPCTIO &tio, yield_t &yield) { size_t retsize = k; if (m < k) { retsize = m; } std::vector ret(retsize); for (size_t i=0; i coroutines; for (size_t i=0; i sample = sampler.close(tio, yield); tio.sync_lamport(); mpcio.dump_stats(std::cout); mpcio.reset_stats(); tio.reset_lamport(); std::cout << "\n===== CHECKING =====\n"; size_t expected_size = k; if (n < k) { expected_size = n; } assert(sample.size() == expected_size); std::vector reconstructed_sample(expected_size); std::vector coroutines; for (size_t i=0; i