Browse Source

fix bugs in p2p mode

This better handles the case where handshakes cross in flight, and handles the
fact that p2p mode can't use the connection to determine the receipt group.
Justin Tracey 1 year ago
parent
commit
1fe669e5c3
4 changed files with 42 additions and 29 deletions
  1. 2 0
      src/bin/client.rs
  2. 7 3
      src/bin/messenger/state.rs
  3. 29 26
      src/bin/peer.rs
  4. 4 0
      src/lib.rs

+ 2 - 0
src/bin/client.rs

@@ -196,6 +196,7 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
                     &mut state_to_writer,
                     &mut state_to_writer,
                     &config.user,
                     &config.user,
                     &config.group,
                     &config.group,
+                    false,
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await
@@ -207,6 +208,7 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
                     &mut state_to_writer,
                     &mut state_to_writer,
                     &config.user,
                     &config.user,
                     &config.group,
                     &config.group,
+                    false,
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await

+ 7 - 3
src/bin/messenger/state.rs

@@ -284,6 +284,7 @@ async fn receive_action<
     conversation: Conversation<T>,
     conversation: Conversation<T>,
     stream_map: &mut M,
     stream_map: &mut M,
     our_id: &str,
     our_id: &str,
+    p2p: bool,
     rng: &mut Xoshiro256PlusPlus,
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
 ) -> StateMachine {
     match msg.body {
     match msg.body {
@@ -299,7 +300,8 @@ async fn receive_action<
                 size
                 size
             );
             );
             let stream = stream_map.channel_for(&msg.sender);
             let stream = stream_map.channel_for(&msg.sender);
-            let m = construct_receipt(our_id.to_string(), msg.sender);
+            let recipient = if p2p { msg.group } else { msg.sender };
+            let m = construct_receipt(our_id.to_string(), recipient);
             stream
             stream
                 .channel
                 .channel
                 .send(S::new(m))
                 .send(S::new(m))
@@ -338,6 +340,7 @@ pub async fn manage_idle_conversation<
     stream_map: &'a mut M,
     stream_map: &'a mut M,
     our_id: &str,
     our_id: &str,
     group: &str,
     group: &str,
+    p2p: bool,
     rng: &mut Xoshiro256PlusPlus,
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
 ) -> StateMachine {
     log!("{},{},Idle", our_id, group);
     log!("{},{},Idle", our_id, group);
@@ -353,7 +356,7 @@ pub async fn manage_idle_conversation<
             send_action(conversation, stream_map.values(), our_id, group, rng).await
             send_action(conversation, stream_map.values(), our_id, group, rng).await
         }
         }
         IdleGroupActions::Receive(msg) => {
         IdleGroupActions::Receive(msg) => {
-            receive_action(msg, conversation, stream_map, our_id, rng).await
+            receive_action(msg, conversation, stream_map, our_id, p2p, rng).await
         }
         }
     }
     }
 }
 }
@@ -376,6 +379,7 @@ pub async fn manage_active_conversation<
     stream_map: &'a mut M,
     stream_map: &'a mut M,
     our_id: &str,
     our_id: &str,
     group: &str,
     group: &str,
+    p2p: bool,
     rng: &mut Xoshiro256PlusPlus,
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
 ) -> StateMachine {
     let action = tokio::select! {
     let action = tokio::select! {
@@ -390,7 +394,7 @@ pub async fn manage_active_conversation<
             send_action(conversation, stream_map.values(), our_id, group, rng).await
             send_action(conversation, stream_map.values(), our_id, group, rng).await
         }
         }
         ActiveGroupActions::Receive(msg) => {
         ActiveGroupActions::Receive(msg) => {
-            receive_action(msg, conversation, stream_map, our_id, rng).await
+            receive_action(msg, conversation, stream_map, our_id, p2p, rng).await
         }
         }
         ActiveGroupActions::Idle => StateMachine::Idle(conversation.waited(rng)),
         ActiveGroupActions::Idle => StateMachine::Idle(conversation.waited(rng)),
     }
     }

+ 29 - 26
src/bin/peer.rs

