state.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // The state machine used to represent one end of a conversation.
  2. // This includes inducing transitions and actions taken during transitions,
  3. // so messages are constructed and passed to other threads from here.
  4. use mgen::{log, MessageHeader, SerializedMessage};
  5. use rand_distr::Distribution;
  6. use rand_xoshiro::Xoshiro256PlusPlus;
  7. use std::borrow::Borrow;
  8. use std::collections::HashMap;
  9. use std::fmt::Debug;
  10. use std::sync::Arc;
  11. use tokio::sync::mpsc;
  12. use tokio::time::Instant;
  13. use crate::messenger::dists::Distributions;
  14. use crate::messenger::message::{construct_message, construct_receipt};
  15. /// All possible Conversation state machine states
  16. pub enum StateMachine {
  17. Idle(Conversation<Idle>),
  18. Active(Conversation<Active>),
  19. }
  20. impl StateMachine {
  21. pub fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  22. Self::Idle(Conversation::<Idle>::start(dists, rng))
  23. }
  24. fn name(&self) -> &str {
  25. match self {
  26. Self::Idle(_) => Idle::NAME,
  27. Self::Active(_) => Active::NAME,
  28. }
  29. }
  30. }
  31. /// The state machine representing a conversation state and its transitions.
  32. pub struct Conversation<S: State> {
  33. dists: Distributions,
  34. delay: Instant,
  35. next_id: u32,
  36. state: S,
  37. }
  38. pub trait State {
  39. const NAME: &'static str;
  40. fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
  41. where
  42. Self: Sized;
  43. fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
  44. where
  45. Self: Sized;
  46. fn to_machine(conversation: Conversation<Self>) -> StateMachine
  47. where
  48. Self: Sized;
  49. }
  /// Marker for the Idle state; it carries no data of its own.
  pub struct Idle {}
  51. pub struct Active {
  52. wait: Instant,
  53. }
  54. impl State for Idle {
  55. const NAME: &'static str = "Idle";
  56. fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  57. let next_id = conversation.next_id + 1;
  58. if conversation.dists.s.sample(rng) {
  59. let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
  60. let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
  61. StateMachine::Active({
  62. Conversation::<Active> {
  63. dists: conversation.dists,
  64. delay,
  65. next_id,
  66. state: Active { wait },
  67. }
  68. })
  69. } else {
  70. let delay = Instant::now() + conversation.dists.i.sample_secs(rng);
  71. StateMachine::Idle({
  72. Conversation::<Idle> {
  73. dists: conversation.dists,
  74. delay,
  75. next_id,
  76. state: Idle {},
  77. }
  78. })
  79. }
  80. }
  81. fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  82. if conversation.dists.r.sample(rng) {
  83. let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
  84. let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
  85. StateMachine::Active({
  86. Conversation::<Active> {
  87. dists: conversation.dists,
  88. delay,
  89. next_id: conversation.next_id,
  90. state: Active { wait },
  91. }
  92. })
  93. } else {
  94. StateMachine::Idle(conversation)
  95. }
  96. }
  97. fn to_machine(conversation: Conversation<Self>) -> StateMachine {
  98. StateMachine::Idle(conversation)
  99. }
  100. }
  101. impl State for Active {
  102. const NAME: &'static str = "Active";
  103. fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  104. let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
  105. StateMachine::Active(Conversation::<Active> {
  106. dists: conversation.dists,
  107. delay,
  108. next_id: conversation.next_id + 1,
  109. state: conversation.state,
  110. })
  111. }
  112. fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
  113. let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
  114. StateMachine::Active(Conversation::<Active> {
  115. dists: conversation.dists,
  116. delay,
  117. next_id: conversation.next_id,
  118. state: conversation.state,
  119. })
  120. }
  121. fn to_machine(conversation: Conversation<Self>) -> StateMachine {
  122. StateMachine::Active(conversation)
  123. }
  124. }
  125. impl Conversation<Idle> {
  126. fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> Self {
  127. let delay = Instant::now() + dists.i.sample_secs(rng);
  128. Self {
  129. dists,
  130. delay,
  131. next_id: 0,
  132. state: Idle {},
  133. }
  134. }
  135. }
  136. impl Conversation<Active> {
  137. fn waited(self, rng: &mut Xoshiro256PlusPlus) -> Conversation<Idle> {
  138. let delay = Instant::now() + self.dists.i.sample_secs(rng);
  139. Conversation::<Idle> {
  140. dists: self.dists,
  141. delay,
  142. next_id: self.next_id,
  143. state: Idle {},
  144. }
  145. }
  146. async fn sleep(delay: Instant, wait: Instant) -> ActiveGroupActions {
  147. if delay < wait {
  148. tokio::time::sleep_until(delay).await;
  149. ActiveGroupActions::Send
  150. } else {
  151. tokio::time::sleep_until(wait).await;
  152. ActiveGroupActions::Idle
  153. }
  154. }
  155. }
  156. /// Type for getting messages from the reader thread in the state thread.
  157. pub type StateFromReader = mpsc::UnboundedReceiver<MessageHeader>;
  158. /// Type for sending messages from the state thread to the writer thread.
  159. pub struct StateToWriter<S: MessageHolder> {
  160. pub channel: mpsc::UnboundedSender<S>,
  161. }
  162. pub trait MessageHolder: Borrow<SerializedMessage> + Debug {
  163. fn new(m: SerializedMessage) -> Self;
  164. fn clone(&self) -> Self;
  165. }
  166. impl MessageHolder for Arc<SerializedMessage> {
  167. fn new(m: SerializedMessage) -> Self {
  168. Self::new(m)
  169. }
  170. fn clone(&self) -> Self {
  171. Clone::clone(self)
  172. }
  173. }
  174. impl MessageHolder for Box<SerializedMessage> {
  175. fn new(m: SerializedMessage) -> Self {
  176. Self::new(m)
  177. }
  178. fn clone(&self) -> Self {
  179. panic!("Box holders should never clone");
  180. }
  181. }
  182. pub trait StreamMap<'a, S: 'a + MessageHolder, I: Iterator<Item = &'a mut StateToWriter<S>>> {
  183. fn channel_for(&self, name: &str) -> &StateToWriter<S>;
  184. fn values(&'a mut self) -> I;
  185. }
  186. impl<'a, S: MessageHolder>
  187. StreamMap<'a, S, std::collections::hash_map::ValuesMut<'a, String, StateToWriter<S>>>
  188. for HashMap<String, StateToWriter<S>>
  189. {
  190. fn channel_for(&self, name: &str) -> &StateToWriter<S> {
  191. &self[name]
  192. }
  193. fn values(&'a mut self) -> std::collections::hash_map::ValuesMut<'a, String, StateToWriter<S>> {
  194. self.values_mut()
  195. }
  196. }
  197. impl<'a, S: MessageHolder> StreamMap<'a, S, std::iter::Once<&'a mut StateToWriter<S>>>
  198. for StateToWriter<S>
  199. {
  200. fn channel_for(&self, _name: &str) -> &StateToWriter<S> {
  201. self
  202. }
  203. fn values(&'a mut self) -> std::iter::Once<&'a mut StateToWriter<S>> {
  204. std::iter::once(self)
  205. }
  206. }
  207. async fn send_action<
  208. 'a,
  209. S: 'a + MessageHolder,
  210. T: State,
  211. I: ExactSizeIterator<Item = &'a mut StateToWriter<S>>,
  212. >(
  213. conversation: Conversation<T>,
  214. mut streams: I,
  215. our_id: &str,
  216. group: &str,
  217. rng: &mut Xoshiro256PlusPlus,
  218. ) -> StateMachine {
  219. let size = conversation.dists.m.sample(rng);
  220. let id = conversation.next_id;
  221. let m = S::new(construct_message(
  222. our_id.to_string(),
  223. group.to_string(),
  224. id,
  225. size,
  226. ));
  227. if streams.len() == 1 {
  228. streams
  229. .next()
  230. .unwrap()
  231. .channel
  232. .send(m)
  233. .expect("Internal stream closed with messages still being sent");
  234. } else {
  235. for stream in streams {
  236. stream
  237. .channel
  238. .send(m.clone())
  239. .expect("Internal stream closed with messages still being sent");
  240. }
  241. }
  242. let ret = T::sent(conversation, rng);
  243. log!(
  244. "{},{},send,{},{},{},{}",
  245. our_id,
  246. group,
  247. T::NAME,
  248. ret.name(),
  249. size,
  250. id
  251. );
  252. ret
  253. }
  254. async fn receive_action<
  255. 'a,
  256. S: 'a + MessageHolder,
  257. T: State,
  258. I: std::iter::Iterator<Item = &'a mut StateToWriter<S>>,
  259. M: StreamMap<'a, S, I>,
  260. >(
  261. msg: MessageHeader,
  262. conversation: Conversation<T>,
  263. stream_map: &mut M,
  264. our_id: &str,
  265. group: Option<&str>,
  266. rng: &mut Xoshiro256PlusPlus,
  267. ) -> StateMachine {
  268. match msg.body {
  269. mgen::MessageBody::Size(size) => {
  270. let ret = T::received(conversation, rng);
  271. log!(
  272. "{},{},receive,{},{},{},{},{}",
  273. our_id,
  274. msg.group,
  275. T::NAME,
  276. ret.name(),
  277. msg.sender,
  278. size,
  279. msg.id
  280. );
  281. let stream = stream_map.channel_for(&msg.sender);
  282. let recipient = if group.is_none() {
  283. msg.group
  284. } else {
  285. msg.sender
  286. };
  287. let m = construct_receipt(our_id.to_string(), recipient, msg.id);
  288. stream
  289. .channel
  290. .send(S::new(m))
  291. .expect("channel from receive_action to sender closed");
  292. ret
  293. }
  294. mgen::MessageBody::Receipt => {
  295. let group = match group {
  296. Some(group) => group,
  297. None => &msg.group,
  298. };
  299. log!(
  300. "{},{},receive,{},{},{},receipt,{}",
  301. our_id,
  302. group,
  303. T::NAME,
  304. T::NAME,
  305. msg.sender,
  306. msg.id
  307. );
  308. T::to_machine(conversation)
  309. }
  310. }
  311. }
  312. enum IdleGroupActions {
  313. Send,
  314. Receive(MessageHeader),
  315. }
  316. /// Handle a state transition from Idle, including I/O, for a multi-connection conversation.
  317. /// Used for Idle group p2p conversations.
  318. pub async fn manage_idle_conversation<
  319. 'a,
  320. const P2P: bool,
  321. S: 'a + MessageHolder,
  322. I: std::iter::ExactSizeIterator<Item = &'a mut StateToWriter<S>>,
  323. M: StreamMap<'a, S, I> + 'a,
  324. >(
  325. conversation: Conversation<Idle>,
  326. inbound: &mut StateFromReader,
  327. stream_map: &'a mut M,
  328. our_id: &str,
  329. group: &str,
  330. rng: &mut Xoshiro256PlusPlus,
  331. ) -> StateMachine {
  332. log!("{},{},Idle", our_id, group);
  333. let action = tokio::select! {
  334. () = tokio::time::sleep_until(conversation.delay) => IdleGroupActions::Send,
  335. res = inbound.recv() =>
  336. IdleGroupActions::Receive(res.expect("inbound channel closed")),
  337. };
  338. match action {
  339. IdleGroupActions::Send => {
  340. send_action(conversation, stream_map.values(), our_id, group, rng).await
  341. }
  342. IdleGroupActions::Receive(msg) => {
  343. let group = if P2P { None } else { Some(group) };
  344. receive_action(msg, conversation, stream_map, our_id, group, rng).await
  345. }
  346. }
  347. }
  348. enum ActiveGroupActions {
  349. Send,
  350. Receive(MessageHeader),
  351. Idle,
  352. }
  353. /// Handle a state transition from Active.
  354. pub async fn manage_active_conversation<
  355. 'a,
  356. S: 'a + MessageHolder,
  357. I: std::iter::ExactSizeIterator<Item = &'a mut StateToWriter<S>>,
  358. M: StreamMap<'a, S, I> + 'a,
  359. >(
  360. conversation: Conversation<Active>,
  361. inbound: &mut StateFromReader,
  362. stream_map: &'a mut M,
  363. our_id: &str,
  364. group: &str,
  365. p2p: bool,
  366. rng: &mut Xoshiro256PlusPlus,
  367. ) -> StateMachine {
  368. let action = tokio::select! {
  369. action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => action,
  370. res = inbound.recv() =>
  371. ActiveGroupActions::Receive(res.expect("inbound channel closed")),
  372. };
  373. match action {
  374. ActiveGroupActions::Send => {
  375. send_action(conversation, stream_map.values(), our_id, group, rng).await
  376. }
  377. ActiveGroupActions::Receive(msg) => {
  378. let group = if p2p { None } else { Some(group) };
  379. receive_action(msg, conversation, stream_map, our_id, group, rng).await
  380. }
  381. ActiveGroupActions::Idle => StateMachine::Idle(conversation.waited(rng)),
  382. }
  383. }