Browse Source

Set up daily updater to be controlled by simulation

Vecna 9 months ago
parent
commit
895623a2a2
3 changed files with 87 additions and 21 deletions
  1. 3 0
      config.json
  2. 83 20
      src/bin/server.rs
  3. 1 1
      src/request_handler.rs

+ 3 - 0
config.json

@@ -10,7 +10,10 @@
     "confidence": 0.95,
     "max_threshold": 5,
     "scaling_factor": 0.25,
+    "min_historical_days": 30,
+    "max_historical_days": 30,
     "port": 8003,
     "require_bridge_token": false,
+    "updater_port": 8123,
     "updater_schedule": "* * 22 * * * *"
 }

+ 83 - 20
src/bin/server.rs

@@ -1,7 +1,8 @@
-use troll_patrol::{request_handler::handle, *};
+use troll_patrol::{request_handler::*, *};
 
 use clap::Parser;
 use futures::future;
+use futures::join;
 use hyper::{
     server::conn::AddrStream,
     service::{make_service_fn, service_fn},
@@ -10,14 +11,20 @@ use hyper::{
 use serde::Deserialize;
 use sled::Db;
 use std::{
-    collections::BTreeMap, convert::Infallible, fs::File, io::BufReader, net::SocketAddr,
-    path::PathBuf, time::Duration,
+    collections::{BTreeMap, HashMap, HashSet},
+    convert::Infallible,
+    fs::File,
+    io::BufReader,
+    net::SocketAddr,
+    path::PathBuf,
+    time::Duration,
 };
 use tokio::{
     signal, spawn,
     sync::{broadcast, mpsc, oneshot},
     time::sleep,
 };
+#[cfg(not(features = "simulation"))]
 use tokio_cron::{Job, Scheduler};
 
 async fn shutdown_signal() {
@@ -60,6 +67,7 @@ pub struct Config {
 
     //require_bridge_token: bool,
     port: u16,
+    updater_port: u16,
     updater_schedule: String,
 }
 
@@ -86,7 +94,7 @@ async fn update_daily_info(
     scaling_factor: f64,
     min_historical_days: u32,
     max_historical_days: u32,
-) {
+) -> HashMap<[u8; 20], HashSet<String>> {
     update_extra_infos(&db, &extra_infos_base_url)
         .await
         .unwrap();
@@ -99,15 +107,22 @@ async fn update_daily_info(
         min_historical_days,
         max_historical_days,
     );
-    report_blockages(&distributors, new_blockages).await;
+    report_blockages(&distributors, new_blockages.clone()).await;
 
     // Generate tomorrow's key if we don't already have it
     new_negative_report_key(&db, get_date() + 1);
+
+    // Return new detected blockages
+    new_blockages
 }
 
+/*
 async fn run_updater(updater_tx: mpsc::Sender<Command>) {
-    updater_tx.send(Command::Update {}).await.unwrap();
+    updater_tx.send(Command::Update {
+
+    }).await.unwrap();
 }
+*/
 
 async fn create_context_manager(
     db_config: DbConfig,
@@ -155,8 +170,8 @@ async fn context_manager(
                 drop(shutdown_sig);
                 println!("Shutdown Sent.");
             }
-            Update {} => {
-                update_daily_info(
+            Update { _req, sender } => {
+                let blockages = update_daily_info(
                     &db,
                     &distributors,
                     &extra_infos_base_url,
@@ -167,6 +182,23 @@ async fn context_manager(
                     max_historical_days,
                 )
                 .await;
+                let response = if cfg!(feature = "simulation") {
+                    // Convert map keys from [u8; 20] to 40-character hex strings
+                    let mut blockages_str = HashMap::<String, HashSet<String>>::new();
+                    for (fingerprint, countries) in blockages {
+                        let fpr_string = array_bytes::bytes2hex("", fingerprint);
+                        blockages_str.insert(fpr_string, countries);
+                    }
+                    Ok(prepare_header(
+                        serde_json::to_string(&blockages_str).unwrap(),
+                    ))
+                } else {
+                    Ok(prepare_header("OK".to_string()))
+                };
+                if let Err(e) = sender.send(response) {
+                    eprintln!("Update Response Error: {:?}", e);
+                };
+                sleep(Duration::from_millis(1)).await;
             }
         }
     }
@@ -182,7 +214,10 @@ enum Command {
     Shutdown {
         shutdown_sig: broadcast::Sender<()>,
     },
-    Update {},
+    Update {
+        _req: Request<Body>,
+        sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
+    },
 }
 
 #[tokio::main]
@@ -218,14 +253,17 @@ async fn main() {
         }
     });
 
-    let updater = spawn(async move {
-        // Run updater once per day
-        let mut sched = Scheduler::utc();
-        sched.add(Job::new(config.updater_schedule, move || {
-            run_updater(updater_tx.clone())
-        }));
-    });
-
+    // TODO: Reintroduce this
+    /*
+        #[cfg(not(feature = "simulation"))]
+        let updater = spawn(async move {
+            // Run updater once per day
+            let mut sched = Scheduler::utc();
+            sched.add(Job::new(config.updater_schedule, move || {
+                run_updater(updater_tx.clone())
+            }));
+        });
+    */
     let context_manager = spawn(async move {
         create_context_manager(
             config.db,
@@ -259,12 +297,37 @@ async fn main() {
         async move { Ok::<_, Infallible>(service) }
     });
 
+    let updater_make_service = make_service_fn(move |_conn: &AddrStream| {
+        let request_tx = updater_tx.clone();
+        let service = service_fn(move |_req| {
+            let request_tx = request_tx.clone();
+            let (response_tx, response_rx) = oneshot::channel();
+            let cmd = Command::Update {
+                _req,
+                sender: response_tx,
+            };
+            async move {
+                request_tx.send(cmd).await.unwrap();
+                response_rx.await.unwrap()
+            }
+        });
+        async move { Ok::<_, Infallible>(service) }
+    });
+
     let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
     let server = Server::bind(&addr).serve(make_service);
     let graceful = server.with_graceful_shutdown(shutdown_signal());
+    let updater_addr = SocketAddr::from(([127, 0, 0, 1], config.updater_port));
+    let updater_server = Server::bind(&updater_addr).serve(updater_make_service);
+    let updater_graceful = updater_server.with_graceful_shutdown(shutdown_signal());
     println!("Listening on {}", addr);
-    if let Err(e) = graceful.await {
-        eprintln!("server error: {}", e);
+    println!("Updater listening on {}", updater_addr);
+    let (a, b) = join!(graceful, updater_graceful);
+    if a.is_err() {
+        eprintln!("server error: {}", a.unwrap_err());
+    }
+    if b.is_err() {
+        eprintln!("server error: {}", b.unwrap_err());
     }
-    future::join_all([context_manager, updater, shutdown_handler]).await;
+    future::join_all([context_manager, shutdown_handler]).await;
 }

+ 1 - 1
src/request_handler.rs

@@ -76,7 +76,7 @@ pub async fn handle(db: &Db, req: Request<Body>) -> Result<Response<Body>, Infal
 }
 
 // Prepare HTTP Response for successful Server Request
-fn prepare_header(response: String) -> Response<Body> {
+pub fn prepare_header(response: String) -> Response<Body> {
     let mut resp = Response::new(Body::from(response));
     resp.headers_mut()
         .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));