@@ -67,6 +67,7 @@ async fn manage_conversation(
                     &mut state_to_writers,
                     &mut state_to_writers,
                     user,
                     user,
                     group,
                     group,
+                    true,
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await
@@ -78,6 +79,7 @@ async fn manage_conversation(
                     &mut state_to_writers,
                     &mut state_to_writers,
                     user,
                     user,
                     group,
                     group,
+                    true,
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await
@@ -145,11 +147,9 @@ async fn reader(
                 // send a message or the peer reaches out to us.
                 // send a message or the peer reaches out to us.
                 break;
                 break;
             };
             };
-
-            let group = msg.group.clone();
             let channel_to_conversation = group_to_conversation_thread
             let channel_to_conversation = group_to_conversation_thread
-                .get(&group)
-                .unwrap_or_else(|| panic!("Unknown group: {}", group));
+                .get(&msg.group)
+                .unwrap_or_else(|| panic!("Unknown group: {}", msg.group));
             channel_to_conversation
             channel_to_conversation
                 .send(msg)
                 .send(msg)
                 .expect("reader: Channel to group closed");
                 .expect("reader: Channel to group closed");
@@ -220,31 +220,34 @@ async fn writer<'a>(
         }
         }
 
 
         // immediately try to connect to the peer
         // immediately try to connect to the peer
-        let connection_attempt = Socks5Stream::connect_with_password(
-            socks_params.socks.as_str(),
-            socks_params.target.as_str(),
-            &socks_params.user,
-            &socks_params.recipient,
-        )
-        .await;
-        if let Ok(mut stream) = connection_attempt {
-            log!(
-                "connection attempt success from {} to {} on {}",
+        tokio::select! {
+            connection_attempt = Socks5Stream::connect_with_password(
+                socks_params.socks.as_str(),
+                socks_params.target.as_str(),
                 &socks_params.user,
                 &socks_params.user,
                 &socks_params.recipient,
                 &socks_params.recipient,
-                &socks_params.target
-            );
-            stream
-                .write_all(&mgen::serialize_str(&socks_params.user))
-                .await?;
-            let (rd, wr) = stream.into_inner().into_split();
-            read_socket_updater.send(rd);
-            return Ok(wr);
-        } else if let Err(e) = connection_attempt {
-            let e: MessengerError = e.into();
-            if let MessengerError::Fatal(e) = e {
-                return Err(e);
+            ) => {
+                if let Ok(mut stream) = connection_attempt {
+                    log!(
+                        "connection attempt success from {} to {} on {}",
+                        &socks_params.user,
+                        &socks_params.recipient,
+                        &socks_params.target
+                    );
+                    stream
+                        .write_all(&mgen::serialize_str(&socks_params.user))
+                        .await?;
+                    let (rd, wr) = stream.into_inner().into_split();
+                    read_socket_updater.send(rd);
+                    return Ok(wr);
+                } else if let Err(e) = connection_attempt {
+                    let e: MessengerError = e.into();
+                    if let MessengerError::Fatal(e) = e {
+                        return Err(e);
+                    }
+                }
             }
             }
+            stream = write_socket_updater.recv() => {return Ok(stream);},
         }
         }
 
 
         // Usually we'll have returned by now, but sometimes we'll fail to
         // Usually we'll have returned by now, but sometimes we'll fail to

+ 4 - 0
src/lib.rs

@@ -73,8 +73,12 @@ impl MessageBody {
 // FIXME: we should try to replace MessageHeader with MessageHeaderRef
 // FIXME: we should try to replace MessageHeader with MessageHeaderRef
 #[derive(Debug)]
 #[derive(Debug)]
 pub struct MessageHeader {
 pub struct MessageHeader {
+    /// User who constructed the message.
     pub sender: String,
     pub sender: String,
+    /// Group associated with the message.
+    /// In client-server mode receipts, this is the recipient instead.
     pub group: String,
     pub group: String,
+    /// The type and size of the message payload.
     pub body: MessageBody,
     pub body: MessageBody,
 }
 }