Forráskód Böngészése

Add analyzer which evaluates data as multivariate normal distribution

Vecna 1 hónapja
szülő
commit
f2626eb800
3 módosított fájl, 282 hozzáadás és 6 törlés
  1. 2 0
      Cargo.toml
  2. 248 3
      src/analyzer.rs
  3. 32 3
      src/bin/server.rs

+ 2 - 0
Cargo.toml

@@ -21,6 +21,7 @@ hyper-util = { version = "0.1", features = ["full"] }
 julianday = "1.2.0"
 lazy_static = "1"
 lox-library = { git = "https://gitlab.torproject.org/vecna/lox.git", version = "0.1.0" }
+nalgebra = "0.29"
 rand = { version = "0.8" }
 #select = "0.6.0"
 serde = "1.0.197"
@@ -29,6 +30,7 @@ serde_with = {version = "3.7.0", features = ["json"]}
 sha1 = "0.10"
 sha3 = "0.10"
 sled = "0.34.7"
+statrs = "0.16"
 time = "0.3.30"
 tokio = { version = "1", features = ["full"] }
 tokio-cron = "0.1.2"

+ 248 - 3
src/analyzer.rs

@@ -1,20 +1,265 @@
-use crate::BridgeInfo;
-use std::collections::HashSet;
+use crate::{get_date, BridgeInfo, BridgeInfoType};
+use lox_library::proto::trust_promotion::UNTRUSTED_INTERVAL;
+use nalgebra::DVector;
+use statrs::distribution::{Continuous, MultivariateNormal};
+use std::collections::{BTreeMap, HashSet};
 
 /// Provides a function for predicting which countries block this bridge
 pub trait Analyzer {
     fn blocked_in(&self, bridge_info: &BridgeInfo, confidence: f64) -> HashSet<String>;
 }
 
+/// Dummy example that just tells us about blockages we already know about
 pub struct ExampleAnalyzer {}
 
