Browse Source

First pass at omq.

Kyle Fredrickson 1 year ago
parent
commit
2e96765229
9 changed files with 315 additions and 4 deletions
  1. 16 4
      baseline/src/main.rs
  2. 98 0
      baseline/src/omq.rs
  3. 152 0
      baseline/src/request.rs
  4. 6 0
      load_balancer/Cargo.toml
  5. 14 0
      load_balancer/src/lib.rs
  6. 6 0
      omq/Cargo.toml
  7. 14 0
      omq/src/lib.rs
  8. 6 0
      sparta/Cargo.toml
  9. 3 0
      sparta/src/main.rs

+ 16 - 4
baseline/src/main.rs

@@ -1,7 +1,19 @@
-use otils;
+mod omq;
+mod request;
+
+use crate::omq::Omq;
+use crate::request::{Fetch, Send};
 
 fn main() {
-    let mut v: Vec<i64> = (0..128).rev().collect();
-    otils::osort(&mut v[..], 8);
-    println!("{:?}", v);
+    let mut o = Omq::new();
+    let sends: Vec<Send> = (0..8)
+        .rev()
+        .map(|x| Send::new(x, x.try_into().unwrap()))
+        .collect();
+
+    let fetch: Vec<Fetch> = (0..3)
+        .rev()
+        .map(|x| Fetch::new(x, (x + 1).try_into().unwrap()))
+        .collect();
+    o.process_batch(sends, fetch);
 }

+ 98 - 0
baseline/src/omq.rs

@@ -0,0 +1,98 @@
+use crate::request::{Fetch, Request, Send, FETCH};
+use otils::ObliviousOps;
+
+#[derive(Debug)]
+pub struct Omq {
+    message_store: Vec<Request>,
+}
+
+impl Omq {
+    pub fn new() -> Self {
+        Omq {
+            message_store: Vec::new(),
+        }
+    }
+
+    fn update_store(&mut self, sends: Vec<Send>, fetches: Vec<Fetch>, fetch_sum: usize) {
+        let mut size = (self.message_store.len() + sends.len() + fetches.len() + fetch_sum)
+            .next_power_of_two();
+
+        size -= self.message_store.len();
+        self.message_store.reserve(size);
+
+        size -= fetch_sum + fetches.len() + sends.len();
+        for fetch in fetches.iter() {
+            self.message_store
+                .extend(Request::dummies(fetch.receiver, fetch.volume));
+        }
+
+        self.message_store.extend(
+            fetches
+                .into_iter()
+                .map(|x| <Fetch as Into<Request>>::into(x)),
+        );
+
+        self.message_store
+            .extend(sends.into_iter().map(|x| <Send as Into<Request>>::into(x)));
+
+        self.message_store.extend(Request::dummies(-1, size));
+    }
+
+    pub fn process_batch(&mut self, sends: Vec<Send>, fetches: Vec<Fetch>) {
+        let final_size = self.message_store.len() + sends.len();
+        let fetch_sum = fetches.iter().fold(0, |acc, f| acc + f.volume) as usize;
+
+        self.update_store(sends, fetches, fetch_sum);
+
+        otils::osort(&mut self.message_store[..], 8);
+
+        let mut user_sum: isize = 0;
+        let mut prev_user: i64 = -1;
+        for request in self.message_store.iter_mut() {
+            let same_user = prev_user == request.receiver;
+            user_sum = isize::oselect(same_user, user_sum, 0);
+
+            let is_fetch = request.req_type == FETCH;
+            let fetch_more = user_sum > 0;
+            request.mark = u32::oselect(is_fetch, 0, u32::oselect(fetch_more, 1, 0));
+
+            prev_user = request.receiver;
+            user_sum += isize::oselect(
+                is_fetch,
+                request.volume as isize,
+                isize::oselect(fetch_more, -1, 0),
+            );
+        }
+
+        println!("message store");
+        for request in self.message_store.iter() {
+            println!("{:?}", request);
+        }
+
+        let deliver = otils::ofilter(
+            self.message_store.clone(),
+            |r| r.req_type != FETCH && r.mark == 1,
+            fetch_sum,
+            8,
+        );
+
+        println!("deliver");
+        for request in deliver.iter() {
+            println!("{:?}", request);
+        }
+
+        self.message_store = otils::ofilter(
+            self.message_store.clone(),
+            |r| r.receiver >= 0 && r.req_type != FETCH && r.mark == 0,
+            final_size,
+            8,
+        );
+
+        println!("new message store");
+        for request in self.message_store.iter() {
+            println!("{:?}", request);
+        }
+
+        // otils::ofilter(data, f, num_matches, threads)
+    }
+}

+ 152 - 0
baseline/src/request.rs

