|
@@ -1190,6 +1190,9 @@ static void round1c_processing(void *cbpointer) {
|
|
|
// (so as to encounter and keep the highest-priority messages
|
|
|
// for any given receiver first), obliviously turn any messages
|
|
|
// over the limit of pub_in for any given receiver into padding.
|
|
|
+ // Also keep track of which messages are not padding for use in
|
|
|
+ // later compaction.
|
|
|
+ bool *is_not_padding = new bool[round1a_sorted.inserted];
|
|
|
for (uint32_t i=0; i<round1a_sorted.inserted; ++i) {
|
|
|
uint8_t *header = round1a_sorted.buf +
|
|
|
msg_size * (round1a_sorted.inserted - 1 - i);
|
|
@@ -1226,12 +1229,29 @@ static void round1c_processing(void *cbpointer) {
|
|
|
// (padding) if become-padding is 1
|
|
|
receiver_id |= (-(uint32_t(become_padding)));
|
|
|
*(uint32_t*)header = receiver_id;
|
|
|
+ is_not_padding[round1a_sorted.inserted - 1 - i] =
|
|
|
+ (receiver_id != 0xffffffff);
|
|
|
}
|
|
|
|
|
|
#ifdef TRACE_ROUTING
|
|
|
show_messages("In round 1c after padding pass", round1a_sorted.buf,
|
|
|
round1a_sorted.inserted);
|
|
|
#endif
|
|
|
+
|
|
|
+ // Oblivious compaction to move the padding messages to the end,
|
|
|
+ // preserving the (already-sorted) order of the non-padding
|
|
|
+ // messages
|
|
|
+ TightCompact_parallel<OSWAP_16X>(
|
|
|
+ (unsigned char *) round1a_sorted.buf,
|
|
|
+ round1a_sorted.inserted, msg_size, is_not_padding,
|
|
|
+ g_teems_config.nthreads);
|
|
|
+ delete[] is_not_padding;
|
|
|
+
|
|
|
+#ifdef TRACE_ROUTING
|
|
|
+ show_messages("In round 1c after compaction", round1a_sorted.buf,
|
|
|
+ round1a_sorted.inserted);
|
|
|
+#endif
|
|
|
+
|
|
|
send_round_robin_msgs<UidPriorityKey>(route_state.round1c,
|
|
|
round1a_sorted.buf, NULL, round1a_sorted.inserted);
|
|
|
|