123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- #include <type_traits> // std::is_same<>
- #include <limits> // std::numeric_limits<>
- #include <climits> // CHAR_BIT
- #include <cmath> // std::log2, std::ceil, std::floor
- #include <stdexcept> // std::runtime_error
- #include <array> // std::array<>
- #include <iostream> // std::istream and std::ostream
- #include <vector> // std::vector<>
- #include <memory> // std::shared_ptr<>
- #include <utility> // std::move
- #include <algorithm> // std::copy
- #include <cstring> // std::memcpy
- #include <bsd/stdlib.h> // arc4random_buf
- #include <x86intrin.h> // SSE and AVX intrinsics
- #include <../boost/asio/thread_pool.hpp>
- #include <../boost/lexical_cast.hpp>
- #include <../boost/asio.hpp>
- #include <fcntl.h>
- #include <cstdlib>
- #include <chrono>
- #include <sys/mman.h>
- #include <sys/stat.h>
- #include <fstream>
- #include <future>
- #include <mutex>
-
// A tree node is a single 128-bit SIMD value.
using node_t = __m128i;

// Number of 128-bit words per leaf, and the leaf container built from it.
constexpr size_t leaf_size{1};
using leaf_type = __m128i;
using leaf_t = std::array<leaf_type, leaf_size>;

