Browse Source

server: add instrumentation

Justin Tracey 2 months ago
parent
commit
554f035015
2 changed files with 32 additions and 2 deletions
  1. 2 1
      Cargo.toml
  2. 30 1
      src/bin/mgen-server.rs

+ 2 - 1
Cargo.toml

@@ -19,6 +19,7 @@ rustls-pemfile = "1.0.3"
 serde = { version = "1.0.158", features = ["derive"] }
 serde_yaml = "0.9.21"
 tokio = { version = "1", features = ["full"] }
+tokio-metrics = { version = "0.3.1", optional = true }
 tokio-rustls = { version = "0.24.1", features = ["dangerous_configuration"] }
 tokio-socks = "0.5.1"
 tracing = { version = "0.1.40", optional = true }
@@ -26,7 +27,7 @@ url = "2.4.0"
 
 [features]
 default = []
-tracing = ["dep:console-subscriber", "dep:tracing", "tokio/tracing"]
+tracing = ["dep:console-subscriber", "dep:tracing", "dep:tokio-metrics", "tokio/tracing"]
 
 [profile.release]
 lto = true

+ 30 - 1
src/bin/mgen-server.rs

@@ -22,6 +22,19 @@ type ReaderToSender = mpsc::UnboundedSender<Arc<SerializedMessage>>;
 type WriterDb = HashMap<Handshake, Updater<(WriteHalf<TlsStream<TcpStream>>, Arc<Notify>)>>;
 type SndDb = HashMap<ID, Arc<RwLock<HashMap<ID, ReaderToSender>>>>;
 
+#[cfg(feature = "tracing")]
+async fn tracing(metrics_monitor: tokio_metrics::TaskMonitor) {
+    console_subscriber::init();
+    let handle = tokio::runtime::Handle::current();
+    let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
+
+    for intervals in std::iter::zip(metrics_monitor.intervals(), runtime_monitor.intervals()) {
+        log!("{:?}", intervals.0);
+        log!("{:?}", intervals.1);
+        tokio::time::sleep(Duration::from_secs(5)).await;
+    }
+}
+
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     tokio::runtime::Builder::new_multi_thread()
         .worker_threads(10)
@@ -34,7 +47,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
 
 async fn main_worker() -> Result<(), Box<dyn Error>> {
     #[cfg(feature = "tracing")]
-    console_subscriber::init();
+    let metrics_monitor = {
+        let metrics_monitor = tokio_metrics::TaskMonitor::new();
+        tokio::spawn(tracing(metrics_monitor.clone()));
+        metrics_monitor
+    };
 
     let mut args = std::env::args();
     let _arg0 = args.next().unwrap();
@@ -98,6 +115,18 @@ async fn main_worker() -> Result<(), Box<dyn Error>> {
         let writer_db = writer_db.clone();
         let snd_db = snd_db.clone();
         let phase_notify = phase_notify.clone();
+        #[cfg(feature = "tracing")]
+        tokio::spawn(metrics_monitor.instrument(async move {
+            handle_handshake::</*REGISTRATION_PHASE=*/ true>(
+                stream,
+                acceptor,
+                writer_db,
+                snd_db,
+                phase_notify,
+            )
+            .await
+        }));
+        #[cfg(not(feature = "tracing"))]
         tokio::spawn(async move {
             handle_handshake::</*REGISTRATION_PHASE=*/ true>(
                 stream,