|
@@ -1,21 +1,40 @@
|
|
|
#include "utils.hpp"
|
|
|
#include "config.hpp"
|
|
|
-#include "storage.hpp"
|
|
|
#include "ORExpand.hpp"
|
|
|
+#include "sort.hpp"
|
|
|
+#include "storage.hpp"
|
|
|
+
|
|
|
+#define PROFILE_STORAGE
|
|
|
|
|
|
// File-local (static) state shared by the storage routines in this file.
// The fields are filled in by storage_init and read by storage_received.
static struct {
|
|
|
// Copied from storage_init's max_users argument. storage_received
// requires the user-id bits of each received uid to be below this
// value, unless those bits are all ones.
+ uint32_t max_users;
|
|
|
// Computed by storage_init: the number of storage-role nodes that
// precede this node in the node list, already shifted left by
// DEST_UID_BITS so it can be compared directly against the upper
// (node-id) bits of a received uid.
+ uint32_t my_storage_node_id;
|
|
|
// A local storage buffer, used when we need to do non-in-place
|
|
|
// sorts of the messages that have arrived
|
|
|
MsgBuffer stg_buf;
|
|
|
+ // The destination vector for ORExpand
|
|
|
// (resized in storage_init to msg_buf_size entries)
+ std::vector<uint32_t> dest;
|
|
|
} storage_state;
|
|
|
|
|
|
// route_init will call this function; no one else should call it
|
|
|
// explicitly. The parameter is the number of messages that can fit in
|
|
|
// the storage-side MsgBuffer. Returns true on success, false on
|
|
|
// failure.
|
|
|
// Parameters of the updated signature (the '+' line below):
//   max_users    - stored in storage_state.max_users
//   msg_buf_size - passed to stg_buf.alloc() and used as the length of
//                  the storage_state.dest vector
// As written, the function always returns true; the result of alloc()
// is not inspected here.
-bool storage_init(uint32_t msg_buf_size)
|
|
|
+bool storage_init(uint32_t max_users, uint32_t msg_buf_size)
|
|
|
{
|
|
|
+ storage_state.max_users = max_users;
|
|
|
storage_state.stg_buf.alloc(msg_buf_size);
|
|
|
// One dest entry per message slot in stg_buf
+ storage_state.dest.resize(msg_buf_size);
|
|
|
// Running count of storage-role nodes seen so far in the scan below
+ uint32_t my_storage_node_id = 0;
|
|
|
+ for (nodenum_t i=0; i<g_teems_config.num_nodes; ++i) {
|
|
|
// Only nodes with the ROLE_STORAGE bit set are counted
+ if (g_teems_config.roles[i] & ROLE_STORAGE) {
|
|
|
+ if (i == g_teems_config.my_node_num) {
|
|
|
// This node is a storage node: record how many storage nodes
// came before it, shifted into the bits above the low
// DEST_UID_BITS bits.
+ storage_state.my_storage_node_id = my_storage_node_id << DEST_UID_BITS;
|
|
|
+ } else {
|
|
|
+ ++my_storage_node_id;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
// NOTE(review): if this node does not have ROLE_STORAGE, the loop never
// assigns storage_state.my_storage_node_id - confirm that is intended.
return true;
|
|
|
}
|
|
|
|
|
@@ -24,15 +43,33 @@ bool storage_init(uint32_t msg_buf_size)
|
|
|
// done with it.
|
|
|
void storage_received(MsgBuffer &storage_buf)
|
|
|
{
|
|
|
- // A dummy function for now that just counts how many real and
|
|
|
- // padding messages arrived
|
|
|
uint16_t msg_size = g_teems_config.msg_size;
|
|
|
nodenum_t my_node_num = g_teems_config.my_node_num;
|
|
|
const uint8_t *msgs = storage_buf.buf;
|
|
|
uint32_t num_msgs = storage_buf.inserted;
|
|
|
uint32_t real = 0, padding = 0;
|
|
|
uint32_t uid_mask = (1 << DEST_UID_BITS) - 1;
|
|
|
+ uint32_t nid_mask = ~uid_mask;
|
|
|
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ unsigned long start_received = printf_with_rtclock("begin storage_received (%u)\n", storage_buf.inserted);
|
|
|
+#endif
|
|
|
+
|
|
|
+ // It's OK to test for errors in a way that's non-oblivious if
|
|
|
+ // there's an error (but it should be oblivious if there are no
|
|
|
+ // errors)
|
|
|
+ for (uint32_t i=0; i<num_msgs; ++i) {
|
|
|
+ uint32_t uid = *(const uint32_t*)(storage_buf.buf+(i*msg_size));
|
|
|
+ bool ok = ((((uid & nid_mask) == storage_state.my_storage_node_id)
|
|
|
+ & ((uid & uid_mask) < storage_state.max_users))
|
|
|
+ | ((uid & uid_mask) == uid_mask));
|
|
|
+ if (!ok) {
|
|
|
+ printf("Received bad uid: %08x\n", uid);
|
|
|
+ assert(ok);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Testing: report how many real and dummy messages arrived
|
|
|
printf("Storage server received %u messages:\n", num_msgs);
|
|
|
for (uint32_t i=0; i<num_msgs; ++i) {
|
|
|
uint32_t dest_addr = *(const uint32_t*)msgs;
|
|
@@ -53,6 +90,84 @@ void storage_received(MsgBuffer &storage_buf)
|
|
|
}
|
|
|
printf("%u real, %u padding\n", real, padding);
|
|
|
|
|
|
+ for (uint32_t i=0;i<num_msgs; ++i) {
|
|
|
+ printf("%3d: %08x %08x\n", i,
|
|
|
+ *(uint32_t*)(storage_buf.buf+(i*msg_size)),
|
|
|
+ *(uint32_t*)(storage_buf.buf+(i*msg_size+4)));
|
|
|
+ }
|
|
|
+ // Sort the received messages by userid into the
|
|
|
+ // storage_state.stg_buf MsgBuffer.
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ unsigned long start_sort = printf_with_rtclock("begin oblivious sort (%u)\n", storage_buf.inserted);
|
|
|
+#endif
|
|
|
+ sort_mtobliv<UidKey>(g_teems_config.nthreads, storage_buf.buf,
|
|
|
+ msg_size, storage_buf.inserted, storage_buf.bufsize,
|
|
|
+ storage_state.stg_buf.buf);
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ printf_with_rtclock_diff(start_sort, "end oblivious sort (%u)\n", storage_buf.inserted);
|
|
|
+#endif
|
|
|
+
|
|
|
+ for (uint32_t i=0;i<num_msgs; ++i) {
|
|
|
+ printf("%3d: %08x %08x\n", i,
|
|
|
+ *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size)),
|
|
|
+ *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size+4)));
|
|
|
+ }
|
|
|
+
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ unsigned long start_dest = printf_with_rtclock("begin setting dests (%u)\n", storage_state.stg_buf.bufsize);
|
|
|
+#endif
|
|
|
+ // Obliviously set the dest array
|
|
|
+ uint32_t *dests = storage_state.dest.data();
|
|
|
+ uint32_t stg_size = storage_state.stg_buf.bufsize;
|
|
|
+ const uint8_t *buf = storage_state.stg_buf.buf;
|
|
|
+ uint32_t m_priv_in = g_teems_config.m_priv_in;
|
|
|
+
|
|
|
+ uint32_t uid = *(uint32_t*)(buf);
|
|
|
+ // num_msgs is not a private value
|
|
|
+ if (num_msgs > 0) {
|
|
|
+ uid &= uid_mask;
|
|
|
+ dests[0] = oselect_uint32_t(uid * m_priv_in, 0xffffffff,
|
|
|
+ uid == uid_mask);
|
|
|
+ }
|
|
|
+ uint32_t prev_uid = uid;
|
|
|
+ for (uint32_t i=1; i<num_msgs; ++i) {
|
|
|
+ uid = *(uint32_t*)(buf + i*msg_size);
|
|
|
+ uid &= uid_mask;
|
|
|
+ dests[i] = oselect_uint32_t(
|
|
|
+ oselect_uint32_t(uid * m_priv_in, dests[i-1]+1, uid==prev_uid),
|
|
|
+ 0xffffffff, uid == uid_mask);
|
|
|
+ prev_uid = uid;
|
|
|
+ }
|
|
|
+ for (uint32_t i=num_msgs; i<stg_size; ++i) {
|
|
|
+ dests[i] = 0xffffffff;
|
|
|
+ *(uint32_t*)(buf + i*msg_size) = 0xffffffff;
|
|
|
+ }
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ printf_with_rtclock_diff(start_dest, "end setting dests (%u)\n", stg_size);
|
|
|
+#endif
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ unsigned long start_expand = printf_with_rtclock("begin ORExpand (%u)\n", stg_size);
|
|
|
+#endif
|
|
|
+ ORExpand_parallel<OSWAP_16X>(storage_state.stg_buf.buf, dests,
|
|
|
+ msg_size, stg_size, g_teems_config.nthreads);
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ printf_with_rtclock_diff(start_expand, "end ORExpand (%u)\n", stg_size);
|
|
|
+#endif
|
|
|
+ for (uint32_t i=0;i<stg_size; ++i) {
|
|
|
+ printf("%3d: %08x %08x\n", i,
|
|
|
+ *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size)),
|
|
|
+ *(uint32_t*)(storage_state.stg_buf.buf+(i*msg_size+4)));
|
|
|
+ }
|
|
|
+
|
|
|
+ // You can do more processing after these lines, as long as they
|
|
|
+ // don't touch storage_buf. They _can_ touch the backing buffer
|
|
|
+ // storage_state.stg_buf.
|
|
|
storage_buf.reset();
|
|
|
pthread_mutex_unlock(&storage_buf.mutex);
|
|
|
+
|
|
|
+ storage_state.stg_buf.reset();
|
|
|
+
|
|
|
+#ifdef PROFILE_STORAGE
|
|
|
+ printf_with_rtclock_diff(start_received, "end storage_received (%u)\n", storage_buf.inserted);
|
|
|
+#endif
|
|
|
}
|