// Global byte counter; main() adds to it and prints it as "CommunicationCost".
// NOTE(review): presumably the project headers included after this point also
// rely on these names being declared first — confirm before reordering.
size_t communication_cost{0};
- #include "bitutils.h"
- #include "block.h"
- #include "prg_aes_impl.h"
- #include "filesio.h"
- using boost::asio::ip::tcp;
- using socket_t = boost::asio::ip::tcp::socket;
-
- using namespace dpf; // The namespace is found in bitutils.h
- #include "mpc.h"
- #include "network.h"
- #include "dpfgen.h"
- #include "share-conversion.h"
- int main(int argc, char * argv[])
- {
- boost::asio::io_context io_context;
- std::string addr = "127.0.0.1";
- const std::string host1 = (argc < 2) ? "127.0.0.1" : argv[1];
- const std::string host2 = (argc < 3) ? "127.0.0.1" : argv[2];
- const size_t n_threads = atoi(argv[3]);
- const size_t expo = atoi(argv[4]);
- const size_t db_nitems = 1ULL << expo;
- const size_t maxRAM = atoi(argv[5]);
- //std::cout << "n_threads = " << n_threads << std::endl;
-
- const size_t number_of_sockets = 5 * n_threads;
- std::vector<socket_t> socketsPb, socketsP2;
- std::vector<int> ports, ports2_1, ports2_0;
-
- bool party;
- /* The function make_connections appears in network.h */
- make_connections(party, host1, host2, io_context, socketsPb, socketsP2, ports, ports2_1, ports2_0, number_of_sockets);
-
- size_t RAM_needed_per_thread = 164 * db_nitems;
- std::cout << "RAM needed = " << n_threads*RAM_needed_per_thread << " bytes = " << n_threads*RAM_needed_per_thread/1073741824 << " GiB" << std::endl;
- std::cout << "RAM needed per thread = " << RAM_needed_per_thread << " bytes = " << (RAM_needed_per_thread>>30) << " GiB" << std::endl;
- size_t thread_per_batch = std::floor(double(maxRAM<<30)/RAM_needed_per_thread);
- if (thread_per_batch > n_threads) {
- thread_per_batch = n_threads;
- }
- std::cout << "thread_per_batch = " << thread_per_batch << std::endl;
- if (thread_per_batch < 1) {
- std::cout << "You need more RAM" << std::endl;
- exit(0);
- }
- size_t n_batches = std::ceil(double(n_threads)/thread_per_batch);
- std::cout << "n_batches = " << n_batches << std::endl;
- uint8_t ** target_share_read = new uint8_t*[thread_per_batch];
- generate_random_targets(target_share_read, thread_per_batch, party, expo);
-
- AES_KEY aeskey;
-
- auto start = std::chrono::steady_clock::now();
-
- __m128i * final_correction_word = (__m128i *) std::aligned_alloc(sizeof(__m256i), thread_per_batch * sizeof(__m128i));
- __m128i ** output = (__m128i ** ) malloc(sizeof(__m128i *) * thread_per_batch);
-
- int8_t ** flags = (int8_t ** ) malloc(sizeof(uint8_t *) * thread_per_batch);
-
- for(size_t j = 0; j < thread_per_batch; ++j)
- {
- output[j] = (__m128i *)std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(__m128i));
- flags[j] = (int8_t *)std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(uint8_t));
- }
-
-
- boost::asio::thread_pool pool_share_conversion(thread_per_batch);
-
-
- // The following function call creates and evaluates DPFs at target_share_read[j] for j \in \{0, \ldots, n_threads}
- // the flag vectors are stored in flags
- // the leaves are stored in output
- // the final correctionword is stored in final_correction_word
- dpfP2 * dpf_instance = (dpfP2 * ) malloc (sizeof(dpfP2) * n_threads);
- cw_construction computecw_array;
-
- boost::asio::read(socketsP2[0], boost::asio::buffer(&computecw_array, sizeof(computecw_array)));
- #ifdef VERBOSE
- std::cout << "computecw_array.rand_b: " << computecw_array.rand_b[0] << " " << computecw_array.rand_b[1] << std::endl;
- #endif
- /* The function create_dpfs appears in dpf-gen.h*/
- bool reading = true;
-
- size_t *thread_communication_costs = new size_t[thread_per_batch];
- for(size_t iter = 0; iter < n_batches; ++iter)
- {
- if (n_batches > 1) {
- printf("Starting create_dpfs batch %lu / %lu\n", iter+1, n_batches);
- }
- boost::asio::thread_pool pool(thread_per_batch);
- for(size_t j = 0; j < thread_per_batch; ++j)
- {
- thread_communication_costs[j] = 0;
- boost::asio::post(pool,
- std::bind(create_dpfs, reading, db_nitems, std::ref(aeskey),
- target_share_read[j], std::ref(socketsPb), std::ref(socketsP2),
- 0, db_nitems-1, output[j], flags[j],
- std::ref(final_correction_word[j]), computecw_array,
- std::ref(dpf_instance), party, 5 * j, j,
- std::ref(thread_communication_costs[j])));
- }
- pool.join();
- for(size_t j = 0; j < thread_per_batch; ++j) {
- communication_cost += thread_communication_costs[j];
- }
- }
- delete[] thread_communication_costs;
-
- boost::asio::write(socketsP2[0], boost::asio::buffer(dpf_instance, n_threads * sizeof(dpfP2))); // do this in parallel.
- communication_cost += (n_threads * sizeof(dpfP2));
-
- #ifdef DEBUG
- for(size_t j = 0; j < n_threads; ++j)
- {
- std::cout << "n_threads = " << j << std::endl;
- for(size_t i = 0; i < db_nitems; ++i)
- {
- int8_t flags_reconstruction;
- boost::asio::write(socketsPb[0], boost::asio::buffer(&flags[j][i], sizeof(flags[j][i])));
- boost::asio::read(socketsPb[0], boost::asio::buffer(&flags_reconstruction, sizeof(flags_reconstruction)));
- flags_reconstruction -= flags[j][i];
- if(flags_reconstruction != 0) std::cout << i << " (flag) ---> " << (int) flags_reconstruction << std::endl;
- int64_t output_reconstruction;
- boost::asio::write(socketsPb[0], boost::asio::buffer(&output[j][i][0], sizeof(output[j][i][0])));
- boost::asio::read(socketsPb[0], boost::asio::buffer(&output_reconstruction, sizeof(output_reconstruction)));
- output_reconstruction -= output[j][i][0];
- if(output_reconstruction != 0) std::cout << i << " (output) ---> " << output_reconstruction << std::endl;
- }
- int64_t final_correction_word_reconstruction = 0;
- boost::asio::write(socketsPb[0], boost::asio::buffer(&final_correction_word[j][0], sizeof(final_correction_word[j][0])));
- boost::asio::read(socketsPb[0], boost::asio::buffer(&final_correction_word_reconstruction, sizeof(final_correction_word_reconstruction)));
- final_correction_word_reconstruction = final_correction_word_reconstruction + final_correction_word[j][0];
- std::cout << "final_correction_word_reconstruction = " << final_correction_word_reconstruction << std::endl << std::endl;
- }
- #endif
-
- /*
- leaves is a additive shares of the outputs (leaves of the DPF)
- leafbits is the additive shares of flag bits of the DPFs
- */
- int64_t ** leaves = (int64_t ** ) malloc(sizeof(int64_t *) * thread_per_batch);
- int64_t ** leafbits = (int64_t ** ) malloc(sizeof(int64_t *) * thread_per_batch);
- for(size_t j = 0; j < thread_per_batch; ++j)
- {
- leaves[j] = (int64_t *)std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(int64_t));
- leafbits[j] = (int64_t *)std::aligned_alloc(sizeof(node_t), db_nitems * sizeof(int64_t));
- }
- /* The function convert_shares appears in share-conversion.h */
- for(size_t j = 0; j < thread_per_batch; ++j)
- {
- boost::asio::post(pool_share_conversion, std::bind(convert_shares, j, output, flags, n_threads, db_nitems, final_correction_word, leaves, leafbits,
- std::ref(socketsPb), std::ref(socketsP2), party));
- }
-
- pool_share_conversion.join();
- boost::asio::thread_pool pool_xor_to_additive(thread_per_batch);
- int64_t *additve_shares = new int64_t[thread_per_batch];
- for(size_t j = 0; j < thread_per_batch; ++j)
- {
- boost::asio::post(pool_xor_to_additive, std::bind(xor_to_additive, party, target_share_read[j], std::ref(socketsPb[j]), std::ref(socketsP2[j]), expo, std::ref(additve_shares[j])));
- }
- pool_xor_to_additive.join();
-
-
-
- /* For the artifact, don't actually write these in order to not use very
- * large amounts of storage
- for(size_t i = 0; i < thread_per_batch; ++i)
- {
- write_evalfull_outs_into_a_file(party, i, db_nitems, flags[i], leaves[i], final_correction_word[i], additve_shares[i]);
- }
- */
- auto end = std::chrono::steady_clock::now();
- std::chrono::duration<double> elapsed_seconds = end-start;
- //std::cout << "time to generate and evaluate " << n_threads << " dpfs of size 2^" << atoi(argv[4]) << " is: " << elapsed_seconds.count() << "s\n";
- std::cout << "WallClockTime: " << elapsed_seconds.count() << std::endl;
-
- // std::cout << "elapsed_ FIO = " << elapsed_seconds.count() << std::endl;
- std::cout << "CommunicationCost: " << communication_cost << " bytes" << std::endl;
- #ifdef VERBOSE
- for(size_t j = 0; j < n_threads; ++j)
- {
- int64_t add_;
- boost::asio::write(socketsPb[0], boost::asio::buffer(&additve_shares[j], sizeof(additve_shares[j])));
- boost::asio::read(socketsPb[0], boost::asio::buffer(&add_, sizeof(add_)));
- add_ = add_ + additve_shares[j];
- std::cout << "add_ = " << add_ << std::endl;
- }
- #endif
- delete[] additve_shares;
-
- return 0;
- }
|