Browse Source

communicator: revise again

Lennart Braun 2 years ago
parent
commit
3429f5e0ba
1 changed files with 18 additions and 65 deletions
  1. 18 65
      communicator/src/communicator.rs

+ 18 - 65
communicator/src/communicator.rs

@@ -4,50 +4,19 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io::{BufReader, BufWriter, Read, Write};
 use std::marker::PhantomData;
-use std::mem;
-use std::sync::mpsc::{channel, Sender};
-use std::sync::{Arc, Condvar, Mutex};
+use std::sync::mpsc::{channel, Receiver, Sender};
+use std::sync::{Arc, Mutex};
 use std::thread;
 
-struct SharedState {
-    pub mutex: Mutex<Option<Vec<u8>>>,
-    pub cvar: Condvar,
-}
-
-impl SharedState {
-    pub fn new() -> Self {
-        Self {
-            mutex: Mutex::new(None),
-            cvar: Condvar::new(),
-        }
-    }
-
-    pub fn put(&self, data: Vec<u8>) {
-        let mut lock = self.mutex.lock().unwrap();
-        *lock = Some(data);
-        self.cvar.notify_one();
-    }
-
-    pub fn get(&self) -> Vec<u8> {
-        let mut val = None;
-        let mut lock = self
-            .cvar
-            .wait_while(self.mutex.lock().unwrap(), |x| x.is_none())
-            .unwrap();
-        mem::swap(&mut val, &mut lock);
-        val.unwrap()
-    }
-}
-
 pub struct MyFut<T: Serializable> {
-    shared_state: Arc<SharedState>,
+    buf_rx: Arc<Mutex<Receiver<Vec<u8>>>>,
     _phantom: PhantomData<T>,
 }
 
 impl<T: Serializable> MyFut<T> {
-    fn new(shared_state: Arc<SharedState>) -> Self {
+    fn new(buf_rx: Arc<Mutex<Receiver<Vec<u8>>>>) -> Self {
         Self {
-            shared_state,
+            buf_rx,
             _phantom: PhantomData,
         }
     }
@@ -55,7 +24,7 @@ impl<T: Serializable> MyFut<T> {
 
 impl<T: Serializable> Fut<T> for MyFut<T> {
     fn get(self) -> Result<T, Error> {
-        let buf = self.shared_state.get();
+        let buf = self.buf_rx.lock().unwrap().recv()?;
         let (data, size) = bincode::decode_from_slice(&buf, bincode::config::standard())?;
         assert_eq!(size, buf.len());
         Ok(data)
@@ -65,18 +34,17 @@ impl<T: Serializable> Fut<T> for MyFut<T> {
 /// Thread to receive messages in the background.
 #[derive(Debug)]
 struct ReceiverThread {
-    promise_tx: Sender<Arc<SharedState>>,
-    join_handle_1: thread::JoinHandle<Result<(), Error>>,
-    join_handle_2: thread::JoinHandle<Result<(), Error>>,
+    buf_rx: Arc<Mutex<Receiver<Vec<u8>>>>,
+    join_handle: thread::JoinHandle<Result<(), Error>>,
 }
 
 impl ReceiverThread {
     pub fn from_reader<R: Debug + Read + Send + 'static>(reader: R) -> Self {
-        let mut reader = BufReader::new(reader);
-        let (promise_tx, promise_rx) = channel::<Arc<SharedState>>();
+        let mut reader = BufReader::with_capacity(1 << 16, reader);
         let (buf_tx, buf_rx) = channel::<Vec<u8>>();
-        let join_handle_1 = thread::Builder::new()
-            .name("Receiver-1".to_owned())
+        let buf_rx = Arc::new(Mutex::new(buf_rx));
+        let join_handle = thread::Builder::new()
+            .name("Receiver".to_owned())
             .spawn(move || {
                 loop {
                     let mut msg_size = [0u8; 4];
@@ -94,34 +62,19 @@ impl ReceiverThread {
                 }
             })
             .unwrap();
-        let join_handle_2 = thread::Builder::new()
-            .name("Receiver-2".to_owned())
-            .spawn(move || {
-                for shared_state in promise_rx {
-                    let buf = buf_rx.recv()?;
-                    shared_state.put(buf);
-                }
-                Ok(())
-            })
-            .unwrap();
         Self {
-            promise_tx,
-            join_handle_1,
-            join_handle_2,
+            join_handle,
+            buf_rx,
         }
     }
 
     pub fn receive<T: Serializable>(&mut self) -> Result<MyFut<T>, Error> {
-        let shared_state_promise = Arc::new(SharedState::new());
-        let shared_state_future = shared_state_promise.clone();
-        self.promise_tx.send(shared_state_promise)?;
-        Ok(MyFut::new(shared_state_future))
+        Ok(MyFut::new(self.buf_rx.clone()))
     }
 
     pub fn join(self) -> Result<(), Error> {
-        drop(self.promise_tx);
-        self.join_handle_1.join().expect("join failed")?;
-        self.join_handle_2.join().expect("join failed")?;
+        drop(self.buf_rx);
+        self.join_handle.join().expect("join failed")?;
         Ok(())
     }
 }
@@ -135,7 +88,7 @@ struct SenderThread {
 
 impl SenderThread {
     pub fn from_writer<W: Debug + Write + Send + 'static>(writer: W) -> Self {
-        let mut writer = BufWriter::with_capacity(1 << 15, writer);
+        let mut writer = BufWriter::with_capacity(1 << 16, writer);
         let (buf_tx, buf_rx) = channel::<Vec<u8>>();
         let join_handle = thread::Builder::new()
             .name("Sender-1".to_owned())