main.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. use std::collections::HashMap;
  2. use std::{error::Error, fs::File, path::Path, process};
  3. use serde::{ser::SerializeSeq, Deserialize, Serializer};
/// Default shadow simulation start:
/// Sat Jan 1 12:00:00 AM UTC 2000,
/// expressed as a unix timestamp.
const EXPERIMENT_START: f64 = 946684800.0;
/// Round-trip-time threshold: a receipt whose rtt is greater than this is
/// counted as a timeout, and only rtts at or below it are written to the
/// "rtt_timeout" output. Presumably in seconds, like the log timestamps it
/// is compared against (confirm against the mgen log format).
const TIMEOUT: f64 = 30.0;
/// A record from the mgen log file.
///
/// The log is read as header-less CSV, so the fields are filled by column
/// position: the declaration order below is the column order.
#[derive(Debug, Deserialize)]
struct Record {
    /// First column; never read by this program. Presumably a human-readable
    /// rendering of `time` (confirm against the mgen log format).
    _hr_time: String,
    /// Time of the event; compared against `EXPERIMENT_START`-based unix
    /// timestamps and subtracted to compute round-trip times.
    time: f64, // FIXME: use a better type, either a real datetime or our own two ints
    /// The user this log line belongs to.
    user: String,
    /// The group (conversation) the event happened in.
    group: String,
    /// All remaining columns. The first one is the action name
    /// (`process_log` handles "send" and "receive"); the meaning of the
    /// others depends on the action.
    action_data: Vec<String>,
}
  18. /// Running values for a (user, group) tuple
  19. struct RunningValue {
  20. sent_count: u32,
  21. sent_timeout: u32,
  22. incoming_receipt_count: u32,
  23. outgoing_receipt_count: u32,
  24. receipt_timeout: u32,
  25. running_rtt_mean: f64,
  26. }
