Browse Source

Listen for reports

Downgrade hyper, listen for new reports, TODO: test
Vecna 2 months ago
parent
commit
8ea6940397
7 changed files with 260 additions and 42 deletions
  1. 5 3
      Cargo.toml
  2. 171 12
      src/bin/server.rs
  3. 3 2
      src/extra_info.rs
  4. 1 22
      src/lib.rs
  5. 9 1
      src/negative_report.rs
  6. 10 2
      src/positive_report.rs
  7. 61 0
      src/request_handler.rs

+ 5 - 3
Cargo.toml

@@ -12,17 +12,19 @@ chrono = "0.4"
 clap = { version = "4.4.14", features = ["derive"] }
 curve25519-dalek = { version = "4", default-features = false, features = ["serde", "rand_core", "digest"] }
 ed25519-dalek = { version = "2", features = ["serde", "rand_core"] }
+futures = "0.3.30"
+http = "1"
 http-body-util = "0.1"
-hyper = { version = "1", features = ["full"] }
+hyper = { version = "0.14.28", features = ["full"] }
 hyper-rustls = "0.26.0"
 hyper-util = { version = "0.1", features = ["full"] }
 julianday = "1.2.0"
 lazy_static = "1"
 lox-library = { git = "https://gitlab.torproject.org/vecna/lox.git", version = "0.1.0" }
 select = "0.6.0"
-serde = "1.0.195"
+serde = "1.0.197"
 serde_json = "1.0"
-serde_with = {version = "3.5.0", features = ["json"]}
+serde_with = {version = "3.7.0", features = ["json"]}
 sha1 = "0.10"
 sha3 = "0.10"
 sled = "0.34.7"

+ 171 - 12
src/bin/server.rs

@@ -2,25 +2,66 @@ use troll_patrol::{
     extra_info::{self, ExtraInfo},
     //negative_report::SerializableNegativeReport,
     //positive_report::SerializablePositiveReport,
+    request_handler::handle,
     *,
 };
 
 use clap::Parser;
+use futures::future;
+use hyper::{
+    server::conn::AddrStream,
+    service::{make_service_fn, service_fn},
+    Body, Request, Response, Server,
+};
+use serde::Deserialize;
 use sled::Db;
-use std::{collections::HashSet, path::PathBuf};
+use std::{
+    collections::HashSet, convert::Infallible, fs::File, io::BufReader, net::SocketAddr,
+    path::PathBuf, time::Duration,
+};
+use tokio::{
+    signal, spawn,
+    sync::{broadcast, mpsc, oneshot},
+    time::sleep,
+};
 
-#[tokio::main]
-async fn main() {
-    // TODO: Currently, we're processing extra-infos here, but we want to:
-    // 1. Run a server to accept incoming reports
-    // 2. Periodically (daily):
-    //   a) download new extra-infos
-    //   b) determine whether we think each bridge is blocked or not
-    //   c) report these results to the LA
-    // 3. Store all our data
+async fn shutdown_signal() {
+    tokio::signal::ctrl_c()
+        .await
+        .expect("failed to listen for ctrl+c signal");
+    println!("Shut down Troll Patrol Server");
+}
 
-    let db: Db = sled::open(&CONFIG.db.db_path).unwrap();
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+    /// Name/path of the configuration file
+    #[arg(short, long, default_value = "config.json")]
+    config: PathBuf,
+}
 
