use http::status::StatusCode; use http_body_util::{BodyExt, Empty}; use hyper::{body::Bytes, Body, Client, Method, Request}; use hyper_util::rt::TokioExecutor; use lazy_static::lazy_static; //use select::{document::Document, predicate::Name}; use serde::{Deserialize, Serialize}; use sled::Db; use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt, }; use x25519_dalek::{PublicKey, StaticSecret}; pub mod analysis; pub mod bridge_verification_info; pub mod crypto; pub mod extra_info; pub mod negative_report; pub mod positive_report; pub mod request_handler; use analysis::Analyzer; use extra_info::*; use negative_report::*; use positive_report::*; lazy_static! { // known country codes based on Tor geoIP database // Produced with `cat /usr/share/tor/geoip{,6} | grep -v ^# | grep -o ..$ | sort | uniq | tr '[:upper:]' '[:lower:]' | tr '\n' ',' | sed 's/,/","/g'` pub static ref COUNTRY_CODES: HashSet<&'static str> = HashSet::from(["??","ad","ae","af","ag","ai","al","am","ao","ap","aq","ar","as","at","au","aw","ax","az","ba","bb","bd","be","bf","bg","bh","bi","bj","bl","bm","bn","bo","bq","br","bs","bt","bv","bw","by","bz","ca","cc","cd","cf","cg","ch","ci","ck","cl","cm","cn","co","cr","cs","cu","cv","cw","cx","cy","cz","de","dj","dk","dm","do","dz","ec","ee","eg","eh","er","es","et","eu","fi","fj","fk","fm","fo","fr","ga","gb","gd","ge","gf","gg","gh","gi","gl","gm","gn","gp","gq","gr","gs","gt","gu","gw","gy","hk","hm","hn","hr","ht","hu","id","ie","il","im","in","io","iq","ir","is","it","je","jm","jo","jp","ke","kg","kh","ki","km","kn","kp","kr","kw","ky","kz","la","lb","lc","li","lk","lr","ls","lt","lu","lv","ly","ma","mc","md","me","mf","mg","mh","mk","ml","mm","mn","mo","mp","mq","mr","ms","mt","mu","mv","mw","mx","my","mz","na","nc","ne","nf","ng","ni","nl","no","np","nr","nu","nz","om","pa","pe","pf","pg","ph","pk","pl","pm","pn","pr","ps","pt","pw","py","qa","re","ro","rs","ru","rw","sa","sb","sc","sd","se","sg","sh","si","sj","sk","sl","sm","sn","so","sr","ss",
"st","sv","sx","sy","sz","tc","td","tf","tg","th","tj","tk","tl","tm","tn","to","tr","tt","tv","tw","tz","ua","ug","um","us","uy","uz","va","vc","ve","vg","vi","vn","vu","wf","ws","ye","yt","za","zm","zw"]); } /// We will accept reports up to this many days old. pub const MAX_BACKDATE: u32 = 3; /// Get Julian date pub fn get_date() -> u32 { time::OffsetDateTime::now_utc() .date() .to_julian_day() .try_into() .unwrap() } #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeDistributor { Lox, } /// All the info for a bridge, to be stored in the database #[derive(Serialize, Deserialize)] pub struct BridgeInfo { /// hashed fingerprint (SHA-1 hash of 20-byte bridge ID) pub fingerprint: [u8; 20], /// nickname of bridge (probably not necessary) pub nickname: String, /// first Julian date we started collecting data on this bridge pub first_seen: u32, /// map of countries to data for this bridge in that country pub info_by_country: HashMap, } impl BridgeInfo { pub fn new(fingerprint: [u8; 20], nickname: &String) -> Self { Self { fingerprint: fingerprint, nickname: nickname.to_string(), first_seen: get_date(), info_by_country: HashMap::::new(), } } } impl fmt::Display for BridgeInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = format!( "fingerprint:{}\n", array_bytes::bytes2hex("", self.fingerprint).as_str() ); str.push_str(format!("nickname: {}\n", self.nickname).as_str()); str.push_str(format!("first_seen: {}\n", self.first_seen).as_str()); str.push_str("info_by_country:"); for country in self.info_by_country.keys() { str.push_str(format!("\n country: {}", country).as_str()); let country_info = self.info_by_country.get(country).unwrap(); for line in country_info.to_string().lines() { str.push_str(format!("\n {}", line).as_str()); } } write!(f, "{}", str) } } #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeInfoType { BridgeIps, NegativeReports, 
PositiveReports, } /// Information about bridge reachability from a given country #[derive(Serialize, Deserialize)] pub struct BridgeCountryInfo { pub info_by_day: BTreeMap>, pub blocked: bool, } impl BridgeCountryInfo { pub fn new() -> Self { Self { info_by_day: BTreeMap::>::new(), blocked: false, } } pub fn add_info(&mut self, info_type: BridgeInfoType, date: u32, count: u32) { if self.info_by_day.contains_key(&date) { let info = self.info_by_day.get_mut(&date).unwrap(); if !info.contains_key(&info_type) { info.insert(info_type, count); } else if info_type == BridgeInfoType::BridgeIps { if *info.get(&info_type).unwrap() < count { // Use highest value we've seen today info.insert(info_type, count); } } else { // Add count to previous count for reports let new_count = info.get(&info_type).unwrap() + count; info.insert(info_type, new_count); } } else { let mut info = BTreeMap::::new(); info.insert(info_type, count); self.info_by_day.insert(date, info); } } } impl fmt::Display for BridgeCountryInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = String::from("info:"); for date in self.info_by_day.keys() { let info = self.info_by_day.get(date).unwrap(); let ip_count = match info.get(&BridgeInfoType::BridgeIps) { Some(v) => v, None => &0, }; let nr_count = match info.get(&BridgeInfoType::NegativeReports) { Some(v) => v, None => &0, }; let pr_count = match info.get(&BridgeInfoType::PositiveReports) { Some(v) => v, None => &0, }; if ip_count > &0 || nr_count > &0 || pr_count > &0 { str.push_str( format!( "\n date: {}\n connections: {}\n negative reports: {}\n positive reports: {}", date, ip_count, nr_count, pr_count, ) .as_str(), ); } } write!(f, "{}", str) } } /// We store a set of all known bridges so that we can later iterate over them. /// This function just adds a bridge fingerprint to that set. 
pub fn add_bridge_to_db(db: &Db, fingerprint: [u8; 20]) { let mut bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::<[u8; 20]>::new(), }; bridges.insert(fingerprint); db.insert("bridges", bincode::serialize(&bridges).unwrap()) .unwrap(); } // Download a webpage and return it as a string pub async fn download(url: &str) -> Result> { let https = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() .expect("no native root CA certificates found") .https_only() .enable_http1() .build(); let client: hyper_util::client::legacy::Client<_, Empty> = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https); println!("Downloading {}", url); let mut res = client.get(url.parse()?).await?; assert_eq!(res.status(), StatusCode::OK); let mut body_str = String::default(); while let Some(next) = res.frame().await { let frame = next?; if let Some(chunk) = frame.data_ref() { body_str.push_str(&String::from_utf8(chunk.to_vec())?); } } Ok(body_str) } // Process extra-infos /// Adds the extra-info data for a single bridge to the database. If the /// database already contains an extra-info for this bridge for thid date, /// but this extra-info contains different data for some reason, use the /// greater count of connections from each country. pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) { let fingerprint = extra_info.fingerprint; let mut bridge_info = match db.get(fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => { add_bridge_to_db(&db, fingerprint); BridgeInfo::new(fingerprint, &extra_info.nickname) } }; for country in extra_info.bridge_ips.keys() { if bridge_info.info_by_country.contains_key::(country) { bridge_info .info_by_country .get_mut(country) .unwrap() .add_info( BridgeInfoType::BridgeIps, extra_info.date, *extra_info.bridge_ips.get(country).unwrap(), ); } else { // No existing entry; make a new one. 
let mut bridge_country_info = BridgeCountryInfo::new(); bridge_country_info.add_info( BridgeInfoType::BridgeIps, extra_info.date, *extra_info.bridge_ips.get(country).unwrap(), ); bridge_info .info_by_country .insert(country.to_string(), bridge_country_info); } } // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } /// Download new extra-infos files and add their data to the database pub async fn update_extra_infos( db: &Db, base_url: &str, ) -> Result<(), Box> { // Track which files have been processed. This is slightly redundant // because we're only downloading files we don't already have, but it // might be a good idea to check in case we downloaded a file but didn't // process it for some reason. let mut processed_extra_infos_files = match db.get(b"extra_infos_files").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::::new(), }; let dir_page = download(base_url).await?; // Causes Send issues, so use solution below instead //let doc = Document::from(dir_page.as_str()); //let links = doc.find(Name("a")).filter_map(|n| n.attr("href")); // Alternative, less robust solution let mut links = HashSet::::new(); for line in dir_page.lines() { let begin_match = ""; if line.contains(begin_match) { let link = &line[line.find(begin_match).unwrap() + begin_match.len()..]; if link.contains(end_match) { let link = &link[0..link.find(end_match).unwrap()]; links.insert(link.to_string()); } } } let mut new_extra_infos = HashSet::::new(); // We should now have an iterable collection of links to consider downloading. 
for link in links { if link.ends_with("-extra-infos") && !processed_extra_infos_files.contains(&link) { let extra_infos_url = format!("{}{}", base_url, link); let extra_info_str = download(&extra_infos_url).await?; //ExtraInfo::parse_file(&extra_info_str, &mut new_extra_infos); let extra_infos = ExtraInfo::parse_file(&extra_info_str); new_extra_infos.extend(extra_infos); processed_extra_infos_files.insert(link); } } // Add new extra-infos data to database for extra_info in new_extra_infos { add_extra_info_to_db(&db, extra_info); } // Store which files we've already downloaded and processed db.insert( b"extra_infos_files", bincode::serialize(&processed_extra_infos_files).unwrap(), ) .unwrap(); Ok(()) } // Process negative reports /// If there is already a negative report ECDH key for this date, return None. /// Otherwise, generate a new keypair, save the secret part in the db, and /// return the public part. pub fn new_negative_report_key(db: &Db, date: u32) -> Option { let mut nr_keys = if !db.contains_key("nr-keys").unwrap() { BTreeMap::::new() } else { match bincode::deserialize(&db.get("nr-keys").unwrap().unwrap()) { Ok(v) => v, Err(_) => BTreeMap::::new(), } }; if nr_keys.contains_key(&date) { None } else { let mut rng = rand::thread_rng(); let secret = StaticSecret::random_from_rng(&mut rng); let public = PublicKey::from(&secret); nr_keys.insert(date, secret); db.insert("nr-keys", bincode::serialize(&nr_keys).unwrap()) .unwrap(); Some(public) } } /// Receive an encrypted negative report. Attempt to decrypt it and if /// successful, add it to the database to be processed later. 
pub fn handle_encrypted_negative_report(db: &Db, enc_report: EncryptedNegativeReport) {
    if db.contains_key("nr-keys").unwrap() {
        // Silently drop the report if our key map is unreadable or if we
        // have no key for the report's date.
        let nr_keys: BTreeMap<u32, StaticSecret> =
            match bincode::deserialize(&db.get("nr-keys").unwrap().unwrap()) {
                Ok(map) => map,
                Err(_) => {
                    return;
                }
            };
        if let Some(secret) = nr_keys.get(&enc_report.date) {
            // Undecryptable reports are dropped without error
            if let Ok(nr) = enc_report.decrypt(secret) {
                save_negative_report_to_process(&db, nr);
            }
        }
    }
}

/// We store to-be-processed negative reports as a vector. Add this NR
/// to that vector (or create a new vector if necessary). Duplicate nonces
/// for the same date are ignored to prevent replay.
pub fn save_negative_report_to_process(db: &Db, nr: NegativeReport) {
    // TODO: Purge these database entries sometimes
    let mut nonces = match db.get(format!("nonces_{}", &nr.date)).unwrap() {
        Some(v) => bincode::deserialize(&v).unwrap(),
        None => HashSet::<[u8; 32]>::new(),
    };
    // Just ignore the report if we've seen the nonce before
    if nonces.insert(nr.nonce) {
        db.insert(
            format!("nonces_{}", &nr.date),
            bincode::serialize(&nonces).unwrap(),
        )
        .unwrap();
        let mut reports = match db.get("nrs-to-process").unwrap() {
            Some(v) => bincode::deserialize(&v).unwrap(),
            None => BTreeMap::<String, Vec<SerializableNegativeReport>>::new(),
        };
        // Store to-be-processed reports with key [fingerprint]_[country]_[date]
        let map_key = format!(
            "{}_{}_{}",
            array_bytes::bytes2hex("", &nr.fingerprint),
            &nr.country,
            &nr.date,
        );
        reports
            .entry(map_key)
            .or_insert_with(Vec::new)
            .push(nr.to_serializable_report());
        // Commit changes to database
        db.insert("nrs-to-process", bincode::serialize(&reports).unwrap())
            .unwrap();
    }
}

/// Sends a collection of negative reports to the Lox Authority and returns the
/// number of valid reports returned by the server. The negative reports in the
/// collection should all have the same bridge fingerprint, date, country, and
/// distributor.
pub async fn verify_negative_reports( distributors: &BTreeMap, reports: &Vec, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } // Get one report, assume the rest have the same distributor let first_report = &reports[0]; let distributor = first_report.distributor; let client = Client::new(); let uri: String = (distributors.get(&distributor).unwrap().to_owned() + "/verifynegative") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's negative reports and store the count of verified reports in /// the database. pub async fn update_negative_reports(db: &Db, distributors: &BTreeMap) { let mut all_negative_reports = match db.get("nrs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Key is [fingerprint]_[country]_[date] for bridge_country_date in all_negative_reports.keys() { let reports = all_negative_reports.get(bridge_country_date).unwrap(); if !reports.is_empty() { let first_report = &reports[0]; let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country.clone(); let count_valid = verify_negative_reports(&distributors, reports).await; // Get bridge info or make new one let mut bridge_info = match db.get(fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => { // This case shouldn't happen unless the bridge hasn't // published any bridge stats. 
add_bridge_to_db(&db, fingerprint); BridgeInfo::new(fingerprint, &String::default()) } }; // Add the new report count to it if bridge_info.info_by_country.contains_key(&country) { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); bridge_country_info.add_info(BridgeInfoType::NegativeReports, date, count_valid); } else { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(); bridge_country_info.add_info(BridgeInfoType::NegativeReports, date, count_valid); bridge_info .info_by_country .insert(country, bridge_country_info); } // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } } // TODO: Would it be cheaper to just recreate it? all_negative_reports.clear(); // Remove the now-processed reports from the database db.insert( "nrs-to-process", bincode::serialize(&all_negative_reports).unwrap(), ) .unwrap(); } // Process positive reports /// We store to-be-processed positive reports as a vector. Add this PR /// to that vector (or create a new vector if necessary). pub fn save_positive_report_to_process(db: &Db, pr: PositiveReport) { let mut reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Store to-be-processed reports with key [fingerprint]_[country]_[date] let map_key = format!( "{}_{}_{}", array_bytes::bytes2hex("", &pr.fingerprint), &pr.country, &pr.date, ); if reports.contains_key(&map_key) { reports .get_mut(&map_key) .unwrap() .push(pr.to_serializable_report()); } else { let mut prs = Vec::::new(); prs.push(pr.to_serializable_report()); reports.insert(map_key, prs); } // Commit changes to database db.insert("prs-to-process", bincode::serialize(&reports).unwrap()) .unwrap(); } /// Sends a collection of positive reports to the Lox Authority and returns the /// number of valid reports returned by the server. 
The positive reports in the /// collection should all have the same bridge fingerprint, date, and country. pub async fn verify_positive_reports( distributors: &BTreeMap, reports: &Vec, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } let client = Client::new(); let uri: String = (distributors .get(&BridgeDistributor::Lox) .unwrap() .to_owned() + "/verifypositive") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's positive reports and store the count of verified reports in /// the database. pub async fn update_positive_reports(db: &Db, distributors: &BTreeMap) { let mut all_positive_reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Key is [fingerprint]_[country]_[date] for bridge_country_date in all_positive_reports.keys() { let reports = all_positive_reports.get(bridge_country_date).unwrap(); if !reports.is_empty() { let first_report = &reports[0]; let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country.clone(); let count_valid = verify_positive_reports(&distributors, reports).await; // Get bridge info or make new one let mut bridge_info = match db.get(fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => { // This case shouldn't happen unless the bridge hasn't // published any bridge stats. 
add_bridge_to_db(&db, fingerprint); BridgeInfo::new(fingerprint, &String::default()) } }; // Add the new report count to it if bridge_info.info_by_country.contains_key(&country) { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); bridge_country_info.add_info(BridgeInfoType::PositiveReports, date, count_valid); } else { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(); bridge_country_info.add_info(BridgeInfoType::PositiveReports, date, count_valid); bridge_info .info_by_country .insert(country, bridge_country_info); } // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } } // TODO: Would it be cheaper to just recreate it? all_positive_reports.clear(); // Remove the now-processed reports from the database db.insert( "prs-to-process", bincode::serialize(&all_positive_reports).unwrap(), ) .unwrap(); } // Verdict on bridge reachability /// Guess which countries block a bridge. 
This function returns a map of new /// blockages (fingerprint : set of countries which block the bridge) pub fn guess_blockages( db: &Db, analyzer: &dyn Analyzer, confidence: f64, ) -> HashMap<[u8; 20], HashSet> { // Map of bridge fingerprint to set of countries which newly block it let mut blockages = HashMap::<[u8; 20], HashSet>::new(); // Get list of bridges from database let bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::<[u8; 20]>::new(), }; // Guess for each bridge for fingerprint in bridges { let mut bridge_info: BridgeInfo = bincode::deserialize(&db.get(fingerprint).unwrap().unwrap()).unwrap(); let mut new_blockages = HashSet::::new(); let blocked_in = analysis::blocked_in(analyzer, &bridge_info, confidence, get_date()); for country in blocked_in { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); if !bridge_country_info.blocked { new_blockages.insert(country.to_string()); // Mark bridge as blocked when db gets updated bridge_country_info.blocked = true; } } blockages.insert(fingerprint, new_blockages); // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } // Return map of new blockages blockages } /// Report blocked bridges to bridge distributor pub async fn report_blockages( distributors: &BTreeMap, blockages: HashMap<[u8; 20], HashSet>, ) { // For now, only report to Lox // TODO: Support more distributors let uri: String = (distributors .get(&BridgeDistributor::Lox) .unwrap() .to_owned() + "/reportblocked") .parse() .unwrap(); // Convert map keys from [u8; 20] to 40-character hex strings let mut blockages_str = HashMap::>::new(); for (fingerprint, countries) in blockages { let fpr_string = array_bytes::bytes2hex("", fingerprint); blockages_str.insert(fpr_string, countries); } // Report blocked bridges to bridge distributor let client = Client::new(); let req = Request::builder() .method(Method::POST) 
.uri(uri) .body(Body::from(serde_json::to_string(&blockages_str).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); let resp_str: String = serde_json::from_slice(&buf).unwrap(); assert_eq!("OK", resp_str); } // Unit tests #[cfg(test)] mod tests;