Browse Source

client: simplify threading, and disable Nagle's

Justin Tracey 3 months ago
parent
commit
e3c9b94921

+ 1 - 1
.cargo/config.toml

@@ -1,2 +1,2 @@
 [build]
-rustflags = ["-Ctarget-cpu=native"]
+rustflags = ["-Ctarget-cpu=native", "--cfg", "tokio_unstable"]

+ 7 - 0
Cargo.toml

@@ -1,3 +1,4 @@
+workspace = { members = ["mgentools"] }
 [package]
 name = "mgen"
 version = "0.1.0"
@@ -5,6 +6,7 @@ edition = "2021"
 
 [dependencies]
 chrono = "0.4.24"
+console-subscriber = { version = "0.2.0", optional = true }
 futures = "0.3.28"
 glob = "0.3.1"
 hyper = { version = "0.14", default-features = false, features = ["client"] }
@@ -19,8 +21,13 @@ serde_yaml = "0.9.21"
 tokio = { version = "1", features = ["full"] }
 tokio-rustls = { version = "0.24.1", features = ["dangerous_configuration"] }
 tokio-socks = "0.5.1"
+tracing = { version = "0.1.40", optional = true }
 url = "2.4.0"
 
+[features]
+default = []
+tracing = ["dep:console-subscriber", "dep:tracing", "tokio/tracing"]
+
 [profile.release]
 lto = true
 codegen-units = 1

+ 2 - 2
shadow/client/shadow.data.template/hosts/client1/alice.yaml

@@ -1,6 +1,6 @@
 user: "Alice"
-message_server: "server:6397"
-web_server: "web:6398"
+message_server: "100.0.0.1:6397"
+web_server: "100.0.0.2:6398"
 bootstrap: 5.0
 retry: 5.0
 distributions:

+ 2 - 2
shadow/client/shadow.data.template/hosts/client2/bob.yaml

@@ -1,6 +1,6 @@
 user: "Bob"
-message_server: "server:6397"
-web_server: "web:6398"
+message_server: "100.0.0.1:6397"
+web_server: "100.0.0.2:6398"
 bootstrap: 5.0
 retry: 5.0
 distributions: &dists

+ 2 - 2
shadow/client/shadow.data.template/hosts/client2/carol.yaml

@@ -1,6 +1,6 @@
 user: "Carol"
-message_server: "server:6397"
-web_server: "web:6398"
+message_server: "100.0.0.1:6397"
+web_server: "100.0.0.2:6398"
 bootstrap: 5.0
 retry: 5.0
 distributions:

+ 2 - 2
shadow/client/shadow.data.template/hosts/client3/dave.yaml

@@ -1,6 +1,6 @@
 user: "Dave"
-message_server: "server:6397"
-web_server: "web:6398"
+message_server: "100.0.0.1:6397"
+web_server: "100.0.0.2:6398"
 bootstrap: 5.0
 retry: 5.0
 distributions:

+ 1 - 0
shadow/client/shadow.yaml

@@ -33,5 +33,6 @@ hosts:
       args: "*.yaml"
       start_time: 5s
       expected_final_state: running
+      environment: {RUST_LOG: "trace"}
   client2: *client_host
   client3: *client_host

+ 14 - 7
src/bin/messenger/tcp.rs

@@ -1,4 +1,4 @@
-use tokio::net::TcpStream;
+use tokio::net::{TcpSocket, TcpStream};
 use tokio_socks::tcp::Socks5Stream;
 
 use crate::messenger::error::MessengerError;
