mgen-peer.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. // Code specific to the peer in the p2p mode.
  2. use mgen::{log, updater::Updater, MessageHeader, SerializedMessage};
  3. use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
  4. use serde::Deserialize;
  5. use std::collections::HashMap;
  6. use std::result::Result;
  7. use std::sync::Arc;
  8. use tokio::io::AsyncWriteExt;
  9. use tokio::net::{
  10. tcp::{OwnedReadHalf, OwnedWriteHalf},
  11. TcpListener,
  12. };
  13. use tokio::sync::mpsc;
  14. use tokio::task;
  15. use tokio::time::Duration;
  16. mod messenger;
  17. use crate::messenger::dists::{ConfigDistributions, Distributions};
  18. use crate::messenger::error::{FatalError, MessengerError};
  19. use crate::messenger::state::{
  20. manage_active_conversation, manage_idle_conversation, StateFromReader, StateMachine,
  21. StateToWriter,
  22. };
  23. use crate::messenger::tcp::{connect, SocksParams};
  24. /// Type for sending messages from the reader thread to the state thread.
  25. type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
  26. /// Type for getting messages from the state thread in the writer thread.
  27. type WriterFromState = mpsc::UnboundedReceiver<Arc<SerializedMessage>>;
  28. /// Type for sending messages from the state thread to the writer thread.
  29. type MessageHolder = Arc<SerializedMessage>;
  30. /// Type for sending the updated read half of the socket.
  31. type ReadSocketUpdaterIn = Updater<OwnedReadHalf>;
  32. /// Type for getting the updated read half of the socket.
  33. type ReadSocketUpdaterOut = Updater<OwnedReadHalf>;
  34. /// Type for sending the updated write half of the socket.
  35. type WriteSocketUpdaterIn = Updater<OwnedWriteHalf>;
  36. /// Type for getting the updated write half of the socket.
  37. type WriteSocketUpdaterOut = Updater<OwnedWriteHalf>;
  38. /// The conversation (state) thread tracks the conversation state
  39. /// (i.e., whether the user is active or idle, and when to send messages).
  40. /// One state thread per conversation.
  41. async fn manage_conversation(
  42. user: String,
  43. group: String,
  44. distributions: Distributions,
  45. bootstrap: f64,
  46. mut state_from_reader: StateFromReader,
  47. mut state_to_writers: HashMap<String, StateToWriter<MessageHolder>>,
  48. ) {
  49. let mut rng = Xoshiro256PlusPlus::from_entropy();
  50. let user = &user;
  51. let group = &group;
  52. let mut state_machine = StateMachine::start(distributions, &mut rng);
  53. tokio::time::sleep(Duration::from_secs_f64(bootstrap)).await;
  54. loop {
  55. state_machine = match state_machine {
  56. StateMachine::Idle(conversation) => {
  57. manage_idle_conversation(
  58. conversation,
  59. &mut state_from_reader,
  60. &mut state_to_writers,
  61. user,
  62. group,
  63. true,
  64. &mut rng,
  65. )
  66. .await
  67. }
  68. StateMachine::Active(conversation) => {
  69. manage_active_conversation(
  70. conversation,
  71. &mut state_from_reader,
  72. &mut state_to_writers,
  73. user,
  74. group,
  75. true,
  76. &mut rng,
  77. )
  78. .await
  79. }
  80. };
  81. }
  82. }
  83. /// The listener thread listens for inbound connections on the given address.
  84. /// It breaks those connections into reader and writer halves,
  85. /// and gives them to the correct reader and writer threads.
  86. /// One listener thread per user.
  87. async fn listener(
  88. address: String,
  89. name_to_io_threads: HashMap<String, (ReadSocketUpdaterIn, WriteSocketUpdaterIn)>,
  90. ) -> Result<(), FatalError> {
  91. let listener = TcpListener::bind(&address).await?;
  92. log!("listening on {}", &address);
  93. async fn error_collector(
  94. address: &str,
  95. listener: &TcpListener,
  96. name_to_io_threads: &HashMap<String, (ReadSocketUpdaterIn, WriteSocketUpdaterIn)>,
  97. ) -> Result<(), MessengerError> {
  98. let (stream, _) = listener.accept().await?;
  99. let (mut rd, wr) = stream.into_split();
  100. let from = mgen::parse_identifier(&mut rd).await?;
  101. let (channel_to_reader, channel_to_writer) = name_to_io_threads
  102. .get(&from)
  103. .unwrap_or_else(|| panic!("{} got connection from unknown contact: {}", address, from));
  104. channel_to_reader.send(rd);
  105. channel_to_writer.send(wr);
  106. Ok(())
  107. }
  108. loop {
  109. if let Err(MessengerError::Fatal(e)) =
  110. error_collector(&address, &listener, &name_to_io_threads).await
  111. {
  112. return Err(e);
  113. }
  114. }
  115. }
  116. /// The reader thread reads messages from the socket it has been given,
  117. /// and sends them to the correct state thread.
  118. /// One reader thread per (user, recipient) pair.
  119. async fn reader(
  120. mut connection_channel: ReadSocketUpdaterOut,
  121. group_to_conversation_thread: HashMap<String, ReaderToState>,
  122. ) {
  123. loop {
  124. // wait for listener or writer thread to give us a stream to read from
  125. let mut stream = connection_channel.recv().await;
  126. loop {
  127. let msg = if let Ok(msg) = mgen::get_message(&mut stream).await {
  128. msg
  129. } else {
  130. // Unlike the client-server case, we can assume that if there
  131. // were a message someone was trying to send us, they'd make
  132. // sure to re-establish the connection; so when the socket
  133. // breaks, don't bother trying to reform it until we need to
  134. // send a message or the peer reaches out to us.
  135. break;
  136. };
  137. let channel_to_conversation = group_to_conversation_thread
  138. .get(&msg.group)
  139. .unwrap_or_else(|| panic!("Unknown group: {}", msg.group));
  140. channel_to_conversation
  141. .send(msg)
  142. .expect("reader: Channel to group closed");
  143. }
  144. }
  145. }
  146. /// The writer thread takes in messages from state threads,
  147. /// and sends it to the recipient associated with this thread.
  148. /// If it doesn't have a socket from the listener thread,
  149. /// it'll create its own and give the read half to the reader thread.
  150. /// One writer thread per (user, recipient) pair.
  151. async fn writer<'a>(
  152. mut messages_to_send: WriterFromState,
  153. mut write_socket_updater: WriteSocketUpdaterOut,
  154. read_socket_updater: ReadSocketUpdaterIn,
  155. socks_params: SocksParams,
  156. retry: Duration,
  157. ) -> Result<(), FatalError> {
  158. // make sure this is the first step to avoid connections until there's
  159. // something to send
  160. let mut msg = messages_to_send
  161. .recv()
  162. .await
  163. .expect("writer: Channel from conversations closed");
  164. let mut stream = establish_connection(
  165. &mut write_socket_updater,
  166. &read_socket_updater,
  167. &socks_params,
  168. retry,
  169. )
  170. .await
  171. .expect("Fatal error establishing connection");
  172. loop {
  173. while msg.write_all_to(&mut stream).await.is_err() {
  174. stream = establish_connection(
  175. &mut write_socket_updater,
  176. &read_socket_updater,
  177. &socks_params,
  178. retry,
  179. )
  180. .await
  181. .expect("Fatal error establishing connection");
  182. }
  183. msg = messages_to_send
  184. .recv()
  185. .await
  186. .expect("writer: Channel from conversations closed");
  187. }
  188. // helper functions
  189. /// Attempt to get a connection to the peer,
  190. /// whether by getting an existing connection from the listener,
  191. /// or by establishing a new connection.
  192. async fn establish_connection<'a>(
  193. write_socket_updater: &mut WriteSocketUpdaterOut,
  194. read_socket_updater: &ReadSocketUpdaterIn,
  195. socks_params: &SocksParams,
  196. retry: Duration,
  197. ) -> Result<OwnedWriteHalf, FatalError> {
  198. // first check if the listener thread already has a socket
  199. if let Some(wr) = write_socket_updater.maybe_recv() {
  200. return Ok(wr);
  201. }
  202. // immediately try to connect to the peer
  203. tokio::select! {
  204. connection_attempt = connect(socks_params) => {
  205. if let Ok(mut stream) = connection_attempt {
  206. log!(
  207. "connection attempt success from {} to {} on {}",
  208. &socks_params.user,
  209. &socks_params.recipient,
  210. &socks_params.target
  211. );
  212. stream
  213. .write_all(&mgen::serialize_str(&socks_params.user))
  214. .await?;
  215. let (rd, wr) = stream.into_split();
  216. read_socket_updater.send(rd);
  217. return Ok(wr);
  218. } else if let Err(MessengerError::Fatal(e)) = connection_attempt {
  219. return Err(e);
  220. }
  221. }
  222. stream = write_socket_updater.recv() => {return Ok(stream);},
  223. }
  224. // Usually we'll have returned by now, but sometimes we'll fail to
  225. // connect for whatever reason. Initiate a loop of waiting Duration,
  226. // then trying to connect again, allowing it to be inerrupted by
  227. // the listener thread.
  228. loop {
  229. match error_collector(
  230. write_socket_updater,
  231. read_socket_updater,
  232. socks_params,
  233. retry,
  234. )
  235. .await
  236. {
  237. Ok(wr) => return Ok(wr),
  238. Err(MessengerError::Recoverable(_)) => continue,
  239. Err(MessengerError::Fatal(e)) => return Err(e),
  240. }
  241. }
  242. async fn error_collector<'a>(
  243. write_socket_updater: &mut WriteSocketUpdaterOut,
  244. read_socket_updater: &ReadSocketUpdaterIn,
  245. socks_params: &SocksParams,
  246. retry: Duration,
  247. ) -> Result<OwnedWriteHalf, MessengerError> {
  248. tokio::select! {
  249. () = tokio::time::sleep(retry) => {
  250. let mut stream = connect(socks_params)
  251. .await?;
  252. stream.write_all(&mgen::serialize_str(&socks_params.user)).await?;
  253. let (rd, wr) = stream.into_split();
  254. read_socket_updater.send(rd);
  255. Ok(wr)
  256. },
  257. stream = write_socket_updater.recv() => Ok(stream),
  258. }
  259. }
  260. }
  261. }
  262. /// This user or a recipient.
  263. /// If this user, address is a local address to listen on.
  264. /// If a recipient, address is a remote address to send to.
  265. #[derive(Debug, Deserialize)]
  266. struct Peer {
  267. name: String,
  268. address: String,
  269. }
  270. #[derive(Debug, Deserialize)]
  271. struct ConversationConfig {
  272. group: String,
  273. recipients: Vec<Peer>,
  274. bootstrap: f64,
  275. retry: f64,
  276. distributions: ConfigDistributions,
  277. }
  278. #[derive(Debug, Deserialize)]
  279. struct Config {
  280. user: Peer,
  281. socks: Option<String>,
  282. conversations: Vec<ConversationConfig>,
  283. }
  284. #[tokio::main]
  285. async fn main() -> Result<(), Box<dyn std::error::Error>> {
  286. let mut args = std::env::args();
  287. let _ = args.next();
  288. struct ForIoThreads {
  289. state_to_writer: mpsc::UnboundedSender<MessageHolder>,
  290. writer_from_state: WriterFromState,
  291. reader_to_states: HashMap<String, ReaderToState>,
  292. str_params: SocksParams,
  293. retry: f64,
  294. }
  295. let mut handles = vec![];
  296. for config_file in args.flat_map(|a| glob::glob(a.as_str()).unwrap()) {
  297. let toml_s = std::fs::read_to_string(config_file?)?;
  298. let config: Config = toml::from_str(&toml_s)?;
  299. // map from `recipient` to things the (user, recipient) reader/writer threads will need
  300. let mut recipient_map = HashMap::<String, ForIoThreads>::new();
  301. for conversation in config.conversations.into_iter() {
  302. let (reader_to_state, state_from_reader) = mpsc::unbounded_channel();
  303. let mut conversation_recipient_map =
  304. HashMap::<String, StateToWriter<MessageHolder>>::with_capacity(
  305. conversation.recipients.len(),
  306. );
  307. for recipient in conversation.recipients.iter() {
  308. let state_to_writer = if !recipient_map.contains_key(&recipient.name) {
  309. let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
  310. let mut reader_to_states = HashMap::new();
  311. reader_to_states.insert(conversation.group.clone(), reader_to_state.clone());
  312. let str_params = SocksParams {
  313. socks: config.socks.clone(),
  314. target: recipient.address.clone(),
  315. user: config.user.name.clone(),
  316. recipient: recipient.name.clone(),
  317. };
  318. let for_io = ForIoThreads {
  319. state_to_writer: state_to_writer.clone(),
  320. writer_from_state,
  321. reader_to_states,
  322. str_params,
  323. retry: conversation.retry,
  324. };
  325. recipient_map.insert(recipient.name.clone(), for_io);
  326. state_to_writer
  327. } else {
  328. let for_io = recipient_map.get_mut(&recipient.name).unwrap();
  329. if !for_io.reader_to_states.contains_key(&conversation.group) {
  330. for_io
  331. .reader_to_states
  332. .insert(conversation.group.clone(), reader_to_state.clone());
  333. }
  334. for_io.state_to_writer.clone()
  335. };
  336. conversation_recipient_map.insert(
  337. recipient.name.clone(),
  338. StateToWriter {
  339. channel: state_to_writer,
  340. },
  341. );
  342. }
  343. let distributions: Distributions = conversation.distributions.try_into()?;
  344. tokio::spawn(manage_conversation(
  345. config.user.name.clone(),
  346. conversation.group,
  347. distributions,
  348. conversation.bootstrap,
  349. state_from_reader,
  350. conversation_recipient_map,
  351. ));
  352. }
  353. let mut name_to_io_threads: HashMap<String, (ReadSocketUpdaterIn, WriteSocketUpdaterIn)> =
  354. HashMap::new();
  355. for (recipient, for_io) in recipient_map.drain() {
  356. let listener_writer_to_reader = Updater::new();
  357. let reader_from_listener_writer = listener_writer_to_reader.clone();
  358. let listener_to_writer = Updater::new();
  359. let writer_from_listener = listener_to_writer.clone();
  360. name_to_io_threads.insert(
  361. recipient.to_string(),
  362. (listener_writer_to_reader.clone(), listener_to_writer),
  363. );
  364. tokio::spawn(reader(reader_from_listener_writer, for_io.reader_to_states));
  365. let retry = Duration::from_secs_f64(for_io.retry);
  366. let handle: task::JoinHandle<Result<(), FatalError>> = tokio::spawn(writer(
  367. for_io.writer_from_state,
  368. writer_from_listener,
  369. listener_writer_to_reader,
  370. for_io.str_params,
  371. retry,
  372. ));
  373. handles.push(handle);
  374. }
  375. let handle: task::JoinHandle<Result<(), FatalError>> =
  376. tokio::spawn(listener(config.user.address, name_to_io_threads));
  377. handles.push(handle);
  378. }
  379. handles.shrink_to_fit();
  380. for handle in handles {
  381. handle.await??;
  382. }
  383. Ok(())
  384. }