#ifndef __ROUTE_HPP__ #define __ROUTE_HPP__ #include #define ROUND_1A 11 #define ROUND_1B 12 #define ROUND_1C 13 struct MsgBuffer { pthread_mutex_t mutex; uint8_t *buf; // The number of messages (not bytes) in, or on their way into, the // buffer uint32_t reserved; // The number of messages definitely in the buffer uint32_t inserted; // The number of messages that can fit in buf uint32_t bufsize; // The number of nodes we've heard from nodenum_t nodes_received; // Have we completed the previous round yet? bool completed_prev_round; MsgBuffer() : buf(NULL), reserved(0), inserted(0), bufsize(0), nodes_received(0), completed_prev_round(false) { pthread_mutex_init(&mutex, NULL); } ~MsgBuffer() { delete[] buf; } // The number passed is messages, not bytes void alloc(uint32_t msgs) { delete[] buf; buf = NULL; reserved = 0; inserted = 0; // This may throw bad_alloc, but we'll catch it higher up buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size]; memset(buf, 0, size_t(msgs) * g_teems_config.msg_size); bufsize = msgs; nodes_received = 0; completed_prev_round = false; } // Reset the contents of the buffer void reset() { memset(buf, 0, bufsize * g_teems_config.msg_size); reserved = 0; inserted = 0; nodes_received = 0; completed_prev_round = false; } // You can't copy a MsgBuffer MsgBuffer(const MsgBuffer&) = delete; MsgBuffer &operator=(const MsgBuffer&) = delete; }; enum RouteStep { ROUTE_NOT_STARTED, ROUTE_ROUND_1, ROUTE_ROUND_1A, ROUTE_ROUND_1B, ROUTE_ROUND_1C, ROUTE_ROUND_2 }; // The ingbuf MsgBuffer stores messages an ingestion node ingests while // waiting for round 1 to start, which will be sorted and sent out in // round 1. The round1 MsgBuffer stores messages a routing node // receives in round 1, which will be padded, sorted, and sent out in // round 2. The round2 MsgBuffer stores messages a storage node // receives in round 2. struct RouteState { MsgBuffer ingbuf; MsgBuffer round1; MsgBuffer round1a; MsgBuffer round1a_sorted; MsgBuffer round1b_prev; MsgBuffer round1b_next; MsgBuffer round1c; MsgBuffer round2; RouteStep step; uint32_t tot_msg_per_ing; uint32_t max_msg_to_each_stg; uint32_t max_round1_msgs; uint32_t max_round1a_msgs; uint32_t max_round1b_msgs_to_adj_rtr; uint32_t max_round1c_msgs; uint32_t max_round2_msgs; uint32_t max_stg_msgs; void *cbpointer; }; extern RouteState route_state; // Call this near the end of ecall_config_load, but before // comms_init_nodestate. Returns true on success, false on failure. bool route_init(); // Call when shutting system down to deallocate routing state void route_close(); // For a given other node, set the received message handler to the first // message we would expect from them, given their roles and our roles. void route_init_msg_handler(nodenum_t node_num); #endif