Browse Source

Finished omap, nearly finished load balancer.

Kyle Fredrickson 1 year ago
parent
commit
2013fdb97b

+ 45 - 0
sparta/Cargo.lock

@@ -2,12 +2,55 @@
 # It is not intended for manual editing.
 version = 3
 
+[[package]]
+name = "arrayref"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545"
+
+[[package]]
+name = "arrayvec"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
+
+[[package]]
+name = "blake3"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52"
+dependencies = [
+ "arrayref",
+ "arrayvec",
+ "cc",
+ "cfg-if",
+ "constant_time_eq",
+]
+
 [[package]]
 name = "cc"
 version = "1.0.98"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f"
 
+[[package]]
+name = "cfg-if"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+
+[[package]]
+name = "constant_time_eq"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
+
+[[package]]
+name = "fastapprox"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dfa3c0fd35278e839805680f4c2f673ca71eb91068115b4a611e71429bc0c46"
+
 [[package]]
 name = "omq"
 version = "0.1.0"
@@ -26,6 +69,8 @@ dependencies = [
 name = "sparta"
 version = "0.1.0"
 dependencies = [
+ "blake3",
+ "fastapprox",
  "omq",
  "otils",
 ]

+ 2 - 0
sparta/Cargo.toml

@@ -6,6 +6,8 @@ edition = "2021"
 [dependencies]
 omq = { path = "../omq" }
 otils = { path = "../otils" }
+blake3 = "1.5.1"
+fastapprox = "0.3.1"
 
 [package.metadata.fortanix-sgx]
 stack-size=0x400000

+ 267 - 0
sparta/src/load_balancer.rs

@@ -0,0 +1,267 @@
+pub use crate::record::{Record, SubmapRequest};
+use crate::ObliviousMap;
+use fastapprox::fast;
+use otils::{self, ObliviousOps};
+use std::{cmp, f64::consts::E};
+
+const LAMBDA: usize = 128;
+
+/// Wrapper around `Record` used during load balancing; comparisons
+/// delegate to the inner record (uid, then record type).
+pub struct BalanceRecord(Record);
+
+impl BalanceRecord {}
+
+impl PartialEq for BalanceRecord {
+    fn eq(&self, other: &Self) -> bool {
+        // Fix: body was empty, which evaluates to `()` and does not
+        // compile; delegate to the wrapped Record's equality.
+        self.0 == other.0
+    }
+}
+
+impl PartialOrd for BalanceRecord {
+    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+        // Fix: body was empty; delegate to the wrapped Record's ordering.
+        self.0.partial_cmp(&other.0)
+    }
+}
+
+/// Oblivious load balancer: owns the per-user store and the set of
+/// oblivious submaps that send/fetch requests are routed to.
+pub struct LoadBalancer {
+    num_users: i64,
+    num_threads: usize,
+    num_submaps: usize,
+
+    // Obliviously sorted/compacted each batch; holds one record per user
+    // plus transient request and padding records during a batch.
+    pub user_store: Vec<Record>,
+    // One ObliviousMap per submap; requests are sharded by `Record.map`.
+    pub omaps: Vec<ObliviousMap>,
+}
+
+impl LoadBalancer {
+    /// Create a load balancer with `num_users` pre-registered users and
+    /// `num_submaps` submaps, splitting `num_threads` across the submaps.
+    ///
+    /// NOTE(review): `num_threads / num_submaps` truncates and is 0 when
+    /// `num_submaps > num_threads` — confirm ObliviousMap tolerates that.
+    pub fn new(num_users: i64, num_threads: usize, num_submaps: usize) -> Self {
+        // with_capacity replaces the new()+reserve() pair; same behavior.
+        let mut user_store = Vec::with_capacity(num_users as usize);
+        user_store.extend((0..num_users).map(Record::new));
+
+        let mut omaps = Vec::with_capacity(num_submaps);
+        // (dropped a redundant `as usize` cast: num_submaps is already usize)
+        omaps.extend((0..num_submaps).map(|_| ObliviousMap::new(num_threads / num_submaps)));
+
+        LoadBalancer {
+            num_users,
+            num_threads,
+            num_submaps,
+            user_store,
+            omaps,
+        }
+    }
+
+    /// Per-submap padded batch size for `num_requests` total requests,
+    /// derived from a Lambert-W overflow bound so that (presumably) the
+    /// probability any submap receives more than the returned count is at
+    /// most 2^-LAMBDA — TODO confirm against the scheme's analysis.
+    fn pad_size(&self, num_requests: f64) -> usize {
+        let num_submaps = self.num_submaps as f64;
+        // Expected load per submap.
+        let mu = num_requests / num_submaps;
+        // NOTE(review): as written this is ln(num_submaps + 2^128), which
+        // is ~128*ln(2) regardless of num_submaps; confirm the intended
+        // formula (e.g. ln(num_submaps * 2^lambda)).
+        let gamma = (num_submaps + 2_f64.powf(LAMBDA as f64)).ln();
+        let rhs = (gamma / mu - 1_f64) / E;
+        // Cap at num_requests: no submap can get more than the whole batch.
+        // lambertw is a fast f32 approximation (fastapprox crate).
+        num_requests
+            .min(mu * E.powf(fast::lambertw(rhs as f32) as f64 + 1_f64))
+            .ceil() as usize
+    }
+
+    /// Append the incoming send records to the user store, then pad with
+    /// max-uid dummy records up to the next power of two (as required by
+    /// the oblivious sort/compact primitives).
+    fn update_with_sends(&mut self, sends: Vec<Record>) {
+        let mut size = (self.user_store.len() + sends.len()).next_power_of_two();
+        // `size` becomes the number of slots to add on top of the store.
+        size -= self.user_store.len();
+        self.user_store.reserve(size);
+        size -= sends.len();
+
+        self.user_store.extend(sends);
+
+        // Dummies carry uid = i64::MAX so they sort after all real records.
+        self.user_store.extend((0..size).map(|_| Record::max()));
+    }
+
+    /// Expand each fetch into `fetch.data` single-message fetch records,
+    /// append them to the user store, and pad to the next power of two.
+    /// `num_fetches` must equal the total expanded count.
+    fn update_with_fetches(&mut self, fetches: Vec<Record>, num_fetches: usize) {
+        let mut size = (self.user_store.len() + num_fetches).next_power_of_two();
+        size -= self.user_store.len();
+        self.user_store.reserve(size);
+
+        size -= num_fetches;
+        // Record::dummies yields `data` copies, one per requested message.
+        for fetch in fetches.into_iter() {
+            self.user_store.extend(fetch.dummies());
+        }
+
+        // Max-uid dummies sort after all real records.
+        self.user_store.extend((0..size).map(|_| Record::max()));
+    }
+
+    /// Assign each send request its per-user message index and target
+    /// submap. Assumes the store is sorted so each user's store record
+    /// directly precedes that user's requests.
+    fn construct_send_indices(&mut self) {
+        let mut idx: u32 = 0;
+        let mut is_same_u: bool;
+
+        let mut user_store_iter = self.user_store.iter_mut().peekable();
+        while let Some(record) = user_store_iter.next() {
+            let is_user_store = record.is_user_store();
+
+            // On a user record, restart the counter at that user's
+            // high-water mark; on a request, advance it by one.
+            idx = u32::oselect(
+                is_user_store,
+                cmp::max(record.last_fetch, record.last_send),
+                idx + 1,
+            );
+
+            record.idx = u32::oselect(is_user_store, 0, record.get_idx(idx));
+            // Fix: `record.idx % self.num_submaps` mixed u32 and usize,
+            // which does not compile; cast before taking the modulo.
+            record.map = (record.idx as usize % self.num_submaps) as u8;
+            record.last_send = idx;
+
+            // Mark the last record in each uid run so compaction keeps
+            // exactly one (most up-to-date) record per user.
+            if let Some(next_record) = user_store_iter.peek() {
+                is_same_u = record.uid == next_record.uid;
+            } else {
+                is_same_u = false;
+            }
+            record.mark = u16::oselect(is_same_u, 0, 1);
+        }
+    }
+
+    /// Assign each fetch request its per-user message index and target
+    /// submap; mirrors `construct_send_indices` but tracks only the
+    /// fetch counter (`last_fetch`).
+    fn construct_fetch_indices(&mut self) {
+        let mut idx: u32 = 0;
+        let mut is_same_u: bool;
+
+        let mut user_store_iter = self.user_store.iter_mut().peekable();
+        while let Some(record) = user_store_iter.next() {
+            let is_user_store = record.is_user_store();
+
+            // Restart at the user's last fetch index on a user record.
+            idx = u32::oselect(is_user_store, record.last_fetch, idx + 1);
+
+            record.idx = u32::oselect(is_user_store, 0, record.get_idx(idx));
+            // Fix: `record.idx % self.num_submaps` mixed u32 and usize,
+            // which does not compile; cast before taking the modulo.
+            record.map = (record.idx as usize % self.num_submaps) as u8;
+            record.last_fetch = idx;
+
+            // Mark the last record in each uid run for later compaction.
+            if let Some(next_record) = user_store_iter.peek() {
+                is_same_u = record.uid == next_record.uid;
+            } else {
+                is_same_u = false;
+            }
+            record.mark = u16::oselect(is_same_u, 0, 1);
+        }
+    }
+
+    /// Turn a batch of send records into submap-ready requests:
+    /// obliviously sort the store, assign indices/submaps, extract the
+    /// requests, and restore the store to one record per user.
+    pub fn get_send_requests(&mut self, sends: Vec<Record>) -> Vec<Record> {
+        let num_requests = sends.len();
+        self.update_with_sends(sends);
+
+        // Sort groups each uid's store record ahead of its requests.
+        otils::sort(&mut self.user_store[..], self.num_threads);
+        self.construct_send_indices();
+
+        // Move all request records to the front, then copy them out.
+        otils::compact(
+            &mut self.user_store[..],
+            |r| r.is_request(),
+            self.num_threads,
+        );
+        let requests = self.user_store[0..num_requests].to_vec();
+
+        // Keep the single marked record per real user as the new store.
+        otils::compact(
+            &mut self.user_store[..],
+            |r| r.is_new_user_store(),
+            self.num_threads,
+        );
+        self.user_store.truncate(self.num_users as usize);
+        self.user_store.iter_mut().for_each(|r| {
+            r.set_user_store();
+        });
+
+        requests
+    }
+
+    /// Turn a batch of fetch records into submap-ready requests. Each
+    /// fetch is expanded to one request per message (`fetch.data` holds
+    /// the fetch volume), so the result has sum(data) entries.
+    pub fn get_fetch_requests(&mut self, fetches: Vec<Record>) -> Vec<Record> {
+        let num_requests = fetches
+            .iter()
+            .fold(0, |acc, fetch| acc + fetch.data as usize);
+        self.update_with_fetches(fetches, num_requests);
+
+        // Sort groups each uid's store record ahead of its requests.
+        otils::sort(&mut self.user_store[..], self.num_threads);
+        self.construct_fetch_indices();
+
+        // Move all request records to the front, then copy them out.
+        otils::compact(
+            &mut self.user_store[..],
+            |r| r.is_request(),
+            self.num_threads,
+        );
+        let deliver = self.user_store[0..num_requests].to_vec();
+
+        // Keep the single marked record per real user as the new store.
+        otils::compact(
+            &mut self.user_store[..],
+            |r| r.is_new_user_store(),
+            self.num_threads,
+        );
+        self.user_store.truncate(self.num_users as usize);
+        self.user_store.iter_mut().for_each(|r| {
+            r.set_user_store();
+        });
+
+        deliver
+    }
+
+    /// Convert requests into `SubmapRequest`s, appending `submap_size`
+    /// dummies per submap (so every submap can receive a full batch) and
+    /// then power-of-two padding for the oblivious sort.
+    pub fn pad_for_submap(&self, requests: Vec<Record>, submap_size: usize) -> Vec<SubmapRequest> {
+        let num_submaps = self.num_submaps;
+        let mut remaining = (requests.len() + num_submaps * submap_size).next_power_of_two();
+        remaining -= requests.len();
+
+        let mut requests: Vec<SubmapRequest> = requests.into_iter().map(|r| r.into()).collect();
+        requests.reserve(remaining);
+
+        for idx in 0..num_submaps {
+            // Fix: SubmapRequest::dummies takes `num_submaps: u32`;
+            // passing `self.num_submaps` (usize) did not compile.
+            requests.extend(SubmapRequest::dummies(
+                submap_size,
+                idx as u32,
+                self.num_submaps as u32,
+            ));
+        }
+        // remaining >= num_submaps * submap_size here, since
+        // next_power_of_two never shrinks its argument.
+        remaining -= num_submaps * submap_size;
+        requests.extend((0..remaining).map(|_| Record::max().into()));
+        requests
+    }
+
+    /// Select exactly `submap_size` requests per submap: sort requests by
+    /// (submap, uid), mark the first `submap_size` entries of each
+    /// submap's run, and compact the marked ones to the front.
+    pub fn get_submap_requests(
+        &self,
+        requests: Vec<Record>,
+        submap_size: usize,
+    ) -> Vec<SubmapRequest> {
+        let mut requests = self.pad_for_submap(requests, submap_size);
+
+        otils::sort(&mut requests[..], self.num_threads); // sort by omap, then by dummy
+
+        // Fix: `prev_map` was usize while `submap` is u32, which does not
+        // compile; start one past any valid submap id so the first
+        // iteration resets the quota.
+        let mut prev_map = self.num_submaps as u32;
+        let mut remaining_marks = submap_size as i32;
+        for request in requests.iter_mut() {
+            let submap = request.value.map as u32;
+            // Reset the per-submap quota at the start of each run.
+            remaining_marks = i32::oselect(submap != prev_map, submap_size as i32, remaining_marks);
+            request.value.mark = u16::oselect(remaining_marks > 0, 1, 0);
+            remaining_marks += i32::oselect(remaining_marks > 0, -1, 0);
+            prev_map = submap;
+        }
+
+        otils::compact(&mut requests[..], |r| r.value.mark == 1, self.num_threads);
+        requests
+    }
+
+    /// Route a batch of sends: build per-message requests, compute the
+    /// oblivious per-submap batch size, and dispatch one equal-sized
+    /// batch to every submap.
+    pub fn batch_send(&mut self, sends: Vec<Record>) {
+        let requests = self.get_send_requests(sends);
+        let submap_size = self.pad_size(requests.len() as f64);
+        let mut requests: Vec<Record> = self
+            .get_submap_requests(requests, submap_size)
+            .into_iter()
+            .map(|r| r.value)
+            .collect();
+
+        // Requests are grouped by submap after get_submap_requests, so
+        // each drain of submap_size records targets the next submap.
+        for idx in 0..self.num_submaps {
+            let batch = requests.drain(0..submap_size).collect();
+            self.omaps[idx].batch_send(batch);
+        }
+    }
+
+    /// Route a batch of fetches and gather responses from every submap.
+    /// Returns `submap_size` records per submap, padding included.
+    pub fn batch_fetch(&mut self, fetches: Vec<Record>) -> Vec<Record> {
+        let requests = self.get_fetch_requests(fetches);
+        let submap_size = self.pad_size(requests.len() as f64);
+        let mut requests: Vec<Record> = self
+            .get_submap_requests(requests, submap_size)
+            .into_iter()
+            .map(|r| r.value)
+            .collect();
+
+        let mut responses: Vec<Record> = Vec::new();
+        responses.reserve(submap_size * self.num_submaps);
+        // Requests are grouped by submap; drain one batch per submap.
+        for idx in 0..self.num_submaps {
+            let batch = requests.drain(0..submap_size).collect();
+            responses.extend(self.omaps[idx].batch_fetch(batch));
+        }
+
+        responses
+    }
+}