@@ -21,20 +21,27 @@ pub struct SocksParams {
 pub async fn connect(str_params: &SocksParams) -> Result<TcpStream, MessengerError> {
     match &str_params.socks {
         Some(socks) => {
-            let socks_connection = Socks5Stream::connect_with_password(
-                socks.as_str(),
+            let socks_addr = &socks.as_str().parse().unwrap();
+            let socks_socket = TcpSocket::new_v4()?;
+            socks_socket.set_nodelay(true)?;
+            let socks_connection = socks_socket.connect(*socks_addr).await?;
+            let target_connection = Socks5Stream::connect_with_password_and_socket(
+                socks_connection,
                 str_params.target.as_str(),
                 &str_params.user,
                 &str_params.recipient,
             )
             .await;
-            match socks_connection {
+            match target_connection {
                 Ok(stream) => Ok(stream.into_inner()),
                 Err(e) => Err(e.into()),
             }
         }
-        None => TcpStream::connect(&str_params.target)
-            .await
-            .map_err(|e| e.into()),
+        None => {
+            let addr = &str_params.target.parse().unwrap();
+            let socket = TcpSocket::new_v4()?;
+            socket.set_nodelay(true)?;
+            socket.connect(*addr).await.map_err(|e| e.into())
+        }
     }
 }

+ 69 - 57
src/bin/mgen-client.rs

@@ -1,5 +1,6 @@
 // Code specific to the client in the client-server mode.
 
+use futures::future::try_join_all;
 use mgen::updater::Updater;
 use mgen::{log, HandshakeRef, MessageHeader, SerializedMessage};
 use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
@@ -7,9 +8,9 @@ use serde::Deserialize;
 use std::result::Result;
 use std::sync::Arc;
 use tokio::io::{split, AsyncWriteExt, ReadHalf, WriteHalf};
+use tokio::join;
 use tokio::net::TcpStream;
 use tokio::sync::mpsc;
-use tokio::task;
 use tokio::time::Duration;
 use tokio_rustls::{client::TlsStream, TlsConnector};
 
@@ -18,7 +19,8 @@ mod messenger;
 use crate::messenger::dists::{ConfigDistributions, Distributions};
 use crate::messenger::error::{FatalError, MessengerError};
 use crate::messenger::state::{
-    manage_active_conversation, manage_idle_conversation, StateMachine, StateToWriter,
+    manage_active_conversation, manage_idle_conversation, StateFromReader, StateMachine,
+    StateToWriter,
 };
 use crate::messenger::tcp::{connect, SocksParams};
 
@@ -332,27 +334,63 @@ async fn socket_updater(
 
 /// The thread responsible for handling the conversation state
 /// (i.e., whether the user is active or idle, and when to send messages).
-/// Spawns all other threads for this conversation.
-async fn manage_conversation(config: FullConfig) -> Result<(), MessengerError> {
+async fn manage_conversation(
+    config: FullConfig,
+    mut state_from_reader: StateFromReader,
+    mut state_to_writer: StateToWriter<MessageHolder>,
+) {
+    tokio::time::sleep(Duration::from_secs_f64(config.bootstrap)).await;
+    log!("{},{},awake", &config.user, &config.group);
+
     let mut rng = Xoshiro256PlusPlus::from_entropy();
+    let mut state_machine = StateMachine::start(config.distributions, &mut rng);
+
+    loop {
+        state_machine = match state_machine {
+            StateMachine::Idle(conversation) => {
+                manage_idle_conversation::<false, _, _, _>(
+                    conversation,
+                    &mut state_from_reader,
+                    &mut state_to_writer,
+                    &config.user,
+                    &config.group,
+                    &mut rng,
+                )
+                .await
+            }
+            StateMachine::Active(conversation) => {
+                manage_active_conversation(
+                    conversation,
+                    &mut state_from_reader,
+                    &mut state_to_writer,
+                    &config.user,
+                    &config.group,
+                    false,
+                    &mut rng,
+                )
+                .await
+            }
+        };
+    }
+}
 
+/// Spawns all other threads for this conversation.
+async fn spawn_threads(config: FullConfig) -> Result<(), MessengerError> {
     let message_server_params = SocksParams {
         socks: config.socks.clone(),
-        target: config.message_server,
+        target: config.message_server.clone(),
         user: config.user.clone(),
         recipient: config.group.clone(),
     };
 
     let web_server_params = SocksParams {
-        socks: config.socks,
-        target: config.web_server,
+        socks: config.socks.clone(),
+        target: config.web_server.clone(),
         user: config.user.clone(),
         recipient: config.group.clone(),
     };
 
-    let mut state_machine = StateMachine::start(config.distributions, &mut rng);
-
-    let (reader_to_state, mut state_from_reader) = mpsc::unbounded_channel();
+    let (reader_to_state, state_from_reader) = mpsc::unbounded_channel();
     let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
     let read_socket_updater_in = Updater::new();
     let read_socket_updater_out = read_socket_updater_in.clone();
@@ -361,73 +399,48 @@ async fn manage_conversation(config: FullConfig) -> Result<(), MessengerError> {
     let (errs_in, errs_out) = mpsc::unbounded_channel();
     let (writer_to_uploader, uploader_from_writer) = mpsc::unbounded_channel();
 
+    let state_to_writer = StateToWriter {
+        channel: state_to_writer,
+    };
+
     let retry = Duration::from_secs_f64(config.retry);
     let tls_config = tokio_rustls::rustls::ClientConfig::builder()
         .with_safe_defaults()
         .with_custom_certificate_verifier(Arc::new(NoCertificateVerification {}))
         .with_no_client_auth();
 
-    tokio::spawn(reader(
+    let reader = reader(
         web_server_params.clone(),
         retry,
         tls_config.clone(),
         reader_to_state,
         read_socket_updater_out,
         errs_in.clone(),
-    ));
-    tokio::spawn(writer(
+    );
+    let writer = writer(
         writer_from_state,
         writer_to_uploader,
         write_socket_updater_out,
         errs_in,
-    ));
-    tokio::spawn(uploader(
+    );
+    let uploader = uploader(
         web_server_params,
         retry,
         tls_config.clone(),
         uploader_from_writer,
-    ));
-    tokio::spawn(socket_updater(
+    );
+    let updater = socket_updater(
         message_server_params,
         retry,
         tls_config,
         errs_out,
         read_socket_updater_in,
         write_socket_updater_in,
-    ));
-
-    tokio::time::sleep(Duration::from_secs_f64(config.bootstrap)).await;
+    );
+    let manager = manage_conversation(config, state_from_reader, state_to_writer);
 
-    let mut state_to_writer = StateToWriter {
-        channel: state_to_writer,
-    };
-    loop {
-        state_machine = match state_machine {
-            StateMachine::Idle(conversation) => {
-                manage_idle_conversation::<false, _, _, _>(
-                    conversation,
-                    &mut state_from_reader,
-                    &mut state_to_writer,
-                    &config.user,
-                    &config.group,
-                    &mut rng,
-                )
-                .await
-            }
-            StateMachine::Active(conversation) => {
-                manage_active_conversation(
-                    conversation,
-                    &mut state_from_reader,
-                    &mut state_to_writer,
-                    &config.user,
-                    &config.group,
-                    false,
-                    &mut rng,
-                )
-                .await
-            }
-        };
-    }
+    join!(reader, writer, uploader, updater, manager);
+    Ok(())
 }
 
 struct FullConfig {
@@ -463,8 +476,11 @@ struct Config {
     conversations: Vec<ConversationConfig>,
 }
 
-#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
+#[tokio::main(flavor = "current_thread")]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    #[cfg(feature = "tracing")]
+    console_subscriber::init();
+
     let mut args = std::env::args();
     let _ = args.next();
     let mut handles = vec![];
@@ -491,16 +507,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 retry: conversation.retry.unwrap_or(config.retry),
                 distributions,
             };
-            let handle: task::JoinHandle<Result<(), MessengerError>> =
-                tokio::spawn(manage_conversation(filled_conversation));
+            let handle = spawn_threads(filled_conversation);
             handles.push(handle);
         }
     }
 
-    let handles: futures::stream::FuturesUnordered<_> = handles.into_iter().collect();
-    for handle in handles {
-        handle.await??;
-    }
+    try_join_all(handles.into_iter()).await?;
     Ok(())
 }