|
@@ -943,19 +943,24 @@ static void round1a_processing(void *cbpointer) {
|
|
#endif
|
|
#endif
|
|
#ifdef PROFILE_ROUTING
|
|
#ifdef PROFILE_ROUTING
|
|
uint32_t inserted = round1.inserted;
|
|
uint32_t inserted = round1.inserted;
|
|
- unsigned long start_round1a = printf_with_rtclock("begin round1a processing (%u)\n", inserted);
|
|
|
|
|
|
+ unsigned long start_round1a =
|
|
|
|
+ printf_with_rtclock("begin round1a processing (%u)\n", inserted);
|
|
// Sort the messages we've received
|
|
// Sort the messages we've received
|
|
- unsigned long start_sort = printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted, route_state.max_round1_msgs);
|
|
|
|
|
|
+ unsigned long start_sort =
|
|
|
|
+ printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted,
|
|
|
|
+ route_state.max_round1_msgs);
|
|
#endif
|
|
#endif
|
|
// Sort received messages by increasing user ID and
|
|
// Sort received messages by increasing user ID and
|
|
- // priority. Smaller priority number indicates higher priority.
|
|
|
|
|
|
+ // priority. Larger priority number indicates higher priority.
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, round1.buf,
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, round1.buf,
|
|
g_teems_config.msg_size, round1.inserted, route_state.max_round1_msgs,
|
|
g_teems_config.msg_size, round1.inserted, route_state.max_round1_msgs,
|
|
send_round1a_msgs);
|
|
send_round1a_msgs);
|
|
|
|
|
|
#ifdef PROFILE_ROUTING
|
|
#ifdef PROFILE_ROUTING
|
|
- printf_with_rtclock_diff(start_sort, "end oblivious sort (%u,%u)\n", inserted, route_state.max_round1_msgs);
|
|
|
|
- printf_with_rtclock_diff(start_round1a, "end round1a processing (%u)\n", inserted);
|
|
|
|
|
|
+ printf_with_rtclock_diff(start_sort, "end oblivious sort (%u,%u)\n",
|
|
|
|
+ inserted, route_state.max_round1_msgs);
|
|
|
|
+ printf_with_rtclock_diff(start_round1a, "end round1a processing (%u)\n",
|
|
|
|
+ inserted);
|
|
#endif
|
|
#endif
|
|
round1.reset();
|
|
round1.reset();
|
|
pthread_mutex_unlock(&round1.mutex);
|
|
pthread_mutex_unlock(&round1.mutex);
|
|
@@ -1001,12 +1006,15 @@ static void round1b_processing(void *cbpointer) {
|
|
#endif
|
|
#endif
|
|
#ifdef PROFILE_ROUTING
|
|
#ifdef PROFILE_ROUTING
|
|
uint32_t inserted = round1a.inserted;
|
|
uint32_t inserted = round1a.inserted;
|
|
- unsigned long start_round1b = printf_with_rtclock("begin round1b processing (%u)\n", inserted);
|
|
|
|
|
|
+ unsigned long start_round1b =
|
|
|
|
+ printf_with_rtclock("begin round1b processing (%u)\n", inserted);
|
|
// Sort the messages we've received
|
|
// Sort the messages we've received
|
|
- unsigned long start_sort = printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted, route_state.max_round1a_msgs);
|
|
|
|
|
|
+ unsigned long start_sort =
|
|
|
|
+ printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted,
|
|
|
|
+ route_state.max_round1a_msgs);
|
|
#endif
|
|
#endif
|
|
// Sort received messages by increasing user ID and
|
|
// Sort received messages by increasing user ID and
|
|
- // priority. Smaller priority number indicates higher priority.
|
|
|
|
|
|
+ // priority. Larger priority number indicates higher priority.
|
|
if (inserted > 0) {
|
|
if (inserted > 0) {
|
|
// copy items in sorted order into round1a_sorted
|
|
// copy items in sorted order into round1a_sorted
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, round1a.buf,
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, round1a.buf,
|
|
@@ -1176,7 +1184,7 @@ static void round1c_processing(void *cbpointer) {
|
|
unsigned long start_sort = printf_with_rtclock("begin full oblivious sort (%u,%u)\n", round1a.inserted, route_state.max_round1a_msgs);
|
|
unsigned long start_sort = printf_with_rtclock("begin full oblivious sort (%u,%u)\n", round1a.inserted, route_state.max_round1a_msgs);
|
|
#endif
|
|
#endif
|
|
// Sort received messages by increasing user ID and
|
|
// Sort received messages by increasing user ID and
|
|
- // priority. Smaller priority number indicates higher priority.
|
|
|
|
|
|
+ // priority. Larger priority number indicates higher priority.
|
|
if (round1a.inserted > 0) {
|
|
if (round1a.inserted > 0) {
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, round1a_sorted.buf,
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, round1a_sorted.buf,
|
|
msg_size, round1a.inserted, route_state.max_round1a_msgs,
|
|
msg_size, round1a.inserted, route_state.max_round1a_msgs,
|
|
@@ -1358,28 +1366,37 @@ void ecall_routing_proceed(void *cbpointer)
|
|
#endif
|
|
#endif
|
|
#ifdef PROFILE_ROUTING
|
|
#ifdef PROFILE_ROUTING
|
|
uint32_t inserted = ingbuf.inserted;
|
|
uint32_t inserted = ingbuf.inserted;
|
|
- unsigned long start_round1 = printf_with_rtclock("begin round1 processing (%u)\n", inserted);
|
|
|
|
- unsigned long start_sort = printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
|
|
|
+ unsigned long start_round1 =
|
|
|
|
+ printf_with_rtclock("begin round1 processing (%u)\n", inserted);
|
|
|
|
+ unsigned long start_sort =
|
|
|
|
+ printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted,
|
|
|
|
+ route_state.tot_msg_per_ing);
|
|
#endif
|
|
#endif
|
|
if (g_teems_config.private_routing) {
|
|
if (g_teems_config.private_routing) {
|
|
sort_mtobliv<UidKey>(g_teems_config.nthreads, ingbuf.buf,
|
|
sort_mtobliv<UidKey>(g_teems_config.nthreads, ingbuf.buf,
|
|
g_teems_config.msg_size, ingbuf.inserted,
|
|
g_teems_config.msg_size, ingbuf.inserted,
|
|
route_state.tot_msg_per_ing,
|
|
route_state.tot_msg_per_ing,
|
|
[&](const uint8_t *msgs, const UidKey *indices, uint32_t N) {
|
|
[&](const uint8_t *msgs, const UidKey *indices, uint32_t N) {
|
|
- send_round_robin_msgs<UidKey>(route_state.round1, msgs, indices, N);
|
|
|
|
|
|
+ send_round_robin_msgs<UidKey>(route_state.round1,
|
|
|
|
+ msgs, indices, N);
|
|
});
|
|
});
|
|
} else {
|
|
} else {
|
|
// Sort received messages by increasing user ID and
|
|
// Sort received messages by increasing user ID and
|
|
- // priority. Smaller priority number indicates higher priority.
|
|
|
|
|
|
+ // priority. Larger priority number indicates higher priority.
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, ingbuf.buf,
|
|
sort_mtobliv<UidPriorityKey>(g_teems_config.nthreads, ingbuf.buf,
|
|
- g_teems_config.msg_size, ingbuf.inserted, route_state.tot_msg_per_ing,
|
|
|
|
- [&](const uint8_t *msgs, const UidPriorityKey *indices, uint32_t N) {
|
|
|
|
- send_round_robin_msgs<UidPriorityKey>(route_state.round1, msgs, indices, N);
|
|
|
|
|
|
+ g_teems_config.msg_size, ingbuf.inserted,
|
|
|
|
+ route_state.tot_msg_per_ing,
|
|
|
|
+ [&](const uint8_t *msgs,
|
|
|
|
+ const UidPriorityKey *indices, uint32_t N) {
|
|
|
|
+ send_round_robin_msgs<UidPriorityKey>(route_state.round1,
|
|
|
|
+ msgs, indices, N);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
#ifdef PROFILE_ROUTING
|
|
#ifdef PROFILE_ROUTING
|
|
- printf_with_rtclock_diff(start_sort, "end oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
|
- printf_with_rtclock_diff(start_round1, "end round1 processing (%u)\n", inserted);
|
|
|
|
|
|
+ printf_with_rtclock_diff(start_sort, "end oblivious sort (%u,%u)\n",
|
|
|
|
+ inserted, route_state.tot_msg_per_ing);
|
|
|
|
+ printf_with_rtclock_diff(start_round1, "end round1 processing (%u)\n",
|
|
|
|
+ inserted);
|
|
#endif
|
|
#endif
|
|
ingbuf.reset();
|
|
ingbuf.reset();
|
|
pthread_mutex_unlock(&ingbuf.mutex);
|
|
pthread_mutex_unlock(&ingbuf.mutex);
|