Browse Source

make mgentools less hacky

Justin Tracey 9 months ago
parent
commit
b5f4239b01
2 changed files with 155 additions and 41 deletions
  1. 3 0
      mgentools/Cargo.toml
  2. 152 41
      mgentools/src/main.rs

+ 3 - 0
mgentools/Cargo.toml

@@ -7,4 +7,7 @@ edition = "2021"
 
 
 [dependencies]
 [dependencies]
 csv = "1.3.0"
 csv = "1.3.0"
+glob = "0.3.1"
 serde = { version = "1.0.195", features = ["serde_derive"] }
 serde = { version = "1.0.195", features = ["serde_derive"] }
+serde_json = "1.0.114"
+serde_yaml = "0.9.32"

+ 152 - 41
mgentools/src/main.rs

@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 use std::collections::HashMap;
-use std::{error::Error, fs::File, io, io::Write, process};
+use std::{error::Error, fs::File, path::Path, process};
 
 
-use serde::Deserialize;
+use serde::{ser::SerializeSeq, Deserialize, Serializer};
 
 
 #[derive(Debug, Deserialize)]
 #[derive(Debug, Deserialize)]
 struct Record {
 struct Record {
@@ -18,23 +18,44 @@ struct RunningValue {
     running_rtt_mean: f64,
     running_rtt_mean: f64,
 }
 }
 
 
-fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
-    let mut rtt_all_file = File::create("rtt_all.mgen.json")?;
-    writeln!(rtt_all_file, "[")?;
+type BootstrapTable = HashMap<(String, String), f64>;
+
+struct Serializers<T: SerializeSeq> {
+    rtt_all: T,
+    rtt_mean: T,
+    rtt_count: T,
+}
 
 
+fn process_log<T>(
+    file: &Path,
+    filter_time: f64,
+    bootstrap_table: &BootstrapTable,
+    serializers: &mut Serializers<T>,
+) -> Result<(), Box<dyn Error>>
+where
+    T: SerializeSeq,
+{
     let mut sent_times: HashMap<(String, String, u32), f64> = HashMap::new();
     let mut sent_times: HashMap<(String, String, u32), f64> = HashMap::new();
     let mut running_values: HashMap<(String, String), RunningValue> = HashMap::new();
     let mut running_values: HashMap<(String, String), RunningValue> = HashMap::new();
-    let file: Box<dyn io::Read> = if let Some(path) = path {
-        Box::new(File::open(path)?)
-    } else {
-        Box::new(io::stdin())
-    };
+    let file = File::open(file)?;
+
     let mut rdr = csv::ReaderBuilder::new()
     let mut rdr = csv::ReaderBuilder::new()
         .has_headers(false)
         .has_headers(false)
         .flexible(true)
         .flexible(true)
         .from_reader(file);
         .from_reader(file);
     for result in rdr.deserialize() {
     for result in rdr.deserialize() {
-        let record: Record = result?;
+        let record: Record = match result {
+            Ok(record) => record,
+            Err(e) => {
+                eprintln!("bad record: {:?}", e);
+                continue;
+            }
+        };
+
+        if record.time <= filter_time {
+            continue;
+        }
+
         match record.action_data[0].as_str() {
         match record.action_data[0].as_str() {
             "send" => {
             "send" => {
                 let id = record.action_data[4].parse()?;
                 let id = record.action_data[4].parse()?;
@@ -49,48 +70,138 @@ fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
                     });
                     });
             }
             }
             "receive" => {
             "receive" => {
-                if record.action_data[4] == "receipt" {
-                    let id = record.action_data[5].parse()?;
-                    let key = (record.user, record.group, id);
-                    let Some(sent_time) = sent_times.get(&key) else {
-                        // this should never happen in the client-server case,
-                        // but we filter out early conversation in the p2p case
-                        //eprintln!("receipt for unknown message: {:?}", key);
-                        //panic!();
-                        continue;
-                    };
-                    let rtt = record.time - sent_time;
-                    writeln!(rtt_all_file, "  {},", rtt)?;
-
-                    let key = (key.0, key.1);
-                    running_values.entry(key).and_modify(|running_value| {
-                        running_value.receipt_count += 1;
-                        running_value.running_rtt_mean = running_value.running_rtt_mean
-                            + (rtt - running_value.running_rtt_mean)
-                                / (running_value.receipt_count as f64);
-                    });
+                if record.action_data[4] != "receipt" {
+                    continue;
                 }
                 }
+
+                let sender = &record.action_data[3];
+                let id = record.action_data[5].parse()?;
+                let key = (sender.to_string(), record.group);
+                let bootstrap_time = bootstrap_table.get(&key).unwrap_or_else(|| {
+                    panic!("could not find key {:?}", key);
+                });
+
+                let key = (record.user, key.1, id);
+                let Some(sent_time) = sent_times.get(&key) else {
+                    // this should never happen in the client-server case,
+                    // but we filter out early conversation in the p2p case
+                    //eprintln!("receipt for unknown message: {:?}", key);
+                    //panic!();
+                    continue;
+                };
+
+                if bootstrap_time > sent_time {
+                    // the message was sent while the recipient was still bootstrapping,
+                    // don't count its receipt towards the rtt stats
+                    continue;
+                }
+
+                let rtt: f64 = record.time - sent_time;
+                serializers
+                    .rtt_all
+                    .serialize_element(&rtt)
+                    .unwrap_or_else(|e| {
+                        panic!("unable to serialize rtt: {:?}", e);
+                    });
+
+                let key = (key.0, key.1);
+                running_values.entry(key).and_modify(|running_value| {
+                    running_value.receipt_count += 1;
+                    running_value.running_rtt_mean = running_value.running_rtt_mean
+                        + (rtt - running_value.running_rtt_mean)
+                            / (running_value.receipt_count as f64);
+                });
             }
             }
             _ => (),
             _ => (),
         }
         }
     }
     }
