Преглед на файлове

add an id to messages and log it

Justin Tracey преди 1 година
родител
ревизия
f45118e861
променени са 5 файла, в които са добавени 68 реда и са изтрити 16 реда
  1. 9 2
      src/bin/messenger/message.rs
  2. 21 7
      src/bin/messenger/state.rs
  3. 0 1
      src/bin/mgen-peer.rs
  4. 16 3
      src/bin/mgen-server.rs
  5. 22 3
      src/lib.rs

+ 9 - 2
src/bin/messenger/message.rs

@@ -2,20 +2,27 @@
 // (Most message functionality is in the library, not this module.)
 
 /// Construct and serialize a message from the sender to the recipients with the given number of blocks.
-pub fn construct_message(sender: String, group: String, blocks: u32) -> mgen::SerializedMessage {
+pub fn construct_message(
+    sender: String,
+    group: String,
+    id: u32,
+    blocks: u32,
+) -> mgen::SerializedMessage {
     let size = std::cmp::max(blocks, 1) * mgen::PADDING_BLOCK_SIZE;
     let m = mgen::MessageHeader {
         sender,
         group,
+        id,
         body: mgen::MessageBody::Size(std::num::NonZeroU32::new(size).unwrap()),
     };
     m.serialize()
 }
 
-pub fn construct_receipt(sender: String, recipient: String) -> mgen::SerializedMessage {
+pub fn construct_receipt(sender: String, recipient: String, id: u32) -> mgen::SerializedMessage {
     let m = mgen::MessageHeader {
         sender,
         group: recipient,
+        id,
         body: mgen::MessageBody::Receipt,
     };
     m.serialize()

+ 21 - 7
src/bin/messenger/state.rs

@@ -38,6 +38,7 @@ impl StateMachine {
 pub struct Conversation<S: State> {
     dists: Distributions,
     delay: Instant,
+    next_id: u32,
     state: S,
 }
 
@@ -62,6 +63,7 @@ pub struct Active {
 impl State for Idle {
     const NAME: &'static str = "Idle";
     fn sent(conversation: Conversation<Self>, rng: &mut Xoshiro256PlusPlus) -> StateMachine {
+        let next_id = conversation.next_id + 1;
         if conversation.dists.s.sample(rng) {
             let delay = Instant::now() + conversation.dists.a_s.sample_secs(rng);
             let wait = Instant::now() + conversation.dists.w.sample_secs(rng);
@@ -69,6 +71,7 @@ impl State for Idle {
                 Conversation::<Active> {
                     dists: conversation.dists,
                     delay,
+                    next_id,
                     state: Active { wait },
                 }
             })
@@ -78,6 +81,7 @@ impl State for Idle {
                 Conversation::<Idle> {
                     dists: conversation.dists,
                     delay,
+                    next_id,
                     state: Idle {},
                 }
             })
@@ -92,6 +96,7 @@ impl State for Idle {
                 Conversation::<Active> {
                     dists: conversation.dists,
                     delay,
+                    next_id: conversation.next_id,
                     state: Active { wait },
                 }
             })
@@ -112,6 +117,7 @@ impl State for Active {
         StateMachine::Active(Conversation::<Active> {
             dists: conversation.dists,
             delay,
+            next_id: conversation.next_id + 1,
             state: conversation.state,
         })
     }
@@ -121,6 +127,7 @@ impl State for Active {
         StateMachine::Active(Conversation::<Active> {
             dists: conversation.dists,
             delay,
+            next_id: conversation.next_id,
             state: conversation.state,
         })
     }
@@ -136,6 +143,7 @@ impl Conversation<Idle> {
         Self {
             dists,
             delay,
+            next_id: 0,
             state: Idle {},
         }
     }
@@ -147,6 +155,7 @@ impl Conversation<Active> {
         Conversation::<Idle> {
             dists: self.dists,
             delay,
+            next_id: self.next_id,
             state: Idle {},
         }
     }
@@ -237,9 +246,11 @@ async fn send_action<
     rng: &mut Xoshiro256PlusPlus,
 ) -> StateMachine {
     let size = conversation.dists.m.sample(rng);
+    let id = conversation.next_id;
     let m = S::new(construct_message(
         our_id.to_string(),
         group.to_string(),
+        id,
         size,
     ));
 
@@ -262,12 +273,13 @@ async fn send_action<
     let ret = T::sent(conversation, rng);
 
     log!(
-        "{},{},send,{},{},{}",
+        "{},{},send,{},{},{},{}",
         our_id,
         group,
         T::NAME,
         ret.name(),
-        size
+        size,
+        id
     );
 
     ret
@@ -291,13 +303,14 @@ async fn receive_action<
         mgen::MessageBody::Size(size) => {
             let ret = T::received(conversation, rng);
             log!(
-                "{},{},receive,{},{},{},{}",
+                "{},{},receive,{},{},{},{},{}",
                 our_id,
                 msg.group,
                 T::NAME,
                 ret.name(),
                 msg.sender,
-                size
+                size,
+                msg.id
             );
             let stream = stream_map.channel_for(&msg.sender);
             let recipient = if group.is_none() {
@@ -305,7 +318,7 @@ async fn receive_action<
             } else {
                 msg.sender
             };
-            let m = construct_receipt(our_id.to_string(), recipient);
+            let m = construct_receipt(our_id.to_string(), recipient, msg.id);
             stream
                 .channel
                 .send(S::new(m))
@@ -318,12 +331,13 @@ async fn receive_action<
                 None => &msg.group,
             };
             log!(
-                "{},{},receive,{},{},{},receipt",
+                "{},{},receive,{},{},{},receipt,{}",
                 our_id,
                 group,
                 T::NAME,
                 T::NAME,
-                msg.sender
+                msg.sender,
+                msg.id
             );
             T::to_machine(conversation)
         }

+ 0 - 1
src/bin/mgen-peer.rs

@@ -435,7 +435,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let _ = args.next();
     let hosts_file = std::fs::read_to_string(args.next().unwrap())?;
     let hosts_map = parse_hosts_file(&hosts_file);
-    println!("{:?}", hosts_map);
 
     let mut handles = vec![];
 

+ 16 - 3
src/bin/mgen-server.rs

@@ -165,7 +165,7 @@ async fn get_messages(
     match message.body {
         MessageBody::Size(_) => {
             assert!(message.group == group);
-            log!("received,{},{}", sender, group);
+            log!("received,{},{},{}", sender, group, message.id);
             let body = message.body;
             let m = Arc::new(SerializedMessage { header: buf, body });
             for recipient in message_channels.iter() {
@@ -173,7 +173,13 @@ async fn get_messages(
             }
         }
         MessageBody::Receipt => {
-            log!("receipt,{},{}", sender, group);
+            log!(
+                "receipt,{},{},{},{}",
+                sender,
+                group,
+                message.group,
+                message.id
+            );
             let recipient = &db[message.group];
             let body = message.body;
             let m = Arc::new(SerializedMessage { header: buf, body });
@@ -190,7 +196,7 @@ async fn get_messages(
         match message.body {
             MessageBody::Size(_) => {
                 assert!(message.group == group);
-                log!("received,{},{}", sender, group);
+                log!("received,{},{},{}", sender, group, message.id);
                 let body = message.body;
                 let m = Arc::new(SerializedMessage { header: buf, body });
                 for recipient in message_channels.iter() {
@@ -198,6 +204,13 @@ async fn get_messages(
                 }
             }
             MessageBody::Receipt => {
+                log!(
+                    "receipt,{},{},{},{}",
+                    sender,
+                    group,
+                    message.group,
+                    message.id
+                );
                 let recipient = &db[message.group];
                 let body = message.body;
                 let m = Arc::new(SerializedMessage { header: buf, body });

+ 22 - 3
src/lib.rs

@@ -78,6 +78,8 @@ pub struct MessageHeader {
     /// Group associated with the message.
     /// In client-server mode receipts, this is the recipient instead.
     pub group: String,
+    /// ID unique to a message and its receipt for a (sender, group) pair.
+    pub id: u32,
     /// The type and size of the message payload.
     pub body: MessageBody,
 }
@@ -87,8 +89,9 @@ impl MessageHeader {
     pub fn serialize(&self) -> SerializedMessage {
         // serialized message header: {
         //   header_len: u32,
-        //   sender: {u32, utf-8}
-        //   group: {u32, utf-8}
+        //   sender: {u32, utf-8},
+        //   group: {u32, utf-8},
+        //   id: u32,
         //   body_type: MessageBody (i.e., u32)
         // }
 
@@ -97,7 +100,8 @@ impl MessageHeader {
             MessageBody::Size(s) => s.get(),
         };
 
-        let header_len = (1 + 1 + 1 + 1) * size_of::<u32>() + self.sender.len() + self.group.len();
+        let header_len =
+            (1 + 1 + 1 + 1 + 1) * size_of::<u32>() + self.sender.len() + self.group.len();
 
         let mut header: Vec<u8> = Vec::with_capacity(header_len);
 
@@ -107,6 +111,8 @@ impl MessageHeader {
         serialize_str_to(&self.sender, &mut header);
         serialize_str_to(&self.group, &mut header);
 
+        header.extend(self.id.to_be_bytes());
+
         header.extend(body_type.to_be_bytes());
 
         assert!(header.len() == header_len as usize);
@@ -125,6 +131,8 @@ impl MessageHeader {
         let (group, buf) = deserialize_str(buf)?;
         let group = group.to_string();
 
+        let (id, buf) = deserialize_u32(buf)?;
+
         let (body, _) = deserialize_u32(buf)?;
         let body = if let Some(size) = NonZeroU32::new(body) {
             MessageBody::Size(size)
@@ -134,6 +142,7 @@ impl MessageHeader {
         Ok(Self {
             sender,
             group,
+            id,
             body,
         })
     }
@@ -146,6 +155,7 @@ impl MessageHeader {
 pub struct MessageHeaderRef<'a> {
     pub sender: &'a str,
     pub group: &'a str,
+    pub id: u32,
     pub body: MessageBody,
 }
 
@@ -159,6 +169,8 @@ impl<'a> MessageHeaderRef<'a> {
         let (group, buf) = deserialize_str(buf)?;
         let group = group;
 
+        let (id, buf) = deserialize_u32(buf)?;
+
         let (body, _) = deserialize_u32(buf)?;
         let body = if let Some(size) = NonZeroU32::new(body) {
             MessageBody::Size(size)
@@ -168,6 +180,7 @@ impl<'a> MessageHeaderRef<'a> {
         Ok(Self {
             sender,
             group,
+            id,
             body,
         })
     }
@@ -384,6 +397,7 @@ mod tests {
         let m1 = MessageHeader {
             sender: "Alice".to_string(),
             group: "group".to_string(),
+            id: 1024,
             body: MessageBody::Size(NonZeroU32::new(256).unwrap()),
         };
 
@@ -398,6 +412,7 @@ mod tests {
         let m1 = MessageHeader {
             sender: "Alice".to_string(),
             group: "group".to_string(),
+            id: 1024,
             body: MessageBody::Receipt,
         };
 
@@ -412,6 +427,7 @@ mod tests {
         let m1 = MessageHeader {
             sender: "Alice".to_string(),
             group: "group".to_string(),
+            id: 1024,
             body: MessageBody::Size(NonZeroU32::new(256).unwrap()),
         };
 
@@ -429,6 +445,7 @@ mod tests {
         let m1 = MessageHeader {
             sender: "Alice".to_string(),
             group: "group".to_string(),
+            id: 1024,
             body: MessageBody::Receipt,
         };
 
@@ -446,6 +463,7 @@ mod tests {
         let m1 = MessageHeader {
             sender: "Alice".to_string(),
             group: "group".to_string(),
+            id: 1024,
             body: MessageBody::Size(NonZeroU32::new(256).unwrap()),
         };
 
@@ -466,6 +484,7 @@ mod tests {
         let m1 = MessageHeader {
             sender: "Alice".to_string(),
             group: "group".to_string(),
+            id: 1024,
             body: MessageBody::Receipt,
         };