use std::collections::HashMap; use std::{error::Error, fs::File, path::Path, process}; use serde::{ser::SerializeSeq, Deserialize, Serializer}; // default shadow simulation start // Sat Jan 1 12:00:00 AM UTC 2000 // expressed as a unix timestamp const EXPERIMENT_START: f64 = 946684800.0; #[derive(Debug, Deserialize)] struct Record { _hr_time: String, time: f64, // FIXME: use a better type, either a real datetime or our own two ints user: String, group: String, action_data: Vec, } struct RunningValue { sent_count: u32, receipt_count: u32, running_rtt_mean: f64, } type BootstrapTable = HashMap<(String, String), f64>; struct Serializers { rtt_all: T, rtt_mean: T, sent_count: T, } fn process_log( file: &Path, filter_time: f64, bootstrap_table: &BootstrapTable, serializers: &mut Serializers, ) -> Result<(), Box> where T: SerializeSeq, { let mut sent_times: HashMap<(String, String, u32), f64> = HashMap::new(); let mut running_values: HashMap<(String, String), RunningValue> = HashMap::new(); let file = File::open(file)?; let mut rdr = csv::ReaderBuilder::new() .has_headers(false) .flexible(true) .from_reader(file); for result in rdr.deserialize() { let record: Record = match result { Ok(record) => record, Err(_e) => { //eprintln!("bad record: {:?}", e); continue; } }; if record.time <= filter_time { continue; } match record.action_data[0].as_str() { "send" => { let id = record.action_data[4].parse()?; sent_times.insert((record.user.clone(), record.group.clone(), id), record.time); running_values .entry((record.user, record.group)) .and_modify(|running_value| running_value.sent_count += 1) .or_insert(RunningValue { sent_count: 1, receipt_count: 0, running_rtt_mean: 0.0, }); } "receive" => { if record.action_data[4] != "receipt" { continue; } let sender = &record.action_data[3]; let id = record.action_data[5].parse()?; let key = (sender.to_string(), record.group); let bootstrap_time = bootstrap_table.get(&key).unwrap_or_else(|| { panic!("could not find key {:?}", key); }); let key = (record.user, key.1, id); let Some(sent_time) = sent_times.get(&key) else { // this should never happen in the client-server case, // but we filter out early conversation in the p2p case //eprintln!("receipt for unknown message: {:?}", key); //panic!(); continue; }; if bootstrap_time > sent_time { // the message was sent while the recipient was still bootstrapping, // don't count its receipt towards the rtt stats continue; } let rtt: f64 = record.time - sent_time; serializers .rtt_all .serialize_element(&rtt) .unwrap_or_else(|e| { panic!("unable to serialize rtt: {:?}", e); }); let key = (key.0, key.1); running_values.entry(key).and_modify(|running_value| { running_value.receipt_count += 1; running_value.running_rtt_mean = running_value.running_rtt_mean + (rtt - running_value.running_rtt_mean) / (running_value.receipt_count as f64); }); } _ => (), } } for value in running_values.into_values() { serializers .rtt_mean .serialize_element(&value.running_rtt_mean) .unwrap_or_else(|e| { panic!("unable to serialize rtt mean: {:?}", e); }); serializers .sent_count .serialize_element(&value.sent_count) .unwrap_or_else(|e| { panic!("unable to serialize rtt count: {:?}", e); }); } Ok(()) } #[derive(Debug, Deserialize)] struct ConversationConfig { group: String, bootstrap: Option, } #[derive(Debug, Deserialize)] struct UserConfig { user: String, bootstrap: f64, conversations: Vec, } fn parse_time(time: &str) -> Result> { let suffix = time.as_bytes()[time.len() - 1]; Ok(match suffix { b's' => time[..time.len() - 1].parse::()?, // seconds b'm' => time[..time.len() - 1].parse::()? * 60.0, // minutes b'h' => time[..time.len() - 1].parse::()? * 60.0 * 60.0, // hours _ => time.parse::()?, // default is seconds, anything else is an error }) } #[derive(Debug, Deserialize)] struct ShadowGeneralConfig { bootstrap_end_time: Option, } #[derive(Debug, Deserialize)] #[serde(untagged)] enum ShadowProcessArgs { List(Vec), Command(String), } #[derive(Debug, Deserialize)] struct ShadowProcessConfig { args: ShadowProcessArgs, path: String, start_time: String, } #[derive(Debug, Deserialize)] struct ShadowHostConfig { processes: Vec, } #[derive(Debug, Deserialize)] struct ShadowConfig { general: ShadowGeneralConfig, hosts: HashMap, } fn build_bootstrap_table(path: &str) -> Result<(f64, BootstrapTable), Box> { let shadow_config: ShadowConfig = serde_yaml::from_reader(File::open(format!("{}/shadow.config.yaml", path))?)?; let bootstrap_end_time = if let Some(bootstrap_end_time) = shadow_config.general.bootstrap_end_time { parse_time(&bootstrap_end_time)? } else { 0.0 }; let filter_time = bootstrap_end_time + EXPERIMENT_START; let mut ret = HashMap::new(); for (host_name, host_config) in shadow_config.hosts { for proc in host_config.processes { if !proc.path.ends_with("mgen-client") && !proc.path.ends_with("mgen-peer") { continue; } let split_args: Vec<_> = match proc.args { ShadowProcessArgs::List(commands) => commands, ShadowProcessArgs::Command(command) => command .split_ascii_whitespace() .map(|s| s.to_string()) .collect(), }; let configs_in_args = split_args .into_iter() .filter_map(|arg| { if arg.contains(".yaml") { let glob_string = format!("{}/shadow.data/hosts/{}/{}", path, host_name, arg,); Some(glob::glob(&glob_string).expect(&glob_string)) } else { None } }) .flatten(); for config in configs_in_args { let config: UserConfig = serde_yaml::from_reader(File::open(config?)?)?; for conversation in config.conversations { let key = (config.user.clone(), conversation.group); let bootstrap = EXPERIMENT_START + parse_time(&proc.start_time)? + conversation.bootstrap.unwrap_or(config.bootstrap); ret.insert(key, bootstrap); } } } } Ok((filter_time, ret)) } fn core(path: Option) -> Result<(), Box> { let path = path.unwrap_or(".".to_string()); let (filter_time, bootstrap_table) = build_bootstrap_table(&path)?; let rtt_all_file = File::create("rtt_all.mgen.json")?; let rtt_mean_file = File::create("rtt_mean.mgen.json")?; let sent_count_file = File::create("counts.mgen.json")?; let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file); let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file); let mut sent_count_ser = serde_json::Serializer::new(sent_count_file); let rtt_all = rtt_all_ser.serialize_seq(None)?; let rtt_mean = rtt_mean_ser.serialize_seq(None)?; let sent_count = sent_count_ser.serialize_seq(None)?; let mut serializers = Serializers { rtt_all, rtt_mean, sent_count, }; let logs = glob::glob(&format!( "{}/shadow.data/hosts/*client*/mgen-*.stdout", path ))?; for log in logs { process_log(&log?, filter_time, &bootstrap_table, &mut serializers)?; } serializers.rtt_all.end()?; serializers.rtt_mean.end()?; serializers.sent_count.end()?; Ok(()) } fn main() { let mut args = std::env::args(); let _ = args.next(); let path = args.next(); if let Err(err) = core(path) { println!("error running core: {}", err); process::exit(1); } }