소스 검색

Refill the Waksman network sizes we've actually used each epoch

Ian Goldberg 1 년 전
부모
커밋
8161930588
5개의 변경된 파일, 63개의 추가 그리고 32개의 삭제
  1. 1 1
      App/start.cpp
  2. 29 31
      Enclave/route.cpp
  3. 23 0
      Enclave/sort.cpp
  4. 4 0
      Enclave/sort.hpp
  5. 6 0
      Enclave/storage.cpp

+ 1 - 1
App/start.cpp

@@ -144,7 +144,7 @@ static void route_test(NetIO &netio, char **args)
 
     // Precompute some WaksmanNetworks
     const Config &config = netio.config();
-    size_t num_sizes = ecall_precompute_sort(-1);
+    size_t num_sizes = ecall_precompute_sort(-2);
     for (int i=0;i<int(num_sizes);++i) {
         std::vector<boost::thread> ts;
         for (int j=0; j<config.nthreads; ++j) {

+ 29 - 31
Enclave/route.cpp

@@ -140,47 +140,45 @@ bool route_init()
 }
 
 // Precompute the WaksmanNetworks needed for the sorts.  If you pass -1,
-// it will return the number of different sizes it needs.  If you pass
-// [0,sizes-1], it will compute one WaksmanNetwork with that size index
-// and return the number of available WaksmanNetworks of that size.
+// it will return the number of different sizes it needs to regenerate.
+// If you pass [0,sizes-1], it will compute one WaksmanNetwork with that
+// size index and return the number of available WaksmanNetworks of that
+// size.  If you pass anything else, it will return the number of
+// different sizes it needs at all.
+
+// The list of sizes that need refilling, updated when you pass -1
+static std::vector<uint32_t> used_sizes;
 
 size_t ecall_precompute_sort(int sizeidx)
 {
     size_t ret = 0;
 
-    switch(sizeidx) {
-    case 0:
-#ifdef PROFILE_ROUTING
-    {unsigned long start = printf_with_rtclock("begin precompute WaksmanNetwork (%u)\n", route_state.tot_msg_per_ing);
-#endif
-        ret = sort_precompute(route_state.tot_msg_per_ing);
-#ifdef PROFILE_ROUTING
-    printf_with_rtclock_diff(start, "end precompute Waksman Network (%u)\n", route_state.tot_msg_per_ing);}
-#endif
-        break;
-    case 1:
-#ifdef PROFILE_ROUTING
-    {unsigned long start = printf_with_rtclock("begin precompute WaksmanNetwork (%u)\n", route_state.max_round2_msgs);
-#endif
-        ret = sort_precompute(route_state.max_round2_msgs);
-#ifdef PROFILE_ROUTING
-    printf_with_rtclock_diff(start, "end precompute Waksman Network (%u)\n", route_state.max_round2_msgs);}
-#endif
-        break;
-    case 2:
+    if (sizeidx == -1) {
+        used_sizes = sort_get_used();
+        ret = used_sizes.size();
+    } else if (sizeidx >= 0 && sizeidx < used_sizes.size()) {
+        uint32_t size = used_sizes[sizeidx];
 #ifdef PROFILE_ROUTING
-    {unsigned long start = printf_with_rtclock("begin precompute WaksmanNetwork (%u)\n", route_state.max_stg_msgs);
+        unsigned long start = printf_with_rtclock("begin precompute WaksmanNetwork (%u)\n", size);
 #endif
-        ret = sort_precompute(route_state.max_stg_msgs);
+        ret = sort_precompute(size);
 #ifdef PROFILE_ROUTING
-    printf_with_rtclock_diff(start, "end precompute Waksman Network (%u)\n", route_state.max_stg_msgs);}
+        printf_with_rtclock_diff(start, "end precompute Waksman Network (%u)\n", size);
 #endif
-        break;
-    default:
-        ret = 3;
-        break;
-    }
+    } else {
+        uint8_t my_roles = g_teems_config.roles[g_teems_config.my_node_num];
 
+        if (my_roles & ROLE_INGESTION) {
+            used_sizes.push_back(route_state.tot_msg_per_ing);
+        }
+        if (my_roles & ROLE_ROUTING) {
+            used_sizes.push_back(route_state.max_round2_msgs);
+        }
+        if (my_roles & ROLE_STORAGE) {
+            used_sizes.push_back(route_state.max_stg_msgs);
+        }
+        ret = used_sizes.size();
+    }
     return ret;
 }
 

+ 23 - 0
Enclave/sort.cpp

@@ -21,6 +21,16 @@ struct PrecompWNs {
 
 static PrecompWNs precomp_wns;
 
+// A mutex-guarded vector of the precomputed WaksmanNetwork sizes consumed since sort_get_used() was last called
+struct UsedSizes {
+    pthread_mutex_t mutex;  // held around every access to `used` in this file's visible code
+    std::vector<uint32_t> used;  // one entry is appended each time a precomputed network is taken
+
+    UsedSizes() { pthread_mutex_init(&mutex, NULL); }  // default mutex attributes
+};
+
+static UsedSizes used_sizes;  // file-local instance: filled by shuffle_mtobliv, drained by sort_get_used
+
 // A (mutexed) map mapping (N, nthreads) pairs to WNEvalPlans
 struct EvalPlans {
     pthread_mutex_t mutex;
@@ -96,6 +106,9 @@ uint32_t shuffle_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
             pthread_mutex_unlock(&N.second.mutex);
             continue;
         }
+        pthread_mutex_lock(&used_sizes.mutex);
+        used_sizes.used.push_back(N.first);
+        pthread_mutex_unlock(&used_sizes.mutex);
         wn = std::move(N.second.wns.front());
         N.second.wns.pop_front();
         Nw = N.first;
@@ -128,3 +141,13 @@ uint32_t shuffle_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
     return Nw;
 }
 
+// Return the sizes recorded in used_sizes.used since the previous call
+// and leave that list empty, so the next call only reports sizes
+// recorded after this one.  A size appears once for every time it was
+// recorded, so the result may contain duplicates.
+std::vector<uint32_t> sort_get_used()
+{
+    std::vector<uint32_t> res;
+
+    pthread_mutex_lock(&used_sizes.mutex);
+    // Swap with the empty local instead of move-assigning from the
+    // shared vector: a moved-from vector is only guaranteed to be in a
+    // valid but unspecified state, whereas after the swap the shared
+    // vector is guaranteed to hold exactly the (empty) contents of res.
+    res.swap(used_sizes.used);
+    pthread_mutex_unlock(&used_sizes.mutex);
+
+    return res;
+}

+ 4 - 0
Enclave/sort.hpp

@@ -27,6 +27,10 @@
 // size.
 size_t sort_precompute(uint32_t N);
 
+// Return a vector of the precomputed sizes we've used since we were
+// last asked
+std::vector<uint32_t> sort_get_used();
+
 // Precompute a WNEvalPlan for a given size and number of threads.
 // These are not consumed as they are used, so you only need to call
 // this once for each (size,nthreads) pair you need.  The precomputation

+ 6 - 0
Enclave/storage.cpp

@@ -90,11 +90,13 @@ void storage_received(MsgBuffer &storage_buf)
     }
     printf("%u real, %u padding\n", real, padding);
 
