storage.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. #include "utils.hpp"
  2. #include "config.hpp"
  3. #include "ORExpand.hpp"
  4. #include "sort.hpp"
  5. #include "storage.hpp"
  6. #define PROFILE_STORAGE
  7. static struct {
  8. uint32_t max_users;
  9. uint32_t my_storage_node_id;
  10. // A local storage buffer, used when we need to do non-in-place
  11. // sorts of the messages that have arrived
  12. MsgBuffer stg_buf;
  13. // The destination vector for ORExpand
  14. std::vector<uint32_t> dest;
  15. } storage_state;
  16. // route_init will call this function; no one else should call it
  17. // explicitly. The parameter is the number of messages that can fit in
  18. // the storage-side MsgBuffer. Returns true on success, false on
  19. // failure.
  20. bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
  21. {
  22. storage_state.max_users = max_users;
  23. storage_state.stg_buf.alloc(msg_buf_size);
  24. storage_state.dest.resize(msg_buf_size);
  25. uint32_t my_storage_node_id = 0;
  26. for (nodenum_t i=0; i<g_teems_config.num_nodes; ++i) {
  27. if (g_teems_config.roles[i] & ROLE_STORAGE) {
  28. if (i == g_teems_config.my_node_num) {
  29. storage_state.my_storage_node_id = my_storage_node_id << DEST_UID_BITS;
  30. } else {
  31. ++my_storage_node_id;
  32. }
  33. }
  34. }
  35. return true;
  36. }
  37. // Handle the messages received by a storage node. Pass a _locked_
  38. // MsgBuffer. This function will itself reset and unlock it when it's
  39. // done with it.
  40. void storage_received(MsgBuffer &storage_buf)
  41. {
  42. uint16_t msg_size = g_teems_config.msg_size;
  43. nodenum_t my_node_num = g_teems_config.my_node_num;
  44. const uint8_t *msgs = storage_buf.buf;
  45. uint32_t num_msgs = storage_buf.inserted;
  46. uint32_t real = 0, padding = 0;
  47. uint32_t uid_mask = (1 << DEST_UID_BITS) - 1;
  48. uint32_t nid_mask = ~uid_mask;
  49. #ifdef PROFILE_STORAGE
  50. unsigned long start_received = printf_with_rtclock("begin storage_received (%u)\n", storage_buf.inserted);
  51. #endif
  52. // It's OK to test for errors in a way that's non-oblivous if
  53. // there's an error (but it should be oblivious if there are no
  54. // errors)
  55. for (uint32_t i=0; i<num_msgs; ++i) {
  56. uint32_t uid = *(const uint32_t*)(storage_buf.buf+(i*msg_size));
  57. bool ok = ((((uid & nid_mask) == storage_state.my_storage_node_id)
  58. & ((uid & uid_mask) < storage_state.max_users))
  59. | ((uid & uid_mask) == uid_mask));
  60. if (!ok) {
  61. printf("Received bad uid: %08x\n", uid);
  62. assert(ok);
  63. }
  64. }
  65. // Testing: report how many real and dummy messages arrived
  66. printf("Storage server received %u messages:\n", num_msgs);
  67. for (uint32_t i=0; i<num_msgs; ++i) {
  68. uint32_t dest_addr = *(const uint32_t*)msgs;
  69. nodenum_t dest_node =
  70. g_teems_config.storage_map[dest_addr >> DEST_UID_BITS];
  71. if (dest_node != my_node_num) {
  72. char hexbuf[2*msg_size + 1];
  73. for (uint32_t j=0;j<msg_size;++j) {
  74. snprintf(hexbuf+2*j, 3, "%02x", msgs[j]);
  75. }
  76. printf("Misrouted message: %s\n", hexbuf);
  77. } else if ((dest_addr & uid_mask) == uid_mask) {
  78. ++padding;
  79. } else {
  80. ++real;
  81. }
  82. msgs += msg_size;
  83. }
  84. printf("%u real, %u padding\n", real, padding);
  85. /*
  86. for (uint32_t i=0;i<num_msgs; ++i) {
  87. printf("%3d: %08x %08x\n", i,
  88. *(uint32_t*)(storage_buf.buf+(i*msg_size)),
  89. *(uint32_t*)(storage_buf.buf+(i*msg_size+4)));
  90. }
  91. */
  92. // Sort the received messages by userid into the
  93. // storage_state.stg_buf MsgBuffer.
  94. #ifdef PROFILE_STORAGE
  95. unsigned long start_sort = printf_with_rtclock("begin oblivious sort (%u)\n", storage_buf.inserted);
  96. #endif
  97. sort_mtobliv<UidKey>(g_teems_config.nthreads, storage_buf.buf,
  98. msg_size, storage_buf.inserted, storage_buf.bufsize,
  99. storage_state.stg_buf.buf);
  100. #ifdef PROFILE_STORAGE
  101. printf_with_rtclock_diff(start_sort, "end oblivious sort (%u)\n", storage_buf.inserted);
  102. #endif
  103. /*
  104. for (uint32_t i=0;i<num_msgs; ++i) {
  105. printf("%3d: %08x %08x\n", i,
  106. *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size)),
  107. *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size+4)));
  108. }
  109. */
  110. #ifdef PROFILE_STORAGE
  111. unsigned long start_dest = printf_with_rtclock("begin setting dests (%u)\n", storage_state.stg_buf.bufsize);
  112. #endif
  113. // Obliviously set the dest array
  114. uint32_t *dests = storage_state.dest.data();
  115. uint32_t stg_size = storage_state.stg_buf.bufsize;
  116. const uint8_t *buf = storage_state.stg_buf.buf;
  117. uint32_t m_priv_in = g_teems_config.m_priv_in;
  118. uint32_t uid = *(uint32_t*)(buf);
  119. // num_msgs is not a private value
  120. if (num_msgs > 0) {
  121. uid &= uid_mask;
  122. dests[0] = oselect_uint32_t(uid * m_priv_in, 0xffffffff,
  123. uid == uid_mask);
  124. }
  125. uint32_t prev_uid = uid;
  126. for (uint32_t i=1; i<num_msgs; ++i) {
  127. uid = *(uint32_t*)(buf + i*msg_size);
  128. uid &= uid_mask;
  129. dests[i] = oselect_uint32_t(
  130. oselect_uint32_t(uid * m_priv_in, dests[i-1]+1, uid==prev_uid),
  131. 0xffffffff, uid == uid_mask);
  132. prev_uid = uid;
  133. }
  134. for (uint32_t i=num_msgs; i<stg_size; ++i) {
  135. dests[i] = 0xffffffff;
  136. *(uint32_t*)(buf + i*msg_size) = 0xffffffff;
  137. }
  138. /*
  139. for (uint32_t i=0;i<stg_size; ++i) {
  140. printf("%3d: %08x %08x %u\n", i,
  141. *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size)),
  142. *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size+4))),
  143. dests[i];
  144. }
  145. */
  146. #ifdef PROFILE_STORAGE
  147. printf_with_rtclock_diff(start_dest, "end setting dests (%u)\n", stg_size);
  148. #endif
  149. #ifdef PROFILE_STORAGE
  150. unsigned long start_expand = printf_with_rtclock("begin ORExpand (%u)\n", stg_size);
  151. #endif
  152. ORExpand_parallel<OSWAP_16X>(storage_state.stg_buf.buf, dests,
  153. msg_size, stg_size, g_teems_config.nthreads);
  154. #ifdef PROFILE_STORAGE
  155. printf_with_rtclock_diff(start_expand, "end ORExpand (%u)\n", stg_size);
  156. #endif
  157. /*
  158. for (uint32_t i=0;i<stg_size; ++i) {
  159. printf("%3d: %08x %08x %u\n", i,
  160. *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size)),
  161. *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size+4))),
  162. dests[i];
  163. }
  164. */
  165. // You can do more processing after these lines, as long as they
  166. // don't touch storage_buf. They _can_ touch the backing buffer
  167. // storage_state.stg_buf.
  168. storage_buf.reset();
  169. pthread_mutex_unlock(&storage_buf.mutex);
  170. storage_state.stg_buf.reset();
  171. #ifdef PROFILE_STORAGE
  172. printf_with_rtclock_diff(start_received, "end storage_received (%u)\n", storage_buf.inserted);
  173. #endif
  174. }