+ 0 - 142
sparta/src/load_balancer/mod.rs

@@ -1,142 +0,0 @@
-mod user_record;
-use user_record::UserRecord;
-
-use omq::{Fetch, Send};
-use otils::{self, ObliviousOps};
-use std::cmp;
-
-pub struct LoadBalancer {
-    num_users: i64,
-    user_store: Vec<UserRecord>,
-}
-
-impl LoadBalancer {
-    pub fn new(num_users: i64) -> Self {
-        let mut user_store = Vec::new();
-        user_store.reserve(num_users as usize);
-        user_store.extend((0..num_users).map(|i| UserRecord::new(i)));
-        LoadBalancer {
-            num_users,
-            user_store,
-        }
-    }
-
-    fn update_with_sends(&mut self, sends: Vec<Send>) {
-        let mut size = (self.user_store.len() + sends.len()).next_power_of_two();
-        size -= self.user_store.len();
-        self.user_store.reserve(size);
-        size -= sends.len();
-
-        self.user_store
-            .extend(sends.into_iter().map(|s| UserRecord::from_send(s)));
-
-        self.user_store.extend((0..size).map(|_| UserRecord::max()));
-    }
-
-    fn construct_send_indices(&mut self) {
-        let mut idx: u32 = 0;
-        let mut is_same_u: bool;
-
-        let mut user_store_iter = self.user_store.iter_mut().peekable();
-        while let Some(record) = user_store_iter.next() {
-            let is_user_store = record.is_user_store();
-
-            idx = u32::oselect(
-                is_user_store,
-                cmp::max(record.last_fetch, record.last_send),
-                idx + 1,
-            );
-
-            record.idx = u32::oselect(is_user_store, 0, idx);
-            record.last_send = idx;
-
-            if let Some(next_record) = user_store_iter.peek() {
-                is_same_u = record.uid == next_record.uid;
-            } else {
-                is_same_u = false;
-            }
-            record.mark = u16::oselect(is_same_u, 0, 1);
-        }
-    }
-
-    pub fn get_send_requests(&mut self, sends: Vec<Send>) -> Vec<UserRecord> {
-        let num_requests = sends.len();
-        self.update_with_sends(sends);
-
-        otils::osort(&mut self.user_store[..], 8);
-        self.construct_send_indices();
-
-        otils::ocompact(&mut self.user_store[..], |r| r.is_request(), 8);
-        let requests = self.user_store[0..num_requests].to_vec();
-
-        otils::ocompact(&mut self.user_store[..], |r| r.is_new_user_store(), 8);
-        self.user_store.truncate(self.num_users as usize);
-        self.user_store.iter_mut().for_each(|r| {
-            r.set_user_store();
-        });
-        for record in self.user_store.iter() {
-            println!("{:?}", record);
-        }
-        requests
-    }
-
-    fn update_with_fetches(&mut self, fetches: Vec<Fetch>, num_fetches: usize) {
-        let mut size = (self.user_store.len() + num_fetches).next_power_of_two();
-        size -= self.user_store.len();
-        self.user_store.reserve(size);
-
-        size -= num_fetches;
-        for fetch in fetches.into_iter() {
-            self.user_store.extend(UserRecord::from_fetch(fetch));
-        }
-
-        self.user_store.extend((0..size).map(|_| UserRecord::max()));
-    }
-
-    fn construct_fetch_indices(&mut self) {
-        let mut idx: u32 = 0;
-        let mut is_same_u: bool;
-
-        let mut user_store_iter = self.user_store.iter_mut().peekable();
-        while let Some(record) = user_store_iter.next() {
-            let is_user_store = record.is_user_store();
-
-            idx = u32::oselect(is_user_store, record.last_fetch, idx + 1);
-
-            record.idx = u32::oselect(is_user_store, 0, idx);
-            record.last_fetch = idx;
-
-            if let Some(next_record) = user_store_iter.peek() {
-                is_same_u = record.uid == next_record.uid;
-            } else {
-                is_same_u = false;
-            }
-            record.mark = u16::oselect(is_same_u, 0, 1);
-        }
-    }
-
-    pub fn get_fetch_requests(&mut self, fetches: Vec<Fetch>) -> Vec<UserRecord> {
-        let num_requests = fetches.iter().fold(0, |acc, x| acc + x.volume as usize);
-        self.update_with_fetches(fetches, num_requests);
-
-        otils::osort(&mut self.user_store[..], 8);
-        self.construct_fetch_indices();
-
-        otils::ocompact(&mut self.user_store[..], |r| r.is_request(), 8);
-        let deliver = self.user_store[0..num_requests].to_vec();
-
-        otils::ocompact(&mut self.user_store[..], |r| r.is_new_user_store(), 8);
-        self.user_store.truncate(self.num_users as usize);
-        self.user_store.iter_mut().for_each(|r| {
-            r.set_user_store();
-        });
-        for record in self.user_store.iter() {
-            println!("{:?}", record);
-        }
-        deliver
-    }
-
-    // pub fn batch_fetch(&mut self, fetches: Vec<Fetch>) -> Vec<Send> {
-    //     let deliver_size = fetches.iter().fold(0, |acc, x| acc + x.volume as usize);
-    // }
-}

