|
@@ -50,7 +50,12 @@ bool route_init()
|
|
// send at most m_priv_out messages.
|
|
// send at most m_priv_out messages.
|
|
uint32_t users_per_ing = CEILDIV(g_teems_config.user_count,
|
|
uint32_t users_per_ing = CEILDIV(g_teems_config.user_count,
|
|
g_teems_config.num_ingestion_nodes);
|
|
g_teems_config.num_ingestion_nodes);
|
|
- uint32_t tot_msg_per_ing = users_per_ing * g_teems_config.m_priv_out;
|
|
|
|
|
|
+ uint32_t tot_msg_per_ing;
|
|
|
|
+ if (g_teems_config.private_routing) {
|
|
|
|
+ tot_msg_per_ing = users_per_ing * g_teems_config.m_priv_out;
|
|
|
|
+ } else {
|
|
|
|
+ tot_msg_per_ing = users_per_ing * g_teems_config.m_pub_out;
|
|
|
|
+ }
|
|
|
|
|
|
// Compute the maximum number of messages we could receive in round 1
|
|
// Compute the maximum number of messages we could receive in round 1
|
|
// Each ingestion node will send us an our_weight/tot_weight
|
|
// Each ingestion node will send us an our_weight/tot_weight
|
|
@@ -70,8 +75,12 @@ bool route_init()
|
|
g_teems_config.num_storage_nodes);
|
|
g_teems_config.num_storage_nodes);
|
|
|
|
|
|
// And so can receive at most this many messages
|
|
// And so can receive at most this many messages
|
|
- uint32_t tot_msg_per_stg = users_per_stg *
|
|
|
|
- g_teems_config.m_priv_in;
|
|
|
|
|
|
+ uint32_t tot_msg_per_stg;
|
|
|
|
+ if (g_teems_config.private_routing) {
|
|
|
|
+ tot_msg_per_stg = users_per_stg * g_teems_config.m_priv_in;
|
|
|
|
+ } else {
|
|
|
|
+ tot_msg_per_stg = users_per_stg * g_teems_config.m_pub_in;
|
|
|
|
+ }
|
|
|
|
|
|
// Which will be at most this many from us
|
|
// Which will be at most this many from us
|
|
uint32_t max_msg_to_each_stg = CEILDIV(tot_msg_per_stg,
|
|
uint32_t max_msg_to_each_stg = CEILDIV(tot_msg_per_stg,
|
|
@@ -612,6 +621,25 @@ void ecall_routing_proceed(void *cbpointer)
|
|
printf_with_rtclock_diff(start_tally, "end tally (%u)\n", inserted);
|
|
printf_with_rtclock_diff(start_tally, "end tally (%u)\n", inserted);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
+ // For public routing, remove excess messages, making them padding
|
|
|
|
+ if (!g_teems_config.private_routing) {
|
|
|
|
+ // How many excess messages to remove per storage server
|
|
|
|
+ std::vector<uint32_t> excess = obliv_excess_stg(tally,
|
|
|
|
+ num_storage_nodes, msgs_per_stg);
|
|
|
|
+ // How many padding messages to add per storage server
|
|
|
|
+ std::vector<uint32_t> padding = obliv_padding_stg(tally,
|
|
|
|
+ num_storage_nodes, msgs_per_stg);
|
|
|
|
+ // Sort received messages by increasing storage node and
|
|
|
|
+ // priority. Smaller priority number indicates higher priority.
|
|
|
|
+ // Sorted messages are put back into source buffer.
|
|
|
|
+ sort_mtobliv<NidPriorityKey>(g_teems_config.nthreads,
|
|
|
|
+ round1.buf, g_teems_config.msg_size, round1.inserted,
|
|
|
|
+ round1.bufsize);
|
|
|
|
+ // Convert excess messages into padding
|
|
|
|
+ obliv_excess_to_padding(round1.buf, msg_size, round1.inserted,
|
|
|
|
+ excess, padding, num_storage_nodes);
|
|
|
|
+ }
|
|
|
|
+
|
|
// Note: tally contains private values! It's OK to
|
|
// Note: tally contains private values! It's OK to
|
|
// non-obliviously check for an error condition, though.
|
|
// non-obliviously check for an error condition, though.
|
|
// While we're at it, obliviously change the tally of
|
|
// While we're at it, obliviously change the tally of
|