123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- use crate::extra_info::ExtraInfo;
- use hyper::{
- body::{self, Bytes},
- header::HeaderValue,
- server::conn::AddrStream,
- service::{make_service_fn, service_fn},
- Body, Method, Request, Response, Server,
- };
- use serde_json::json;
- use std::{collections::HashSet, convert::Infallible, net::SocketAddr, time::Duration};
- use tokio::{
- spawn,
- sync::{broadcast, mpsc, oneshot},
- time::sleep,
- };
- async fn serve_extra_infos(
- extra_infos_pages: &mut Vec<String>,
- req: Request<Body>,
- ) -> Result<Response<Body>, Infallible> {
- match req.method() {
- &Method::OPTIONS => Ok(Response::builder()
- .header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
- .header("Access-Control-Allow-Headers", "accept, content-type")
- .header("Access-Control-Allow-Methods", "POST")
- .status(200)
- .body(Body::from("Allow POST"))
- .unwrap()),
- _ => match req.uri().path() {
- "/" => Ok::<_, Infallible>(serve_index(&extra_infos_pages)),
- "/add" => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- add_extra_infos(extra_infos_pages, bytes)
- }),
- path => Ok::<_, Infallible>({
- // Serve the requested file
- serve_extra_infos_file(&extra_infos_pages, path)
- }),
- },
- }
- }
- pub async fn server() {
- let (context_tx, context_rx) = mpsc::channel(32);
- let request_tx = context_tx.clone();
- let shutdown_cmd_tx = context_tx.clone();
- let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
- let kill_context = shutdown_tx.subscribe();
- let context_manager =
- spawn(async move { create_context_manager(context_rx, kill_context).await });
- let addr = SocketAddr::from(([127, 0, 0, 1], 8004));
- let make_svc = make_service_fn(move |_conn: &AddrStream| {
- let request_tx = request_tx.clone();
- let service = service_fn(move |req| {
- let request_tx = request_tx.clone();
- let (response_tx, response_rx) = oneshot::channel();
- let cmd = Command::Request {
- req,
- sender: response_tx,
- };
- async move {
- request_tx.send(cmd).await.unwrap();
- response_rx.await.unwrap()
- }
- });
- async move { Ok::<_, Infallible>(service) }
- });
- let server = Server::bind(&addr).serve(make_svc);
- println!("Listening on localhost:8004");
- if let Err(e) = server.await {
- eprintln!("server error: {}", e);
- }
- }
- async fn create_context_manager(
- context_rx: mpsc::Receiver<Command>,
- mut kill: broadcast::Receiver<()>,
- ) {
- tokio::select! {
- create_context = context_manager(context_rx) => create_context,
- _ = kill.recv() => {println!("Shut down context_manager");},
- }
- }
- async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
- let mut extra_infos_pages = Vec::<String>::new();
- while let Some(cmd) = context_rx.recv().await {
- use Command::*;
- match cmd {
- Request { req, sender } => {
- let response = serve_extra_infos(&mut extra_infos_pages, req).await;
- if let Err(e) = sender.send(response) {
- eprintln!("Server Response Error: {:?}", e);
- }
- sleep(Duration::from_millis(1)).await;
- }
- Shutdown { shutdown_sig } => {
- drop(shutdown_sig);
- }
- }
- }
- }
- #[derive(Debug)]
- enum Command {
- Request {
- req: Request<Body>,
- sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
- },
- Shutdown {
- shutdown_sig: broadcast::Sender<()>,
- },
- }
- fn add_extra_infos(extra_infos_pages: &mut Vec<String>, request: Bytes) -> Response<Body> {
- let extra_infos: HashSet<ExtraInfo> = match serde_json::from_slice(&request) {
- Ok(req) => req,
- Err(e) => {
- let response = json!({"error": e.to_string()});
- let val = serde_json::to_string(&response).unwrap();
- return prepare_header(val);
- }
- };
- let mut extra_infos_file = String::new();
- for extra_info in extra_infos {
- extra_infos_file.push_str(extra_info.to_string().as_str());
- }
- extra_infos_pages.push(extra_infos_file);
- prepare_header("OK".to_string())
- }
- fn serve_index(extra_infos_pages: &Vec<String>) -> Response<Body> {
- let mut body_str = String::new();
- for i in 0..extra_infos_pages.len() {
- body_str
- .push_str(format!("<a href=\"{}-extra-infos\">{}-extra-infos</a>\n", i, i).as_str());
- }
- prepare_header(body_str)
- }
- fn serve_extra_infos_file(extra_infos_pages: &Vec<String>, path: &str) -> Response<Body> {
- if path.ends_with("-extra-infos") {
- if let Ok(index) = &path[1..(path.len() - "-extra-infos".len())].parse::<usize>() {
- if extra_infos_pages.len() > *index {
- return prepare_header(extra_infos_pages[*index].clone());
- }
- }
- }
- prepare_header("Not a valid file".to_string())
- }
- // Prepare HTTP Response for successful Server Request
- fn prepare_header(response: String) -> Response<Body> {
- let mut resp = Response::new(Body::from(response));
- resp.headers_mut()
- .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
- resp
- }
|