+ 0 - 131
sparta/src/load_balancer/user_record.rs

@@ -1,131 +0,0 @@
-use omq::{Fetch, Send};
-use otils::{self, ObliviousOps};
-use std::cmp::Ordering;
-
-const USER_STORE: u16 = 0;
-const FETCH: u16 = 1;
-const SEND: u16 = 2;
-
-#[derive(Debug, Clone)]
-pub struct UserRecord {
-    pub uid: i64,
-    pub type_rec: u16,
-    pub mark: u16,
-    pub last_fetch: u32,
-    pub last_send: u32,
-    pub idx: u32,
-    pub message: u64,
-}
-
-impl UserRecord {
-    pub fn new(uid: i64) -> Self {
-        UserRecord {
-            uid,
-            type_rec: USER_STORE,
-            mark: 0,
-            last_fetch: 0,
-            last_send: 0,
-            idx: 0,
-            message: 0,
-        }
-    }
-
-    pub fn max() -> Self {
-        UserRecord {
-            uid: i64::MAX,
-            type_rec: USER_STORE,
-            mark: 0,
-            last_fetch: 0,
-            last_send: 0,
-            idx: 0,
-            message: 0,
-        }
-    }
-
-    pub fn from_fetch(f: Fetch) -> Vec<Self> {
-        (0..f.volume)
-            .map(|_| UserRecord {
-                uid: f.receiver,
-                type_rec: FETCH,
-                mark: 0,
-                last_fetch: 0,
-                last_send: 0,
-                idx: 0,
-                message: 0,
-            })
-            .collect()
-    }
-
-    pub fn from_send(s: Send) -> Self {
-        UserRecord {
-            uid: s.receiver,
-            type_rec: SEND,
-            mark: 0,
-            last_fetch: 0,
-            last_send: 0,
-            idx: 0,
-            message: s.message,
-        }
-    }
-
-    pub fn is_request(&self) -> bool {
-        self.type_rec != USER_STORE
-    }
-
-    pub fn is_new_user_store(&self) -> bool {
-        self.mark == 1 && self.uid != i64::MAX
-    }
-
-    pub fn is_user_store(&self) -> bool {
-        self.type_rec == USER_STORE
-    }
-
-    pub fn is_fetch(&self) -> bool {
-        self.type_rec == FETCH
-    }
-
-    pub fn set_user_store(&mut self) {
-        self.type_rec = USER_STORE;
-    }
-}
-
-impl PartialEq for UserRecord {
-    fn eq(&self, other: &Self) -> bool {
-        self.uid == other.uid && self.type_rec == other.type_rec
-    }
-}
-
-impl PartialOrd for UserRecord {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        let user_ord = self.uid.partial_cmp(&other.uid);
-        let type_ord = self.type_rec.partial_cmp(&other.type_rec);
-        match user_ord {
-            Some(Ordering::Equal) => type_ord,
-            x => x,
-        }
-    }
-}
-
-impl ObliviousOps for UserRecord {
-    fn oselect(cond: bool, a: Self, b: Self) -> Self {
-        UserRecord {
-            uid: i64::oselect(cond, a.uid, b.uid),
-            type_rec: u16::oselect(cond, a.type_rec, b.type_rec),
-            mark: u16::oselect(cond, a.mark, b.mark),
-            last_fetch: u32::oselect(cond, a.last_fetch, b.last_fetch),
-            last_send: u32::oselect(cond, a.last_send, b.last_send),
-            idx: u32::oselect(cond, a.idx, b.idx),
-            message: u64::oselect(cond, a.message, b.message),
-        }
-    }
-
-    fn oswap(cond: bool, a: &mut Self, b: &mut Self) {
-        i64::oswap(cond, &mut a.uid, &mut b.uid);
-        u16::oswap(cond, &mut a.type_rec, &mut b.type_rec);
-        u16::oswap(cond, &mut a.mark, &mut b.mark);
-        u32::oswap(cond, &mut a.last_fetch, &mut b.last_fetch);
-        u32::oswap(cond, &mut a.last_send, &mut b.last_send);
-        u32::oswap(cond, &mut a.idx, &mut b.idx);
-        u64::oswap(cond, &mut a.message, &mut b.message);
-    }
-}

