@@ -2,22 +2,10 @@ use crate::omap::ObliviousMap;
 pub use crate::record::{IndexRecord, Record, RecordType, SubmapRecord};
 use fastapprox::fast;
 use otils::{self, ObliviousOps};
-use std::{cmp, f64::consts::E, thread};
+use std::{cmp, f64::consts::E, thread, time::UNIX_EPOCH};
 
 const LAMBDA: usize = 128;
 
-// pub struct BalanceRecord(Record);
-
-// impl BalanceRecord {}
-
-// impl PartialEq for BalanceRecord {
-//     fn eq(&self, other: &Self) -> bool {}
-// }
-
-// impl PartialOrd for BalanceRecord {
-//     fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {}
-// }
-
 pub struct LoadBalancer {
     num_users: i64,
     num_threads: usize,
@@ -33,15 +21,14 @@ impl LoadBalancer {
         user_store.reserve(num_users as usize);
         user_store.extend((0..num_users).map(|i| IndexRecord::new(i, RecordType::User)));
 
-        let mut submaps = Vec::new();
-        submaps.reserve(num_submaps as usize);
+        let mut submaps = Vec::with_capacity(num_submaps as usize);
         submaps.extend(
             (0..num_submaps).map(|_| ObliviousMap::new(num_threads / num_submaps as usize)),
         );
 
         LoadBalancer {
             num_users,
-            num_threads: num_threads / num_submaps,
+            num_threads,
             num_submaps,
             user_store,
             submaps,
@@ -82,7 +69,8 @@ impl LoadBalancer {
         submap_size: usize,
         is_send: bool,
     ) -> Vec<SubmapRecord> {
-        let requests = requests.into_iter().map(|r| SubmapRecord(r.0)).collect();
+        let requests: Vec<SubmapRecord> = requests.into_iter().map(|r| SubmapRecord(r.0)).collect();
+
         let mut requests = self.pad_for_submap(requests, submap_size, is_send);
 
         requests = otils::sort(requests, self.num_threads); // sort by omap, then by dummy
@@ -102,6 +90,7 @@ impl LoadBalancer {
         }
 
         otils::compact(&mut requests[..], |r| r.0.mark == 1, self.num_threads);
+        requests.truncate(self.num_submaps * submap_size);
         requests
     }
 
@@ -145,7 +134,7 @@ impl LoadBalancer {
             |r| r.is_request(),
             self.num_threads,
         );
-        let requests = self.user_store[0..num_requests].to_vec();
+        let requests = self.user_store.drain(0..num_requests).collect();
 
         otils::compact(
             &mut self.user_store[..],
@@ -172,24 +161,21 @@ impl LoadBalancer {
             .collect();
 
         let mut remaining_submaps = &mut self.submaps[..];
-        let mut remaining_requests = &mut requests[..self.num_submaps * submap_size];
 
         thread::scope(|s| {
             for _ in 0..self.num_submaps - 1 {
                 let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
                 remaining_submaps = rest_submaps;
 
-                let (requests, rest_requests) = remaining_requests.split_at_mut(submap_size);
-                remaining_requests = rest_requests;
-                s.spawn(|| submap[0].batch_send(requests.to_vec()));
+                let batch = requests.drain(0..submap_size).collect();
+                s.spawn(|| submap[0].batch_send(batch));
             }
 
             let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
             remaining_submaps = rest_submaps;
 
-            let (requests, rest_requests) = remaining_requests.split_at_mut(submap_size);
-            remaining_requests = rest_requests;
-            submap[0].batch_send(requests.to_vec());
+            let batch = requests.drain(0..submap_size).collect();
+            submap[0].batch_send(batch);
         });
 
         // parallelize
@@ -241,7 +227,7 @@ impl LoadBalancer {
             |r| r.is_request(),
             self.num_threads,
         );
-        let deliver = self.user_store[0..num_requests].to_vec();
+        let deliver = self.user_store.drain(0..num_requests).collect();
 
         otils::compact(
             &mut self.user_store[..],
@@ -262,52 +248,82 @@ impl LoadBalancer {
             .iter()
             .fold(0, |acc, fetch| acc + fetch.data as usize);
         let fetches = fetches.into_iter().map(|r| IndexRecord(r)).collect();
+
+        // let start = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
         let requests = self.get_fetch_indices(fetches, num_requests);
+        // let end = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
+        // println!("fetch idx {}: {}", requests.len(), end - start);
 
         let submap_size = self.pad_size(requests.len() as f64);
+
+        // let start = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
         let mut requests: Vec<Record> = self
             .get_submap_requests(requests, submap_size, false)
             .into_iter()
             .map(|r| r.0)
             .collect();
-
-        let mut responses: Vec<IndexRecord> = Vec::with_capacity(submap_size * self.num_submaps);
-        (0..submap_size * self.num_submaps)
-            .for_each(|_| responses.push(IndexRecord::new(0, RecordType::Dummy)));
+        // let end = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
+        // println!("submap requests {}: {}", requests.len(), end - start);
 
         let mut remaining_submaps = &mut self.submaps[..];
-        let mut remaining_requests = &mut requests[..self.num_submaps * submap_size];
-        let mut remaining_responses = &mut responses[..];
+        let mut responses: Vec<IndexRecord> = Vec::with_capacity(submap_size * self.num_submaps);
 
+        // let start = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
         thread::scope(|s| {
+            let mut handles = Vec::new();
             for _ in 0..self.num_submaps - 1 {
                 let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
                 remaining_submaps = rest_submaps;
-                // println!("submaps: {}", remaining_submaps.len());
+                let batch = requests.drain(0..submap_size).collect();
 
-                let (batch, rest_requests) = remaining_requests.split_at_mut(submap_size);
-                remaining_requests = rest_requests;
-                // println!("");
-
-                let (responses, rest_responses) = remaining_responses.split_at_mut(submap_size);
-                remaining_responses = rest_responses;
-                s.spawn(|| submap[0].batch_fetch(batch.to_vec(), responses));
+                handles.push(s.spawn(|| submap[0].batch_fetch(batch)));
             }
 
             let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
             remaining_submaps = rest_submaps;
+            let batch = requests.drain(0..submap_size).collect();
 
-            let (batch, rest_requests) = remaining_requests.split_at_mut(submap_size);
-            remaining_requests = rest_requests;
+            let response = submap[0].batch_fetch(batch);
 
-            let (responses, rest_responses) = remaining_responses.split_at_mut(submap_size);
-            remaining_responses = rest_responses;
-            submap[0].batch_fetch(batch.to_vec(), responses);
+            responses.extend(response);
+            for handle in handles.into_iter() {
+                responses.extend(handle.join().unwrap());
+            }
        });
+        // let end = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
+        // println!("submap response {}: {}", responses.len(), end - start);
 
         // this only really needs to be a shuffle
+        // let start = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
         responses = otils::sort(responses, self.num_threads);
         otils::compact(&mut responses, |r| r.0.is_send(), self.num_threads);
+        // let end = std::time::SystemTime::now()
+        //     .duration_since(UNIX_EPOCH)
+        //     .unwrap()
+        //     .as_nanos();
+        // println!("final: {}", end - start);
+
         responses.drain(0..num_requests).map(|r| r.0).collect()
     }
 }
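Note on the threading change above: the reworked send and fetch paths drain an owned batch per submap, spawn all but the last batch on scoped threads, run the final batch on the current thread, and then join the collected handles to gather every submap's responses. A minimal, self-contained sketch of that pattern follows; process_batch, the integer data, and the batch sizes are hypothetical stand-ins for illustration, not the project's types or API.

use std::thread;

// Hypothetical stand-in for a per-submap batch operation such as batch_fetch.
fn process_batch(batch: Vec<u64>) -> Vec<u64> {
    batch.into_iter().map(|x| x + 1).collect()
}

fn main() {
    let num_batches: usize = 4;
    let batch_size: usize = 3;
    let mut requests: Vec<u64> = (0..(num_batches * batch_size) as u64).collect();
    let mut responses: Vec<u64> = Vec::with_capacity(requests.len());

    thread::scope(|s| {
        let mut handles = Vec::new();

        // Fan out: every batch except the last runs on its own scoped thread,
        // taking ownership of a drained chunk of the request vector.
        for _ in 0..num_batches - 1 {
            let batch: Vec<u64> = requests.drain(0..batch_size).collect();
            handles.push(s.spawn(|| process_batch(batch)));
        }

        // The last batch runs on the current thread; the spawned handles are
        // then joined and their results gathered into one response vector.
        let batch: Vec<u64> = requests.drain(0..batch_size).collect();
        responses.extend(process_batch(batch));
        for handle in handles {
            responses.extend(handle.join().unwrap());
        }
    });

    assert_eq!(responses.len(), num_batches * batch_size);
}

Because each spawned closure takes its batch by move, no pre-split mutable slices or preallocated dummy response buffers are needed, which is what the diff removes.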