12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- #ifndef __ROUTE_HPP__
- #define __ROUTE_HPP__
- #include <pthread.h>
- struct MsgBuffer {
- pthread_mutex_t mutex;
- uint8_t *buf;
- // The number of messages (not bytes) in, or on their way into, the
- // buffer
- uint32_t reserved;
- // The number of messages definitely in the buffer
- uint32_t inserted;
- // The number of messages that can fit in buf
- uint32_t bufsize;
- // The number of nodes we've heard from
- nodenum_t nodes_received;
- // Have we completed the previous round yet?
- bool completed_prev_round;
- MsgBuffer() : buf(NULL), reserved(0), inserted(0), bufsize(0),
- nodes_received(0), completed_prev_round(false) {
- pthread_mutex_init(&mutex, NULL);
- }
- ~MsgBuffer() {
- delete[] buf;
- }
- // The number passed is messages, not bytes
- void alloc(uint32_t msgs) {
- delete[] buf;
- buf = NULL;
- reserved = 0;
- inserted = 0;
- // This may throw bad_alloc, but we'll catch it higher up
- buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size];
- memset(buf, 0, size_t(msgs) * g_teems_config.msg_size);
- bufsize = msgs;
- nodes_received = 0;
- completed_prev_round = false;
- }
- // Reset the contents of the buffer
- void reset() {
- memset(buf, 0, bufsize * g_teems_config.msg_size);
- reserved = 0;
- inserted = 0;
- nodes_received = 0;
- completed_prev_round = false;
- }
- // You can't copy a MsgBuffer
- MsgBuffer(const MsgBuffer&) = delete;
- MsgBuffer &operator=(const MsgBuffer&) = delete;
- };
- enum RouteStep {
- ROUTE_NOT_STARTED,
- ROUTE_ROUND_1,
- ROUTE_ROUND_2
- };
- // The ingbuf MsgBuffer stores messages an ingestion node ingests while
- // waiting for round 1 to start, which will be sorted and sent out in
- // round 1. The round1 MsgBuffer stores messages a routing node
- // receives in round 1, which will be padded, sorted, and sent out in
- // round 2. The round2 MsgBuffer stores messages a storage node
- // receives in round 2.
- struct RouteState {
- MsgBuffer ingbuf;
- MsgBuffer round1;
- MsgBuffer round2;
- RouteStep step;
- uint32_t tot_msg_per_ing;
- uint32_t max_msg_to_each_stg;
- uint32_t max_round2_msgs;
- uint32_t max_stg_msgs;
- void *cbpointer;
- };
- extern RouteState route_state;
- // Call this near the end of ecall_config_load, but before
- // comms_init_nodestate. Returns true on success, false on failure.
- bool route_init();
- // Call when shutting system down to deallocate routing state
- void route_close();
- // For a given other node, set the received message handler to the first
- // message we would expect from them, given their roles and our roles.
- void route_init_msg_handler(nodenum_t node_num);
- #endif
|