Просмотр исходного кода

misc. bug fixes

 - Make client bootstrap apply to state machine, not socket updating.
 - Remove recipients args where we don't really need them.
 - Our socket peek loop in the server was busy looping in practice, replace with actual message handler.
 - Avoid extra arc clones.
Justin Tracey 2 лет назад
Родитель
Сommit
bc320250d5
4 измененных файлов с 31 добавлено и 43 удалено
  1. 2 12
      src/bin/client.rs
  2. 4 23
      src/bin/messenger/state.rs
  3. 0 5
      src/bin/peer.rs
  4. 25 3
      src/bin/server.rs

+ 2 - 12
src/bin/client.rs

@@ -102,13 +102,11 @@ struct SocksParams {
 /// and determining how to handle errors this or other threads receive.
 /// and determining how to handle errors this or other threads receive.
 async fn socket_updater(
 async fn socket_updater(
     str_params: SocksParams,
     str_params: SocksParams,
-    bootstrap: f64,
     retry: f64,
     retry: f64,
     mut error_channel: ErrorChannelOut,
     mut error_channel: ErrorChannelOut,
     reader_channel: ReadSocketUpdaterIn,
     reader_channel: ReadSocketUpdaterIn,
     writer_channel: WriteSocketUpdaterIn,
     writer_channel: WriteSocketUpdaterIn,
 ) -> FatalError {
 ) -> FatalError {
-    let mut first_run: bool = true;
     let retry = Duration::from_secs_f64(retry);
     let retry = Duration::from_secs_f64(retry);
     loop {
     loop {
         let socks_connection: Result<Socks5Stream<_>, MessengerError> =
         let socks_connection: Result<Socks5Stream<_>, MessengerError> =
@@ -138,11 +136,6 @@ async fn socket_updater(
             continue;
             continue;
         }
         }
 
 
-        if first_run {
-            tokio::time::sleep(Duration::from_secs_f64(bootstrap)).await;
-            first_run = false;
-        }
-
         let (rd, wr) = stream.into_inner().into_split();
         let (rd, wr) = stream.into_inner().into_split();
         reader_channel.send(rd);
         reader_channel.send(rd);
         writer_channel.send(wr);
         writer_channel.send(wr);
@@ -169,7 +162,6 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
     };
     };
 
 
     let mut state_machine = StateMachine::start(distributions, &mut rng);
     let mut state_machine = StateMachine::start(distributions, &mut rng);
-    let recipients: Vec<&str> = config.recipients.iter().map(String::as_str).collect();
 
 
     let (reader_to_state, mut state_from_reader) = mpsc::unbounded_channel();
     let (reader_to_state, mut state_from_reader) = mpsc::unbounded_channel();
     let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
     let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
@@ -186,13 +178,14 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
     tokio::spawn(writer(writer_from_state, write_socket_updater_out, errs_in));
     tokio::spawn(writer(writer_from_state, write_socket_updater_out, errs_in));
     tokio::spawn(socket_updater(
     tokio::spawn(socket_updater(
         str_params,
         str_params,
-        config.bootstrap,
         config.retry,
         config.retry,
         errs_out,
         errs_out,
         read_socket_updater_in,
         read_socket_updater_in,
         write_socket_updater_in,
         write_socket_updater_in,
     ));
     ));
 
 
+    tokio::time::sleep(Duration::from_secs_f64(config.bootstrap)).await;
+
     let mut state_to_writer = StateToWriter {
     let mut state_to_writer = StateToWriter {
         channel: state_to_writer,
         channel: state_to_writer,
     };
     };
@@ -205,7 +198,6 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
                     &mut state_to_writer,
                     &mut state_to_writer,
                     &config.user,
                     &config.user,
                     &config.group,
                     &config.group,
-                    recipients.clone(),
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await
@@ -217,7 +209,6 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
                     &mut state_to_writer,
                     &mut state_to_writer,
                     &config.user,
                     &config.user,
                     &config.group,
                     &config.group,
-                    recipients.clone(),
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await
@@ -230,7 +221,6 @@ async fn manage_conversation(config: Config) -> Result<(), MessengerError> {
 struct Config {
 struct Config {
     user: String,
     user: String,
     group: String,
     group: String,
-    recipients: Vec<String>,
     socks: String,
     socks: String,
     server: String,
     server: String,
     bootstrap: f64,
     bootstrap: f64,

+ 4 - 23
src/bin/messenger/state.rs

@@ -234,14 +234,13 @@ async fn send_action<
     mut streams: I,
     mut streams: I,
     our_id: &str,
     our_id: &str,
     group: &str,
     group: &str,
-    recipients: Vec<&str>,
     rng: &mut Xoshiro256PlusPlus,
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
 ) -> StateMachine {
     let size = conversation.dists.m.sample(rng);
     let size = conversation.dists.m.sample(rng);
     log!(
     log!(
-        "sending message from {} to {:?} of size {}",
+        "sending message from {} to {} of size {}",
         our_id,
         our_id,
-        recipients,
+        group,
         size
         size
     );
     );
     let m = S::new(construct_message(
     let m = S::new(construct_message(
@@ -320,7 +319,6 @@ pub async fn manage_idle_conversation<
     stream_map: &'a mut M,
     stream_map: &'a mut M,
     our_id: &str,
     our_id: &str,
     group: &str,
     group: &str,
-    recipients: Vec<&str>,
     rng: &mut Xoshiro256PlusPlus,
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
 ) -> StateMachine {
     log!("delaying for {:?}", conversation.delay - Instant::now());
     log!("delaying for {:?}", conversation.delay - Instant::now());
@@ -333,15 +331,7 @@ pub async fn manage_idle_conversation<
 
 
     match action {
     match action {
         IdleGroupActions::Send => {
         IdleGroupActions::Send => {
-            send_action(
-                conversation,
-                stream_map.values(),
-                our_id,
-                group,
-                recipients,
-                rng,
-            )
-            .await
+            send_action(conversation, stream_map.values(), our_id, group, rng).await
         }
         }
         IdleGroupActions::Receive(msg) => {
         IdleGroupActions::Receive(msg) => {
             receive_action(msg, conversation, stream_map, our_id, rng).await
             receive_action(msg, conversation, stream_map, our_id, rng).await
@@ -367,7 +357,6 @@ pub async fn manage_active_conversation<
     stream_map: &'a mut M,
     stream_map: &'a mut M,
     our_id: &str,
     our_id: &str,
     group: &str,
     group: &str,
-    recipients: Vec<&str>,
     rng: &mut Xoshiro256PlusPlus,
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
 ) -> StateMachine {
     let action = tokio::select! {
     let action = tokio::select! {
@@ -379,15 +368,7 @@ pub async fn manage_active_conversation<
 
 
     match action {
     match action {
         ActiveGroupActions::Send => {
         ActiveGroupActions::Send => {
-            send_action(
-                conversation,
-                stream_map.values(),
-                our_id,
-                group,
-                recipients,
-                rng,
-            )
-            .await
+            send_action(conversation, stream_map.values(), our_id, group, rng).await
         }
         }
         ActiveGroupActions::Receive(msg) => {
         ActiveGroupActions::Receive(msg) => {
             receive_action(msg, conversation, stream_map, our_id, rng).await
             receive_action(msg, conversation, stream_map, our_id, rng).await

+ 0 - 5
src/bin/peer.rs

@@ -45,14 +45,12 @@ type WriteSocketUpdaterOut = Updater<OwnedWriteHalf>;
 async fn manage_conversation(
 async fn manage_conversation(
     user: String,
     user: String,
     group: String,
     group: String,
-    recipients: Vec<String>,
     distributions: Distributions,
     distributions: Distributions,
     bootstrap: f64,
     bootstrap: f64,
     mut state_from_reader: StateFromReader,
     mut state_from_reader: StateFromReader,
     mut state_to_writers: HashMap<String, StateToWriter<MessageHolder>>,
     mut state_to_writers: HashMap<String, StateToWriter<MessageHolder>>,
 ) {
 ) {
     let mut rng = Xoshiro256PlusPlus::from_entropy();
     let mut rng = Xoshiro256PlusPlus::from_entropy();
-    let recipients: Vec<_> = recipients.iter().map(String::as_str).collect();
     let user = &user;
     let user = &user;
     let group = &group;
     let group = &group;
 
 
@@ -69,7 +67,6 @@ async fn manage_conversation(
                     &mut state_to_writers,
                     &mut state_to_writers,
                     user,
                     user,
                     group,
                     group,
-                    recipients.clone(),
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await
@@ -81,7 +78,6 @@ async fn manage_conversation(
                     &mut state_to_writers,
                     &mut state_to_writers,
                     user,
                     user,
                     group,
                     group,
-                    recipients.clone(),
                     &mut rng,
                     &mut rng,
                 )
                 )
                 .await
                 .await
@@ -399,7 +395,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         tokio::spawn(manage_conversation(
         tokio::spawn(manage_conversation(
             config.user.name,
             config.user.name,
             config.group,
             config.group,
-            config.recipients.iter().map(|p| p.name.clone()).collect(),
             distributions,
             distributions,
             config.bootstrap,
             config.bootstrap,
             state_from_reader,
             state_from_reader,

+ 25 - 3
src/bin/server.rs

@@ -140,8 +140,8 @@ async fn get_messages(
 ) -> Result<(), Box<dyn Error>> {
 ) -> Result<(), Box<dyn Error>> {
     // Wait for the next message to be received before populating our local copy of the db,
     // Wait for the next message to be received before populating our local copy of the db,
     // that way other clients have time to register.
     // that way other clients have time to register.
-    let mut discard = vec![0u8];
-    while socket.peek(&mut discard).await? == 0 {}
+    log!("waiting for message from {} to {}", sender, group);
+    let buf = mgen::get_message_bytes(&mut socket).await?;
 
 
     let db = global_db.read().await.clone();
     let db = global_db.read().await.clone();
     let message_channels: Vec<_> = db
     let message_channels: Vec<_> = db
@@ -149,6 +149,28 @@ async fn get_messages(
         .filter_map(|(k, v)| if *k != sender { Some(v) } else { None })
         .filter_map(|(k, v)| if *k != sender { Some(v) } else { None })
         .collect();
         .collect();
 
 
+    let message = MessageHeaderRef::deserialize(&buf[4..])?;
+    assert!(message.sender == sender);
+
+    match message.body {
+        MessageBody::Size(_) => {
+            assert!(message.group == group);
+            log!("got message from {} for {}", sender, group);
+            let body = message.body;
+            let m = Arc::new(SerializedMessage { header: buf, body });
+            for recipient in message_channels.iter() {
+                recipient.send(m.clone()).unwrap();
+            }
+        }
+        MessageBody::Receipt => {
+            let recipient = &db[message.group];
+            let body = message.body;
+            let m = Arc::new(SerializedMessage { header: buf, body });
+            recipient.send(m).unwrap();
+        }
+    }
+
+    // we never have to update the DB again, so repeat the above, skipping that step
     loop {
     loop {
         log!("waiting for message from {} to {}", sender, group);
         log!("waiting for message from {} to {}", sender, group);
         let buf = mgen::get_message_bytes(&mut socket).await?;
         let buf = mgen::get_message_bytes(&mut socket).await?;
@@ -169,7 +191,7 @@ async fn get_messages(
                 let recipient = &db[message.group];
                 let recipient = &db[message.group];
                 let body = message.body;
                 let body = message.body;
                 let m = Arc::new(SerializedMessage { header: buf, body });
                 let m = Arc::new(SerializedMessage { header: buf, body });
-                recipient.send(m.clone()).unwrap();
+                recipient.send(m).unwrap();
             }
             }
         }
         }
     }
     }