Explorar el Código

make socks proxy optional for client and peer

Justin Tracey hace 2 años
padre
commit
a2dc42693d
Se han modificado 4 ficheros con 58 adiciones y 55 borrados
  1. 11 26
      src/bin/client.rs
  2. 1 0
      src/bin/messenger/mod.rs
  3. 39 0
      src/bin/messenger/tcp.rs
  4. 7 29
      src/bin/peer.rs

+ 11 - 26
src/bin/client.rs

@@ -6,11 +6,13 @@ use rand_xoshiro::{rand_core::SeedableRng, Xoshiro256PlusPlus};
 use serde::Deserialize;
 use serde::Deserialize;
 use std::result::Result;
 use std::result::Result;
 use tokio::io::AsyncWriteExt;
 use tokio::io::AsyncWriteExt;
-use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
+use tokio::net::{
+    tcp::{OwnedReadHalf, OwnedWriteHalf},
+    TcpStream,
+};
 use tokio::sync::mpsc;
 use tokio::sync::mpsc;
 use tokio::task;
 use tokio::task;
 use tokio::time::Duration;
 use tokio::time::Duration;
-use tokio_socks::tcp::Socks5Stream;
 
 
 mod messenger;
 mod messenger;
 
 
@@ -19,6 +21,7 @@ use crate::messenger::error::{FatalError, MessengerError};
 use crate::messenger::state::{
 use crate::messenger::state::{
     manage_active_conversation, manage_idle_conversation, StateMachine, StateToWriter,
     manage_active_conversation, manage_idle_conversation, StateMachine, StateToWriter,
 };
 };
+use crate::messenger::tcp::{connect, SocksParams};
 
 
 /// Type for sending messages from the reader thread to the state thread.
 /// Type for sending messages from the reader thread to the state thread.
 type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
 type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
@@ -87,15 +90,6 @@ async fn writer(
     }
     }
 }
 }
 
 
