updater.rs 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. use std::sync::{Arc, Mutex};
  2. use tokio::sync::Notify;
  3. /// A channel for updating an object.
  4. /// Unlike a mpsc, there is no queue of objects, only the most recent can be obtained.
  5. /// Unlike a watch, the receiver owns the object received.
  6. /// Any copy of the owner (created via clone) can send or receive objects,
  7. /// but only one copy will receive any particular object.
  8. #[derive(Default)]
  9. pub struct Updater<T: Send + Sync>(Arc<(Mutex<Option<T>>, Notify)>);
  10. impl<T: Send + Sync> Updater<T> {
  11. /// Send an object T to the receiver end, repacing any currently queued object.
  12. pub fn send(&self, value: T) {
  13. let mut locked_object = self.0 .0.lock().expect("send failed to lock mutex");
  14. *locked_object = Some(value);
  15. self.0 .1.notify_one();
  16. }
  17. /// Get the object most recently sent by the sender end.
  18. pub async fn recv(&mut self) -> T {
  19. // According to a dev on GH, tokio's Notify is allowed false notifications.
  20. // This is conceptually better suited for a condvar, but the only async
  21. // implementations aren't cancellation safe.
  22. // Precondition: the only way for the object to be updated is to notify,
  23. // and no receiver consumes a notify without consuming the object as well.
  24. loop {
  25. self.0 .1.notified().await;
  26. {
  27. let mut locked_object = self.0 .0.lock().unwrap();
  28. if locked_object.is_some() {
  29. return locked_object.take().unwrap();
  30. }
  31. }
  32. }
  33. }
  34. /// Get the object most recently sent by the sender end, if one is already available.
  35. pub fn maybe_recv(&mut self) -> Option<T> {
  36. let mut locked_object = self.0 .0.lock().unwrap();
  37. locked_object.take()
  38. }
  39. pub fn new() -> Self {
  40. Updater(Arc::new((Mutex::new(None), Notify::new())))
  41. }
  42. }
  43. impl<T: Send + Sync> Clone for Updater<T> {
  44. fn clone(&self) -> Self {
  45. Updater(Arc::clone(&self.0))
  46. }
  47. }