main.rs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. use std::collections::HashMap;
  2. use std::{error::Error, fs::File, path::Path, process};
  3. use serde::{ser::SerializeSeq, Deserialize, Serializer};
  /// Default start of a shadow simulation, as a unix timestamp in seconds:
  /// Sat Jan 1 12:00:00 AM UTC 2000.
  const EXPERIMENT_START: f64 = 946_684_800.0;
  8. #[derive(Debug, Deserialize)]
  9. struct Record {
  10. _hr_time: String,
  11. time: f64, // FIXME: use a better type, either a real datetime or our own two ints
  12. user: String,
  13. group: String,
  14. action_data: Vec<String>,
  15. }
  /// Running statistics for one `(user, group)` pair while a log is processed.
  #[derive(Debug, Clone, PartialEq)]
  struct RunningValue {
      /// Number of messages the user sent to the group.
      sent_count: u32,
      /// Number of receipts counted towards `running_rtt_mean`.
      receipt_count: u32,
      /// Incrementally updated mean round-trip time over the counted receipts;
      /// stays `0.0` until the first receipt is counted.
      running_rtt_mean: f64,
  }
  /// Maps `(user, group)` to the absolute time (same scale as `EXPERIMENT_START`)
  /// at which that user is done bootstrapping in that group.
  type BootstrapTable = HashMap<(String, String), f64>;
  /// The three output sequences that log processing appends to.
  ///
  /// The type parameter is intentionally unbounded here; the functions that
  /// actually serialize through these fields state the bound they need.
  struct Serializers<T> {
      /// Receives every individual round-trip time.
      rtt_all: T,
      /// Receives one mean round-trip time per `(user, group)` pair.
      rtt_mean: T,
      /// Receives one sent-message count per `(user, group)` pair.
      sent_count: T,
  }
  27. fn process_log<T>(
  28. file: &Path,
  29. filter_time: f64,
  30. bootstrap_table: &BootstrapTable,
  31. serializers: &mut Serializers<T>,
  32. ) -> Result<(), Box<dyn Error>>
  33. where
  34. T: SerializeSeq,
  35. {
  36. let mut sent_times: HashMap<(String, String, u32), f64> = HashMap::new();
  37. let mut running_values: HashMap<(String, String), RunningValue> = HashMap::new();
  38. let file = File::open(file)?;
  39. let mut rdr = csv::ReaderBuilder::new()
  40. .has_headers(false)
  41. .flexible(true)
  42. .from_reader(file);
  43. for result in rdr.deserialize() {
  44. let record: Record = match result {
  45. Ok(record) => record,
  46. Err(_e) => {
  47. //eprintln!("bad record: {:?}", e);
  48. continue;
  49. }
  50. };
  51. if record.time <= filter_time {
  52. continue;
  53. }
  54. match record.action_data[0].as_str() {
  55. "send" => {
  56. let id = record.action_data[4].parse()?;
  57. sent_times.insert((record.user.clone(), record.group.clone(), id), record.time);
  58. running_values
  59. .entry((record.user, record.group))
  60. .and_modify(|running_value| running_value.sent_count += 1)
  61. .or_insert(RunningValue {
  62. sent_count: 1,
  63. receipt_count: 0,
  64. running_rtt_mean: 0.0,
  65. });
  66. }
  67. "receive" => {
  68. if record.action_data[4] != "receipt" {
  69. continue;
  70. }
  71. let sender = &record.action_data[3];
  72. let id = record.action_data[5].parse()?;
  73. let key = (sender.to_string(), record.group);
  74. let bootstrap_time = bootstrap_table.get(&key).unwrap_or_else(|| {
  75. panic!("could not find key {:?}", key);
  76. });
  77. let key = (record.user, key.1, id);
  78. let Some(sent_time) = sent_times.get(&key) else {
  79. // this should never happen in the client-server case,
  80. // but we filter out early conversation in the p2p case
  81. //eprintln!("receipt for unknown message: {:?}", key);
  82. //panic!();
  83. continue;
  84. };
  85. if bootstrap_time > sent_time {
  86. // the message was sent while the recipient was still bootstrapping,
  87. // don't count its receipt towards the rtt stats
  88. continue;
  89. }
  90. let rtt: f64 = record.time - sent_time;
  91. serializers
  92. .rtt_all
  93. .serialize_element(&rtt)
  94. .unwrap_or_else(|e| {
  95. panic!("unable to serialize rtt: {:?}", e);
  96. });
  97. let key = (key.0, key.1);
  98. running_values.entry(key).and_modify(|running_value| {
  99. running_value.receipt_count += 1;
  100. running_value.running_rtt_mean = running_value.running_rtt_mean
  101. + (rtt - running_value.running_rtt_mean)
  102. / (running_value.receipt_count as f64);
  103. });
  104. }
  105. _ => (),
  106. }
  107. }
  108. for value in running_values.into_values() {
  109. serializers
  110. .rtt_mean
  111. .serialize_element(&value.running_rtt_mean)
  112. .unwrap_or_else(|e| {
  113. panic!("unable to serialize rtt mean: {:?}", e);
  114. });
  115. serializers
  116. .sent_count
  117. .serialize_element(&value.sent_count)
  118. .unwrap_or_else(|e| {
  119. panic!("unable to serialize rtt count: {:?}", e);
  120. });
  121. }
  122. Ok(())
  123. }
  124. #[derive(Debug, Deserialize)]
  125. struct ConversationConfig {
  126. group: String,
  127. bootstrap: Option<f64>,
  128. }
  129. #[derive(Debug, Deserialize)]
  130. struct UserConfig {
  131. user: String,
  132. bootstrap: f64,
  133. conversations: Vec<ConversationConfig>,
  134. }
  /// Parses a duration string into seconds.
  ///
  /// A trailing `s`, `m` or `h` selects seconds, minutes or hours; without a
  /// suffix the value is taken as seconds. Whitespace around the value and
  /// between the number and its suffix is ignored.
  ///
  /// # Errors
  ///
  /// Returns an error when the numeric part is not a valid `f64`; this includes
  /// the empty string and any other suffix.
  fn parse_time(time: &str) -> Result<f64, Box<dyn Error>> {
      let time = time.trim();
      // Split off a recognised unit suffix without indexing, so an empty
      // string falls through to the parse error below instead of panicking.
      let (digits, unit) = match time.char_indices().next_back() {
          Some((idx, unit @ ('s' | 'm' | 'h'))) => (&time[..idx], Some(unit)),
          _ => (time, None),
      };
      let value: f64 = digits.trim_end().parse()?;
      Ok(match unit {
          Some('m') => value * 60.0,        // minutes
          Some('h') => value * 60.0 * 60.0, // hours
          _ => value,                       // seconds, explicit or by default
      })
  }
  144. #[derive(Debug, Deserialize)]
  145. struct ShadowGeneralConfig {
  146. bootstrap_end_time: Option<String>,
  147. }
  148. #[derive(Debug, Deserialize)]
  149. #[serde(untagged)]
  150. enum ShadowProcessArgs {
  151. List(Vec<String>),
  152. Command(String),
  153. }
  154. #[derive(Debug, Deserialize)]
  155. struct ShadowProcessConfig {
  156. args: ShadowProcessArgs,
  157. path: String,
  158. start_time: String,
  159. }
  160. #[derive(Debug, Deserialize)]
  161. struct ShadowHostConfig {
  162. processes: Vec<ShadowProcessConfig>,
  163. }
  164. #[derive(Debug, Deserialize)]
  165. struct ShadowConfig {
  166. general: ShadowGeneralConfig,
  167. hosts: HashMap<String, ShadowHostConfig>,
  168. }
  169. fn build_bootstrap_table(path: &str) -> Result<(f64, BootstrapTable), Box<dyn Error>> {
  170. let shadow_config: ShadowConfig =
  171. serde_yaml::from_reader(File::open(format!("{}/shadow.config.yaml", path))?)?;
  172. let bootstrap_end_time =
  173. if let Some(bootstrap_end_time) = shadow_config.general.bootstrap_end_time {
  174. parse_time(&bootstrap_end_time)?
  175. } else {
  176. 0.0
  177. };
  178. let filter_time = bootstrap_end_time + EXPERIMENT_START;
  179. let mut ret = HashMap::new();
  180. for (host_name, host_config) in shadow_config.hosts {
  181. for proc in host_config.processes {
  182. if !proc.path.ends_with("mgen-client") && !proc.path.ends_with("mgen-peer") {
  183. continue;
  184. }
  185. let split_args: Vec<_> = match proc.args {
  186. ShadowProcessArgs::List(commands) => commands,
  187. ShadowProcessArgs::Command(command) => command
  188. .split_ascii_whitespace()
  189. .map(|s| s.to_string())
  190. .collect(),
  191. };
  192. let configs_in_args = split_args
  193. .into_iter()
  194. .filter_map(|arg| {
  195. if arg.contains(".yaml") {
  196. let glob_string =
  197. format!("{}/shadow.data/hosts/{}/{}", path, host_name, arg,);
  198. Some(glob::glob(&glob_string).expect(&glob_string))
  199. } else {
  200. None
  201. }
  202. })
  203. .flatten();
  204. for config in configs_in_args {
  205. let config: UserConfig = serde_yaml::from_reader(File::open(config?)?)?;
  206. for conversation in config.conversations {
  207. let key = (config.user.clone(), conversation.group);
  208. let bootstrap = EXPERIMENT_START
  209. + parse_time(&proc.start_time)?
  210. + conversation.bootstrap.unwrap_or(config.bootstrap);
  211. ret.insert(key, bootstrap);
  212. }
  213. }
  214. }
  215. }
  216. Ok((filter_time, ret))
  217. }
  218. fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
  219. let path = path.unwrap_or(".".to_string());
  220. let (filter_time, bootstrap_table) = build_bootstrap_table(&path)?;
  221. let rtt_all_file = File::create("rtt_all.mgen.json")?;
  222. let rtt_mean_file = File::create("rtt_mean.mgen.json")?;
  223. let sent_count_file = File::create("counts.mgen.json")?;
  224. let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file);
  225. let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file);
  226. let mut sent_count_ser = serde_json::Serializer::new(sent_count_file);
  227. let rtt_all = rtt_all_ser.serialize_seq(None)?;
  228. let rtt_mean = rtt_mean_ser.serialize_seq(None)?;
  229. let sent_count = sent_count_ser.serialize_seq(None)?;
  230. let mut serializers = Serializers {
  231. rtt_all,
  232. rtt_mean,
  233. sent_count,
  234. };
  235. let logs = glob::glob(&format!(
  236. "{}/shadow.data/hosts/*client*/mgen-*.stdout",
  237. path
  238. ))?;
  239. for log in logs {
  240. process_log(&log?, filter_time, &bootstrap_table, &mut serializers)?;
  241. }
  242. serializers.rtt_all.end()?;
  243. serializers.rtt_mean.end()?;
  244. serializers.sent_count.end()?;
  245. Ok(())
  246. }
  247. fn main() {
  248. let mut args = std::env::args();
  249. let _ = args.next();
  250. let path = args.next();
  251. if let Err(err) = core(path) {
  252. println!("error running core: {}", err);
  253. process::exit(1);
  254. }
  255. }