@@ -0,0 +1,152 @@
+use otils::ObliviousOps;
+use std::cmp::Ordering;
+
+pub const FETCH: u32 = 0;
+pub const SEND: u32 = 1;
+pub const DUMMY: u32 = 2;
+
+#[derive(Debug)]
+pub struct Send {
+    pub receiver: i64,
+    pub message: u64,
+}
+
+impl Send {
+    pub fn new(receiver: i64, message: u64) -> Self {
+        Send { receiver, message }
+    }
+}
+
+#[derive(Debug)]
+pub struct Fetch {
+    pub receiver: i64,
+    pub volume: usize,
+}
+
+impl Fetch {
+    pub fn new(receiver: i64, volume: usize) -> Self {
+        Fetch { receiver, volume }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct Request {
+    pub receiver: i64,
+    pub req_type: u32,
+    pub mark: u32,
+    pub volume: usize,
+    pub message: u64,
+}
+
+impl Request {
+    pub fn dummies(receiver: i64, len: usize) -> Vec<Self> {
+        (0..len)
+            .map(|_| Request {
+                receiver: receiver,
+                req_type: DUMMY,
+                mark: 0,
+                volume: 0,
+                message: 0,
+            })
+            .collect()
+    }
+}
+
+impl From<Send> for Request {
+    fn from(s: Send) -> Self {
+        Request {
+            receiver: s.receiver,
+            req_type: SEND,
+            mark: 0,
+            volume: 0,
+            message: s.message,
+        }
+    }
+}
+
+impl From<Fetch> for Request {
+    fn from(f: Fetch) -> Self {
+        Request {
+            receiver: f.receiver,
+            req_type: FETCH,
+            mark: 0,
+            volume: f.volume,
+            message: 0,
+        }
+    }
+}
+
+impl PartialEq for Request {
+    fn eq(&self, other: &Self) -> bool {
+        self.receiver == other.receiver && self.req_type == other.req_type
+    }
+}
+
+// TODO this is not oblivious, just add the comparators back into otils later.
+impl PartialOrd for Request {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        let receiver_ord = self.receiver.partial_cmp(&other.receiver);
+        let type_ord = self.req_type.partial_cmp(&other.req_type);
+        match receiver_ord {
+            Some(Ordering::Equal) => type_ord,
+            Some(x) => Some(x),
+            None => None,
+        }
+    }
+}
+
+impl ObliviousOps for Request {
+    fn oselect(cond: bool, a: Self, b: Self) -> Self {
+        Request {
+            receiver: i64::oselect(cond, a.receiver, b.receiver),
+            req_type: u32::oselect(cond, a.req_type, b.req_type),
+            mark: u32::oselect(cond, a.mark, b.mark),
+            volume: usize::oselect(cond, a.volume, b.volume),
+            message: u64::oselect(cond, a.message, b.message),
+        }
+    }
+
+    fn oswap(cond: bool, a: &mut Self, b: &mut Self) {
+        i64::oswap(cond, &mut a.receiver, &mut b.receiver);
+        u32::oswap(cond, &mut a.req_type, &mut b.req_type);
+        u32::oswap(cond, &mut a.mark, &mut b.mark);
+        usize::oswap(cond, &mut a.volume, &mut b.volume);
+        u64::oswap(cond, &mut a.message, &mut b.message);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_eq() {
+        let s_less: Request = Send::new(0, 0).into();
+        let f_less: Request = Fetch::new(0, 0).into();
+
+        assert!(s_less == s_less);
+        assert!(f_less != s_less);
+        assert!(f_less == f_less);
+    }
+
+    #[test]
+    fn test_ord() {
+        let s_less: Request = Send::new(0, 0).into();
+        let s_great: Request = Send::new(1, 0).into();
+        let f_less: Request = Fetch::new(0, 0).into();
+        let f_great: Request = Fetch::new(1, 0).into();
+
+        assert!(s_less < s_great);
+        assert!(s_great > s_less);
+        assert!(s_great == s_great);
+
+        assert!(f_less < f_great);
+        assert!(f_great > f_less);
+        assert!(f_great == f_great);
+
+        assert!(f_less < s_less);
+        assert!(f_less < s_great);
+        assert!(f_great < s_great);
+        assert!(f_great > s_less);
+    }
+}

+ 6 - 0
load_balancer/Cargo.toml

@@ -0,0 +1,6 @@
+[package]
+name = "load_balancer"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]

+ 14 - 0
load_balancer/src/lib.rs

@@ -0,0 +1,14 @@
+pub fn add(left: usize, right: usize) -> usize {
+    left + right
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn it_works() {
+        let result = add(2, 2);
+        assert_eq!(result, 4);
+    }
+}

+ 6 - 0
omq/Cargo.toml

@@ -0,0 +1,6 @@
+[package]
+name = "omq"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]

+ 14 - 0
omq/src/lib.rs

@@ -0,0 +1,14 @@
+pub fn add(left: usize, right: usize) -> usize {
+    left + right
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn it_works() {
+        let result = add(2, 2);
+        assert_eq!(result, 4);
+    }
+}

+ 6 - 0
sparta/Cargo.toml

@@ -0,0 +1,6 @@
+[package]
+name = "sparta"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]

+ 3 - 0
sparta/src/main.rs

@@ -0,0 +1,3 @@
+fn main() {
+    println!("Hello, world!");
+}