Browse Source

communicator: proper shutdown procedure

Lennart Braun 2 years ago
parent
commit
4bd90703ff
2 changed files with 28 additions and 10 deletions
  1. 27 9
      communicator/src/communicator.rs
  2. 1 1
      communicator/src/lib.rs

+ 27 - 9
communicator/src/communicator.rs

@@ -26,20 +26,24 @@ impl<T: Serializable> Fut<T> for MyFut<T> {
 }
 
 /// Thread to receive messages in the background.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 struct ReceiverThread {
     data_request_tx: Sender<Box<dyn FnOnce(&mut dyn Read) + Send>>,
+    join_handle: thread::JoinHandle<()>,
 }
 
 impl ReceiverThread {
     pub fn from_reader<R: Debug + Read + Send + 'static>(mut reader: R) -> Self {
         let (data_request_tx, data_request_rx) = channel::<Box<dyn FnOnce(&mut dyn Read) + Send>>();
-        let _join_handle = thread::spawn(move || {
+        let join_handle = thread::spawn(move || {
             for func in data_request_rx.iter() {
                 func(&mut reader);
             }
         });
-        Self { data_request_tx }
+        Self {
+            data_request_tx,
+            join_handle,
+        }
     }
 
     pub fn receive<T: Serializable>(&mut self) -> Result<MyFut<T>, Error> {
@@ -53,25 +57,34 @@ impl ReceiverThread {
             }))?;
         Ok(MyFut::new(data_rx.into()))
     }
+
+    pub fn join(self) {
+        drop(self.data_request_tx);
+        self.join_handle.join().expect("join failed")
+    }
 }
 
 /// Thread to send messages in the background.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 struct SenderThread {
     data_submission_tx: Sender<Box<dyn FnOnce(&mut dyn Write) + Send>>,
+    join_handle: thread::JoinHandle<()>,
 }
 
 impl SenderThread {
     pub fn from_writer<W: Debug + Write + Send + 'static>(mut writer: W) -> Self {
         let (data_submission_tx, data_submission_rx) =
             channel::<Box<dyn FnOnce(&mut dyn Write) + Send>>();
-        let _join_handle = thread::spawn(move || {
+        let join_handle = thread::spawn(move || {
             for func in data_submission_rx.iter() {
                 func(&mut writer);
             }
             writer.flush().expect("flush failed");
         });
-        Self { data_submission_tx }
+        Self {
+            data_submission_tx,
+            join_handle,
+        }
     }
 
     pub fn send<T: Serializable>(&mut self, data: T) -> Result<(), Error> {
@@ -82,10 +95,15 @@ impl SenderThread {
             }))?;
         Ok(())
     }
+
+    pub fn join(self) {
+        drop(self.data_submission_tx);
+        self.join_handle.join().expect("join failed")
+    }
 }
 
 /// Communicator that uses background threads to send and receive messages.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct Communicator {
     num_parties: usize,
     my_id: usize,
@@ -165,7 +183,7 @@ impl AbstractCommunicator for Communicator {
     }
 
     fn shutdown(&mut self) {
-        self.sender_threads.drain();
-        self.receiver_threads.drain();
+        self.sender_threads.drain().for_each(|(_, t)| t.join());
+        self.receiver_threads.drain().for_each(|(_, t)| t.join());
     }
 }

+ 1 - 1
communicator/src/lib.rs

@@ -17,7 +17,7 @@ pub trait Fut<T> {
 }
 
 /// Abstract communication interface between multiple parties
-pub trait AbstractCommunicator: Clone {
+pub trait AbstractCommunicator {
     type Fut<T: Serializable>: Fut<T>;
 
     /// How many parties N there are in total