+ 11 - 11
sparta/src/main.rs

@@ -1,22 +1,22 @@
 mod load_balancer;
+mod omap;
+mod record;
 use load_balancer::LoadBalancer;
-use omq::{Fetch, Send};
+use omap::ObliviousMap;
+use record::Record;
 
 fn main() {
-    let mut l = LoadBalancer::new(5);
-    let sends: Vec<Send> = (0..3)
-        .map(|x| Send::new(x, x.try_into().unwrap()))
+    let mut l = LoadBalancer::new(5, 8, 4);
+    let sends: Vec<Record> = (0..3)
+        .map(|x| Record::new_send(x, x.try_into().unwrap()))
         .collect();
 
-    let fetches: Vec<Fetch> = vec![Fetch::new(0, 3)];
+    // l.batch_send(sends);
 
-    let indices = l.get_fetch_requests(fetches);
-    for i in indices.iter() {
-        println!("{:?}", i);
-    }
+    let fetches: Vec<Record> = vec![Record::new_fetch(0, 3)];
 
-    let indices = l.get_send_requests(sends);
+    let indices = l.batch_fetch(fetches);
     for i in indices.iter() {
-        println!("{:?}", i);
+        // println!("{:?}", i);
     }
 }

+ 117 - 0
sparta/src/omap.rs

