123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- #ifndef __ROUTE_HPP__
- #define __ROUTE_HPP__
- #include <pthread.h>
// Numeric tags for the 1A, 1B and 1C sub-rounds.
// NOTE(review): the consumers of these values are not visible in this
// header -- confirm how they are used before renumbering.
#define ROUND_1A    11
#define ROUND_1B    12
#define ROUND_1C    13
- struct MsgBuffer {
- pthread_mutex_t mutex;
- uint8_t *buf;
- // The number of messages (not bytes) in, or on their way into, the
- // buffer
- uint32_t reserved;
- // The number of messages definitely in the buffer
- uint32_t inserted;
- // The number of messages that can fit in buf
- uint32_t bufsize;
- // The number of nodes we've heard from
- nodenum_t nodes_received;
- // Have we completed the previous round yet?
- bool completed_prev_round;
- MsgBuffer() : buf(NULL), reserved(0), inserted(0), bufsize(0),
- nodes_received(0), completed_prev_round(false) {
- pthread_mutex_init(&mutex, NULL);
- }
- ~MsgBuffer() {
- delete[] buf;
- }
- // The number passed is messages, not bytes
- void alloc(uint32_t msgs) {
- delete[] buf;
- buf = NULL;
- reserved = 0;
- inserted = 0;
- // This may throw bad_alloc, but we'll catch it higher up
- buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size];
- memset(buf, 0, size_t(msgs) * g_teems_config.msg_size);
- bufsize = msgs;
- nodes_received = 0;
- completed_prev_round = false;
- }
- // Reset the contents of the buffer
- void reset() {
- memset(buf, 0, bufsize * g_teems_config.msg_size);
- reserved = 0;
- inserted = 0;
- nodes_received = 0;
- completed_prev_round = false;
- }
- // You can't copy a MsgBuffer
- MsgBuffer(const MsgBuffer&) = delete;
- MsgBuffer &operator=(const MsgBuffer&) = delete;
- };
// The steps recorded in RouteState::step.  The values are written out
// explicitly; they are the same ones the compiler assigns by default.
enum RouteStep {
    ROUTE_NOT_STARTED = 0,
    ROUTE_ROUND_1     = 1,
    ROUTE_ROUND_1A    = 2,
    ROUTE_ROUND_1B    = 3,
    ROUTE_ROUND_1C    = 4,
    ROUTE_ROUND_2     = 5
};
- // The ingbuf MsgBuffer stores messages an ingestion node ingests while
- // waiting for round 1 to start, which will be sorted and sent out in
- // round 1. The round1 MsgBuffer stores messages a routing node
- // receives in round 1, which will be padded, sorted, and sent out in
- // round 2. The round2 MsgBuffer stores messages a storage node
- // receives in round 2.
- struct RouteState {
- MsgBuffer ingbuf;
- MsgBuffer round1;
- MsgBuffer round1a;
- MsgBuffer round1a_sorted;
- MsgBuffer round1b_prev;
- MsgBuffer round1b_next;
- MsgBuffer round1c;
- MsgBuffer round2;
- RouteStep step;
- uint32_t tot_msg_per_ing;
- uint32_t max_msg_to_each_stg;
- uint32_t max_round1_msgs;
- uint32_t max_round1a_msgs;
- uint32_t max_round1b_msgs_to_adj_rtr;
- uint32_t max_round1c_msgs;
- uint32_t max_round2_msgs;
- uint32_t max_stg_msgs;
- void *cbpointer;
- };
- extern RouteState route_state;
- // Call this near the end of ecall_config_load, but before
- // comms_init_nodestate. Returns true on success, false on failure.
- bool route_init();
- // Call when shutting system down to deallocate routing state
- void route_close();
- // For a given other node, set the received message handler to the first
- // message we would expect from them, given their roles and our roles.
- void route_init_msg_handler(nodenum_t node_num);
- #endif
|