Преглед на файлове

make attachments download over HTTP server

Justin Tracey преди 10 месеца
родител
ревизия
04bc61c9b7

+ 1 - 0
Cargo.toml

@@ -9,6 +9,7 @@ futures = "0.3.28"
 glob = "0.3.1"
 hyper = { version = "0.14", default-features = false, features = ["client"] }
 hyper-rustls = "0.24.1"
+hyper-socks2 = { version = "0.8.0", default-features = false, features = ["rustls"] }
 rand = "0.8.5"
 rand_distr = { version = "0.4.3", features = ["serde1"] }
 rand_xoshiro = "0.6.0"

+ 14 - 6
README.md

@@ -16,25 +16,30 @@ These receipts can make up to half of all traffic.
 ## Usage
 
 MGen is written entirely in Rust, and is built like most pure Rust projects.
-If you have a working [Rust install](https://rustup.rs/) with Cargo, you can build and install the peer, client, and server with cargo by running from the project root:
+If you have a working [Rust install](https://rustup.rs/) with Cargo, you can build and install the peer, client, and servers with cargo by running from the project root:
 
 `cargo install --path .`
 
 This will generally place them somewhere in your environment's PATH.
 Normal cargo features also apply—e.g., use `cargo build` to build debug builds, or use the `--release` flag to enable release optimizations without installing.
-The server can be built and executed in debug mode with `cargo run --bin mgen-server`, and similar for the client and peer.
+The message server can be built and executed in debug mode with `cargo run --bin mgen-server`, and similar for the client, peer, and web server (`web`).
 Alternatively, you can run the executables directly from the respective target directory after building (e.g., `./target/release/server`).
 
 ### Invocation
 
-#### Server
+#### Servers
 `mgen-server server.crt server.key [addr:port]`
 
+`mgen-web server.crt server.key [addr:port]`
+
+There are two servers for the centralized model: a message server that relays messages, and a web server used to serve file attachments (no files are actually hosted or served, the traffic is simply generated).
+
+Both servers are invoked in the same way.
 The first two arguments are the paths to the server's TLS certificate and private key files.
 The client is configured to skip validation, so any valid certificate with typical parameters is fine.
-If you don't want to generate your own, you can find pre-generated ones in the [test server template directory](/shadow/client/shadow.data.template/hosts/server/)
+If you don't want to generate your own, you can find pre-generated ones in the [test server template directory](/shadow/client/shadow.data.template/hosts/server/).
 The server will listen for connections on the given interface in the optional third argument.
-If no such argument is given, it will listen on `127.0.0.1:6397`.
+If no such argument is given, it will listen on `127.0.0.1:6397` for the message server, and `127.0.0.1:6398` for the web server.
 
 #### Client/Peer
 `mgen-client [config.yaml]...`
@@ -69,7 +74,10 @@ conversations:
 
     # The <address>:<port> of the message server,
     # where <address> is an IP, onion address, or hostname.
-    server: "server.maybe.onion:6397"
+    message_server: "server.maybe.onion:6397"
+
+    # Similarly, but for the web server.
+    web_server: "server.maybe.onion:6398"
 
     # The number of seconds to wait until the client starts sending messages.
     # This should be long enough that all clients have had time to start

+ 4 - 2
shadow/client/shadow.data.template/hosts/client1/alice.yaml

@@ -1,7 +1,8 @@
 user: "Alice"
 conversations:
   - group: "group1"
-    server: "server:6397"
+    message_server: "server:6397"
+    web_server: "web:6398"
     bootstrap: 5.0
     retry: 5.0
     distributions: &dists
@@ -13,7 +14,8 @@ conversations:
       a_s: { distribution: "Normal", mean: 10.0, std_dev: 5.0 }
       a_r: { distribution: "Normal", mean: 10.0, std_dev: 5.0 }
   - group: "group2"
-    server: "server:6397"
+    message_server: "server:6397"
+    web_server: "web:6398"
     bootstrap: 5.0
     retry: 5.0
     distributions: *dists

+ 4 - 2
shadow/client/shadow.data.template/hosts/client2/bob.yaml

@@ -1,7 +1,8 @@
 user: "Bob"
 conversations:
   - group: "group1"
-    server: "server:6397"
+    message_server: "server:6397"
+    web_server: "web:6398"
     bootstrap: 5.0
     retry: 5.0
     distributions: &dists
@@ -13,7 +14,8 @@ conversations:
       a_s: { distribution: "Normal", mean: 10.0, std_dev: 5.0 }
       a_r: { distribution: "Normal", mean: 10.0, std_dev: 5.0 }
   - group: "group2"
-    server: "server:6397"
+    message_server: "server:6397"
+    web_server: "web:6398"
     bootstrap: 5.0
     retry: 5.0
     distributions: *dists

+ 2 - 1
shadow/client/shadow.data.template/hosts/client2/carol.yaml

@@ -1,7 +1,8 @@
 user: "Carol"
 conversations:
   - group: "group1"
-    server: "server:6397"
+    message_server: "server:6397"
+    web_server: "web:6398"
     bootstrap: 5.0
     retry: 5.0
     distributions:

+ 2 - 1
shadow/client/shadow.data.template/hosts/client3/dave.yaml

@@ -1,7 +1,8 @@
 user: "Dave"
 conversations:
   - group: "group2"
-    server: "server:6397"
+    message_server: "server:6397"
+    web_server: "web:6398"
     bootstrap: 5.0
     retry: 5.0
     distributions:

+ 1 - 0
shadow/client/shadow.data.template/hosts/web/server.crt

@@ -0,0 +1 @@
+../server/server.crt

+ 1 - 0
shadow/client/shadow.data.template/hosts/web/server.key

@@ -0,0 +1 @@
+../server/server.key

+ 9 - 0
shadow/client/shadow.yaml

@@ -17,6 +17,15 @@ hosts:
       start_time: 3s
       expected_final_state: running
 
+  web:
+    network_node_id: 0
+    ip_addr: 100.0.0.2
+    processes:
+    - path: mgen-web
+      args: [server.crt, server.key, 100.0.0.2:6398]
+      start_time: 3s
+      expected_final_state: running
+
   client1: &client_host
     network_node_id: 0
     processes:

+ 2 - 1
src/bin/messenger/tcp.rs

@@ -3,8 +3,9 @@ use tokio_socks::tcp::Socks5Stream;
 
 use crate::messenger::error::MessengerError;
 
-/// Parameters used in establishing the connection, optionally through a socks proxy.
+/// Parameters used in establishing a connection, optionally through a socks proxy.
 /// (Members may be useful elsewhere as well, but that's the primary purpose.)
+#[derive(Clone)]
 pub struct SocksParams {
     /// Optional socks proxy address.
     pub socks: Option<String>,

+ 212 - 25
src/bin/mgen-client.rs

@@ -1,7 +1,7 @@
 // Code specific to the client in the client-server mode.
 
 use mgen::updater::Updater;
-use mgen::{HandshakeRef, MessageHeader, SerializedMessage};
+use mgen::{log, HandshakeRef, MessageHeader, SerializedMessage};
 use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
 use serde::Deserialize;
 use std::result::Result;
@@ -40,6 +40,10 @@ type WriteSocketUpdaterOut = Updater<WriteHalf<TlsStream<TcpStream>>>;
 type ErrorChannelIn = mpsc::UnboundedSender<MessengerError>;
 /// Type for getting errors from other threads.
 type ErrorChannelOut = mpsc::UnboundedReceiver<MessengerError>;
+/// Type for sending sizes to the attachment sender thread.
+type SizeChannelIn = mpsc::UnboundedSender<usize>;
+/// Type for getting sizes from other threads.
+type SizeChannelOut = mpsc::UnboundedReceiver<usize>;
 
 // we gain a (very) tiny performance win by not bothering to validate the cert
 struct NoCertificateVerification {}
@@ -62,24 +66,174 @@ impl tokio_rustls::rustls::client::ServerCertVerifier for NoCertificateVerificat
 /// checking for any network errors while doing so,
 /// and giving messages to the state thread.
 async fn reader(
+    web_params: SocksParams,
+    retry: Duration,
+    tls_config: tokio_rustls::rustls::ClientConfig,
     message_channel: ReaderToState,
-    mut socket_updater: ReadSocketUpdaterOut,
+    socket_updater: ReadSocketUpdaterOut,
     error_channel: ErrorChannelIn,
 ) {
-    loop {
-        let mut stream = socket_updater.recv().await;
+    let https = hyper_rustls::HttpsConnectorBuilder::new()
+        .with_tls_config(tls_config)
+        .https_only()
+        .enable_http1()
+        .build();
+
+    match web_params.socks {
+        Some(proxy) => {
+            let auth = hyper_socks2::Auth {
+                username: web_params.user.clone(),
+                password: web_params.recipient,
+            };
+            let socks = hyper_socks2::SocksConnector {
+                proxy_addr: proxy.parse().expect("Invalid proxy URI"),
+                auth: Some(auth),
+                connector: https,
+            };
+            let client: hyper::Client<_, hyper::Body> = hyper::Client::builder().build(socks);
+            worker(
+                web_params.target,
+                web_params.user,
+                retry,
+                client,
+                message_channel,
+                socket_updater,
+                error_channel,
+            )
+            .await
+        }
+        None => {
+            let client = hyper::Client::builder().build(https);
+            worker(
+                web_params.target,
+                web_params.user,
+                retry,
+                client,
+                message_channel,
+                socket_updater,
+                error_channel,
+            )
+            .await
+        }
+    }
+
+    async fn worker<C>(
+        target: String,
+        user: String,
+        retry: Duration,
+        client: hyper::Client<C, hyper::Body>,
+        message_channel: ReaderToState,
+        mut socket_updater: ReadSocketUpdaterOut,
+        error_channel: ErrorChannelIn,
+    ) where
+        C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
+    {
         loop {
-            let msg = match mgen::get_message(&mut stream).await {
-                Ok(msg) => msg,
-                Err(e) => {
-                    error_channel.send(e.into()).expect("Error channel closed");
-                    break;
+            let mut message_stream = socket_updater.recv().await;
+
+            loop {
+                let msg = match mgen::get_message(&mut message_stream).await {
+                    Ok(msg) => msg,
+                    Err(e) => {
+                        error_channel.send(e.into()).expect("Error channel closed");
+                        break;
+                    }
+                };
+
+                if msg.body.has_attachment() {
+                    let url: hyper::Uri =
+                        format!("{}/?size={}&user={}", target, msg.body.total_size(), user)
+                            .parse()
+                            .expect("Invalid URI");
+                    let client = client.clone();
+                    tokio::spawn(async move {
+                        let mut res = client.get(url.clone()).await;
+                        while res.is_err() {
+                            log!("Error fetching: {}", res.unwrap_err());
+                            tokio::time::sleep(retry).await;
+                            res = client.get(url.clone()).await;
+                        }
+                    });
                 }
+
+                message_channel
+                    .send(msg)
+                    .expect("Reader message channel closed");
+            }
+        }
+    }
+}
+
+async fn uploader(
+    web_params: SocksParams,
+    retry: Duration,
+    tls_config: tokio_rustls::rustls::ClientConfig,
+    size_channel: SizeChannelOut,
+) {
+    let https = hyper_rustls::HttpsConnectorBuilder::new()
+        .with_tls_config(tls_config)
+        .https_only()
+        .enable_http1()
+        .build();
+
+    match web_params.socks {
+        Some(proxy) => {
+            let auth = hyper_socks2::Auth {
+                username: web_params.user.clone(),
+                password: web_params.recipient,
+            };
+            let socks = hyper_socks2::SocksConnector {
+                proxy_addr: proxy.parse().expect("Invalid proxy URI"),
+                auth: Some(auth),
+                connector: https,
             };
+            let client = hyper::Client::builder().build(socks);
+            worker(
+                web_params.target,
+                web_params.user,
+                retry,
+                client,
+                size_channel,
+            )
+            .await
+        }
+        None => {
+            let client = hyper::Client::builder().build(https);
+            worker(
+                web_params.target,
+                web_params.user,
+                retry,
+                client,
+                size_channel,
+            )
+            .await
+        }
+    }
 
-            message_channel
-                .send(msg)
-                .expect("Reader message channel closed");
+    async fn worker<C>(
+        target: String,
+        user: String,
+        retry: Duration,
+        client: hyper::Client<C, hyper::Body>,
+        mut size_channel: SizeChannelOut,
+    ) where
+        C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
+    {
+        loop {
+            let size = size_channel.recv().await.expect("Size channel closed");
+            let client = client.clone();
+            let url: hyper::Uri = format!("{}/?size={}&user={}", target, size, user)
+                .parse()
+                .expect("Invalid URI");
+            let request = hyper::Request::put(url.clone())
+                .body(hyper::Body::empty())
+                .expect("Invalid HTTP request attempted to construct");
+            let mut res = client.request(request).await;
+            while res.is_err() {
+                log!("Error uploading: {}", res.unwrap_err());
+                tokio::time::sleep(retry).await;
+                res = client.get(url.clone()).await;
+            }
         }
     }
 }
@@ -88,6 +242,7 @@ async fn reader(
 /// and checking for any network errors while doing so.
 async fn writer(
     mut message_channel: WriterFromState,
+    attachment_channel: SizeChannelIn,
     mut socket_updater: WriteSocketUpdaterOut,
     error_channel: ErrorChannelIn,
 ) {
@@ -98,6 +253,13 @@ async fn writer(
                 .recv()
                 .await
                 .expect("Writer message channel closed");
+
+            if msg.body.has_attachment() {
+                attachment_channel
+                    .send(msg.body.total_size())
+                    .expect("Attachment channel closed");
+            }
+
             if let Err(e) = msg.write_all_to(&mut stream).await {
                 error_channel.send(e.into()).expect("Error channel closed");
                 break;
@@ -110,17 +272,12 @@ async fn writer(
 /// and determining how to handle errors this or other threads receive.
 async fn socket_updater(
     str_params: SocksParams,
-    retry: f64,
+    retry: Duration,
+    tls_config: tokio_rustls::rustls::ClientConfig,
     mut error_channel: ErrorChannelOut,
     reader_channel: ReadSocketUpdaterIn,
     writer_channel: WriteSocketUpdaterIn,
 ) -> FatalError {
-    let retry = Duration::from_secs_f64(retry);
-
-    let tls_config = tokio_rustls::rustls::ClientConfig::builder()
-        .with_safe_defaults()
-        .with_custom_certificate_verifier(Arc::new(NoCertificateVerification {}))
-        .with_no_client_auth();
     let connector = TlsConnector::from(Arc::new(tls_config));
 
     // unwrap is safe, split always returns at least one element
@@ -176,9 +333,16 @@ async fn manage_conversation(
     let mut rng = Xoshiro256PlusPlus::from_entropy();
     let distributions: Distributions = config.distributions.try_into()?;
 
-    let str_params = SocksParams {
+    let message_server_params = SocksParams {
+        socks: socks.clone(),
+        target: config.message_server,
+        user: user.clone(),
+        recipient: config.group.clone(),
+    };
+
+    let web_server_params = SocksParams {
         socks,
-        target: config.server,
+        target: config.web_server,
         user: user.clone(),
         recipient: config.group.clone(),
     };
@@ -192,16 +356,38 @@ async fn manage_conversation(
     let write_socket_updater_in = Updater::new();
     let write_socket_updater_out = write_socket_updater_in.clone();
     let (errs_in, errs_out) = mpsc::unbounded_channel();
+    let (writer_to_uploader, uploader_from_writer) = mpsc::unbounded_channel();
+
+    let retry = Duration::from_secs_f64(config.retry);
+    let tls_config = tokio_rustls::rustls::ClientConfig::builder()
+        .with_safe_defaults()
+        .with_custom_certificate_verifier(Arc::new(NoCertificateVerification {}))
+        .with_no_client_auth();
 
     tokio::spawn(reader(
+        web_server_params.clone(),
+        retry,
+        tls_config.clone(),
         reader_to_state,
         read_socket_updater_out,
         errs_in.clone(),
     ));
-    tokio::spawn(writer(writer_from_state, write_socket_updater_out, errs_in));
+    tokio::spawn(writer(
+        writer_from_state,
+        writer_to_uploader,
+        write_socket_updater_out,
+        errs_in,
+    ));
+    tokio::spawn(uploader(
+        web_server_params,
+        retry,
+        tls_config.clone(),
+        uploader_from_writer,
+    ));
     tokio::spawn(socket_updater(
-        str_params,
-        config.retry,
+        message_server_params,
+        retry,
+        tls_config,
         errs_out,
         read_socket_updater_in,
         write_socket_updater_in,
@@ -244,7 +430,8 @@ async fn manage_conversation(
 #[derive(Debug, Deserialize)]
 struct ConversationConfig {
     group: String,
-    server: String,
+    message_server: String,
+    web_server: String,
     bootstrap: f64,
     retry: f64,
     distributions: ConfigDistributions,

+ 25 - 4
src/lib.rs

@@ -65,8 +65,17 @@ pub enum MessageBody {
 }
 
 impl MessageBody {
-    /// Size on the wire of the message's body
-    fn size(&self) -> usize {
+    /// Whether the body of the message requires an HTTP GET
+    /// (attachment size is the message size).
+    pub fn has_attachment(&self) -> bool {
+        match self {
+            MessageBody::Receipt => false,
+            MessageBody::Size(size) => size > &NonZeroU32::new(INLINE_MAX_SIZE).unwrap(),
+        }
+    }
+
+    /// Size on the wire of the message's body, exluding bytes fetched via http
+    fn inline_size(&self) -> usize {
         match self {
             MessageBody::Receipt => PADDING_BLOCK_SIZE as usize,
             MessageBody::Size(size) => {
@@ -79,6 +88,14 @@ impl MessageBody {
             }
         }
     }
+
+    /// Size of the message's body, including bytes fetched via http
+    pub fn total_size(&self) -> usize {
+        match self {
+            MessageBody::Receipt => PADDING_BLOCK_SIZE as usize,
+            MessageBody::Size(size) => size.get() as usize,
+        }
+    }
 }
 
 /// Message metadata.
@@ -240,7 +257,11 @@ async fn get_message_with_header_size<T: AsyncReadExt + std::marker::Unpin>(
     let header = MessageHeader::deserialize(&header_buf[4..])?;
     let header_size_buf = &mut header_buf[..4];
     header_size_buf.copy_from_slice(&header_size_bytes);
-    copy(&mut stream.take(header.body.size() as u64), &mut sink()).await?;
+    copy(
+        &mut stream.take(header.body.inline_size() as u64),
+        &mut sink(),
+    )
+    .await?;
     Ok(header_buf)
 }
 
@@ -289,7 +310,7 @@ impl SerializedMessage {
         &self,
         writer: &mut T,
     ) -> std::io::Result<()> {
-        let body_buf = vec![0; self.body.size()];
+        let body_buf = vec![0; self.body.inline_size()];
 
         // write_all_vectored is not yet stable x_x
         // https://github.com/rust-lang/rust/issues/70436