| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- // The state machine used to represent one end of a conversation.
- // This includes inducing transitions and actions taken during transitions,
- // so a lot of the messenger network code is here.
- use mgen::{log, MessageHeader, SerializedMessage};
- use rand_distr::Distribution;
- use rand_xoshiro::Xoshiro256PlusPlus;
- use tokio::io::AsyncReadExt;
- use tokio::net::TcpStream;
- use tokio::sync::mpsc;
- use tokio::time::Instant;
- use std::collections::HashMap;
- use std::sync::Arc;
- use crate::messenger::dists::Distributions;
- use crate::messenger::error::MessengerError;
- use crate::messenger::message::{construct_message, construct_receipt};
- /// All possible Conversation state machine states
- pub enum StateMachine {
- Idle(Conversation<Idle>),
- Active(Conversation<Active>),
- }
- impl StateMachine {
- pub fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
- Self::Idle(Conversation::<Idle>::start(dists, rng))
- }
- }
- /// The state machine representing a conversation state and its transitions.
- pub struct Conversation<S: State> {
- dists: Distributions,
- delay: Instant,
- state: S,
- }
- pub trait State {
- fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
- where
- Self: Sized;
- fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
- where
- Self: Sized;
- fn to_machine(conversation: Conversation<Self>) -> StateMachine
- where
- Self: Sized;
- }
/// State marker for an idle conversation; it carries no data of its own.
pub struct Idle {}
- pub struct Active {
- wait: Instant,
- }
- impl State for Idle {
- fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
- if conversation.dists.s.sample(rng) {
- log!("Idle: [sent] tranisition to [Active]");
- let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
- let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
- StateMachine::Active({
- Conversation::<Active> {
- dists: conversation.dists,
- delay,
- state: Active { wait },
- }
- })
- } else {
- log!("Idle: [sent] tranisition to [Idle]");
- let delay = Instant::now() + conversation.dists.i.sample_secs(rng);
- StateMachine::Idle({
- Conversation::<Idle> {
- dists: conversation.dists,
- delay,
- state: Idle {},
- }
- })
- }
- }
- fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
- if conversation.dists.r.sample(rng) {
- log!("Idle: [recv'd] tranisition to [Active]");
- let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
- let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
- StateMachine::Active({
- Conversation::<Active> {
- dists: conversation.dists,
- delay,
- state: Active { wait },
- }
- })
- } else {
- log!("Idle: [recv'd] tranisition to [Idle]");
- StateMachine::Idle(conversation)
- }
- }
- fn to_machine(conversation: Conversation<Self>) -> StateMachine {
- StateMachine::Idle(conversation)
- }
- }
- impl State for Active {
- fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
- log!("Active: [sent] transition to [Active]");
- let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
- StateMachine::Active(Conversation::<Active> {
- dists: conversation.dists,
- delay,
- state: conversation.state,
- })
- }
- fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
- log!("Active: [recv'd] transition to [Active]");
- let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
- StateMachine::Active(Conversation::<Active> {
- dists: conversation.dists,
- delay,
- state: conversation.state,
- })
- }
- fn to_machine(conversation: Conversation<Self>) -> StateMachine {
- StateMachine::Active(conversation)
- }
- }
- impl Conversation<Idle> {
- fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> Self {
- let delay = Instant::now() + dists.i.sample_secs(rng);
- log!("[start]");
- Self {
- dists,
- delay,
- state: Idle {},
- }
- }
- }
- impl Conversation<Active> {
- fn waited(self, rng: &mut Xoshiro256PlusPlus) -> Conversation<Idle> {
- log!("Active: [waited] tranision to [Idle]");
- let delay = Instant::now() + self.dists.i.sample_secs(rng);
- Conversation::<Idle> {
- dists: self.dists,
- delay,
- state: Idle {},
- }
- }
- async fn sleep(delay: Instant, wait: Instant) -> ActiveGroupActions {
- if delay < wait {
- log!("delaying for {:?}", delay - Instant::now());
- tokio::time::sleep_until(delay).await;
- ActiveGroupActions::Send
- } else {
- log!("waiting for {:?}", wait - Instant::now());
- tokio::time::sleep_until(wait).await;
- ActiveGroupActions::Idle
- }
- }
- }
- /// Attempt to read some portion of the header size from the stream.
- /// The number of bytes written is returned in the Ok case.
- /// The caller must read any remaining bytes less than 4.
- // N.B.: This must be written cancellation safe!
- // https://docs.rs/tokio/1.26.0/tokio/macro.select.html#cancellation-safety
- async fn read_header_size(
- stream: &mut TcpStream,
- header_size: &mut [u8; 4],
- ) -> Result<usize, MessengerError> {
- let read = stream.read(header_size).await?;
- if read == 0 {
- Err(tokio::io::Error::new(
- tokio::io::ErrorKind::WriteZero,
- "failed to read any bytes from message with bytes remaining",
- )
- .into())
- } else {
- Ok(read)
- }
- }
- async fn read_message(
- stream: &mut TcpStream,
- n: usize,
- mut header_size: [u8; 4],
- ) -> Result<MessageHeader, MessengerError> {
- if n < 4 {
- // we didn't get the whole size, but we can use read_exact now
- stream.read_exact(&mut header_size[n..]).await?;
- }
- let (header, _) = mgen::get_message_with_header_size(stream, header_size).await?;
- Ok(header)
- }
- async fn send_action<'a, T: State, I: Iterator<Item = &'a mut mpsc::UnboundedSender<Arc<SerializedMessage>>>> (
- conversation: Conversation<T>,
- streams: I,
- our_id: &str,
- recipients: Vec<&str>,
- rng: &mut Xoshiro256PlusPlus,
- ) -> StateMachine {
- let size = conversation.dists.m.sample(rng);
- log!(
- "sending message from {} to {:?} of size {}",
- our_id,
- recipients,
- size
- );
- let m = Arc::new(construct_message(
- our_id.to_string(),
- recipients.iter().map(|s| s.to_string()).collect(),
- size,
- ));
- for stream in streams {
- stream.send(m.clone()).expect("Internal stream closed with messages still being sent");
- }
- T::sent(conversation, rng)
- }
- async fn receive_action<T: State>(
- msg: MessageHeader,
- conversation: Conversation<T>,
- stream_map: &HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
- our_id: &str,
- rng: &mut Xoshiro256PlusPlus,
- ) -> StateMachine {
- match msg.body {
- mgen::MessageBody::Size(size) => {
- log!(
- "{:?} got message from {} of size {}",
- msg.recipients,
- msg.sender,
- size
- );
- let stream = stream_map.get(&msg.sender).expect(&format!("No such contact: {}", msg.sender));
- let m = construct_receipt(our_id.to_string(), msg.sender);
- stream.send(Arc::new(m));
- T::received(conversation, rng)
- }
- mgen::MessageBody::Receipt => T::to_machine(conversation),
- }
- }
- /*
- enum IdleActions {
- Send,
- Receive(usize),
- }
- /// Handle a state transition from Idle, including I/O, for a single-connection conversation.
- /// Used for Idle client-server and single-contact p2p conversations.
- pub async fn manage_single_idle_conversation(
- conversation: Conversation<Idle>,
- stream: &mut TcpStream,
- our_id: &str,
- recipients: Vec<&str>,
- rng: &mut Xoshiro256PlusPlus,
- ) -> Result<StateMachine, (StateMachine, MessengerError)> {
- log!("delaying for {:?}", conversation.delay - Instant::now());
- let mut header_size = [0; 4];
- let action = tokio::select! {
- () = tokio::time::sleep_until(conversation.delay) => {
- Ok(IdleActions::Send)
- }
- res = read_header_size(stream, &mut header_size) => {
- match res {
- Ok(n) => Ok(IdleActions::Receive(n)),
- Err(e) => Err(e),
- }
- }
- };
- let action = match action {
- Ok(action) => action,
- Err(e) => return Err((StateMachine::Idle(conversation), e)),
- };
- match action {
- IdleActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
- IdleActions::Receive(n) => {
- let msg = match read_message(stream, n, header_size).await {
- Ok(msg) => msg,
- Err(e) => return Err((StateMachine::Idle(conversation), e)),
- };
- receive_action(msg, conversation, stream, our_id, rng).await
- }
- }
- }
- enum ActiveActions {
- Send,
- Receive(usize),
- Idle,
- }
- /// Handle a state transition from Active, including I/O, for a single-connection conversation.
- /// Used for Active client-server and single-contact p2p conversations.
- pub async fn manage_single_active_conversation(
- conversation: Conversation<Active>,
- stream: &mut TcpStream,
- our_id: &str,
- recipients: Vec<&str>,
- rng: &mut Xoshiro256PlusPlus,
- ) -> Result<StateMachine, (StateMachine, MessengerError)> {
- let mut header_size = [0; 4];
- let action = tokio::select! {
- action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => {
- Ok(action)
- }
- res = read_header_size(stream, &mut header_size) => {
- match res {
- Ok(n) => Ok(ActiveActions::Receive(n)),
- Err(e) => Err(e),
- }
- }
- };
- let action = match action {
- Ok(action) => action,
- Err(e) => return Err((StateMachine::Active(conversation), e)),
- };
- match action {
- ActiveActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
- ActiveActions::Receive(n) => {
- let msg = match read_message(stream, n, header_size).await {
- Ok(msg) => msg,
- Err(e) => return Err((StateMachine::Active(conversation), e)),
- };
- receive_action(msg, conversation, stream, our_id, rng).await
- }
- ActiveActions::Idle => Ok(StateMachine::Idle(conversation.waited(rng))),
- }
- }
- */
- enum IdleGroupActions {
- Send,
- Receive(MessageHeader),
- }
- /// Handle a state transition from Idle, including I/O, for a multi-connection conversation.
- /// Used for Idle group p2p conversations.
- pub async fn manage_group_idle_conversation(
- conversation: Conversation<Idle>,
- inbound: &mut mpsc::UnboundedReceiver<MessageHeader>,
- stream_map: &mut HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
- our_id: &str,
- recipients: Vec<&str>,
- rng: &mut Xoshiro256PlusPlus,
- ) -> StateMachine {
- log!("delaying for {:?}", conversation.delay - Instant::now());
- let action = tokio::select! {
- () = tokio::time::sleep_until(conversation.delay) => IdleGroupActions::Send,
- res = inbound.recv() =>
- IdleGroupActions::Receive(res.expect("inbound channel closed")),
- };
- match action {
- IdleGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await,
- IdleGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await,
- }
- }
- enum ActiveGroupActions {
- Send,
- Receive(MessageHeader),
- Idle,
- }
- /// Handle a state transition from Active, including I/O, for a multi-connection conversation.
- /// Used for Active group p2p conversations.
- pub async fn manage_group_active_conversation(
- conversation: Conversation<Active>,
- inbound: &mut mpsc::UnboundedReceiver<MessageHeader>,
- stream_map: &mut HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
- our_id: &str,
- recipients: Vec<&str>,
- rng: &mut Xoshiro256PlusPlus,
- ) -> StateMachine {
- let action = tokio::select! {
- action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => action,
- res = inbound.recv() =>
- ActiveGroupActions::Receive(res.expect("inbound channel closed")),
- };
- match action {
- ActiveGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await,
- ActiveGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await,
- ActiveGroupActions::Idle => StateMachine::Idle(conversation.waited(rng)),
- }
- }
|