route.hpp 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. #ifndef __ROUTE_HPP__
  2. #define __ROUTE_HPP__
  3. #include <pthread.h>
  // Numeric codes for the three sub-rounds of round 1 (1A, 1B, 1C).
  // NOTE(review): the places that consume these values are not visible in
  // this header -- confirm their meaning against the users before changing.
  #define ROUND_1A    11
  #define ROUND_1B    12
  #define ROUND_1C    13
  7. struct MsgBuffer {
  8. pthread_mutex_t mutex;
  9. uint8_t *buf;
  10. // The number of messages (not bytes) in, or on their way into, the
  11. // buffer
  12. uint32_t reserved;
  13. // The number of messages definitely in the buffer
  14. uint32_t inserted;
  15. // The number of messages that can fit in buf
  16. uint32_t bufsize;
  17. // The number of nodes we've heard from
  18. nodenum_t nodes_received;
  19. // Have we completed the previous round yet?
  20. bool completed_prev_round;
  21. MsgBuffer() : buf(NULL), reserved(0), inserted(0), bufsize(0),
  22. nodes_received(0), completed_prev_round(false) {
  23. pthread_mutex_init(&mutex, NULL);
  24. }
  25. ~MsgBuffer() {
  26. delete[] buf;
  27. }
  28. // The number passed is messages, not bytes
  29. void alloc(uint32_t msgs) {
  30. delete[] buf;
  31. buf = NULL;
  32. reserved = 0;
  33. inserted = 0;
  34. // This may throw bad_alloc, but we'll catch it higher up
  35. buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size];
  36. memset(buf, 0, size_t(msgs) * g_teems_config.msg_size);
  37. bufsize = msgs;
  38. nodes_received = 0;
  39. completed_prev_round = false;
  40. }
  41. // Reset the contents of the buffer
  42. void reset() {
  43. memset(buf, 0, bufsize * g_teems_config.msg_size);
  44. reserved = 0;
  45. inserted = 0;
  46. nodes_received = 0;
  47. completed_prev_round = false;
  48. }
  49. // You can't copy a MsgBuffer
  50. MsgBuffer(const MsgBuffer&) = delete;
  51. MsgBuffer &operator=(const MsgBuffer&) = delete;
  52. };
  // The steps a route can be in, listed in declaration order from "not
  // started" through round 2.  The numeric values are spelled out but are
  // the same ones the compiler would assign implicitly.
  // NOTE(review): how ROUTE_ROUND_1 relates to the 1A/1B/1C sub-steps is
  // not shown in this header -- confirm against the routing code.
  enum RouteStep {
      ROUTE_NOT_STARTED = 0,
      ROUTE_ROUND_1     = 1,
      ROUTE_ROUND_1A    = 2,
      ROUTE_ROUND_1B    = 3,
      ROUTE_ROUND_1C    = 4,
      ROUTE_ROUND_2     = 5
  };
  61. // The ingbuf MsgBuffer stores messages an ingestion node ingests while
  62. // waiting for round 1 to start, which will be sorted and sent out in
  63. // round 1. The round1 MsgBuffer stores messages a routing node
  64. // receives in round 1, which will be padded, sorted, and sent out in
  65. // round 2. The round2 MsgBuffer stores messages a storage node
  66. // receives in round 2.
  67. struct RouteState {
  68. MsgBuffer ingbuf;
  69. MsgBuffer round1;
  70. MsgBuffer round1a;
  71. MsgBuffer round1a_sorted;
  72. MsgBuffer round1b;
  73. MsgBuffer round1c;
  74. MsgBuffer round2;
  75. RouteStep step;
  76. uint32_t tot_msg_per_ing;
  77. uint32_t max_msg_to_each_stg;
  78. uint32_t max_round1_msgs;
  79. uint32_t max_round1a_msgs;
  80. uint32_t max_round1b_msgs_to_adj_rtr;
  81. uint32_t max_round1b_msgs;
  82. uint32_t max_round1c_msgs;
  83. uint32_t max_round2_msgs;
  84. uint32_t max_stg_msgs;
  85. void *cbpointer;
  86. };
  87. extern RouteState route_state;
  88. // Call this near the end of ecall_config_load, but before
  89. // comms_init_nodestate. Returns true on success, false on failure.
  90. bool route_init();
  91. // Call when shutting system down to deallocate routing state
  92. void route_close();
  93. // For a given other node, set the received message handler to the first
  94. // message we would expect from them, given their roles and our roles.
  95. void route_init_msg_handler(nodenum_t node_num);
  96. #endif