From 915e30801558bd650401b446d04d8823cc0b64ce Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 3 Feb 2026 12:00:00 +0100 Subject: [PATCH] Better plugin process management following stderr: task doesn't use shutdown anymore. It will simply follow stderr until the end of reaction, which at worst is a negligible memory leak if reaction continues running. I tried closing stderr on the plugin side with a raw syscall of the file descriptor, but reaction side doesn't see that stderr is closed. So I can't rely on that. Quitting when shutdown.wait() returns is too early, because that's also what makes reaction asking for the plugin to close(), and it can print important logs during its shutdown. The task ignoring all the shutdown part is dead simple and is most likely correct everytime. updated the wording of plugin-related errors. also replaced futures::select! { future, sleep() } with more concise and macro-less tokio::timeout. --- TODO | 1 - src/daemon/plugin/mod.rs | 92 ++++++++++++++++++++++++++++------------ src/daemon/utils.rs | 33 ++++++++------ 3 files changed, 86 insertions(+), 40 deletions(-) diff --git a/TODO b/TODO index a6aa816..2b6071f 100644 --- a/TODO +++ b/TODO @@ -1,4 +1,3 @@ Test what happens when a Filter's pattern Set changes (I think it's shitty) DB: add tests on stress testing (lines should always be in order) conf: merge filters -plugins: pipe stderr too and wrap errors in logs. fix errors? diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs index aeaf1a9..b9e7b88 100644 --- a/src/daemon/plugin/mod.rs +++ b/src/daemon/plugin/mod.rs @@ -1,16 +1,18 @@ use std::{ collections::{BTreeMap, BTreeSet}, + io, ops::{Deref, DerefMut}, + process::ExitStatus, time::Duration, }; -use futures::{FutureExt, StreamExt, future::join_all}; +use futures::{StreamExt, future::join_all}; use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl}; use remoc::Connect; use serde_json::Value; use tokio::{ process::{Child, ChildStderr}, - time::sleep, + time::timeout, }; use tracing::{error, info}; @@ -53,8 +55,8 @@ impl PluginManager { { let stderr = child.stderr.take().unwrap(); - let shutdown = shutdown.clone(); - tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone(), shutdown).await }); + // let shutdown = shutdown.clone(); + tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone()).await }); } let stdin = child.stdin.take().unwrap(); @@ -114,36 +116,61 @@ impl PluginManager { const PLUGIN_STOP_GRACE_TIME: u64 = 15; // wait either for the child process to exit on its own or for the shutdown signal - futures::select! { - _ = self.child.wait().fuse() => { - error!("plugin {} exited: its command returned.", self.plugin.name); + tokio::select! { + status = self.child.wait() => { + self.print_exit(status); return; } - _ = self.shutdown.wait().fuse() => {} + _ = self.shutdown.wait() => {} } - futures::select! { - _ = self.plugin_info.close().fuse() => { - return; - }, - _ = sleep(Duration::from_secs(PLUGIN_STOP_GRACE_TIME)).fuse() => { - error!("plugin {} did not respond to close request in time, killing", self.plugin.name) - }, + match timeout( + Duration::from_secs(PLUGIN_STOP_GRACE_TIME), + self.plugin_info.close(), + ) + .await + { + Ok(Ok(())) => (), + Ok(Err(err)) => { + error!("plugin {}: {err}", self.plugin.name); + } + // got timeout + Err(_) => { + error!( + "plugin {} did not respond to close request in time, killing", + self.plugin.name + ); + kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await; + } } + } - kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await; + fn print_exit(&self, status: io::Result) { + match status { + Ok(status) => match status.code() { + Some(code) => { + error!( + "plugin {}: process exited. exit code: {}", + self.plugin.name, code + ); + } + None => { + error!("plugin {}: process exited.", self.plugin.name); + } + }, + Err(err) => { + error!("plugin {}: process exited. {err}", self.plugin.name); + } + } } } -async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: ShutdownToken) { +async fn handle_stderr(stderr: ChildStderr, plugin_name: String) { + // read lines until shutdown let lines = reader_to_stream(stderr); tokio::pin!(lines); loop { - let event = tokio::select! { - line = lines.next() => line, - _ = shutdown.wait() => None, - }; - match event { + match lines.next().await { Some(Ok(line)) => { // sad: I can't factorize this because the tracing::event! macro // requires its log level to be a constant. @@ -161,7 +188,7 @@ async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: Shutd } } Some(Err(err)) => { - error!("while trying to read plugin {plugin_name} stderr: {err}"); + tracing::error!("while trying to read plugin {plugin_name} stderr: {err}"); break; } None => break, @@ -240,7 +267,12 @@ impl Plugins { plugin .stream_impl(stream_name.clone(), stream_type, config.into()) .await - .map_err(|err| format!("plugin error while initializing stream {stream_name}: {err}")) + .map_err(|err| { + format!( + "plugin error while initializing stream {stream_name}: {}", + err.to_string().replace('\n', " ") + ) + }) } pub async fn init_action_impl( @@ -274,7 +306,9 @@ impl Plugins { patterns, ) .await - .map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {err}")) + .map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {}", + err.to_string().replace('\n', " ") + )) } pub async fn finish_setup(&mut self) -> Result<(), String> { @@ -289,7 +323,13 @@ impl Plugins { .into_iter() .zip(self.plugins.values()) .try_for_each(|(result, plugin_manager)| { - result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.name)) + result.map_err(|err| { + format!( + "plugin {}: {}", + plugin_manager.plugin.name, + err.to_string().replace('\n', " ") + ) + }) }) } diff --git a/src/daemon/utils.rs b/src/daemon/utils.rs index 0ef3986..4ec94ae 100644 --- a/src/daemon/utils.rs +++ b/src/daemon/utils.rs @@ -1,7 +1,6 @@ use std::time::Duration; -use futures::FutureExt; -use tokio::{process::Child, time::sleep}; +use tokio::{process::Child, time::timeout}; use tracing::{error, warn}; pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) { @@ -15,14 +14,11 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) // but we still need to reclaim it with Child::wait let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM); - futures::select! { - _ = child.wait().fuse() => { - return; - }, - _ = sleep(Duration::from_secs(grace_time_sec)).fuse() => {}, + if let Ok(_) = timeout(Duration::from_secs(grace_time_sec), child.wait()).await { + return; } } else { - warn!("could not get PID of child process for {}", context); + warn!("could not get PID of child process for {context}"); // still try to use tokio API to kill and reclaim the child process } @@ -33,12 +29,23 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) // as before, the only expected error is that the child process already terminated // but we still need to reclaim it if that's the case. + warn!("process for {context} didn't exit {grace_time_sec}s after SIGTERM, sending SIGKILL"); let _ = child.start_kill(); - futures::select! { - _ = child.wait().fuse() => {} - _ = sleep(Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC)).fuse() => { - error!("child process of {} did not terminate", context); - } + match timeout( + Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC), + child.wait(), + ) + .await + { + Ok(_) => {} + Err(_) => match child.id() { + Some(id) => { + error!("child process of {context} did not terminate. PID: {id}"); + } + None => { + error!("child process of {context} did not terminate"); + } + }, } }