// route.hpp
  1. #ifndef __ROUTE_HPP__
  2. #define __ROUTE_HPP__
  3. #include <pthread.h>
  4. struct MsgBuffer {
  5. pthread_mutex_t mutex;
  6. uint8_t *buf;
  7. // The number of messages (not bytes) in, or on their way into, the
  8. // buffer
  9. uint32_t reserved;
  10. // The number of messages definitely in the buffer
  11. uint32_t inserted;
  12. // The number of messages that can fit in buf
  13. uint32_t bufsize;
  14. // The number of nodes we've heard from
  15. nodenum_t nodes_received;
  16. // Have we completed the previous round yet?
  17. bool completed_prev_round;
  18. MsgBuffer() : buf(NULL), reserved(0), inserted(0), bufsize(0),
  19. nodes_received(0), completed_prev_round(false) {
  20. pthread_mutex_init(&mutex, NULL);
  21. }
  22. ~MsgBuffer() {
  23. delete[] buf;
  24. }
  25. // The number passed is messages, not bytes
  26. void alloc(uint32_t msgs) {
  27. delete[] buf;
  28. buf = NULL;
  29. reserved = 0;
  30. inserted = 0;
  31. // This may throw bad_alloc, but we'll catch it higher up
  32. buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size];
  33. memset(buf, 0, size_t(msgs) * g_teems_config.msg_size);
  34. bufsize = msgs;
  35. nodes_received = 0;
  36. completed_prev_round = false;
  37. }
  38. // Reset the contents of the buffer
  39. void reset() {
  40. memset(buf, 0, bufsize * g_teems_config.msg_size);
  41. reserved = 0;
  42. inserted = 0;
  43. nodes_received = 0;
  44. completed_prev_round = false;
  45. }
  46. // You can't copy a MsgBuffer
  47. MsgBuffer(const MsgBuffer&) = delete;
  48. MsgBuffer &operator=(const MsgBuffer&) = delete;
  49. };
  50. enum RouteStep {
  51. ROUTE_NOT_STARTED,
  52. ROUTE_ROUND_1,
  53. ROUTE_ROUND_2
  54. };
  55. // The ingbuf MsgBuffer stores messages an ingestion node ingests while
  56. // waiting for round 1 to start, which will be sorted and sent out in
  57. // round 1. The round1 MsgBuffer stores messages a routing node
  58. // receives in round 1, which will be padded, sorted, and sent out in
  59. // round 2. The round2 MsgBuffer stores messages a storage node
  60. // receives in round 2.
  61. struct RouteState {
  62. MsgBuffer ingbuf;
  63. MsgBuffer round1;
  64. MsgBuffer round2;
  65. RouteStep step;
  66. uint32_t tot_msg_per_ing;
  67. uint32_t max_msg_to_each_stg;
  68. uint32_t max_round2_msgs;
  69. uint32_t max_stg_msgs;
  70. void *cbpointer;
  71. };
// The routing state object.  It is only declared here; the definition
// lives in another translation unit.
extern RouteState route_state;

// Call this near the end of ecall_config_load, but before
// comms_init_nodestate.  Returns true on success, false on failure.
bool route_init();

// Call when shutting system down to deallocate routing state
void route_close();

// For a given other node, set the received message handler to the first
// message we would expect from them, given their roles and our roles.
// node_num identifies that other node.
void route_init_msg_handler(nodenum_t node_num);
  81. #endif