// main.rs 6.7 KB

  use std::collections::HashMap;
  use std::io::{BufWriter, Write};
  use std::{error::Error, fs::File, path::Path, process};

  use serde::{ser::SerializeSeq, Deserialize, Serializer};
  4. #[derive(Debug, Deserialize)]
  5. struct Record {
  6. _hr_time: String,
  7. time: f64, // FIXME: use a better type, either a real datetime or our own two ints
  8. user: String,
  9. group: String,
  10. action_data: Vec<String>,
  11. }
  /// Per-(user, group) accumulator maintained while a single log is processed.
  struct RunningValue {
      /// Number of `"send"` records seen for this (user, group).
      sent_count: u32,
      /// Number of receipts that were folded into `running_rtt_mean`.
      receipt_count: u32,
      /// Incrementally updated mean of the round-trip times counted so far;
      /// stays at its initial `0.0` until the first receipt is counted.
      running_rtt_mean: f64,
  }
  /// Maps a `(user, group)` pair to the bootstrap time configured for that
  /// user in that conversation.
  type BootstrapTable = HashMap<(String, String), f64>;
  /// The three output sequences that log processing appends to.
  ///
  /// The struct deliberately carries no trait bound on `T`; functions that
  /// actually serialize through it (such as `process_log`) state
  /// `T: SerializeSeq` themselves.
  struct Serializers<T> {
      /// Receives every individual round-trip time sample.
      rtt_all: T,
      /// Receives one mean round-trip time per (user, group).
      rtt_mean: T,
      /// Receives one sent-message count per (user, group).
      rtt_count: T,
  }
  23. fn process_log<T>(
  24. file: &Path,
  25. filter_time: f64,
  26. bootstrap_table: &BootstrapTable,
  27. serializers: &mut Serializers<T>,
  28. ) -> Result<(), Box<dyn Error>>
  29. where
  30. T: SerializeSeq,
  31. {
  32. let mut sent_times: HashMap<(String, String, u32), f64> = HashMap::new();
  33. let mut running_values: HashMap<(String, String), RunningValue> = HashMap::new();
  34. let file = File::open(file)?;
  35. let mut rdr = csv::ReaderBuilder::new()
  36. .has_headers(false)
  37. .flexible(true)
  38. .from_reader(file);
  39. for result in rdr.deserialize() {
  40. let record: Record = match result {
  41. Ok(record) => record,
  42. Err(e) => {
  43. eprintln!("bad record: {:?}", e);
  44. continue;
  45. }
  46. };
  47. if record.time <= filter_time {
  48. continue;
  49. }
  50. match record.action_data[0].as_str() {
  51. "send" => {
  52. let id = record.action_data[4].parse()?;
  53. sent_times.insert((record.user.clone(), record.group.clone(), id), record.time);
  54. running_values
  55. .entry((record.user, record.group))
  56. .and_modify(|running_value| running_value.sent_count += 1)
  57. .or_insert(RunningValue {
  58. sent_count: 1,
  59. receipt_count: 0,
  60. running_rtt_mean: 0.0,
  61. });
  62. }
  63. "receive" => {
  64. if record.action_data[4] != "receipt" {
  65. continue;
  66. }
  67. let sender = &record.action_data[3];
  68. let id = record.action_data[5].parse()?;
  69. let key = (sender.to_string(), record.group);
  70. let bootstrap_time = bootstrap_table.get(&key).unwrap_or_else(|| {
  71. panic!("could not find key {:?}", key);
  72. });
  73. let key = (record.user, key.1, id);
  74. let Some(sent_time) = sent_times.get(&key) else {
  75. // this should never happen in the client-server case,
  76. // but we filter out early conversation in the p2p case
  77. //eprintln!("receipt for unknown message: {:?}", key);
  78. //panic!();
  79. continue;
  80. };
  81. if bootstrap_time > sent_time {
  82. // the message was sent while the recipient was still bootstrapping,
  83. // don't count its receipt towards the rtt stats
  84. continue;
  85. }
  86. let rtt: f64 = record.time - sent_time;
  87. serializers
  88. .rtt_all
  89. .serialize_element(&rtt)
  90. .unwrap_or_else(|e| {
  91. panic!("unable to serialize rtt: {:?}", e);
  92. });
  93. let key = (key.0, key.1);
  94. running_values.entry(key).and_modify(|running_value| {
  95. running_value.receipt_count += 1;
  96. running_value.running_rtt_mean = running_value.running_rtt_mean
  97. + (rtt - running_value.running_rtt_mean)
  98. / (running_value.receipt_count as f64);
  99. });
  100. }
  101. _ => (),
  102. }
  103. }
  104. for value in running_values.into_values() {
  105. serializers
  106. .rtt_mean
  107. .serialize_element(&value.running_rtt_mean)
  108. .unwrap_or_else(|e| {
  109. panic!("unable to serialize rtt mean: {:?}", e);
  110. });
  111. serializers
  112. .rtt_count
  113. .serialize_element(&value.sent_count)
  114. .unwrap_or_else(|e| {
  115. panic!("unable to serialize rtt count: {:?}", e);
  116. });
  117. }
  118. Ok(())
  119. }
  120. #[derive(Debug, Deserialize)]
  121. struct ConversationConfig {
  122. group: String,
  123. bootstrap: Option<f64>,
  124. }
  125. #[derive(Debug, Deserialize)]
  126. struct Config {
  127. user: String,
  128. bootstrap: f64,
  129. conversations: Vec<ConversationConfig>,
  130. }
  131. fn build_bootstrap_table(path: &str) -> Result<BootstrapTable, Box<dyn Error>> {
  132. let mut ret = HashMap::new();
  133. let configs = glob::glob(&format!("{}/shadow.data/hosts/*/user*.yaml", path))?;
  134. for config in configs {
  135. let config = config?;
  136. println!("{:?}", config);
  137. let config: Config = serde_yaml::from_reader(File::open(config)?)?;
  138. for conversation in config.conversations {
  139. let key = (config.user.clone(), conversation.group);
  140. let bootstrap = conversation.bootstrap.unwrap_or(config.bootstrap);
  141. ret.insert(key, bootstrap);
  142. }
  143. }
  144. Ok(ret)
  145. }
  146. fn core(path: Option<String>) -> Result<(), Box<dyn Error>> {
  147. // FIXME: turn these all into clap args
  148. let filter_until_minute = 20.0;
  149. let path = path.unwrap_or(".".to_string());
  150. let bootstrap_table = build_bootstrap_table(&path)?;
  151. let filter_time: f64 = 946684800.0 + filter_until_minute * 60.0;
  152. let rtt_all_file = File::create("rtt_all.mgen.json")?;
  153. let rtt_mean_file = File::create("rtt_mean.mgen.json")?;
  154. let rtt_count_file = File::create("counts.mgen.json")?;
  155. let mut rtt_all_ser = serde_json::Serializer::new(rtt_all_file);
  156. let mut rtt_mean_ser = serde_json::Serializer::new(rtt_mean_file);
  157. let mut rtt_count_ser = serde_json::Serializer::new(rtt_count_file);
  158. let rtt_all = rtt_all_ser.serialize_seq(None)?;
  159. let rtt_mean = rtt_mean_ser.serialize_seq(None)?;
  160. let rtt_count = rtt_count_ser.serialize_seq(None)?;
  161. let mut serializers = Serializers {
  162. rtt_all,
  163. rtt_mean,
  164. rtt_count,
  165. };
  166. let logs = glob::glob(&format!(
  167. "{}/shadow.data/hosts/*client*/mgen-*.stdout",
  168. path
  169. ))?;
  170. for log in logs {
  171. process_log(&log?, filter_time, &bootstrap_table, &mut serializers)?;
  172. }
  173. serializers.rtt_all.end()?;
  174. serializers.rtt_mean.end()?;
  175. serializers.rtt_count.end()?;
  176. Ok(())
  177. }
  178. fn main() {
  179. let mut args = std::env::args();
  180. let _ = args.next();
  181. let path = args.next();
  182. if let Err(err) = core(path) {
  183. println!("error running example: {}", err);
  184. process::exit(1);
  185. }
  186. }