+    /*
     for (uint32_t i=0;i<num_msgs; ++i) {
         printf("%3d: %08x %08x\n", i,
         *(uint32_t*)(storage_buf.buf+(i*msg_size)),
         *(uint32_t*)(storage_buf.buf+(i*msg_size+4)));
     }
+    */
     // Sort the received messages by userid into the
     // storage_state.stg_buf MsgBuffer.
 #ifdef PROFILE_STORAGE
@@ -107,11 +109,13 @@ void storage_received(MsgBuffer &storage_buf)
     printf_with_rtclock_diff(start_sort, "end oblivious sort (%u)\n", storage_buf.inserted);
 #endif
 
+    /*
     for (uint32_t i=0;i<num_msgs; ++i) {
         printf("%3d: %08x %08x\n", i,
         *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size)),
         *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size+4)));
     }
+    */
 
 #ifdef PROFILE_STORAGE
     unsigned long start_dest = printf_with_rtclock("begin setting dests (%u)\n", storage_state.stg_buf.bufsize);
@@ -153,11 +157,13 @@ void storage_received(MsgBuffer &storage_buf)
 #ifdef PROFILE_STORAGE
     printf_with_rtclock_diff(start_expand, "end ORExpand (%u)\n", stg_size);
 #endif
+    /*
     for (uint32_t i=0;i<stg_size; ++i) {
         printf("%3d: %08x %08x\n", i,
         *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size)),
         *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size+4)));
     }
+    */
 
     // You can do more processing after these lines, as long as they
     // don't touch storage_buf.  They _can_ touch the backing buffer