-/// Parameters used in establishing the connection through the socks proxy.
-/// (Members may be useful elsewhere as well, but that's the primary purpose.)
-struct SocksParams {
-    socks: String,
-    server: String,
-    user: String,
-    group: String,
-}
-
 /// The thread responsible for (re-)establishing connections to the server,
 /// The thread responsible for (re-)establishing connections to the server,
 /// and determining how to handle errors this or other threads receive.
 /// and determining how to handle errors this or other threads receive.
 async fn socket_updater(
 async fn socket_updater(
@@ -107,16 +101,7 @@ async fn socket_updater(
 ) -> FatalError {
 ) -> FatalError {
     let retry = Duration::from_secs_f64(retry);
     let retry = Duration::from_secs_f64(retry);
     loop {
     loop {
-        let socks_connection: Result<Socks5Stream<_>, MessengerError> =
-            Socks5Stream::connect_with_password(
-                str_params.socks.as_str(),
-                str_params.server.as_str(),
-                &str_params.user,
-                &str_params.group,
-            )
-            .await
-            .map_err(|e| e.into());
-        let mut stream = match socks_connection {
+        let mut stream: TcpStream = match connect(&str_params).await {
             Ok(stream) => stream,
             Ok(stream) => stream,
             Err(MessengerError::Recoverable(_)) => {
             Err(MessengerError::Recoverable(_)) => {
                 tokio::time::sleep(retry).await;
                 tokio::time::sleep(retry).await;
@@ -127,14 +112,14 @@ async fn socket_updater(
 
 
         let handshake = HandshakeRef {
         let handshake = HandshakeRef {
             sender: &str_params.user,
             sender: &str_params.user,
-            group: &str_params.group,
+            group: &str_params.recipient,
         };
         };
 
 
         if stream.write_all(&handshake.serialize()).await.is_err() {
         if stream.write_all(&handshake.serialize()).await.is_err() {
             continue;
             continue;
         }
         }
 
 
-        let (rd, wr) = stream.into_inner().into_split();
+        let (rd, wr) = stream.into_split();
         reader_channel.send(rd);
         reader_channel.send(rd);
         writer_channel.send(wr);
         writer_channel.send(wr);
 
 
@@ -154,9 +139,9 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
 
 
     let str_params = SocksParams {
     let str_params = SocksParams {
         socks: config.socks,
         socks: config.socks,
-        server: config.server,
+        target: config.server,
         user: config.user.clone(),
         user: config.user.clone(),
-        group: config.group.clone(),
+        recipient: config.group.clone(),
     };
     };
 
 
     let mut state_machine = StateMachine::start(distributions, &mut rng);
     let mut state_machine = StateMachine::start(distributions, &mut rng);
@@ -221,7 +206,7 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
 struct Config {
 struct Config {
     user: String,
     user: String,
     group: String,
     group: String,
-    socks: String,
+    socks: Option<String>,
     server: String,
     server: String,
     bootstrap: f64,
     bootstrap: f64,
     retry: f64,
     retry: f64,

+ 1 - 0
src/bin/messenger/mod.rs

@@ -3,3 +3,4 @@ pub mod dists;
 pub mod error;
 pub mod error;
 pub mod message;
 pub mod message;
 pub mod state;
 pub mod state;
+pub mod tcp;

+ 39 - 0
src/bin/messenger/tcp.rs

@@ -0,0 +1,39 @@
+use tokio::net::TcpStream;
+use tokio_socks::tcp::Socks5Stream;
+
+use crate::messenger::error::MessengerError;
+
+/// Parameters used in establishing the connection, optionally through a socks proxy.
+/// (Members may be useful elsewhere as well, but that's the primary purpose.)
+pub struct SocksParams {
+    /// Optional socks proxy address.
+    pub socks: Option<String>,
+    /// The target server or peer.
+    pub target: String,
+    /// The user who owns this connection.
+    pub user: String,
+    /// The recipient of messages sent on this connection.
+    /// Group for client-server, user for p2p.
+    pub recipient: String,
+}
+
+pub async fn connect(str_params: &SocksParams) -> Result<TcpStream, MessengerError> {
+    match &str_params.socks {
+        Some(socks) => {
+            let socks_connection = Socks5Stream::connect_with_password(
+                socks.as_str(),
+                str_params.target.as_str(),
+                &str_params.user,
+                &str_params.recipient,
+            )
+            .await;
+            match socks_connection {
+                Ok(stream) => Ok(stream.into_inner()),
+                Err(e) => Err(e.into()),
+            }
+        }
+        None => TcpStream::connect(&str_params.target)
+            .await
+            .map_err(|e| e.into()),
+    }
+}

+ 7 - 29
src/bin/peer.rs

@@ -13,7 +13,6 @@ use tokio::net::{
 use tokio::sync::mpsc;
 use tokio::sync::mpsc;
 use tokio::task;
 use tokio::task;
 use tokio::time::Duration;
 use tokio::time::Duration;
-use tokio_socks::tcp::Socks5Stream;
 
 
 mod messenger;
 mod messenger;
 
 
@@ -23,6 +22,7 @@ use crate::messenger::state::{
     manage_active_conversation, manage_idle_conversation, StateFromReader, StateMachine,
     manage_active_conversation, manage_idle_conversation, StateFromReader, StateMachine,
     StateToWriter,
     StateToWriter,
 };
 };
+use crate::messenger::tcp::{connect, SocksParams};
 
 
 /// Type for sending messages from the reader thread to the state thread.
 /// Type for sending messages from the reader thread to the state thread.
 type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
 type ReaderToState = mpsc::UnboundedSender<MessageHeader>;
@@ -221,12 +221,7 @@ async fn writer<'a>(
 
 
         // immediately try to connect to the peer
         // immediately try to connect to the peer
         tokio::select! {
         tokio::select! {
-            connection_attempt = Socks5Stream::connect_with_password(
-                socks_params.socks.as_str(),
-                socks_params.target.as_str(),
-                &socks_params.user,
-                &socks_params.recipient,
-            ) => {
+            connection_attempt = connect(socks_params) => {
                 if let Ok(mut stream) = connection_attempt {
                 if let Ok(mut stream) = connection_attempt {
                     log!(
                     log!(
                         "connection attempt success from {} to {} on {}",
                         "connection attempt success from {} to {} on {}",
@@ -237,14 +232,11 @@ async fn writer<'a>(
                     stream
                     stream
                         .write_all(&mgen::serialize_str(&socks_params.user))
                         .write_all(&mgen::serialize_str(&socks_params.user))
                         .await?;
                         .await?;
-                    let (rd, wr) = stream.into_inner().into_split();
+                    let (rd, wr) = stream.into_split();
                     read_socket_updater.send(rd);
                     read_socket_updater.send(rd);
                     return Ok(wr);
                     return Ok(wr);
-                } else if let Err(e) = connection_attempt {
-                    let e: MessengerError = e.into();
-                    if let MessengerError::Fatal(e) = e {
+                } else if let Err(MessengerError::Fatal(e)) = connection_attempt {
                         return Err(e);
                         return Err(e);
-                    }
                 }
                 }
             }
             }
             stream = write_socket_updater.recv() => {return Ok(stream);},
             stream = write_socket_updater.recv() => {return Ok(stream);},
@@ -278,16 +270,11 @@ async fn writer<'a>(
         ) -> Result<OwnedWriteHalf, MessengerError> {
         ) -> Result<OwnedWriteHalf, MessengerError> {
             tokio::select! {
             tokio::select! {
                 () = tokio::time::sleep(retry) => {
                 () = tokio::time::sleep(retry) => {
-                    let mut stream = Socks5Stream::connect_with_password(
-                        socks_params.socks.as_str(),
-                        socks_params.target.as_str(),
-                        &socks_params.user,
-                        &socks_params.recipient,
-                    )
+                    let mut stream = connect(socks_params)
                         .await?;
                         .await?;
                     stream.write_all(&mgen::serialize_str(&socks_params.user)).await?;
                     stream.write_all(&mgen::serialize_str(&socks_params.user)).await?;
 
 
-                    let (rd, wr) = stream.into_inner().into_split();
+                    let (rd, wr) = stream.into_split();
                     read_socket_updater.send(rd);
                     read_socket_updater.send(rd);
                     Ok(wr)
                     Ok(wr)
                 },
                 },
@@ -297,15 +284,6 @@ async fn writer<'a>(
     }
     }
 }
 }
 
 
-/// Parameters used in establishing the connection through the socks proxy.
-/// (Members may be useful elsewhere as well, but that's the primary purpose.)
-struct SocksParams {
-    socks: String,
-    target: String,
-    user: String,
-    recipient: String,
-}
-
 /// This user or a recipient.
 /// This user or a recipient.
 /// If this user, address is a local address to listen on.
 /// If this user, address is a local address to listen on.
 /// If a recipient, address is a remote address to send to.
 /// If a recipient, address is a remote address to send to.
@@ -320,7 +298,7 @@ struct Config {
     user: Peer,
     user: Peer,
     group: String,
     group: String,
     recipients: Vec<Peer>,
     recipients: Vec<Peer>,
-    socks: String,
+    socks: Option<String>,
     bootstrap: f64,
     bootstrap: f64,
     retry: f64,
     retry: f64,
     distributions: ConfigDistributions,
     distributions: ConfigDistributions,