Browse Source

Added bench, fixed queue ordering.

Kyle Fredrickson 1 year ago
parent
commit
37e7927776
3 changed files with 46 additions and 16 deletions
  1. 10 9
      baseline/src/main.rs
  2. 25 4
      baseline/src/omq/mod.rs
  3. 11 3
      baseline/src/omq/types/request.rs

+ 10 - 9
baseline/src/main.rs

@@ -1,25 +1,26 @@
+#![feature(test)]
+
 mod omq;
 use crate::omq::{Fetch, Omq, Send};
 // use crate::types::{fetch::Fetch, send::Send};
 
 fn main() {
     let mut o = Omq::new();
-    let sends: Vec<Send> = (0..8)
-        .rev()
-        .map(|x| Send::new(x, x.try_into().unwrap()))
+    let sends: Vec<Send> = (0..100)
+        .map(|x| Send::new(0, x.try_into().unwrap()))
         .collect();
     o.batch_send(sends);
 
-    let fetches: Vec<Fetch> = (0..3).rev().map(|x| Fetch::new(x, 2)).collect();
+    let fetches: Vec<Fetch> = vec![Fetch::new(0, 20)];
 
     let deliver = o.batch_fetch(fetches);
     for m in deliver {
         println!("{:?}", m);
     }
 
-    let fetches: Vec<Fetch> = (3..6).rev().map(|x| Fetch::new(x, 1)).collect();
-    let deliver = o.batch_fetch(fetches);
-    for m in deliver {
-        println!("{:?}", m);
-    }
+    // let fetches: Vec<Fetch> = (3..6).rev().map(|x| Fetch::new(x, 1)).collect();
+    // let deliver = o.batch_fetch(fetches);
+    // for m in deliver {
+    //     println!("{:?}", m);
+    // }
 }

+ 25 - 4
baseline/src/omq/mod.rs

@@ -36,8 +36,10 @@ impl Omq {
     }
 
     pub fn batch_send(&mut self, sends: Vec<Send>) {
-        self.message_store
-            .extend(sends.into_iter().map(|x| <Send as Into<Request>>::into(x)));
+        let requests = sends
+            .into_iter()
+            .map(|send| <Send as Into<Request>>::into(send));
+        self.message_store.extend(requests);
     }
 
     pub fn batch_fetch(&mut self, fetches: Vec<Fetch>) -> Vec<Send> {
@@ -64,12 +66,31 @@ impl Omq {
             );
         }
 
-        otils::ofilter(&mut self.message_store[..], |r| r.should_deliver(), 8);
+        otils::ocompact(&mut self.message_store[..], |r| r.should_deliver(), 8);
         let deliver = self.message_store[0..fetch_sum].to_vec();
 
-        otils::ofilter(&mut self.message_store[..], |r| r.should_defer(), 8);
+        otils::ocompact(&mut self.message_store[..], |r| r.should_defer(), 8);
         self.message_store.truncate(final_size);
 
         deliver.into_iter().map(|x| x.into()).collect()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    extern crate test;
+    use test::Bencher;
+
+    #[bench]
+    fn bench_fetch(b: &mut Bencher) {
+        let mut o = Omq::new();
+        let sends: Vec<Send> = (0..0x100000)
+            .map(|x| Send::new(0, x.try_into().unwrap()))
+            .collect();
+        o.batch_send(sends);
+
+        b.iter(|| o.batch_fetch(vec![Fetch::new(0, 0x80000)]));
+    }
+}

+ 11 - 3
baseline/src/omq/types/request.rs

@@ -1,6 +1,6 @@
 use crate::omq::{Fetch, Send};
 use otils::ObliviousOps;
-use std::cmp::Ordering;
+use std::{cmp::Ordering, time::UNIX_EPOCH};
 
 pub const FETCH: u32 = 0;
 pub const SEND: u32 = 1;
@@ -47,7 +47,10 @@ impl From<Send> for Request {
             receiver: s.receiver,
             req_type: SEND,
             mark: 0,
-            volume: 0,
+            volume: std::time::SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_nanos() as usize,
             message: s.message,
         }
     }
@@ -76,8 +79,13 @@ impl PartialOrd for Request {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         let receiver_ord = self.receiver.partial_cmp(&other.receiver);
         let type_ord = self.req_type.partial_cmp(&other.req_type);
+        let vol_ord = self.volume.partial_cmp(&other.volume);
         match receiver_ord {
-            Some(Ordering::Equal) => type_ord,
+            Some(Ordering::Equal) => match type_ord {
+                Some(Ordering::Equal) => vol_ord,
+                Some(x) => Some(x),
+                None => None,
+            },
             Some(x) => Some(x),
             None => None,
         }