// The state machine used to represent one end of a conversation. // This includes inducing transitions and actions taken during transitions, // so a lot of the messenger network code is here. use mgen::{log, MessageHeader, SerializedMessage}; use rand_distr::Distribution; use rand_xoshiro::Xoshiro256PlusPlus; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::time::Instant; use std::collections::HashMap; use std::sync::Arc; use crate::messenger::dists::Distributions; use crate::messenger::error::MessengerError; use crate::messenger::message::{construct_message, construct_receipt}; /// All possible Conversation state machine states pub enum StateMachine { Idle(Conversation), Active(Conversation), } impl StateMachine { pub fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> StateMachine { Self::Idle(Conversation::::start(dists, rng)) } } /// The state machine representing a conversation state and its transitions. pub struct Conversation { dists: Distributions, delay: Instant, state: S, } pub trait State { fn sent(conversation: Conversation, rng: &mut Xoshiro256PlusPlus) -> StateMachine where Self: Sized; fn received(conversation: Conversation, rng: &mut Xoshiro256PlusPlus) -> StateMachine where Self: Sized; fn to_machine(conversation: Conversation) -> StateMachine where Self: Sized; } pub struct Idle {} pub struct Active { wait: Instant, } impl State for Idle { fn sent(conversation: Conversation, rng: &mut Xoshiro256PlusPlus) -> StateMachine { if conversation.dists.s.sample(rng) { log!("Idle: [sent] tranisition to [Active]"); let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng); let wait = Instant::now() + conversation.dists.w.sample_secs(rng); StateMachine::Active({ Conversation:: { dists: conversation.dists, delay, state: Active { wait }, } }) } else { log!("Idle: [sent] tranisition to [Idle]"); let delay = Instant::now() + conversation.dists.i.sample_secs(rng); StateMachine::Idle({ Conversation:: { dists: conversation.dists, delay, state: Idle {}, } }) } } fn received(conversation: Conversation, rng: &mut Xoshiro256PlusPlus) -> StateMachine { if conversation.dists.r.sample(rng) { log!("Idle: [recv'd] tranisition to [Active]"); let wait = Instant::now() + conversation.dists.w.sample_secs(rng); let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng); StateMachine::Active({ Conversation:: { dists: conversation.dists, delay, state: Active { wait }, } }) } else { log!("Idle: [recv'd] tranisition to [Idle]"); StateMachine::Idle(conversation) } } fn to_machine(conversation: Conversation) -> StateMachine { StateMachine::Idle(conversation) } } impl State for Active { fn sent(conversation: Conversation, rng: &mut Xoshiro256PlusPlus) -> StateMachine { log!("Active: [sent] transition to [Active]"); let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng); StateMachine::Active(Conversation:: { dists: conversation.dists, delay, state: conversation.state, }) } fn received(conversation: Conversation, rng: &mut Xoshiro256PlusPlus) -> StateMachine { log!("Active: [recv'd] transition to [Active]"); let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng); StateMachine::Active(Conversation:: { dists: conversation.dists, delay, state: conversation.state, }) } fn to_machine(conversation: Conversation) -> StateMachine { StateMachine::Active(conversation) } } impl Conversation { fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> Self { let delay = Instant::now() + dists.i.sample_secs(rng); log!("[start]"); Self { dists, delay, state: Idle {}, } } } impl Conversation { fn waited(self, rng: &mut Xoshiro256PlusPlus) -> Conversation { log!("Active: [waited] tranision to [Idle]"); let delay = Instant::now() + self.dists.i.sample_secs(rng); Conversation:: { dists: self.dists, delay, state: Idle {}, } } async fn sleep(delay: Instant, wait: Instant) -> ActiveGroupActions { if delay < wait { log!("delaying for {:?}", delay - Instant::now()); tokio::time::sleep_until(delay).await; ActiveGroupActions::Send } else { log!("waiting for {:?}", wait - Instant::now()); tokio::time::sleep_until(wait).await; ActiveGroupActions::Idle } } } /// Attempt to read some portion of the header size from the stream. /// The number of bytes written is returned in the Ok case. /// The caller must read any remaining bytes less than 4. // N.B.: This must be written cancellation safe! // https://docs.rs/tokio/1.26.0/tokio/macro.select.html#cancellation-safety async fn read_header_size( stream: &mut TcpStream, header_size: &mut [u8; 4], ) -> Result { let read = stream.read(header_size).await?; if read == 0 { Err(tokio::io::Error::new( tokio::io::ErrorKind::WriteZero, "failed to read any bytes from message with bytes remaining", ) .into()) } else { Ok(read) } } async fn read_message( stream: &mut TcpStream, n: usize, mut header_size: [u8; 4], ) -> Result { if n < 4 { // we didn't get the whole size, but we can use read_exact now stream.read_exact(&mut header_size[n..]).await?; } let (header, _) = mgen::get_message_with_header_size(stream, header_size).await?; Ok(header) } async fn send_action<'a, T: State, I: Iterator>>> ( conversation: Conversation, streams: I, our_id: &str, recipients: Vec<&str>, rng: &mut Xoshiro256PlusPlus, ) -> StateMachine { let size = conversation.dists.m.sample(rng); log!( "sending message from {} to {:?} of size {}", our_id, recipients, size ); let m = Arc::new(construct_message( our_id.to_string(), recipients.iter().map(|s| s.to_string()).collect(), size, )); for stream in streams { stream.send(m.clone()).expect("Internal stream closed with messages still being sent"); } T::sent(conversation, rng) } async fn receive_action( msg: MessageHeader, conversation: Conversation, stream_map: &HashMap>>, our_id: &str, rng: &mut Xoshiro256PlusPlus, ) -> StateMachine { match msg.body { mgen::MessageBody::Size(size) => { log!( "{:?} got message from {} of size {}", msg.recipients, msg.sender, size ); let stream = stream_map.get(&msg.sender).expect(&format!("No such contact: {}", msg.sender)); let m = construct_receipt(our_id.to_string(), msg.sender); stream.send(Arc::new(m)); T::received(conversation, rng) } mgen::MessageBody::Receipt => T::to_machine(conversation), } } /* enum IdleActions { Send, Receive(usize), } /// Handle a state transition from Idle, including I/O, for a single-connection conversation. /// Used for Idle client-server and single-contact p2p conversations. pub async fn manage_single_idle_conversation( conversation: Conversation, stream: &mut TcpStream, our_id: &str, recipients: Vec<&str>, rng: &mut Xoshiro256PlusPlus, ) -> Result { log!("delaying for {:?}", conversation.delay - Instant::now()); let mut header_size = [0; 4]; let action = tokio::select! { () = tokio::time::sleep_until(conversation.delay) => { Ok(IdleActions::Send) } res = read_header_size(stream, &mut header_size) => { match res { Ok(n) => Ok(IdleActions::Receive(n)), Err(e) => Err(e), } } }; let action = match action { Ok(action) => action, Err(e) => return Err((StateMachine::Idle(conversation), e)), }; match action { IdleActions::Send => send_action(conversation, stream, our_id, recipients, rng).await, IdleActions::Receive(n) => { let msg = match read_message(stream, n, header_size).await { Ok(msg) => msg, Err(e) => return Err((StateMachine::Idle(conversation), e)), }; receive_action(msg, conversation, stream, our_id, rng).await } } } enum ActiveActions { Send, Receive(usize), Idle, } /// Handle a state transition from Active, including I/O, for a single-connection conversation. /// Used for Active client-server and single-contact p2p conversations. pub async fn manage_single_active_conversation( conversation: Conversation, stream: &mut TcpStream, our_id: &str, recipients: Vec<&str>, rng: &mut Xoshiro256PlusPlus, ) -> Result { let mut header_size = [0; 4]; let action = tokio::select! { action = Conversation::::sleep(conversation.delay, conversation.state.wait) => { Ok(action) } res = read_header_size(stream, &mut header_size) => { match res { Ok(n) => Ok(ActiveActions::Receive(n)), Err(e) => Err(e), } } }; let action = match action { Ok(action) => action, Err(e) => return Err((StateMachine::Active(conversation), e)), }; match action { ActiveActions::Send => send_action(conversation, stream, our_id, recipients, rng).await, ActiveActions::Receive(n) => { let msg = match read_message(stream, n, header_size).await { Ok(msg) => msg, Err(e) => return Err((StateMachine::Active(conversation), e)), }; receive_action(msg, conversation, stream, our_id, rng).await } ActiveActions::Idle => Ok(StateMachine::Idle(conversation.waited(rng))), } } */ enum IdleGroupActions { Send, Receive(MessageHeader), } /// Handle a state transition from Idle, including I/O, for a multi-connection conversation. /// Used for Idle group p2p conversations. pub async fn manage_group_idle_conversation( conversation: Conversation, inbound: &mut mpsc::UnboundedReceiver, stream_map: &mut HashMap>>, our_id: &str, recipients: Vec<&str>, rng: &mut Xoshiro256PlusPlus, ) -> StateMachine { log!("delaying for {:?}", conversation.delay - Instant::now()); let action = tokio::select! { () = tokio::time::sleep_until(conversation.delay) => IdleGroupActions::Send, res = inbound.recv() => IdleGroupActions::Receive(res.expect("inbound channel closed")), }; match action { IdleGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await, IdleGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await, } } enum ActiveGroupActions { Send, Receive(MessageHeader), Idle, } /// Handle a state transition from Active, including I/O, for a multi-connection conversation. /// Used for Active group p2p conversations. pub async fn manage_group_active_conversation( conversation: Conversation, inbound: &mut mpsc::UnboundedReceiver, stream_map: &mut HashMap>>, our_id: &str, recipients: Vec<&str>, rng: &mut Xoshiro256PlusPlus, ) -> StateMachine { let action = tokio::select! { action = Conversation::::sleep(conversation.delay, conversation.state.wait) => action, res = inbound.recv() => ActiveGroupActions::Receive(res.expect("inbound channel closed")), }; match action { ActiveGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await, ActiveGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await, ActiveGroupActions::Idle => StateMachine::Idle(conversation.waited(rng)), } }