Browse Source

use a more standard logging output

Switches most log lines to CSV of the form:

human-readable time,unix time.fractional,user,group,action[,action-specific fields]*

Part of this change is no longer short-circuiting receipt messages on clients
and peers, so we can have a single location for all message logging.
Justin Tracey 1 year ago
parent
commit
95c00b6852
4 changed files with 54 additions and 43 deletions
  1. 3 5
      src/bin/client.rs
  2. 40 21
      src/bin/messenger/state.rs
  3. 7 9
      src/bin/peer.rs
  4. 4 8
      src/lib.rs

+ 3 - 5
src/bin/client.rs

@@ -58,11 +58,9 @@ async fn reader(
                 }
             };
 
-            if msg.body != mgen::MessageBody::Receipt {
-                message_channel
-                    .send(msg)
-                    .expect("Reader message channel closed");
-            }
+            message_channel
+                .send(msg)
+                .expect("Reader message channel closed");
         }
     }
 }

+ 40 - 21
src/bin/messenger/state.rs

@@ -25,6 +25,13 @@ impl StateMachine {
     pub fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
         Self::Idle(Conversation::<Idle>::start(dists, rng))
     }
+
+    fn name(&self) -> &str {
+        match self {
+            Self::Idle(_) => Idle::NAME,
+            Self::Active(_) => Active::NAME,
+        }
+    }
 }
 
 /// The state machine representing a conversation state and its transitions.
