Fix #110: don't show error message on shutdown

The stream_manager is now the sole owner of the child process handle and
is responsible for cleaning it up.

The stream_manager expects a shutdown command from the broadcast channel
it receives as a parameter and watches it concurrently with the child process
I/O.

When the shutdown command is received, the child process is killed and the
remaining I/O is processed. Then the child process is reaped. Alternatively,
should the child process exit before (or at the same time as) the shutdown
command is issued, it is killed and reaped after EOF is encountered.
This commit is contained in:
Baptiste Careil 2024-11-13 23:00:56 +01:00 committed by ppom
commit 68637e35a7
3 changed files with 96 additions and 49 deletions

View file

@ -9,10 +9,9 @@ use std::{
};
use tokio::{
process::Child,
select,
signal::unix::{signal, SignalKind},
sync::{broadcast, mpsc, oneshot, Mutex, Semaphore},
sync::{broadcast, mpsc, Mutex, Semaphore},
};
use tracing::info;
@ -50,7 +49,6 @@ pub async fn daemon(
return Err("a start command failed, exiting.".into());
}
let mut stream_process_child_handles = Vec::new();
let mut stream_task_handles = Vec::new();
let (log_tx, log_rx) = mpsc::channel(234560);
@ -106,26 +104,18 @@ pub async fn daemon(
let stream_filter_managers = Arc::new(stream_filter_managers);
for (stream, filter_managers) in stream_filter_managers.s.iter() {
let (child_tx, child_rx) = oneshot::channel();
let filter_managers = filter_managers.clone();
let stream = *stream;
let stream_shutdow_rx = shutdown_rx.resubscribe();
stream_task_handles.push(tokio::spawn(async move {
stream_manager(stream, child_tx, filter_managers).await
stream_manager(stream, filter_managers, stream_shutdow_rx).await
}));
if let Ok(Some(child)) = child_rx.await {
stream_process_child_handles.push(child);
}
}
// Close streams when we receive a quit signal
let signal_received = Arc::new(AtomicBool::new(false));
handle_signals(
stream_process_child_handles,
shutdown_tx.clone(),
signal_received.clone(),
)?;
handle_signals(shutdown_tx.clone(), signal_received.clone())?;
let socket_manager_task_handle = {
let socket = socket.to_owned();
@ -155,7 +145,6 @@ pub async fn daemon(
}
fn handle_signals(
stream_process_child_handles: Vec<Child>,
shutdown_tx: broadcast::Sender<()>,
signal_received: Arc<AtomicBool>,
) -> tokio::io::Result<()> {
@ -168,13 +157,9 @@ fn handle_signals(
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
info!("received {signal}, closing streams...");
let _ = shutdown_tx.send(());
signal_received.store(true, Ordering::SeqCst);
info!("received {signal}, closing streams...");
// Kill stream subprocesses
for mut child_handle in stream_process_child_handles.into_iter() {
tokio::spawn(async move { child_handle.kill().await });
}
});
Ok(())
}

View file

@ -1,11 +1,11 @@
use std::{collections::HashMap, process::Stdio, sync::Arc, task::Poll};
use futures::StreamExt;
use futures::{pin_mut, FutureExt, StreamExt};
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
pin,
process::{Child, Command},
sync::{oneshot, Mutex},
process::Command,
sync::{broadcast, Mutex},
};
use tracing::{error, info};
@ -28,8 +28,8 @@ fn lines_to_stream<T: tokio::io::AsyncBufRead + Unpin>(
pub async fn stream_manager(
stream: &'static Stream,
child_tx: oneshot::Sender<Option<Child>>,
filter_managers: Arc<Mutex<HashMap<&'static Filter, FilterManager>>>,
mut shutdown_rx: broadcast::Receiver<()>,
) {
info!("{}: start {:?}", stream.name(), stream.cmd());
let mut child = match Command::new(&stream.cmd()[0])
@ -42,7 +42,6 @@ pub async fn stream_manager(
Ok(child) => child,
Err(err) => {
error!("could not execute stream {} cmd: {}", stream.name(), err);
let _ = child_tx.send(None);
return;
}
};
@ -58,33 +57,34 @@ pub async fn stream_manager(
// aggregate outputs, will end when both streams end
let mut lines = futures::stream::select(lines_stdout, lines_stderr);
// let main handle the child process
let _ = child_tx.send(Some(child));
// fuse the future right after the first and only message we expect on this channel
let shutdown_watch = shutdown_rx.recv().fuse();
pin_mut!(shutdown_watch);
let mut child_killed = false;
loop {
match lines.next().await {
Some(Ok(line)) => {
futures::future::join_all(
filter_managers
.lock()
.await
.values_mut()
.map(|manager| manager.handle_line(&line)),
)
.await;
}
Some(Err(err)) => {
error!(
"impossible to read output from stream {}: {}",
stream.name(),
err
);
break;
}
None => {
error!("stream {} exited: its command returned.", stream.name());
break;
futures::select!(
_ = shutdown_watch => {
// reaction is shutting down, kill child process, we'll reap it after
// processing the remaining I/O
let _ = child.start_kill();
child_killed = true;
},
line = lines.next() => {
if handle_io(line, &filter_managers, stream.name(), child_killed).await.is_err() {
break;
}
}
);
}
// reap the child process if already dead or kill it (might issue sigkill twice with the one
// above).
match child.try_wait() {
Ok(Some(_)) => {}
_ => {
let _ = child.kill().await;
}
}
@ -94,3 +94,39 @@ pub async fn stream_manager(
.values_mut()
.for_each(|manager| manager.quit());
}
/// Handles a single item taken from a stream's line reader.
///
/// * `line` - the item to process: `Some(Ok(..))` is a line of output, `Some(Err(..))` is a
///   read error and `None` means the line reader has ended.
/// * `filter_managers` - the filter managers that each receive the line; the mutex stays locked
///   until all of them have handled it.
/// * `stream_name` - name of the stream, used only in error messages.
/// * `child_killed` - when `true`, read errors and end of output are not logged.
///
/// Returns `Ok(())` once a line has been handled by every filter manager, and `Err(())` on a
/// read error or when the line reader has ended.
async fn handle_io(
    line: Option<Result<String, std::io::Error>>,
    filter_managers: &Arc<Mutex<HashMap<&'static Filter, FilterManager>>>,
    stream_name: &str,
    child_killed: bool,
) -> Result<(), ()> {
    // Deal with the two terminal cases first so the happy path reads straight through.
    let text = match line {
        Some(Ok(text)) => text,
        Some(Err(err)) => {
            if !child_killed {
                error!(
                    "impossible to read output from stream {}: {}",
                    stream_name, err
                );
            }
            return Err(());
        }
        None => {
            if !child_killed {
                error!("stream {} exited: its command returned.", stream_name);
            }
            return Err(());
        }
    };

    // Hand the line to every filter manager and wait for all of them to finish.
    let mut managers = filter_managers.lock().await;
    let pending = managers
        .values_mut()
        .map(|manager| manager.handle_line(&text));
    futures::future::join_all(pending).await;
    Ok(())
}

View file

@ -0,0 +1,26 @@
// Configuration declaring one pattern and one stream (`idle`) with a single filter and action.
{
patterns: {
// `zero` is declared here but is not referenced by the filter below.
zero: {
regex: @'0',
},
},
streams: {
idle: {
// The command only sleeps in a loop: it prints nothing and does not return on its own.
cmd: ['sh', '-c', 'while true; do sleep 1; done'],
filters: {
filt1: {
// Single regex for this filter; the idle command above never prints anything to match it.
regex: [
@'abc',
],
actions: {
act: {
cmd: ['echo', '1'],
},
},
},
},
},
},
}