Browse Source

Adithya's patches to the https://git-crysp.uwaterloo.ca/iang/spiral-spir repo

Adithya 1 year ago
parent
commit
f45c2908fc
4 changed files with 239 additions and 40 deletions
  1. 13 4
      cpir-read/cxx/Makefile
  2. 4 0
      cpir-read/cxx/reading.h
  3. 1 1
      cpir-read/cxx/spir.hpp
  4. 221 35
      cpir-read/cxx/spir_test.cpp

+ 13 - 4
cpir-read/cxx/Makefile

@@ -1,7 +1,16 @@
-CXXFLAGS = -O3 -Wall
+CXXFLAGS = -g -march=native -std=c++17 -Wall -pedantic -fopenmp -O3 -Wno-ignored-attributes -pthread
 
-spir_test: spir_test.o libspir_cxx.a
-	g++ -o $@ $^ -lpthread -ldl
+test0: 
+	g++ -O3 -Wall   -c -o spir_test1.o spir_test.cpp -DPARTY=1
+
+test1: 
+	g++ -O3 -Wall   -c -o spir_test0.o spir_test.cpp -DPARTY=0
+
+spir_test0: spir_test0.o libspir_cxx.a
+	g++ -o $@ $^  -DPARTY=0 -lpthread -ldl
+
+spir_test1: spir_test1.o libspir_cxx.a
+	g++ -o $@ $^  -DPARTY=1  -lpthread -ldl
 
 libspir_cxx.a: spir.o ../target/release/libspiral_spir.a
 	cp ../target/release/libspiral_spir.a $@
@@ -11,4 +20,4 @@ libspir_cxx.a: spir.o ../target/release/libspiral_spir.a
 	RUSTFLAGS="-C target-cpu=native" cargo build --release
 
 clean:
-	-rm -f libspir_cxx.a spir.o spir_test.o spir_test
+	-rm -f libspir_cxx.a spir.o spir_test.o spir_test0 spir_test1 spir_test1.o spir_test0.o

+ 4 - 0
cpir-read/cxx/reading.h

@@ -0,0 +1,4 @@
+void blind_the_database(uint64_t * db, uint64_t blind, size_t db_size)
+{
+		for(size_t j = 0; j < db_size; ++j) db[j] += blind;
+}

+ 1 - 1
cpir-read/cxx/spir.hpp

@@ -4,7 +4,7 @@
 #include <string>
 #include <stdint.h>
 