@@ -35,6 +42,7 @@ pub struct Conversation<S: State> {
 }
 
 pub trait State {
+    const NAME: &'static str;
     fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine
     where
         Self: Sized;
@@ -52,9 +60,9 @@ pub struct Active {
 }
 
 impl State for Idle {
+    const NAME: &'static str = "Idle";
     fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
         if conversation.dists.s.sample(rng) {
-            log!("Idle: [sent] tranisition to [Active]");
             let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
             let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
             StateMachine::Active({
@@ -65,7 +73,6 @@ impl State for Idle {
                 }
             })
         } else {
-            log!("Idle: [sent] tranisition to [Idle]");
             let delay = Instant::now() + conversation.dists.i.sample_secs(rng);
             StateMachine::Idle({
                 Conversation::<Idle> {
@@ -79,7 +86,6 @@ impl State for Idle {
 
     fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
         if conversation.dists.r.sample(rng) {
-            log!("Idle: [recv'd] tranisition to [Active]");
             let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
             let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
             StateMachine::Active({
@@ -90,7 +96,6 @@ impl State for Idle {
                 }
             })
         } else {
-            log!("Idle: [recv'd] tranisition to [Idle]");
             StateMachine::Idle(conversation)
         }
     }
@@ -101,8 +106,8 @@ impl State for Idle {
 }
 
 impl State for Active {
+    const NAME: &'static str = "Active";
     fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
-        log!("Active: [sent] transition to [Active]");
         let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
         StateMachine::Active(Conversation::<Active> {
             dists: conversation.dists,
@@ -112,7 +117,6 @@ impl State for Active {
     }
 
     fn received(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
-        log!("Active: [recv'd] transition to [Active]");
         let delay = Instant::now() + conversation.dists.a_r.sample_secs(rng);
         StateMachine::Active(Conversation::<Active> {
             dists: conversation.dists,
@@ -129,7 +133,6 @@ impl State for Active {
 impl Conversation<Idle> {
     fn start(dists: Distributions, rng: &mut Xoshiro256PlusPlus) -> Self {
         let delay = Instant::now() + dists.i.sample_secs(rng);
-        log!("[start]");
         Self {
             dists,
             delay,
@@ -140,7 +143,6 @@ impl Conversation<Idle> {
 
 impl Conversation<Active> {
     fn waited(self, rng: &mut Xoshiro256PlusPlus) -> Conversation<Idle> {
-        log!("Active: [waited] tranision to [Idle]");
         let delay = Instant::now() + self.dists.i.sample_secs(rng);
         Conversation::<Idle> {
             dists: self.dists,
@@ -151,11 +153,9 @@ impl Conversation<Active> {
 
     async fn sleep(delay: Instant, wait: Instant) -> ActiveGroupActions {
         if delay < wait {
-            log!("delaying for {:?}", delay - Instant::now());
             tokio::time::sleep_until(delay).await;
             ActiveGroupActions::Send
         } else {
-            log!("waiting for {:?}", wait - Instant::now());
             tokio::time::sleep_until(wait).await;
             ActiveGroupActions::Idle
         }
@@ -237,12 +237,6 @@ async fn send_action<
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
     let size = conversation.dists.m.sample(rng);
-    log!(
-        "sending message from {} to {} of size {}",
-        our_id,
-        group,
-        size
-    );
     let m = S::new(construct_message(
         our_id.to_string(),
         group.to_string(),
@@ -265,7 +259,18 @@ async fn send_action<
         }
     }
 
-    T::sent(conversation, rng)
+    let ret = T::sent(conversation, rng);
+
+    log!(
+        "{},{},send,{},{},{}",
+        our_id,
+        group,
+        T::NAME,
+        ret.name(),
+        size
+    );
+
+    ret
 }
 
 async fn receive_action<
@@ -283,9 +288,13 @@ async fn receive_action<
 ) -> StateMachine {
     match msg.body {
         mgen::MessageBody::Size(size) => {
+            let ret = T::received(conversation, rng);
             log!(
-                "{} got message from {} of size {}",
+                "{},{},receive,{},{},{},{}",
+                our_id,
                 msg.group,
+                T::NAME,
+                ret.name(),
                 msg.sender,
                 size
             );
@@ -295,9 +304,19 @@ async fn receive_action<
                 .channel
                 .send(S::new(m))
                 .expect("channel from receive_action to sender closed");
-            T::received(conversation, rng)
+            ret
+        }
+        mgen::MessageBody::Receipt => {
+            log!(
+                "{},{},receive,{},{},{},receipt",
+                our_id,
+                msg.group,
+                T::NAME,
+                T::NAME,
+                msg.sender
+            );
+            T::to_machine(conversation)
         }
-        mgen::MessageBody::Receipt => T::to_machine(conversation),
     }
 }
 
@@ -321,7 +340,7 @@ pub async fn manage_idle_conversation<
     group: &str,
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
-    log!("delaying for {:?}", conversation.delay - Instant::now());
+    log!("{},{},Idle", our_id, group);
     let action = tokio::select! {
         () = tokio::time::sleep_until(conversation.delay) => IdleGroupActions::Send,
 

+ 7 - 9
src/bin/peer.rs

@@ -146,15 +146,13 @@ async fn reader(
                 break;
             };
 
-            if msg.body != mgen::MessageBody::Receipt {
-                let group = msg.group.clone();
-                let channel_to_conversation = group_to_conversation_thread
-                    .get(&group)
-                    .unwrap_or_else(|| panic!("Unknown group: {}", group));
-                channel_to_conversation
-                    .send(msg)
-                    .expect("reader: Channel to group closed");
-            }
+            let group = msg.group.clone();
+            let channel_to_conversation = group_to_conversation_thread
+                .get(&group)
+                .unwrap_or_else(|| panic!("Unknown group: {}", group));
+            channel_to_conversation
+                .send(msg)
+                .expect("reader: Channel to group closed");
         }
     }
 }

+ 4 - 8
src/lib.rs

@@ -15,8 +15,10 @@ pub const MAX_BLOCKS_IN_BODY: u32 = (100 * 1024 * 1024) / PADDING_BLOCK_SIZE;
 #[macro_export]
 macro_rules! log {
     ( $( $x:expr ),* ) => {
-        print!("{}", chrono::offset::Utc::now().format("%F %T: "));
-        println!($( $x ),*)
+        println!("{}{}",
+                 chrono::offset::Utc::now().format("%F %T,%s.%f,"),
+                 format_args!($( $x ),*)
+        );
     }
 }
 
@@ -205,12 +207,6 @@ async fn get_message_with_header_size<T: AsyncReadExt + std::marker::Unpin>(
     let mut header_buf = vec![0; header_size as usize];
     stream.read_exact(&mut header_buf[4..]).await?;
     let header = MessageHeader::deserialize(&header_buf[4..])?;
-    log!(
-        "got header from {} to {}, about to read {} bytes",
-        header.sender,
-        header.group,
-        header.body.size()
-    );
     let header_size_buf = &mut header_buf[..4];
     header_size_buf.copy_from_slice(&header_size_bytes);
     copy(&mut stream.take(header.body.size() as u64), &mut sink()).await?;