/// When a (user, group) starts sending messages back
///
/// Keyed by `(user, group)`; the value is a point in time on the same scale
/// as the log timestamps (it is built from `EXPERIMENT_START` plus offsets
/// and compared directly against message send times).
type BootstrapTable = HashMap<(String, String), f64>;
  29. struct Serializers<T: SerializeSeq> {
  30. rtt_all: T,
  31. rtt_timeout: T,
  32. rtt_mean: T,
  33. sent_count: T,
  34. timeout_by_send: T,
  35. timeout_by_receive: T,
  36. }
  37. fn process_log<T>(
  38. file: &Path,
  39. filter_time: f64,
  40. bootstrap_table: &BootstrapTable,
  41. serializers: &mut Serializers<T>,
  42. ) -> Result<(), Box<dyn Error>>
  43. where
  44. T: SerializeSeq,
  45. {
  46. let mut sent_times: HashMap<(String, String, u32), f64> = HashMap::new();
  47. let mut running_values: HashMap<(String, String), RunningValue> = HashMap::new();
  48. let file = File::open(file)?;
  49. let mut rdr = csv::ReaderBuilder::new()
  50. .has_headers(false)
  51. .flexible(true)
  52. .from_reader(file);
  53. for result in rdr.deserialize() {
  54. let record: Record = match result {
  55. Ok(record) => record,
  56. Err(_e) => {
  57. //eprintln!("bad record: {:?}", e);
  58. continue;
  59. }
  60. };
  61. if record.time <= filter_time {
  62. continue;
  63. }
  64. match record.action_data[0].as_str() {
  65. "send" => {
  66. let id = record.action_data[4].parse()?;
  67. sent_times.insert((record.user.clone(), record.group.clone(), id), record.time);
  68. running_values
  69. .entry((record.user, record.group))
  70. .and_modify(|running_value| running_value.sent_count += 1)
  71. .or_insert(RunningValue {
  72. sent_count: 1,
  73. sent_timeout: 0,
  74. incoming_receipt_count: 0,
  75. outgoing_receipt_count: 0,
  76. receipt_timeout: 0,
  77. running_rtt_mean: 0.0,
  78. });
  79. }
  80. "receive" => {
  81. if record.action_data[4] != "receipt" {
  82. continue;
  83. }
  84. let sender = &record.action_data[3];
  85. let id = record.action_data[5].parse()?;
  86. let key = (sender.to_string(), record.group.clone());
  87. let bootstrap_time = bootstrap_table.get(&key).unwrap_or_else(|| {
  88. panic!("could not find key {:?}", key);
  89. });
  90. let key = (record.user, key.1, id);
  91. let Some(sent_time) = sent_times.get(&key) else {
  92. // this should never happen in the client-server case,
  93. // but we filter out early conversation in the p2p case
  94. //eprintln!("receipt for unknown message: {:?}", key);
  95. //panic!();
  96. continue;
  97. };
  98. if bootstrap_time > sent_time {
  99. // the message was sent while the recipient was still bootstrapping,
  100. // don't count its receipt towards the rtt stats
  101. continue;
  102. }
  103. let rtt: f64 = record.time - sent_time;
  104. serializers
  105. .rtt_all
  106. .serialize_element(&rtt)
  107. .unwrap_or_else(|e| {
  108. panic!("unable to serialize rtt: {:?}", e);
  109. });
  110. if rtt <= TIMEOUT {
  111. serializers
  112. .rtt_timeout
  113. .serialize_element(&rtt)
  114. .unwrap_or_else(|e| {
  115. panic!("unable to serialize rtt: {:?}", e);
  116. });
  117. }
  118. let key = (key.0, key.1);
  119. running_values.entry(key).and_modify(|running_value| {
  120. running_value.incoming_receipt_count += 1;
  121. if rtt > TIMEOUT {
  122. running_value.sent_timeout += 1;
  123. }
  124. running_value.running_rtt_mean = running_value.running_rtt_mean
  125. + (rtt - running_value.running_rtt_mean)
  126. / (running_value.incoming_receipt_count as f64);
  127. });
  128. let key = (sender.to_string(), record.group);
  129. let receipt_sender = running_values.entry(key).or_insert(RunningValue {
  130. sent_count: 0,
  131. sent_timeout: 0,
  132. incoming_receipt_count: 0,
  133. outgoing_receipt_count: 0,
  134. receipt_timeout: 0,
  135. running_rtt_mean: 0.0,
  136. });
  137. receipt_sender.outgoing_receipt_count += 1;
  138. if rtt > TIMEOUT {
  139. receipt_sender.receipt_timeout += 1;
  140. }
  141. }
  142. _ => (),
  143. }
  144. }
  145. for value in running_values.into_values() {
  146. serializers
  147. .rtt_mean
  148. .serialize_element(&value.running_rtt_mean)
  149. .unwrap_or_else(|e| {
  150. panic!("unable to serialize rtt mean: {:?}", e);
  151. });
  152. serializers
  153. .sent_count
  154. .serialize_element(&value.sent_count)
  155. .unwrap_or_else(|e| {
  156. panic!("unable to serialize rtt count: {:?}", e);
  157. });
  158. if value.incoming_receipt_count != 0 {
  159. serializers
  160. .timeout_by_send
  161. .serialize_element(
  162. &(value.sent_timeout as f64 / value.incoming_receipt_count as f64),
  163. )
  164. .unwrap_or_else(|e| {
  165. panic!("unable to serialize rtt count: {:?}", e);
  166. });
  167. } else {
  168. assert_eq!(value.sent_timeout, value.incoming_receipt_count);
  169. }
  170. if value.outgoing_receipt_count != 0 {
  171. serializers
  172. .timeout_by_receive
  173. .serialize_element(
  174. &(value.receipt_timeout as f64 / value.outgoing_receipt_count as f64),
  175. )
  176. .unwrap_or_else(|e| {
  177. panic!("unable to serialize rtt count: {:?}", e);
  178. });
  179. } else {
  180. assert_eq!(value.receipt_timeout, value.outgoing_receipt_count);
  181. }
  182. }
  183. Ok(())
  184. }
