|
@@ -14,6 +14,9 @@ static struct {
|
|
|
MsgBuffer stg_buf;
|
|
|
// The destination vector for ORExpand
|
|
|
std::vector<uint32_t> dest;
|
|
|
+ // The selected array for compaction during public routing
|
|
|
+ // Need an bool array for compaction, and std:vector<bool> lacks .data()
|
|
|
+ bool *pub_selected;
|
|
|
} storage_state;
|
|
|
|
|
|
// route_init will call this function; no one else should call it
|
|
@@ -25,6 +28,7 @@ bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
|
|
|
storage_state.max_users = max_users;
|
|
|
storage_state.stg_buf.alloc(msg_buf_size);
|
|
|
storage_state.dest.resize(msg_buf_size);
|
|
|
+ storage_state.pub_selected = new bool[msg_buf_size];
|
|
|
uint32_t my_storage_node_id = 0;
|
|
|
for (nodenum_t i=0; i<g_teems_config.num_nodes; ++i) {
|
|
|
if (g_teems_config.roles[i] & ROLE_STORAGE) {
|
|
@@ -38,6 +42,10 @@ bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+void storage_close() {
|
|
|
+ delete[] storage_state.pub_selected;
|
|
|
+}
|
|
|
+
|
|
|
// Handle the messages received by a storage node. Pass a _locked_
|
|
|
// MsgBuffer. This function will itself reset and unlock it when it's
|
|
|
// done with it.
|
|
@@ -109,32 +117,38 @@ void storage_received(MsgBuffer &storage_buf)
|
|
|
printf_with_rtclock_diff(start_sort, "end oblivious sort (%u)\n", storage_buf.inserted);
|
|
|
#endif
|
|
|
|
|
|
- // For public routing, remove excess per-user messages and compact
|
|
|
+ // For public routing, remove excess per-user messages by making them
|
|
|
+ // padding, and then compact non-padding messages.
|
|
|
if (!g_teems_config.private_routing) {
|
|
|
- bool *selected = new bool[num_msgs];
|
|
|
uint8_t *msg = storage_state.stg_buf.buf;
|
|
|
uint32_t uid;
|
|
|
- uint32_t prev_uid = uid;
|
|
|
- uint32_t num_user_msgs = 0; // number of messages seen to the user
|
|
|
+ uint32_t prev_uid = uid_mask; // initialization technically unnecessary
|
|
|
+ uint32_t num_user_msgs = 0; // number of messages to the user
|
|
|
uint8_t sel;
|
|
|
for (uint32_t i=0; i<num_msgs; ++i) {
|
|
|
- uid = *(uint32_t*) msg;
|
|
|
- uid &= uid_mask;
|
|
|
+ uid = (*(uint32_t*) msg) &= uid_mask;
|
|
|
+ num_user_msgs = oselect_uint32_t(1, num_user_msgs+1,
|
|
|
+ uid == prev_uid);
|
|
|
+ // Select if messages per user not exceeded and msg is not padding
|
|
|
sel = ((uint8_t) ((num_user_msgs <= g_teems_config.m_pub_in))) &
|
|
|
((uint8_t) uid != uid_mask);
|
|
|
- // Make padding if too many messages for user
|
|
|
+ storage_state.pub_selected[i] = (bool) sel;
|
|
|
+ // Make padding if not selected
|
|
|
*(uint32_t *) msg = (*(uint32_t *) msg) & nid_mask;
|
|
|
*(uint32_t *) msg += oselect_uint32_t(uid_mask, uid, sel);
|
|
|
- // Mark as selected only if messages per user not exceeded
|
|
|
- selected[i] = (bool) oselect_uint32_t(0, 1, sel);
|
|
|
- prev_uid = uid;
|
|
|
- num_user_msgs = oselect_uint32_t(1, num_user_msgs+1,
|
|
|
- uid == prev_uid);
|
|
|
msg += msg_size;
|
|
|
+ prev_uid = uid;
|
|
|
}
|
|
|
- TightCompact_parallel((unsigned char *) storage_state.stg_buf.buf,
|
|
|
- num_msgs, msg_size, selected, g_teems_config.nthreads);
|
|
|
- delete[] selected;
|
|
|
+ #ifdef PROFILE_STORAGE
|
|
|
+ unsigned long start_compaction = printf_with_rtclock("begin public-channel compaction (%u)\n", num_msgs);
|
|
|
+ #endif
|
|
|
+ TightCompact_parallel<OSWAP_16X>(
|
|
|
+ (unsigned char *) storage_state.stg_buf.buf,
|
|
|
+ num_msgs, msg_size, storage_state.pub_selected,
|
|
|
+ g_teems_config.nthreads);
|
|
|
+ #ifdef PROFILE_STORAGE
|
|
|
+ printf_with_rtclock_diff(start_compaction, "end public-channel compaction (%u)\n", num_msgs);
|
|
|
+ #endif
|
|
|
}
|
|
|
|
|
|
/*
|