|
@@ -510,7 +510,6 @@ void ecall_routing_proceed(void *cbpointer)
|
|
if (my_roles & ROLE_INGESTION) {
|
|
if (my_roles & ROLE_INGESTION) {
|
|
route_state.cbpointer = cbpointer;
|
|
route_state.cbpointer = cbpointer;
|
|
MsgBuffer &ingbuf = route_state.ingbuf;
|
|
MsgBuffer &ingbuf = route_state.ingbuf;
|
|
- MsgBuffer &round1 = route_state.round1;
|
|
|
|
|
|
|
|
pthread_mutex_lock(&ingbuf.mutex);
|
|
pthread_mutex_lock(&ingbuf.mutex);
|
|
// Ensure there are no pending messages currently being inserted
|
|
// Ensure there are no pending messages currently being inserted
|
|
@@ -534,6 +533,9 @@ void ecall_routing_proceed(void *cbpointer)
|
|
#endif
|
|
#endif
|
|
ingbuf.reset();
|
|
ingbuf.reset();
|
|
pthread_mutex_unlock(&ingbuf.mutex);
|
|
pthread_mutex_unlock(&ingbuf.mutex);
|
|
|
|
+ }
|
|
|
|
+ if (my_roles & ROLE_ROUTING) {
|
|
|
|
+ MsgBuffer &round1 = route_state.round1;
|
|
|
|
|
|
pthread_mutex_lock(&round1.mutex);
|
|
pthread_mutex_lock(&round1.mutex);
|
|
round1.completed_prev_round = true;
|
|
round1.completed_prev_round = true;
|
|
@@ -547,13 +549,13 @@ void ecall_routing_proceed(void *cbpointer)
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
route_state.step = ROUTE_ROUND_1;
|
|
route_state.step = ROUTE_ROUND_1;
|
|
|
|
+ route_state.round1.completed_prev_round = true;
|
|
ocall_routing_round_complete(cbpointer, 1);
|
|
ocall_routing_round_complete(cbpointer, 1);
|
|
}
|
|
}
|
|
} else if (route_state.step == ROUTE_ROUND_1) {
|
|
} else if (route_state.step == ROUTE_ROUND_1) {
|
|
if (my_roles & ROLE_ROUTING) {
|
|
if (my_roles & ROLE_ROUTING) {
|
|
route_state.cbpointer = cbpointer;
|
|
route_state.cbpointer = cbpointer;
|
|
MsgBuffer &round1 = route_state.round1;
|
|
MsgBuffer &round1 = route_state.round1;
|
|
- MsgBuffer &round2 = route_state.round2;
|
|
|
|
|
|
|
|
pthread_mutex_lock(&round1.mutex);
|
|
pthread_mutex_lock(&round1.mutex);
|
|
// Ensure there are no pending messages currently being inserted
|
|
// Ensure there are no pending messages currently being inserted
|
|
@@ -642,6 +644,11 @@ void ecall_routing_proceed(void *cbpointer)
|
|
round1.reset();
|
|
round1.reset();
|
|
pthread_mutex_unlock(&round1.mutex);
|
|
pthread_mutex_unlock(&round1.mutex);
|
|
|
|
|
|
|
|
+ }
|
|
|
|
+ if (my_roles & ROLE_STORAGE) {
|
|
|
|
+ route_state.cbpointer = cbpointer;
|
|
|
|
+ MsgBuffer &round2 = route_state.round2;
|
|
|
|
+
|
|
pthread_mutex_lock(&round2.mutex);
|
|
pthread_mutex_lock(&round2.mutex);
|
|
round2.completed_prev_round = true;
|
|
round2.completed_prev_round = true;
|
|
nodenum_t nodes_received = round2.nodes_received;
|
|
nodenum_t nodes_received = round2.nodes_received;
|
|
@@ -654,6 +661,7 @@ void ecall_routing_proceed(void *cbpointer)
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
route_state.step = ROUTE_ROUND_2;
|
|
route_state.step = ROUTE_ROUND_2;
|
|
|
|
+ route_state.round2.completed_prev_round = true;
|
|
ocall_routing_round_complete(cbpointer, 2);
|
|
ocall_routing_round_complete(cbpointer, 2);
|
|
}
|
|
}
|
|
} else if (route_state.step == ROUTE_ROUND_2) {
|
|
} else if (route_state.step == ROUTE_ROUND_2) {
|