extra_infos_server.rs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. use crate::extra_info::ExtraInfo;
  2. use hyper::{
  3. body::{self, Bytes},
  4. header::HeaderValue,
  5. server::conn::AddrStream,
  6. service::{make_service_fn, service_fn},
  7. Body, Method, Request, Response, Server,
  8. };
  9. use serde_json::json;
  10. use std::{collections::HashSet, convert::Infallible, net::SocketAddr, time::Duration};
  11. use tokio::{
  12. spawn,
  13. sync::{broadcast, mpsc, oneshot},
  14. time::sleep,
  15. };
  16. async fn serve_extra_infos(
  17. extra_infos_pages: &mut Vec<String>,
  18. req: Request<Body>,
  19. ) -> Result<Response<Body>, Infallible> {
  20. match req.method() {
  21. &Method::OPTIONS => Ok(Response::builder()
  22. .header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
  23. .header("Access-Control-Allow-Headers", "accept, content-type")
  24. .header("Access-Control-Allow-Methods", "POST")
  25. .status(200)
  26. .body(Body::from("Allow POST"))
  27. .unwrap()),
  28. _ => match req.uri().path() {
  29. "/" => Ok::<_, Infallible>(serve_index(&extra_infos_pages)),
  30. "/add" => Ok::<_, Infallible>({
  31. let bytes = body::to_bytes(req.into_body()).await.unwrap();
  32. add_extra_infos(extra_infos_pages, bytes)
  33. }),
  34. path => Ok::<_, Infallible>({
  35. // Serve the requested file
  36. serve_extra_infos_file(&extra_infos_pages, path)
  37. }),
  38. },
  39. }
  40. }
  41. pub async fn server() {
  42. let (context_tx, context_rx) = mpsc::channel(32);
  43. let request_tx = context_tx.clone();
  44. let shutdown_cmd_tx = context_tx.clone();
  45. let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
  46. let kill_context = shutdown_tx.subscribe();
  47. let context_manager =
  48. spawn(async move { create_context_manager(context_rx, kill_context).await });
  49. let addr = SocketAddr::from(([127, 0, 0, 1], 8004));
  50. let make_svc = make_service_fn(move |_conn: &AddrStream| {
  51. let request_tx = request_tx.clone();
  52. let service = service_fn(move |req| {
  53. let request_tx = request_tx.clone();
  54. let (response_tx, response_rx) = oneshot::channel();
  55. let cmd = Command::Request {
  56. req,
  57. sender: response_tx,
  58. };
  59. async move {
  60. request_tx.send(cmd).await.unwrap();
  61. response_rx.await.unwrap()
  62. }
  63. });
  64. async move { Ok::<_, Infallible>(service) }
  65. });
  66. let server = Server::bind(&addr).serve(make_svc);
  67. println!("Listening on localhost:8004");
  68. if let Err(e) = server.await {
  69. eprintln!("server error: {}", e);
  70. }
  71. }
  72. async fn create_context_manager(
  73. context_rx: mpsc::Receiver<Command>,
  74. mut kill: broadcast::Receiver<()>,
  75. ) {
  76. tokio::select! {
  77. create_context = context_manager(context_rx) => create_context,
  78. _ = kill.recv() => {println!("Shut down context_manager");},
  79. }
  80. }
  81. async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
  82. let mut extra_infos_pages = Vec::<String>::new();
  83. while let Some(cmd) = context_rx.recv().await {
  84. use Command::*;
  85. match cmd {
  86. Request { req, sender } => {
  87. let response = serve_extra_infos(&mut extra_infos_pages, req).await;
  88. if let Err(e) = sender.send(response) {
  89. eprintln!("Server Response Error: {:?}", e);
  90. }
  91. sleep(Duration::from_millis(1)).await;
  92. }
  93. Shutdown { shutdown_sig } => {
  94. drop(shutdown_sig);
  95. }
  96. }
  97. }
  98. }
  99. #[derive(Debug)]
  100. enum Command {
  101. Request {
  102. req: Request<Body>,
  103. sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
  104. },
  105. Shutdown {
  106. shutdown_sig: broadcast::Sender<()>,
  107. },
  108. }
  109. fn add_extra_infos(extra_infos_pages: &mut Vec<String>, request: Bytes) -> Response<Body> {
  110. let extra_infos: HashSet<ExtraInfo> = match serde_json::from_slice(&request) {
  111. Ok(req) => req,
  112. Err(e) => {
  113. let response = json!({"error": e.to_string()});
  114. let val = serde_json::to_string(&response).unwrap();
  115. return prepare_header(val);
  116. }
  117. };
  118. let mut extra_infos_file = String::new();
  119. for extra_info in extra_infos {
  120. extra_infos_file.push_str(extra_info.to_string().as_str());
  121. }
  122. extra_infos_pages.push(extra_infos_file);
  123. prepare_header("OK".to_string())
  124. }
  125. fn serve_index(extra_infos_pages: &Vec<String>) -> Response<Body> {
  126. let mut body_str = String::new();
  127. for i in 0..extra_infos_pages.len() {
  128. body_str
  129. .push_str(format!("<a href=\"{}-extra-infos\">{}-extra-infos</a>\n", i, i).as_str());
  130. }
  131. prepare_header(body_str)
  132. }
  133. fn serve_extra_infos_file(extra_infos_pages: &Vec<String>, path: &str) -> Response<Body> {
  134. if path.ends_with("-extra-infos") {
  135. if let Ok(index) = &path[1..(path.len() - "-extra-infos".len())].parse::<usize>() {
  136. if extra_infos_pages.len() > *index {
  137. return prepare_header(extra_infos_pages[*index].clone());
  138. }
  139. }
  140. }
  141. prepare_header("Not a valid file".to_string())
  142. }
  143. // Prepare HTTP Response for successful Server Request
  144. fn prepare_header(response: String) -> Response<Body> {
  145. let mut resp = Response::new(Body::from(response));
  146. resp.headers_mut()
  147. .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
  148. resp
  149. }