/// One conversation entry of a user's mgen configuration file.
#[derive(Debug, Deserialize)]
struct ConversationConfig {
    /// Name of the group this conversation takes place in.
    group: String,
    /// Bootstrap offset for this conversation; when absent, the user's
    /// `UserConfig::bootstrap` is used instead.
    bootstrap: Option<f64>,
}
/// The parts of a user's mgen configuration file (YAML) that this tool reads.
#[derive(Debug, Deserialize)]
struct UserConfig {
    /// Name of the user.
    user: String,
    /// Default bootstrap offset, used for every conversation that does not
    /// set its own. It is added directly to times in seconds, so it is
    /// presumably in seconds as well (confirm against the mgen config docs).
    bootstrap: f64,
    /// The conversations this user takes part in.
    conversations: Vec<ConversationConfig>,
}
  196. fn parse_time(time: &str) -> Result<f64, Box<dyn Error>> {
  197. let suffix = time.as_bytes()[time.len() - 1];
  198. Ok(match suffix {
  199. b's' => time[..time.len() - 1].parse::<f64>()?, // seconds
  200. b'm' => time[..time.len() - 1].parse::<f64>()? * 60.0, // minutes
  201. b'h' => time[..time.len() - 1].parse::<f64>()? * 60.0 * 60.0, // hours
  202. _ => time.parse::<f64>()?, // default is seconds, anything else is an error
  203. })
  204. }
