Browse Source

refactor client into modules

Justin Tracey 2 years ago
parent
commit
86448cf51f

+ 17 - 621
src/bin/client.rs

@@ -1,470 +1,21 @@
-use mgen::{log, SerializedMessage};
-use rand_distr::{
-    Bernoulli, BernoulliError, Binomial, BinomialError, Distribution, Exp, ExpError, GeoError,
-    Geometric, HyperGeoError, Hypergeometric, LogNormal, Normal, NormalError, Pareto, ParetoError,
-    Poisson, PoissonError, Uniform,
-};
+// Code specific to the client in the client-server mode.
+
 use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
 use serde::Deserialize;
 use std::env;
-use std::num::NonZeroU32;
 use std::result::Result;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::net::TcpStream;
+use tokio::io::AsyncWriteExt;
 use tokio::task;
-use tokio::time::{Duration, Instant};
-
-#[derive(Debug)]
-enum ClientError {
-    Recoverable(RecoverableError),
-    Fatal(FatalError),
-}
-
-/// Errors where it is possible reconnecting could resolve the problem.
-#[derive(Debug)]
-enum RecoverableError {
-    /// Recoverable errors from the socks connection.
-    Socks(tokio_socks::Error),
-    /// Network I/O errors.
-    // Note that all I/O handled by ClientError should be recoverable;
-    // if you need fatal I/O errors, use a different error type.
-    Io(std::io::Error),
-}
-
-/// Errors where something is wrong enough we should terminate.
-#[derive(Debug)]
-enum FatalError {
-    /// Fatal errors from the socks connection.
-    Socks(tokio_socks::Error),
-    /// Errors from parsing the conversation files.
-    Parameter(DistParameterError),
-    /// Error while trying to interpret bytes as a String.
-    Utf8Error(std::str::Utf8Error),
-    /// A message failed to deserialize.
-    MalformedSerialization(Vec<u8>, std::backtrace::Backtrace),
-}
-
-impl std::fmt::Display for ClientError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{:?}", self)
-    }
-}
-
-impl std::error::Error for ClientError {}
-
-impl From<mgen::Error> for ClientError {
-    fn from(e: mgen::Error) -> Self {
-        match e {
-            mgen::Error::Io(e) => Self::Recoverable(RecoverableError::Io(e)),
-            mgen::Error::Utf8Error(e) => Self::Fatal(FatalError::Utf8Error(e)),
-            mgen::Error::MalformedSerialization(v, b) => {
-                Self::Fatal(FatalError::MalformedSerialization(v, b))
-            }
-        }
-    }
-}
-
-impl From<DistParameterError> for ClientError {
-    fn from(e: DistParameterError) -> Self {
-        Self::Fatal(FatalError::Parameter(e))
-    }
-}
-
-impl From<tokio_socks::Error> for ClientError {
-    fn from(e: tokio_socks::Error) -> Self {
-        match e {
-            tokio_socks::Error::Io(_)
-            | tokio_socks::Error::ProxyServerUnreachable
-            | tokio_socks::Error::GeneralSocksServerFailure
-            | tokio_socks::Error::HostUnreachable
-            | tokio_socks::Error::TtlExpired => Self::Recoverable(RecoverableError::Socks(e)),
-            _ => Self::Fatal(FatalError::Socks(e)),
-        }
-    }
-}
-
-impl From<std::io::Error> for ClientError {
-    fn from(e: std::io::Error) -> Self {
-        Self::Recoverable(RecoverableError::Io(e))
-    }
-}
-
-/// All possible Conversation state machine states
-enum StateMachine {
-    Idle(Conversation<Idle>),
-    Active(Conversation<Active>),
-}
-
-/// The state machine representing a conversation state and its transitions.
-struct Conversation<S: State> {
-    dists: Distributions,
-    delay: Instant,
-    state: S,
-}
-
-/// The set of Distributions we currently support for message sizes (in padding blocks).
-/// To modify the code to add support for more, one approach is to first add them here,
-/// then fix all the compiler errors and warnings that arise as a result.
-#[derive(Debug)]
-enum MessageDistribution {
-    // Poisson is only defined for floats for technical reasons.
-    // https://rust-random.github.io/book/guide-dist.html#integers
-    Poisson(Poisson<f64>),
-    Binomial(Binomial),
-    Geometric(Geometric),
-    Hypergeometric(Hypergeometric),
-}
-
-impl Distribution<u32> for MessageDistribution {
-    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> u32 {
-        let ret = match self {
-            Self::Poisson(d) => d.sample(rng) as u64,
-            Self::Binomial(d) => d.sample(rng),
-            Self::Geometric(d) => d.sample(rng),
-            Self::Hypergeometric(d) => d.sample(rng),
-        };
-        std::cmp::min(ret, mgen::MAX_BLOCKS_IN_BODY.into()) as u32
-    }
-}
-
-/// The set of Distributions we currently support for timings.
-/// To modify the code to add support for more, one approach is to first add them here,
-/// then fix all the compiler errors and warnings that arise as a result.
-#[derive(Debug)]
-enum TimingDistribution {
-    Normal(Normal<f64>),
-    LogNormal(LogNormal<f64>),
-    Uniform(Uniform<f64>),
-    Exp(Exp<f64>),
-    Pareto(Pareto<f64>),
-}
-
-impl Distribution<f64> for TimingDistribution {
-    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> f64 {
-        let ret = match self {
-            Self::Normal(d) => d.sample(rng),
-            Self::LogNormal(d) => d.sample(rng),
-            Self::Uniform(d) => d.sample(rng),
-            Self::Exp(d) => d.sample(rng),
-            Self::Pareto(d) => d.sample(rng),
-        };
-        ret.max(0.0)
-    }
-}
-
-/// The set of distributions necessary to represent the actions of the state machine.
-#[derive(Debug)]
-struct Distributions {
-    m: MessageDistribution,
-    i: TimingDistribution,
-    w: TimingDistribution,
-    a_s: TimingDistribution,
-    a_r: TimingDistribution,
-    s: Bernoulli,
-    r: Bernoulli,
-}
-
-trait State {
-    fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
-    where
-        Self: Sized;
-    fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
-    where
-        Self: Sized;
-    fn to_machine(conversation: Conversation<Self>) -> StateMachine
-    where
-        Self: Sized;
-}
-
-struct Idle {}
-struct Active {
-    wait: Instant,
-}
-
-impl State for Idle {
-    fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
-        if conversation.dists.s.sample(rng) {
-            log!("Idle: [sent] tranisition to [Active]");
-            let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
-            let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
-            StateMachine::Active({
-                Conversation::<Active> {
-                    dists: conversation.dists,
-                    delay,
-                    state: Active { wait },
-                }
-            })
-        } else {
-            log!("Idle: [sent] tranisition to [Idle]");
-            let delay = Instant::now() + conversation.dists.i.sample_secs(rng);
-            StateMachine::Idle({
-                Conversation::<Idle> {
-                    dists: conversation.dists,
-                    delay,
-                    state: Idle {},
-                }
-            })
-        }
-    }
-
-    fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
-        if conversation.dists.r.sample(rng) {
-            log!("Idle: [recv'd] tranisition to [Active]");
-            let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
-            let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
-            StateMachine::Active({
-                Conversation::<Active> {
-                    dists: conversation.dists,
-                    delay,
-                    state: Active { wait },
-                }
-            })
-        } else {
-            log!("Idle: [recv'd] tranisition to [Idle]");
-            StateMachine::Idle(conversation)
-        }
-    }
-
-    fn to_machine(conversation: Conversation<Self>) -> StateMachine {
-        StateMachine::Idle(conversation)
-    }
-}
-
-impl State for Active {
-    fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
-        log!("Active: [sent] transition to [Active]");
-        let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
-        StateMachine::Active(Conversation::<Active> {
-            dists: conversation.dists,
-            delay,
-            state: conversation.state,
-        })
-    }
-
-    fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
-        log!("Active: [recv'd] transition to [Active]");
-        let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
-        StateMachine::Active(Conversation::<Active> {
-            dists: conversation.dists,
-            delay,
-            state: conversation.state,
-        })
-    }
-
-    fn to_machine(conversation: Conversation<Self>) -> StateMachine {
-        StateMachine::Active(conversation)
-    }
-}
-
-impl Conversation<Idle> {
-    fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> Self {
-        let delay = Instant::now() + dists.i.sample_secs(rng);
-        log!("[start]");
-        Self {
-            dists,
-            delay,
-            state: Idle {},
-        }
-    }
-}
-
-impl Conversation<Active> {
-    fn waited(self, rng: &mut Xoshiro256PlusPlus) -> Conversation<Idle> {
-        log!("Active: [waited] tranision to [Idle]");
-        let delay = Instant::now() + self.dists.i.sample_secs(rng);
-        Conversation::<Idle> {
-            dists: self.dists,
-            delay,
-            state: Idle {},
-        }
-    }
-
-    async fn sleep(delay: Instant, wait: Instant) -> ActiveActions {
-        if delay < wait {
-            log!("delaying for {:?}", delay - Instant::now());
-            tokio::time::sleep_until(delay).await;
-            ActiveActions::Send
-        } else {
-            log!("waiting for {:?}", wait - Instant::now());
-            tokio::time::sleep_until(wait).await;
-            ActiveActions::Idle
-        }
-    }
-}
-
-/// Attempt to read some portion of the header size from the stream.
-/// The number of bytes written is returned in the Ok case.
-/// The caller must read any remaining bytes less than 4.
-// N.B.: This must be written cancellation safe!
-// https://docs.rs/tokio/1.26.0/tokio/macro.select.html#cancellation-safety
-async fn read_header_size(
-    stream: &mut TcpStream,
-    header_size: &mut [u8; 4],
-) -> Result<usize, ClientError> {
-    let read = stream.read(header_size).await?;
-
-    if read == 0 {
-        Err(tokio::io::Error::new(
-            tokio::io::ErrorKind::WriteZero,
-            "failed to read any bytes from message with bytes remaining",
-        )
-        .into())
-    } else {
-        Ok(read)
-    }
-}
-
-async fn send_action<T: State>(
-    conversation: Conversation<T>,
-    stream: &mut TcpStream,
-    our_id: &str,
-    recipients: Vec<&str>,
-    rng: &mut Xoshiro256PlusPlus,
-) -> Result<StateMachine, (StateMachine, ClientError)> {
-    let size = conversation.dists.m.sample(rng);
-    log!(
-        "sending message from {} to {:?} of size {}",
-        our_id,
-        recipients,
-        size
-    );
-    let m = construct_message(
-        our_id.to_string(),
-        recipients.iter().map(|s| s.to_string()).collect(),
-        size,
-    );
+use tokio::time::Duration;
+use tokio_socks::tcp::Socks5Stream;
 
-    if let Err(e) = m.write_all_to(stream).await {
-        return Err((T::to_machine(conversation), e.into()));
-    }
-    if let Err(e) = stream.flush().await {
-        return Err((T::to_machine(conversation), e.into()));
-    }
+mod messenger;
 
-    Ok(T::sent(conversation, rng))
-}
-
-async fn receive_action<T: State>(
-    n: usize,
-    mut header_size: [u8; 4],
-    conversation: Conversation<T>,
-    stream: &mut TcpStream,
-    our_id: &str,
-    rng: &mut Xoshiro256PlusPlus,
-) -> Result<StateMachine, (StateMachine, ClientError)> {
-    if n < 4 {
-        // we didn't get the whole size, but we can use read_exact now
-        if let Err(e) = stream.read_exact(&mut header_size[n..]).await {
-            return Err((T::to_machine(conversation), e.into()));
-        }
-    }
-
-    let msg = match mgen::get_message_with_header_size(stream, header_size).await {
-        Ok((msg, _)) => msg,
-        Err(e) => return Err((T::to_machine(conversation), e.into())),
-    };
-
-    match msg.body {
-        mgen::MessageBody::Size(size) => {
-            log!(
-                "{:?} got message from {} of size {}",
-                msg.recipients,
-                msg.sender,
-                size
-            );
-            let m = construct_receipt(our_id.to_string(), msg.sender);
-            if let Err(e) = m.write_all_to(stream).await {
-                return Err((T::to_machine(conversation), e.into()));
-            }
-            if let Err(e) = stream.flush().await {
-                return Err((T::to_machine(conversation), e.into()));
-            }
-            Ok(T::received(conversation, rng))
-        }
-        mgen::MessageBody::Receipt => Ok(T::to_machine(conversation)),
-    }
-}
-
-enum IdleActions {
-    Send,
-    Receive(usize),
-}
-
-async fn manage_idle_conversation(
-    conversation: Conversation<Idle>,
-    stream: &mut TcpStream,
-    our_id: &str,
-    recipients: Vec<&str>,
-    rng: &mut Xoshiro256PlusPlus,
-) -> Result<StateMachine, (StateMachine, ClientError)> {
-    log!("delaying for {:?}", conversation.delay - Instant::now());
-    let mut header_size = [0; 4];
-    let action = tokio::select! {
-        () = tokio::time::sleep_until(conversation.delay) => {
-            Ok(IdleActions::Send)
-        }
+use crate::messenger::dists::{ConfigDistributions, Distributions};
+use crate::messenger::error::MessengerError;
+use crate::messenger::state::{manage_active_conversation, manage_idle_conversation, StateMachine};
 
-        res = read_header_size(stream, &mut header_size) => {
-            match res {
-                Ok(n) => Ok(IdleActions::Receive(n)),
-                Err(e) => Err(e),
-            }
-        }
-    };
-    let action = match action {
-        Ok(action) => action,
-        Err(e) => return Err((StateMachine::Idle(conversation), e)),
-    };
-
-    match action {
-        IdleActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
-        IdleActions::Receive(n) => {
-            receive_action(n, header_size, conversation, stream, our_id, rng).await
-        }
-    }
-}
-
-enum ActiveActions {
-    Send,
-    Receive(usize),
-    Idle,
-}
-
-async fn manage_active_conversation(
-    conversation: Conversation<Active>,
-    stream: &mut TcpStream,
-    our_id: &str,
-    recipients: Vec<&str>,
-    rng: &mut Xoshiro256PlusPlus,
-) -> Result<StateMachine, (StateMachine, ClientError)> {
-    let mut header_size = [0; 4];
-    let action = tokio::select! {
-        action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => {
-            Ok(action)
-        }
-
-        res = read_header_size(stream, &mut header_size) => {
-            match res {
-                Ok(n) => Ok(ActiveActions::Receive(n)),
-                Err(e) => Err(e),
-            }
-        }
-    };
-    let action = match action {
-        Ok(action) => action,
-        Err(e) => return Err((StateMachine::Active(conversation), e)),
-    };
-
-    match action {
-        ActiveActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
-        ActiveActions::Receive(n) => {
-            receive_action(n, header_size, conversation, stream, our_id, rng).await
-        }
-        ActiveActions::Idle => Ok(StateMachine::Idle(conversation.waited(rng))),
-    }
-}
-
-async fn manage_conversation(config: Config) -> Result<(), ClientError> {
+async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
     let mut rng = Xoshiro256PlusPlus::from_entropy();
     let distributions: Distributions = config.distributions.try_into()?;
 
@@ -481,8 +32,7 @@ async fn manage_conversation(config: Config) -> Result<(), ClientError> {
         group: &config.group,
     };
 
-    let mut state_machine =
-        StateMachine::Idle(Conversation::<Idle>::start(distributions, &mut rng));
+    let mut state_machine = StateMachine::start(distributions, &mut rng);
     let recipients: Vec<&str> = config.recipients.iter().map(String::as_str).collect();
 
     async fn error_collector<'a>(
@@ -491,8 +41,8 @@ async fn manage_conversation(config: Config) -> Result<(), ClientError> {
         rng: &mut Xoshiro256PlusPlus,
         mut state_machine: StateMachine,
         recipients: Vec<&str>,
-    ) -> Result<(), (StateMachine, ClientError)> {
-        let mut stream = match tokio_socks::tcp::Socks5Stream::connect_with_password(
+    ) -> Result<(), (StateMachine, MessengerError)> {
+        let mut stream = match Socks5Stream::connect_with_password(
             str_params.socks,
             str_params.server,
             str_params.sender,
@@ -503,6 +53,7 @@ async fn manage_conversation(config: Config) -> Result<(), ClientError> {
             Ok(stream) => stream,
             Err(e) => return Err((state_machine, e.into())),
         };
+
         if let Err(e) = stream
             .write_all(&mgen::serialize_str(str_params.sender))
             .await
@@ -553,7 +104,7 @@ async fn manage_conversation(config: Config) -> Result<(), ClientError> {
     .await
     .expect_err("Inner loop returned Ok?")
     {
-        (sm, ClientError::Recoverable(_)) => {
+        (sm, MessengerError::Recoverable(_)) => {
             state_machine = sm;
             tokio::time::sleep(retry).await;
         }
@@ -571,7 +122,7 @@ async fn manage_conversation(config: Config) -> Result<(), ClientError> {
         .await
         .expect_err("Inner loop returned Ok?")
         {
-            (sm, ClientError::Recoverable(_)) => {
+            (sm, MessengerError::Recoverable(_)) => {
                 state_machine = sm;
                 tokio::time::sleep(retry).await;
             }
@@ -580,161 +131,6 @@ async fn manage_conversation(config: Config) -> Result<(), ClientError> {
     }
 }
 
-impl TimingDistribution {
-    fn sample_secs(&self, rng: &mut Xoshiro256PlusPlus) -> Duration {
-        Duration::from_secs_f64(self.sample(rng))
-    }
-}
-
-/// Construct and serialize a message from the sender to the recipients with the given number of blocks.
-fn construct_message(sender: String, recipients: Vec<String>, blocks: u32) -> SerializedMessage {
-    let size = std::cmp::max(blocks, 1) * mgen::PADDING_BLOCK_SIZE;
-    let m = mgen::MessageHeader {
-        sender,
-        recipients,
-        body: mgen::MessageBody::Size(NonZeroU32::new(size).unwrap()),
-    };
-    m.serialize()
-}
-
-fn construct_receipt(sender: String, recipient: String) -> SerializedMessage {
-    let m = mgen::MessageHeader {
-        sender,
-        recipients: vec![recipient],
-        body: mgen::MessageBody::Receipt,
-    };
-    m.serialize()
-}
-
-/// The same as Distributions, but designed for easier deserialization.
-#[derive(Debug, Deserialize)]
-struct ConfigDistributions {
-    m: ConfigMessageDistribution,
-    i: ConfigTimingDistribution,
-    w: ConfigTimingDistribution,
-    a_s: ConfigTimingDistribution,
-    a_r: ConfigTimingDistribution,
-    s: f64,
-    r: f64,
-}
-
-/// The same as MessageDistribution, but designed for easier deserialization.
-#[derive(Debug, Deserialize)]
-#[serde(tag = "distribution")]
-enum ConfigMessageDistribution {
-    Poisson {
-        lambda: f64,
-    },
-    Binomial {
-        n: u64,
-        p: f64,
-    },
-    Geometric {
-        p: f64,
-    },
-    Hypergeometric {
-        total_population_size: u64,
-        population_with_feature: u64,
-        sample_size: u64,
-    },
-}
-
-/// The same as TimingDistribution, but designed for easier deserialization.
-#[derive(Debug, Deserialize)]
-#[serde(tag = "distribution")]
-enum ConfigTimingDistribution {
-    Normal { mean: f64, std_dev: f64 },
-    LogNormal { mean: f64, std_dev: f64 },
-    Uniform { low: f64, high: f64 },
-    Exp { lambda: f64 },
-    Pareto { scale: f64, shape: f64 },
-}
-
-#[derive(Debug)]
-enum DistParameterError {
-    Poisson(PoissonError),
-    Binomial(BinomialError),
-    Geometric(GeoError),
-    Hypergeometric(HyperGeoError),
-    Bernoulli(BernoulliError),
-    Normal(NormalError),
-    LogNormal(NormalError),
-    Uniform, // Uniform::new doesn't return an error, it just panics
-    Exp(ExpError),
-    Pareto(ParetoError),
-}
-
-impl TryFrom<ConfigMessageDistribution> for MessageDistribution {
-    type Error = DistParameterError;
-
-    fn try_from(dist: ConfigMessageDistribution) -> Result<Self, DistParameterError> {
-        let dist = match dist {
-            ConfigMessageDistribution::Poisson { lambda } => MessageDistribution::Poisson(
-                Poisson::new(lambda).map_err(DistParameterError::Poisson)?,
-            ),
-            ConfigMessageDistribution::Binomial { n, p } => MessageDistribution::Binomial(
-                Binomial::new(n, p).map_err(DistParameterError::Binomial)?,
-            ),
-            ConfigMessageDistribution::Geometric { p } => MessageDistribution::Geometric(
-                Geometric::new(p).map_err(DistParameterError::Geometric)?,
-            ),
-            ConfigMessageDistribution::Hypergeometric {
-                total_population_size,
-                population_with_feature,
-                sample_size,
-            } => MessageDistribution::Hypergeometric(
-                Hypergeometric::new(total_population_size, population_with_feature, sample_size)
-                    .map_err(DistParameterError::Hypergeometric)?,
-            ),
-        };
-        Ok(dist)
-    }
-}
-
-impl TryFrom<ConfigTimingDistribution> for TimingDistribution {
-    type Error = DistParameterError;
-
-    fn try_from(dist: ConfigTimingDistribution) -> Result<Self, DistParameterError> {
-        let dist = match dist {
-            ConfigTimingDistribution::Normal { mean, std_dev } => TimingDistribution::Normal(
-                Normal::new(mean, std_dev).map_err(DistParameterError::Normal)?,
-            ),
-            ConfigTimingDistribution::LogNormal { mean, std_dev } => TimingDistribution::LogNormal(
-                LogNormal::new(mean, std_dev).map_err(DistParameterError::LogNormal)?,
-            ),
-            ConfigTimingDistribution::Uniform { low, high } => {
-                if low >= high {
-                    return Err(DistParameterError::Uniform);
-                }
-                TimingDistribution::Uniform(Uniform::new(low, high))
-            }
-            ConfigTimingDistribution::Exp { lambda } => {
-                TimingDistribution::Exp(Exp::new(lambda).map_err(DistParameterError::Exp)?)
-            }
-            ConfigTimingDistribution::Pareto { scale, shape } => TimingDistribution::Pareto(
-                Pareto::new(scale, shape).map_err(DistParameterError::Pareto)?,
-            ),
-        };
-        Ok(dist)
-    }
-}
-
-impl TryFrom<ConfigDistributions> for Distributions {
-    type Error = DistParameterError;
-
-    fn try_from(config: ConfigDistributions) -> Result<Self, DistParameterError> {
-        Ok(Distributions {
-            m: config.m.try_into()?,
-            i: config.i.try_into()?,
-            w: config.w.try_into()?,
-            a_s: config.a_s.try_into()?,
-            a_r: config.a_r.try_into()?,
-            s: Bernoulli::new(config.s).map_err(DistParameterError::Bernoulli)?,
-            r: Bernoulli::new(config.r).map_err(DistParameterError::Bernoulli)?,
-        })
-    }
-}
-
 #[derive(Debug, Deserialize)]
 struct Config {
     sender: String,
@@ -755,7 +151,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     for config_file in args {
         let toml_s = std::fs::read_to_string(config_file)?;
         let config = toml::from_str(&toml_s)?;
-        let handle: task::JoinHandle<Result<(), ClientError>> =
+        let handle: task::JoinHandle<Result<(), MessengerError>> =
             tokio::spawn(manage_conversation(config));
         handles.push(handle);
     }

+ 207 - 0
src/bin/messenger/dists.rs

@@ -0,0 +1,207 @@
+// Representations of Distributions for sampling timing and message sizes.
+
+use rand_distr::{
+    Bernoulli, BernoulliError, Binomial, BinomialError, Distribution, Exp, ExpError, GeoError,
+    Geometric, HyperGeoError, Hypergeometric, LogNormal, Normal, NormalError, Pareto, ParetoError,
+    Poisson, PoissonError, Uniform,
+};
+use rand_xoshiro::Xoshiro256PlusPlus;
+use serde::Deserialize;
+use tokio::time::Duration;
+
+/// The set of Distributions we currently support for message sizes (in padding blocks).
+/// To modify the code to add support for more, one approach is to first add them here,
+/// then fix all the compiler errors and warnings that arise as a result.
+#[derive(Debug)]
+pub enum MessageDistribution {
+    // Poisson is only defined for floats for technical reasons.
+    // https://rust-random.github.io/book/guide-dist.html#integers
+    Poisson(Poisson<f64>),
+    Binomial(Binomial),
+    Geometric(Geometric),
+    Hypergeometric(Hypergeometric),
+}
+
+impl Distribution<u32> for MessageDistribution {
+    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> u32 {
+        let ret = match self {
+            Self::Poisson(d) => d.sample(rng) as u64,
+            Self::Binomial(d) => d.sample(rng),
+            Self::Geometric(d) => d.sample(rng),
+            Self::Hypergeometric(d) => d.sample(rng),
+        };
+        std::cmp::min(ret, mgen::MAX_BLOCKS_IN_BODY.into()) as u32
+    }
+}
+
+/// The set of Distributions we currently support for timings.
+/// To modify the code to add support for more, one approach is to first add them here,
+/// then fix all the compiler errors and warnings that arise as a result.
+#[derive(Debug)]
+pub enum TimingDistribution {
+    Normal(Normal<f64>),
+    LogNormal(LogNormal<f64>),
+    Uniform(Uniform<f64>),
+    Exp(Exp<f64>),
+    Pareto(Pareto<f64>),
+}
+
+impl Distribution<f64> for TimingDistribution {
+    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> f64 {
+        let ret = match self {
+            Self::Normal(d) => d.sample(rng),
+            Self::LogNormal(d) => d.sample(rng),
+            Self::Uniform(d) => d.sample(rng),
+            Self::Exp(d) => d.sample(rng),
+            Self::Pareto(d) => d.sample(rng),
+        };
+        ret.max(0.0)
+    }
+}
+
+/// The set of distributions necessary to represent the actions of the state machine.
+#[derive(Debug)]
+pub struct Distributions {
+    pub m: MessageDistribution,
+    pub i: TimingDistribution,
+    pub w: TimingDistribution,
+    pub a_s: TimingDistribution,
+    pub a_r: TimingDistribution,
+    pub s: Bernoulli,
+    pub r: Bernoulli,
+}
+
+impl TimingDistribution {
+    pub fn sample_secs(&self, rng: &mut Xoshiro256PlusPlus) -> Duration {
+        Duration::from_secs_f64(self.sample(rng))
+    }
+}
+
+/// The same as Distributions, but designed for easier deserialization.
+#[derive(Debug, Deserialize)]
+pub struct ConfigDistributions {
+    m: ConfigMessageDistribution,
+    i: ConfigTimingDistribution,
+    w: ConfigTimingDistribution,
+    a_s: ConfigTimingDistribution,
+    a_r: ConfigTimingDistribution,
+    s: f64,
+    r: f64,
+}
+
+/// The same as MessageDistribution, but designed for easier deserialization.
+#[derive(Debug, Deserialize)]
+#[serde(tag = "distribution")]
+enum ConfigMessageDistribution {
+    Poisson {
+        lambda: f64,
+    },
+    Binomial {
+        n: u64,
+        p: f64,
+    },
+    Geometric {
+        p: f64,
+    },
+    Hypergeometric {
+        total_population_size: u64,
+        population_with_feature: u64,
+        sample_size: u64,
+    },
+}
+
+/// The same as TimingDistribution, but designed for easier deserialization.
+#[derive(Debug, Deserialize)]
+#[serde(tag = "distribution")]
+enum ConfigTimingDistribution {
+    Normal { mean: f64, std_dev: f64 },
+    LogNormal { mean: f64, std_dev: f64 },
+    Uniform { low: f64, high: f64 },
+    Exp { lambda: f64 },
+    Pareto { scale: f64, shape: f64 },
+}
+
+#[derive(Debug)]
+pub enum DistParameterError {
+    Poisson(PoissonError),
+    Binomial(BinomialError),
+    Geometric(GeoError),
+    Hypergeometric(HyperGeoError),
+    Bernoulli(BernoulliError),
+    Normal(NormalError),
+    LogNormal(NormalError),
+    Uniform, // Uniform::new doesn't return an error, it just panics
+    Exp(ExpError),
+    Pareto(ParetoError),
+}
+
+impl TryFrom<ConfigMessageDistribution> for MessageDistribution {
+    type Error = DistParameterError;
+
+    fn try_from(dist: ConfigMessageDistribution) -> Result<Self, DistParameterError> {
+        let dist = match dist {
+            ConfigMessageDistribution::Poisson { lambda } => MessageDistribution::Poisson(
+                Poisson::new(lambda).map_err(DistParameterError::Poisson)?,
+            ),
+            ConfigMessageDistribution::Binomial { n, p } => MessageDistribution::Binomial(
+                Binomial::new(n, p).map_err(DistParameterError::Binomial)?,
+            ),
+            ConfigMessageDistribution::Geometric { p } => MessageDistribution::Geometric(
+                Geometric::new(p).map_err(DistParameterError::Geometric)?,
+            ),
+            ConfigMessageDistribution::Hypergeometric {
+                total_population_size,
+                population_with_feature,
+                sample_size,
+            } => MessageDistribution::Hypergeometric(
+                Hypergeometric::new(total_population_size, population_with_feature, sample_size)
+                    .map_err(DistParameterError::Hypergeometric)?,
+            ),
+        };
+        Ok(dist)
+    }
+}
+
+impl TryFrom<ConfigTimingDistribution> for TimingDistribution {
+    type Error = DistParameterError;
+
+    fn try_from(dist: ConfigTimingDistribution) -> Result<Self, DistParameterError> {
+        let dist = match dist {
+            ConfigTimingDistribution::Normal { mean, std_dev } => TimingDistribution::Normal(
+                Normal::new(mean, std_dev).map_err(DistParameterError::Normal)?,
+            ),
+            ConfigTimingDistribution::LogNormal { mean, std_dev } => TimingDistribution::LogNormal(
+                LogNormal::new(mean, std_dev).map_err(DistParameterError::LogNormal)?,
+            ),
+            ConfigTimingDistribution::Uniform { low, high } => {
+                if low >= high {
+                    return Err(DistParameterError::Uniform);
+                }
+                TimingDistribution::Uniform(Uniform::new(low, high))
+            }
+            ConfigTimingDistribution::Exp { lambda } => {
+                TimingDistribution::Exp(Exp::new(lambda).map_err(DistParameterError::Exp)?)
+            }
+            ConfigTimingDistribution::Pareto { scale, shape } => TimingDistribution::Pareto(
+                Pareto::new(scale, shape).map_err(DistParameterError::Pareto)?,
+            ),
+        };
+        Ok(dist)
+    }
+}
+
+impl TryFrom<ConfigDistributions> for Distributions {
+    type Error = DistParameterError;
+
+    fn try_from(config: ConfigDistributions) -> Result<Self, DistParameterError> {
+        Ok(Distributions {
+            m: config.m.try_into()?,
+            i: config.i.try_into()?,
+            w: config.w.try_into()?,
+            a_s: config.a_s.try_into()?,
+            a_r: config.a_r.try_into()?,
+            s: Bernoulli::new(config.s).map_err(DistParameterError::Bernoulli)?,
+            r: Bernoulli::new(config.r).map_err(DistParameterError::Bernoulli)?,
+        })
+    }
+}

+ 78 - 0
src/bin/messenger/error.rs

@@ -0,0 +1,78 @@
+// Representation of errors encountered while running the state machine.
+
+use crate::messenger::dists::DistParameterError;
+
+#[derive(Debug)]
+pub enum MessengerError {
+    Recoverable(RecoverableError),
+    Fatal(FatalError),
+}
+
+/// Errors where it is possible reconnecting could resolve the problem.
+#[derive(Debug)]
+pub enum RecoverableError {
+    /// Recoverable errors from the socks connection.
+    Socks(tokio_socks::Error),
+    /// Network I/O errors.
+    // Note that all I/O handled by MessengerError should be recoverable;
+    // if you need fatal I/O errors, use a different error type.
+    Io(std::io::Error),
+}
+
+/// Errors where something is wrong enough we should terminate.
+#[derive(Debug)]
+pub enum FatalError {
+    /// Fatal errors from the socks connection.
+    Socks(tokio_socks::Error),
+    /// Errors from parsing the conversation files.
+    Parameter(DistParameterError),
+    /// Error while trying to interpret bytes as a String.
+    Utf8Error(std::str::Utf8Error),
+    /// A message failed to deserialize.
+    MalformedSerialization(Vec<u8>, std::backtrace::Backtrace),
+}
+
+impl std::fmt::Display for MessengerError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl std::error::Error for MessengerError {}
+
+impl From<mgen::Error> for MessengerError {
+    fn from(e: mgen::Error) -> Self {
+        match e {
+            mgen::Error::Io(e) => Self::Recoverable(RecoverableError::Io(e)),
+            mgen::Error::Utf8Error(e) => Self::Fatal(FatalError::Utf8Error(e)),
+            mgen::Error::MalformedSerialization(v, b) => {
+                Self::Fatal(FatalError::MalformedSerialization(v, b))
+            }
+        }
+    }
+}
+
+impl From<DistParameterError> for MessengerError {
+    fn from(e: DistParameterError) -> Self {
+        Self::Fatal(FatalError::Parameter(e))
+    }
+}
+
+impl From<tokio_socks::Error> for MessengerError {
+    fn from(e: tokio_socks::Error) -> Self {
+        match e {
+            tokio_socks::Error::Io(_)
+            | tokio_socks::Error::ProxyServerUnreachable
+            | tokio_socks::Error::GeneralSocksServerFailure
+            | tokio_socks::Error::HostUnreachable
+            | tokio_socks::Error::TtlExpired => Self::Recoverable(RecoverableError::Socks(e)),
+            _ => Self::Fatal(FatalError::Socks(e)),
+        }
+    }
+}
+
+impl From<std::io::Error> for MessengerError {
+    fn from(e: std::io::Error) -> Self {
+        Self::Recoverable(RecoverableError::Io(e))
+    }
+}

+ 26 - 0
src/bin/messenger/message.rs

@@ -0,0 +1,26 @@
+// Some utility functions for constructing common messages in the messengers.
+// (Most message functionality is in the library, not this module.)
+
+/// Construct and serialize a message from the sender to the recipients with the given number of blocks.
+pub fn construct_message(
+    sender: String,
+    recipients: Vec<String>,
+    blocks: u32,
+) -> mgen::SerializedMessage {
+    let size = std::cmp::max(blocks, 1) * mgen::PADDING_BLOCK_SIZE;
+    let m = mgen::MessageHeader {
+        sender,
+        recipients,
+        body: mgen::MessageBody::Size(std::num::NonZeroU32::new(size).unwrap()),
+    };
+    m.serialize()
+}
+
+pub fn construct_receipt(sender: String, recipient: String) -> mgen::SerializedMessage {
+    let m = mgen::MessageHeader {
+        sender,
+        recipients: vec![recipient],
+        body: mgen::MessageBody::Receipt,
+    };
+    m.serialize()
+}

+ 5 - 0
src/bin/messenger/mod.rs

@@ -0,0 +1,5 @@
+// Modules for the client in client-server mode and the peer in peer-to-peer mode.
+pub mod dists;
+pub mod error;
+pub mod message;
+pub mod state;

+ 333 - 0
src/bin/messenger/state.rs

@@ -0,0 +1,333 @@
+// The state machine used to represent one end of a conversation.
+// This includes inducing transitions and actions taken during transitions,
+// so a lot of the messenger network code is here.
+
+use mgen::log;
+use rand_distr::Distribution;
+use rand_xoshiro::Xoshiro256PlusPlus;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpStream;
+use tokio::time::Instant;
+
+use crate::messenger::dists::Distributions;
+use crate::messenger::error::MessengerError;
+use crate::messenger::message::{construct_message, construct_receipt};
+
+/// All possible Conversation state machine states
+pub enum StateMachine {
+    Idle(Conversation<Idle>),
+    Active(Conversation<Active>),
+}
+
+impl StateMachine {
+    pub fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
+        Self::Idle(Conversation::<Idle>::start(dists, rng))
+    }
+}
+
+/// The state machine representing a conversation state and its transitions.
+pub struct Conversation<S: State> {
+    dists: Distributions,
+    delay: Instant,
+    state: S,
+}
+
+pub trait State {
+    fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
+    where
+        Self: Sized;
+    fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
+    where
+        Self: Sized;
+    fn to_machine(conversation: Conversation<Self>) -> StateMachine
+    where
+        Self: Sized;
+}
+
+pub struct Idle {}
+pub struct Active {
+    wait: Instant,
+}
+
+impl State for Idle {
+    fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
+        if conversation.dists.s.sample(rng) {
+            log!("Idle: [sent] tranisition to [Active]");
+            let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
+            let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
+            StateMachine::Active({
+                Conversation::<Active> {
+                    dists: conversation.dists,
+                    delay,
+                    state: Active { wait },
+                }
+            })
+        } else {
+            log!("Idle: [sent] tranisition to [Idle]");
+            let delay = Instant::now() + conversation.dists.i.sample_secs(rng);
+            StateMachine::Idle({
+                Conversation::<Idle> {
+                    dists: conversation.dists,
+                    delay,
+                    state: Idle {},
+                }
+            })
+        }
+    }
+
+    fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
+        if conversation.dists.r.sample(rng) {
+            log!("Idle: [recv'd] tranisition to [Active]");
+            let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
+            let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
+            StateMachine::Active({
+                Conversation::<Active> {
+                    dists: conversation.dists,
+                    delay,
+                    state: Active { wait },
+                }
+            })
+        } else {
+            log!("Idle: [recv'd] tranisition to [Idle]");
+            StateMachine::Idle(conversation)
+        }
+    }
+
+    fn to_machine(conversation: Conversation<Self>) -> StateMachine {
+        StateMachine::Idle(conversation)
+    }
+}
+
+impl State for Active {
+    fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
+        log!("Active: [sent] transition to [Active]");
+        let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
+        StateMachine::Active(Conversation::<Active> {
+            dists: conversation.dists,
+            delay,
+            state: conversation.state,
+        })
+    }
+
+    fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
+        log!("Active: [recv'd] transition to [Active]");
+        let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
+        StateMachine::Active(Conversation::<Active> {
+            dists: conversation.dists,
+            delay,
+            state: conversation.state,
+        })
+    }
+
+    fn to_machine(conversation: Conversation<Self>) -> StateMachine {
+        StateMachine::Active(conversation)
+    }
+}
+
+impl Conversation<Idle> {
+    fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> Self {
+        let delay = Instant::now() + dists.i.sample_secs(rng);
+        log!("[start]");
+        Self {
+            dists,
+            delay,
+            state: Idle {},
+        }
+    }
+}
+
+impl Conversation<Active> {
+    fn waited(self, rng: &mut Xoshiro256PlusPlus) -> Conversation<Idle> {
+        log!("Active: [waited] tranision to [Idle]");
+        let delay = Instant::now() + self.dists.i.sample_secs(rng);
+        Conversation::<Idle> {
+            dists: self.dists,
+            delay,
+            state: Idle {},
+        }
+    }
+
+    async fn sleep(delay: Instant, wait: Instant) -> ActiveActions {
+        if delay < wait {
+            log!("delaying for {:?}", delay - Instant::now());
+            tokio::time::sleep_until(delay).await;
+            ActiveActions::Send
+        } else {
+            log!("waiting for {:?}", wait - Instant::now());
+            tokio::time::sleep_until(wait).await;
+            ActiveActions::Idle
+        }
+    }
+}
+
+/// Attempt to read some portion of the header size from the stream.
+/// The number of bytes written is returned in the Ok case.
+/// The caller must read any remaining bytes less than 4.
+// N.B.: This must be written cancellation safe!
+// https://docs.rs/tokio/1.26.0/tokio/macro.select.html#cancellation-safety
+async fn read_header_size(
+    stream: &mut TcpStream,
+    header_size: &mut [u8; 4],
+) -> Result<usize, MessengerError> {
+    let read = stream.read(header_size).await?;
+
+    if read == 0 {
+        Err(tokio::io::Error::new(
+            tokio::io::ErrorKind::WriteZero,
+            "failed to read any bytes from message with bytes remaining",
+        )
+        .into())
+    } else {
+        Ok(read)
+    }
+}
+
+async fn send_action<T: State>(
+    conversation: Conversation<T>,
+    stream: &mut TcpStream,
+    our_id: &str,
+    recipients: Vec<&str>,
+    rng: &mut Xoshiro256PlusPlus,
+) -> Result<StateMachine, (StateMachine, MessengerError)> {
+    let size = conversation.dists.m.sample(rng);
+    log!(
+        "sending message from {} to {:?} of size {}",
+        our_id,
+        recipients,
+        size
+    );
+    let m = construct_message(
+        our_id.to_string(),
+        recipients.iter().map(|s| s.to_string()).collect(),
+        size,
+    );
+
+    if let Err(e) = m.write_all_to(stream).await {
+        return Err((T::to_machine(conversation), e.into()));
+    }
+    if let Err(e) = stream.flush().await {
+        return Err((T::to_machine(conversation), e.into()));
+    }
+
+    Ok(T::sent(conversation, rng))
+}
+
+async fn receive_action<T: State>(
+    n: usize,
+    mut header_size: [u8; 4],
+    conversation: Conversation<T>,
+    stream: &mut TcpStream,
+    our_id: &str,
+    rng: &mut Xoshiro256PlusPlus,
+) -> Result<StateMachine, (StateMachine, MessengerError)> {
+    if n < 4 {
+        // we didn't get the whole size, but we can use read_exact now
+        if let Err(e) = stream.read_exact(&mut header_size[n..]).await {
+            return Err((T::to_machine(conversation), e.into()));
+        }
+    }
+
+    let msg = match mgen::get_message_with_header_size(stream, header_size).await {
+        Ok((msg, _)) => msg,
+        Err(e) => return Err((T::to_machine(conversation), e.into())),
+    };
+
+    match msg.body {
+        mgen::MessageBody::Size(size) => {
+            log!(
+                "{:?} got message from {} of size {}",
+                msg.recipients,
+                msg.sender,
+                size
+            );
+            let m = construct_receipt(our_id.to_string(), msg.sender);
+            if let Err(e) = m.write_all_to(stream).await {
+                return Err((T::to_machine(conversation), e.into()));
+            }
+            if let Err(e) = stream.flush().await {
+                return Err((T::to_machine(conversation), e.into()));
+            }
+            Ok(T::received(conversation, rng))
+        }
+        mgen::MessageBody::Receipt => Ok(T::to_machine(conversation)),
+    }
+}
+
+enum IdleActions {
+    Send,
+    Receive(usize),
+}
+
+pub async fn manage_idle_conversation(
+    conversation: Conversation<Idle>,
+    stream: &mut TcpStream,
+    our_id: &str,
+    recipients: Vec<&str>,
+    rng: &mut Xoshiro256PlusPlus,
+) -> Result<StateMachine, (StateMachine, MessengerError)> {
+    log!("delaying for {:?}", conversation.delay - Instant::now());
+    let mut header_size = [0; 4];
+    let action = tokio::select! {
+        () = tokio::time::sleep_until(conversation.delay) => {
+            Ok(IdleActions::Send)
+        }
+
+        res = read_header_size(stream, &mut header_size) => {
+            match res {
+                Ok(n) => Ok(IdleActions::Receive(n)),
+                Err(e) => Err(e),
+            }
+        }
+    };
+    let action = match action {
+        Ok(action) => action,
+        Err(e) => return Err((StateMachine::Idle(conversation), e)),
+    };
+
+    match action {
+        IdleActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
+        IdleActions::Receive(n) => {
+            receive_action(n, header_size, conversation, stream, our_id, rng).await
+        }
+    }
+}
+
+enum ActiveActions {
+    Send,
+    Receive(usize),
+    Idle,
+}
+
+pub async fn manage_active_conversation(
+    conversation: Conversation<Active>,
+    stream: &mut TcpStream,
+    our_id: &str,
+    recipients: Vec<&str>,
+    rng: &mut Xoshiro256PlusPlus,
+) -> Result<StateMachine, (StateMachine, MessengerError)> {
+    let mut header_size = [0; 4];
+    let action = tokio::select! {
+        action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => {
+            Ok(action)
+        }
+
+        res = read_header_size(stream, &mut header_size) => {
+            match res {
+                Ok(n) => Ok(ActiveActions::Receive(n)),
+                Err(e) => Err(e),
+            }
+        }
+    };
+    let action = match action {
+        Ok(action) => action,
+        Err(e) => return Err((StateMachine::Active(conversation), e)),
+    };
+
+    match action {
+        ActiveActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
+        ActiveActions::Receive(n) => {
+            receive_action(n, header_size, conversation, stream, our_id, rng).await
+        }
+        ActiveActions::Idle => Ok(StateMachine::Idle(conversation.waited(rng))),
+    }
+}