-using std::string;
+using std::string; 
 
 class SPIR {
 public:

+ 221 - 35
cpir-read/cxx/spir_test.cpp

@@ -2,56 +2,181 @@
 #include <stdlib.h>
 #include <sys/random.h>
 #include <sys/time.h>
-#include <unistd.h>
+#include <unistd.h>   
+#include <bsd/stdlib.h> 
+#include <boost/asio.hpp>
+#include <string> 
+using boost::asio::ip::tcp;
 #include "spir.hpp"
-
+#include "reading.h"
 using std::cout;
 using std::cerr;
-
+ 
 static inline size_t elapsed_us(const struct timeval *start)
-{
+{ 
     struct timeval end;
     gettimeofday(&end, NULL);
     return (end.tv_sec-start->tv_sec)*1000000 + end.tv_usec - start->tv_usec;
 }
+using socket_t = boost::asio::ip::tcp::socket;
+ 
+ void accept_conncections_from_Pb(boost::asio::io_context&io_context, std::vector<socket_t>& sockets_, int port, size_t j)
+{
+     tcp::acceptor acceptor_a(io_context, tcp::endpoint(tcp::v4(), port));
+     tcp::socket sb_a(acceptor_a.accept());
+     sockets_[j] = std::move(sb_a); 
+   
+  // sockets_.emplace_back(std::move(sb_a)); 
+}
+
+  void write_pub_params(tcp::socket& sout, string pub_params)
+  {
+     auto * bytes_to_write = pub_params.data();
+     auto bytes_remaining = pub_params.length();
+     while (bytes_remaining )
+     {
+         auto bytes_written = sout.write_some(boost::asio::buffer(bytes_to_write, bytes_remaining));
+         bytes_to_write += bytes_written;
+         bytes_remaining -= bytes_written; 
+      }
+  }
+
+  void read_pub_params(tcp::socket& sin, string& pub_params_recv, size_t len)
+  {    
+       pub_params_recv.resize(len);
+       auto bytes_remaining = len;
+
+        char * bytes_to_read = (char*)pub_params_recv.data();
+ 
+        while (bytes_remaining )
+        {
+          auto bytes_read = sin.read_some(boost::asio::buffer(bytes_to_read, bytes_remaining));
+          bytes_to_read += bytes_read;
+          bytes_remaining -= bytes_read; 
+        }
+  }
 
 int main(int argc, char **argv)
-{
-    if (argc < 2 || argc > 5) {
-        cerr << "Usage: " << argv[0] << " r [num_threads [num_preproc [num_pirs]]]\n";
-        cerr << "r = log_2(num_records)\n";
-        exit(1);
+{ 
+    boost::asio::io_context io_context;
+  tcp::resolver resolver(io_context);
+    std::string addr = "127.0.0.1";
+  
+  const std::string host1 = (argc < 1) ? "127.0.0.1" : argv[1];
+ 
+
+  
+ 
+  const size_t number_of_sockets = 5;
+    std::vector<socket_t> sockets_;
+    for(size_t j = 0; j < number_of_sockets + 1; ++j)
+    {
+        tcp::socket emptysocket(io_context);
+        sockets_.emplace_back(std::move(emptysocket));
+    }
+    sockets_.reserve(number_of_sockets + 1);
+    printf("number_of_sockets = %zu\n", number_of_sockets);
+
+    std::vector<socket_t> sockets_2;
+
+    std::vector<int> ports;
+    for(size_t j = 0; j < number_of_sockets; ++j) 
+    {
+        int port = 6000;
+        ports.push_back(port + j);
     }
-    uint32_t r, num_threads = 1, num_preproc = 1, num_pirs = 1;
-    r = strtoul(argv[1], NULL, 10);
-    size_t num_records = ((size_t) 1)<<r;
-    size_t num_records_mask = num_records - 1;
-    if (argc > 2) {
-        num_threads = strtoul(argv[2], NULL, 10);
+    
+    std::vector<int> ports2_0;
+    for(size_t j = 0; j < number_of_sockets; ++j) 
+    {
+        int port = 8000;
+        ports2_0.push_back(port + j);
     }
-    if (argc > 3) {
-        num_preproc = strtoul(argv[3], NULL, 10);
+
+    std::vector<int> ports2_1;
+    for(size_t j = 0; j < number_of_sockets; ++j) 
+    {
+        int port = 9000;
+        ports2_1.push_back(port + j);
     }
-    if (argc > 4) {
-        num_pirs = strtoul(argv[4], NULL, 10);
-    } else {
-        num_pirs = num_preproc;
+
+
+
+  #if (PARTY == 0)    
+     
+ 
+    for(size_t j = 0; j < number_of_sockets; ++j)
+    {
+      tcp::socket sb_a(io_context);
+      boost::asio::connect(sb_a, resolver.resolve({host1, std::to_string(ports[j])}));
+        sockets_[j] = std::move(sb_a); 
     }
+ #else  
+ 
+ 
+ 
+
+   boost::asio::thread_pool pool2(number_of_sockets); 
+   for(size_t j = 0; j < number_of_sockets; ++j)
+   {
+        boost::asio::post(pool2, std::bind(accept_conncections_from_Pb,  std::ref(io_context), std::ref(sockets_), ports[j],  j));
+   } 
+   pool2.join();
+
+#endif
+
+ 
+    #if (PARTY == 0)    
+        std::cout << "PARTY 0" << std::endl;
+    #endif
+
+    #if (PARTY == 1)    
+        std::cout << "PARTY 1" << std::endl;
+    #endif
+
+        // if (argc < 2 || argc > 5) {
+        //     cerr << "Usage: " << argv[0] << " r [num_threads [num_preproc [num_pirs]]]\n";
+        //     cerr << "r = log_2(num_records)\n";
+        //     exit(1);
+        // }
+        uint32_t r, num_threads = 1, num_preproc = 1, num_pirs = 1;
+        r = strtoul(argv[2], NULL, 10);
+        size_t num_records = ((size_t) 1)<<r;
+        size_t num_records_mask = num_records - 1;
+        if (argc > 4) {
+            num_threads = strtoul(argv[3], NULL, 10);
+        }
+        if (argc > 5) {
+            num_preproc = strtoul(argv[4], NULL, 10);
+        }
+        if (argc > 5) {
+            num_pirs = strtoul(argv[5], NULL, 10);
+        } else {
+            num_pirs = num_preproc;
+        }
 
-    cout << "===== ONE-TIME SETUP =====\n\n";
+          cout << "===== ONE-TIME SETUP =====\n\n";
 
     struct timeval otsetup_start;
     gettimeofday(&otsetup_start, NULL);
 
     SPIR::init(num_threads);
-    string pub_params;
+
+    string pub_params, pub_params_recv;
     SPIR_Client client(r, pub_params);
-    SPIR_Server server(r, pub_params);
 
+    std::thread writer(write_pub_params, std::ref(sockets_[0]), pub_params);
+    std::thread reader(read_pub_params, std::ref(sockets_[0]), std::ref(pub_params_recv), pub_params.size());
+
+    writer.join();
+    reader.join();
+ 
+
+    SPIR_Server server(r, pub_params_recv);
     size_t otsetup_us = elapsed_us(&otsetup_start);
     cout << "One-time setup: " << otsetup_us << " µs\n";
-    cout << "pub_params len = " << pub_params.length() << "\n";
-
+    cout << "pub_params len = " << pub_params_recv.length() << "\n";
+ 
     cout << "\n===== PREPROCESSING =====\n\n";
 
     cout << "num_preproc = " << num_preproc << "\n";
@@ -60,6 +185,14 @@ int main(int argc, char **argv)
     gettimeofday(&preproc_client_start, NULL);
 
     string preproc_msg = client.preproc(num_preproc);
+    
+    string preproc_msg_recv = preproc_msg;
+
+    boost::asio::write(sockets_[0], boost::asio::buffer(preproc_msg));
+    boost::asio::read(sockets_[0], boost::asio::buffer(preproc_msg_recv));
+
+
+
     size_t preproc_client_us = elapsed_us(&preproc_client_start);
     cout << "Preprocessing client: " << preproc_client_us << " µs\n";
     cout << "preproc_msg len = " << preproc_msg.length() << "\n";
@@ -67,7 +200,12 @@ int main(int argc, char **argv)
     struct timeval preproc_server_start;
     gettimeofday(&preproc_server_start, NULL);
 
-    string preproc_resp = server.preproc_process(preproc_msg);
+    string preproc_resp = server.preproc_process(preproc_msg_recv);
+    
+    string preproc_resp_recv = preproc_resp;
+    boost::asio::write(sockets_[0], boost::asio::buffer(preproc_resp));
+    boost::asio::read(sockets_[0], boost::asio::buffer(preproc_resp_recv));
+
     size_t preproc_server_us = elapsed_us(&preproc_server_start);
     cout << "Preprocessing server: " << preproc_server_us << " µs\n";
     cout << "preproc_resp len = " << preproc_resp.length() << "\n";
@@ -75,16 +213,23 @@ int main(int argc, char **argv)
     struct timeval preproc_finish_start;
     gettimeofday(&preproc_finish_start, NULL);
 
-    client.preproc_finish(preproc_resp);
+    client.preproc_finish(preproc_resp_recv);
     size_t preproc_finish_us = elapsed_us(&preproc_finish_start);
     cout << "Preprocessing client finish: " << preproc_finish_us << " µs\n";
 
     // Create the database
     SPIR::DBEntry *db = new SPIR::DBEntry[num_records];
     for (size_t i=0; i<num_records; ++i) {
-        db[i] = i * 10000001;
+        db[i] = i;// * 10000001;
+        #if(PARTY == 0)
+        db[i] = 0;
+        #endif
     }
 
+    SPIR::DBEntry rand_blind =  1221030;
+
+    //blind_the_database(db, rand_blind, num_records);
+    
     for (size_t i=0; i<num_pirs; ++i) {
         cout << "\n===== SPIR QUERY " << i+1 << " =====\n\n";
 
@@ -94,11 +239,30 @@ int main(int argc, char **argv)
             exit(1);
         }
         idx &= num_records_mask;
+                boost::asio::write(sockets_[0], boost::asio::buffer(&idx, sizeof(idx)));
+        size_t idx_recv;
+        boost::asio::read(sockets_[0], boost::asio::buffer(&idx_recv, sizeof(idx_recv)));
+
+        idx_recv += idx;
+        idx_recv = idx_recv % num_records;
+        cout << "idx = " << idx << std::endl;
+        cout << "idx_reconstructed = " << idx_recv << std::endl;
+        // idx = 100;
+        // #if(PARTY == 1) 
+        //     idx = 40;
+        // #endif
 
         struct timeval query_client_start;
         gettimeofday(&query_client_start, NULL);
 
         string query_msg = client.query(idx);
+        boost::asio::write(sockets_[0], boost::asio::buffer(query_msg));
+        string query_msg_recv = client.query(idx);
+        boost::asio::read(sockets_[0], boost::asio::buffer(query_msg_recv));
+
+
+
+
         size_t query_client_us = elapsed_us(&query_client_start);
         cout << "Query client: " << query_client_us << " µs\n";
         cout << "query_msg len = " << query_msg.length() << "\n";
@@ -106,7 +270,12 @@ int main(int argc, char **argv)
         struct timeval query_server_start;
         gettimeofday(&query_server_start, NULL);
 
-        string query_resp = server.query_process(query_msg, db, 100, 20);
+        //string query_resp = server.query_process(query_msg_recv, db, 0, 0);
+        string query_resp = server.query_process(query_msg_recv, db, idx, rand_blind);
+        boost::asio::write(sockets_[0], boost::asio::buffer(query_resp));
+        string query_resp_recv = query_resp;
+        boost::asio::read(sockets_[0], boost::asio::buffer(query_resp_recv));
+
         size_t query_server_us = elapsed_us(&query_server_start);
         cout << "Query server: " << query_server_us << " µs\n";
         cout << "query_resp len = " << query_resp.length() << "\n";
@@ -114,13 +283,30 @@ int main(int argc, char **argv)
         struct timeval query_finish_start;
         gettimeofday(&query_finish_start, NULL);
 
-        SPIR::DBEntry entry = client.query_finish(query_resp);
+        SPIR::DBEntry entry = client.query_finish(query_resp_recv);
+
+
+        boost::asio::write(sockets_[0], boost::asio::buffer(&entry, sizeof(entry)));
+        SPIR::DBEntry entry_recv;
+        boost::asio::read(sockets_[0], boost::asio::buffer(&entry_recv, sizeof(entry)));
+
+        SPIR::DBEntry read_output = entry_recv - rand_blind;
+       
+        boost::asio::write(sockets_[0], boost::asio::buffer(&read_output, sizeof(entry)));
+        SPIR::DBEntry read_output_recv;
+        boost::asio::read(sockets_[0], boost::asio::buffer(&read_output_recv, sizeof(entry)));
+        
+        read_output_recv += read_output;
+
+        cout << "read_output_recv = " << read_output_recv << std::endl;
+
+
         size_t query_finish_us = elapsed_us(&query_finish_start);
         cout << "Query client finish: " << query_finish_us << " µs\n";
         cout << "idx = " << idx << "; entry = " << entry << "\n";
-    }
+     }
 
-    delete[] db;
+        delete[] db;
 
-    return 0;
-}
+        return 0;
+    }