|
@@ -34,6 +34,13 @@ struct MsgBuffer {
|
|
|
buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size];
|
|
|
}
|
|
|
|
|
|
+ // Reset the contents of the buffer
|
|
|
+ void reset() {
|
|
|
+ memset(buf, 0, inserted * g_teems_config.msg_size);
|
|
|
+ reserved = 0;
|
|
|
+ inserted = 0;
|
|
|
+ }
|
|
|
+
|
|
|
// You can't copy a MsgBuffer
|
|
|
MsgBuffer(const MsgBuffer&) = delete;
|
|
|
MsgBuffer &operator=(const MsgBuffer&) = delete;
|
|
@@ -212,3 +219,53 @@ bool ecall_ingest_raw(uint8_t *msgs, uint32_t num_msgs)
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
+
|
|
|
+// Send the round 1 messages
|
|
|
+static void send_round1_msgs(const uint8_t *msgs, const uint64_t *indices,
|
|
|
+ uint32_t N)
|
|
|
+{
|
|
|
+ uint16_t msg_size = g_teems_config.msg_size;
|
|
|
+ uint16_t tot_weight = g_teems_config.tot_weight;
|
|
|
+
|
|
|
+ /*
|
|
|
+ for (uint32_t i=0;i<N;++i) {
|
|
|
+ const uint8_t *msg = msgs + indices[i]*msg_size;
|
|
|
+ for (uint16_t j=0;j<msg_size/4;++j) {
|
|
|
+ printf("%08x ", ((const uint32_t*)msg)[j]);
|
|
|
+ }
|
|
|
+ printf("\n");
|
|
|
+ }
|
|
|
+ */
|
|
|
+}
|
|
|
+
|
|
|
+// Perform the next round of routing. The callback pointer will be
|
|
|
+// passed to ocall_routing_round_complete when the round is complete.
|
|
|
+void ecall_routing_proceed(void *cbpointer)
|
|
|
+{
|
|
|
+ if (route_state.step == ROUTE_NOT_STARTED) {
|
|
|
+
|
|
|
+ MsgBuffer &round1 = route_state.round1;
|
|
|
+
|
|
|
+ pthread_mutex_lock(&round1.mutex);
|
|
|
+ // Ensure there are no pending messages currently being inserted
|
|
|
+ // into the buffer
|
|
|
+ while (round1.reserved != round1.inserted) {
|
|
|
+ pthread_mutex_unlock(&round1.mutex);
|
|
|
+ pthread_mutex_lock(&round1.mutex);
|
|
|
+ }
|
|
|
+ // Sort the messages we've received
|
|
|
+#ifdef PROFILE_ROUTING
|
|
|
+ uint32_t inserted = round1.inserted;
|
|
|
+ unsigned long start = printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
+#endif
|
|
|
+ sort_mtobliv(g_teems_config.nthreads, round1.buf,
|
|
|
+ g_teems_config.msg_size, round1.inserted,
|
|
|
+ route_state.tot_msg_per_ing, send_round1_msgs);
|
|
|
+ round1.reset();
|
|
|
+#ifdef PROFILE_ROUTING
|
|
|
+ printf_with_rtclock_diff(start, "end oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
+#endif
|
|
|
+ route_state.step = ROUTE_ROUND_1;
|
|
|
+ ocall_routing_round_complete(cbpointer, 1);
|
|
|
+ }
|
|
|
+}
|