|
@@ -146,9 +146,18 @@ bool route_init()
|
|
|
*/
|
|
|
|
|
|
// Create the route state
|
|
|
+ uint8_t my_roles = g_teems_config.roles[g_teems_config.my_node_num];
|
|
|
try {
|
|
|
- route_state.ingbuf.alloc(tot_msg_per_ing);
|
|
|
- route_state.round1.alloc(max_round2_msgs);
|
|
|
+ if (my_roles & ROLE_INGESTION) {
|
|
|
+ route_state.ingbuf.alloc(tot_msg_per_ing);
|
|
|
+ }
|
|
|
+ if (my_roles & ROLE_ROUTING) {
|
|
|
+ route_state.round1.alloc(max_round2_msgs);
|
|
|
+ }
|
|
|
+ if (my_roles & ROLE_STORAGE) {
|
|
|
+ route_state.round2.alloc(tot_msg_per_str +
|
|
|
+ g_teems_config.tot_weight);
|
|
|
+ }
|
|
|
} catch (std::bad_alloc&) {
|
|
|
printf("Memory allocation failed in route_init\n");
|
|
|
return false;
|
|
@@ -436,49 +445,62 @@ static void send_round1_msgs(const uint8_t *msgs, const uint64_t *indices,
|
|
|
// passed to ocall_routing_round_complete when the round is complete.
|
|
|
void ecall_routing_proceed(void *cbpointer)
|
|
|
{
|
|
|
+ uint8_t my_roles = g_teems_config.roles[g_teems_config.my_node_num];
|
|
|
+
|
|
|
if (route_state.step == ROUTE_NOT_STARTED) {
|
|
|
- route_state.cbpointer = cbpointer;
|
|
|
- MsgBuffer &ingbuf = route_state.ingbuf;
|
|
|
+ if (my_roles & ROLE_INGESTION) {
|
|
|
+ route_state.cbpointer = cbpointer;
|
|
|
+ MsgBuffer &ingbuf = route_state.ingbuf;
|
|
|
|
|
|
- pthread_mutex_lock(&ingbuf.mutex);
|
|
|
- // Ensure there are no pending messages currently being inserted
|
|
|
- // into the buffer
|
|
|
- while (ingbuf.reserved != ingbuf.inserted) {
|
|
|
- pthread_mutex_unlock(&ingbuf.mutex);
|
|
|
pthread_mutex_lock(&ingbuf.mutex);
|
|
|
- }
|
|
|
- // Sort the messages we've received
|
|
|
+ // Ensure there are no pending messages currently being inserted
|
|
|
+ // into the buffer
|
|
|
+ while (ingbuf.reserved != ingbuf.inserted) {
|
|
|
+ pthread_mutex_unlock(&ingbuf.mutex);
|
|
|
+ pthread_mutex_lock(&ingbuf.mutex);
|
|
|
+ }
|
|
|
+ // Sort the messages we've received
|
|
|
#ifdef PROFILE_ROUTING
|
|
|
- uint32_t inserted = ingbuf.inserted;
|
|
|
- unsigned long start = printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
+ uint32_t inserted = ingbuf.inserted;
|
|
|
+ unsigned long start = printf_with_rtclock("begin oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
#endif
|
|
|
- sort_mtobliv(g_teems_config.nthreads, ingbuf.buf,
|
|
|
- g_teems_config.msg_size, ingbuf.inserted,
|
|
|
- route_state.tot_msg_per_ing, send_round1_msgs);
|
|
|
+ sort_mtobliv(g_teems_config.nthreads, ingbuf.buf,
|
|
|
+ g_teems_config.msg_size, ingbuf.inserted,
|
|
|
+ route_state.tot_msg_per_ing, send_round1_msgs);
|
|
|
#ifdef PROFILE_ROUTING
|
|
|
- printf_with_rtclock_diff(start, "end oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
+ printf_with_rtclock_diff(start, "end oblivious sort (%u,%u)\n", inserted, route_state.tot_msg_per_ing);
|
|
|
#endif
|
|
|
- ingbuf.reset();
|
|
|
- pthread_mutex_unlock(&ingbuf.mutex);
|
|
|
+ ingbuf.reset();
|
|
|
+ pthread_mutex_unlock(&ingbuf.mutex);
|
|
|
+ } else {
|
|
|
+ route_state.step = ROUTE_ROUND_1;
|
|
|
+ ocall_routing_round_complete(cbpointer, 1);
|
|
|
+ }
|
|
|
} else if (route_state.step == ROUTE_ROUND_1) {
|
|
|
- route_state.cbpointer = cbpointer;
|
|
|
- MsgBuffer &round1 = route_state.round1;
|
|
|
+ if (my_roles & ROLE_ROUTING) {
|
|
|
+ route_state.cbpointer = cbpointer;
|
|
|
+ MsgBuffer &round1 = route_state.round1;
|
|
|
|
|
|
- pthread_mutex_lock(&round1.mutex);
|
|
|
- // Ensure there are no pending messages currently being inserted
|
|
|
- // into the buffer
|
|
|
- while (round1.reserved != round1.inserted) {
|
|
|
- pthread_mutex_unlock(&round1.mutex);
|
|
|
pthread_mutex_lock(&round1.mutex);
|
|
|
- }
|
|
|
+ // Ensure there are no pending messages currently being inserted
|
|
|
+ // into the buffer
|
|
|
+ while (round1.reserved != round1.inserted) {
|
|
|
+ pthread_mutex_unlock(&round1.mutex);
|
|
|
+ pthread_mutex_lock(&round1.mutex);
|
|
|
+ }
|
|
|
|
|
|
- uint32_t msg_size = g_teems_config.msg_size;
|
|
|
- for(uint32_t i=0;i<round1.inserted;++i) {
|
|
|
- uint32_t destaddr = *(uint32_t*)(round1.buf+i*msg_size);
|
|
|
- printf("%08x\n", destaddr);
|
|
|
- }
|
|
|
+ uint32_t msg_size = g_teems_config.msg_size;
|
|
|
+ for(uint32_t i=0;i<round1.inserted;++i) {
|
|
|
+ uint32_t destaddr = *(uint32_t*)(round1.buf+i*msg_size);
|
|
|
+ printf("%08x\n", destaddr);
|
|
|
+ }
|
|
|
|
|
|
- round1.reset();
|
|
|
- pthread_mutex_unlock(&round1.mutex);
|
|
|
+ round1.reset();
|
|
|
+ pthread_mutex_unlock(&round1.mutex);
|
|
|
+ } else {
|
|
|
+ // We're done
|
|
|
+ route_state.step = ROUTE_NOT_STARTED;
|
|
|
+ ocall_routing_round_complete(cbpointer, 2);
|
|
|
+ }
|
|
|
}
|
|
|
}
|