-/// Dummy example which just tells us about blockages we already know about
 impl Analyzer for ExampleAnalyzer {
+    fn blocked_in(&self, bridge_info: &BridgeInfo, _confidence: f64) -> HashSet<String> {
+        let mut blocked_in = HashSet::<String>::new();
+        for (country, info) in &bridge_info.info_by_country {
+            if info.blocked {
+                blocked_in.insert(country.to_string());
+            }
+        }
+        blocked_in
+    }
+}
+
+/// Model data as multivariate normal distribution
+pub struct NormalAnalyzer {
+    max_threshold: u32,
+    scaling_factor: f64,
+}
+
+impl NormalAnalyzer {
+    pub fn new(max_threshold: u32, scaling_factor: f64) -> Self {
+        Self {
+            max_threshold,
+            scaling_factor,
+        }
+    }
+
+    fn mean_vector_and_covariance_matrix(data: &[&[u32]]) -> (Vec<f64>, Vec<f64>) {
+        let n = data.len();
+
+        // Compute mean vector
+        let mean_vec = {
+            let mut mean_vec = Vec::<f64>::new();
+            for var in data {
+                mean_vec.push({
+                    let mut sum = 0.0;
+                    for count in *var {
+                        sum += *count as f64;
+                    }
+                    sum / var.len() as f64
+                });
+            }
+            mean_vec
+        };
+
+        // Compute covariance matrix
+        let cov_mat = {
+            let mut cov_mat = Vec::<f64>::new();
+            // We don't need to recompute Syx, but we currently do
+            for i in 0..n {
+                for j in 0..n {
+                    cov_mat.push({
+                        let var1 = data[i];
+                        let var1_mean = mean_vec[i];
+
+                        let var2 = data[j];
+                        let var2_mean = mean_vec[j];
+
+                        assert_eq!(var1.len(), var2.len());
+
+                        let mut sum = 0.0;
+                        for index in 0..var1.len() {
+                            sum +=
+                                (var1[index] as f64 - var1_mean) * (var2[index] as f64 - var2_mean);
+                        }
+                        sum / var1.len() as f64
+                    });
+                }
+            }
+            cov_mat
+        };
+
+        (mean_vec, cov_mat)
+    }
+
+    /// Evaluate open-entry bridge based on only today's data
+    fn stage_one(&self, bridge_ips_today: u32, negative_reports_today: u32) -> bool {
+        negative_reports_today > self.max_threshold
+            || f64::from(negative_reports_today) > self.scaling_factor * f64::from(bridge_ips_today)
+    }
+
+    /// Evaluate invite-only bridge based on last 30 days
+    fn stage_two(
+        &self,
+        confidence: f64,
+        bridge_ips: &[u32],
+        bridge_ips_today: u32,
+        negative_reports: &[u32],
+        negative_reports_today: u32,
+    ) -> bool {
+        assert!(bridge_ips.len() >= UNTRUSTED_INTERVAL as usize);
+        assert_eq!(bridge_ips.len(), negative_reports.len());
+
+        let (mean_vec, cov_mat) =
+            Self::mean_vector_and_covariance_matrix(&[bridge_ips, negative_reports]);
+        let bridge_ips_mean = mean_vec[0];
+        let negative_reports_mean = mean_vec[1];
+
+        let mvn = MultivariateNormal::new(mean_vec, cov_mat).unwrap();
+        if mvn.pdf(&DVector::from_vec(vec![
+            bridge_ips_today as f64,
+            negative_reports_today as f64,
+        ])) < confidence
+        {
+            (negative_reports_today as f64) > negative_reports_mean
+                || (bridge_ips_today as f64) < bridge_ips_mean
+        } else {
+            false
+        }
+    }
+
+    /// Evaluate invite-only bridge with lv3+ users submitting positive reports
+    fn stage_three(
+        &self,
+        confidence: f64,
+        bridge_ips: &[u32],
+        bridge_ips_today: u32,
+        negative_reports: &[u32],
+        negative_reports_today: u32,
+        positive_reports: &[u32],
+        positive_reports_today: u32,
+    ) -> bool {
+        assert!(bridge_ips.len() >= UNTRUSTED_INTERVAL as usize);
+        assert_eq!(bridge_ips.len(), negative_reports.len());
+        assert_eq!(bridge_ips.len(), positive_reports.len());
+
+        let (mean_vec, cov_mat) = Self::mean_vector_and_covariance_matrix(&[
+            bridge_ips,
+            negative_reports,
+            positive_reports,
+        ]);
+        let bridge_ips_mean = mean_vec[0];
+        let negative_reports_mean = mean_vec[1];
+        let positive_reports_mean = mean_vec[2];
+
+        let mvn = MultivariateNormal::new(mean_vec, cov_mat).unwrap();
+        if mvn.pdf(&DVector::from_vec(vec![
+            bridge_ips_today as f64,
+            negative_reports_today as f64,
+            positive_reports_today as f64,
+        ])) < confidence
+        {
+            (negative_reports_today as f64) > negative_reports_mean
+                || (bridge_ips_today as f64) < bridge_ips_mean
+                || (positive_reports_today as f64) < positive_reports_mean
+        } else {
+            false
+        }
+    }
+}
+
+impl Analyzer for NormalAnalyzer {
     fn blocked_in(&self, bridge_info: &BridgeInfo, confidence: f64) -> HashSet<String> {
+        // TODO: Re-evaluate past days if we have backdated reports
         let mut blocked_in = HashSet::<String>::new();
+        let today = get_date();
+        let age = bridge_info.first_seen - today;
         for (country, info) in &bridge_info.info_by_country {
             if info.blocked {
+                // Assume bridges never become unblocked
                 blocked_in.insert(country.to_string());
+            } else {
+                // Get today's values
+                let new_map_binding = BTreeMap::<BridgeInfoType, u32>::new();
+                // TODO: Evaluate on yesterday if we don't have data for today?
+                let today_info = match info.info_by_day.get(&today) {
+                    Some(v) => v,
+                    None => &new_map_binding,
+                };
+                let bridge_ips_today = match today_info.get(&BridgeInfoType::BridgeIps) {
+                    Some(v) => *v,
+                    None => 0,
+                };
+                let negative_reports_today = match today_info.get(&BridgeInfoType::NegativeReports)
+                {
+                    Some(v) => *v,
+                    None => 0,
+                };
+                let positive_reports_today = match today_info.get(&BridgeInfoType::PositiveReports)
+                {
+                    Some(v) => *v,
+                    None => 0,
+                };
+
+                if age < UNTRUSTED_INTERVAL {
+                    // open-entry bridge
+                    if self.stage_one(bridge_ips_today, negative_reports_today) {
+                        blocked_in.insert(country.to_string());
+                    }
+                } else {
+                    // invite-only bridge
+                    let mut bridge_ips = [0; UNTRUSTED_INTERVAL as usize];
+                    let mut negative_reports = [0; UNTRUSTED_INTERVAL as usize];
+                    let mut positive_reports = [0; UNTRUSTED_INTERVAL as usize];
+                    let mut stage_3 = false;
+
+                    // Populate time series
+                    for i in 0..UNTRUSTED_INTERVAL {
+                        let date = today - UNTRUSTED_INTERVAL + i - 1;
+                        let new_map_binding = BTreeMap::<BridgeInfoType, u32>::new();
+                        let day_info = match info.info_by_day.get(&date) {
+                            Some(v) => v,
+                            None => &new_map_binding,
+                        };
+                        bridge_ips[i as usize] = match day_info.get(&BridgeInfoType::BridgeIps) {
+                            Some(v) => *v,
+                            None => 0,
+                        };
+                        negative_reports[i as usize] =
+                            match day_info.get(&BridgeInfoType::NegativeReports) {
+                                Some(v) => *v,
+                                None => 0,
+                            };
+                        positive_reports[i as usize] =
+                            match day_info.get(&BridgeInfoType::PositiveReports) {
+                                Some(v) => {
+                                    stage_3 = true;
+                                    *v
+                                }
+                                None => 0,
+                            };
+                    }
+
+                    if stage_3 {
+                        // We've seen positive reports
+                        if self.stage_three(
+                            confidence,
+                            &bridge_ips,
+                            bridge_ips_today,
+                            &negative_reports,
+                            negative_reports_today,
+                            &positive_reports,
+                            positive_reports_today,
+                        ) {
+                            blocked_in.insert(country.to_string());
+                        }
+                    } else {
+                        // We have not seen positive reports
+                        if self.stage_two(
+                            confidence,
+                            &bridge_ips,
+                            bridge_ips_today,
+                            &negative_reports,
+                            negative_reports_today,
+                        ) {
+                            blocked_in.insert(country.to_string());
+                        }
+                    }
+                }
             }
         }
         blocked_in

+ 32 - 3
src/bin/server.rs

@@ -41,8 +41,17 @@ pub struct Config {
     // map of distributor name to IP:port to contact it
     pub distributors: BTreeMap<BridgeDistributor, String>,
     extra_infos_base_url: String,
+
     // confidence required to consider a bridge blocked
     confidence: f64,
+
+    // block open-entry bridges if they get more negative reports than this
+    max_threshold: u32,
+
+    // block open-entry bridges if they get more negative reports than
+    // scaling_factor * bridge_ips
+    scaling_factor: f64,
+
     //require_bridge_token: bool,
     port: u16,
     updater_schedule: String,
@@ -67,13 +76,19 @@ async fn update_daily_info(
     distributors: &BTreeMap<BridgeDistributor, String>,
     extra_infos_base_url: &str,
     confidence: f64,
+    max_threshold: u32,
+    scaling_factor: f64,
 ) {
     update_extra_infos(&db, &extra_infos_base_url)
         .await
         .unwrap();
     update_negative_reports(&db, &distributors).await;
     update_positive_reports(&db, &distributors).await;
-    let new_blockages = guess_blockages(&db, &analyzer::ExampleAnalyzer {}, confidence);
+    let new_blockages = guess_blockages(
+        &db,
+        &analyzer::NormalAnalyzer::new(max_threshold, scaling_factor),
+        confidence,
+    );
     report_blockages(&distributors, new_blockages).await;
 }
 
@@ -86,11 +101,13 @@ async fn create_context_manager(
     distributors: BTreeMap<BridgeDistributor, String>,
     extra_infos_base_url: &str,
     confidence: f64,
+    max_threshold: u32,
+    scaling_factor: f64,
     context_rx: mpsc::Receiver<Command>,
     mut kill: broadcast::Receiver<()>,
 ) {
     tokio::select! {
-        create_context = context_manager(db_config, distributors, extra_infos_base_url, confidence, context_rx) => create_context,
+        create_context = context_manager(db_config, distributors, extra_infos_base_url, confidence, max_threshold, scaling_factor, context_rx) => create_context,
         _ = kill.recv() => {println!("Shut down manager");},
     }
 }
@@ -100,6 +117,8 @@ async fn context_manager(
     distributors: BTreeMap<BridgeDistributor, String>,
     extra_infos_base_url: &str,
     confidence: f64,
+    max_threshold: u32,
+    scaling_factor: f64,
     mut context_rx: mpsc::Receiver<Command>,
 ) {
     let db: Db = sled::open(&db_config.db_path).unwrap();
@@ -120,7 +139,15 @@ async fn context_manager(
                 println!("Shutdown Sent.");
             }
             Update {} => {
-                update_daily_info(&db, &distributors, &extra_infos_base_url, confidence).await;
+                update_daily_info(
+                    &db,
+                    &distributors,
+                    &extra_infos_base_url,
+                    confidence,
+                    max_threshold,
+                    scaling_factor,
+                )
+                .await;
             }
         }
     }
@@ -188,6 +215,8 @@ async fn main() {
             config.distributors,
             &config.extra_infos_base_url,
             config.confidence,
+            config.max_threshold,
+            config.scaling_factor,
             request_rx,
             kill,
         )