Browse Source

Modified sort that sorting is easier.

Kyle Fredrickson 1 year ago
parent
commit
6c9b3d9753
6 changed files with 140 additions and 171 deletions
  1. 10 22
      baseline/src/main.rs
  2. 46 32
      omq/src/lib.rs
  3. 84 69
      omq/src/request.rs
  4. 0 14
      omq/src/types/fetch.rs
  5. 0 8
      omq/src/types/mod.rs
  6. 0 26
      omq/src/types/send.rs

+ 10 - 22
baseline/src/main.rs

@@ -1,29 +1,17 @@
-use omq::{Fetch, Omq, Send};
+use omq::{Omq, Request};
 
 fn main() {
     let mut o = Omq::new();
-    let sends: Vec<Send> = (0..6)
-        .map(|x| Send::new(0, x.try_into().unwrap()))
+
+    let sends: Vec<Request> = (0..0x8)
+        .map(|x| Request::new_send(0, x.try_into().unwrap()))
         .collect();
     o.batch_send(sends);
+    // let mut v = vec![Request::new_fetch(0, 1); 0x200000];
 
-    let fetches: Vec<Fetch> = vec![Fetch::new(0, 3)];
-
-    let deliver = o.batch_fetch(fetches);
-    for m in deliver {
-        println!("{:?}", m);
-    }
-
-    let fetches: Vec<Fetch> = vec![Fetch::new(0, 3)];
-
-    let deliver = o.batch_fetch(fetches);
-    for m in deliver {
-        println!("{:?}", m);
-    }
-
-    // let fetches: Vec<Fetch> = (3..6).rev().map(|x| Fetch::new(x, 1)).collect();
-    // let deliver = o.batch_fetch(fetches);
-    // for m in deliver {
-    //     println!("{:?}", m);
-    // }
+    // let start = Instant::now();
+    // sort(&mut v[..], 8);
+    o.batch_fetch(vec![Request::new_fetch(0, 0x7)]);
+    //     let time = start.elapsed();
+    //     println!("{:?}", time);
 }

+ 46 - 32
omq/src/lib.rs

@@ -1,8 +1,8 @@
 #![feature(test)]
 
-mod types;
+mod request;
 use otils::ObliviousOps;
-pub use types::{Fetch, Request, Send};
+pub use request::Request;
 
 #[derive(Debug)]
 pub struct Omq {
@@ -16,65 +16,76 @@ impl Omq {
         }
     }
 
