use std::collections::HashMap; use std::{error::Error, fs::File, path::Path, process}; use serde::{ser::SerializeSeq, Deserialize, Serializer}; #[derive(Debug, Deserialize)] struct Record { _hr_time: String, time: f64, // FIXME: use a better type, either a real datetime or our own two ints user: String, group: String, action_data: Vec, } struct RunningValue { sent_count: u32, receipt_count: u32, running_rtt_mean: f64, } type BootstrapTable = HashMap<(String, String), f64>; struct Serializers { rtt_all: T, rtt_mean: T, rtt_count: T, } fn process_log( file: &Path, filter_time: f64, bootstrap_table: &BootstrapTable, serializers: &mut Serializers, ) -> Result<(), Box> where T: SerializeSeq, { let mut sent_times: HashMap<(String, String, u32), f64> = HashMap::new(); let mut running_values: HashMap<(String, String), RunningValue> = HashMap::new(); let file = File::open(file)?; let mut rdr = csv::ReaderBuilder::new() .has_headers(false) .flexible(true) .from_reader(file); for result in rdr.deserialize() { let record: Record = match result { Ok(record) => record, Err(e) => { eprintln!("bad record: {:?}", e); continue; } }; if record.time <= filter_time { continue; } match record.action_data[0].as_str() { "send" => { let id = record.action_data[4].parse()?; sent_times.insert((record.user.clone(), record.group.clone(), id), record.time); running_values .entry((record.user, record.group)) .and_modify(|running_value| running_value.sent_count += 1) .or_insert(RunningValue { sent_count: 1, receipt_count: 0, running_rtt_mean: 0.0, }); } "receive" => { if record.action_data[4] != "receipt" { continue; } let sender = &record.action_data[3]; let id = record.action_data[5].parse()?; let key = (sender.to_string(), record.group); let bootstrap_time = bootstrap_table.get(&key).unwrap_or_else(|| { panic!("could not find key {:?}", key); }); let key = (record.user, key.1, id); let Some(sent_time) = sent_times.get(&key) else { // this should never happen in the client-server case, // but we filter out early conversation in the p2p case //eprintln!("receipt for unknown message: {:?}", key); //panic!(); continue; }; if bootstrap_time > sent_time { // the message was sent while the recipient was still bootstrapping, // don't count its receipt towards the rtt stats continue; } let rtt: f64 = record.time - sent_time; serializers .rtt_all .serialize_element(&rtt) .unwrap_or_else(|e| { panic!("unable to serialize rtt: {:?}", e); }); let key = (key.0, key.1); running_values.entry(key).and_modify(|running_value| { running_value.receipt_count += 1; running_value.running_rtt_mean = running_value.running_rtt_mean + (rtt - running_value.running_rtt_mean) / (running_value.receipt_count as f64); }); } _ => (), } } for value in running_values.into_values() { serializers .rtt_mean .serialize_element(&value.running_rtt_mean) .unwrap_or_else(|e| { panic!("unable to serialize rtt mean: {:?}", e); }); serializers .rtt_count .serialize_element(&value.sent_count) .unwrap_or_else(|e| { panic!("unable to serialize rtt count: {:?}", e); }); } Ok(()) } #[derive(Debug, Deserialize)] struct ConversationConfig { group: String, bootstrap: Option, } #[derive(Debug, Deserialize)] struct Config { user: String, bootstrap: f64, conversations: Vec, } fn build_bootstrap_table(path: &str) -> Result> { let mut ret = HashMap::new(); let configs = glob::glob(&format!("{}/shadow.data/hosts/*/user*.yaml", path))?; for config in configs { let config = config?; println!("{:?}", config); let config: Config = serde_yaml::from_reader(File::open(config)?)?; for conversation in config.conversations { let key = (config.user.clone(), conversation.group); let bootstrap = conversation.bootstrap.unwrap_or(config.bootstrap); ret.insert(key, bootstrap); } } Ok(ret) } fn core(path: Option) -> Result<(), Box> { // FIXME: turn these all into clap args let filter_until_minute = 20.0; let path = path.unwrap_or(".".to_string()); let bootstrap_table = build_bootstrap_table(&path)?; let filter_time: f64 = 946684800.0 + filter_until_minute * 60.0; let rtt_all_file = File::create("rtt_all.mgen.json")?; let rtt_mean_file = File::create("rtt_mean.mgen.json")?; let rtt_count_file = File::create("counts.mgen.json")?; let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file); let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file); let mut rtt_count_ser = serde_json::Serializer::new(rtt_count_file); let rtt_all = rtt_all_ser.serialize_seq(None)?; let rtt_mean = rtt_mean_ser.serialize_seq(None)?; let rtt_count = rtt_count_ser.serialize_seq(None)?; let mut serializers = Serializers { rtt_all, rtt_mean, rtt_count, }; let logs = glob::glob(&format!( "{}/shadow.data/hosts/*client*/mgen-*.stdout", path ))?; for log in logs { process_log(&log?, filter_time, &bootstrap_table, &mut serializers)?; } serializers.rtt_all.end()?; serializers.rtt_mean.end()?; serializers.rtt_count.end()?; Ok(()) } fn main() { let mut args = std::env::args(); let _ = args.next(); let path = args.next(); if let Err(err) = core(path) { println!("error running example: {}", err); process::exit(1); } }