diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 7d6adbc..17ba412 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -9,10 +9,9 @@ use std::{ }; use tokio::{ - process::Child, select, signal::unix::{signal, SignalKind}, - sync::{broadcast, mpsc, oneshot, Mutex, Semaphore}, + sync::{broadcast, mpsc, Mutex, Semaphore}, }; use tracing::info; @@ -50,7 +49,6 @@ pub async fn daemon( return Err("a start command failed, exiting.".into()); } - let mut stream_process_child_handles = Vec::new(); let mut stream_task_handles = Vec::new(); let (log_tx, log_rx) = mpsc::channel(234560); @@ -106,26 +104,18 @@ pub async fn daemon( let stream_filter_managers = Arc::new(stream_filter_managers); for (stream, filter_managers) in stream_filter_managers.s.iter() { - let (child_tx, child_rx) = oneshot::channel(); let filter_managers = filter_managers.clone(); let stream = *stream; + let stream_shutdow_rx = shutdown_rx.resubscribe(); stream_task_handles.push(tokio::spawn(async move { - stream_manager(stream, child_tx, filter_managers).await + stream_manager(stream, filter_managers, stream_shutdow_rx).await })); - - if let Ok(Some(child)) = child_rx.await { - stream_process_child_handles.push(child); - } } // Close streams when we receive a quit signal let signal_received = Arc::new(AtomicBool::new(false)); - handle_signals( - stream_process_child_handles, - shutdown_tx.clone(), - signal_received.clone(), - )?; + handle_signals(shutdown_tx.clone(), signal_received.clone())?; let socket_manager_task_handle = { let socket = socket.to_owned(); @@ -155,7 +145,6 @@ pub async fn daemon( } fn handle_signals( - stream_process_child_handles: Vec, shutdown_tx: broadcast::Sender<()>, signal_received: Arc, ) -> tokio::io::Result<()> { @@ -168,13 +157,9 @@ fn handle_signals( _ = sigint.recv() => "SIGINT", _ = sigterm.recv() => "SIGTERM", }; + info!("received {signal}, closing streams..."); let _ = shutdown_tx.send(()); signal_received.store(true, Ordering::SeqCst); - info!("received {signal}, closing streams..."); - // Kill stream subprocesses - for mut child_handle in stream_process_child_handles.into_iter() { - tokio::spawn(async move { child_handle.kill().await }); - } }); Ok(()) } diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index 26f67bd..b4fc031 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -1,11 +1,11 @@ use std::{collections::HashMap, process::Stdio, sync::Arc, task::Poll}; -use futures::StreamExt; +use futures::{pin_mut, FutureExt, StreamExt}; use tokio::{ io::{AsyncBufReadExt, BufReader, Lines}, pin, - process::{Child, Command}, - sync::{oneshot, Mutex}, + process::Command, + sync::{broadcast, Mutex}, }; use tracing::{error, info}; @@ -28,8 +28,8 @@ fn lines_to_stream( pub async fn stream_manager( stream: &'static Stream, - child_tx: oneshot::Sender>, filter_managers: Arc>>, + mut shutdown_rx: broadcast::Receiver<()>, ) { info!("{}: start {:?}", stream.name(), stream.cmd()); let mut child = match Command::new(&stream.cmd()[0]) @@ -42,7 +42,6 @@ pub async fn stream_manager( Ok(child) => child, Err(err) => { error!("could not execute stream {} cmd: {}", stream.name(), err); - let _ = child_tx.send(None); return; } }; @@ -58,33 +57,34 @@ pub async fn stream_manager( // aggregate outputs, will end when both streams end let mut lines = futures::stream::select(lines_stdout, lines_stderr); - // let main handle the child process - let _ = child_tx.send(Some(child)); + // fuse the future right after the first and only message we expect on this channel + let shutdown_watch = shutdown_rx.recv().fuse(); + pin_mut!(shutdown_watch); + + let mut child_killed = false; loop { - match lines.next().await { - Some(Ok(line)) => { - futures::future::join_all( - filter_managers - .lock() - .await - .values_mut() - .map(|manager| manager.handle_line(&line)), - ) - .await; - } - Some(Err(err)) => { - error!( - "impossible to read output from stream {}: {}", - stream.name(), - err - ); - break; - } - None => { - error!("stream {} exited: its command returned.", stream.name()); - break; + futures::select!( + _ = shutdown_watch => { + // reaction is shutting down, kill child process, we'll reap it after + // processing the remaining I/O + let _ = child.start_kill(); + child_killed = true; + }, + line = lines.next() => { + if handle_io(line, &filter_managers, stream.name(), child_killed).await.is_err() { + break; + } } + ); + } + + // reap the child process if already dead or kill it (might issue sigkill twice with the one + // above). + match child.try_wait() { + Ok(Some(_)) => {} + _ => { + let _ = child.kill().await; } } @@ -94,3 +94,39 @@ pub async fn stream_manager( .values_mut() .for_each(|manager| manager.quit()); } + +async fn handle_io( + line: Option>, + filter_managers: &Arc>>, + stream_name: &str, + child_killed: bool, +) -> Result<(), ()> { + match line { + Some(Ok(line)) => { + futures::future::join_all( + filter_managers + .lock() + .await + .values_mut() + .map(|manager| manager.handle_line(&line)), + ) + .await; + Ok(()) + } + Some(Err(err)) => { + if !child_killed { + error!( + "impossible to read output from stream {}: {}", + stream_name, err + ); + } + Err(()) + } + None => { + if !child_killed { + error!("stream {} exited: its command returned.", stream_name); + } + Err(()) + } + } +} diff --git a/tests/test-shutdown.jsonnet b/tests/test-shutdown.jsonnet new file mode 100644 index 0000000..2f68c7a --- /dev/null +++ b/tests/test-shutdown.jsonnet @@ -0,0 +1,26 @@ +{ + patterns: { + zero: { + regex: @'0', + }, + }, + + streams: { + idle: { + cmd: ['sh', '-c', 'while true; do sleep 1; done'], + filters: { + filt1: { + regex: [ + @'abc', + ], + actions: { + act: { + cmd: ['echo', '1'], + }, + }, + }, + }, + }, + }, +} +