-    writeln!(rtt_all_file, "]")?;
-    drop(rtt_all_file);
 
 
-    let mut rtt_count_file = File::create("counts.mgen.json")?;
-    let mut rtt_mean_file = File::create("rtt_mean.mgen.json")?;
-    writeln!(rtt_count_file, "[")?;
-    writeln!(rtt_mean_file, "[")?;
     for value in running_values.into_values() {
     for value in running_values.into_values() {
-        writeln!(rtt_count_file, "  {},", value.sent_count)?;
-        writeln!(rtt_mean_file, "  {},", value.running_rtt_mean)?;
+        serializers
+            .rtt_mean
+            .serialize_element(&value.running_rtt_mean)
+            .unwrap_or_else(|e| {
+                panic!("unable to serialize rtt mean: {:?}", e);
+            });
+        serializers
+            .rtt_count
+            .serialize_element(&value.sent_count)
+            .unwrap_or_else(|e| {
+                panic!("unable to serialize rtt count: {:?}", e);
+            });
     }
     }
-    writeln!(rtt_count_file, "]")?;
-    writeln!(rtt_mean_file, "]")?;
 
 
     Ok(())
     Ok(())
 }
 }
 
 
+#[derive(Debug, Deserialize)]
+struct ConversationConfig {
+    group: String,
+    bootstrap: Option<f64>,
+}
+
+#[derive(Debug, Deserialize)]
+struct Config {
+    user: String,
+    bootstrap: f64,
+    conversations: Vec<ConversationConfig>,
+}
+
+fn build_bootstrap_table(path: &str) -> Result<BootstrapTable, Box<dyn Error>> {
+    let mut ret = HashMap::new();
+    let configs = glob::glob(&format!("{}/shadow.data/hosts/*/user*.yaml", path))?;
+    for config in configs {
+        let config = config?;
+        println!("{:?}", config);
+        let config: Config = serde_yaml::from_reader(File::open(config)?)?;
+        for conversation in config.conversations {
+            let key = (config.user.clone(), conversation.group);
+            let bootstrap = conversation.bootstrap.unwrap_or(config.bootstrap);
+            ret.insert(key, bootstrap);
+        }
+    }
+    Ok(ret)
+}
+
+fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
+    // FIXME: turn these all into clap args
+    let filter_until_minute = 20.0;
+    let path = path.unwrap_or(".".to_string());
+
+    let bootstrap_table = build_bootstrap_table(&path)?;
+
+    let filter_time: f64 = 946684800.0 + filter_until_minute * 60.0;
+    let rtt_all_file = File::create("rtt_all.mgen.json")?;
+    let rtt_mean_file = File::create("rtt_mean.mgen.json")?;
+    let rtt_count_file = File::create("counts.mgen.json")?;
+
+    let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file);
+    let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file);
+    let mut rtt_count_ser = serde_json::Serializer::new(rtt_count_file);
+
+    let rtt_all = rtt_all_ser.serialize_seq(None)?;
+    let rtt_mean = rtt_mean_ser.serialize_seq(None)?;
+    let rtt_count = rtt_count_ser.serialize_seq(None)?;
+
+    let mut serializers = Serializers {
+        rtt_all,
+        rtt_mean,
+        rtt_count,
+    };
+
+    let logs = glob::glob(&format!(
+        "{}/shadow.data/hosts/*client*/mgen-*.stdout",
+        path
+    ))?;
+    for log in logs {
+        process_log(&log?, filter_time, &bootstrap_table, &mut serializers)?;
+    }
+    serializers.rtt_all.end()?;
+    serializers.rtt_mean.end()?;
+    serializers.rtt_count.end()?;
+    Ok(())
+}
+
 fn main() {
 fn main() {
     let mut args = std::env::args();
     let mut args = std::env::args();
     let _ = args.next();
     let _ = args.next();