-    pub fn batch_send(&mut self, sends: Vec<Send>) {
-        let requests = sends
-            .into_iter()
-            .map(|send| <Send as Into<Request>>::into(send));
-        self.message_store.extend(requests);
+    pub fn batch_send(&mut self, sends: Vec<Request>) {
+        for s in sends.iter() {
+            println!("{:?}", s);
+        }
+        println!();
+        self.message_store.reserve(sends.len());
+        self.message_store.extend(sends);
     }
 
-    fn update_store(&mut self, fetches: Vec<Fetch>, fetch_sum: usize) {
-        let mut size = (self.message_store.len() + fetches.len() + fetch_sum).next_power_of_two();
+    fn update_store(&mut self, fetches: Vec<Request>, fetch_sum: usize) {
+        let size = self.message_store.len() + fetches.len() + fetch_sum;
 
-        size -= self.message_store.len();
-        self.message_store.reserve(size);
+        self.message_store.reserve(size - self.message_store.len());
 
-        size -= fetch_sum + fetches.len();
         for fetch in fetches.iter() {
             self.message_store
-                .extend(Request::dummies(fetch.receiver, fetch.volume));
+                .extend(Request::dummies(fetch.uid, fetch.volume));
         }
 
-        self.message_store.extend(
-            fetches
-                .into_iter()
-                .map(|x| <Fetch as Into<Request>>::into(x)),
-        );
-
-        self.message_store.extend((0..size).map(|_| Request::max()));
+        self.message_store.extend(fetches);
     }
 
-    pub fn batch_fetch(&mut self, fetches: Vec<Fetch>) -> Vec<Send> {
+    pub fn batch_fetch(&mut self, fetches: Vec<Request>) -> Vec<Request> {
         let final_size = self.message_store.len();
         let fetch_sum = fetches.iter().fold(0, |acc, f| acc + f.volume) as usize;
         self.update_store(fetches, fetch_sum);
 
-        otils::osort(&mut self.message_store[..], 8);
+        self.message_store = otils::sort(std::mem::take(&mut self.message_store), 8);
+        println!("sorted");
+        for record in self.message_store.iter() {
+            println!("{:?}", record);
+        }
+        println!();
 
         let mut user_sum: isize = 0;
-        let mut prev_user: i64 = -1;
+        let mut prev_user: i32 = -1;
         for request in self.message_store.iter_mut() {
-            let same_user = prev_user == request.receiver;
+            let same_user = prev_user == request.uid;
             user_sum = isize::oselect(same_user, user_sum, 0);
 
             let fetch_more = user_sum > 0;
-            request.mark = u32::oselect(request.is_fetch(), 0, u32::oselect(fetch_more, 1, 0));
+            request.mark = u16::oselect(request.is_fetch(), 0, u16::oselect(fetch_more, 1, 0));
 
-            prev_user = request.receiver;
+            prev_user = request.uid;
             user_sum += isize::oselect(
                 request.is_fetch(),
                 request.volume as isize,
                 isize::oselect(fetch_more, -1, 0),
             );
         }
+        for record in self.message_store.iter() {
+            println!("{:?}", record);
+        }
+        println!();
 
-        otils::ocompact(&mut self.message_store[..], |r| r.should_deliver(), 8);
+        otils::compact(&mut self.message_store[..], |r| r.should_deliver(), 8);
         let deliver = self.message_store[0..fetch_sum].to_vec();
+        for record in deliver.iter() {
+            println!("{:?}", record);
+        }
+        println!();
 
-        otils::ocompact(&mut self.message_store[..], |r| r.should_defer(), 8);
+        otils::compact(&mut self.message_store[..], |r| r.should_defer(), 8);
         self.message_store.truncate(final_size);
+        for record in self.message_store.iter() {
+            println!("{:?}", record);
+        }
+        println!();
 
-        deliver.into_iter().map(|x| x.into()).collect()
+        deliver
     }
 }
 
@@ -88,11 +99,14 @@ mod tests {
     #[bench]
     fn bench_fetch(b: &mut Bencher) {
         let mut o = Omq::new();
-        let sends: Vec<Send> = (0..0x100000)
-            .map(|x| Send::new(0, x.try_into().unwrap()))
+
+        let sends: Vec<Request> = (0..1048576)
+            .map(|x| Request::new_send(0, x.try_into().unwrap()))
             .collect();
         o.batch_send(sends);
 
-        b.iter(|| o.batch_fetch(vec![Fetch::new(0, 0x80000)]));
+        // b.iter(|| 1 + 1);
+
+        b.iter(|| o.batch_fetch(vec![Request::new_fetch(0, 1048575)]));
     }
 }

+ 84 - 69
omq/src/types/request.rs → omq/src/request.rs

@@ -1,43 +1,66 @@
-use crate::{Fetch, Send};
-use otils::ObliviousOps;
 use std::{cmp::Ordering, time::UNIX_EPOCH};
 
-pub const FETCH: u32 = 0;
-pub const SEND: u32 = 1;
-pub const DUMMY: u32 = 2;
+use otils::Max;
+
+pub const FETCH: u16 = 0;
+pub const SEND: u16 = 1;
+pub const DUMMY: u16 = 2;
 
 #[derive(Debug, Clone, Copy)]
 pub struct Request {
-    pub receiver: i64,
-    pub req_type: u32,
-    pub mark: u32,
+    pub uid: i32,
+    pub req_type: u16,
+    pub mark: u16,
     pub volume: usize,
     pub message: u64,
+    pub _dum: [u64; 13],
 }
 
 impl Request {
-    pub fn dummies(receiver: i64, len: usize) -> Vec<Self> {
+    pub fn new_send(uid: i32, message: u64) -> Self {
+        if uid >= i32::MAX {
+            panic!("uid: out of bounds.");
+        }
+        Request {
+            uid,
+            req_type: SEND,
+            mark: 0,
+            volume: std::time::SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_nanos() as usize,
+            message,
+            _dum: [0; 13],
+        }
+    }
+
+    pub fn new_fetch(uid: i32, volume: usize) -> Self {
+        if uid >= i32::MAX {
+            panic!("uid: out of bounds.");
+        }
+        Request {
+            uid,
+            req_type: FETCH,
+            mark: 0,
+            volume,
+            message: 0,
+            _dum: [0; 13],
+        }
+    }
+
+    pub fn dummies(uid: i32, len: usize) -> Vec<Self> {
         (0..len)
             .map(|_| Request {
-                receiver,
+                uid,
                 req_type: DUMMY,
                 mark: 0,
                 volume: 0,
                 message: 0,
+                _dum: [0; 13],
             })
             .collect()
     }
 
-    pub fn max() -> Self {
-        Request {
-            receiver: i64::MAX,
-            req_type: FETCH,
-            mark: 0,
-            volume: 0,
-            message: 0,
-        }
-    }
-
     pub fn is_fetch(&self) -> bool {
         self.req_type == FETCH
     }
@@ -51,43 +74,55 @@ impl Request {
     }
 }
 
-impl From<Send> for Request {
-    fn from(s: Send) -> Self {
-        Request {
-            receiver: s.receiver,
-            req_type: SEND,
-            mark: 0,
-            volume: std::time::SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .unwrap()
-                .as_nanos() as usize,
-            message: s.message,
-        }
-    }
-}
-
-impl From<Fetch> for Request {
-    fn from(f: Fetch) -> Self {
+impl Max for Request {
+    fn maximum() -> Self {
         Request {
-            receiver: f.receiver,
-            req_type: FETCH,
+            uid: i32::MAX,
+            req_type: DUMMY,
             mark: 0,
-            volume: f.volume,
+            volume: 0,
             message: 0,
+            _dum: [0; 13],
         }
     }
 }
 
+// impl From<Send> for Request {
+//     fn from(s: Send) -> Self {
+//         Request {
+//             receiver: s.receiver,
+//             req_type: SEND,
+//             mark: 0,
+//             volume: std::time::SystemTime::now()
+//                 .duration_since(UNIX_EPOCH)
+//                 .unwrap()
+//                 .as_nanos() as usize,
+//             message: s.message,
+//         }
+//     }
+// }
+
+// impl From<Fetch> for Request {
+//     fn from(f: Fetch) -> Self {
+//         Request {
+//             receiver: f.receiver,
+//             req_type: FETCH,
+//             mark: 0,
+//             volume: f.volume,
+//             message: 0,
+//         }
+//     }
+// }
+
 impl PartialEq for Request {
     fn eq(&self, other: &Self) -> bool {
-        self.receiver == other.receiver && self.req_type == other.req_type
+        self.uid == other.uid && self.req_type == other.req_type && self.volume == other.volume
     }
 }
 
-// TODO this is not oblivious, just add the comparators back into otils later.
 impl PartialOrd for Request {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        let receiver_ord = self.receiver.partial_cmp(&other.receiver);
+        let receiver_ord = self.uid.partial_cmp(&other.uid);
         let type_ord = self.req_type.partial_cmp(&other.req_type);
         let vol_ord = self.volume.partial_cmp(&other.volume);
         match receiver_ord {
@@ -100,34 +135,14 @@ impl PartialOrd for Request {
     }
 }
 
-impl ObliviousOps for Request {
-    fn oselect(cond: bool, a: Self, b: Self) -> Self {
-        Request {
-            receiver: i64::oselect(cond, a.receiver, b.receiver),
-            req_type: u32::oselect(cond, a.req_type, b.req_type),
-            mark: u32::oselect(cond, a.mark, b.mark),
-            volume: usize::oselect(cond, a.volume, b.volume),
-            message: u64::oselect(cond, a.message, b.message),
-        }
-    }
-
-    fn oswap(cond: bool, a: &mut Self, b: &mut Self) {
-        i64::oswap(cond, &mut a.receiver, &mut b.receiver);
-        u32::oswap(cond, &mut a.req_type, &mut b.req_type);
-        u32::oswap(cond, &mut a.mark, &mut b.mark);
-        usize::oswap(cond, &mut a.volume, &mut b.volume);
-        u64::oswap(cond, &mut a.message, &mut b.message);
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
 
     #[test]
     fn test_eq() {
-        let s_less: Request = Send::new(0, 0).into();
-        let f_less: Request = Fetch::new(0, 0).into();
+        let s_less: Request = Request::new_send(0, 0);
+        let f_less: Request = Request::new_fetch(0, 0);
 
         assert!(s_less == s_less);
         assert!(f_less != s_less);
@@ -136,10 +151,10 @@ mod tests {
 
     #[test]
     fn test_ord() {
-        let s_less: Request = Send::new(0, 0).into();
-        let s_great: Request = Send::new(1, 0).into();
-        let f_less: Request = Fetch::new(0, 0).into();
-        let f_great: Request = Fetch::new(1, 0).into();
+        let s_less: Request = Request::new_send(0, 0);
+        let s_great: Request = Request::new_send(1, 0);
+        let f_less: Request = Request::new_fetch(0, 0);
+        let f_great: Request = Request::new_fetch(1, 0);
 
         assert!(s_less < s_great);
         assert!(s_great > s_less);

+ 0 - 14
omq/src/types/fetch.rs

@@ -1,14 +0,0 @@
-#[derive(Debug)]
-pub struct Fetch {
-    pub receiver: i64,
-    pub volume: usize,
-}
-
-impl Fetch {
-    pub fn new(receiver: i64, volume: usize) -> Self {
-        if receiver == i64::MAX {
-            panic!("Key out of bounds: {}", receiver);
-        }
-        Fetch { receiver, volume }
-    }
-}

+ 0 - 8
omq/src/types/mod.rs

@@ -1,8 +0,0 @@
-pub mod fetch;
-pub use fetch::Fetch;
-
-pub mod send;
-pub use send::Send;
-
-pub mod request;
-pub use request::Request;

+ 0 - 26
omq/src/types/send.rs

@@ -1,26 +0,0 @@
-// use crate::types::Request;
-use crate::Request;
-
-#[derive(Debug)]
-pub struct Send {
-    pub receiver: i64,
-    pub message: u64,
-}
-
-impl Send {
-    pub fn new(receiver: i64, message: u64) -> Self {
-        if receiver == i64::MAX {
-            panic!("Key out of bounds: {}", receiver);
-        }
-        Send { receiver, message }
-    }
-}
-
-impl From<Request> for Send {
-    fn from(r: Request) -> Self {
-        Self {
-            receiver: r.receiver,
-            message: r.message,
-        }
-    }
-}