#include #include #include #include #include #include #include #include using boost::asio::ip::tcp; #include "spir.hpp" using std::cout; using std::cerr; static inline size_t elapsed_us(const struct timeval *start) { struct timeval end; gettimeofday(&end, NULL); return (end.tv_sec-start->tv_sec)*1000000 + end.tv_usec - start->tv_usec; } using socket_t = boost::asio::ip::tcp::socket; void accept_conncections_from_Pb(boost::asio::io_context&io_context, std::vector& sockets_, int port, size_t j) { tcp::acceptor acceptor_a(io_context, tcp::endpoint(tcp::v4(), port)); tcp::socket sb_a(acceptor_a.accept()); sockets_[j] = std::move(sb_a); // sockets_.emplace_back(std::move(sb_a)); } void write_pub_params(tcp::socket& sout, string pub_params) { auto * bytes_to_write = pub_params.data(); auto bytes_remaining = pub_params.length(); while (bytes_remaining ) { auto bytes_written = sout.write_some(boost::asio::buffer(bytes_to_write, bytes_remaining)); bytes_to_write += bytes_written; bytes_remaining -= bytes_written; } } void read_pub_params(tcp::socket& sin, string& pub_params_recv, size_t len) { pub_params_recv.resize(len); auto bytes_remaining = len; char * bytes_to_read = (char*)pub_params_recv.data(); while (bytes_remaining ) { auto bytes_read = sin.read_some(boost::asio::buffer(bytes_to_read, bytes_remaining)); bytes_to_read += bytes_read; bytes_remaining -= bytes_read; } } int main(int argc, char **argv) { boost::asio::io_context io_context; tcp::resolver resolver(io_context); std::string addr = "127.0.0.1"; const std::string host1 = (argc <= 1) ? "127.0.0.1" : argv[1]; const size_t number_of_sockets = 5; std::vector sockets_; for(size_t j = 0; j < number_of_sockets + 1; ++j) { tcp::socket emptysocket(io_context); sockets_.emplace_back(std::move(emptysocket)); } sockets_.reserve(number_of_sockets + 1); printf("number_of_sockets = %zu\n", number_of_sockets); std::vector sockets_2; std::vector ports; for(size_t j = 0; j < number_of_sockets; ++j) { int port = 6000; ports.push_back(port + j); } std::vector ports2_0; for(size_t j = 0; j < number_of_sockets; ++j) { int port = 8000; ports2_0.push_back(port + j); } std::vector ports2_1; for(size_t j = 0; j < number_of_sockets; ++j) { int port = 9000; ports2_1.push_back(port + j); } #if (PARTY == 0) for(size_t j = 0; j < number_of_sockets; ++j) { tcp::socket sb_a(io_context); boost::asio::connect(sb_a, resolver.resolve({host1, std::to_string(ports[j])})); sockets_[j] = std::move(sb_a); } #else boost::asio::thread_pool pool2(number_of_sockets); for(size_t j = 0; j < number_of_sockets; ++j) { boost::asio::post(pool2, std::bind(accept_conncections_from_Pb, std::ref(io_context), std::ref(sockets_), ports[j], j)); } pool2.join(); #endif #if (PARTY == 0) std::cout << "PARTY 0" << std::endl; #endif #if (PARTY == 1) std::cout << "PARTY 1" << std::endl; #endif // if (argc < 2 || argc > 5) { // cerr << "Usage: " << argv[0] << " r [num_threads [num_preproc [num_pirs]]]\n"; // cerr << "r = log_2(num_records)\n"; // exit(1); // } uint32_t r, num_threads = 1, num_preproc = 1, num_pirs = 1; r = strtoul(argv[2], NULL, 10); size_t num_records = ((size_t) 1)< 3) { num_threads = strtoul(argv[3], NULL, 10); } if (argc > 4) { num_preproc = strtoul(argv[4], NULL, 10); } if (argc > 5) { num_pirs = strtoul(argv[5], NULL, 10); } else { num_pirs = num_preproc; } cout << "===== ONE-TIME SETUP =====\n\n"; struct timeval otsetup_start; gettimeofday(&otsetup_start, NULL); cout << "num_threads = " << num_threads << "\n"; SPIR::init(num_threads); string pub_params, pub_params_recv; SPIR_Client client(r, pub_params); std::thread writer(write_pub_params, std::ref(sockets_[0]), pub_params); std::thread reader(read_pub_params, std::ref(sockets_[0]), std::ref(pub_params_recv), pub_params.size()); writer.join(); reader.join(); SPIR_Server server(r, pub_params_recv); size_t otsetup_us = elapsed_us(&otsetup_start); cout << "One-time setup: " << otsetup_us << " µs\n"; cout << "pub_params len = " << pub_params_recv.length() << "\n"; cout << "\n===== PREPROCESSING =====\n\n"; cout << "num_preproc = " << num_preproc << "\n"; struct timeval preproc_client_start; gettimeofday(&preproc_client_start, NULL); string preproc_msg = client.preproc(num_preproc); string preproc_msg_recv = preproc_msg; boost::asio::write(sockets_[0], boost::asio::buffer(preproc_msg)); boost::asio::read(sockets_[0], boost::asio::buffer(preproc_msg_recv)); size_t preproc_client_us = elapsed_us(&preproc_client_start); cout << "Preprocessing client: " << preproc_client_us << " µs\n"; cout << "preproc_msg len = " << preproc_msg.length() << "\n"; struct timeval preproc_server_start; gettimeofday(&preproc_server_start, NULL); string preproc_resp = server.preproc_process(preproc_msg_recv); string preproc_resp_recv = preproc_resp; boost::asio::write(sockets_[0], boost::asio::buffer(preproc_resp)); boost::asio::read(sockets_[0], boost::asio::buffer(preproc_resp_recv)); size_t preproc_server_us = elapsed_us(&preproc_server_start); cout << "Preprocessing server: " << preproc_server_us << " µs\n"; cout << "preproc_resp len = " << preproc_resp.length() << "\n"; struct timeval preproc_finish_start; gettimeofday(&preproc_finish_start, NULL); client.preproc_finish(preproc_resp_recv); size_t preproc_finish_us = elapsed_us(&preproc_finish_start); cout << "Preprocessing client finish: " << preproc_finish_us << " µs\n"; size_t preproc_total_us = elapsed_us(&preproc_client_start); cout << "\n\nTotal preprocessing time: " << preproc_total_us << " µs\n"; cout << "Total preprocessing bytes: " << (preproc_msg.length() + preproc_resp.length()) << "\n"; // Create the database SPIR::DBEntry *db = new SPIR::DBEntry[num_records]; for (size_t i=0; i