@@ -0,0 +1,117 @@
+use crate::record::{Record, RecordType};
+use otils::ObliviousOps;
+use std::cmp::Ordering;
+
+/// Internal wrapper giving `Record` the (idx, type) ordering used inside
+/// the oblivious message store.
+struct MapRecord(Record);
+
+impl MapRecord {
+    /// `len` max-valued padding records; they sort after all real records.
+    fn dummies(len: usize) -> Vec<Self> {
+        (0..len).map(|_| MapRecord(Record::max())).collect()
+    }
+
+    /// Send-typed twin of a fetch: sorts with stored messages for the
+    /// same idx and can be delivered when no real message matches.
+    fn fetch_pad(record: Record) -> Self {
+        let mut record = Self(record);
+        record.0.type_rec = RecordType::Send;
+        record
+    }
+
+    /// Non-fetch record flagged for delivery (it directly followed a fetch).
+    fn should_deliver(&self) -> bool {
+        !self.0.is_fetch() && self.0.mark == 1
+    }
+
+    /// Non-fetch record to keep in the store for a later batch.
+    fn should_defer(&self) -> bool {
+        !self.0.is_fetch() && self.0.mark == 0
+    }
+}
+
+impl PartialEq for MapRecord {
+    // Records match on (message index, record type) only.
+    fn eq(&self, other: &Self) -> bool {
+        self.0.idx == other.0.idx && self.0.type_rec == other.0.type_rec
+    }
+}
+
+impl PartialOrd for MapRecord {
+    // Order by message index, then record type; Fetch < Send (enum
+    // declaration order), so each fetch sorts directly before the Send
+    // records sharing its idx.
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        let idx_ord = self.0.idx.partial_cmp(&other.0.idx);
+        let type_ord = self.0.type_rec.partial_cmp(&other.0.type_rec);
+        match idx_ord {
+            Some(Ordering::Equal) => type_ord,
+            x => x,
+        }
+    }
+}
+
+/// An oblivious key-value store for messages, keyed by `Record.idx`;
+/// all operations use oblivious sort/compact with `num_threads` threads.
+pub struct ObliviousMap {
+    num_threads: usize,
+    message_store: Vec<MapRecord>,
+}
+
+impl ObliviousMap {
+    /// Create an empty oblivious map whose sort/compact operations use
+    /// `num_threads` threads.
+    pub fn new(num_threads: usize) -> Self {
+        ObliviousMap {
+            num_threads,
+            message_store: Vec::new(),
+        }
+    }
+
+    /// Store a batch of send requests; they become candidates for
+    /// delivery in subsequent `batch_fetch` calls.
+    pub fn batch_send(&mut self, requests: Vec<Record>) {
+        self.message_store.reserve(requests.len());
+        self.message_store
+            .extend(requests.into_iter().map(|r| MapRecord(r)));
+    }
+
+    /// Insert each fetch twice — once as a Send-typed pad and once as the
+    /// fetch itself — then pad the store to the next power of two for the
+    /// oblivious sort.
+    fn update_with_fetches(&mut self, requests: Vec<Record>) {
+        let mut remaining = (self.message_store.len() + 2 * requests.len()).next_power_of_two();
+        remaining -= self.message_store.len() + 2 * requests.len();
+        self.message_store.reserve(remaining);
+
+        // add padding for fetches
+        self.message_store.extend(
+            requests
+                .iter()
+                .map(|record| MapRecord::fetch_pad(record.clone())),
+        );
+
+        // add fetches
+        self.message_store
+            .extend(requests.into_iter().map(|r| MapRecord(r)));
+
+        // add padding to next power of two
+        self.message_store.extend(MapRecord::dummies(remaining));
+    }
+
+    /// Answer a batch of fetches obliviously. Returns exactly one record
+    /// per request (a stored message or that request's Send-typed pad);
+    /// undelivered messages remain in the store.
+    pub fn batch_fetch(&mut self, requests: Vec<Record>) -> Vec<Record> {
+        let original_size = self.message_store.len();
+        let num_requests = requests.len();
+
+        self.update_with_fetches(requests);
+
+        // After sorting, each fetch (Fetch < Send) sits directly before
+        // the Send records that share its idx.
+        otils::sort(&mut self.message_store[..], self.num_threads);
+
+        // Linear pass: mark the record immediately following each fetch.
+        // NOTE(review): a real message and its pad compare equal under
+        // MapRecord's ordering, so which one follows the fetch depends on
+        // the sort's tie handling — confirm the real message wins.
+        let mut prev_fetch = 0;
+        for record in self.message_store.iter_mut() {
+            record.0.mark = u16::oselect(prev_fetch == 1, 1, 0);
+            prev_fetch = i32::oselect(record.0.is_fetch(), 1, 0)
+        }
+
+        // Move deliverable records to the front and copy them out.
+        otils::compact(
+            &mut self.message_store[..],
+            |record| record.should_deliver(),
+            self.num_threads,
+        );
+        let response: Vec<Record> = self
+            .message_store
+            .drain(0..num_requests)
+            .map(|r| r.0)
+            .collect();
+
+        // Keep undelivered messages; fetches and leftover padding are
+        // dropped by the truncate back to the pre-batch size.
+        otils::compact(
+            &mut self.message_store[..],
+            |record| record.should_defer(),
+            self.num_threads,
+        );
+        self.message_store.truncate(original_size);
+        response
+    }
+}

