소스 검색

progress so far on p2p client (completely broken still, won't build)

Justin Tracey 1 년 전
부모
커밋
d440c0d88f
4개의 변경된 파일131개의 추가작업 그리고 58개의 파일을 삭제
  1. 5 3
      src/bin/client.rs
  2. 114 43
      src/bin/messenger/state.rs
  3. 1 11
      src/bin/server.rs
  4. 11 1
      src/lib.rs

+ 5 - 3
src/bin/client.rs

@@ -13,7 +13,9 @@ mod messenger;
 
 use crate::messenger::dists::{ConfigDistributions, Distributions};
 use crate::messenger::error::MessengerError;
-use crate::messenger::state::{manage_active_conversation, manage_idle_conversation, StateMachine};
+use crate::messenger::state::{
+    manage_single_active_conversation, manage_single_idle_conversation, StateMachine,
+};
 
 async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
     let mut rng = Xoshiro256PlusPlus::from_entropy();
@@ -68,7 +70,7 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
         loop {
             state_machine = match state_machine {
                 StateMachine::Idle(conversation) => {
-                    manage_idle_conversation(
+                    manage_single_idle_conversation(
                         conversation,
                         &mut stream,
                         str_params.sender,
@@ -78,7 +80,7 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
                     .await?
                 }
                 StateMachine::Active(conversation) => {
-                    manage_active_conversation(
+                    manage_single_active_conversation(
                         conversation,
                         &mut stream,
                         str_params.sender,

+ 114 - 43
src/bin/messenger/state.rs

@@ -2,12 +2,15 @@
 // This includes inducing transitions and actions taken during transitions,
 // so a lot of the messenger network code is here.
 
-use mgen::log;
+use mgen::{log, MessageHeader, SerializedMessage};
 use rand_distr::Distribution;
 use rand_xoshiro::Xoshiro256PlusPlus;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::AsyncReadExt;
 use tokio::net::TcpStream;
+use tokio::sync::mpsc;
 use tokio::time::Instant;
+use std::collections::HashMap;
+use std::sync::Arc;
 
 use crate::messenger::dists::Distributions;
 use crate::messenger::error::MessengerError;
@@ -147,15 +150,15 @@ impl Conversation<Active> {
         }
     }
 
-    async fn sleep(delay: Instant, wait: Instant) -> ActiveActions {
+    async fn sleep(delay: Instant, wait: Instant) -> ActiveGroupActions {
         if delay < wait {
             log!("delaying for {:?}", delay - Instant::now());
             tokio::time::sleep_until(delay).await;
-            ActiveActions::Send
+            ActiveGroupActions::Send
         } else {
             log!("waiting for {:?}", wait - Instant::now());
             tokio::time::sleep_until(wait).await;
-            ActiveActions::Idle
+            ActiveGroupActions::Idle
         }
     }
 }
@@ -182,13 +185,27 @@ async fn read_header_size(
     }
 }
 
-async fn send_action<T: State>(
-    conversation: Conversation<T>,
+async fn read_message(
     stream: &mut TcpStream,
+    n: usize,
+    mut header_size: [u8; 4],
+) -> Result<MessageHeader, MessengerError> {
+    if n < 4 {
+        // we didn't get the whole size, but we can use read_exact now
+        stream.read_exact(&mut header_size[n..]).await?;
+    }
+
+    let (header, _) = mgen::get_message_with_header_size(stream, header_size).await?;
+    Ok(header)
+}
+
+async fn send_action<'a, T: State, I: Iterator<Item = &'a mut mpsc::UnboundedSender<Arc<SerializedMessage>>>> (
+    conversation: Conversation<T>,
+    streams: I,
     our_id: &str,
     recipients: Vec<&str>,
     rng: &mut Xoshiro256PlusPlus,
-) -> Result<StateMachine, (StateMachine, MessengerError)> {
+) -> StateMachine {
     let size = conversation.dists.m.sample(rng);
     log!(
         "sending message from {} to {:?} of size {}",
@@ -196,41 +213,26 @@ async fn send_action<T: State>(
         recipients,
         size
     );
-    let m = construct_message(
+    let m = Arc::new(construct_message(
         our_id.to_string(),
         recipients.iter().map(|s| s.to_string()).collect(),
         size,
-    );
+    ));
 
-    if let Err(e) = m.write_all_to(stream).await {
-        return Err((T::to_machine(conversation), e.into()));
-    }
-    if let Err(e) = stream.flush().await {
-        return Err((T::to_machine(conversation), e.into()));
+    for stream in streams {
+        stream.send(m.clone()).expect("Internal stream closed with messages still being sent");
     }
 
-    Ok(T::sent(conversation, rng))
+    T::sent(conversation, rng)
 }
 
 async fn receive_action<T: State>(
-    n: usize,
-    mut header_size: [u8; 4],
+    msg: MessageHeader,
     conversation: Conversation<T>,
-    stream: &mut TcpStream,
+    stream_map: &HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
     our_id: &str,
     rng: &mut Xoshiro256PlusPlus,
-) -> Result<StateMachine, (StateMachine, MessengerError)> {
-    if n < 4 {
-        // we didn't get the whole size, but we can use read_exact now
-        if let Err(e) = stream.read_exact(&mut header_size[n..]).await {
-            return Err((T::to_machine(conversation), e.into()));
-        }
-    }
-
-    let msg = match mgen::get_message_with_header_size(stream, header_size).await {
-        Ok((msg, _)) => msg,
-        Err(e) => return Err((T::to_machine(conversation), e.into())),
-    };
+) -> StateMachine {
 
     match msg.body {
         mgen::MessageBody::Size(size) => {
@@ -240,25 +242,24 @@ async fn receive_action<T: State>(
                 msg.sender,
                 size
             );
+            let stream = stream_map.get(&msg.sender).expect(&format!("No such contact: {}", msg.sender));
             let m = construct_receipt(our_id.to_string(), msg.sender);
-            if let Err(e) = m.write_all_to(stream).await {
-                return Err((T::to_machine(conversation), e.into()));
-            }
-            if let Err(e) = stream.flush().await {
-                return Err((T::to_machine(conversation), e.into()));
-            }
-            Ok(T::received(conversation, rng))
+            stream.send(Arc::new(m));
+            T::received(conversation, rng)
         }
-        mgen::MessageBody::Receipt => Ok(T::to_machine(conversation)),
+        mgen::MessageBody::Receipt => T::to_machine(conversation),
     }
 }
 
+/*
 enum IdleActions {
     Send,
     Receive(usize),
 }
 
-pub async fn manage_idle_conversation(
+/// Handle a state transition from Idle, including I/O, for a single-connection conversation.
+/// Used for Idle client-server and single-contact p2p conversations.
+pub async fn manage_single_idle_conversation(
     conversation: Conversation<Idle>,
     stream: &mut TcpStream,
     our_id: &str,
@@ -287,7 +288,11 @@ pub async fn manage_idle_conversation(
     match action {
         IdleActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
         IdleActions::Receive(n) => {
-            receive_action(n, header_size, conversation, stream, our_id, rng).await
+            let msg = match read_message(stream, n, header_size).await {
+                Ok(msg) => msg,
+                Err(e) => return Err((StateMachine::Idle(conversation), e)),
+            };
+            receive_action(msg, conversation, stream, our_id, rng).await
         }
     }
 }
@@ -298,7 +303,9 @@ enum ActiveActions {
     Idle,
 }
 
-pub async fn manage_active_conversation(
+/// Handle a state transition from Active, including I/O, for a single-connection conversation.
+/// Used for Active client-server and single-contact p2p conversations.
+pub async fn manage_single_active_conversation(
     conversation: Conversation<Active>,
     stream: &mut TcpStream,
     our_id: &str,
@@ -326,8 +333,72 @@ pub async fn manage_active_conversation(
     match action {
         ActiveActions::Send => send_action(conversation, stream, our_id, recipients, rng).await,
         ActiveActions::Receive(n) => {
-            receive_action(n, header_size, conversation, stream, our_id, rng).await
+            let msg = match read_message(stream, n, header_size).await {
+                Ok(msg) => msg,
+                Err(e) => return Err((StateMachine::Active(conversation), e)),
+            };
+            receive_action(msg, conversation, stream, our_id, rng).await
         }
         ActiveActions::Idle => Ok(StateMachine::Idle(conversation.waited(rng))),
     }
 }
+*/
+
+enum IdleGroupActions {
+    Send,
+    Receive(MessageHeader),
+}
+
+/// Handle a state transition from Idle, including I/O, for a multi-connection conversation.
+/// Used for Idle group p2p conversations.
+pub async fn manage_group_idle_conversation(
+    conversation: Conversation<Idle>,
+    inbound: &mut mpsc::UnboundedReceiver<MessageHeader>,
+    stream_map: &mut HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
+    our_id: &str,
+    recipients: Vec<&str>,
+    rng: &mut Xoshiro256PlusPlus,
+) -> StateMachine {
+    log!("delaying for {:?}", conversation.delay - Instant::now());
+    let action = tokio::select! {
+        () = tokio::time::sleep_until(conversation.delay) => IdleGroupActions::Send,
+
+        res = inbound.recv() =>
+            IdleGroupActions::Receive(res.expect("inbound channel closed")),
+    };
+
+    match action {
+        IdleGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await,
+        IdleGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await,
+    }
+}
+
+enum ActiveGroupActions {
+    Send,
+    Receive(MessageHeader),
+    Idle,
+}
+
+/// Handle a state transition from Active, including I/O, for a multi-connection conversation.
+/// Used for Active group p2p conversations.
+pub async fn manage_group_active_conversation(
+    conversation: Conversation<Active>,
+    inbound: &mut mpsc::UnboundedReceiver<MessageHeader>,
+    stream_map: &mut HashMap<String, mpsc::UnboundedSender<Arc<SerializedMessage>>>,
+    our_id: &str,
+    recipients: Vec<&str>,
+    rng: &mut Xoshiro256PlusPlus,
+) -> StateMachine {
+    let action = tokio::select! {
+        action = Conversation::<Active>::sleep(conversation.delay, conversation.state.wait) => action,
+
+        res = inbound.recv() =>
+            ActiveGroupActions::Receive(res.expect("inbound channel closed")),
+    };
+
+    match action {
+        ActiveGroupActions::Send => send_action(conversation, stream_map.values_mut(), our_id, recipients, rng).await,
+        ActiveGroupActions::Receive(msg) => receive_action(msg, conversation, stream_map, our_id, rng).await,
+        ActiveGroupActions::Idle => StateMachine::Idle(conversation.waited(rng)),
+    }
+}

+ 1 - 11
src/bin/server.rs

@@ -1,4 +1,4 @@
-use mgen::{log, SerializedMessage};
+use mgen::{log, parse_identifier, SerializedMessage};
 use std::collections::HashMap;
 use std::error::Error;
 use std::result::Result;
@@ -92,16 +92,6 @@ fn spawn_message_receiver(
     });
 }
 
-/// Parse the identifier from the start of the TcpStream.
-async fn parse_identifier(stream: &mut OwnedReadHalf) -> Result<ID, Box<dyn Error>> {
-    // this should maybe be buffered
-    let strlen = stream.read_u32().await?;
-    let mut buf = vec![0u8; strlen as usize];
-    stream.read_exact(&mut buf).await?;
-    let s = std::str::from_utf8(&buf)?;
-    Ok(s.to_string())
-}
-
 /// Loop for receiving messages on the socket, figuring out who to deliver them to,
 /// and forwarding them locally to the respective channel.
 async fn get_messages(

+ 11 - 1
src/lib.rs

@@ -75,7 +75,7 @@ pub struct MessageHeader {
 }
 
 impl MessageHeader {
-    /// Generate a consise serialization of the Message.
+    /// Generate a concise serialization of the Message.
     pub fn serialize(&self) -> SerializedMessage {
         // serialized message header: {
         //   header_len: u32,
@@ -150,6 +150,16 @@ impl MessageHeader {
     }
 }
 
+/// Parse the identifier from the start of the TcpStream.
+pub async fn parse_identifier<T: AsyncReadExt + std::marker::Unpin>(stream: &mut T) -> Result<String, Error> {
+    // this should maybe be buffered
+    let strlen = stream.read_u32().await?;
+    let mut buf = vec![0u8; strlen as usize];
+    stream.read_exact(&mut buf).await?;
+    let s = std::str::from_utf8(&buf)?;
+    Ok(s.to_string())
+}
+
 /// Gets a message from the stream and constructs a MessageHeader object, also returning the raw byte buffer
 pub async fn get_message<T: AsyncReadExt + std::marker::Unpin>(
     stream: &mut T,