浏览代码

actually add peer code, and allow Box or Arc for senders

Justin Tracey 1 年之前
父节点
当前提交
1d900f96b3
共有 3 个文件被更改,包括 540 次插入32 次删除
  1. 10 8
      src/bin/client.rs
  2. 75 24
      src/bin/messenger/state.rs
  3. 455 0
      src/bin/peer.rs

+ 10 - 8
src/bin/client.rs

@@ -5,7 +5,6 @@ use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
 use serde::Deserialize;
 use serde::Deserialize;
 use std::env;
 use std::env;
 use std::result::Result;
 use std::result::Result;
-use std::sync::Arc;
 use tokio::io::AsyncWriteExt;
 use tokio::io::AsyncWriteExt;
 use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
 use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
 use tokio::sync::mpsc;
 use tokio::sync::mpsc;
@@ -17,16 +16,16 @@ mod messenger;
 
 
 use crate::messenger::dists::{ConfigDistributions, Distributions};
 use crate::messenger::dists::{ConfigDistributions, Distributions};
 use crate::messenger::error::{FatalError, MessengerError};
 use crate::messenger::error::{FatalError, MessengerError};
-use crate::messenger::state::{manage_active_conversation, manage_idle_conversation, StateMachine};
+use crate::messenger::state::{
+    manage_active_conversation, manage_idle_conversation, StateMachine, StateToWriter,
+};
 
 
 /// Type for sending messages from the reader thread to the state thread.
 /// Type for sending messages from the reader thread to the state thread.
 type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
 type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
-// implicit
-//type StateFromReader = mpsc::UnboundedReceiver<MessageHeader>;
+/// Type of messages sent to the writer thread.
+type MessageHolder = Box<SerializedMessage>;
 /// Type for getting messages from the state thread in the writer thread.
 /// Type for getting messages from the state thread in the writer thread.
-type WriterFromState = mpsc::UnboundedReceiver<Arc<SerializedMessage>>;
-// implicit
-//type StateToWriter = mpsc::UnboundedSender<Arc<SerializedMessage>>;
+type WriterFromState = mpsc::UnboundedReceiver<MessageHolder>;
 /// Type for sending the updated read half of the socket.
 /// Type for sending the updated read half of the socket.
 type ReadSocketUpdaterIn = mpsc::UnboundedSender<OwnedReadHalf>;
 type ReadSocketUpdaterIn = mpsc::UnboundedSender<OwnedReadHalf>;
 /// Type for getting the updated read half of the socket.
 /// Type for getting the updated read half of the socket.
@@ -178,7 +177,7 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
     let recipients: Vec<&str> = config.recipients.iter().map(String::as_str).collect();
     let recipients: Vec<&str> = config.recipients.iter().map(String::as_str).collect();
 
 
     let (reader_to_state, mut state_from_reader) = mpsc::unbounded_channel();
     let (reader_to_state, mut state_from_reader) = mpsc::unbounded_channel();
-    let (mut state_to_writer, writer_from_state) = mpsc::unbounded_channel();
+    let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
     let (read_socket_updater_in, read_socket_updater_out) = mpsc::unbounded_channel();
     let (read_socket_updater_in, read_socket_updater_out) = mpsc::unbounded_channel();
     let (write_socket_updater_in, write_socket_updater_out) = mpsc::unbounded_channel();
     let (write_socket_updater_in, write_socket_updater_out) = mpsc::unbounded_channel();
     let (errs_in, errs_out) = mpsc::unbounded_channel();
     let (errs_in, errs_out) = mpsc::unbounded_channel();
@@ -197,6 +196,9 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
         write_socket_updater_in,
         write_socket_updater_in,
     ));
     ));
 
 