/// The `general` section of the shadow configuration, reduced to what this
/// tool reads.
#[derive(Debug, Deserialize)]
struct ShadowGeneralConfig {
    /// Optional end of the bootstrap phase, as a time string understood by
    /// `parse_time` (e.g. "300", "300s", "5m").
    bootstrap_end_time: Option<String>,
}
/// The arguments of a process in the shadow configuration.
///
/// Deserialized untagged, so the variants are tried in declaration order:
/// a YAML list of strings first, then a single string.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum ShadowProcessArgs {
    /// Arguments given as a list, one argument per element.
    List(Vec<String>),
    /// Arguments given as one command-line string; it is split on ASCII
    /// whitespace when the bootstrap table is built.
    Command(String),
}
/// One process entry of a host in the shadow configuration.
#[derive(Debug, Deserialize)]
struct ShadowProcessConfig {
    /// Arguments of the process; searched for arguments containing ".yaml".
    args: ShadowProcessArgs,
    /// Path of the executable; only paths ending in "mgen-client" or
    /// "mgen-peer" are considered when the bootstrap table is built.
    path: String,
    /// Start time of the process, as a time string understood by `parse_time`.
    start_time: String,
}
/// One host of the shadow configuration, reduced to what this tool reads.
#[derive(Debug, Deserialize)]
struct ShadowHostConfig {
    /// The processes run on this host.
    processes: Vec<ShadowProcessConfig>,
}
/// The parts of `shadow.config.yaml` that this tool reads.
#[derive(Debug, Deserialize)]
struct ShadowConfig {
    /// The `general` section.
    general: ShadowGeneralConfig,
    /// The simulated hosts, keyed by host name. The host name is also the
    /// name of the host's directory under `shadow.data/hosts/`.
    hosts: HashMap<String, ShadowHostConfig>,
}
  230. fn build_bootstrap_table(path: &str) -> Result<(f64, BootstrapTable), Box<dyn Error>> {
  231. let shadow_config: ShadowConfig =
  232. serde_yaml::from_reader(File::open(format!("{}/shadow.config.yaml", path))?)?;
  233. let bootstrap_end_time =
  234. if let Some(bootstrap_end_time) = shadow_config.general.bootstrap_end_time {
  235. parse_time(&bootstrap_end_time)?
  236. } else {
  237. 0.0
  238. };
  239. let filter_time = bootstrap_end_time + EXPERIMENT_START;
  240. let mut ret = HashMap::new();
  241. for (host_name, host_config) in shadow_config.hosts {
  242. for proc in host_config.processes {
  243. if !proc.path.ends_with("mgen-client") && !proc.path.ends_with("mgen-peer") {
  244. continue;
  245. }
  246. let split_args: Vec<_> = match proc.args {
  247. ShadowProcessArgs::List(commands) => commands,
  248. ShadowProcessArgs::Command(command) => command
  249. .split_ascii_whitespace()
  250. .map(|s| s.to_string())
  251. .collect(),
  252. };
  253. let configs_in_args = split_args
  254. .into_iter()
  255. .filter_map(|arg| {
  256. if arg.contains(".yaml") {
  257. let glob_string =
  258. format!("{}/shadow.data/hosts/{}/{}", path, host_name, arg,);
  259. Some(glob::glob(&glob_string).expect(&glob_string))
  260. } else {
  261. None
  262. }
  263. })
  264. .flatten();
  265. for config in configs_in_args {
  266. let config: UserConfig = serde_yaml::from_reader(File::open(config?)?)?;
  267. for conversation in config.conversations {
  268. let key = (config.user.clone(), conversation.group);
  269. let bootstrap = EXPERIMENT_START
  270. + parse_time(&proc.start_time)?
  271. + conversation.bootstrap.unwrap_or(config.bootstrap);
  272. ret.insert(key, bootstrap);
  273. }
  274. }
  275. }
  276. }
  277. Ok((filter_time, ret))
  278. }
  279. fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
  280. let path = path.unwrap_or(".".to_string());
  281. let (_filter_time, bootstrap_table) = build_bootstrap_table(&path)?;
  282. // we actually don't set the full bootstrap as bootstrap, so we need to set this manually
  283. let filter_time = EXPERIMENT_START + 20.0 * 60.0;
  284. let rtt_all_file = File::create("rtt_all.mgen.json")?;
  285. let rtt_timeout_file = File::create("rtt_timeout.mgen.json")?;
  286. let rtt_mean_file = File::create("rtt_mean.mgen.json")?;
  287. let sent_count_file = File::create("counts.mgen.json")?;
  288. let timeout_by_send_file = File::create("timeout_by_send.mgen.json")?;
  289. let timeout_by_receive_file = File::create("timeout_by_receive.mgen.json")?;
  290. let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file);
  291. let mut rtt_timeout_ser = serde_json::Serializer::new(rtt_timeout_file);
  292. let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file);
  293. let mut sent_count_ser = serde_json::Serializer::new(sent_count_file);
  294. let mut timeout_by_send_ser = serde_json::Serializer::new(timeout_by_send_file);
  295. let mut timeout_by_receive_ser = serde_json::Serializer::new(timeout_by_receive_file);
  296. let rtt_all = rtt_all_ser.serialize_seq(None)?;
  297. let rtt_timeout = rtt_timeout_ser.serialize_seq(None)?;
  298. let rtt_mean = rtt_mean_ser.serialize_seq(None)?;
  299. let sent_count = sent_count_ser.serialize_seq(None)?;
  300. let timeout_by_send = timeout_by_send_ser.serialize_seq(None)?;
  301. let timeout_by_receive = timeout_by_receive_ser.serialize_seq(None)?;
  302. let mut serializers = Serializers {
  303. rtt_all,
  304. rtt_timeout,
  305. rtt_mean,
  306. sent_count,
  307. timeout_by_send,
  308. timeout_by_receive,
  309. };
  310. let logs = glob::glob(&format!(
  311. "{}/shadow.data/hosts/*client*/mgen-*.stdout",
  312. path
  313. ))?;
  314. for log in logs {
  315. process_log(&log?, filter_time, &bootstrap_table, &mut serializers)?;
  316. }
  317. serializers.rtt_all.end()?;
  318. serializers.rtt_timeout.end()?;
  319. serializers.rtt_mean.end()?;
  320. serializers.sent_count.end()?;
  321. serializers.timeout_by_send.end()?;
  322. serializers.timeout_by_receive.end()?;
  323. Ok(())
  324. }
  325. fn main() {
  326. let mut args = std::env::args();
  327. let _ = args.next();
  328. let path = args.next();
  329. if let Err(err) = core(path) {
  330. println!("error running core: {}", err);
  331. process::exit(1);
  332. }
  333. }