Browse Source

implement Updater primitive, and use it once in the server

Justin Tracey 1 year ago
parent
commit
0c63ae0da2
3 changed files with 50 additions and 7 deletions
  1. 7 7
      src/bin/server.rs
  2. 2 0
      src/lib.rs
  3. 41 0
      src/updater.rs

+ 7 - 7
src/bin/server.rs

@@ -1,4 +1,4 @@
-use mgen::{log, parse_identifier, SerializedMessage};
+use mgen::{log, parse_identifier, updater::Updater, SerializedMessage};
 use std::collections::HashMap;
 use std::error::Error;
 use std::result::Result;
@@ -24,8 +24,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
         ID,
         mpsc::UnboundedSender<Arc<SerializedMessage>>,
     >::new()));
-    // FIXME: this should probably be a Notify + Mutex
-    let mut writer_db = HashMap::<ID, mpsc::Sender<(OwnedWriteHalf, watch::Sender<bool>)>>::new();
+
+    let mut writer_db = HashMap::<ID, Updater<(OwnedWriteHalf, watch::Sender<bool>)>>::new();
 
     loop {
         let (socket, _) = listener.accept().await?;
@@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
             spawn_message_receiver(rd, snd_db.clone(), watch_rcv);
 
             // give the writer thread the new write half of the socket and watch
-            socket_updater.send((wr, watch_snd)).await?;
+            socket_updater.send((wr, watch_snd)).await;
         } else {
             // newly-registered client
             log!("New client");
@@ -59,8 +59,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
             let (watch_snd, watch_rcv) = watch::channel(false);
 
             // socket updater, used to give the sender thread a new socket and watch
-            let (socket_updater_snd, socket_updater_rcv) = mpsc::channel(8);
-            socket_updater_snd.send((wr, watch_snd)).await?;
+            let (socket_updater_snd, socket_updater_rcv) = Updater::channel();
+            socket_updater_snd.send((wr, watch_snd)).await;
 
             writer_db.insert(id.clone(), socket_updater_snd);
 
@@ -147,7 +147,7 @@ async fn get_messages(
 /// and sending them out on the associated socket.
 async fn send_messages(
     mut msg_rcv: mpsc::UnboundedReceiver<Arc<SerializedMessage>>,
-    mut socket_updater: mpsc::Receiver<(OwnedWriteHalf, watch::Sender<bool>)>,
+    mut socket_updater: Updater<(OwnedWriteHalf, watch::Sender<bool>)>,
 ) {
     let (mut current_socket, mut current_watch) =
         socket_updater.recv().await.expect("socket updater closed");

+ 2 - 0
src/lib.rs

@@ -2,6 +2,8 @@ use std::mem::size_of;
 use std::num::NonZeroU32;
 use tokio::io::{copy, sink, AsyncReadExt, AsyncWriteExt};
 
+pub mod updater;
+
 /// The padding interval. All message bodies are a size of some multiple of this.
 /// All messages bodies are a minimum  of this size.
 // FIXME: double check what this should be

+ 41 - 0
src/updater.rs

@@ -0,0 +1,41 @@
+use std::sync::{Arc, Mutex};
+use tokio::sync::Notify;
+
+pub struct Updater<T>(Arc<(Mutex<Option<T>>, Notify)>);
+
+impl<T> Updater<T> {
+    pub async fn send(&self, value: T) {
+        let mut locked_object = self.0 .0.lock().expect("send failed to lock mutex");
+        *locked_object = Some(value);
+        self.0 .1.notify_one();
+    }
+
+    pub async fn recv(&mut self) -> Option<T> {
+        self.0 .1.notified().await;
+        {
+            let mut locked_object = self.0 .0.lock().unwrap();
+            if locked_object.is_some() {
+                return locked_object.take();
+            }
+        }
+
+        // We must have gotten the last value from a stale notification:
+        //       ...
+        //                send.object.update
+        //                send.notify
+        // recv.notified
+        //                send.object.update
+        //                send.notify
+        // recv.object.update
+        //       ...
+        // recv.notified <- notified but no new object
+        // Waiting one more time should do the trick.
+        self.0 .1.notified().await;
+        self.0 .0.lock().unwrap().take()
+    }
+
+    pub fn channel() -> (Self, Self) {
+        let body = Arc::new((Mutex::new(None), Notify::new()));
+        (Updater(body.clone()), Updater(body))
+    }
+}