Browse Source

improve Updater interface

Justin Tracey 1 year ago
parent
commit
e979307ee8
4 changed files with 29 additions and 14 deletions
  1. 5 2
      src/bin/client.rs
  2. 6 4
      src/bin/peer.rs
  3. 2 1
      src/bin/server.rs
  4. 16 7
      src/updater.rs

+ 5 - 2
src/bin/client.rs

@@ -172,8 +172,10 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
 
     let (reader_to_state, mut state_from_reader) = mpsc::unbounded_channel();
     let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
-    let (read_socket_updater_in, read_socket_updater_out) = Updater::channel();
-    let (write_socket_updater_in, write_socket_updater_out) = Updater::channel();
+    let read_socket_updater_in = Updater::new();
+    let read_socket_updater_out = read_socket_updater_in.clone();
+    let write_socket_updater_in = Updater::new();
+    let write_socket_updater_out = write_socket_updater_in.clone();
     let (errs_in, errs_out) = mpsc::unbounded_channel();
     tokio::spawn(reader(
         reader_to_state,
@@ -247,6 +249,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             tokio::spawn(manage_conversation(config));
         handles.push(handle);
     }
+    handles.shrink_to_fit();
     for handle in handles {
         handle.await??;
     }

+ 6 - 4
src/bin/peer.rs

@@ -31,7 +31,7 @@ type WriterFromState = mpsc::UnboundedReceiver<Arc<SerializedMessage>>;
 /// Type for sending messages from the state thread to the writer thread.
 type MessageHolder = Arc<SerializedMessage>;
 /// Type for sending the updated read half of the socket.
-type ReadSocketUpdaterIn = Arc<Updater<OwnedReadHalf>>;
+type ReadSocketUpdaterIn = Updater<OwnedReadHalf>;
 /// Type for getting the updated read half of the socket.
 type ReadSocketUpdaterOut = Updater<OwnedReadHalf>;
 /// Type for sending the updated write half of the socket.
@@ -412,9 +412,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             HashMap::new();
 
         for (recipient, for_io) in recipient_map.drain() {
-            let (listener_writer_to_reader, reader_from_listener_writer) = Updater::channel();
-            let listener_writer_to_reader = Arc::new(listener_writer_to_reader);
-            let (listener_to_writer, writer_from_listener) = Updater::channel();
+            let listener_writer_to_reader = Updater::new();
+            let reader_from_listener_writer = listener_writer_to_reader.clone();
+            let listener_to_writer = Updater::new();
+            let writer_from_listener = listener_to_writer.clone();
             name_to_io_threads.insert(
                 recipient.to_string(),
                 (listener_writer_to_reader.clone(), listener_to_writer),
@@ -438,6 +439,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         handles.push(handle);
     }
 
+    handles.shrink_to_fit();
     for handle in handles {
         handle.await??;
     }

+ 2 - 1
src/bin/server.rs

@@ -80,7 +80,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
             // (we can't use the existing notify channel, else we get race conditions where
             // the reader thread terminates and spawns again before the sender thread
             // notices and activates its existing notify channel)
-            let (socket_updater_snd, socket_updater_rcv) = Updater::channel();
+            let socket_updater_snd = Updater::new();
+            let socket_updater_rcv = socket_updater_snd.clone();
             socket_updater_snd.send((wr, notify.clone()));
 
             writer_db.insert(id.clone(), socket_updater_snd);

+ 16 - 7
src/updater.rs

@@ -1,9 +1,12 @@
 use std::sync::{Arc, Mutex};
 use tokio::sync::Notify;
 
-/// A multi/single-producer, single-consumer channel for updating an object.
+/// A channel for updating an object.
 /// Unlike a mpsc, there is no queue of objects, only the most recent can be obtained.
 /// Unlike a watch, the receiver owns the object received.
+/// Any copy of the owner (created via clone) can send or receive objects,
+/// but only one copy will receive any particular object.
+#[derive(Default)]
 pub struct Updater<T>(Arc<(Mutex<Option<T>>, Notify)>);
 
 impl<T> Updater<T> {
@@ -17,6 +20,10 @@ impl<T> Updater<T> {
     /// Get the object most recently sent by the sender end.
     pub async fn recv(&mut self) -> T {
         // According to a dev on GH, tokio's Notify is allowed false notifications.
+        // This is conceptually better suited for a condvar, but the only async
+        // implementations aren't cancellation safe.
+        // Precondition: the only way for the object to be updated is to notify,
+        // and no receiver consumes a notify without consuming the object as well.
         loop {
             self.0 .1.notified().await;
             {
@@ -34,11 +41,13 @@ impl<T> Updater<T> {
         locked_object.take()
     }
 
-    /// Create an Updater channel.
-    /// Currently there is no distinction between sender and receiver updaters,
-    /// it's the caller's job to decide which is which.
-    pub fn channel() -> (Self, Self) {
-        let body = Arc::new((Mutex::new(None), Notify::new()));
-        (Updater(body.clone()), Updater(body))
+    pub fn new() -> Self {
+        Updater(Arc::new((Mutex::new(None), Notify::new())))
+    }
+}
+
+impl<T> Clone for Updater<T> {
+    fn clone(&self) -> Self {
+        Updater(Arc::clone(&self.0))
     }
 }