+    let mut state_to_writer = StateToWriter {
+        channel: state_to_writer,
+    };
     loop {
     loop {
         state_machine = match state_machine {
         state_machine = match state_machine {
             StateMachine::Idle(conversation) => {
             StateMachine::Idle(conversation) => {

+ 75 - 24
src/bin/messenger/state.rs

@@ -5,7 +5,9 @@
 use mgen::{log, MessageHeader, SerializedMessage};
 use mgen::{log, MessageHeader, SerializedMessage};
 use rand_distr::Distribution;
 use rand_distr::Distribution;
 use rand_xoshiro::Xoshiro256PlusPlus;
 use rand_xoshiro::Xoshiro256PlusPlus;
+use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::sync::Arc;
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc;
 use tokio::time::Instant;
 use tokio::time::Instant;
@@ -161,40 +163,75 @@ impl Conversation<Active> {
 }
 }
 
 
 /// Type for getting messages from the reader thread in the state thread.
 /// Type for getting messages from the reader thread in the state thread.
-type StateFromReader = mpsc::UnboundedReceiver<MessageHeader>;
+pub type StateFromReader = mpsc::UnboundedReceiver<MessageHeader>;
 /// Type for sending messages from the state thread to the writer thread.
 /// Type for sending messages from the state thread to the writer thread.
-type StateToWriter = mpsc::UnboundedSender<Arc<SerializedMessage>>;
+pub struct StateToWriter<S: MessageHolder> {
+    pub channel: mpsc::UnboundedSender<S>,
+}
+
+pub trait MessageHolder: Borrow<SerializedMessage> + Debug {
+    fn new(m: SerializedMessage) -> Self;
+    fn clone(&self) -> Self;
+}
+
+impl MessageHolder for Arc<SerializedMessage> {
+    fn new(m: SerializedMessage) -> Self {
+        Self::new(m)
+    }
+
+    fn clone(&self) -> Self {
+        Clone::clone(self)
+    }
+}
+
+impl MessageHolder for Box<SerializedMessage> {
+    fn new(m: SerializedMessage) -> Self {
+        Self::new(m)
+    }
 
 
-pub trait StreamMap<'a, I: Iterator<Item = &'a mut StateToWriter>> {
-    fn channel_for(&self, name: &str) -> &StateToWriter;
+    fn clone(&self) -> Self {
+        panic!("Box holders should never clone");
+    }
+}
+
+pub trait StreamMap<'a, S: 'a + MessageHolder, I: Iterator<Item = &'a mut StateToWriter<S>>> {
+    fn channel_for(&self, name: &str) -> &StateToWriter<S>;
     fn values(&'a mut self) -> I;
     fn values(&'a mut self) -> I;
 }
 }
 
 
-impl<'a> StreamMap<'a, std::collections::hash_map::ValuesMut<'a, String, StateToWriter>>
-    for HashMap<String, StateToWriter>
+impl<'a, S: MessageHolder>
+    StreamMap<'a, S, std::collections::hash_map::ValuesMut<'a, String, StateToWriter<S>>>
+    for HashMap<String, StateToWriter<S>>
 {
 {
-    fn channel_for(&self, name: &str) -> &StateToWriter {
+    fn channel_for(&self, name: &str) -> &StateToWriter<S> {
         &self[name]
         &self[name]
     }
     }
 
 
-    fn values(&'a mut self) -> std::collections::hash_map::ValuesMut<'a, String, StateToWriter> {
+    fn values(&'a mut self) -> std::collections::hash_map::ValuesMut<'a, String, StateToWriter<S>> {
         self.values_mut()
         self.values_mut()
     }
     }
 }
 }
 
 
-impl<'a> StreamMap<'a, std::iter::Once<&'a mut StateToWriter>> for StateToWriter {
-    fn channel_for(&self, _name: &str) -> &StateToWriter {
+impl<'a, S: MessageHolder> StreamMap<'a, S, std::iter::Once<&'a mut StateToWriter<S>>>
+    for StateToWriter<S>
+{
+    fn channel_for(&self, _name: &str) -> &StateToWriter<S> {
         self
         self
     }
     }
 
 
-    fn values(&'a mut self) -> std::iter::Once<&'a mut StateToWriter> {
+    fn values(&'a mut self) -> std::iter::Once<&'a mut StateToWriter<S>> {
         std::iter::once(self)
         std::iter::once(self)
     }
     }
 }
 }
 
 
-async fn send_action<'a, T: State, I: Iterator<Item = &'a mut StateToWriter>>(
+async fn send_action<
+    'a,
+    S: 'a + MessageHolder,
+    T: State,
+    I: ExactSizeIterator<Item = &'a mut StateToWriter<S>>,
+>(
     conversation: Conversation<T>,
     conversation: Conversation<T>,
-    streams: I,
+    mut streams: I,
     our_id: &str,
     our_id: &str,
     group: &str,
     group: &str,
     recipients: Vec<&str>,
     recipients: Vec<&str>,
@@ -207,17 +244,27 @@ async fn send_action<'a, T: State, I: Iterator<Item = &'a mut StateToWriter>>(
         recipients,
         recipients,
         size
         size
     );
     );
-    let m = Arc::new(construct_message(
+    let m = S::new(construct_message(
         our_id.to_string(),
         our_id.to_string(),
         group.to_string(),
         group.to_string(),
         recipients.iter().map(|s| s.to_string()).collect(),
         recipients.iter().map(|s| s.to_string()).collect(),
         size,
         size,
     ));
     ));
 
 
-    for stream in streams {
-        stream
-            .send(m.clone())
+    if streams.len() == 1 {
+        streams
+            .next()
+            .unwrap()
+            .channel
+            .send(m)
             .expect("Internal stream closed with messages still being sent");
             .expect("Internal stream closed with messages still being sent");
+    } else {
+        for stream in streams {
+            stream
+                .channel
+                .send(m.clone())
+                .expect("Internal stream closed with messages still being sent");
+        }
     }
     }
 
 
     T::sent(conversation, rng)
     T::sent(conversation, rng)
@@ -225,9 +272,10 @@ async fn send_action<'a, T: State, I: Iterator<Item = &'a mut StateToWriter>>(
 
 
 async fn receive_action<
 async fn receive_action<
     'a,
     'a,
+    S: 'a + MessageHolder,
     T: State,
     T: State,
-    I: std::iter::Iterator<Item = &'a mut StateToWriter>,
-    M: StreamMap<'a, I>,
+    I: std::iter::Iterator<Item = &'a mut StateToWriter<S>>,
+    M: StreamMap<'a, S, I>,
 >(
 >(
     msg: MessageHeader,
     msg: MessageHeader,
     conversation: Conversation<T>,
     conversation: Conversation<T>,
@@ -247,7 +295,8 @@ async fn receive_action<
             let stream = stream_map.channel_for(&msg.sender);
             let stream = stream_map.channel_for(&msg.sender);
             let m = construct_receipt(our_id.to_string(), group.to_string(), msg.sender);
             let m = construct_receipt(our_id.to_string(), group.to_string(), msg.sender);
             stream
             stream
-                .send(Arc::new(m))
+                .channel
+                .send(S::new(m))
                 .expect("channel from receive_action to sender closed");
                 .expect("channel from receive_action to sender closed");
             T::received(conversation, rng)
             T::received(conversation, rng)
         }
         }
@@ -264,8 +313,9 @@ enum IdleGroupActions {
 /// Used for Idle group p2p conversations.
 /// Used for Idle group p2p conversations.
 pub async fn manage_idle_conversation<
 pub async fn manage_idle_conversation<
     'a,
     'a,
-    I: std::iter::Iterator<Item = &'a mut StateToWriter>,
-    M: StreamMap<'a, I> + 'a,
+    S: 'a + MessageHolder,
+    I: std::iter::ExactSizeIterator<Item = &'a mut StateToWriter<S>>,
+    M: StreamMap<'a, S, I> + 'a,
 >(
 >(
     conversation: Conversation<Idle>,
     conversation: Conversation<Idle>,
     inbound: &mut StateFromReader,
     inbound: &mut StateFromReader,
@@ -310,8 +360,9 @@ enum ActiveGroupActions {
 /// Handle a state transition from Active.
 /// Handle a state transition from Active.
 pub async fn manage_active_conversation<
 pub async fn manage_active_conversation<
     'a,
     'a,
-    I: std::iter::Iterator<Item = &'a mut StateToWriter>,
-    M: StreamMap<'a, I> + 'a,
+    S: 'a + MessageHolder,
+    I: std::iter::ExactSizeIterator<Item = &'a mut StateToWriter<S>>,
+    M: StreamMap<'a, S, I> + 'a,
 >(
 >(
     conversation: Conversation<Active>,
     conversation: Conversation<Active>,
     inbound: &mut StateFromReader,
     inbound: &mut StateFromReader,

+ 455 - 0
src/bin/peer.rs

@@ -0,0 +1,455 @@
+// Code specific to the peer in the p2p mode.
+use mgen::{log, MessageHeader, SerializedMessage};
+use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::env;
+use std::result::Result;
+use std::sync::Arc;
+use tokio::io::AsyncWriteExt;
+use tokio::net::{
+    tcp::{OwnedReadHalf, OwnedWriteHalf},
+    TcpListener,
+};
+use tokio::sync::mpsc;
+use tokio::task;
+use tokio::time::Duration;
+use tokio_socks::tcp::Socks5Stream;
+
+mod messenger;
+
+use crate::messenger::dists::{ConfigDistributions, Distributions};
+use crate::messenger::error::{FatalError, MessengerError};
+use crate::messenger::state::{
+    manage_active_conversation, manage_idle_conversation, StateFromReader, StateMachine,
+    StateToWriter,
+};
+
+/// Type for sending messages from the reader thread to the state thread.
+type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
+/// Type for getting messages from the state thread in the writer thread.
+type WriterFromState = mpsc::UnboundedReceiver<Arc<SerializedMessage>>;
+/// Type for sending messages from the state thread to the writer thread.
+type MessageHolder = Arc<SerializedMessage>;
+/// Type for sending the updated read half of the socket.
+type ReadSocketUpdaterIn = mpsc::UnboundedSender<OwnedReadHalf>;
+/// Type for getting the updated read half of the socket.
+type ReadSocketUpdaterOut = mpsc::UnboundedReceiver<OwnedReadHalf>;
+/// Type for sending the updated write half of the socket.
+type WriteSocketUpdaterIn = mpsc::UnboundedSender<OwnedWriteHalf>;
+/// Type for getting the updated write half of the socket.
+type WriteSocketUpdaterOut = mpsc::UnboundedReceiver<OwnedWriteHalf>;
+
+/// The conversation (state) thread tracks the conversation state
+/// (i.e., whether the user is active or idle, and when to send messages).
+/// One state thread per conversation.
+async fn manage_conversation(
+    user: String,
+    group: String,
+    recipients: Vec<String>,
+    distributions: Distributions,
+    bootstrap: f64,
+    mut state_from_reader: StateFromReader,
+    mut state_to_writers: HashMap<String, StateToWriter<MessageHolder>>,
+) {
+    let mut rng = Xoshiro256PlusPlus::from_entropy();
+    let recipients: Vec<_> = recipients.iter().map(String::as_str).collect();
+    let user = &user;
+    let group = &group;
+
+    let mut state_machine = StateMachine::start(distributions, &mut rng);
+
+    tokio::time::sleep(Duration::from_secs_f64(bootstrap)).await;
+
+    loop {
+        state_machine = match state_machine {
+            StateMachine::Idle(conversation) => {
+                manage_idle_conversation(
+                    conversation,
+                    &mut state_from_reader,
+                    &mut state_to_writers,
+                    user,
+                    group,
+                    recipients.clone(),
+                    &mut rng,
+                )
+                .await
+            }
+            StateMachine::Active(conversation) => {
+                manage_active_conversation(
+                    conversation,
+                    &mut state_from_reader,
+                    &mut state_to_writers,
+                    user,
+                    group,
+                    recipients.clone(),
+                    &mut rng,
+                )
+                .await
+            }
+        };
+    }
+}
+
+/// The listener thread listens for inbound connections on the given address.
+/// It breaks those connections into reader and writer halves,
+/// and gives them to the correct reader and writer threads.
+/// One listener thread per user.
+async fn listener(
+    address: String,
+    name_to_io_threads: HashMap<String, (ReadSocketUpdaterIn, WriteSocketUpdaterIn)>,
+) -> Result<(), FatalError> {
+    let listener = TcpListener::bind(&address).await?;
+    log!("listening on {}", &address);
+
+    async fn error_collector(
+        address: &str,
+        listener: &TcpListener,
+        name_to_io_threads: &HashMap<String, (ReadSocketUpdaterIn, WriteSocketUpdaterIn)>,
+    ) -> Result<(), MessengerError> {
+        let (stream, _) = listener.accept().await?;
+        let (mut rd, wr) = stream.into_split();
+
+        let from = mgen::parse_identifier(&mut rd).await?;
+
+        let (channel_to_reader, channel_to_writer) = name_to_io_threads
+            .get(&from)
+            .unwrap_or_else(|| panic!("{} got connection from unknown contact: {}", address, from));
+        channel_to_reader
+            .send(rd)
+            .expect("listener: Channel to reader closed");
+        channel_to_writer
+            .send(wr)
+            .expect("listener: Channel to writer closed");
+        Ok(())
+    }
+
+    loop {
+        if let Err(MessengerError::Fatal(e)) =
+            error_collector(&address, &listener, &name_to_io_threads).await
+        {
+            return Err(e);
+        }
+    }
+}
+
+/// The reader thread reads messages from the socket it has been given,
+/// and sends them to the correct state thread.
+/// One reader thread per (user, recipient) pair.
+async fn reader(
+    mut connection_channel: ReadSocketUpdaterOut,
+    group_to_conversation_thread: HashMap<String, ReaderToState>,
+) {
+    loop {
+        // wait for listener or writer thread to give us a stream to read from
+        let mut stream = connection_channel
+            .recv()
+            .await
+            .expect("reader: Channel to reader closed");
+        loop {
+            let (msg, _) = if let Ok(msg) = mgen::get_message(&mut stream).await {
+                msg
+            } else {
+                // Unlike the client-server case, we can assume that if there
+                // were a message someone was trying to send us, they'd make
+                // sure to re-establish the connection; so when the socket
+                // breaks, don't bother trying to reform it until we need to
+                // send a message or the peer reaches out to us.
+                break;
+            };
+
+            if msg.body != mgen::MessageBody::Receipt {
+                let group = msg.group.clone();
+                let channel_to_conversation = group_to_conversation_thread
+                    .get(&group)
+                    .unwrap_or_else(|| panic!("Unknown group: {}", group));
+                channel_to_conversation
+                    .send(msg)
+                    .expect("reader: Channel to group closed");
+            }
+        }
+    }
+}
+
+/// The writer thread takes in messages from state threads,
+/// and sends it to the recipient associated with this thread.
+/// If it doesn't have a socket from the listener thread,
+/// it'll create its own and give the read half to the reader thread.
+/// One writer thread per (user, recipient) pair.
+async fn writer<'a>(
+    mut messages_to_send: WriterFromState,
+    mut write_socket_updater: WriteSocketUpdaterOut,
+    read_socket_updater: ReadSocketUpdaterIn,
+    socks_params: SocksParams,
+    retry: Duration,
+) -> Result<(), FatalError> {
+    // make sure this is the first step to avoid connections until there's
+    // something to send
+    let mut msg = messages_to_send
+        .recv()
+        .await
+        .expect("writer: Channel from conversations closed");
+
+    let mut stream = establish_connection(
+        &mut write_socket_updater,
+        &read_socket_updater,
+        &socks_params,
+        retry,
+    )
+    .await
+    .expect("Fatal error establishing connection");
+
+    loop {
+        while msg.write_all_to(&mut stream).await.is_err() {
+            stream = establish_connection(
+                &mut write_socket_updater,
+                &read_socket_updater,
+                &socks_params,
+                retry,
+            )
+            .await
+            .expect("Fatal error establishing connection");
+        }
+
+        msg = messages_to_send
+            .recv()
+            .await
+            .expect("writer: Channel from conversations closed");
+    }
+
+    // helper functions
+
+    /// Attempt to get a connection to the peer,
+    /// whether by getting an existing connection from the listener,
+    /// or by establishing a new connection.
+    async fn establish_connection<'a>(
+        write_socket_updater: &mut WriteSocketUpdaterOut,
+        read_socket_updater: &ReadSocketUpdaterIn,
+        socks_params: &SocksParams,
+        retry: Duration,
+    ) -> Result<OwnedWriteHalf, FatalError> {
+        // first check if the listener thread already has a socket
+        if let Ok(wr) = write_socket_updater.try_recv() {
+            return Ok(wr);
+        }
+
+        // immediately try to connect to the peer
+        let connection_attempt = Socks5Stream::connect_with_password(
+            socks_params.socks.as_str(),
+            socks_params.target.as_str(),
+            &socks_params.user,
+            &socks_params.recipient,
+        )
+        .await;
+        if let Ok(mut stream) = connection_attempt {
+            log!(
+                "connection attempt success from {} to {} on {}",
+                &socks_params.user,
+                &socks_params.recipient,
+                &socks_params.target
+            );
+            stream
+                .write_all(&mgen::serialize_str(&socks_params.user))
+                .await?;
+            let (rd, wr) = stream.into_inner().into_split();
+            read_socket_updater
+                .send(rd)
+                .expect("writer: Channel to reader closed");
+            return Ok(wr);
+        } else if let Err(e) = connection_attempt {
+            let e: MessengerError = e.into();
+            if let MessengerError::Fatal(e) = e {
+                return Err(e);
+            }
+        }
+
+        // Usually we'll have returned by now, but sometimes we'll fail to
+        // connect for whatever reason. Initiate a loop of waiting Duration,
+        // then trying to connect again, allowing it to be inerrupted by
+        // the listener thread.
+
+        loop {
+            match error_collector(
+                write_socket_updater,
+                read_socket_updater,
+                socks_params,
+                retry,
+            )
+            .await
+            {
+                Ok(wr) => return Ok(wr),
+                Err(MessengerError::Recoverable(_)) => continue,
+                Err(MessengerError::Fatal(e)) => return Err(e),
+            }
+        }
+
+        async fn error_collector<'a>(
+            write_socket_updater: &mut WriteSocketUpdaterOut,
+            read_socket_updater: &ReadSocketUpdaterIn,
+            socks_params: &SocksParams,
+            retry: Duration,
+        ) -> Result<OwnedWriteHalf, MessengerError> {
+            tokio::select! {
+                () = tokio::time::sleep(retry) => {
+                    let mut stream = Socks5Stream::connect_with_password(
+                        socks_params.socks.as_str(),
+                        socks_params.target.as_str(),
+                        &socks_params.user,
+                        &socks_params.recipient,
+                    )
+                        .await?;
+                    stream.write_all(&mgen::serialize_str(&socks_params.user)).await?;
+
+                    let (rd, wr) = stream.into_inner().into_split();
+                    read_socket_updater.send(rd).expect("writer: Channel to reader closed");
+                    Ok(wr)
+                },
+                stream = write_socket_updater.recv() => Ok(stream.expect("writer: Channel from listener closed")),
+            }
+        }
+    }
+}
+
+/// Parameters used in establishing the connection through the socks proxy.
+/// (Members may be useful elsewhere as well, but that's the primary purpose.)
+struct SocksParams {
+    socks: String,
+    target: String,
+    user: String,
+    recipient: String,
+}
+
+/// This user or a recipient.
+/// If this user, address is a local address to listen on.
+/// If a recipient, address is a remote address to send to.
+#[derive(Debug, Deserialize)]
+struct Peer {
+    name: String,
+    address: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct Config {
+    user: Peer,
+    group: String,
+    recipients: Vec<Peer>,
+    socks: String,
+    bootstrap: f64,
+    retry: f64,
+    distributions: ConfigDistributions,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let mut args = env::args();
+    let _ = args.next();
+
+    struct ForIoThreads {
+        state_to_writer: mpsc::UnboundedSender<MessageHolder>,
+        writer_from_state: WriterFromState,
+        reader_to_states: HashMap<String, ReaderToState>,
+        str_params: SocksParams,
+        retry: f64,
+    }
+
+    // a map from `user` to {a map from `recipient` to things the (user, recipient) reader/writer threads will need}
+    let mut recipient_maps = HashMap::<String, HashMap<String, ForIoThreads>>::new();
+
+    let mut handles = vec![];
+
+    for config_file in args {
+        let toml_s = std::fs::read_to_string(config_file)?;
+        let config: Config = toml::from_str(&toml_s)?;
+
+        let (reader_to_state, state_from_reader) = mpsc::unbounded_channel();
+
+        if !recipient_maps.contains_key(&config.user.address) {
+            let map = HashMap::<String, ForIoThreads>::new();
+            recipient_maps.insert(config.user.address.clone(), map);
+        }
+
+        let user_recipient_map = recipient_maps.get_mut(&config.user.address).unwrap();
+
+        let mut conversation_recipient_map =
+            HashMap::<String, StateToWriter<MessageHolder>>::with_capacity(config.recipients.len());
+
+        for recipient in config.recipients.iter() {
+            let state_to_writer = if !user_recipient_map.contains_key(&recipient.name) {
+                let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
+                let mut reader_to_states = HashMap::new();
+                reader_to_states.insert(config.group.clone(), reader_to_state.clone());
+                let str_params = SocksParams {
+                    socks: config.socks.clone(),
+                    target: recipient.address.clone(),
+                    user: config.user.name.clone(),
+                    recipient: recipient.name.clone(),
+                };
+                let for_io = ForIoThreads {
+                    state_to_writer: state_to_writer.clone(),
+                    writer_from_state,
+                    reader_to_states,
+                    str_params,
+                    retry: config.retry,
+                };
+                user_recipient_map.insert(recipient.name.clone(), for_io);
+                state_to_writer
+            } else {
+                user_recipient_map[&recipient.name].state_to_writer.clone()
+            };
+            conversation_recipient_map.insert(
+                recipient.name.clone(),
+                StateToWriter {
+                    channel: state_to_writer,
+                },
+            );
+        }
+
+        let distributions: Distributions = config.distributions.try_into()?;
+
+        tokio::spawn(manage_conversation(
+            config.user.name,
+            config.group,
+            config.recipients.iter().map(|p| p.name.clone()).collect(),
+            distributions,
+            config.bootstrap,
+            state_from_reader,
+            conversation_recipient_map,
+        ));
+    }
+
+    for (address, mut recipient_map) in recipient_maps.drain() {
+        let mut name_to_io_threads: HashMap<String, (ReadSocketUpdaterIn, WriteSocketUpdaterIn)> =
+            HashMap::new();
+
+        for (recipient, for_io) in recipient_map.drain() {
+            let (listener_writer_to_reader, reader_from_listener_writer) =
+                mpsc::unbounded_channel();
+            let (listener_to_writer, writer_from_listener) = mpsc::unbounded_channel();
+            name_to_io_threads.insert(
+                recipient.to_string(),
+                (listener_writer_to_reader.clone(), listener_to_writer),
+            );
+
+            tokio::spawn(reader(reader_from_listener_writer, for_io.reader_to_states));
+
+            let retry = Duration::from_secs_f64(for_io.retry);
+            let handle: task::JoinHandle<Result<(), FatalError>> = tokio::spawn(writer(
+                for_io.writer_from_state,
+                writer_from_listener,
+                listener_writer_to_reader,
+                for_io.str_params,
+                retry,
+            ));
+            handles.push(handle);
+        }
+
+        let handle: task::JoinHandle<Result<(), FatalError>> =
+            tokio::spawn(listener(address.clone(), name_to_io_threads));
+        handles.push(handle);
+    }
+
+    for handle in handles {
+        handle.await??;
+    }
+    Ok(())
+}