+#[derive(Debug, Deserialize)]
+pub struct Config {
+    pub db: DbConfig,
+    //require_bridge_token: bool,
+    port: u16,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct DbConfig {
+    // The path for the server database, default is "server_db"
+    pub db_path: String,
+}
+
+impl Default for DbConfig {
+    fn default() -> DbConfig {
+        DbConfig {
+            db_path: "server_db".to_owned(),
+        }
+    }
+}
+
+async fn update_extra_infos(db: &Db) {
     // Track which files have been processed. This is slightly redundant
     // because we're only downloading files we don't already have, but it
     // might be a good idea to check in case we downloaded a file but didn't
@@ -45,5 +86,123 @@ async fn main() {
         add_extra_info_to_db(&db, extra_info);
     }
 
-    db.insert(b"extra_infos_files", bincode::serialize(&processed_extra_infos_files).unwrap()).unwrap();
+    db.insert(
+        b"extra_infos_files",
+        bincode::serialize(&processed_extra_infos_files).unwrap(),
+    )
+    .unwrap();
+}
+
+async fn create_context_manager(
+    db_config: DbConfig,
+    context_rx: mpsc::Receiver<Command>,
+    mut kill: broadcast::Receiver<()>,
+) {
+    tokio::select! {
+        create_context = context_manager(db_config, context_rx) => create_context,
+        _ = kill.recv() => {println!("Shut down manager");},
+    }
+}
+
+async fn context_manager(db_config: DbConfig, mut context_rx: mpsc::Receiver<Command>) {
+    let db: Db = sled::open(&db_config.db_path).unwrap();
+
+    while let Some(cmd) = context_rx.recv().await {
+        use Command::*;
+        match cmd {
+            Request { req, sender } => {
+                let response = handle(&db, req).await;
+                if let Err(e) = sender.send(response) {
+                    eprintln!("Server Response Error: {:?}", e);
+                };
+                sleep(Duration::from_millis(1)).await;
+            }
+            Shutdown { shutdown_sig } => {
+                println!("Sending Shutdown Signal, all threads should shutdown.");
+                drop(shutdown_sig);
+                println!("Shutdown Sent.");
+            }
+        }
+    }
+}
+
+// Each of the commands that can be handled
+#[derive(Debug)]
+enum Command {
+    Request {
+        req: Request<Body>,
+        sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
+    },
+    Shutdown {
+        shutdown_sig: broadcast::Sender<()>,
+    },
+}
+
+#[tokio::main]
+async fn main() {
+    // TODO: Currently, we're processing extra-infos here, but we want to:
+    // 1. Periodically (daily):
+    //   a) download new extra-infos
+    //   b) determine whether we think each bridge is blocked or not
+    //   c) report these results to the LA
+    // 2. Store all our data
+
+    let args: Args = Args::parse();
+
+    let config: Config = serde_json::from_reader(BufReader::new(
+        File::open(&args.config).expect("Could not read config file"),
+    ))
+    .expect("Reading config file from JSON failed");
+
+    let (request_tx, request_rx) = mpsc::channel(32);
+
+    let shutdown_cmd_tx = request_tx.clone();
+
+    // create the shutdown broadcast channel and clone for every thread
+    let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
+    let kill = shutdown_tx.subscribe();
+
+    // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx
+    let shutdown_handler = spawn(async move {
+        tokio::select! {
+            _ = signal::ctrl_c() => {
+                let cmd = Command::Shutdown {
+                    shutdown_sig: shutdown_tx,
+                };
+                shutdown_cmd_tx.send(cmd).await.unwrap();
+                sleep(Duration::from_secs(1)).await;
+
+                _ = shutdown_rx.recv().await;
+            }
+        }
+    });
+
+    let context_manager =
+        spawn(async move { create_context_manager(config.db, request_rx, kill).await });
+
+    let make_service = make_service_fn(move |_conn: &AddrStream| {
+        let request_tx = request_tx.clone();
+        let service = service_fn(move |req| {
+            let request_tx = request_tx.clone();
+            let (response_tx, response_rx) = oneshot::channel();
+            let cmd = Command::Request {
+                req,
+                sender: response_tx,
+            };
+            async move {
+                request_tx.send(cmd).await.unwrap();
+                response_rx.await.unwrap()
+            }
+        });
+        async move { Ok::<_, Infallible>(service) }
+    });
+
+    let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
+    let server = Server::bind(&addr).serve(make_service);
+    let graceful = server.with_graceful_shutdown(shutdown_signal());
+    println!("Listening on {}", addr);
+    if let Err(e) = graceful.await {
+        eprintln!("server error: {}", e);
+    }
+    future::join_all([context_manager, shutdown_handler]).await;
 }

+ 3 - 2
src/extra_info.rs

@@ -3,6 +3,7 @@ Note, this is NOT a complete implementation of the document format.
 (https://spec.torproject.org/dir-spec/extra-info-document-format.html) */
 
 use chrono::DateTime;
+use http::status::StatusCode;
 use http_body_util::{BodyExt, Empty};
 use hyper::body::Bytes;
 use hyper_util::{client::legacy::Client, rt::TokioExecutor};
@@ -164,7 +165,7 @@ pub async fn download_extra_infos(
     println!("Downloading {}", base_url);
     let mut res = client.get(url).await?;
 
-    assert_eq!(res.status(), hyper::StatusCode::OK);
+    assert_eq!(res.status(), StatusCode::OK);
     let mut body_str = String::from("");
     while let Some(next) = res.frame().await {
         let frame = next?;
@@ -191,7 +192,7 @@ pub async fn download_extra_infos(
                 let extra_infos_url = format!("{}{}", base_url, link);
                 println!("Downloading {}", extra_infos_url);
                 let mut res = client.get(extra_infos_url.parse().unwrap()).await?;
-                assert_eq!(res.status(), hyper::StatusCode::OK);
+                assert_eq!(res.status(), StatusCode::OK);
                 let mut file = std::fs::File::create(filename).unwrap();
                 while let Some(next) = res.frame().await {
                     let frame = next?;

File diff suppressed because it is too large
+ 1 - 22
src/lib.rs


+ 9 - 1
src/negative_report.rs

@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
 use sha1::{Digest, Sha1};
 use sha3::Sha3_256;
 
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub enum NegativeReportError {
     DateInFuture,
     FailedToDeserialize, // couldn't deserialize to SerializableNegativeReport
@@ -102,6 +102,14 @@ impl NegativeReport {
         }
     }
 
+    /// Deserializes a report from a byte slice, converting through the
+    /// intermediate `SerializableNegativeReport` form internally
+    pub fn from_slice(slice: &[u8]) -> Result<Self, NegativeReportError> {
+        match serde_json::from_slice::<SerializableNegativeReport>(&slice) {
+            Ok(v) => v.to_report(),
+            Err(_) => Err(NegativeReportError::FailedToDeserialize),
+        }
+    }
+
     /// Verify the report
     pub fn verify(self, bridge_info: &BridgeVerificationInfo) -> bool {
         match self.bridge_pok {

+ 10 - 2
src/positive_report.rs

@@ -1,7 +1,7 @@
 // For Lox-related code where points are uppercase and scalars are lowercase
 #![allow(non_snake_case)]
 
-use crate::{bridge_verification_info::BridgeVerificationInfo, get_date, CONFIG, COUNTRY_CODES};
+use crate::{bridge_verification_info::BridgeVerificationInfo, get_date, COUNTRY_CODES};
 
 use curve25519_dalek::ristretto::RistrettoBasepointTable;
 use ed25519_dalek::{Signature, Signer, SigningKey, Verifier};
@@ -12,7 +12,7 @@ use std::option::Option;
 
 pub const REQUIRE_BRIDGE_TOKEN: bool = false;
 
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub enum PositiveReportError {
     DateInFuture,
     FailedToDeserialize, // couldn't deserialize to SerializablePositiveReport
@@ -105,6 +105,14 @@ impl PositiveReport {
         }
     }
 
+    /// Deserializes a report from a byte slice, converting through the
+    /// intermediate `SerializablePositiveReport` form internally
+    pub fn from_slice(slice: &[u8]) -> Result<Self, PositiveReportError> {
+        match serde_json::from_slice::<SerializablePositiveReport>(&slice) {
+            Ok(v) => v.to_report(),
+            Err(_) => Err(PositiveReportError::FailedToDeserialize),
+        }
+    }
+
     /// Verify report
     pub fn verify(
         self,

+ 61 - 0
src/request_handler.rs

@@ -0,0 +1,61 @@
+use crate::{negative_report::NegativeReport, positive_report::PositiveReport, *};
+use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode};
+use serde_json::json;
+use sled::Db;
+use std::convert::Infallible;
+
+// Handle submitted reports
+pub async fn handle(db: &Db, req: Request<Body>) -> Result<Response<Body>, Infallible> {
+    match req.method() {
+        &Method::OPTIONS => Ok(Response::builder()
+            .header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
+            .header("Access-Control-Allow-Headers", "accept, content-type")
+            .header("Access-Control-Allow-Methods", "POST")
+            .status(200)
+            .body(Body::from("Allow POST"))
+            .unwrap()),
+        _ => match (req.method(), req.uri().path()) {
+            (&Method::POST, "/negativereport") => Ok::<_, Infallible>({
+                let bytes = body::to_bytes(req.into_body()).await.unwrap();
+                let nr = match NegativeReport::from_slice(&bytes) {
+                    Ok(nr) => nr,
+                    Err(e) => {
+                        let response = json!({"error": e});
+                        let val = serde_json::to_string(&response).unwrap();
+                        return Ok(prepare_header(val));
+                    }
+                };
+                save_negative_report_to_process(&db, nr);
+                prepare_header("OK".to_string())
+            }),
+            (&Method::POST, "/positivereport") => Ok::<_, Infallible>({
+                let bytes = body::to_bytes(req.into_body()).await.unwrap();
+                let pr = match PositiveReport::from_slice(&bytes) {
+                    Ok(pr) => pr,
+                    Err(e) => {
+                        let response = json!({"error": e});
+                        let val = serde_json::to_string(&response).unwrap();
+                        return Ok(prepare_header(val));
+                    }
+                };
+                save_positive_report_to_process(&db, pr);
+                prepare_header("OK".to_string())
+            }),
+            _ => {
+                // Return 404 not found response.
+                Ok(Response::builder()
+                    .status(StatusCode::NOT_FOUND)
+                    .body(Body::from("Not found"))
+                    .unwrap())
+            }
+        },
+    }
+}
+
+// Prepare HTTP Response for successful Server Request
+fn prepare_header(response: String) -> Response<Body> {
+    let mut resp = Response::new(Body::from(response));
+    resp.headers_mut()
+        .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
+    resp
+}

Some files were not shown because too many files changed in this diff