+ 179 - 0
sparta/src/record.rs

@@ -0,0 +1,179 @@
+use blake3;
+use std::cmp::Ordering;
+
+/// Record discriminant. The derived `PartialOrd` follows declaration
+/// order (User < Fetch < Send < Dummy), which the oblivious sorts rely
+/// on to place store records before requests and fetches before sends.
+#[derive(Clone, Debug, PartialEq, PartialOrd)]
+pub enum RecordType {
+    User,
+    Fetch,
+    Send,
+    Dummy,
+}
+
+/// The single record shape shared by user-store entries, send/fetch
+/// requests, and padding, so all records look alike to an observer.
+#[derive(Debug, Clone)]
+pub struct Record {
+    pub uid: i64,
+
+    // Target submap, derived from `idx` by the load balancer.
+    pub map: u8,
+    pub type_rec: RecordType,
+    // Scratch flag set by the oblivious mark/compact passes.
+    pub mark: u16,
+    // Hashed per-user message slot (see `get_idx`).
+    pub idx: u32,
+
+    // Per-user counters: highest fetch/send index assigned so far.
+    pub last_fetch: u32,
+    pub last_send: u32,
+
+    // Message payload for sends; fetch volume for fetches.
+    pub data: u64,
+
+    // Padding so every record has the same fixed size.
+    pub _dum: [u64; 12],
+}
+
+impl Record {
+    /// User-store record for `uid`.
+    pub fn new(uid: i64) -> Self {
+        Record {
+            uid,
+            map: 0,
+            type_rec: RecordType::User,
+            mark: 0,
+            idx: 0,
+            last_fetch: 0,
+            last_send: 0,
+            data: 0,
+            _dum: [0; 12],
+        }
+    }
+
+    /// Send request carrying `message` for `uid`.
+    pub fn new_send(uid: i64, message: u64) -> Self {
+        Record {
+            uid,
+            map: 0,
+            type_rec: RecordType::Send,
+            mark: 0,
+            idx: 0,
+            last_fetch: 0,
+            last_send: 0,
+            data: message,
+            _dum: [0; 12],
+        }
+    }
+
+    /// Fetch request asking for `volume` messages for `uid`.
+    pub fn new_fetch(uid: i64, volume: u64) -> Self {
+        Record {
+            uid,
+            map: 0,
+            type_rec: RecordType::Fetch,
+            mark: 0,
+            idx: 0,
+            last_fetch: 0,
+            last_send: 0,
+            data: volume,
+            _dum: [0; 12],
+        }
+    }
+
+    /// Expand this fetch into `data` single-message fetch records.
+    pub fn dummies(&self) -> Vec<Self> {
+        (0..self.data)
+            .map(|_| Record::new_fetch(self.uid, 0))
+            .collect()
+    }
+
+    /// Padding record; its max uid sorts after every real record.
+    pub fn max() -> Self {
+        Record {
+            uid: Self::max_uid(),
+            map: 0,
+            type_rec: RecordType::User,
+            mark: 0,
+            idx: 0,
+            last_fetch: 0,
+            last_send: 0,
+            data: 0,
+            _dum: [0; 12],
+        }
+    }
+
+    pub fn max_uid() -> i64 {
+        i64::MAX
+    }
+
+    /// Anything that is not a user-store record is a request.
+    pub fn is_request(&self) -> bool {
+        self.type_rec != RecordType::User
+    }
+
+    /// Marked survivor with a real uid: seeds the next user store.
+    pub fn is_new_user_store(&self) -> bool {
+        self.mark == 1 && self.uid != i64::MAX
+    }
+
+    pub fn is_user_store(&self) -> bool {
+        self.type_rec == RecordType::User
+    }
+
+    pub fn is_fetch(&self) -> bool {
+        self.type_rec == RecordType::Fetch
+    }
+
+    pub fn set_user_store(&mut self) {
+        self.type_rec = RecordType::User;
+    }
+
+    /// Pseudorandom message slot for (uid, idx): the first four bytes of
+    /// blake3(uid || idx), native-endian.
+    /// Fix: takes `&self` — the method never mutates the record, and
+    /// `&mut self` needlessly restricted callers (backward compatible).
+    pub fn get_idx(&self, idx: u32) -> u32 {
+        let mut hasher = blake3::Hasher::new();
+        hasher.update(&self.uid.to_ne_bytes());
+        hasher.update(&idx.to_ne_bytes());
+        let hash = hasher.finalize();
+        // The slice is always 32 bytes, so the 4-byte conversion cannot fail.
+        u32::from_ne_bytes(<[u8; 4]>::try_from(&hash.as_bytes()[0..4]).unwrap())
+    }
+}
+
+impl PartialEq for Record {
+    // Records match on (uid, type) only; payload is ignored, consistent
+    // with the ordering below.
+    fn eq(&self, other: &Self) -> bool {
+        self.uid == other.uid && self.type_rec == other.type_rec
+    }
+}
+
+impl PartialOrd for Record {
+    // Sort by uid, then record type; User < Fetch < Send puts each user's
+    // store record directly ahead of that user's requests.
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let user_ord = self.uid.partial_cmp(&other.uid);
+        let type_ord = self.type_rec.partial_cmp(&other.type_rec);
+        match user_ord {
+            Some(Ordering::Equal) => type_ord,
+            x => x,
+        }
+    }
+}
+
+/// A record routed to a specific submap; newtype so it can carry the
+/// (map, uid) ordering used when sharding request batches.
+#[derive(Debug)]
+pub struct SubmapRequest {
+    pub value: Record,
+}
+
+impl SubmapRequest {
+    /// `num` dummy requests targeted at submap `idx` (mod `num_submaps`).
+    pub fn dummies(num: usize, idx: u32, num_submaps: u32) -> Vec<Self> {
+        (0..num)
+            .map(|_| {
+                let mut m = Record::max();
+                m.map = (idx % num_submaps) as u8;
+                m.into()
+            })
+            .collect()
+    }
+}
+
+impl From<Record> for SubmapRequest {
+    fn from(value: Record) -> Self {
+        SubmapRequest { value }
+    }
+}
+
+impl PartialEq for SubmapRequest {
+    // Fix: equality compared (idx, uid) while the ordering below compares
+    // (map, uid). PartialEq must be consistent with PartialOrd (a == b
+    // iff partial_cmp returns Equal) or sorting invariants break, so
+    // compare the same keys the ordering uses.
+    fn eq(&self, other: &Self) -> bool {
+        self.value.map == other.value.map && self.value.uid == other.value.uid
+    }
+}
+impl PartialOrd for SubmapRequest {
+    // Shard order: submap first, then uid ("sort by omap" in the load
+    // balancer).
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let map_ord = self.value.map.partial_cmp(&other.value.map);
+        let uid_ord = self.value.uid.partial_cmp(&other.value.uid);
+        match map_ord {
+            Some(Ordering::Equal) => uid_ord,
+            x => x,
+        }
+    }
+}