12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- #include "utils.hpp"
- #include "config.hpp"
- #include "storage.hpp"
- #include "ORExpand.hpp"
- static struct {
- // A local storage buffer, used when we need to do non-in-place
- // sorts of the messages that have arrived
- MsgBuffer stg_buf;
- } storage_state;
- // route_init will call this function; no one else should call it
- // explicitly. The parameter is the number of messages that can fit in
- // the storage-side MsgBuffer. Returns true on success, false on
- // failure.
- bool storage_init(uint32_t msg_buf_size)
- {
- storage_state.stg_buf.alloc(msg_buf_size);
- return true;
- }
- // Handle the messages received by a storage node. Pass a _locked_
- // MsgBuffer. This function will itself reset and unlock it when it's
- // done with it.
- void storage_received(MsgBuffer &storage_buf)
- {
- // A dummy function for now that just counts how many real and
- // padding messages arrived
- uint16_t msg_size = g_teems_config.msg_size;
- nodenum_t my_node_num = g_teems_config.my_node_num;
- const uint8_t *msgs = storage_buf.buf;
- uint32_t num_msgs = storage_buf.inserted;
- uint32_t real = 0, padding = 0;
- uint32_t uid_mask = (1 << DEST_UID_BITS) - 1;
- printf("Storage server received %u messages:\n", num_msgs);
- for (uint32_t i=0; i<num_msgs; ++i) {
- uint32_t dest_addr = *(const uint32_t*)msgs;
- nodenum_t dest_node =
- g_teems_config.storage_map[dest_addr >> DEST_UID_BITS];
- if (dest_node != my_node_num) {
- char hexbuf[2*msg_size + 1];
- for (uint32_t j=0;j<msg_size;++j) {
- snprintf(hexbuf+2*j, 3, "%02x", msgs[j]);
- }
- printf("Misrouted message: %s\n", hexbuf);
- } else if ((dest_addr & uid_mask) == uid_mask) {
- ++padding;
- } else {
- ++real;
- }
- msgs += msg_size;
- }
- printf("%u real, %u padding\n", real, padding);
- storage_buf.reset();
- pthread_mutex_unlock(&storage_buf.mutex);
- }
|