use http::status::StatusCode; use http_body_util::{BodyExt, Empty}; use hyper::{body::Bytes, Body, Client, Method, Request}; use hyper_util::rt::TokioExecutor; use lazy_static::lazy_static; //use select::{document::Document, predicate::Name}; use serde::{Deserialize, Serialize}; use sled::Db; use std::{ collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet}, fmt, }; use x25519_dalek::{PublicKey, StaticSecret}; #[cfg(any(feature = "simulation", test))] use { chrono::{DateTime, Utc}, julianday::JulianDay, std::{path::Path, time::UNIX_EPOCH}, }; pub mod analysis; pub mod bridge_verification_info; pub mod crypto; pub mod extra_info; pub mod negative_report; pub mod positive_report; pub mod request_handler; #[cfg(feature = "simulation")] pub mod simulation { pub mod bridge; pub mod censor; pub mod config; pub mod extra_infos_server; pub mod user; } #[cfg(test)] pub mod simulation { pub mod extra_infos_server; } use analysis::Analyzer; use extra_info::*; use negative_report::*; use positive_report::*; lazy_static! { // known country codes based on Tor geoIP database // Produced with `cat /usr/share/tor/geoip{,6} | grep -v ^# | grep -o ..$ | sort | uniq | tr '[:upper:]' '[:lower:]' | tr '\n' ',' | sed 's/,/","/g'` pub static ref COUNTRY_CODES: HashSet<&'static str> = HashSet::from(["??","ac","ad","ae","af","ag","ai","al","am","an","ao","ap","aq","ar","as","at","au","aw","ax","az","ba","bb","bd","be","bf","bg","bh","bi","bj","bl","bm","bn","bo","bq","br","bs","bt","bv","bw","by","bz","ca","cc","cd","cf","cg","ch","ci","ck","cl","cm","cn","co","cr","cs","cu","cv","cw","cx","cy","cz","de","dg","dj","dk","dm","do","dz","ea","ec","ee","eg","eh","er","es","et","eu","fi","fj","fk","fm","fo","fr","ga","gb","gd","ge","gf","gg","gh","gi","gl","gm","gn","gp","gq","gr","gs","gt","gu","gw","gy","hk","hm","hn","hr","ht","hu","ic","id","ie","il","im","in","io","iq","ir","is","it","je","jm","jo","jp","ke","kg","kh","ki","km","kn","kp","kr","kw","ky","kz","la","lb","lc","li","lk","lr","ls","lt","lu","lv","ly","ma","mc","md","me","mf","mg","mh","mk","ml","mm","mn","mo","mp","mq","mr","ms","mt","mu","mv","mw","mx","my","mz","na","nc","ne","nf","ng","ni","nl","no","np","nr","nu","nz","om","pa","pe","pf","pg","ph","pk","pl","pm","pn","pr","ps","pt","pw","py","qa","re","ro","rs","ru","rw","sa","sb","sc","sd","se","sg","sh","si","sj","sk","sl","sm","sn","so","sr","ss","st","sv","sx","sy","sz","ta","tc","td","tf","tg","th","tj","tk","tl","tm","tn","to","tr","tt","tv","tw","tz","ua","ug","uk","um","un","us","uy","uz","va","vc","ve","vg","vi","vn","vu","wf","ws","ye","yt","za","zm","zw"]); } /// We will accept reports up to this many days old. pub const MAX_BACKDATE: u32 = 3; #[cfg(any(feature = "simulation", test))] const FAKETIME_FILE: &str = "/tmp/troll-patrol-faketime"; /// Get real or simulated Julian date pub fn get_date() -> u32 { // If this is a simulation, get the simulated date #[cfg(any(feature = "simulation", test))] return get_simulated_date(); // If we're not running a simulation, return today's date #[allow(unreachable_code)] get_real_date() } fn get_real_date() -> u32 { time::OffsetDateTime::now_utc() .date() .to_julian_day() .try_into() .unwrap() } #[cfg(any(feature = "simulation", test))] fn get_simulated_date() -> u32 { faketime::enable(Path::new(FAKETIME_FILE)); JulianDay::from(DateTime::::from(UNIX_EPOCH + faketime::unix_time()).date_naive()) .inner() .try_into() .unwrap() } #[cfg(any(feature = "simulation", test))] pub fn set_simulated_date(date: u32) { faketime::enable(Path::new(FAKETIME_FILE)); let unix_date_ms = DateTime::::from_naive_utc_and_offset( JulianDay::new(date.try_into().unwrap()).to_date().into(), Utc, ) .timestamp_millis(); //str.push_str(format!("\nbridge-stats-end {} 23:59:59 (86400 s)", date).as_str()); faketime::write_millis(Path::new(FAKETIME_FILE), unix_date_ms.try_into().unwrap()).unwrap(); } #[cfg(any(feature = "simulation", test))] pub fn increment_simulated_date() { let date = get_date(); set_simulated_date(date + 1); } #[cfg(any(feature = "simulation", test))] pub fn reset_simulated_date() { set_simulated_date(get_real_date()); } #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeDistributor { Lox, } /// All the info for a bridge, to be stored in the database #[derive(Serialize, Deserialize)] pub struct BridgeInfo { /// hashed fingerprint (SHA-1 hash of 20-byte bridge ID) pub fingerprint: [u8; 20], /// nickname of bridge (probably not necessary) pub nickname: String, /// map of countries to data for this bridge in that country pub info_by_country: HashMap, } impl BridgeInfo { pub fn new(fingerprint: [u8; 20], nickname: &String) -> Self { Self { fingerprint, nickname: nickname.to_string(), info_by_country: HashMap::::new(), } } } impl fmt::Display for BridgeInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = format!( "fingerprint:{}\n", array_bytes::bytes2hex("", self.fingerprint).as_str() ); str.push_str(format!("nickname: {}\n", self.nickname).as_str()); //str.push_str(format!("first_seen: {}\n", self.first_seen).as_str()); str.push_str("info_by_country:"); for country in self.info_by_country.keys() { str.push_str(format!("\n country: {}", country).as_str()); let country_info = self.info_by_country.get(country).unwrap(); for line in country_info.to_string().lines() { str.push_str(format!("\n {}", line).as_str()); } } write!(f, "{}", str) } } #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeInfoType { BridgeIps, NegativeReports, PositiveReports, } /// Information about bridge reachability from a given country #[derive(Serialize, Deserialize)] pub struct BridgeCountryInfo { pub info_by_day: BTreeMap>, pub blocked: bool, /// first Julian date we saw data from this country for this bridge pub first_seen: u32, /// first Julian date we saw a positive report from this country for this bridge pub first_pr: Option, } impl BridgeCountryInfo { pub fn new(first_seen: u32) -> Self { Self { info_by_day: BTreeMap::>::new(), blocked: false, first_seen, first_pr: None, } } pub fn add_info(&mut self, info_type: BridgeInfoType, date: u32, count: u32) { if let btree_map::Entry::Vacant(e) = self.info_by_day.entry(date) { let mut info = BTreeMap::::new(); info.insert(info_type, count); e.insert(info); } else { let info = self.info_by_day.get_mut(&date).unwrap(); if !info.contains_key(&info_type) { info.insert(info_type, count); } else if info_type == BridgeInfoType::BridgeIps { if *info.get(&info_type).unwrap() < count { // Use highest value we've seen today info.insert(info_type, count); } } else { // Add count to previous count for reports let new_count = info.get(&info_type).unwrap() + count; info.insert(info_type, new_count); } } // If this is the first instance of positive reports, save the date if self.first_pr.is_none() && info_type == BridgeInfoType::PositiveReports && count > 0 { self.first_pr = Some(date); } } } impl fmt::Display for BridgeCountryInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = format!("blocked: {}\n", self.blocked); str.push_str(format!("first seen: {}\n", self.first_seen).as_str()); let first_pr = if self.first_pr.is_none() { "never".to_string() } else { self.first_pr.unwrap().to_string() }; str.push_str(format!("first positive report observed: {}\n", first_pr).as_str()); str.push_str("info:"); for date in self.info_by_day.keys() { let info = self.info_by_day.get(date).unwrap(); let ip_count = match info.get(&BridgeInfoType::BridgeIps) { Some(&v) => v, None => 0, }; let nr_count = match info.get(&BridgeInfoType::NegativeReports) { Some(&v) => v, None => 0, }; let pr_count = match info.get(&BridgeInfoType::PositiveReports) { Some(&v) => v, None => 0, }; if ip_count > 0 || nr_count > 0 || pr_count > 0 { str.push_str( format!( "\n date: {}\n connections: {}\n negative reports: {}\n positive reports: {}", date, ip_count, nr_count, pr_count, ) .as_str(), ); } } write!(f, "{}", str) } } /// We store a set of all known bridges so that we can later iterate over them. /// This function just adds a bridge fingerprint to that set. pub fn add_bridge_to_db(db: &Db, fingerprint: [u8; 20]) { let mut bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::<[u8; 20]>::new(), }; bridges.insert(fingerprint); db.insert("bridges", bincode::serialize(&bridges).unwrap()) .unwrap(); } // Download a webpage and return it as a string pub async fn download(url: &str) -> Result> { if url.starts_with("https://") { let https = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() .expect("no native root CA certificates found") .https_only() .enable_http1() .build(); let client: hyper_util::client::legacy::Client<_, Empty> = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https); println!("Downloading {}", url); let mut res = client.get(url.parse()?).await?; assert_eq!(res.status(), StatusCode::OK); let mut body_str = String::default(); while let Some(next) = res.frame().await { let frame = next?; if let Some(chunk) = frame.data_ref() { body_str.push_str(&String::from_utf8(chunk.to_vec())?); } } Ok(body_str) } else { let client: hyper_util::client::legacy::Client<_, Empty> = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http(); println!("Downloading {}", url); let mut res = client.get(url.parse()?).await?; assert_eq!(res.status(), StatusCode::OK); let mut body_str = String::default(); while let Some(next) = res.frame().await { let frame = next?; if let Some(chunk) = frame.data_ref() { body_str.push_str(&String::from_utf8(chunk.to_vec())?); } } Ok(body_str) } } // Process extra-infos /// Adds the extra-info data for a single bridge to the database. If the /// database already contains an extra-info for this bridge for thid date, /// but this extra-info contains different data for some reason, use the /// greater count of connections from each country. pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) { let fingerprint = extra_info.fingerprint; let mut bridge_info = match db.get(fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => { add_bridge_to_db(db, fingerprint); BridgeInfo::new(fingerprint, &extra_info.nickname) } }; for country in extra_info.bridge_ips.keys() { if bridge_info.info_by_country.contains_key::(country) { bridge_info .info_by_country .get_mut(country) .unwrap() .add_info( BridgeInfoType::BridgeIps, extra_info.date, *extra_info.bridge_ips.get(country).unwrap(), ); } else { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(extra_info.date); bridge_country_info.add_info( BridgeInfoType::BridgeIps, extra_info.date, *extra_info.bridge_ips.get(country).unwrap(), ); bridge_info .info_by_country .insert(country.to_string(), bridge_country_info); } } // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } /// Download new extra-infos files and add their data to the database pub async fn update_extra_infos( db: &Db, base_url: &str, ) -> Result<(), Box> { // Track which files have been processed. This is slightly redundant // because we're only downloading files we don't already have, but it // might be a good idea to check in case we downloaded a file but didn't // process it for some reason. let mut processed_extra_infos_files = match db.get(b"extra_infos_files").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::::new(), }; let dir_page = download(base_url).await?; // Causes Send issues, so use solution below instead //let doc = Document::from(dir_page.as_str()); //let links = doc.find(Name("a")).filter_map(|n| n.attr("href")); // Alternative, less robust solution let mut links = HashSet::::new(); for line in dir_page.lines() { let begin_match = ""; if line.contains(begin_match) { let link = &line[line.find(begin_match).unwrap() + begin_match.len()..]; if link.contains(end_match) { let link = &link[0..link.find(end_match).unwrap()]; links.insert(link.to_string()); } } } let mut new_extra_infos = HashSet::::new(); // We should now have an iterable collection of links to consider downloading. for link in links { if link.ends_with("-extra-infos") && !processed_extra_infos_files.contains(&link) { let extra_infos_url = format!("{}{}", base_url, link); let extra_info_str = download(&extra_infos_url).await?; //ExtraInfo::parse_file(&extra_info_str, &mut new_extra_infos); let extra_infos = ExtraInfo::parse_file(&extra_info_str); new_extra_infos.extend(extra_infos); processed_extra_infos_files.insert(link); } } // Add new extra-infos data to database for extra_info in new_extra_infos { add_extra_info_to_db(db, extra_info); } // Store which files we've already downloaded and processed db.insert( b"extra_infos_files", bincode::serialize(&processed_extra_infos_files).unwrap(), ) .unwrap(); Ok(()) } // Process negative reports /// If there is already a negative report ECDH key for this date, return None. /// Otherwise, generate a new keypair, save the secret part in the db, and /// return the public part. pub fn new_negative_report_key(db: &Db, date: u32) -> Option { let mut nr_keys = if !db.contains_key("nr-keys").unwrap() { BTreeMap::::new() } else { match bincode::deserialize(&db.get("nr-keys").unwrap().unwrap()) { Ok(v) => v, Err(_) => BTreeMap::::new(), } }; if let btree_map::Entry::Vacant(_e) = nr_keys.entry(date) { let rng = rand::thread_rng(); let secret = StaticSecret::random_from_rng(rng); let public = PublicKey::from(&secret); nr_keys.insert(date, secret); db.insert("nr-keys", bincode::serialize(&nr_keys).unwrap()) .unwrap(); Some(public) } else { None } } /// If we have a key for the requested day, return the secret part. pub fn get_negative_report_secret_key(db: &Db, date: u32) -> Option { if db.contains_key("nr-keys").unwrap() { let nr_keys: BTreeMap = match bincode::deserialize(&db.get("nr-keys").unwrap().unwrap()) { Ok(map) => map, Err(_) => { return None; } }; if nr_keys.contains_key(&date) { let secret = nr_keys.get(&date).unwrap(); Some(secret.clone()) } else { None } } else { None } } /// If we have a key for the requested day, return the public part. pub fn get_negative_report_public_key(db: &Db, date: u32) -> Option { get_negative_report_secret_key(db, date).map(|secret| PublicKey::from(&secret)) } /// Receive an encrypted negative report. Attempt to decrypt it and if /// successful, add it to the database to be processed later. pub fn handle_encrypted_negative_report(db: &Db, enc_report: EncryptedNegativeReport) { if let Some(secret) = get_negative_report_secret_key(db, enc_report.date) { if let Ok(nr) = enc_report.decrypt(&secret) { save_negative_report_to_process(db, nr); } } } /// We store to-be-processed negative reports as a vector. Add this NR /// to that vector (or create a new vector if necessary) pub fn save_negative_report_to_process(db: &Db, nr: NegativeReport) { // TODO: Purge these database entries sometimes let mut nonces = match db.get(format!("nonces_{}", &nr.date)).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::<[u8; 32]>::new(), }; // Just ignore the report if we've seen the nonce before if nonces.insert(nr.nonce) { db.insert( format!("nonces_{}", &nr.date), bincode::serialize(&nonces).unwrap(), ) .unwrap(); let mut reports = match db.get("nrs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Store to-be-processed reports with key [fingerprint]_[country]_[date] let map_key = format!( "{}_{}_{}", array_bytes::bytes2hex("", nr.fingerprint), &nr.country, &nr.date, ); if let btree_map::Entry::Vacant(e) = reports.entry(map_key.clone()) { let nrs = vec![nr.to_serializable_report()]; e.insert(nrs); } else { reports .get_mut(&map_key) .unwrap() .push(nr.to_serializable_report()); } // Commit changes to database db.insert("nrs-to-process", bincode::serialize(&reports).unwrap()) .unwrap(); } } /// Sends a collection of negative reports to the Lox Authority and returns the /// number of valid reports returned by the server. The negative reports in the /// collection should all have the same bridge fingerprint, date, country, and /// distributor. pub async fn verify_negative_reports( distributors: &BTreeMap, reports: &Vec, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } // Get one report, assume the rest have the same distributor let first_report = &reports[0]; let distributor = first_report.distributor; let client = Client::new(); let uri: String = (distributors.get(&distributor).unwrap().to_owned() + "/verifynegative") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's negative reports and store the count of verified reports in /// the database. pub async fn update_negative_reports(db: &Db, distributors: &BTreeMap) { let all_negative_reports = match db.get("nrs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; let mut bridges_to_re_evaluate = match db.get("bridges-to-re-evaluate").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashMap::::new(), // We map fingerprint:date where date is the earliest date for // which we have new reports }; let today = get_date(); // Key is [fingerprint]_[country]_[date] for bridge_country_date in all_negative_reports.keys() { let reports = all_negative_reports.get(bridge_country_date).unwrap(); if !reports.is_empty() { let first_report = &reports[0]; let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country.clone(); let count_valid = verify_negative_reports(distributors, reports).await; // If we have new historical data, re-evaluate this bridge if count_valid > 0 && date < today { let fpr_str = array_bytes::bytes2hex("", fingerprint); if bridges_to_re_evaluate.contains_key(&fpr_str) { if *bridges_to_re_evaluate.get(&fpr_str).unwrap() > date { bridges_to_re_evaluate.insert(fpr_str, date); } } else { bridges_to_re_evaluate.insert(fpr_str, date); } } // Get bridge info or make new one let mut bridge_info = match db.get(fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => { // This case shouldn't happen unless the bridge hasn't // published any bridge stats. add_bridge_to_db(db, fingerprint); BridgeInfo::new(fingerprint, &String::default()) } }; // Add the new report count to it if let hash_map::Entry::Vacant(_e) = bridge_info.info_by_country.entry(country.clone()) { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(date); bridge_country_info.add_info(BridgeInfoType::NegativeReports, date, count_valid); bridge_info .info_by_country .insert(country, bridge_country_info); } else { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); bridge_country_info.add_info(BridgeInfoType::NegativeReports, date, count_valid); } // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } } // Remove the now-processed reports from the database db.insert( "nrs-to-process", bincode::serialize(&BTreeMap::>::new()).unwrap(), ) .unwrap(); // Commit new set of bridges to re-evaluate db.insert( "bridges-to-re-evaluate", bincode::serialize(&bridges_to_re_evaluate).unwrap(), ) .unwrap(); } // Process positive reports /// We store to-be-processed positive reports as a vector. Add this PR /// to that vector (or create a new vector if necessary). pub fn save_positive_report_to_process(db: &Db, pr: PositiveReport) { let mut reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Store to-be-processed reports with key [fingerprint]_[country]_[date] let map_key = format!( "{}_{}_{}", array_bytes::bytes2hex("", pr.fingerprint), &pr.country, &pr.date, ); if let btree_map::Entry::Vacant(e) = reports.entry(map_key.clone()) { let prs = vec![pr.to_serializable_report()]; e.insert(prs); } else { reports .get_mut(&map_key) .unwrap() .push(pr.to_serializable_report()); } // Commit changes to database db.insert("prs-to-process", bincode::serialize(&reports).unwrap()) .unwrap(); } /// Sends a collection of positive reports to the Lox Authority and returns the /// number of valid reports returned by the server. The positive reports in the /// collection should all have the same bridge fingerprint, date, and country. pub async fn verify_positive_reports( distributors: &BTreeMap, reports: &Vec, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } let client = Client::new(); let uri: String = (distributors .get(&BridgeDistributor::Lox) .unwrap() .to_owned() + "/verifypositive") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's positive reports and store the count of verified reports in /// the database. pub async fn update_positive_reports(db: &Db, distributors: &BTreeMap) { let all_positive_reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; let mut bridges_to_re_evaluate = match db.get("bridges-to-re-evaluate").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashMap::::new(), // We map fingerprint:date where date is the earliest date for // which we have new reports }; let today = get_date(); // Key is [fingerprint]_[country]_[date] for bridge_country_date in all_positive_reports.keys() { let reports = all_positive_reports.get(bridge_country_date).unwrap(); if !reports.is_empty() { let first_report = &reports[0]; let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country.clone(); let count_valid = verify_positive_reports(distributors, reports).await; // If we have new historical data, re-evaluate this bridge if count_valid > 0 && date < today { let fpr_str = array_bytes::bytes2hex("", fingerprint); if bridges_to_re_evaluate.contains_key(&fpr_str) { if *bridges_to_re_evaluate.get(&fpr_str).unwrap() > date { bridges_to_re_evaluate.insert(fpr_str, date); } } else { bridges_to_re_evaluate.insert(fpr_str, date); } } // Get bridge info or make new one let mut bridge_info = match db.get(fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => { // This case shouldn't happen unless the bridge hasn't // published any bridge stats. add_bridge_to_db(db, fingerprint); BridgeInfo::new(fingerprint, &String::default()) } }; // Add the new report count to it if let hash_map::Entry::Vacant(e) = bridge_info.info_by_country.entry(country.clone()) { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(date); bridge_country_info.add_info(BridgeInfoType::PositiveReports, date, count_valid); e.insert(bridge_country_info); } else { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); bridge_country_info.add_info(BridgeInfoType::PositiveReports, date, count_valid); } // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } } // Remove the now-processed reports from the database db.insert( "prs-to-process", bincode::serialize(&BTreeMap::>::new()).unwrap(), ) .unwrap(); // Commit new set of bridges to re-evaluate db.insert( "bridges-to-re-evaluate", bincode::serialize(&bridges_to_re_evaluate).unwrap(), ) .unwrap(); } // Verdict on bridge reachability /// Guess which countries block a bridge. This function returns a map of new /// blockages (fingerprint : set of countries which block the bridge) pub fn guess_blockages( db: &Db, analyzer: &dyn Analyzer, confidence: f64, min_historical_days: u32, max_historical_days: u32, ) -> HashMap<[u8; 20], HashSet> { // Map of bridge fingerprint to set of countries which newly block it let mut blockages = HashMap::<[u8; 20], HashSet>::new(); // Get list of bridges from database let bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::<[u8; 20]>::new(), }; // Get list of bridges with historical data to re-evaluate let bridges_to_re_evaluate = match db.get("bridges-to-re-evaluate").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashMap::::new(), }; // Guess for each bridge for fingerprint in bridges { let today = get_date(); let mut bridge_info: BridgeInfo = bincode::deserialize(&db.get(fingerprint).unwrap().unwrap()).unwrap(); let mut new_blockages = HashSet::::new(); let fpr_str = array_bytes::bytes2hex("", fingerprint); let first_date = if bridges_to_re_evaluate.contains_key(&fpr_str) { *bridges_to_re_evaluate.get(&fpr_str).unwrap() } else { today }; // Re-evaluate the last days from first_date to today. // (This approach is still suboptimal because we re-evaluate for // countries that don't have new reports.) for i in first_date..=today { let blocked_in = analysis::blocked_in( analyzer, &bridge_info, confidence, i, min_historical_days, max_historical_days, ); for country in blocked_in { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); if !bridge_country_info.blocked { new_blockages.insert(country.to_string()); // Mark bridge as blocked when db gets updated bridge_country_info.blocked = true; } } } blockages.insert(fingerprint, new_blockages); // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } // Remove all bridges to re-evaluate from DB db.insert( "bridges-to-re-evaluate", bincode::serialize(&HashMap::::new()).unwrap(), ) .unwrap(); // Return map of new blockages blockages } /// Report blocked bridges to bridge distributor pub async fn report_blockages( distributors: &BTreeMap, blockages: HashMap<[u8; 20], HashSet>, ) { // For now, only report to Lox // TODO: Support more distributors let uri: String = (distributors .get(&BridgeDistributor::Lox) .unwrap() .to_owned() + "/reportblocked") .parse() .unwrap(); // Convert map keys from [u8; 20] to 40-character hex strings let mut blockages_str = HashMap::>::new(); for (fingerprint, countries) in blockages { let fpr_string = array_bytes::bytes2hex("", fingerprint); if !countries.is_empty() { blockages_str.insert(fpr_string, countries); } } if !blockages_str.is_empty() { // Report blocked bridges to bridge distributor let client = Client::new(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&blockages_str).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); let resp_str = String::from_utf8(buf.to_vec()).unwrap(); assert_eq!("OK", resp_str); } } // Unit tests #[cfg(test)] mod tests;