2 Commits b5f4239b01 ... f07913639c

Author SHA1 Message Date
  Justin Tracey f07913639c mgentools: fix bugs and workarounds 1 month ago
  Justin Tracey 6a08a0cc01 mgentools: make less verbose 1 month ago
1 changed files with 104 additions and 26 deletions
  1. 104 26
      mgentools/src/main.rs

+ 104 - 26
mgentools/src/main.rs

@@ -3,6 +3,11 @@ use std::{error::Error, fs::File, path::Path, process};
 
 use serde::{ser::SerializeSeq, Deserialize, Serializer};
 
+// default shadow simulation start
+// Sat Jan  1 12:00:00 AM UTC 2000
+// expressed as a unix timestamp
+const EXPERIMENT_START: f64 = 946684800.0;
+
 #[derive(Debug, Deserialize)]
 struct Record {
     _hr_time: String,
@@ -23,7 +28,7 @@ type BootstrapTable = HashMap<(String, String), f64>;
 struct Serializers<T: SerializeSeq> {
     rtt_all: T,
     rtt_mean: T,
-    rtt_count: T,
+    sent_count: T,
 }
 
 fn process_log<T>(
@@ -46,8 +51,8 @@ where
     for result in rdr.deserialize() {
         let record: Record = match result {
             Ok(record) => record,
-            Err(e) => {
-                eprintln!("bad record: {:?}", e);
+            Err(_e) => {
+                //eprintln!("bad record: {:?}", e);
                 continue;
             }
         };
@@ -124,7 +129,7 @@ where
                 panic!("unable to serialize rtt mean: {:?}", e);
             });
         serializers
-            .rtt_count
+            .sent_count
             .serialize_element(&value.sent_count)
             .unwrap_or_else(|e| {
                 panic!("unable to serialize rtt count: {:?}", e);
@@ -141,52 +146,125 @@ struct ConversationConfig {
 }
 
 #[derive(Debug, Deserialize)]
-struct Config {
+struct UserConfig {
     user: String,
     bootstrap: f64,
     conversations: Vec<ConversationConfig>,
 }
 
-fn build_bootstrap_table(path: &str) -> Result<BootstrapTable, Box<dyn Error>> {
+fn parse_time(time: &str) -> Result<f64, Box<dyn Error>> {
+    let suffix = time.as_bytes()[time.len() - 1];
+    Ok(match suffix {
+        b's' => time[..time.len() - 1].parse::<f64>()?, // seconds
+        b'm' => time[..time.len() - 1].parse::<f64>()? * 60.0, // minutes
+        b'h' => time[..time.len() - 1].parse::<f64>()? * 60.0 * 60.0, // hours
+        _ => time.parse::<f64>()?, // default is seconds, anything else is an error
+    })
+}
+
+#[derive(Debug, Deserialize)]
+struct ShadowGeneralConfig {
+    bootstrap_end_time: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+enum ShadowProcessArgs {
+    List(Vec<String>),
+    Command(String),
+}
+
+#[derive(Debug, Deserialize)]
+struct ShadowProcessConfig {
+    args: ShadowProcessArgs,
+    path: String,
+    start_time: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct ShadowHostConfig {
+    processes: Vec<ShadowProcessConfig>,
+}
+
+#[derive(Debug, Deserialize)]
+struct ShadowConfig {
+    general: ShadowGeneralConfig,
+    hosts: HashMap<String, ShadowHostConfig>,
+}
+
+fn build_bootstrap_table(path: &str) -> Result<(f64, BootstrapTable), Box<dyn Error>> {
+    let shadow_config: ShadowConfig =
+        serde_yaml::from_reader(File::open(format!("{}/shadow.config.yaml", path))?)?;
+
+    let bootstrap_end_time =
+        if let Some(bootstrap_end_time) = shadow_config.general.bootstrap_end_time {
+            parse_time(&bootstrap_end_time)?
+        } else {
+            0.0
+        };
+    let filter_time = bootstrap_end_time + EXPERIMENT_START;
+
     let mut ret = HashMap::new();
-    let configs = glob::glob(&format!("{}/shadow.data/hosts/*/user*.yaml", path))?;
-    for config in configs {
-        let config = config?;
-        println!("{:?}", config);
-        let config: Config = serde_yaml::from_reader(File::open(config)?)?;
-        for conversation in config.conversations {
-            let key = (config.user.clone(), conversation.group);
-            let bootstrap = conversation.bootstrap.unwrap_or(config.bootstrap);
-            ret.insert(key, bootstrap);
+    for (host_name, host_config) in shadow_config.hosts {
+        for proc in host_config.processes {
+            if !proc.path.ends_with("mgen-client") && !proc.path.ends_with("mgen-peer") {
+                continue;
+            }
+            let split_args: Vec<_> = match proc.args {
+                ShadowProcessArgs::List(commands) => commands,
+                ShadowProcessArgs::Command(command) => command
+                    .split_ascii_whitespace()
+                    .map(|s| s.to_string())
+                    .collect(),
+            };
+            let configs_in_args = split_args
+                .into_iter()
+                .filter_map(|arg| {
+                    if arg.contains(".yaml") {
+                        let glob_string =
+                            format!("{}/shadow.data/hosts/{}/{}", path, host_name, arg,);
+                        Some(glob::glob(&glob_string).expect(&glob_string))
+                    } else {
+                        None
+                    }
+                })
+                .flatten();
+            for config in configs_in_args {
+                let config: UserConfig = serde_yaml::from_reader(File::open(config?)?)?;
+                for conversation in config.conversations {
+                    let key = (config.user.clone(), conversation.group);
+                    let bootstrap = EXPERIMENT_START
+                        + parse_time(&proc.start_time)?
+                        + conversation.bootstrap.unwrap_or(config.bootstrap);
+                    ret.insert(key, bootstrap);
+                }
+            }
         }
     }
-    Ok(ret)
+    Ok((filter_time, ret))
 }
 
 fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
-    // FIXME: turn these all into clap args
-    let filter_until_minute = 20.0;
     let path = path.unwrap_or(".".to_string());
 
-    let bootstrap_table = build_bootstrap_table(&path)?;
+    let (filter_time, bootstrap_table) = build_bootstrap_table(&path)?;
 
-    let filter_time: f64 = 946684800.0 + filter_until_minute * 60.0;
     let rtt_all_file = File::create("rtt_all.mgen.json")?;
     let rtt_mean_file = File::create("rtt_mean.mgen.json")?;
-    let rtt_count_file = File::create("counts.mgen.json")?;
+    let sent_count_file = File::create("counts.mgen.json")?;
 
     let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file);
     let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file);
-    let mut rtt_count_ser = serde_json::Serializer::new(rtt_count_file);
+    let mut sent_count_ser = serde_json::Serializer::new(sent_count_file);
 
     let rtt_all = rtt_all_ser.serialize_seq(None)?;
     let rtt_mean = rtt_mean_ser.serialize_seq(None)?;
-    let rtt_count = rtt_count_ser.serialize_seq(None)?;
+    let sent_count = sent_count_ser.serialize_seq(None)?;
 
     let mut serializers = Serializers {
         rtt_all,
         rtt_mean,
-        rtt_count,
+        sent_count,
     };
 
     let logs = glob::glob(&format!(
@@ -198,7 +276,7 @@ fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
     }
     serializers.rtt_all.end()?;
     serializers.rtt_mean.end()?;
-    serializers.rtt_count.end()?;
+    serializers.sent_count.end()?;
     Ok(())
 }
 
@@ -207,7 +285,7 @@ fn main() {
     let _ = args.next();
     let path = args.next();
     if let Err(err) = core(path) {
-        println!("error running example: {}", err);
+        println!("error running core: {}", err);
         process::exit(1);
     }
 }