|
@@ -6,21 +6,19 @@ pub use request::Request;
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
pub struct Omq {
|
|
|
+ num_threads: usize,
|
|
|
message_store: Vec<Request>,
|
|
|
}
|
|
|
|
|
|
impl Omq {
|
|
|
- pub fn new() -> Self {
|
|
|
+ pub fn new(num_threads: usize) -> Self {
|
|
|
Omq {
|
|
|
+ num_threads,
|
|
|
message_store: Vec::new(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
pub fn batch_send(&mut self, sends: Vec<Request>) {
|
|
|
- for s in sends.iter() {
|
|
|
- println!("{:?}", s);
|
|
|
- }
|
|
|
- println!();
|
|
|
self.message_store.reserve(sends.len());
|
|
|
self.message_store.extend(sends);
|
|
|
}
|
|
@@ -43,12 +41,7 @@ impl Omq {
|
|
|
let fetch_sum = fetches.iter().fold(0, |acc, f| acc + f.volume) as usize;
|
|
|
self.update_store(fetches, fetch_sum);
|
|
|
|
|
|
- self.message_store = otils::sort(std::mem::take(&mut self.message_store), 8);
|
|
|
- println!("sorted");
|
|
|
- for record in self.message_store.iter() {
|
|
|
- println!("{:?}", record);
|
|
|
- }
|
|
|
- println!();
|
|
|
+ self.message_store = otils::sort(std::mem::take(&mut self.message_store), self.num_threads);
|
|
|
|
|
|
let mut user_sum: isize = 0;
|
|
|
let mut prev_user: i32 = -1;
|
|
@@ -66,24 +59,20 @@ impl Omq {
|
|
|
isize::oselect(fetch_more, -1, 0),
|
|
|
);
|
|
|
}
|
|
|
- for record in self.message_store.iter() {
|
|
|
- println!("{:?}", record);
|
|
|
- }
|
|
|
- println!();
|
|
|
|
|
|
- otils::compact(&mut self.message_store[..], |r| r.should_deliver(), 8);
|
|
|
+ otils::compact(
|
|
|
+ &mut self.message_store[..],
|
|
|
+ |r| r.should_deliver(),
|
|
|
+ self.num_threads,
|
|
|
+ );
|
|
|
let deliver = self.message_store[0..fetch_sum].to_vec();
|
|
|
- for record in deliver.iter() {
|
|
|
- println!("{:?}", record);
|
|
|
- }
|
|
|
- println!();
|
|
|
|
|
|
- otils::compact(&mut self.message_store[..], |r| r.should_defer(), 8);
|
|
|
+ otils::compact(
|
|
|
+ &mut self.message_store[..],
|
|
|
+ |r| r.should_defer(),
|
|
|
+ self.num_threads,
|
|
|
+ );
|
|
|
self.message_store.truncate(final_size);
|
|
|
- for record in self.message_store.iter() {
|
|
|
- println!("{:?}", record);
|
|
|
- }
|
|
|
- println!();
|
|
|
|
|
|
deliver
|
|
|
}
|
|
@@ -98,7 +87,7 @@ mod tests {
|
|
|
|
|
|
#[bench]
|
|
|
fn bench_fetch(b: &mut Bencher) {
|
|
|
- let mut o = Omq::new();
|
|
|
+ let mut o = Omq::new(8);
|
|
|
|
|
|
let sends: Vec<Request> = (0..1048576)
|
|
|
.map(|x| Request::new_send(0, x.try_into().unwrap()))
|