state.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // The state machine used to represent one end of a conversation.
  2. // This includes inducing transitions and actions taken during transitions,
  3. // so a lot of the messenger network code is here.
  4. use mgen::{log, MessageHeader, SerializedMessage};
  5. use rand_distr::Distribution;
  6. use rand_xoshiro::Xoshiro256PlusPlus;
  7. use tokio::io::AsyncReadExt;
  8. use tokio::net::TcpStream;
  9. use tokio::sync::mpsc;
  10. use tokio::time::Instant;
  11. use std::collections::HashMap;
  12. use std::sync::Arc;
  13. use crate::messenger::dists::Distributions;
  14. use crate::messenger::error::MessengerError;
  15. use crate::messenger::message::{construct_message, construct_receipt};
  16. /// All possible Conversation state machine states
  17. pub enum StateMachine {
  18. Idle(Conversation<Idle>),
  19. Active(Conversation<Active>),
  20. }
  21. impl StateMachine {
  22. pub fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  23. Self::Idle(Conversation::<Idle>::start(dists, rng))
  24. }
  25. }
  26. /// The state machine representing a conversation state and its transitions.
  27. pub struct Conversation<S: State> {
  28. dists: Distributions,
  29. delay: Instant,
  30. state: S,
  31. }
  32. pub trait State {
  33. fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
  34. where
  35. Self: Sized;
  36. fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
  37. where
  38. Self: Sized;
  39. fn to_machine(conversation: Conversation<Self>) -> StateMachine
  40. where
  41. Self: Sized;
  42. }
  43. pub struct Idle {}
  44. pub struct Active {
  45. wait: Instant,
  46. }
  47. impl State for Idle {
  48. fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  49. if conversation.dists.s.sample(rng) {
  50. log!("Idle: [sent] tranisition to [Active]");
  51. let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
  52. let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
  53. StateMachine::Active({
  54. Conversation::<Active> {
  55. dists: conversation.dists,
  56. delay,
  57. state: Active { wait },
  58. }
  59. })
  60. } else {
  61. log!("Idle: [sent] tranisition to [Idle]");
  62. let delay = Instant::now() + conversation.dists.i.sample_secs(rng);
  63. StateMachine::Idle({
  64. Conversation::<Idle> {
  65. dists: conversation.dists,
  66. delay,
  67. state: Idle {},
  68. }
  69. })
  70. }
  71. }
  72. fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  73. if conversation.dists.r.sample(rng) {
  74. log!("Idle: [recv'd] tranisition to [Active]");
  75. let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
  76. let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
  77. StateMachine::Active({
  78. Conversation::<Active> {
  79. dists: conversation.dists,
  80. delay,
  81. state: Active { wait },
  82. }
  83. })
  84. } else {
  85. log!("Idle: [recv'd] tranisition to [Idle]");
  86. StateMachine::Idle(conversation)
  87. }
  88. }
  89. fn to_machine(conversation: Conversation<Self>) -> StateMachine {
  90. StateMachine::Idle(conversation)
  91. }
  92. }
  93. impl State for Active {
  94. fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  95. log!("Active: [sent] transition to [Active]");
  96. let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
  97. StateMachine::Active(Conversation::<Active> {
  98. dists: conversation.dists,
  99. delay,
  100. state: conversation.state,
  101. })
  102. }
  103. fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  104. log!("Active: [recv'd] transition to [Active]");
  105. let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
  106. StateMachine::Active(Conversation::<Active> {
  107. dists: conversation.dists,
  108. delay,
  109. state: conversation.state,
  110. })
  111. }
  112. fn to_machine(conversation: Conversation<Self>) -> StateMachine {
  113. StateMachine::Active(conversation)
  114. }
  115. }
  116. impl Conversation<Idle> {
  117. fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> Self {
  118. let delay = Instant::now() + dists.i.sample_secs(rng);
  119. log!("[start]");
  120. Self {
  121. dists,
  122. delay,
  123. state: Idle {},
  124. }
  125. }
  126. }
  127. impl Conversation<Active> {
  128. fn waited(self, rng: &mut Xoshiro256PlusPlus) -> Conversation<Idle> {
  129. log!("Active: [waited] tranision to [Idle]");
  130. let delay = Instant::now() + self.dists.i.sample_secs(rng);
  131. Conversation::<Idle> {
  132. dists: self.dists,
  133. delay,
  134. state: Idle {},
  135. }
  136. }
  137. async fn sleep(delay: Instant, wait: Instant) -> ActiveGroupActions {
  138. if delay < wait {
  139. log!("delaying for {:?}", delay - Instant::now());
  140. tokio::time::sleep_until(delay).await;
  141. ActiveGroupActions::Send
  142. } else {
  143. log!("waiting for {:?}", wait - Instant::now());
  144. tokio::time::sleep_until(wait).await;
  145. ActiveGroupActions::Idle
  146. }
  147. }
  148. }
  149. /// Attempt to read some portion of the header size from the stream.
  150. /// The number of bytes written is returned in the Ok case.
  151. /// The caller must read any remaining bytes less than 4.
  152. // N.B.: This must be written cancellation safe!
  153. // https://docs.rs/tokio/1.26.0/tokio/macro.select.html#cancellation-safety
  154. async fn read_header_size(
  155. stream: &mut TcpStream,
  156. header_size: &mut [u8; 4],
  157. ) -> Result<usize, MessengerError> {
  158. let read = stream.read(header_size).await?;
  159. if read == 0 {
  160. Err(tokio::io::Error::new(
  161. tokio::io::ErrorKind::WriteZero,
  162. "failed to read any bytes from message with bytes remaining",
  163. )
  164. .into())
  165. } else {
  166. Ok(read)
  167. }
  168. }
  169. async fn read_message(
  170. stream: &mut TcpStream,
  171. n: usize,
  172. mut header_size: [u8; 4],
  173. ) -> Result<MessageHeader, MessengerError> {
  174. if n < 4 {
  175. // we didn't get the whole size, but we can use read_exact now
  176. stream.read_exact(&mut header_size[n..]).await?;
  177. }
  178. let (header, _) = mgen::get_message_with_header_size(stream, header_size).await?;
  179. Ok(header)
  180. }
  181. async fn send_action<'a, T: State, I: Iterator<Item = &'a mut mpsc::UnboundedSender<Arc<SerializedMessage>>>> (
  182. conversation: Conversation<T>,
  183. streams: I,
  184. our_id: &str,
  185. recipients: Vec<&str>,
  186. rng: &mut Xoshiro256PlusPlus,
  187. ) -> StateMachine {
  188. let size = conversation.dists.m.sample(rng);
  189. log!(
  190. "sending message from {} to {:?} of size {}",
  191. our_id,
  192. recipients,
  193. size
  194. );
  195. let m = Arc::new(construct_message(
  196. our_id.to_string(),
  197. recipients.iter().map(|s| s.to_string()).collect(),
  198. size,
  199. ));
  200. for stream in streams {
  201. stream.send(m.clone()).expect("Internal stream closed with messages still being sent");
  202. }
  203. T::sent(conversation, rng)
  204. }
  205. async fn receive_action<T: State>(
  206. msg: MessageHeader,
  207. conversation: Conversation<T>,
  208. stream_map: &HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
  209. our_id: &str,
  210. rng: &mut Xoshiro256PlusPlus,
  211. ) -> StateMachine {
  212. match msg.body {
  213. mgen::MessageBody::Size(size) => {
  214. log!(
  215. "{:?} got message from {} of size {}",
  216. msg.recipients,
  217. msg.sender,
  218. size
  219. );
  220. let stream = stream_map.get(&msg.sender).expect(&format!("No such contact: {}", msg.sender));
  221. let m = construct_receipt(our_id.to_string(), msg.sender);
  222. stream.send(Arc::new(m));
  223. T::received(conversation, rng)
  224. }
  225. mgen::MessageBody::Receipt => T::to_machine(conversation),
  226. }
  227. }
  228. /*
  229. enum IdleActions {
  230. Send,
  231. Receive(usize),
  232. }
  233. /// Handle a state transition from Idle, including I/O, for a single-connection conversation.
  234. /// Used for Idle client-server and single-contact p2p conversations.
  235. pub async fn manage_single_idle_conversation(
  236. conversation: Conversation<Idle>,
  237. stream: &mut TcpStream,
  238. our_id: &str,
  239. recipients: Vec<&str>,
  240. rng: &mut Xoshiro256PlusPlus,
  241. ) -> Result<StateMachine, (StateMachine, MessengerError)> {
  242. log!("delaying for {:?}", conversation.delay - Instant::now());
  243. let mut header_size = [0; 4];
  244. let action = tokio::select! {
  245. () = tokio::time::sleep_until(conversation.delay) => {
  246. Ok(IdleActions::Send)
  247. }
  248. res = read_header_size(stream, &mut header_size) => {
  249. match res {
  250. Ok(n) => Ok(IdleActions::Receive(n)),
  251. Err(e) => Err(e),
  252. }
  253. }
  254. };
  255. let action = match action {
  256. Ok(action) => action,
  257. Err(e) => return Err((StateMachine::Idle(conversation), e)),
  258. };
  259. match action {
  260. IdleActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
  261. IdleActions::Receive(n) => {
  262. let msg = match read_message(stream, n, header_size).await {
  263. Ok(msg) => msg,
  264. Err(e) => return Err((StateMachine::Idle(conversation), e)),
  265. };
  266. receive_action(msg, conversation, stream, our_id, rng).await
  267. }
  268. }
  269. }
  270. enum ActiveActions {
  271. Send,
  272. Receive(usize),
  273. Idle,
  274. }
  275. /// Handle a state transition from Active, including I/O, for a single-connection conversation.
  276. /// Used for Active client-server and single-contact p2p conversations.
  277. pub async fn manage_single_active_conversation(
  278. conversation: Conversation<Active>,
  279. stream: &mut TcpStream,
  280. our_id: &str,
  281. recipients: Vec<&str>,
  282. rng: &mut Xoshiro256PlusPlus,
  283. ) -> Result<StateMachine, (StateMachine, MessengerError)> {
  284. let mut header_size = [0; 4];
  285. let action = tokio::select! {
  286. action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => {
  287. Ok(action)
  288. }
  289. res = read_header_size(stream, &mut header_size) => {
  290. match res {
  291. Ok(n) => Ok(ActiveActions::Receive(n)),
  292. Err(e) => Err(e),
  293. }
  294. }
  295. };
  296. let action = match action {
  297. Ok(action) => action,
  298. Err(e) => return Err((StateMachine::Active(conversation), e)),
  299. };
  300. match action {
  301. ActiveActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
  302. ActiveActions::Receive(n) => {
  303. let msg = match read_message(stream, n, header_size).await {
  304. Ok(msg) => msg,
  305. Err(e) => return Err((StateMachine::Active(conversation), e)),
  306. };
  307. receive_action(msg, conversation, stream, our_id, rng).await
  308. }
  309. ActiveActions::Idle => Ok(StateMachine::Idle(conversation.waited(rng))),
  310. }
  311. }
  312. */
  313. enum IdleGroupActions {
  314. Send,
  315. Receive(MessageHeader),
  316. }
  317. /// Handle a state transition from Idle, including I/O, for a multi-connection conversation.
  318. /// Used for Idle group p2p conversations.
  319. pub async fn manage_group_idle_conversation(
  320. conversation: Conversation<Idle>,
  321. inbound: &mut mpsc::UnboundedReceiver<MessageHeader>,
  322. stream_map: &mut HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
  323. our_id: &str,
  324. recipients: Vec<&str>,
  325. rng: &mut Xoshiro256PlusPlus,
  326. ) -> StateMachine {
  327. log!("delaying for {:?}", conversation.delay - Instant::now());
  328. let action = tokio::select! {
  329. () = tokio::time::sleep_until(conversation.delay) => IdleGroupActions::Send,
  330. res = inbound.recv() =>
  331. IdleGroupActions::Receive(res.expect("inbound channel closed")),
  332. };
  333. match action {
  334. IdleGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await,
  335. IdleGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await,
  336. }
  337. }
  338. enum ActiveGroupActions {
  339. Send,
  340. Receive(MessageHeader),
  341. Idle,
  342. }
  343. /// Handle a state transition from Active, including I/O, for a multi-connection conversation.
  344. /// Used for Active group p2p conversations.
  345. pub async fn manage_group_active_conversation(
  346. conversation: Conversation<Active>,
  347. inbound: &mut mpsc::UnboundedReceiver<MessageHeader>,
  348. stream_map: &mut HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
  349. our_id: &str,
  350. recipients: Vec<&str>,
  351. rng: &mut Xoshiro256PlusPlus,
  352. ) -> StateMachine {
  353. let action = tokio::select! {
  354. action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => action,
  355. res = inbound.recv() =>
  356. ActiveGroupActions::Receive(res.expect("inbound channel closed")),
  357. };
  358. match action {
  359. ActiveGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await,
  360. ActiveGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await,
  361. ActiveGroupActions::Idle => StateMachine::Idle(conversation.waited(rng)),
  362. }
  363. }