|
|
@@ -1,41 +1,41 @@
|
|
|
-pub use crate::record::{Record, SubmapRequest};
|
|
|
-use crate::ObliviousMap;
|
|
|
+use crate::omap::ObliviousMap;
|
|
|
+pub use crate::record::{IndexRecord, Record, RecordType, SubmapRecord};
|
|
|
use fastapprox::fast;
|
|
|
use otils::{self, ObliviousOps};
|
|
|
use std::{cmp, f64::consts::E};
|
|
|
|
|
|
const LAMBDA: usize = 128;
|
|
|
|
|
|
-pub struct BalanceRecord(Record);
|
|
|
+// pub struct BalanceRecord(Record);
|
|
|
|
|
|
-impl BalanceRecord {}
|
|
|
+// impl BalanceRecord {}
|
|
|
|
|
|
-impl PartialEq for BalanceRecord {
|
|
|
- fn eq(&self, other: &Self) -> bool {}
|
|
|
-}
|
|
|
+// impl PartialEq for BalanceRecord {
|
|
|
+// fn eq(&self, other: &Self) -> bool {}
|
|
|
+// }
|
|
|
|
|
|
-impl PartialOrd for BalanceRecord {
|
|
|
- fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {}
|
|
|
-}
|
|
|
+// impl PartialOrd for BalanceRecord {
|
|
|
+// fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {}
|
|
|
+// }
|
|
|
|
|
|
pub struct LoadBalancer {
|
|
|
num_users: i64,
|
|
|
num_threads: usize,
|
|
|
num_submaps: usize,
|
|
|
|
|
|
- pub user_store: Vec<Record>,
|
|
|
- pub omaps: Vec<ObliviousMap>,
|
|
|
+ pub user_store: Vec<IndexRecord>,
|
|
|
+ pub submaps: Vec<ObliviousMap>,
|
|
|
}
|
|
|
|
|
|
impl LoadBalancer {
|
|
|
pub fn new(num_users: i64, num_threads: usize, num_submaps: usize) -> Self {
|
|
|
let mut user_store = Vec::new();
|
|
|
user_store.reserve(num_users as usize);
|
|
|
- user_store.extend((0..num_users).map(|i| Record::new(i)));
|
|
|
+ user_store.extend((0..num_users).map(|i| IndexRecord::new(i, RecordType::User)));
|
|
|
|
|
|
- let mut omaps = Vec::new();
|
|
|
- omaps.reserve(num_submaps as usize);
|
|
|
- omaps.extend(
|
|
|
+ let mut submaps = Vec::new();
|
|
|
+ submaps.reserve(num_submaps as usize);
|
|
|
+ submaps.extend(
|
|
|
(0..num_submaps).map(|_| ObliviousMap::new(num_threads / num_submaps as usize)),
|
|
|
);
|
|
|
|
|
|
@@ -44,7 +44,7 @@ impl LoadBalancer {
|
|
|
num_threads,
|
|
|
num_submaps,
|
|
|
user_store,
|
|
|
- omaps,
|
|
|
+ submaps,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -58,86 +58,87 @@ impl LoadBalancer {
|
|
|
.ceil() as usize
|
|
|
}
|
|
|
|
|
|
- fn update_with_sends(&mut self, sends: Vec<Record>) {
|
|
|
- let mut size = (self.user_store.len() + sends.len()).next_power_of_two();
|
|
|
- size -= self.user_store.len();
|
|
|
- self.user_store.reserve(size);
|
|
|
- size -= sends.len();
|
|
|
-
|
|
|
- self.user_store.extend(sends);
|
|
|
+ pub fn pad_for_submap(
|
|
|
+ &self,
|
|
|
+ mut requests: Vec<SubmapRecord>,
|
|
|
+ submap_size: usize,
|
|
|
+ is_send: bool,
|
|
|
+ ) -> Vec<SubmapRecord> {
|
|
|
+ requests.reserve(self.num_submaps * submap_size);
|
|
|
|
|
|
- self.user_store.extend((0..size).map(|_| Record::max()));
|
|
|
+ for submap in 0..self.num_submaps {
|
|
|
+ if is_send {
|
|
|
+ requests.extend(SubmapRecord::dummy_send(submap_size, submap as u8));
|
|
|
+ } else {
|
|
|
+ requests.extend(SubmapRecord::dummy_fetch(submap_size, submap as u8));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ requests
|
|
|
}
|
|
|
|
|
|
- fn update_with_fetches(&mut self, fetches: Vec<Record>, num_fetches: usize) {
|
|
|
- let mut size = (self.user_store.len() + num_fetches).next_power_of_two();
|
|
|
- size -= self.user_store.len();
|
|
|
- self.user_store.reserve(size);
|
|
|
+ pub fn get_submap_requests(
|
|
|
+ &self,
|
|
|
+ requests: Vec<IndexRecord>,
|
|
|
+ submap_size: usize,
|
|
|
+ is_send: bool,
|
|
|
+ ) -> Vec<SubmapRecord> {
|
|
|
+ let requests = requests.into_iter().map(|r| SubmapRecord(r.0)).collect();
|
|
|
+ let mut requests = self.pad_for_submap(requests, submap_size, is_send);
|
|
|
|
|
|
- size -= num_fetches;
|
|
|
- for fetch in fetches.into_iter() {
|
|
|
- self.user_store.extend(fetch.dummies());
|
|
|
+ requests = otils::sort(requests, self.num_threads); // sort by omap, then by dummy
|
|
|
+
|
|
|
+ let mut prev_map = self.num_submaps;
|
|
|
+ let mut remaining_marks = submap_size as i32;
|
|
|
+ for request in requests.iter_mut() {
|
|
|
+ let submap = request.0.map as u32;
|
|
|
+ remaining_marks = i32::oselect(
|
|
|
+ submap != prev_map as u32,
|
|
|
+ submap_size as i32,
|
|
|
+ remaining_marks,
|
|
|
+ );
|
|
|
+ request.0.mark = u16::oselect(remaining_marks > 0, 1, 0);
|
|
|
+ remaining_marks += i32::oselect(remaining_marks > 0, -1, 0);
|
|
|
+ prev_map = submap as usize;
|
|
|
}
|
|
|
|
|
|
- self.user_store.extend((0..size).map(|_| Record::max()));
|
|
|
+ otils::compact(&mut requests[..], |r| r.0.mark == 1, self.num_threads);
|
|
|
+ requests
|
|
|
}
|
|
|
|
|
|
- fn construct_send_indices(&mut self) {
|
|
|
+ fn propagate_send_indices(&mut self) {
|
|
|
let mut idx: u32 = 0;
|
|
|
let mut is_same_u: bool;
|
|
|
|
|
|
let mut user_store_iter = self.user_store.iter_mut().peekable();
|
|
|
while let Some(record) = user_store_iter.next() {
|
|
|
- let is_user_store = record.is_user_store();
|
|
|
+ let is_user_store = record.0.is_user_store();
|
|
|
|
|
|
idx = u32::oselect(
|
|
|
is_user_store,
|
|
|
- cmp::max(record.last_fetch, record.last_send),
|
|
|
+ cmp::max(record.0.last_fetch, record.0.last_send),
|
|
|
idx + 1,
|
|
|
);
|
|
|
|
|
|
- record.idx = u32::oselect(is_user_store, 0, record.get_idx(idx));
|
|
|
- record.map = (record.idx % self.num_submaps) as u8;
|
|
|
- record.last_send = idx;
|
|
|
+ record.0.idx = u32::oselect(is_user_store, 0, record.get_idx(idx));
|
|
|
+ record.0.map = (record.0.idx % (self.num_submaps as u32)) as u8;
|
|
|
+ record.0.last_send = idx;
|
|
|
|
|
|
if let Some(next_record) = user_store_iter.peek() {
|
|
|
- is_same_u = record.uid == next_record.uid;
|
|
|
+ is_same_u = record.0.uid == next_record.0.uid;
|
|
|
} else {
|
|
|
is_same_u = false;
|
|
|
}
|
|
|
- record.mark = u16::oselect(is_same_u, 0, 1);
|
|
|
+ record.0.mark = u16::oselect(is_same_u, 0, 1);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- fn construct_fetch_indices(&mut self) {
|
|
|
- let mut idx: u32 = 0;
|
|
|
- let mut is_same_u: bool;
|
|
|
-
|
|
|
- let mut user_store_iter = self.user_store.iter_mut().peekable();
|
|
|
- while let Some(record) = user_store_iter.next() {
|
|
|
- let is_user_store = record.is_user_store();
|
|
|
-
|
|
|
- idx = u32::oselect(is_user_store, record.last_fetch, idx + 1);
|
|
|
-
|
|
|
- record.idx = u32::oselect(is_user_store, 0, record.get_idx(idx));
|
|
|
- record.map = (record.idx % self.num_submaps) as u8;
|
|
|
- record.last_fetch = idx;
|
|
|
-
|
|
|
- if let Some(next_record) = user_store_iter.peek() {
|
|
|
- is_same_u = record.uid == next_record.uid;
|
|
|
- } else {
|
|
|
- is_same_u = false;
|
|
|
- }
|
|
|
- record.mark = u16::oselect(is_same_u, 0, 1);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- pub fn get_send_requests(&mut self, sends: Vec<Record>) -> Vec<Record> {
|
|
|
+ pub fn get_send_indices(&mut self, sends: Vec<IndexRecord>) -> Vec<IndexRecord> {
|
|
|
let num_requests = sends.len();
|
|
|
- self.update_with_sends(sends);
|
|
|
+ self.user_store.reserve(num_requests);
|
|
|
+ self.user_store.extend(sends);
|
|
|
|
|
|
- otils::sort(&mut self.user_store[..], self.num_threads);
|
|
|
- self.construct_send_indices();
|
|
|
+ self.user_store = otils::sort(std::mem::take(&mut self.user_store), self.num_threads);
|
|
|
+ self.propagate_send_indices();
|
|
|
|
|
|
otils::compact(
|
|
|
&mut self.user_store[..],
|
|
|
@@ -148,9 +149,10 @@ impl LoadBalancer {
|
|
|
|
|
|
otils::compact(
|
|
|
&mut self.user_store[..],
|
|
|
- |r| r.is_new_user_store(),
|
|
|
+ |r| r.is_updated_user_store(),
|
|
|
self.num_threads,
|
|
|
);
|
|
|
+
|
|
|
self.user_store.truncate(self.num_users as usize);
|
|
|
self.user_store.iter_mut().for_each(|r| {
|
|
|
r.set_user_store();
|
|
|
@@ -159,14 +161,61 @@ impl LoadBalancer {
|
|
|
requests
|
|
|
}
|
|
|
|
|
|
- pub fn get_fetch_requests(&mut self, fetches: Vec<Record>) -> Vec<Record> {
|
|
|
- let num_requests = fetches
|
|
|
- .iter()
|
|
|
- .fold(0, |acc, fetch| acc + fetch.data as usize);
|
|
|
+ pub fn batch_send(&mut self, sends: Vec<Record>) {
|
|
|
+ let sends = sends.into_iter().map(|r| IndexRecord(r)).collect();
|
|
|
+ let requests = self.get_send_indices(sends);
|
|
|
+ let submap_size = self.pad_size(requests.len() as f64);
|
|
|
+ let mut requests: Vec<Record> = self
|
|
|
+ .get_submap_requests(requests, submap_size, true)
|
|
|
+ .into_iter()
|
|
|
+ .map(|r| r.0)
|
|
|
+ .collect();
|
|
|
+
|
|
|
+ for idx in 0..self.num_submaps {
|
|
|
+ let batch: Vec<Record> = requests.drain(0..submap_size).collect();
|
|
|
+ self.submaps[idx].batch_send(batch);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn update_with_fetches(&mut self, fetches: Vec<IndexRecord>, num_fetches: usize) {
|
|
|
+ self.user_store.reserve(num_fetches);
|
|
|
+ for fetch in fetches.into_iter() {
|
|
|
+ self.user_store.extend(fetch.dummy_fetches());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn propagate_fetch_indices(&mut self) {
|
|
|
+ let mut idx: u32 = 0;
|
|
|
+ let mut is_same_u: bool;
|
|
|
+
|
|
|
+ let mut user_store_iter = self.user_store.iter_mut().peekable();
|
|
|
+ while let Some(record) = user_store_iter.next() {
|
|
|
+ let is_user_store = record.0.is_user_store();
|
|
|
+
|
|
|
+ idx = u32::oselect(is_user_store, record.0.last_fetch, idx + 1);
|
|
|
+
|
|
|
+ record.0.idx = u32::oselect(is_user_store, 0, record.get_idx(idx));
|
|
|
+ record.0.map = (record.0.idx % (self.num_submaps as u32)) as u8;
|
|
|
+ record.0.last_fetch = idx;
|
|
|
+
|
|
|
+ if let Some(next_record) = user_store_iter.peek() {
|
|
|
+ is_same_u = record.0.uid == next_record.0.uid;
|
|
|
+ } else {
|
|
|
+ is_same_u = false;
|
|
|
+ }
|
|
|
+ record.0.mark = u16::oselect(is_same_u, 0, 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn get_fetch_indices(
|
|
|
+ &mut self,
|
|
|
+ fetches: Vec<IndexRecord>,
|
|
|
+ num_requests: usize,
|
|
|
+ ) -> Vec<IndexRecord> {
|
|
|
self.update_with_fetches(fetches, num_requests);
|
|
|
|
|
|
- otils::sort(&mut self.user_store[..], self.num_threads);
|
|
|
- self.construct_fetch_indices();
|
|
|
+ self.user_store = otils::sort(std::mem::take(&mut self.user_store), self.num_threads);
|
|
|
+ self.propagate_fetch_indices();
|
|
|
|
|
|
otils::compact(
|
|
|
&mut self.user_store[..],
|
|
|
@@ -177,9 +226,10 @@ impl LoadBalancer {
|
|
|
|
|
|
otils::compact(
|
|
|
&mut self.user_store[..],
|
|
|
- |r| r.is_new_user_store(),
|
|
|
+ |r| r.is_updated_user_store(),
|
|
|
self.num_threads,
|
|
|
);
|
|
|
+
|
|
|
self.user_store.truncate(self.num_users as usize);
|
|
|
self.user_store.iter_mut().for_each(|r| {
|
|
|
r.set_user_store();
|
|
|
@@ -188,80 +238,32 @@ impl LoadBalancer {
|
|
|
deliver
|
|
|
}
|
|
|
|
|
|
- pub fn pad_for_submap(&self, requests: Vec<Record>, submap_size: usize) -> Vec<SubmapRequest> {
|
|
|
- let num_submaps = self.num_submaps as usize;
|
|
|
- let mut remaining = (requests.len() + num_submaps * submap_size).next_power_of_two();
|
|
|
- remaining -= requests.len();
|
|
|
-
|
|
|
- let mut requests: Vec<SubmapRequest> = requests.into_iter().map(|r| r.into()).collect();
|
|
|
- requests.reserve(remaining);
|
|
|
-
|
|
|
- for idx in 0..num_submaps {
|
|
|
- requests.extend(SubmapRequest::dummies(
|
|
|
- submap_size,
|
|
|
- idx as u32,
|
|
|
- self.num_submaps,
|
|
|
- ));
|
|
|
- }
|
|
|
- remaining -= num_submaps * submap_size;
|
|
|
- requests.extend((0..remaining).map(|_| Record::max().into()));
|
|
|
- requests
|
|
|
- }
|
|
|
-
|
|
|
- pub fn get_submap_requests(
|
|
|
- &self,
|
|
|
- requests: Vec<Record>,
|
|
|
- submap_size: usize,
|
|
|
- ) -> Vec<SubmapRequest> {
|
|
|
- let mut requests = self.pad_for_submap(requests, submap_size);
|
|
|
-
|
|
|
- otils::sort(&mut requests[..], self.num_threads); // sort by omap, then by dummy
|
|
|
-
|
|
|
- let mut prev_map = self.num_submaps;
|
|
|
- let mut remaining_marks = submap_size as i32;
|
|
|
- for request in requests.iter_mut() {
|
|
|
- let submap = request.value.map as u32;
|
|
|
- remaining_marks = i32::oselect(submap != prev_map, submap_size as i32, remaining_marks);
|
|
|
- request.value.mark = u16::oselect(remaining_marks > 0, 1, 0);
|
|
|
- remaining_marks += i32::oselect(remaining_marks > 0, -1, 0);
|
|
|
- prev_map = submap;
|
|
|
- }
|
|
|
-
|
|
|
- otils::compact(&mut requests[..], |r| r.value.mark == 1, self.num_threads);
|
|
|
- requests
|
|
|
- }
|
|
|
-
|
|
|
- pub fn batch_send(&mut self, sends: Vec<Record>) {
|
|
|
- let requests = self.get_send_requests(sends);
|
|
|
- let submap_size = self.pad_size(requests.len() as f64);
|
|
|
- let mut requests: Vec<Record> = self
|
|
|
- .get_submap_requests(requests, submap_size)
|
|
|
- .into_iter()
|
|
|
- .map(|r| r.value)
|
|
|
- .collect();
|
|
|
-
|
|
|
- for idx in 0..self.num_submaps {
|
|
|
- let batch = requests.drain(0..submap_size).collect();
|
|
|
- self.omaps[idx].batch_send(batch);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
pub fn batch_fetch(&mut self, fetches: Vec<Record>) -> Vec<Record> {
|
|
|
- let requests = self.get_fetch_requests(fetches);
|
|
|
+ let num_requests = fetches
|
|
|
+ .iter()
|
|
|
+ .fold(0, |acc, fetch| acc + fetch.data as usize);
|
|
|
+ let fetches = fetches.into_iter().map(|r| IndexRecord(r)).collect();
|
|
|
+ let requests = self.get_fetch_indices(fetches, num_requests);
|
|
|
+
|
|
|
let submap_size = self.pad_size(requests.len() as f64);
|
|
|
let mut requests: Vec<Record> = self
|
|
|
- .get_submap_requests(requests, submap_size)
|
|
|
+ .get_submap_requests(requests, submap_size, false)
|
|
|
.into_iter()
|
|
|
- .map(|r| r.value)
|
|
|
+ .map(|r| r.0)
|
|
|
.collect();
|
|
|
|
|
|
- let mut responses: Vec<Record> = Vec::new();
|
|
|
+ let mut responses: Vec<IndexRecord> = Vec::new();
|
|
|
responses.reserve(submap_size * self.num_submaps);
|
|
|
+
|
|
|
+ // parallelize
|
|
|
for idx in 0..self.num_submaps {
|
|
|
- let batch = requests.drain(0..submap_size).collect();
|
|
|
- responses.extend(self.omaps[idx].batch_fetch(batch));
|
|
|
+ let batch: Vec<Record> = requests.drain(0..submap_size).collect();
|
|
|
+ responses.extend(self.submaps[idx].batch_fetch(batch));
|
|
|
}
|
|
|
|
|
|
- responses
|
|
|
+ // this only really needs to be a shuffle
|
|
|
+ responses = otils::sort(responses, self.num_threads);
|
|
|
+ otils::compact(&mut responses, |r| r.0.is_send(), self.num_threads);
|
|
|
+ responses.drain(0..num_requests).map(|r| r.0).collect()
|
|
|
}
|
|
|
}
|