storage.cpp 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
#include <cstring>

#include "utils.hpp"
#include "config.hpp"
#include "storage.hpp"
#include "ORExpand.hpp"
  5. static struct {
  6. // A local storage buffer, used when we need to do non-in-place
  7. // sorts of the messages that have arrived
  8. MsgBuffer stg_buf;
  9. } storage_state;
  10. // route_init will call this function; no one else should call it
  11. // explicitly. The parameter is the number of messages that can fit in
  12. // the storage-side MsgBuffer. Returns true on success, false on
  13. // failure.
  14. bool storage_init(uint32_t msg_buf_size)
  15. {
  16. storage_state.stg_buf.alloc(msg_buf_size);
  17. return true;
  18. }
  19. // Handle the messages received by a storage node. Pass a _locked_
  20. // MsgBuffer. This function will itself reset and unlock it when it's
  21. // done with it.
  22. void storage_received(MsgBuffer &storage_buf)
  23. {
  24. // A dummy function for now that just counts how many real and
  25. // padding messages arrived
  26. uint16_t msg_size = g_teems_config.msg_size;
  27. nodenum_t my_node_num = g_teems_config.my_node_num;
  28. const uint8_t *msgs = storage_buf.buf;
  29. uint32_t num_msgs = storage_buf.inserted;
  30. uint32_t real = 0, padding = 0;
  31. uint32_t uid_mask = (1 << DEST_UID_BITS) - 1;
  32. printf("Storage server received %u messages:\n", num_msgs);
  33. for (uint32_t i=0; i<num_msgs; ++i) {
  34. uint32_t dest_addr = *(const uint32_t*)msgs;
  35. nodenum_t dest_node =
  36. g_teems_config.storage_map[dest_addr >> DEST_UID_BITS];
  37. if (dest_node != my_node_num) {
  38. char hexbuf[2*msg_size + 1];
  39. for (uint32_t j=0;j<msg_size;++j) {
  40. snprintf(hexbuf+2*j, 3, "%02x", msgs[j]);
  41. }
  42. printf("Misrouted message: %s\n", hexbuf);
  43. } else if ((dest_addr & uid_mask) == uid_mask) {
  44. ++padding;
  45. } else {
  46. ++real;
  47. }
  48. msgs += msg_size;
  49. }
  50. printf("%u real, %u padding\n", real, padding);
  51. storage_buf.reset();
  52. pthread_mutex_unlock(&storage_buf.mutex);
  53. }