|
@@ -19,9 +19,6 @@ static struct {
|
|
|
MsgBuffer stg_buf;
|
|
|
// The destination vector for ORExpand
|
|
|
std::vector<uint32_t> dest;
|
|
|
- // The selected array for compaction during public-channel routing
|
|
|
- // Need a bool array for compaction, and std:vector<bool> lacks .data()
|
|
|
- bool *pub_selected;
|
|
|
} storage_state;
|
|
|
|
|
|
static bool storage_generateClientKeys(uint32_t num_clients,
|
|
@@ -348,7 +345,6 @@ bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
|
|
|
storage_state.max_users = max_users;
|
|
|
storage_state.stg_buf.alloc(msg_buf_size);
|
|
|
storage_state.dest.resize(msg_buf_size);
|
|
|
- storage_state.pub_selected = new bool[msg_buf_size];
|
|
|
uint32_t my_storage_node_id = 0;
|
|
|
uint32_t my_stg_pos = 0;
|
|
|
for (nodenum_t i=0; i<g_teems_config.num_nodes; ++i) {
|
|
@@ -369,7 +365,6 @@ bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
|
|
|
}
|
|
|
|
|
|
void storage_close() {
|
|
|
- delete[] storage_state.pub_selected;
|
|
|
}
|
|
|
|
|
|
// Handle the messages received by a storage node. Pass a _locked_
|
|
@@ -448,43 +443,6 @@ void storage_received(MsgBuffer &storage_buf)
|
|
|
storage_buf.inserted);
|
|
|
#endif
|
|
|
|
|
|
- // For public-channel routing, remove excess per-user messages by
|
|
|
- // making them padding, and then compact non-padding messages.
|
|
|
- if (!g_teems_config.private_routing) {
|
|
|
- uint8_t *msg = storage_state.stg_buf.buf;
|
|
|
- uint32_t uid;
|
|
|
- uint32_t prev_uid = uid_mask; // initialization technically unnecessary
|
|
|
- uint32_t num_user_msgs = 0; // number of messages to the user
|
|
|
- uint8_t sel;
|
|
|
- for (uint32_t i=0; i<num_msgs; ++i) {
|
|
|
- uid = (*(uint32_t*) msg) & uid_mask;
|
|
|
- num_user_msgs = oselect_uint32_t(1, num_user_msgs+1,
|
|
|
- uid == prev_uid);
|
|
|
- // Select if messages per user not exceeded and msg is not padding
|
|
|
- sel = ((uint8_t) ((num_user_msgs <= g_teems_config.m_pub_in))) &
|
|
|
- ((uint8_t) uid != uid_mask);
|
|
|
- storage_state.pub_selected[i] = (bool) sel;
|
|
|
- // Make padding if not selected
|
|
|
- *(uint32_t *) msg = (*(uint32_t *) msg) & nid_mask;
|
|
|
- *(uint32_t *) msg += oselect_uint32_t(uid_mask, uid, sel);
|
|
|
- msg += msg_size;
|
|
|
- prev_uid = uid;
|
|
|
- }
|
|
|
- #ifdef PROFILE_STORAGE
|
|
|
- unsigned long start_compaction =
|
|
|
- printf_with_rtclock("begin public-channel compaction (%u)\n",
|
|
|
- num_msgs);
|
|
|
- #endif
|
|
|
- TightCompact_parallel<OSWAP_16X>(
|
|
|
- (unsigned char *) storage_state.stg_buf.buf,
|
|
|
- num_msgs, msg_size, storage_state.pub_selected,
|
|
|
- g_teems_config.nthreads);
|
|
|
- #ifdef PROFILE_STORAGE
|
|
|
- printf_with_rtclock_diff(start_compaction,
|
|
|
- "end public-channel compaction (%u)\n", num_msgs);
|
|
|
- #endif
|
|
|
- }
|
|
|
-
|
|
|
/*
|
|
|
for (uint32_t i=0;i<num_msgs; ++i) {
|
|
|
printf("%3d: %08x %08x\n", i,
|