2 Commits 554f035015 ... 2291a799e8

Author SHA1 Message Date
  Justin Tracey 2291a799e8 client: fix bug in http over socks 2 months ago
  Justin Tracey 958648ce92 client: add some timing noise to first connection 2 months ago
3 changed files with 69 additions and 38 deletions
  1. 1 0
      Cargo.toml
  2. 67 37
      src/bin/mgen-client.rs
  3. 1 1
      src/bin/mgen-server.rs

+ 1 - 0
Cargo.toml

@@ -15,6 +15,7 @@ hyper-socks2 = { version = "0.8.0", default-features = false, features = ["rustl
 rand = "0.8.5"
 rand_distr = { version = "0.4.3", features = ["serde1"] }
 rand_xoshiro = "0.6.0"
+rustc-hash = "1.1.0"
 rustls-pemfile = "1.0.3"
 serde = { version = "1.0.158", features = ["derive"] }
 serde_yaml = "0.9.21"

+ 67 - 37
src/bin/mgen-client.rs

@@ -3,15 +3,19 @@
 use mgen::{log, updater::Updater, HandshakeRef, MessageHeader, SerializedMessage};
 
 use futures::future::try_join_all;
+use hyper::client::HttpConnector;
+use hyper_rustls::HttpsConnector;
+use hyper_socks2::SocksConnector;
 use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
 use serde::Deserialize;
+use std::hash::{Hash, Hasher};
 use std::result::Result;
 use std::sync::Arc;
 use tokio::io::{split, AsyncWriteExt, ReadHalf, WriteHalf};
 use tokio::net::TcpStream;
 use tokio::spawn;
 use tokio::sync::mpsc;
-use tokio::time::Duration;
+use tokio::time::{sleep, Duration};
 use tokio_rustls::{client::TlsStream, TlsConnector};
 
 mod messenger;
@@ -76,6 +80,42 @@ fn web_url(target: &str, size: usize, user: &str) -> hyper::Uri {
         .unwrap_or_else(|_| panic!("Invalid URI: {}", formatted))
 }
 
+fn get_plain_https_client(
+    tls_config: tokio_rustls::rustls::ClientConfig,
+) -> hyper::client::Client<HttpsConnector<HttpConnector>> {
+    let https = hyper_rustls::HttpsConnectorBuilder::new()
+        .with_tls_config(tls_config)
+        .https_or_http()
+        .enable_http1()
+        .build();
+    hyper::Client::builder().build(https)
+}
+
+fn get_socks_https_client(
+    tls_config: tokio_rustls::rustls::ClientConfig,
+    username: String,
+    password: String,
+    proxy: String,
+) -> hyper::client::Client<HttpsConnector<SocksConnector<HttpConnector>>> {
+    let mut http = hyper::client::HttpConnector::new();
+    http.enforce_http(false);
+
+    let auth = hyper_socks2::Auth { username, password };
+    let socks = hyper_socks2::SocksConnector {
+        proxy_addr: format!("socks5://{}", proxy)
+            .parse()
+            .expect("Invalid proxy URI"),
+        auth: Some(auth),
+        connector: http,
+    };
+    let https = hyper_rustls::HttpsConnectorBuilder::new()
+        .with_tls_config(tls_config)
+        .https_or_http()
+        .enable_http1()
+        .wrap_connector(socks);
+    hyper::Client::builder().build(https)
+}
+
 /// The thread responsible for getting incoming messages,
 /// checking for any network errors while doing so,
 /// and giving messages to the state thread.
@@ -87,24 +127,14 @@ async fn reader(
     socket_updater: ReadSocketUpdaterOut,
     error_channel: ErrorChannelIn,
 ) {
-    let https = hyper_rustls::HttpsConnectorBuilder::new()
-        .with_tls_config(tls_config)
-        .https_only()
-        .enable_http1()
-        .build();
-
     match web_params.socks {
         Some(proxy) => {
-            let auth = hyper_socks2::Auth {
-                username: web_params.user.clone(),
-                password: web_params.recipient,
-            };
-            let socks = hyper_socks2::SocksConnector {
-                proxy_addr: proxy.parse().expect("Invalid proxy URI"),
-                auth: Some(auth),
-                connector: https,
-            };
-            let client: hyper::Client<_, hyper::Body> = hyper::Client::builder().build(socks);
+            let client = get_socks_https_client(
+                tls_config,
+                web_params.user.clone(),
+                web_params.target.clone(),
+                proxy,
+            );
             worker(
                 web_params.target,
                 web_params.user,
@@ -117,7 +147,7 @@ async fn reader(
             .await
         }
         None => {
-            let client = hyper::Client::builder().build(https);
+            let client = get_plain_https_client(tls_config);
             worker(
                 web_params.target,
                 web_params.user,
@@ -129,7 +159,7 @@ async fn reader(
             )
             .await
         }
-    }
+    };
 
     async fn worker<C>(
         target: String,
@@ -181,24 +211,14 @@ async fn uploader(
     tls_config: tokio_rustls::rustls::ClientConfig,
     size_channel: SizeChannelOut,
 ) {
-    let https = hyper_rustls::HttpsConnectorBuilder::new()
-        .with_tls_config(tls_config)
-        .https_only()
-        .enable_http1()
-        .build();
-
     match web_params.socks {
         Some(proxy) => {
-            let auth = hyper_socks2::Auth {
-                username: web_params.user.clone(),
-                password: web_params.recipient,
-            };
-            let socks = hyper_socks2::SocksConnector {
-                proxy_addr: proxy.parse().expect("Invalid proxy URI"),
-                auth: Some(auth),
-                connector: https,
-            };
-            let client = hyper::Client::builder().build(socks);
+            let client = get_socks_https_client(
+                tls_config,
+                web_params.user.clone(),
+                web_params.target.clone(),
+                proxy,
+            );
             worker(
                 web_params.target,
                 web_params.user,
@@ -209,7 +229,7 @@ async fn uploader(
             .await
         }
         None => {
-            let client = hyper::Client::builder().build(https);
+            let client = get_plain_https_client(tls_config);
             worker(
                 web_params.target,
                 web_params.user,
@@ -239,7 +259,7 @@ async fn uploader(
                 .expect("Invalid HTTP request attempted to construct");
             let mut res = client.request(request).await;
             while res.is_err() {
-                log!("Error uploading: {}", res.unwrap_err());
+                log!("{},{},Error uploading: {}", user, url, res.unwrap_err());
                 tokio::time::sleep(retry).await;
                 res = client.get(url.clone()).await;
             }
@@ -293,6 +313,7 @@ async fn socket_updater(
     let tls_server_str = str_params.target.split(':').next().unwrap();
     let tls_server_name =
         tokio_rustls::rustls::ServerName::try_from(tls_server_str).expect("invalid server name");
+
     loop {
         let stream: TcpStream = match connect(&str_params).await {
             Ok(stream) => stream,
@@ -395,6 +416,15 @@ async fn manage_conversation(
 
 /// Spawns all other threads for this conversation.
 async fn spawn_threads(config: FullConfig) -> Result<(), MessengerError> {
+    // without noise during Shadow's bootstrap period, we can overload the SOMAXCONN of the server,
+    // so we wait a small(ish) pseudorandom amount of time to spread things out
+    let mut hasher = rustc_hash::FxHasher::default();
+    config.user.hash(&mut hasher);
+    config.group.hash(&mut hasher);
+    let hash = hasher.finish() % 10_000;
+    log!("{},{},waiting,{}", config.user, config.group, hash);
+    sleep(Duration::from_millis(hash)).await;
+
     let message_server_params = SocksParams {
         socks: config.socks.clone(),
         target: config.message_server.clone(),

+ 1 - 1
src/bin/mgen-server.rs

@@ -65,7 +65,7 @@ async fn main_worker() -> Result<(), Box<dyn Error>> {
 
     let listen_addr = args.next().unwrap_or("127.0.0.1:6397".to_string());
 
-    let reg_time = args.next().unwrap_or("5".to_string()).parse()?;
+    let reg_time = args.next().unwrap_or("30".to_string()).parse()?;
     let reg_time = Instant::now() + Duration::from_secs(reg_time);
 
     let certfile = std::fs::File::open(cert_filename).expect("cannot open certificate file");