Browse Source

Added load balancer indexing.

Kyle Fredrickson 1 year ago
parent
commit
2dc40ceb40

+ 1 - 1
.gitignore

@@ -1 +1 @@
-*/target
+target/

+ 1 - 0
baseline/Cargo.lock

@@ -7,6 +7,7 @@ name = "baseline"
 version = "0.1.0"
 dependencies = [
  "omq",
+ "otils",
 ]
 
 [[package]]

+ 1 - 0
baseline/Cargo.toml

@@ -5,6 +5,7 @@ edition = "2021"
 
 [dependencies]
 omq = { path = "../omq" }
+otils = { path = "../otils" }
 
 [package.metadata.fortanix-sgx]
 stack-size=0x400000

+ 9 - 2
baseline/src/main.rs

@@ -2,12 +2,19 @@ use omq::{Fetch, Omq, Send};
 
 fn main() {
     let mut o = Omq::new();
-    let sends: Vec<Send> = (0..100)
+    let sends: Vec<Send> = (0..6)
         .map(|x| Send::new(0, x.try_into().unwrap()))
         .collect();
     o.batch_send(sends);
 
-    let fetches: Vec<Fetch> = vec![Fetch::new(0, 20)];
+    let fetches: Vec<Fetch> = vec![Fetch::new(0, 3)];
+
+    let deliver = o.batch_fetch(fetches);
+    for m in deliver {
+        println!("{:?}", m);
+    }
+
+    let fetches: Vec<Fetch> = vec![Fetch::new(0, 3)];
 
     let deliver = o.batch_fetch(fetches);
     for m in deliver {

+ 0 - 6
load_balancer/Cargo.toml

@@ -1,6 +0,0 @@
-[package]
-name = "load_balancer"
-version = "0.1.0"
-edition = "2021"
-
-[dependencies]

+ 0 - 14
load_balancer/src/lib.rs

@@ -1,14 +0,0 @@
-pub fn add(left: usize, right: usize) -> usize {
-    left + right
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn it_works() {
-        let result = add(2, 2);
-        assert_eq!(result, 4);
-    }
-}

+ 8 - 8
omq/src/lib.rs

@@ -16,6 +16,13 @@ impl Omq {
         }
     }
 
+    pub fn batch_send(&mut self, sends: Vec<Send>) {
+        let requests = sends
+            .into_iter()
+            .map(|send| <Send as Into<Request>>::into(send));
+        self.message_store.extend(requests);
+    }
+
     fn update_store(&mut self, fetches: Vec<Fetch>, fetch_sum: usize) {
         let mut size = (self.message_store.len() + fetches.len() + fetch_sum).next_power_of_two();
 
@@ -34,14 +41,7 @@ impl Omq {
                 .map(|x| <Fetch as Into<Request>>::into(x)),
         );
 
-        self.message_store.extend(Request::dummies(-1, size)); // TODO this is hacky
-    }
-
-    pub fn batch_send(&mut self, sends: Vec<Send>) {
-        let requests = sends
-            .into_iter()
-            .map(|send| <Send as Into<Request>>::into(send));
-        self.message_store.extend(requests);
+        self.message_store.extend((0..size).map(|_| Request::max()));
     }
 
     pub fn batch_fetch(&mut self, fetches: Vec<Fetch>) -> Vec<Send> {

+ 3 - 0
omq/src/types/fetch.rs

@@ -6,6 +6,9 @@ pub struct Fetch {
 
 impl Fetch {
     pub fn new(receiver: i64, volume: usize) -> Self {
+        if receiver == i64::MAX {
+            panic!("Key out of bounds: {}", receiver);
+        }
         Fetch { receiver, volume }
     }
 }

+ 14 - 6
omq/src/types/request.rs

@@ -19,7 +19,7 @@ impl Request {
     pub fn dummies(receiver: i64, len: usize) -> Vec<Self> {
         (0..len)
             .map(|_| Request {
-                receiver: receiver,
+                receiver,
                 req_type: DUMMY,
                 mark: 0,
                 volume: 0,
@@ -28,6 +28,16 @@ impl Request {
             .collect()
     }
 
+    pub fn max() -> Self {
+        Request {
+            receiver: i64::MAX,
+            req_type: FETCH,
+            mark: 0,
+            volume: 0,
+            message: 0,
+        }
+    }
+
     pub fn is_fetch(&self) -> bool {
         self.req_type == FETCH
     }
@@ -37,7 +47,7 @@ impl Request {
     }
 
     pub fn should_defer(&self) -> bool {
-        self.receiver >= 0 && !self.is_fetch() && self.mark == 0 // this is also hacky
+        !self.is_fetch() && self.mark == 0
     }
 }
 
@@ -83,11 +93,9 @@ impl PartialOrd for Request {
         match receiver_ord {
             Some(Ordering::Equal) => match type_ord {
                 Some(Ordering::Equal) => vol_ord,
-                Some(x) => Some(x),
-                None => None,
+                x => x,
             },
-            Some(x) => Some(x),
-            None => None,
+            x => x,
         }
     }
 }

+ 3 - 0
omq/src/types/send.rs

@@ -9,6 +9,9 @@ pub struct Send {
 
 impl Send {
     pub fn new(receiver: i64, message: u64) -> Self {
+        if receiver == i64::MAX {
+            panic!("Key out of bounds: {}", receiver);
+        }
         Send { receiver, message }
     }
 }

+ 5 - 0
sparta/.cargo/config.toml

@@ -0,0 +1,5 @@
+[target.x86_64-fortanix-unknown-sgx]
+runner = "ftxsgx-runner-cargo"
+
+[build]
+target = "x86_64-fortanix-unknown-sgx"

+ 31 - 0
sparta/Cargo.lock

@@ -0,0 +1,31 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "cc"
+version = "1.0.98"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f"
+
+[[package]]
+name = "omq"
+version = "0.1.0"
+dependencies = [
+ "otils",
+]
+
+[[package]]
+name = "otils"
+version = "0.1.0"
+dependencies = [
+ "cc",
+]
+
+[[package]]
+name = "sparta"
+version = "0.1.0"
+dependencies = [
+ "omq",
+ "otils",
+]

+ 7 - 0
sparta/Cargo.toml

@@ -4,3 +4,10 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
+omq = { path = "../omq" }
+otils = { path = "../otils" }
+
+[package.metadata.fortanix-sgx]
+stack-size=0x400000
+heap-size=0x100000000
+threads=9

+ 142 - 0
sparta/src/load_balancer/mod.rs

@@ -0,0 +1,142 @@
+mod user_record;
+use user_record::UserRecord;
+
+use omq::{Fetch, Send};
+use otils::{self, ObliviousOps};
+use std::cmp;
+
+pub struct LoadBalancer {
+    num_users: i64,
+    user_store: Vec<UserRecord>,
+}
+
+impl LoadBalancer {
+    pub fn new(num_users: i64) -> Self {
+        let mut user_store = Vec::new();
+        user_store.reserve(num_users as usize);
+        user_store.extend((0..num_users).map(|i| UserRecord::new(i)));
+        LoadBalancer {
+            num_users,
+            user_store,
+        }
+    }
+
+    fn update_with_sends(&mut self, sends: Vec<Send>) {
+        let mut size = (self.user_store.len() + sends.len()).next_power_of_two();
+        size -= self.user_store.len();
+        self.user_store.reserve(size);
+        size -= sends.len();
+
+        self.user_store
+            .extend(sends.into_iter().map(|s| UserRecord::from_send(s)));
+
+        self.user_store.extend((0..size).map(|_| UserRecord::max()));
+    }
+
+    fn construct_send_indices(&mut self) {
+        let mut idx: u32 = 0;
+        let mut is_same_u: bool;
+
+        let mut user_store_iter = self.user_store.iter_mut().peekable();
+        while let Some(record) = user_store_iter.next() {
+            let is_user_store = record.is_user_store();
+
+            idx = u32::oselect(
+                is_user_store,
+                cmp::max(record.last_fetch, record.last_send),
+                idx + 1,
+            );
+
+            record.idx = u32::oselect(is_user_store, 0, idx);
+            record.last_send = idx;
+
+            if let Some(next_record) = user_store_iter.peek() {
+                is_same_u = record.uid == next_record.uid;
+            } else {
+                is_same_u = false;
+            }
+            record.mark = u16::oselect(is_same_u, 0, 1);
+        }
+    }
+
+    pub fn get_send_requests(&mut self, sends: Vec<Send>) -> Vec<UserRecord> {
+        let num_requests = sends.len();
+        self.update_with_sends(sends);
+
+        otils::osort(&mut self.user_store[..], 8);
+        self.construct_send_indices();
+
+        otils::ocompact(&mut self.user_store[..], |r| r.is_request(), 8);
+        let requests = self.user_store[0..num_requests].to_vec();
+
+        otils::ocompact(&mut self.user_store[..], |r| r.is_new_user_store(), 8);
+        self.user_store.truncate(self.num_users as usize);
+        self.user_store.iter_mut().for_each(|r| {
+            r.set_user_store();
+        });
+        for record in self.user_store.iter() {
+            println!("{:?}", record);
+        }
+        requests
+    }
+
+    fn update_with_fetches(&mut self, fetches: Vec<Fetch>, num_fetches: usize) {
+        let mut size = (self.user_store.len() + num_fetches).next_power_of_two();
+        size -= self.user_store.len();
+        self.user_store.reserve(size);
+
+        size -= num_fetches;
+        for fetch in fetches.into_iter() {
+            self.user_store.extend(UserRecord::from_fetch(fetch));
+        }
+
+        self.user_store.extend((0..size).map(|_| UserRecord::max()));
+    }
+
+    fn construct_fetch_indices(&mut self) {
+        let mut idx: u32 = 0;
+        let mut is_same_u: bool;
+
+        let mut user_store_iter = self.user_store.iter_mut().peekable();
+        while let Some(record) = user_store_iter.next() {
+            let is_user_store = record.is_user_store();
+
+            idx = u32::oselect(is_user_store, record.last_fetch, idx + 1);
+
+            record.idx = u32::oselect(is_user_store, 0, idx);
+            record.last_fetch = idx;
+
+            if let Some(next_record) = user_store_iter.peek() {
+                is_same_u = record.uid == next_record.uid;
+            } else {
+                is_same_u = false;
+            }
+            record.mark = u16::oselect(is_same_u, 0, 1);
+        }
+    }
+
+    pub fn get_fetch_requests(&mut self, fetches: Vec<Fetch>) -> Vec<UserRecord> {
+        let num_requests = fetches.iter().fold(0, |acc, x| acc + x.volume as usize);
+        self.update_with_fetches(fetches, num_requests);
+
+        otils::osort(&mut self.user_store[..], 8);
+        self.construct_fetch_indices();
+
+        otils::ocompact(&mut self.user_store[..], |r| r.is_request(), 8);
+        let deliver = self.user_store[0..num_requests].to_vec();
+
+        otils::ocompact(&mut self.user_store[..], |r| r.is_new_user_store(), 8);
+        self.user_store.truncate(self.num_users as usize);
+        self.user_store.iter_mut().for_each(|r| {
+            r.set_user_store();
+        });
+        for record in self.user_store.iter() {
+            println!("{:?}", record);
+        }
+        deliver
+    }
+
+    // pub fn batch_fetch(&mut self, fetches: Vec<Fetch>) -> Vec<Send> {
+    //     let deliver_size = fetches.iter().fold(0, |acc, x| acc + x.volume as usize);
+    // }
+}

+ 131 - 0
sparta/src/load_balancer/user_record.rs

@@ -0,0 +1,131 @@
+use omq::{Fetch, Send};
+use otils::{self, ObliviousOps};
+use std::cmp::Ordering;
+
+const USER_STORE: u16 = 0;
+const FETCH: u16 = 1;
+const SEND: u16 = 2;
+
+#[derive(Debug, Clone)]
+pub struct UserRecord {
+    pub uid: i64,
+    pub type_rec: u16,
+    pub mark: u16,
+    pub last_fetch: u32,
+    pub last_send: u32,
+    pub idx: u32,
+    pub message: u64,
+}
+
+impl UserRecord {
+    pub fn new(uid: i64) -> Self {
+        UserRecord {
+            uid,
+            type_rec: USER_STORE,
+            mark: 0,
+            last_fetch: 0,
+            last_send: 0,
+            idx: 0,
+            message: 0,
+        }
+    }
+
+    pub fn max() -> Self {
+        UserRecord {
+            uid: i64::MAX,
+            type_rec: USER_STORE,
+            mark: 0,
+            last_fetch: 0,
+            last_send: 0,
+            idx: 0,
+            message: 0,
+        }
+    }
+
+    pub fn from_fetch(f: Fetch) -> Vec<Self> {
+        (0..f.volume)
+            .map(|_| UserRecord {
+                uid: f.receiver,
+                type_rec: FETCH,
+                mark: 0,
+                last_fetch: 0,
+                last_send: 0,
+                idx: 0,
+                message: 0,
+            })
+            .collect()
+    }
+
+    pub fn from_send(s: Send) -> Self {
+        UserRecord {
+            uid: s.receiver,
+            type_rec: SEND,
+            mark: 0,
+            last_fetch: 0,
+            last_send: 0,
+            idx: 0,
+            message: s.message,
+        }
+    }
+
+    pub fn is_request(&self) -> bool {
+        self.type_rec != USER_STORE
+    }
+
+    pub fn is_new_user_store(&self) -> bool {
+        self.mark == 1 && self.uid != i64::MAX
+    }
+
+    pub fn is_user_store(&self) -> bool {
+        self.type_rec == USER_STORE
+    }
+
+    pub fn is_fetch(&self) -> bool {
+        self.type_rec == FETCH
+    }
+
+    pub fn set_user_store(&mut self) {
+        self.type_rec = USER_STORE;
+    }
+}
+
+impl PartialEq for UserRecord {
+    fn eq(&self, other: &Self) -> bool {
+        self.uid == other.uid && self.type_rec == other.type_rec
+    }
+}
+
+impl PartialOrd for UserRecord {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let user_ord = self.uid.partial_cmp(&other.uid);
+        let type_ord = self.type_rec.partial_cmp(&other.type_rec);
+        match user_ord {
+            Some(Ordering::Equal) => type_ord,
+            x => x,
+        }
+    }
+}
+
+impl ObliviousOps for UserRecord {
+    fn oselect(cond: bool, a: Self, b: Self) -> Self {
+        UserRecord {
+            uid: i64::oselect(cond, a.uid, b.uid),
+            type_rec: u16::oselect(cond, a.type_rec, b.type_rec),
+            mark: u16::oselect(cond, a.mark, b.mark),
+            last_fetch: u32::oselect(cond, a.last_fetch, b.last_fetch),
+            last_send: u32::oselect(cond, a.last_send, b.last_send),
+            idx: u32::oselect(cond, a.idx, b.idx),
+            message: u64::oselect(cond, a.message, b.message),
+        }
+    }
+
+    fn oswap(cond: bool, a: &mut Self, b: &mut Self) {
+        i64::oswap(cond, &mut a.uid, &mut b.uid);
+        u16::oswap(cond, &mut a.type_rec, &mut b.type_rec);
+        u16::oswap(cond, &mut a.mark, &mut b.mark);
+        u32::oswap(cond, &mut a.last_fetch, &mut b.last_fetch);
+        u32::oswap(cond, &mut a.last_send, &mut b.last_send);
+        u32::oswap(cond, &mut a.idx, &mut b.idx);
+        u64::oswap(cond, &mut a.message, &mut b.message);
+    }
+}

+ 20 - 1
sparta/src/main.rs

@@ -1,3 +1,22 @@
+mod load_balancer;
+use load_balancer::LoadBalancer;
+use omq::{Fetch, Send};
+
 fn main() {
-    println!("Hello, world!");
+    let mut l = LoadBalancer::new(5);
+    let sends: Vec<Send> = (0..3)
+        .map(|x| Send::new(x, x.try_into().unwrap()))
+        .collect();
+
+    let fetches: Vec<Fetch> = vec![Fetch::new(0, 3)];
+
+    let indices = l.get_fetch_requests(fetches);
+    for i in indices.iter() {
+        println!("{:?}", i);
+    }
+
+    let indices = l.get_send_requests(sends);
+    for i in indices.iter() {
+        println!("{:?}", i);
+    }
 }