|
|
@@ -2,7 +2,7 @@ use crate::omap::ObliviousMap;
|
|
|
pub use crate::record::{IndexRecord, Record, RecordType, SubmapRecord};
|
|
|
use fastapprox::fast;
|
|
|
use otils::{self, ObliviousOps};
|
|
|
-use std::{cmp, f64::consts::E};
|
|
|
+use std::{cmp, f64::consts::E, thread};
|
|
|
|
|
|
const LAMBDA: usize = 128;
|
|
|
|
|
|
@@ -41,7 +41,7 @@ impl LoadBalancer {
|
|
|
|
|
|
LoadBalancer {
|
|
|
num_users,
|
|
|
- num_threads,
|
|
|
+ num_threads: num_threads / num_submaps,
|
|
|
num_submaps,
|
|
|
user_store,
|
|
|
submaps,
|
|
|
@@ -171,10 +171,29 @@ impl LoadBalancer {
|
|
|
.map(|r| r.0)
|
|
|
.collect();
|
|
|
|
|
|
- for idx in 0..self.num_submaps {
|
|
|
- let batch: Vec<Record> = requests.drain(0..submap_size).collect();
|
|
|
- self.submaps[idx].batch_send(batch);
|
|
|
- }
|
|
|
+ let mut remaining_submaps = &mut self.submaps[..];
|
|
|
+ let mut remaining_requests = &mut requests[..self.num_submaps * submap_size];
|
|
|
+
|
|
|
+ thread::scope(|s| {
|
|
|
+ for _ in 0..self.num_submaps - 1 {
|
|
|
+ let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
|
|
|
+ remaining_submaps = rest_submaps;
|
|
|
+
|
|
|
+ let (requests, rest_requests) = remaining_requests.split_at_mut(submap_size);
|
|
|
+ remaining_requests = rest_requests;
|
|
|
+ s.spawn(|| submap[0].batch_send(requests.to_vec()));
|
|
|
+ }
|
|
|
+
|
|
|
+ let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
|
|
|
+ remaining_submaps = rest_submaps;
|
|
|
+
|
|
|
+ let (requests, rest_requests) = remaining_requests.split_at_mut(submap_size);
|
|
|
+ remaining_requests = rest_requests;
|
|
|
+ submap[0].batch_send(requests.to_vec());
|
|
|
+ });
|
|
|
+
|
|
|
+ // parallelize
|
|
|
+ for _ in 0..self.num_submaps {}
|
|
|
}
|
|
|
|
|
|
fn update_with_fetches(&mut self, fetches: Vec<IndexRecord>, num_fetches: usize) {
|
|
|
@@ -252,14 +271,39 @@ impl LoadBalancer {
|
|
|
.map(|r| r.0)
|
|
|
.collect();
|
|
|
|
|
|
- let mut responses: Vec<IndexRecord> = Vec::new();
|
|
|
- responses.reserve(submap_size * self.num_submaps);
|
|
|
+ let mut responses: Vec<IndexRecord> = Vec::with_capacity(submap_size * self.num_submaps);
|
|
|
+ (0..submap_size * self.num_submaps)
|
|
|
+ .for_each(|_| responses.push(IndexRecord::new(0, RecordType::Dummy)));
|
|
|
|
|
|
- // parallelize
|
|
|
- for idx in 0..self.num_submaps {
|
|
|
- let batch: Vec<Record> = requests.drain(0..submap_size).collect();
|
|
|
- responses.extend(self.submaps[idx].batch_fetch(batch));
|
|
|
- }
|
|
|
+ let mut remaining_submaps = &mut self.submaps[..];
|
|
|
+ let mut remaining_requests = &mut requests[..self.num_submaps * submap_size];
|
|
|
+ let mut remaining_responses = &mut responses[..];
|
|
|
+
|
|
|
+ thread::scope(|s| {
|
|
|
+ for _ in 0..self.num_submaps - 1 {
|
|
|
+ let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
|
|
|
+ remaining_submaps = rest_submaps;
|
|
|
+ // println!("submaps: {}", remaining_submaps.len());
|
|
|
+
|
|
|
+ let (batch, rest_requests) = remaining_requests.split_at_mut(submap_size);
|
|
|
+ remaining_requests = rest_requests;
|
|
|
+ // println!("");
|
|
|
+
|
|
|
+ let (responses, rest_responses) = remaining_responses.split_at_mut(submap_size);
|
|
|
+ remaining_responses = rest_responses;
|
|
|
+ s.spawn(|| submap[0].batch_fetch(batch.to_vec(), responses));
|
|
|
+ }
|
|
|
+
|
|
|
+ let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
|
|
|
+ remaining_submaps = rest_submaps;
|
|
|
+
|
|
|
+ let (batch, rest_requests) = remaining_requests.split_at_mut(submap_size);
|
|
|
+ remaining_requests = rest_requests;
|
|
|
+
|
|
|
+ let (responses, rest_responses) = remaining_responses.split_at_mut(submap_size);
|
|
|
+ remaining_responses = rest_responses;
|
|
|
+ submap[0].batch_fetch(batch.to_vec(), responses);
|
|
|
+ });
|
|
|
|
|
|
// this only really needs to be a shuffle
|
|
|
responses = otils::sort(responses, self.num_threads);
|