
Resolving conflicts in merging aaron/public-channel

Sajin Sasy 1 year ago
parent
commit
a31e54d049

+ 4 - 4
App/appconfig.cpp

@@ -90,7 +90,8 @@ bool config_parse(Config &config, const std::string configstr,
                 } else if (!pentry.first.compare("master_secret")) {
                     std::string hex_key = pentry.second.data();
                     memcpy(config.master_secret, hex_key.c_str(), SGX_AESGCM_KEY_SIZE);
-
+                } else if (!pentry.first.compare("private_routing")) {
+                    config.private_routing = pentry.second.get_value<bool>();
                 } else {
                     std::cerr << "Unknown field in params: " <<
                         pentry.first << "\n";
@@ -164,6 +165,7 @@ bool config_parse(Config &config, const std::string configstr,
     apiparams.m_pub_out = config.m_pub_out;
     apiparams.m_pub_in = config.m_pub_in;
     memcpy(apiparams.master_secret, config.master_secret, SGX_AESGCM_KEY_SIZE);
+    apiparams.private_routing = config.private_routing;
     nodenum_t num_nodes = (nodenum_t)(config.nodes.size());
     std::vector<EnclaveAPINodeConfig> apinodeconfigs;
     apinodeconfigs.resize(num_nodes);
@@ -173,9 +175,7 @@ bool config_parse(Config &config, const std::string configstr,
         apinodeconfigs[i].weight = config.nodes[i].weight;
         apinodeconfigs[i].roles = config.nodes[i].roles;
     }
-    bool private_routing = true;
-    ret &= ecall_config_load(nthreads, private_routing, &apiparams,
-        apinodeconfigs.data(), num_nodes, config.my_node_num);
+    ret &= ecall_config_load(nthreads, &apiparams, apinodeconfigs.data(), num_nodes, config.my_node_num);
     if (!ret) {
         std::cerr << "Loading config into enclave failed\n";
     }
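
The hunk above reads private_routing from the params section with pentry.second.get_value<bool>(); together with the data() call on master_secret this suggests boost::property_tree, though that is an assumption, as is the JSON format used below. A minimal, self-contained sketch of that parsing pattern:

    #include <iostream>
    #include <sstream>
    #include <boost/property_tree/ptree.hpp>
    #include <boost/property_tree/json_parser.hpp>

    int main()
    {
        // Hypothetical params fragment; only private_routing matters here.
        std::string configstr = R"({"params": {"private_routing": true}})";
        std::istringstream ss(configstr);
        boost::property_tree::ptree tree;
        boost::property_tree::read_json(ss, tree);

        bool private_routing = true;  // default when the field is absent
        for (const auto &pentry : tree.get_child("params")) {
            if (!pentry.first.compare("private_routing")) {
                private_routing = pentry.second.get_value<bool>();
            }
        }
        std::cout << "private_routing = " << private_routing << "\n";
        return 0;
    }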

+ 1 - 0
App/appconfig.hpp

@@ -30,6 +30,7 @@ struct Config {
     uint8_t m_priv_in;
     uint8_t m_pub_out;
     uint8_t m_pub_in;
+    bool private_routing;
     uint16_t nthreads;
     // config for each node
     std::vector<NodeConfig> nodes;

+ 3 - 0
App/launch

@@ -78,6 +78,8 @@ if __name__ == "__main__":
         help='log folder to store logs of each server in an experiment')
     aparse.add_argument('-e', default=None,
         help='Set epoch interval time in seconds')
+    aparse.add_argument('-r', default=None,
+        help='override whether private-channel (vs. public) messages are routed')
     aparse.add_argument('-n', nargs='*', help='nodes to include')
     aparse.add_argument('cmd', nargs='*', help='experiment to run')
     args = aparse.parse_args()
@@ -92,6 +94,7 @@ if __name__ == "__main__":
         'priv_in': args.b,
         'pub_out': args.C,
         'pub_in': args.c,
+        'private_routing': args.r
     }
 
     config = mkconfig.create_json(args.m, args.p, args.n, params_overrides)

+ 0 - 1
Enclave/Enclave.edl

@@ -18,7 +18,6 @@ enclave {
 
         public bool ecall_config_load(
             threadid_t nthreads,
-            bool private_routing,
             [in] struct EnclaveAPIParams *apiparams,
             [in,count=num_nodes] struct EnclaveAPINodeConfig *apinodeconfigs,
             nodenum_t num_nodes, nodenum_t my_node_num);

+ 16 - 0
Enclave/OblivAlgs/TightCompaction_v2.tcc

@@ -163,9 +163,13 @@ void TightCompact_2power_inner_parallel(unsigned char *buf, size_t N, size_t blo
   if (nthreads <= 1) {
     FOAV_SAFE2_CNTXT(TC_inner_base_cases_of_recursion, N, block_size)
     FOAV_SAFE_CNTXT(TC_inner_base_cases_of_recursion, nthreads)
+#ifdef PROFILE_TIGHTCOMPACT
     unsigned long start = printf_with_rtclock("Thread %u starting TightCompact_2power_inner(buf=%p, N=%lu, offset=%lu, nthreads=%lu)\n", g_thread_id, buf, N, offset, nthreads);
+#endif
     TightCompact_2power_inner<oswap_style>(buf, N, block_size, offset, selected, selected_count);
+#ifdef PROFILE_TIGHTCOMPACT
     printf_with_rtclock_diff(start, "Thread %u ending TightCompact_2power_inner(buf=%p, N=%lu, offset=%lu, nthreads=%lu)\n", g_thread_id, buf, N, offset, nthreads);
+#endif
     return;
   }
   FOAV_SAFE_CNTXT(TC_inner_base_cases_of_recursion, N)
@@ -179,7 +183,9 @@ void TightCompact_2power_inner_parallel(unsigned char *buf, size_t N, size_t blo
     return;
   }
 
+#ifdef PROFILE_TIGHTCOMPACT
   unsigned long start = printf_with_rtclock("Thread %u starting TightCompact_2power_inner_parallel(buf=%p, N=%lu, offset=%lu, nthreads=%lu)\n", g_thread_id, buf, N, offset, nthreads);
+#endif
   // Number of selected items in left half
   size_t m1;
   m1 = selected_count[N/2] - selected_count[0];
@@ -227,7 +233,9 @@ void TightCompact_2power_inner_parallel(unsigned char *buf, size_t N, size_t blo
     FOAV_SAFE2_CNTXT(TC_2power_inner_parallel, i, nthreads)
     threadpool_join(g_thread_id+1+i, NULL);
   }
+#ifdef PROFILE_TIGHTCOMPACT
   printf_with_rtclock_diff(start, "Thread %u ending TightCompact_2power_inner_parallel(buf=%p, N=%lu, offset=%lu, nthreads=%lu)\n", g_thread_id, buf, N, offset, nthreads);
+#endif
 }
 
 /*
@@ -348,9 +356,13 @@ void TightCompact_inner_parallel(unsigned char *buf, size_t N, size_t block_size
   FOAV_SAFE2_CNTXT(TC_inner_base_cases_of_recursion, N, block_size)
   FOAV_SAFE_CNTXT(TC_inner_base_cases_of_recursion, nthreads)
   if (nthreads <= 1 || N < 16) {
+  #ifdef PROFILE_TIGHTCOMPACT
     unsigned long start = printf_with_rtclock("Thread %u starting TightCompact_inner(N=%lu)\n", g_thread_id, N);
+  #endif
     TightCompact_inner<oswap_style>(buf, N, block_size, selected, selected_count);
+  #ifdef PROFILE_TIGHTCOMPACT
     printf_with_rtclock_diff(start, "Thread %u ending TightCompact_inner(N=%lu)\n", g_thread_id, N);
+  #endif
     return;
   }
   if(N==0){
@@ -365,7 +377,9 @@ void TightCompact_inner_parallel(unsigned char *buf, size_t N, size_t block_size
     return;
   }
 
+#ifdef PROFILE_TIGHTCOMPACT
   unsigned long start = printf_with_rtclock("Thread %u starting TightCompact_inner_parallel(N=%lu, nthreads=%lu)\n", g_thread_id, N, nthreads);
+#endif
 
   size_t split_index, n1, n2;
 
@@ -421,7 +435,9 @@ void TightCompact_inner_parallel(unsigned char *buf, size_t N, size_t block_size
     FOAV_SAFE2_CNTXT(TC_inner_parallel, i, nthreads)
   }
 
+#ifdef PROFILE_TIGHTCOMPACT
   printf_with_rtclock_diff(start, "Thread %u ending TightCompact_inner_parallel(N=%lu, nthreads=%lu)\n", g_thread_id, N, nthreads);
+#endif
 }
 
   #ifndef BEFTS_MODE 
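
The same #ifdef PROFILE_TIGHTCOMPACT guard is added around every start/end timing pair above. One hypothetical way to keep the call sites shorter (not part of this commit) is a pair of macros that compile away when profiling is disabled, assuming printf_with_rtclock and printf_with_rtclock_diff keep the signatures used above:

    // Hypothetical helpers; the names are illustrative only.
    #ifdef PROFILE_TIGHTCOMPACT
    #define TC_PROFILE_START(var, ...) \
        unsigned long var = printf_with_rtclock(__VA_ARGS__)
    #define TC_PROFILE_END(var, ...) \
        printf_with_rtclock_diff(var, __VA_ARGS__)
    #else
    #define TC_PROFILE_START(var, ...) do {} while (0)
    #define TC_PROFILE_END(var, ...) do {} while (0)
    #endif

    // A call site would then read:
    //   TC_PROFILE_START(start, "Thread %u starting TightCompact_inner(N=%lu)\n", g_thread_id, N);
    //   ... work ...
    //   TC_PROFILE_END(start, "Thread %u ending TightCompact_inner(N=%lu)\n", g_thread_id, N);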

+ 3 - 3
Enclave/config.cpp

@@ -55,7 +55,7 @@ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
 }
 
 
-bool ecall_config_load(threadid_t nthreads, bool private_routing,
+bool ecall_config_load(threadid_t nthreads,
     EnclaveAPIParams *apiparams,
     EnclaveAPINodeConfig *apinodeconfigs,
     nodenum_t num_nodes, nodenum_t my_node_num)
@@ -72,9 +72,8 @@ bool ecall_config_load(threadid_t nthreads, bool private_routing,
     g_teems_config.m_priv_in = apiparams->m_priv_in;
     g_teems_config.m_pub_out = apiparams->m_pub_out;
     g_teems_config.m_pub_in = apiparams->m_pub_in;
-    g_teems_config.private_routing = private_routing;
     memcpy(g_teems_config.master_secret, apiparams->master_secret, SGX_AESGCM_KEY_SIZE);
-
+    g_teems_config.private_routing = apiparams->private_routing;
     // Temporary vectors to store node numbers for nodes of different
     // types, where the node numbers are smaller than our own node
     // number
@@ -173,5 +172,6 @@ bool ecall_config_load(threadid_t nthreads, bool private_routing,
 
 void ecall_close()
 {
+    route_close();
     threadpool_shutdown();
 }

+ 1 - 0
Enclave/enclave_api.h

@@ -16,6 +16,7 @@ struct EnclaveAPIParams {
     uint8_t m_pub_out;
     uint8_t m_pub_in;
     sgx_aes_gcm_128bit_key_t master_secret;
+    bool private_routing;
 };
 
 #define ROLE_INGESTION 0x01

+ 57 - 0
Enclave/obliv.cpp

@@ -69,3 +69,60 @@ void obliv_pad_stg(uint8_t *buf, uint32_t msg_size,
         --tot_padding;
     }
 }
+
+// For each excess message, convert it into padding destined for a node that
+// still needs some. Oblivious to the contents of the message buffer and tally
+// vector. May modify the message buffer and tally vector.
+void obliv_excess_to_padding(uint8_t *buf, uint32_t msg_size, uint32_t num_msgs,
+    std::vector<uint32_t> &tally, uint32_t msgs_per_stg)
+{
+    const nodenum_t num_storage_nodes = nodenum_t(tally.size());
+
+    // Determine the number of messages exceeding and under the maximum that
+    // can be sent to a storage server. Oblivious to the contents of tally
+    // vector.
+    std::vector<uint32_t> excess(num_storage_nodes, 0);
+    std::vector<uint32_t> padding(num_storage_nodes, 0);
+    for (nodenum_t i=0; i<num_storage_nodes; ++i) {
+        bool exceeds = tally[i] > msgs_per_stg;
+        uint32_t diff = tally[i] - msgs_per_stg;
+        excess[i] = oselect_uint32_t(0, diff, exceeds);
+        diff = msgs_per_stg - tally[i];
+        padding[i] = oselect_uint32_t(0, diff, !exceeds);
+    }
+
+    uint8_t *cur_msg = buf + ((num_msgs-1)*msg_size);
+    uint32_t pad_user = (1<<DEST_UID_BITS)-1;
+    for (uint32_t i=0; i<num_msgs; ++i) {
+        // Determine whether the storage node of the current message has
+        // excess messages. If so, also decrement that node's excess count
+        // and tally.
+        uint32_t storage_node_id = (*(const uint32_t*)cur_msg) >> DEST_UID_BITS;
+        bool node_excess = false;
+        for (uint32_t j=0; j<num_storage_nodes; ++j) {
+            bool at_msg_node = (storage_node_id == j);
+            bool cur_node_excess = (excess[j] > 0);
+            node_excess = oselect_uint32_t(node_excess, cur_node_excess,
+                at_msg_node);
+            excess[j] -= (at_msg_node & cur_node_excess);
+            tally[j] -= (at_msg_node & cur_node_excess);
+        }
+        // Find the first node that still needs padding. If the current
+        // message's node has excess, decrement that node's padding count and
+        // increment its tally.
+        bool found_padding = false;
+        nodenum_t found_padding_node = 0;
+        for (uint32_t j=0; j<num_storage_nodes; ++j) {
+            bool found_padding_here = (!found_padding) & (!!padding[j]);
+            found_padding_node = oselect_uint32_t(found_padding_node, j,
+                found_padding_here);
+            found_padding = found_padding | found_padding_here;
+            padding[j] -= (found_padding_here & node_excess);
+            tally[j] += (found_padding_here & node_excess);
+        }
+        // Convert to padding if excess
+        uint32_t pad = ((found_padding_node<<DEST_UID_BITS) | pad_user);
+        *(uint32_t*)cur_msg = oselect_uint32_t(*(uint32_t*)cur_msg, pad,
+            node_excess);
+        // Go to previous message for backwards iteration through messages
+        cur_msg -= msg_size;
+    }
+}
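
obliv_excess_to_padding relies on oselect_uint32_t for every data-dependent choice, so no branch depends on message contents. Its definition is not part of this diff; the sketch below shows the semantics the code above assumes (return b when the condition is true, a otherwise), implemented branch-free:

    #include <cstdint>
    #include <cassert>

    // Assumed semantics of oselect_uint32_t (illustrative, not the repo's definition):
    // a constant-time select that avoids a data-dependent branch.
    static inline uint32_t oselect_uint32_t(uint32_t a, uint32_t b, bool cond)
    {
        uint32_t mask = (uint32_t)(-(int32_t)cond);  // 0x00000000 or 0xffffffff
        return (a & ~mask) | (b & mask);
    }

    int main()
    {
        // Mirrors the excess/padding computation above.
        uint32_t tally = 7, msgs_per_stg = 5;
        bool exceeds = tally > msgs_per_stg;
        uint32_t excess  = oselect_uint32_t(0, tally - msgs_per_stg, exceeds);   // 2
        uint32_t padding = oselect_uint32_t(0, msgs_per_stg - tally, !exceeds);  // 0
        assert(excess == 2 && padding == 0);
        return 0;
    }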

+ 5 - 0
Enclave/obliv.hpp

@@ -27,4 +27,9 @@ std::vector<uint32_t> obliv_tally_stg(const uint8_t *buf,
 void obliv_pad_stg(uint8_t *buf, uint32_t msg_size,
     std::vector<uint32_t> &tally, uint32_t tot_padding);
 
+// For each excess message, convert it into padding destined for a node that
+// still needs some. Oblivious to the contents of the message buffer and tally
+// vector. May modify the message buffer and tally vector.
+void obliv_excess_to_padding(uint8_t *buf, uint32_t msg_size, uint32_t num_msgs,
+    std::vector<uint32_t> &tally, uint32_t msgs_per_stg);
 #endif

+ 40 - 3
Enclave/route.cpp

@@ -27,7 +27,12 @@ bool route_init()
     // send at most m_priv_out messages.
     uint32_t users_per_ing = CEILDIV(g_teems_config.user_count,
         g_teems_config.num_ingestion_nodes);
-    uint32_t tot_msg_per_ing = users_per_ing * g_teems_config.m_priv_out;
+    uint32_t tot_msg_per_ing;
+    if (g_teems_config.private_routing) {
+        tot_msg_per_ing = users_per_ing * g_teems_config.m_priv_out;
+    } else {
+        tot_msg_per_ing = users_per_ing * g_teems_config.m_pub_out;
+    }
 
     // Compute the maximum number of messages we could receive in round 1
     // Each ingestion node will send us an our_weight/tot_weight
@@ -47,8 +52,12 @@ bool route_init()
         g_teems_config.num_storage_nodes);
 
     // And so can receive at most this many messages
-    uint32_t tot_msg_per_stg = users_per_stg *
-        g_teems_config.m_priv_in;
+    uint32_t tot_msg_per_stg;
+    if (g_teems_config.private_routing) {
+        tot_msg_per_stg = users_per_stg * g_teems_config.m_priv_in;
+    } else {
+        tot_msg_per_stg = users_per_stg * g_teems_config.m_pub_in;
+    }
 
     // Which will be at most this many from us
     uint32_t max_msg_to_each_stg = CEILDIV(tot_msg_per_stg,
@@ -123,6 +132,14 @@ bool route_init()
     return true;
 }
 
+// Call when shutting the system down to deallocate routing state
+void route_close() {
+    uint8_t my_roles = g_teems_config.roles[g_teems_config.my_node_num];
+    if (my_roles & ROLE_STORAGE) {
+        storage_close();
+    }
+}
+
 // Precompute the WaksmanNetworks needed for the sorts.  If you pass -1,
 // it will return the number of different sizes it needs to regenerate.
 // If you pass [0,sizes-1], it will compute one WaksmanNetwork with that
@@ -590,6 +607,26 @@ void ecall_routing_proceed(void *cbpointer)
             printf_with_rtclock_diff(start_tally, "end tally (%u)\n", inserted);
 #endif
 
+            // For public routing, convert excess messages to padding destined
+            // for other storage nodes with fewer messages than the maximum.
+            if (!g_teems_config.private_routing) {
+#ifdef PROFILE_ROUTING
+                unsigned long start_convert_excess = printf_with_rtclock("begin converting excess messages (%u)\n", round1.inserted);
+#endif
+                // Sort received messages by increasing storage node and
+                // priority. Smaller priority number indicates higher priority.
+                // Sorted messages are put back into source buffer.
+                sort_mtobliv<NidPriorityKey>(g_teems_config.nthreads,
+                    round1.buf, g_teems_config.msg_size, round1.inserted,
+                    round1.bufsize);
+                // Convert excess messages into padding
+                obliv_excess_to_padding(round1.buf, msg_size, round1.inserted,
+                    tally, msgs_per_stg);
+#ifdef PROFILE_ROUTING
+                printf_with_rtclock_diff(start_convert_excess, "end converting excess messages (%u)\n", round1.inserted);
+#endif
+            }
+
             // Note: tally contains private values!  It's OK to
             // non-obliviously check for an error condition, though.
             // While we're at it, obliviously change the tally of
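
The route_init changes above only swap which per-user cap feeds the round-1 buffer sizing: m_priv_out/m_priv_in under private routing, m_pub_out/m_pub_in otherwise. A small worked sketch of that sizing, with illustrative numbers and CEILDIV assumed to be the usual ceiling-division macro:

    #include <cstdint>
    #include <cstdio>

    #define CEILDIV(a, b) (((a) + (b) - 1) / (b))

    int main()
    {
        // Illustrative values only; none of these come from the repo.
        uint32_t user_count = 1000, num_ingestion_nodes = 3;
        uint32_t m_priv_out = 1, m_pub_out = 5;
        bool private_routing = false;

        uint32_t users_per_ing = CEILDIV(user_count, num_ingestion_nodes);  // 334
        uint32_t tot_msg_per_ing = users_per_ing *
            (private_routing ? m_priv_out : m_pub_out);                     // 1670
        printf("each ingestion node sends at most %u messages\n", tot_msg_per_ing);
        return 0;
    }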

+ 3 - 0
Enclave/route.hpp

@@ -87,6 +87,9 @@ extern RouteState route_state;
 // comms_init_nodestate. Returns true on success, false on failure.
 bool route_init();
 
+// Call when shutting the system down to deallocate routing state
+void route_close();
+
 // For a given other node, set the received message handler to the first
 // message we would expect from them, given their roles and our roles.
 void route_init_msg_handler(nodenum_t node_num);

+ 42 - 0
Enclave/storage.cpp

@@ -19,6 +19,9 @@ static struct {
     MsgBuffer stg_buf;
     // The destination vector for ORExpand
     std::vector<uint32_t> dest;
+    // The selected array for compaction during public routing.
+    // We need a plain bool array for compaction; std::vector<bool> lacks .data().
+    bool *pub_selected;
 } storage_state;
 
 bool storage_generateClientKeys(uint32_t num_clients, uint32_t my_stg_no) {
@@ -280,6 +283,7 @@ bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
     storage_state.max_users = max_users;
     storage_state.stg_buf.alloc(msg_buf_size);
     storage_state.dest.resize(msg_buf_size);
+    storage_state.pub_selected = new bool[msg_buf_size];
     uint32_t my_storage_node_id = 0;
     uint32_t my_stg_pos = 0;
     for (nodenum_t i=0; i<g_teems_config.num_nodes; ++i) {
@@ -298,6 +302,10 @@ bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
     return true;
 }
 
+void storage_close() {
+    delete[] storage_state.pub_selected;
+}
+
 // Handle the messages received by a storage node.  Pass a _locked_
 // MsgBuffer.  This function will itself reset and unlock it when it's
 // done with it.
@@ -369,6 +377,40 @@ void storage_received(MsgBuffer &storage_buf)
     printf_with_rtclock_diff(start_sort, "end oblivious sort (%u)\n", storage_buf.inserted);
 #endif
 
+    // For public routing, remove excess per-user messages by making them 
+    // padding, and then compact non-padding messages.
+    if (!g_teems_config.private_routing) {
+        uint8_t *msg = storage_state.stg_buf.buf;
+        uint32_t uid;
+        uint32_t prev_uid = uid_mask; // initialization technically unnecessary
+        uint32_t num_user_msgs = 0; // number of messages to the user
+        uint8_t sel;
+        for (uint32_t i=0; i<num_msgs; ++i) {
+            uid = (*(uint32_t*) msg) & uid_mask;
+            num_user_msgs = oselect_uint32_t(1, num_user_msgs+1,
+                uid == prev_uid);
+            // Select if messages per user not exceeded and msg is not padding
+            sel = ((uint8_t) (num_user_msgs <= g_teems_config.m_pub_in)) &
+                ((uint8_t) (uid != uid_mask));
+            storage_state.pub_selected[i] = (bool) sel;
+            // Make padding if not selected
+            *(uint32_t *) msg = (*(uint32_t *) msg) & nid_mask;
+            *(uint32_t *) msg += oselect_uint32_t(uid_mask, uid, sel);
+            msg += msg_size;
+            prev_uid = uid;
+        }
+        #ifdef PROFILE_STORAGE
+            unsigned long start_compaction = printf_with_rtclock("begin public-channel compaction (%u)\n", num_msgs);
+        #endif
+        TightCompact_parallel<OSWAP_16X>(
+            (unsigned char *) storage_state.stg_buf.buf,
+            num_msgs, msg_size, storage_state.pub_selected,
+            g_teems_config.nthreads);
+        #ifdef PROFILE_STORAGE
+            printf_with_rtclock_diff(start_compaction, "end public-channel compaction (%u)\n", num_msgs);
+        #endif
+    }
+
     /*
     for (uint32_t i=0;i<num_msgs; ++i) {
         printf("%3d: %08x %08x\n", i,
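
The new public-routing block in storage_received assumes the first 32-bit word of each message packs the storage-node id above DEST_UID_BITS and the destination user id below it, with an all-ones uid marking padding (the same layout obliv_excess_to_padding uses). A small sketch of that layout, with an illustrative DEST_UID_BITS:

    #include <cstdint>
    #include <cstdio>

    #define DEST_UID_BITS 24  // illustrative only; the repo's value is not shown here

    int main()
    {
        const uint32_t uid_mask = (1u << DEST_UID_BITS) - 1;
        const uint32_t nid_mask = ~uid_mask;

        uint32_t word = (3u << DEST_UID_BITS) | 42u;  // storage node 3, user 42
        uint32_t uid  = word & uid_mask;              // 42
        uint32_t nid  = word >> DEST_UID_BITS;        // 3

        // Turning a message into padding keeps the node bits and sets uid to all
        // ones, which is exactly what the selection step above filters out.
        uint32_t padded = (word & nid_mask) | uid_mask;
        printf("uid=%u nid=%u padded=0x%08x\n", uid, nid, padded);
        return 0;
    }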

+ 4 - 0
Enclave/storage.hpp

@@ -11,6 +11,10 @@
 // failure.
 bool storage_init(uint32_t max_users, uint32_t msg_buf_size);
 
+// Call when shutting the system down to deallocate storage state.
+// route_close() will call this function if the node has a storage role.
+void storage_close();
+
 // Handle the messages received by a storage node.  Pass a _locked_
 // MsgBuffer.  This function will itself reset and unlock it when it's
 // done with it.

+ 2 - 3
Untrusted/Untrusted.cpp

@@ -226,14 +226,13 @@ bool ecall_identity_key_load(sgx_ec256_public_t* outpub,
 }
 
 bool ecall_config_load(threadid_t nthreads,
-    bool private_routing,
     struct EnclaveAPIParams *apiparams,
     struct EnclaveAPINodeConfig *apinodeconfigs,
     nodenum_t num_nodes, nodenum_t my_node_num)
 {
     bool ret;
-    ecall_config_load(global_eid, &ret, nthreads, private_routing,
-        apiparams, apinodeconfigs, num_nodes, my_node_num);
+    ecall_config_load(global_eid, &ret, nthreads, apiparams, apinodeconfigs,
+        num_nodes, my_node_num);
     return ret;
 }
 

+ 0 - 1
Untrusted/Untrusted.hpp

@@ -20,7 +20,6 @@ bool ecall_identity_key_load(sgx_ec256_public_t* outpub,
     const sgx_sealed_data_t* insealedpriv);
 
 bool ecall_config_load(threadid_t nthreads,
-    bool private_routing,
     struct EnclaveAPIParams *apiparams,
     struct EnclaveAPINodeConfig *apinodeconfigs,
     nodenum_t num_nodes, nodenum_t my_node_num);