Bläddra i källkod

Changes to avoid unnecessary copying.

Kyle Fredrickson 1 år sedan
förälder
incheckning
7210c13249

+ 15 - 9
baseline/src/main.rs

@@ -1,8 +1,6 @@
 mod omq;
-mod request;
-
-use crate::omq::Omq;
-use crate::request::{Fetch, Send};
+use crate::omq::{Fetch, Omq, Send};
+// use crate::types::{fetch::Fetch, send::Send};
 
 fn main() {
     let mut o = Omq::new();
@@ -10,10 +8,18 @@ fn main() {
         .rev()
         .map(|x| Send::new(x, x.try_into().unwrap()))
         .collect();
+    o.batch_send(sends);
 
-    let fetch: Vec<Fetch> = (0..3)
-        .rev()
-        .map(|x| Fetch::new(x, (x + 1).try_into().unwrap()))
-        .collect();
-    o.process_batch(sends, fetch);
+    let fetches: Vec<Fetch> = (0..3).rev().map(|x| Fetch::new(x, 2)).collect();
+
+    let deliver = o.batch_fetch(fetches);
+    for m in deliver {
+        println!("{:?}", m);
+    }
+
+    let fetches: Vec<Fetch> = (3..6).rev().map(|x| Fetch::new(x, 1)).collect();
+    let deliver = o.batch_fetch(fetches);
+    for m in deliver {
+        println!("{:?}", m);
+    }
 }

+ 0 - 98
baseline/src/omq.rs

@@ -1,98 +0,0 @@
-use crate::request::{Fetch, Request, Send, FETCH};
-use otils::ObliviousOps;
-
-#[derive(Debug)]
-pub struct Omq {
-    message_store: Vec<Request>,
-}
-
-impl Omq {
-    pub fn new() -> Self {
-        Omq {
-            message_store: Vec::new(),
-        }
-    }
-
-    fn update_store(&mut self, sends: Vec<Send>, fetches: Vec<Fetch>, fetch_sum: usize) {
-        let mut size = (self.message_store.len() + sends.len() + fetches.len() + fetch_sum)
-            .next_power_of_two();
-
-        size -= self.message_store.len();
-        self.message_store.reserve(size);
-
-        size -= fetch_sum + fetches.len() + sends.len();
-        for fetch in fetches.iter() {
-            self.message_store
-                .extend(Request::dummies(fetch.receiver, fetch.volume));
-        }
-
-        self.message_store.extend(
-            fetches
-                .into_iter()
-                .map(|x| <Fetch as Into<Request>>::into(x)),
-        );
-
-        self.message_store
-            .extend(sends.into_iter().map(|x| <Send as Into<Request>>::into(x)));
-
-        self.message_store.extend(Request::dummies(-1, size));
-    }
-
-    pub fn process_batch(&mut self, sends: Vec<Send>, fetches: Vec<Fetch>) {
-        let final_size = self.message_store.len() + sends.len();
-        let fetch_sum = fetches.iter().fold(0, |acc, f| acc + f.volume) as usize;
-
-        self.update_store(sends, fetches, fetch_sum);
-
-        otils::osort(&mut self.message_store[..], 8);
-
-        let mut user_sum: isize = 0;
-        let mut prev_user: i64 = -1;
-        for request in self.message_store.iter_mut() {
-            let same_user = prev_user == request.receiver;
-            user_sum = isize::oselect(same_user, user_sum, 0);
-
-            let is_fetch = request.req_type == FETCH;
-            let fetch_more = user_sum > 0;
-            request.mark = u32::oselect(is_fetch, 0, u32::oselect(fetch_more, 1, 0));
-
-            prev_user = request.receiver;
-            user_sum += isize::oselect(
-                is_fetch,
-                request.volume as isize,
-                isize::oselect(fetch_more, -1, 0),
-            );
-        }
-
-        println!("message store");
-        for request in self.message_store.iter() {
-            println!("{:?}", request);
-        }
-
-        let deliver = otils::ofilter(
-            self.message_store.clone(),
-            |r| r.req_type != FETCH && r.mark == 1,
-            fetch_sum,
-            8,
-        );
-
-        println!("deliver");
-        for request in deliver.iter() {
-            println!("{:?}", request);
-        }
-
-        self.message_store = otils::ofilter(
-            self.message_store.clone(),
-            |r| r.receiver >= 0 && r.req_type != FETCH && r.mark == 0,
-            final_size,
-            8,
-        );
-
-        println!("new message store");
-        for request in self.message_store.iter() {
-            println!("{:?}", request);
-        }
-
-        // otils::ofilter(data, f, num_matches, threads)
-    }
-}

+ 75 - 0
baseline/src/omq/mod.rs

@@ -0,0 +1,75 @@
+mod types;
+use otils::ObliviousOps;
+pub use types::{Fetch, Request, Send};
+
+#[derive(Debug)]
+pub struct Omq {
+    message_store: Vec<Request>,
+}
+
+impl Omq {
+    pub fn new() -> Self {
+        Omq {
+            message_store: Vec::new(),
+        }
+    }
+
+    fn update_store(&mut self, fetches: Vec<Fetch>, fetch_sum: usize) {
+        let mut size = (self.message_store.len() + fetches.len() + fetch_sum).next_power_of_two();
+
+        size -= self.message_store.len();
+        self.message_store.reserve(size);
+
+        size -= fetch_sum + fetches.len();
+        for fetch in fetches.iter() {
+            self.message_store
+                .extend(Request::dummies(fetch.receiver, fetch.volume));
+        }
+
+        self.message_store.extend(
+            fetches
+                .into_iter()
+                .map(|x| <Fetch as Into<Request>>::into(x)),
+        );
+
+        self.message_store.extend(Request::dummies(-1, size)); // TODO this is hacky
+    }
+
+    pub fn batch_send(&mut self, sends: Vec<Send>) {
+        self.message_store
+            .extend(sends.into_iter().map(|x| <Send as Into<Request>>::into(x)));
+    }
+
+    pub fn batch_fetch(&mut self, fetches: Vec<Fetch>) -> Vec<Send> {
+        let final_size = self.message_store.len();
+        let fetch_sum = fetches.iter().fold(0, |acc, f| acc + f.volume) as usize;
+        self.update_store(fetches, fetch_sum);
+
+        otils::osort(&mut self.message_store[..], 8);
+
+        let mut user_sum: isize = 0;
+        let mut prev_user: i64 = -1;
+        for request in self.message_store.iter_mut() {
+            let same_user = prev_user == request.receiver;
+            user_sum = isize::oselect(same_user, user_sum, 0);
+
+            let fetch_more = user_sum > 0;
+            request.mark = u32::oselect(request.is_fetch(), 0, u32::oselect(fetch_more, 1, 0));
+
+            prev_user = request.receiver;
+            user_sum += isize::oselect(
+                request.is_fetch(),
+                request.volume as isize,
+                isize::oselect(fetch_more, -1, 0),
+            );
+        }
+
+        otils::ofilter(&mut self.message_store[..], |r| r.should_deliver(), 8);
+        let deliver = self.message_store[0..fetch_sum].to_vec();
+
+        otils::ofilter(&mut self.message_store[..], |r| r.should_defer(), 8);
+        self.message_store.truncate(final_size);
+
+        deliver.into_iter().map(|x| x.into()).collect()
+    }
+}

+ 11 - 0
baseline/src/omq/types/fetch.rs

@@ -0,0 +1,11 @@
+#[derive(Debug)]
+pub struct Fetch {
+    pub receiver: i64,
+    pub volume: usize,
+}
+
+impl Fetch {
+    pub fn new(receiver: i64, volume: usize) -> Self {
+        Fetch { receiver, volume }
+    }
+}

+ 8 - 0
baseline/src/omq/types/mod.rs

@@ -0,0 +1,8 @@
+pub mod fetch;
+pub use fetch::Fetch;
+
+pub mod send;
+pub use send::Send;
+
+pub mod request;
+pub use request::Request;

+ 13 - 24
baseline/src/request.rs → baseline/src/omq/types/request.rs

@@ -1,3 +1,4 @@
+use crate::omq::{Fetch, Send};
 use otils::ObliviousOps;
 use std::cmp::Ordering;
 
@@ -5,30 +6,6 @@ pub const FETCH: u32 = 0;
 pub const SEND: u32 = 1;
 pub const DUMMY: u32 = 2;
 
-#[derive(Debug)]
-pub struct Send {
-    pub receiver: i64,
-    pub message: u64,
-}
-
-impl Send {
-    pub fn new(receiver: i64, message: u64) -> Self {
-        Send { receiver, message }
-    }
-}
-
-#[derive(Debug)]
-pub struct Fetch {
-    pub receiver: i64,
-    pub volume: usize,
-}
-
-impl Fetch {
-    pub fn new(receiver: i64, volume: usize) -> Self {
-        Fetch { receiver, volume }
-    }
-}
-
 #[derive(Debug, Clone, Copy)]
 pub struct Request {
     pub receiver: i64,
@@ -50,6 +27,18 @@ impl Request {
             })
             .collect()
     }
+
+    pub fn is_fetch(&self) -> bool {
+        self.req_type == FETCH
+    }
+
+    pub fn should_deliver(&self) -> bool {
+        !self.is_fetch() && self.mark == 1
+    }
+
+    pub fn should_defer(&self) -> bool {
+        self.receiver >= 0 && !self.is_fetch() && self.mark == 0 // this is also hacky
+    }
 }
 
 impl From<Send> for Request {

+ 23 - 0
baseline/src/omq/types/send.rs

@@ -0,0 +1,23 @@
+// use crate::types::Request;
+use crate::omq::Request;
+
+#[derive(Debug)]
+pub struct Send {
+    pub receiver: i64,
+    pub message: u64,
+}
+
+impl Send {
+    pub fn new(receiver: i64, message: u64) -> Self {
+        Send { receiver, message }
+    }
+}
+
+impl From<Request> for Send {
+    fn from(r: Request) -> Self {
+        Self {
+            receiver: r.receiver,
+            message: r.message,
+        }
+    }
+}

+ 1 - 1
otils

@@ -1 +1 @@
-Subproject commit d9fb93f2ef8ace4dffdb440013ff6ec78b27befe
+Subproject commit 90685fc013a12244824499fabb13d8d2eba187e4