#include <type_traits>  // std::is_same<>
#include <limits>       // std::numeric_limits<>
#include <climits>      // CHAR_BIT
#include <cmath>        // std::log2, std::ceil, std::floor
#include <stdexcept>    // std::runtime_error
#include <array>        // std::array<>
#include <iostream>     // std::istream and std::ostream
#include <vector>       // std::vector<>
#include <memory>       // std::shared_ptr<>
#include <utility>      // std::move
#include <algorithm>    // std::copy
#include <cstring>      // std::memcpy
#include <cstdlib>      // atoi, malloc, std::aligned_alloc
#include <cstdint>      // int8_t, uint8_t, int64_t
#include <chrono>       // std::chrono::steady_clock
#include <string>       // std::string
#include <functional>   // std::bind, std::ref
#include <bsd/stdlib.h> // arc4random_buf
#include <x86intrin.h>  // SSE and AVX intrinsics

#include <boost/asio.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/lexical_cast.hpp>

typedef __m128i node_t;
constexpr size_t leaf_size = 1;
typedef __m128i leaf_type;
typedef std::array<leaf_type, leaf_size> leaf_t;

size_t communication_cost = 0;

#include "bitutils.h"
#include "block.h"
#include "prg_aes_impl.h"
#include "filesio.h"

using boost::asio::ip::tcp;
using socket_t = boost::asio::ip::tcp::socket;
using namespace dpf; // The namespace is found in bitutils.h

#include "mpc.h"
#include "network.h"
#include "dpfgen.h"
#include "share-conversion.h"

int main(int argc, char * argv[])
{
    if (argc < 6)
    {
        std::cerr << "Usage: " << argv[0] << " <host1> <host2> <n_threads> <expo> <maxRAM in GiB>" << std::endl;
        return 1;
    }

    boost::asio::io_context io_context;
    std::string addr = "127.0.0.1";
    const std::string host1 = (argc < 2) ? "127.0.0.1" : argv[1];
    const std::string host2 = (argc < 3) ? "127.0.0.1" : argv[2];
    const size_t n_threads = atoi(argv[3]);
    const size_t expo = atoi(argv[4]);   // log2 of the database size
    const size_t maxRAM = atoi(argv[5]); // RAM budget in GiB
    //std::cout << "n_threads = " << n_threads << std::endl;

    const size_t number_of_sockets = 5 * n_threads;
    std::vector<socket_t> socketsPb, socketsP2;
    std::vector<int> ports, ports2_1, ports2_0;
    bool party;

    /* The function make_connections appears in network.h */
    make_connections(party, host1, host2, io_context, socketsPb, socketsP2,
                     ports, ports2_1, ports2_0, number_of_sockets);

    const size_t db_nitems = 1ULL << expo;
    //std::cout << "maxRAM = " << maxRAM << std::endl;

    // Each DPF takes roughly 164 bytes per database item to generate and
    // evaluate, so split the n_threads DPFs into batches that fit the RAM budget.
    size_t RAM_needed = n_threads * 164 * db_nitems;
    //std::cout << "RAM needed = " << RAM_needed << " bytes = " << RAM_needed/1073741824 << " GiB" << std::endl;
    size_t n_batches = std::ceil(double(RAM_needed)/(1073741824 * maxRAM));
    //std::cout << "n_batches = " << n_batches << std::endl;
    size_t thread_per_batch = std::ceil(double(n_threads)/n_batches);
    //std::cout << "thread_per_batch = " << thread_per_batch << std::endl;
    if(n_batches > n_threads)
    {
        std::cout << "You need more RAM" << std::endl;
        exit(0);
    }
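    // Worked example of the batching arithmetic above (the concrete values are
    // illustrative, not taken from this file): with n_threads = 8, expo = 20
    // (so db_nitems = 2^20) and maxRAM = 1 GiB, RAM_needed = 8 * 164 * 2^20
    // bytes ≈ 1.28 GiB, hence n_batches = ceil(1.28/1) = 2 and
    // thread_per_batch = ceil(8/2) = 4: the 8 DPFs are generated in two rounds
    // of 4 concurrent threads each.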
    uint8_t ** target_share_read = new uint8_t*[thread_per_batch];
    generate_random_targets(target_share_read, thread_per_batch, party, expo);

    AES_KEY aeskey;

    auto start = std::chrono::steady_clock::now();

    __m128i * final_correction_word = (__m128i *) std::aligned_alloc(sizeof(__m256i), thread_per_batch * sizeof(__m128i));
    __m128i ** output = (__m128i **) malloc(sizeof(__m128i *) * thread_per_batch);
    int8_t ** flags = (int8_t **) malloc(sizeof(int8_t *) * thread_per_batch);
    for(size_t j = 0; j < thread_per_batch; ++j)
    {
        output[j] = (__m128i *) std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(__m128i));
        flags[j]  = (int8_t *)  std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(int8_t));
    }

    boost::asio::thread_pool pool_share_conversion(thread_per_batch);

    // The following loop creates and evaluates DPFs at target_share_read[j]
    // for j in {0, ..., thread_per_batch - 1} in each batch:
    //  - the flag vectors are stored in flags,
    //  - the leaves are stored in output,
    //  - the final correction words are stored in final_correction_word.
    dpfP2 * dpf_instance = (dpfP2 *) malloc(sizeof(dpfP2) * n_threads);

    cw_construction computecw_array;
    boost::asio::read(socketsP2[0], boost::asio::buffer(&computecw_array, sizeof(computecw_array)));

#ifdef VERBOSE
    std::cout << "computecw_array.rand_b: " << computecw_array.rand_b[0] << " " << computecw_array.rand_b[1] << std::endl;
#endif

    /* The function create_dpfs appears in dpfgen.h */
    bool reading = true;
    for(size_t iter = 0; iter < n_batches; ++iter)
    {
        boost::asio::thread_pool pool(thread_per_batch);
        for(size_t j = 0; j < thread_per_batch; ++j)
        {
            boost::asio::post(pool,
                std::bind(create_dpfs, reading, db_nitems, std::ref(aeskey), target_share_read[j],
                          std::ref(socketsPb), std::ref(socketsP2), 0, db_nitems - 1,
                          output[j], flags[j], std::ref(final_correction_word[j]),
                          computecw_array, std::ref(dpf_instance), party, 5 * j, j));
        }
        pool.join();
    }

    boost::asio::write(socketsP2[0], boost::asio::buffer(dpf_instance, n_threads * sizeof(dpfP2))); // do this in parallel.
    communication_cost += (n_threads * sizeof(dpfP2));

#ifdef DEBUG
    for(size_t j = 0; j < thread_per_batch; ++j)
    {
        std::cout << "thread = " << j << std::endl;
        for(size_t i = 0; i < db_nitems; ++i)
        {
            int8_t flags_reconstruction;
            boost::asio::write(socketsPb[0], boost::asio::buffer(&flags[j][i], sizeof(flags[j][i])));
            boost::asio::read(socketsPb[0], boost::asio::buffer(&flags_reconstruction, sizeof(flags_reconstruction)));
            flags_reconstruction -= flags[j][i];
            if(flags_reconstruction != 0) std::cout << i << " (flag) ---> " << (int) flags_reconstruction << std::endl;

            int64_t output_reconstruction;
            boost::asio::write(socketsPb[0], boost::asio::buffer(&output[j][i][0], sizeof(output[j][i][0])));
            boost::asio::read(socketsPb[0], boost::asio::buffer(&output_reconstruction, sizeof(output_reconstruction)));
            output_reconstruction -= output[j][i][0];
            if(output_reconstruction != 0) std::cout << i << " (output) ---> " << output_reconstruction << std::endl;
        }

        int64_t final_correction_word_reconstruction = 0;
        boost::asio::write(socketsPb[0], boost::asio::buffer(&final_correction_word[j][0], sizeof(final_correction_word[j][0])));
        boost::asio::read(socketsPb[0], boost::asio::buffer(&final_correction_word_reconstruction, sizeof(final_correction_word_reconstruction)));
        final_correction_word_reconstruction = final_correction_word_reconstruction + final_correction_word[j][0];
        std::cout << "final_correction_word_reconstruction = " << final_correction_word_reconstruction << std::endl << std::endl;
    }
#endif

    /* leaves holds the additive shares of the DPF outputs (leaves);
       leafbits holds the additive shares of the DPF flag bits. */
    int64_t ** leaves   = (int64_t **) malloc(sizeof(int64_t *) * thread_per_batch);
    int64_t ** leafbits = (int64_t **) malloc(sizeof(int64_t *) * thread_per_batch);
    for(size_t j = 0; j < thread_per_batch; ++j)
    {
        leaves[j]   = (int64_t *) std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(int64_t));
        leafbits[j] = (int64_t *) std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(int64_t));
    }

    /* The function convert_shares appears in share-conversion.h */
    for(size_t j = 0; j < thread_per_batch; ++j)
    {
        boost::asio::post(pool_share_conversion,
            std::bind(convert_shares, j, output, flags, n_threads, db_nitems, final_correction_word,
                      leaves, leafbits, std::ref(socketsPb), std::ref(socketsP2), party));
    }
    pool_share_conversion.join();

    boost::asio::thread_pool pool_xor_to_additive(thread_per_batch);
    std::vector<int64_t> additive_shares(thread_per_batch);
    for(size_t j = 0; j < thread_per_batch; ++j)
    {
        boost::asio::post(pool_xor_to_additive,
            std::bind(xor_to_additive, party, target_share_read[j], std::ref(socketsPb[j]), std::ref(socketsP2[j]),
                      expo, std::ref(additive_shares[j])));
    }
    pool_xor_to_additive.join();
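    // Sanity-check sketch (illustrative only; not guarded by any build flag in
    // this file): by the standard DPF correctness property, the two parties'
    // leaves[j] are additive shares of a scaled unit vector, so reconstructing
    // them should give zero at every index except DPF j's secret target. A
    // debug check in the style of the DEBUG block above could exchange each
    // leaf over socketsPb[0] and verify this, e.g.:
    //
    //   int64_t leaf_other;
    //   boost::asio::write(socketsPb[0], boost::asio::buffer(&leaves[j][i], sizeof(leaves[j][i])));
    //   boost::asio::read(socketsPb[0], boost::asio::buffer(&leaf_other, sizeof(leaf_other)));
    //   int64_t reconstructed = leaf_other + leaves[j][i]; // expect 0 except at the target index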
    for(size_t i = 0; i < thread_per_batch; ++i)
    {
        write_evalfull_outs_into_a_file(party, i, db_nitems, flags[i], leaves[i], final_correction_word[i], additive_shares[i]);
    }

    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double> elapsed_seconds = end - start;
    //std::cout << "time to generate and evaluate " << n_threads << " dpfs of size 2^" << atoi(argv[4]) << " is: " << elapsed_seconds.count() << "s\n";
    std::cout << "WallClockTime: " << elapsed_seconds.count() << std::endl;
    // std::cout << "elapsed_ FIO = " << elapsed_seconds.count() << std::endl;
    std::cout << "CommunicationCost: " << communication_cost << " bytes" << std::endl;

#ifdef VERBOSE
    for(size_t j = 0; j < thread_per_batch; ++j)
    {
        int64_t add_;
        boost::asio::write(socketsPb[0], boost::asio::buffer(&additive_shares[j], sizeof(additive_shares[j])));
        boost::asio::read(socketsPb[0], boost::asio::buffer(&add_, sizeof(add_)));
        add_ = add_ + additive_shares[j];
        std::cout << "add_ = " << add_ << std::endl;
    }
#endif

    return 0;
}
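// Example invocation (illustrative; the binary name and the concrete values
// are assumptions, not taken from this file). Each of the two computational
// parties runs this program with the peer addresses, the number of DPFs to
// generate (n_threads), the log2 of the database size (expo), and the RAM
// budget in GiB (maxRAM); a third party (P2) must also be reachable, since
// the correction-word material is read from socketsP2[0]:
//
//   ./preprocessing <host1> <host2> <n_threads> <expo> <maxRAM>
//   e.g. ./preprocessing 127.0.0.1 127.0.0.1 8 20 1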