|
@@ -3,15 +3,19 @@
|
|
|
use mgen::{log, updater::Updater, HandshakeRef, MessageHeader, SerializedMessage};
|
|
|
|
|
|
use futures::future::try_join_all;
|
|
|
+use hyper::client::HttpConnector;
|
|
|
+use hyper_rustls::HttpsConnector;
|
|
|
+use hyper_socks2::SocksConnector;
|
|
|
use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
|
|
|
use serde::Deserialize;
|
|
|
+use std::hash::{Hash, Hasher};
|
|
|
use std::result::Result;
|
|
|
use std::sync::Arc;
|
|
|
use tokio::io::{split, AsyncWriteExt, ReadHalf, WriteHalf};
|
|
|
use tokio::net::TcpStream;
|
|
|
use tokio::spawn;
|
|
|
use tokio::sync::mpsc;
|
|
|
-use tokio::time::Duration;
|
|
|
+use tokio::time::{sleep, Duration};
|
|
|
use tokio_rustls::{client::TlsStream, TlsConnector};
|
|
|
|
|
|
mod messenger;
|
|
@@ -76,6 +80,42 @@ fn web_url(target: &str, size: usize, user: &str) -> hyper::Uri {
|
|
|
.unwrap_or_else(|_| panic!("Invalid URI: {}", formatted))
|
|
|
}
|
|
|
|
|
|
+fn get_plain_https_client(
|
|
|
+ tls_config: tokio_rustls::rustls::ClientConfig,
|
|
|
+) -> hyper::client::Client<HttpsConnector<HttpConnector>> {
|
|
|
+ let https = hyper_rustls::HttpsConnectorBuilder::new()
|
|
|
+ .with_tls_config(tls_config)
|
|
|
+ .https_or_http()
|
|
|
+ .enable_http1()
|
|
|
+ .build();
|
|
|
+ hyper::Client::builder().build(https)
|
|
|
+}
|
|
|
+
|
|
|
+fn get_socks_https_client(
|
|
|
+ tls_config: tokio_rustls::rustls::ClientConfig,
|
|
|
+ username: String,
|
|
|
+ password: String,
|
|
|
+ proxy: String,
|
|
|
+) -> hyper::client::Client<HttpsConnector<SocksConnector<HttpConnector>>> {
|
|
|
+ let mut http = hyper::client::HttpConnector::new();
|
|
|
+ http.enforce_http(false);
|
|
|
+
|
|
|
+ let auth = hyper_socks2::Auth { username, password };
|
|
|
+ let socks = hyper_socks2::SocksConnector {
|
|
|
+ proxy_addr: format!("socks5://{}", proxy)
|
|
|
+ .parse()
|
|
|
+ .expect("Invalid proxy URI"),
|
|
|
+ auth: Some(auth),
|
|
|
+ connector: http,
|
|
|
+ };
|
|
|
+ let https = hyper_rustls::HttpsConnectorBuilder::new()
|
|
|
+ .with_tls_config(tls_config)
|
|
|
+ .https_or_http()
|
|
|
+ .enable_http1()
|
|
|
+ .wrap_connector(socks);
|
|
|
+ hyper::Client::builder().build(https)
|
|
|
+}
|
|
|
+
|
|
|
/// The thread responsible for getting incoming messages,
|
|
|
/// checking for any network errors while doing so,
|
|
|
/// and giving messages to the state thread.
|
|
@@ -87,24 +127,14 @@ async fn reader(
|
|
|
socket_updater: ReadSocketUpdaterOut,
|
|
|
error_channel: ErrorChannelIn,
|
|
|
) {
|
|
|
- let https = hyper_rustls::HttpsConnectorBuilder::new()
|
|
|
- .with_tls_config(tls_config)
|
|
|
- .https_only()
|
|
|
- .enable_http1()
|
|
|
- .build();
|
|
|
-
|
|
|
match web_params.socks {
|
|
|
Some(proxy) => {
|
|
|
- let auth = hyper_socks2::Auth {
|
|
|
- username: web_params.user.clone(),
|
|
|
- password: web_params.recipient,
|
|
|
- };
|
|
|
- let socks = hyper_socks2::SocksConnector {
|
|
|
- proxy_addr: proxy.parse().expect("Invalid proxy URI"),
|
|
|
- auth: Some(auth),
|
|
|
- connector: https,
|
|
|
- };
|
|
|
- let client: hyper::Client<_, hyper::Body> = hyper::Client::builder().build(socks);
|
|
|
+ let client = get_socks_https_client(
|
|
|
+ tls_config,
|
|
|
+ web_params.user.clone(),
|
|
|
+ web_params.target.clone(),
|
|
|
+ proxy,
|
|
|
+ );
|
|
|
worker(
|
|
|
web_params.target,
|
|
|
web_params.user,
|
|
@@ -117,7 +147,7 @@ async fn reader(
|
|
|
.await
|
|
|
}
|
|
|
None => {
|
|
|
- let client = hyper::Client::builder().build(https);
|
|
|
+ let client = get_plain_https_client(tls_config);
|
|
|
worker(
|
|
|
web_params.target,
|
|
|
web_params.user,
|
|
@@ -129,7 +159,7 @@ async fn reader(
|
|
|
)
|
|
|
.await
|
|
|
}
|
|
|
- }
|
|
|
+ };
|
|
|
|
|
|
async fn worker<C>(
|
|
|
target: String,
|
|
@@ -181,24 +211,14 @@ async fn uploader(
|
|
|
tls_config: tokio_rustls::rustls::ClientConfig,
|
|
|
size_channel: SizeChannelOut,
|
|
|
) {
|
|
|
- let https = hyper_rustls::HttpsConnectorBuilder::new()
|
|
|
- .with_tls_config(tls_config)
|
|
|
- .https_only()
|
|
|
- .enable_http1()
|
|
|
- .build();
|
|
|
-
|
|
|
match web_params.socks {
|
|
|
Some(proxy) => {
|
|
|
- let auth = hyper_socks2::Auth {
|
|
|
- username: web_params.user.clone(),
|
|
|
- password: web_params.recipient,
|
|
|
- };
|
|
|
- let socks = hyper_socks2::SocksConnector {
|
|
|
- proxy_addr: proxy.parse().expect("Invalid proxy URI"),
|
|
|
- auth: Some(auth),
|
|
|
- connector: https,
|
|
|
- };
|
|
|
- let client = hyper::Client::builder().build(socks);
|
|
|
+ let client = get_socks_https_client(
|
|
|
+ tls_config,
|
|
|
+ web_params.user.clone(),
|
|
|
+ web_params.target.clone(),
|
|
|
+ proxy,
|
|
|
+ );
|
|
|
worker(
|
|
|
web_params.target,
|
|
|
web_params.user,
|
|
@@ -209,7 +229,7 @@ async fn uploader(
|
|
|
.await
|
|
|
}
|
|
|
None => {
|
|
|
- let client = hyper::Client::builder().build(https);
|
|
|
+ let client = get_plain_https_client(tls_config);
|
|
|
worker(
|
|
|
web_params.target,
|
|
|
web_params.user,
|
|
@@ -239,7 +259,7 @@ async fn uploader(
|
|
|
.expect("Invalid HTTP request attempted to construct");
|
|
|
let mut res = client.request(request).await;
|
|
|
while res.is_err() {
|
|
|
- log!("Error uploading: {}", res.unwrap_err());
|
|
|
+ log!("{},{},Error uploading: {}", user, url, res.unwrap_err());
|
|
|
tokio::time::sleep(retry).await;
|
|
|
res = client.get(url.clone()).await;
|
|
|
}
|
|
@@ -293,6 +313,7 @@ async fn socket_updater(
|
|
|
let tls_server_str = str_params.target.split(':').next().unwrap();
|
|
|
let tls_server_name =
|
|
|
tokio_rustls::rustls::ServerName::try_from(tls_server_str).expect("invalid server name");
|
|
|
+
|
|
|
loop {
|
|
|
let stream: TcpStream = match connect(&str_params).await {
|
|
|
Ok(stream) => stream,
|
|
@@ -395,6 +416,15 @@ async fn manage_conversation(
|
|
|
|
|
|
/// Spawns all other threads for this conversation.
|
|
|
async fn spawn_threads(config: FullConfig) -> Result<(), MessengerError> {
|
|
|
+ // without noise during Shadow's bootstrap period, we can overload the SOMAXCONN of the server,
|
|
|
+ // so we wait a small(ish) pseudorandom amount of time to spread things out
|
|
|
+ let mut hasher = rustc_hash::FxHasher::default();
|
|
|
+ config.user.hash(&mut hasher);
|
|
|
+ config.group.hash(&mut hasher);
|
|
|
+ let hash = hasher.finish() % 10_000;
|
|
|
+ log!("{},{},waiting,{}", config.user, config.group, hash);
|
|
|
+ sleep(Duration::from_millis(hash)).await;
|
|
|
+
|
|
|
let message_server_params = SocksParams {
|
|
|
socks: config.socks.clone(),
|
|
|
target: config.message_server.clone(),
|