Browse Source

mgentools: assume a 30s timeout

Justin Tracey 3 weeks ago
parent
commit
16fcdfec84
1 changed files with 86 additions and 6 deletions
  1. 86 6
      mgentools/src/main.rs

+ 86 - 6
mgentools/src/main.rs

@@ -7,8 +7,10 @@ use serde::{ser::SerializeSeq, Deserialize, Serializer};
 // Sat Jan  1 12:00:00 AM UTC 2000
 // expressed as a unix timestamp
 const EXPERIMENT_START: f64 = 946684800.0;
+const TIMEOUT: f64 = 30.0;
 
 #[derive(Debug, Deserialize)]
+/// A record from the mgen log file.
 struct Record {
     _hr_time: String,
     time: f64, // FIXME: use a better type, either a real datetime or our own two ints
@@ -17,18 +19,26 @@ struct Record {
     action_data: Vec<String>,
 }
 
+/// Running values for a (user, group) tuple
 struct RunningValue {
     sent_count: u32,
-    receipt_count: u32,
+    sent_timeout: u32,
+    incoming_receipt_count: u32,
+    outgoing_receipt_count: u32,
+    receipt_timeout: u32,
     running_rtt_mean: f64,
 }
 
+/// When a (user, group) starts sending messages back
 type BootstrapTable = HashMap<(String, String), f64>;
 
 struct Serializers<T: SerializeSeq> {
     rtt_all: T,
+    rtt_timeout: T,
     rtt_mean: T,
     sent_count: T,
+    timeout_by_send: T,
+    timeout_by_receive: T,
 }
 
 fn process_log<T>(
@@ -70,7 +80,10 @@ where
                     .and_modify(|running_value| running_value.sent_count += 1)
                     .or_insert(RunningValue {
                         sent_count: 1,
-                        receipt_count: 0,
+                        sent_timeout: 0,
+                        incoming_receipt_count: 0,
+                        outgoing_receipt_count: 0,
+                        receipt_timeout: 0,
                         running_rtt_mean: 0.0,
                     });
             }
@@ -81,7 +94,7 @@ where
 
                 let sender = &record.action_data[3];
                 let id = record.action_data[5].parse()?;
-                let key = (sender.to_string(), record.group);
+                let key = (sender.to_string(), record.group.clone());
                 let bootstrap_time = bootstrap_table.get(&key).unwrap_or_else(|| {
                     panic!("could not find key {:?}", key);
                 });
@@ -108,14 +121,40 @@ where
                     .unwrap_or_else(|e| {
                         panic!("unable to serialize rtt: {:?}", e);
                     });
+                if rtt <= TIMEOUT {
+                    serializers
+                        .rtt_timeout
+                        .serialize_element(&rtt)
+                        .unwrap_or_else(|e| {
+                            panic!("unable to serialize rtt: {:?}", e);
+                        });
+                }
 
                 let key = (key.0, key.1);
                 running_values.entry(key).and_modify(|running_value| {
-                    running_value.receipt_count += 1;
+                    running_value.incoming_receipt_count += 1;
+                    if rtt > TIMEOUT {
+                        running_value.sent_timeout += 1;
+                    }
                     running_value.running_rtt_mean = running_value.running_rtt_mean
                         + (rtt - running_value.running_rtt_mean)
-                            / (running_value.receipt_count as f64);
+                            / (running_value.incoming_receipt_count as f64);
                 });
+
+                let key = (sender.to_string(), record.group);
+                let receipt_sender = running_values.entry(key).or_insert(RunningValue {
+                    sent_count: 0,
+                    sent_timeout: 0,
+                    incoming_receipt_count: 0,
+                    outgoing_receipt_count: 0,
+                    receipt_timeout: 0,
+                    running_rtt_mean: 0.0,
+                });
+
+                receipt_sender.outgoing_receipt_count += 1;
+                if rtt > TIMEOUT {
+                    receipt_sender.receipt_timeout += 1;
+                }
             }
             _ => (),
         }
@@ -134,6 +173,30 @@ where
             .unwrap_or_else(|e| {
                 panic!("unable to serialize rtt count: {:?}", e);
             });
+        if value.incoming_receipt_count != 0 {
+            serializers
+                .timeout_by_send
+                .serialize_element(
+                    &(value.sent_timeout as f64 / value.incoming_receipt_count as f64),
+                )
+                .unwrap_or_else(|e| {
+                    panic!("unable to serialize rtt count: {:?}", e);
+                });
+        } else {
+            assert_eq!(value.sent_timeout, value.incoming_receipt_count);
+        }
+        if value.outgoing_receipt_count != 0 {
+            serializers
+                .timeout_by_receive
+                .serialize_element(
+                    &(value.receipt_timeout as f64 / value.outgoing_receipt_count as f64),
+                )
+                .unwrap_or_else(|e| {
+                    panic!("unable to serialize rtt count: {:?}", e);
+                });
+        } else {
+            assert_eq!(value.receipt_timeout, value.outgoing_receipt_count);
+        }
     }
 
     Ok(())
@@ -247,24 +310,38 @@ fn build_bootstrap_table(path: &str) -> Result<(f64, BootstrapTable), Box<dyn Er
 fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
     let path = path.unwrap_or(".".to_string());
 
-    let (filter_time, bootstrap_table) = build_bootstrap_table(&path)?;
+    let (_filter_time, bootstrap_table) = build_bootstrap_table(&path)?;
+    // we actually don't set the full bootstrap as bootstrap, so we need to set this manually
+    let filter_time = EXPERIMENT_START + 20.0 * 60.0;
 
     let rtt_all_file = File::create("rtt_all.mgen.json")?;
+    let rtt_timeout_file = File::create("rtt_timeout.mgen.json")?;
     let rtt_mean_file = File::create("rtt_mean.mgen.json")?;
     let sent_count_file = File::create("counts.mgen.json")?;
+    let timeout_by_send_file = File::create("timeout_by_send.mgen.json")?;
+    let timeout_by_receive_file = File::create("timeout_by_receive.mgen.json")?;
 
     let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file);
+    let mut rtt_timeout_ser = serde_json::Serializer::new(rtt_timeout_file);
     let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file);
     let mut sent_count_ser = serde_json::Serializer::new(sent_count_file);
+    let mut timeout_by_send_ser = serde_json::Serializer::new(timeout_by_send_file);
+    let mut timeout_by_receive_ser = serde_json::Serializer::new(timeout_by_receive_file);
 
     let rtt_all = rtt_all_ser.serialize_seq(None)?;
+    let rtt_timeout = rtt_timeout_ser.serialize_seq(None)?;
     let rtt_mean = rtt_mean_ser.serialize_seq(None)?;
     let sent_count = sent_count_ser.serialize_seq(None)?;
+    let timeout_by_send = timeout_by_send_ser.serialize_seq(None)?;
+    let timeout_by_receive = timeout_by_receive_ser.serialize_seq(None)?;
 
     let mut serializers = Serializers {
         rtt_all,
+        rtt_timeout,
         rtt_mean,
         sent_count,
+        timeout_by_send,
+        timeout_by_receive,
     };
 
     let logs = glob::glob(&format!(
@@ -275,8 +352,11 @@ fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
         process_log(&log?, filter_time, &bootstrap_table, &mut serializers)?;
     }
     serializers.rtt_all.end()?;
+    serializers.rtt_timeout.end()?;
     serializers.rtt_mean.end()?;
     serializers.sent_count.end()?;
+    serializers.timeout_by_send.end()?;
+    serializers.timeout_by_receive.end()?;
     Ok(())
 }