lib.rs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #![feature(test)]
  2. mod request;
  3. use otils::ObliviousOps;
  4. pub use request::Request;
  5. #[derive(Debug)]
  6. pub struct Omq {
  7. message_store: Vec<Request>,
  8. }
  9. impl Omq {
  10. pub fn new() -> Self {
  11. Omq {
  12. message_store: Vec::new(),
  13. }
  14. }
  15. pub fn batch_send(&mut self, sends: Vec<Request>) {
  16. for s in sends.iter() {
  17. println!("{:?}", s);
  18. }
  19. println!();
  20. self.message_store.reserve(sends.len());
  21. self.message_store.extend(sends);
  22. }
  23. fn update_store(&mut self, fetches: Vec<Request>, fetch_sum: usize) {
  24. let size = self.message_store.len() + fetches.len() + fetch_sum;
  25. self.message_store.reserve(size - self.message_store.len());
  26. for fetch in fetches.iter() {
  27. self.message_store
  28. .extend(Request::dummies(fetch.uid, fetch.volume));
  29. }
  30. self.message_store.extend(fetches);
  31. }
  32. pub fn batch_fetch(&mut self, fetches: Vec<Request>) -> Vec<Request> {
  33. let final_size = self.message_store.len();
  34. let fetch_sum = fetches.iter().fold(0, |acc, f| acc + f.volume) as usize;
  35. self.update_store(fetches, fetch_sum);
  36. self.message_store = otils::sort(std::mem::take(&mut self.message_store), 8);
  37. println!("sorted");
  38. for record in self.message_store.iter() {
  39. println!("{:?}", record);
  40. }
  41. println!();
  42. let mut user_sum: isize = 0;
  43. let mut prev_user: i32 = -1;
  44. for request in self.message_store.iter_mut() {
  45. let same_user = prev_user == request.uid;
  46. user_sum = isize::oselect(same_user, user_sum, 0);
  47. let fetch_more = user_sum > 0;
  48. request.mark = u16::oselect(request.is_fetch(), 0, u16::oselect(fetch_more, 1, 0));
  49. prev_user = request.uid;
  50. user_sum += isize::oselect(
  51. request.is_fetch(),
  52. request.volume as isize,
  53. isize::oselect(fetch_more, -1, 0),
  54. );
  55. }
  56. for record in self.message_store.iter() {
  57. println!("{:?}", record);
  58. }
  59. println!();
  60. otils::compact(&mut self.message_store[..], |r| r.should_deliver(), 8);
  61. let deliver = self.message_store[0..fetch_sum].to_vec();
  62. for record in deliver.iter() {
  63. println!("{:?}", record);
  64. }
  65. println!();
  66. otils::compact(&mut self.message_store[..], |r| r.should_defer(), 8);
  67. self.message_store.truncate(final_size);
  68. for record in self.message_store.iter() {
  69. println!("{:?}", record);
  70. }
  71. println!();
  72. deliver
  73. }
  74. }
  75. #[cfg(test)]
  76. mod tests {
  77. use super::*;
  78. extern crate test;
  79. use test::Bencher;
  80. #[bench]
  81. fn bench_fetch(b: &mut Bencher) {
  82. let mut o = Omq::new();
  83. let sends: Vec<Request> = (0..1048576)
  84. .map(|x| Request::new_send(0, x.try_into().unwrap()))
  85. .collect();
  86. o.batch_send(sends);
  87. // b.iter(|| 1 + 1);
  88. b.iter(|| o.batch_fetch(vec![Request::new_fetch(0, 1048575)]));
  89. }
  90. }