|
@@ -28,7 +28,7 @@ static void show_messages(const char *label, const unsigned char *buffer,
|
|
|
}
|
|
}
|
|
|
for (size_t i=0; i<num && i<300; ++i) {
|
|
for (size_t i=0; i<num && i<300; ++i) {
|
|
|
const uint32_t *ibuf = (const uint32_t *)buffer;
|
|
const uint32_t *ibuf = (const uint32_t *)buffer;
|
|
|
- if (g_teems_config.private_routing) {
|
|
|
|
|
|
|
+ if (g_teems_config.token_channel) {
|
|
|
printf("%3d R:%08x S:%08x [%08x]\n", i, ibuf[0], ibuf[1],
|
|
printf("%3d R:%08x S:%08x [%08x]\n", i, ibuf[0], ibuf[1],
|
|
|
ibuf[2]);
|
|
ibuf[2]);
|
|
|
} else {
|
|
} else {
|
|
@@ -51,18 +51,18 @@ bool route_init()
|
|
|
|
|
|
|
|
// Each ingestion node will have at most
|
|
// Each ingestion node will have at most
|
|
|
// ceil(user_count/num_ingestion_nodes) users, and each user will
|
|
// ceil(user_count/num_ingestion_nodes) users, and each user will
|
|
|
- // send at most m_priv_out messages.
|
|
|
|
|
|
|
+ // send at most m_token_out messages.
|
|
|
uint32_t users_per_ing = CEILDIV(g_teems_config.user_count,
|
|
uint32_t users_per_ing = CEILDIV(g_teems_config.user_count,
|
|
|
g_teems_config.num_ingestion_nodes);
|
|
g_teems_config.num_ingestion_nodes);
|
|
|
uint32_t tot_msg_per_ing;
|
|
uint32_t tot_msg_per_ing;
|
|
|
- if (g_teems_config.private_routing) {
|
|
|
|
|
- tot_msg_per_ing = users_per_ing * g_teems_config.m_priv_out;
|
|
|
|
|
|
|
+ if (g_teems_config.token_channel) {
|
|
|
|
|
+ tot_msg_per_ing = users_per_ing * g_teems_config.m_token_out;
|
|
|
} else {
|
|
} else {
|
|
|
- tot_msg_per_ing = users_per_ing * g_teems_config.m_pub_out;
|
|
|
|
|
|
|
+ tot_msg_per_ing = users_per_ing * g_teems_config.m_id_out;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Compute the maximum number of messages we could receive in round 1
|
|
// Compute the maximum number of messages we could receive in round 1
|
|
|
- // In private routing, each ingestion node will send us an
|
|
|
|
|
|
|
+ // In token channel routing, each ingestion node will send us an
|
|
|
// our_weight/tot_weight fraction of the messages they hold
|
|
// our_weight/tot_weight fraction of the messages they hold
|
|
|
uint32_t max_msg_from_each_ing;
|
|
uint32_t max_msg_from_each_ing;
|
|
|
max_msg_from_each_ing = CEILDIV(tot_msg_per_ing, g_teems_config.tot_weight) *
|
|
max_msg_from_each_ing = CEILDIV(tot_msg_per_ing, g_teems_config.tot_weight) *
|
|
@@ -81,10 +81,10 @@ bool route_init()
|
|
|
|
|
|
|
|
// And so can receive at most this many messages
|
|
// And so can receive at most this many messages
|
|
|
uint32_t tot_msg_per_stg;
|
|
uint32_t tot_msg_per_stg;
|
|
|
- if (g_teems_config.private_routing) {
|
|
|
|
|
- tot_msg_per_stg = users_per_stg * g_teems_config.m_priv_in;
|
|
|
|
|
|
|
+ if (g_teems_config.token_channel) {
|
|
|
|
|
+ tot_msg_per_stg = users_per_stg * g_teems_config.m_token_in;
|
|
|
} else {
|
|
} else {
|
|
|
- tot_msg_per_stg = users_per_stg * g_teems_config.m_pub_in;
|
|
|
|
|
|
|
+ tot_msg_per_stg = users_per_stg * g_teems_config.m_id_in;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Which will be at most this many from us
|
|
// Which will be at most this many from us
|
|
@@ -114,8 +114,8 @@ bool route_init()
|
|
|
max_stg_msgs = (tot_msg_per_stg/g_teems_config.tot_weight
|
|
max_stg_msgs = (tot_msg_per_stg/g_teems_config.tot_weight
|
|
|
+ g_teems_config.tot_weight) * g_teems_config.tot_weight;
|
|
+ g_teems_config.tot_weight) * g_teems_config.tot_weight;
|
|
|
|
|
|
|
|
- // Calculating public-routing buffer sizes
|
|
|
|
|
- // Weights are not used in public routing
|
|
|
|
|
|
|
+ // Calculating ID channel buffer sizes
|
|
|
|
|
+ // Weights are not used in ID channel routing
|
|
|
// Round up to a multiple of num_routing_nodes
|
|
// Round up to a multiple of num_routing_nodes
|
|
|
uint32_t max_round1b_msgs_to_adj_rtr = CEILDIV(
|
|
uint32_t max_round1b_msgs_to_adj_rtr = CEILDIV(
|
|
|
(g_teems_config.num_routing_nodes-1)*(g_teems_config.num_routing_nodes-1),
|
|
(g_teems_config.num_routing_nodes-1)*(g_teems_config.num_routing_nodes-1),
|
|
@@ -154,7 +154,7 @@ bool route_init()
|
|
|
printf("route_init alloc %u msgs\n", max_round2_msgs);
|
|
printf("route_init alloc %u msgs\n", max_round2_msgs);
|
|
|
printf("route_init H3 heap %u\n", g_peak_heap_used);
|
|
printf("route_init H3 heap %u\n", g_peak_heap_used);
|
|
|
#endif
|
|
#endif
|
|
|
- if (!g_teems_config.private_routing) {
|
|
|
|
|
|
|
+ if (!g_teems_config.token_channel) {
|
|
|
route_state.round1a.alloc(max_round1a_msgs);
|
|
route_state.round1a.alloc(max_round1a_msgs);
|
|
|
route_state.round1a_sorted.alloc(max_round1a_msgs +
|
|
route_state.round1a_sorted.alloc(max_round1a_msgs +
|
|
|
max_round1b_msgs_to_adj_rtr);
|
|
max_round1b_msgs_to_adj_rtr);
|
|
@@ -205,7 +205,7 @@ bool route_init()
|
|
|
}
|
|
}
|
|
|
if (my_roles & ROLE_ROUTING) {
|
|
if (my_roles & ROLE_ROUTING) {
|
|
|
sort_precompute_evalplan(max_round2_msgs, nthreads);
|
|
sort_precompute_evalplan(max_round2_msgs, nthreads);
|
|
|
- if(!g_teems_config.private_routing) {
|
|
|
|
|
|
|
+ if(!g_teems_config.token_channel) {
|
|
|
sort_precompute_evalplan(max_round1a_msgs, nthreads);
|
|
sort_precompute_evalplan(max_round1a_msgs, nthreads);
|
|
|
sort_precompute_evalplan(2*max_round1b_msgs_to_adj_rtr, nthreads);
|
|
sort_precompute_evalplan(2*max_round1b_msgs_to_adj_rtr, nthreads);
|
|
|
}
|
|
}
|
|
@@ -264,14 +264,14 @@ size_t ecall_precompute_sort(int sizeidx)
|
|
|
}
|
|
}
|
|
|
if (my_roles & ROLE_ROUTING) {
|
|
if (my_roles & ROLE_ROUTING) {
|
|
|
used_sizes.push_back(route_state.max_round2_msgs);
|
|
used_sizes.push_back(route_state.max_round2_msgs);
|
|
|
- if(!g_teems_config.private_routing) {
|
|
|
|
|
|
|
+ if(!g_teems_config.token_channel) {
|
|
|
used_sizes.push_back(route_state.max_round1a_msgs);
|
|
used_sizes.push_back(route_state.max_round1a_msgs);
|
|
|
used_sizes.push_back(2*route_state.max_round1b_msgs_to_adj_rtr);
|
|
used_sizes.push_back(2*route_state.max_round1b_msgs_to_adj_rtr);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if (my_roles & ROLE_STORAGE) {
|
|
if (my_roles & ROLE_STORAGE) {
|
|
|
used_sizes.push_back(route_state.max_stg_msgs);
|
|
used_sizes.push_back(route_state.max_stg_msgs);
|
|
|
- if(!g_teems_config.private_routing) {
|
|
|
|
|
|
|
+ if(!g_teems_config.token_channel) {
|
|
|
used_sizes.push_back(route_state.max_stg_msgs);
|
|
used_sizes.push_back(route_state.max_stg_msgs);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -342,7 +342,7 @@ static void round1_received(NodeCommState &nodest,
|
|
|
pthread_mutex_unlock(&route_state.round1.mutex);
|
|
pthread_mutex_unlock(&route_state.round1.mutex);
|
|
|
|
|
|
|
|
// What is the next message we expect from this node?
|
|
// What is the next message we expect from this node?
|
|
|
- if (g_teems_config.private_routing) {
|
|
|
|
|
|
|
+ if (g_teems_config.token_channel) {
|
|
|
if ((our_roles & ROLE_STORAGE) && (their_roles & ROLE_ROUTING)) {
|
|
if ((our_roles & ROLE_STORAGE) && (their_roles & ROLE_ROUTING)) {
|
|
|
nodest.in_msg_get_buf = [&](NodeCommState &commst,
|
|
nodest.in_msg_get_buf = [&](NodeCommState &commst,
|
|
|
uint32_t tot_enc_chunk_size) {
|
|
uint32_t tot_enc_chunk_size) {
|
|
@@ -743,7 +743,7 @@ static void send_round_robin_msgs(MsgBuffer &round, const uint8_t *msgs,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// Send the round 1a messages from the round 1 buffer, which only occurs in public-channel routing.
|
|
|
|
|
|
|
+// Send the round 1a messages from the round 1 buffer, which only occurs in ID-channel routing.
|
|
|
// msgs points to the message buffer, indices points to the the sorted indices, and N is the number
|
|
// msgs points to the message buffer, indices points to the the sorted indices, and N is the number
|
|
|
// of non-padding items.
|
|
// of non-padding items.
|
|
|
static void send_round1a_msgs(const uint8_t *msgs, const UidPriorityKey *indices, uint32_t N) {
|
|
static void send_round1a_msgs(const uint8_t *msgs, const UidPriorityKey *indices, uint32_t N) {
|
|
@@ -805,7 +805,7 @@ static void send_round1a_msgs(const uint8_t *msgs, const UidPriorityKey *indices
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// Send the round 1b messages from the round 1a buffer, which only occurs in public-channel routing.
|
|
|
|
|
|
|
+// Send the round 1b messages from the round 1a buffer, which only occurs in ID-channel routing.
|
|
|
// msgs points to the message buffer, and N is the number of non-padding items.
|
|
// msgs points to the message buffer, and N is the number of non-padding items.
|
|
|
// Return the number of messages sent
|
|
// Return the number of messages sent
|
|
|
static uint32_t send_round1b_msgs(const uint8_t *msgs, uint32_t N) {
|
|
static uint32_t send_round1b_msgs(const uint8_t *msgs, uint32_t N) {
|
|
@@ -823,7 +823,7 @@ static uint32_t send_round1b_msgs(const uint8_t *msgs, uint32_t N) {
|
|
|
// bytes for the receiver id in the next message we _didn't_
|
|
// bytes for the receiver id in the next message we _didn't_
|
|
|
// send, and 1 byte for the number of messages we have at the
|
|
// send, and 1 byte for the number of messages we have at the
|
|
|
// beginning of the buffer of messages we didn't send (max
|
|
// beginning of the buffer of messages we didn't send (max
|
|
|
- // pub_in) with the same receiver id
|
|
|
|
|
|
|
+ // id_in) with the same receiver id
|
|
|
nodecom.message_start(num_msgs * msg_size + 5);
|
|
nodecom.message_start(num_msgs * msg_size + 5);
|
|
|
nodecom.message_data(msgs, num_msgs * msg_size);
|
|
nodecom.message_data(msgs, num_msgs * msg_size);
|
|
|
uint32_t next_receiver_id = 0xffffffff;
|
|
uint32_t next_receiver_id = 0xffffffff;
|
|
@@ -835,15 +835,15 @@ static uint32_t send_round1b_msgs(const uint8_t *msgs, uint32_t N) {
|
|
|
num_msgs * msg_size);
|
|
num_msgs * msg_size);
|
|
|
next_rid_count = 1;
|
|
next_rid_count = 1;
|
|
|
|
|
|
|
|
- // If pub_in > 1, obliviously scan messages num_msgs+1 ..
|
|
|
|
|
- // num_msgs+(pub_in-1) and as long as they have the same
|
|
|
|
|
|
|
+ // If id_in > 1, obliviously scan messages num_msgs+1 ..
|
|
|
|
|
+ // num_msgs+(id_in-1) and as long as they have the same
|
|
|
// receiver id as next_receiver_id, add 1 to next_rid_count (but
|
|
// receiver id as next_receiver_id, add 1 to next_rid_count (but
|
|
|
// don't go past message N of course)
|
|
// don't go past message N of course)
|
|
|
|
|
|
|
|
// This count _includes_ the first message already scanned
|
|
// This count _includes_ the first message already scanned
|
|
|
// above. It is not private.
|
|
// above. It is not private.
|
|
|
uint8_t num_to_scan = uint8_t(std::min(N - num_msgs,
|
|
uint8_t num_to_scan = uint8_t(std::min(N - num_msgs,
|
|
|
- uint32_t(g_teems_config.m_pub_in)));
|
|
|
|
|
|
|
+ uint32_t(g_teems_config.m_id_in)));
|
|
|
const unsigned char *scan_msg = msgs +
|
|
const unsigned char *scan_msg = msgs +
|
|
|
(num_msgs + 1) * msg_size;
|
|
(num_msgs + 1) * msg_size;
|
|
|
for (uint8_t i=1; i<num_to_scan; ++i) {
|
|
for (uint8_t i=1; i<num_to_scan; ++i) {
|
|
@@ -1190,7 +1190,7 @@ static void round1c_processing(void *cbpointer) {
|
|
|
// priority. Going from the end of the buffer to the beginning
|
|
// priority. Going from the end of the buffer to the beginning
|
|
|
// (so as to encounter and keep the highest-priority messages
|
|
// (so as to encounter and keep the highest-priority messages
|
|
|
// for any given receiver first), obliviously turn any messages
|
|
// for any given receiver first), obliviously turn any messages
|
|
|
- // over the limit of pub_in for any given receiver into padding.
|
|
|
|
|
|
|
+ // over the limit of id_in for any given receiver into padding.
|
|
|
// Also keep track of which messages are not padding for use in
|
|
// Also keep track of which messages are not padding for use in
|
|
|
// later compaction.
|
|
// later compaction.
|
|
|
bool *is_not_padding = new bool[round1a_sorted.inserted];
|
|
bool *is_not_padding = new bool[round1a_sorted.inserted];
|
|
@@ -1198,7 +1198,7 @@ static void round1c_processing(void *cbpointer) {
|
|
|
uint8_t *header = round1a_sorted.buf +
|
|
uint8_t *header = round1a_sorted.buf +
|
|
|
msg_size * (round1a_sorted.inserted - 1 - i);
|
|
msg_size * (round1a_sorted.inserted - 1 - i);
|
|
|
uint32_t receiver_id = *(uint32_t*)header;
|
|
uint32_t receiver_id = *(uint32_t*)header;
|
|
|
- uint32_t pub_in = uint32_t(g_teems_config.m_pub_in);
|
|
|
|
|
|
|
+ uint32_t id_in = uint32_t(g_teems_config.m_id_in);
|
|
|
|
|
|
|
|
// These are the possible cases and what we need to do in
|
|
// These are the possible cases and what we need to do in
|
|
|
// each case, but we have to evaluate them obliviously
|
|
// each case, but we have to evaluate them obliviously
|
|
@@ -1207,11 +1207,11 @@ static void round1c_processing(void *cbpointer) {
|
|
|
// next_receiver_id = receiver_id
|
|
// next_receiver_id = receiver_id
|
|
|
// next_rid_count = 1
|
|
// next_rid_count = 1
|
|
|
// become_padding = 0
|
|
// become_padding = 0
|
|
|
- // receiver_id == next_receiver_id && next_rid_count < pub_in:
|
|
|
|
|
|
|
+ // receiver_id == next_receiver_id && next_rid_count < id_in:
|
|
|
// next_receiver_id = receiver_id
|
|
// next_receiver_id = receiver_id
|
|
|
// next_rid_count = next_rid_count + 1
|
|
// next_rid_count = next_rid_count + 1
|
|
|
// become_padding = 0
|
|
// become_padding = 0
|
|
|
- // receiver_id == next_receiver_id && next_rid_count >= pub_in:
|
|
|
|
|
|
|
+ // receiver_id == next_receiver_id && next_rid_count >= id_in:
|
|
|
// next_receiver_id = receiver_id
|
|
// next_receiver_id = receiver_id
|
|
|
// next_rid_count = next_rid_count
|
|
// next_rid_count = next_rid_count
|
|
|
// become_padding = 1
|
|
// become_padding = 1
|
|
@@ -1222,7 +1222,7 @@ static void round1c_processing(void *cbpointer) {
|
|
|
// This method (AND with -same_receiver_id) is more likely
|
|
// This method (AND with -same_receiver_id) is more likely
|
|
|
// to be constant time than multiplying by same_receiver_id.
|
|
// to be constant time than multiplying by same_receiver_id.
|
|
|
next_rid_count &= (-(uint32_t(same_receiver_id)));
|
|
next_rid_count &= (-(uint32_t(same_receiver_id)));
|
|
|
- bool become_padding = (next_rid_count >= pub_in);
|
|
|
|
|
|
|
+ bool become_padding = (next_rid_count >= id_in);
|
|
|
next_rid_count += !become_padding;
|
|
next_rid_count += !become_padding;
|
|
|
next_receiver_id = receiver_id;
|
|
next_receiver_id = receiver_id;
|
|
|
|
|
|
|
@@ -1443,7 +1443,7 @@ void ecall_routing_proceed(void *cbpointer)
|
|
|
printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted,
|
|
printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted,
|
|
|
route_state.tot_msg_per_ing);
|
|
route_state.tot_msg_per_ing);
|
|
|
#endif
|
|
#endif
|
|
|
- if (g_teems_config.private_routing) {
|
|
|
|
|
|
|
+ if (g_teems_config.token_channel) {
|
|
|
sort_mtobliv<UidKey>(g_teems_config.nthreads, ingbuf.buf,
|
|
sort_mtobliv<UidKey>(g_teems_config.nthreads, ingbuf.buf,
|
|
|
g_teems_config.msg_size, ingbuf.inserted,
|
|
g_teems_config.msg_size, ingbuf.inserted,
|
|
|
route_state.tot_msg_per_ing,
|
|
route_state.tot_msg_per_ing,
|
|
@@ -1491,9 +1491,9 @@ void ecall_routing_proceed(void *cbpointer)
|
|
|
ocall_routing_round_complete(cbpointer, 1);
|
|
ocall_routing_round_complete(cbpointer, 1);
|
|
|
}
|
|
}
|
|
|
} else if (route_state.step == ROUTE_ROUND_1) {
|
|
} else if (route_state.step == ROUTE_ROUND_1) {
|
|
|
- if (g_teems_config.private_routing) { // private routing next round
|
|
|
|
|
|
|
+ if (g_teems_config.token_channel) { // Token channel routing next round
|
|
|
round2_processing(my_roles, cbpointer, route_state.round1);
|
|
round2_processing(my_roles, cbpointer, route_state.round1);
|
|
|
- } else { // public routing next round
|
|
|
|
|
|
|
+ } else { // ID channel routing next round
|
|
|
round1a_processing(cbpointer);
|
|
round1a_processing(cbpointer);
|
|
|
}
|
|
}
|
|
|
} else if (route_state.step == ROUTE_ROUND_1A) {
|
|
} else if (route_state.step == ROUTE_ROUND_1A) {
|