Better plugin process management

the stderr-following task doesn't use shutdown anymore. It simply follows
stderr until the end of reaction, which at worst is a negligible
memory leak if reaction continues running.
I tried closing stderr on the plugin side with a raw syscall of the file
descriptor, but reaction side doesn't see that stderr is closed.
So I can't rely on that.
Quitting when shutdown.wait() returns is too early, because that is also
what makes reaction ask the plugin to close(), and the plugin can print
important logs during its shutdown.
Having the task ignore the whole shutdown mechanism is dead simple and is
most likely correct every time.

updated the wording of plugin-related errors.

also replaced futures::select! { future, sleep() } with more concise and
macro-less tokio::timeout.
This commit is contained in:
ppom 2026-02-03 12:00:00 +01:00
commit 915e308015
No known key found for this signature in database
3 changed files with 86 additions and 40 deletions

1
TODO
View file

@ -1,4 +1,3 @@
Test what happens when a Filter's pattern Set changes (I think it's shitty) Test what happens when a Filter's pattern Set changes (I think it's shitty)
DB: add tests on stress testing (lines should always be in order) DB: add tests on stress testing (lines should always be in order)
conf: merge filters conf: merge filters
plugins: pipe stderr too and wrap errors in logs. fix errors?

View file

@ -1,16 +1,18 @@
use std::{ use std::{
collections::{BTreeMap, BTreeSet}, collections::{BTreeMap, BTreeSet},
io,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
process::ExitStatus,
time::Duration, time::Duration,
}; };
use futures::{FutureExt, StreamExt, future::join_all}; use futures::{StreamExt, future::join_all};
use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl}; use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl};
use remoc::Connect; use remoc::Connect;
use serde_json::Value; use serde_json::Value;
use tokio::{ use tokio::{
process::{Child, ChildStderr}, process::{Child, ChildStderr},
time::sleep, time::timeout,
}; };
use tracing::{error, info}; use tracing::{error, info};
@ -53,8 +55,8 @@ impl PluginManager {
{ {
let stderr = child.stderr.take().unwrap(); let stderr = child.stderr.take().unwrap();
let shutdown = shutdown.clone(); // let shutdown = shutdown.clone();
tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone(), shutdown).await }); tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone()).await });
} }
let stdin = child.stdin.take().unwrap(); let stdin = child.stdin.take().unwrap();
@ -114,36 +116,61 @@ impl PluginManager {
const PLUGIN_STOP_GRACE_TIME: u64 = 15; const PLUGIN_STOP_GRACE_TIME: u64 = 15;
// wait either for the child process to exit on its own or for the shutdown signal // wait either for the child process to exit on its own or for the shutdown signal
futures::select! { tokio::select! {
_ = self.child.wait().fuse() => { status = self.child.wait() => {
error!("plugin {} exited: its command returned.", self.plugin.name); self.print_exit(status);
return; return;
} }
_ = self.shutdown.wait().fuse() => {} _ = self.shutdown.wait() => {}
} }
futures::select! { match timeout(
_ = self.plugin_info.close().fuse() => { Duration::from_secs(PLUGIN_STOP_GRACE_TIME),
return; self.plugin_info.close(),
}, )
_ = sleep(Duration::from_secs(PLUGIN_STOP_GRACE_TIME)).fuse() => { .await
error!("plugin {} did not respond to close request in time, killing", self.plugin.name) {
}, Ok(Ok(())) => (),
Ok(Err(err)) => {
error!("plugin {}: {err}", self.plugin.name);
}
// got timeout
Err(_) => {
error!(
"plugin {} did not respond to close request in time, killing",
self.plugin.name
);
kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await;
}
} }
}
kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await; fn print_exit(&self, status: io::Result<ExitStatus>) {
match status {
Ok(status) => match status.code() {
Some(code) => {
error!(
"plugin {}: process exited. exit code: {}",
self.plugin.name, code
);
}
None => {
error!("plugin {}: process exited.", self.plugin.name);
}
},
Err(err) => {
error!("plugin {}: process exited. {err}", self.plugin.name);
}
}
} }
} }
async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: ShutdownToken) { async fn handle_stderr(stderr: ChildStderr, plugin_name: String) {
// read lines until shutdown
let lines = reader_to_stream(stderr); let lines = reader_to_stream(stderr);
tokio::pin!(lines); tokio::pin!(lines);
loop { loop {
let event = tokio::select! { match lines.next().await {
line = lines.next() => line,
_ = shutdown.wait() => None,
};
match event {
Some(Ok(line)) => { Some(Ok(line)) => {
// sad: I can't factorize this because the tracing::event! macro // sad: I can't factorize this because the tracing::event! macro
// requires its log level to be a constant. // requires its log level to be a constant.
@ -161,7 +188,7 @@ async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: Shutd
} }
} }
Some(Err(err)) => { Some(Err(err)) => {
error!("while trying to read plugin {plugin_name} stderr: {err}"); tracing::error!("while trying to read plugin {plugin_name} stderr: {err}");
break; break;
} }
None => break, None => break,
@ -240,7 +267,12 @@ impl Plugins {
plugin plugin
.stream_impl(stream_name.clone(), stream_type, config.into()) .stream_impl(stream_name.clone(), stream_type, config.into())
.await .await
.map_err(|err| format!("plugin error while initializing stream {stream_name}: {err}")) .map_err(|err| {
format!(
"plugin error while initializing stream {stream_name}: {}",
err.to_string().replace('\n', " ")
)
})
} }
pub async fn init_action_impl( pub async fn init_action_impl(
@ -274,7 +306,9 @@ impl Plugins {
patterns, patterns,
) )
.await .await
.map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {err}")) .map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {}",
err.to_string().replace('\n', " ")
))
} }
pub async fn finish_setup(&mut self) -> Result<(), String> { pub async fn finish_setup(&mut self) -> Result<(), String> {
@ -289,7 +323,13 @@ impl Plugins {
.into_iter() .into_iter()
.zip(self.plugins.values()) .zip(self.plugins.values())
.try_for_each(|(result, plugin_manager)| { .try_for_each(|(result, plugin_manager)| {
result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.name)) result.map_err(|err| {
format!(
"plugin {}: {}",
plugin_manager.plugin.name,
err.to_string().replace('\n', " ")
)
})
}) })
} }

View file

@ -1,7 +1,6 @@
use std::time::Duration; use std::time::Duration;
use futures::FutureExt; use tokio::{process::Child, time::timeout};
use tokio::{process::Child, time::sleep};
use tracing::{error, warn}; use tracing::{error, warn};
pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) { pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) {
@ -15,14 +14,11 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64)
// but we still need to reclaim it with Child::wait // but we still need to reclaim it with Child::wait
let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM); let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM);
futures::select! { if let Ok(_) = timeout(Duration::from_secs(grace_time_sec), child.wait()).await {
_ = child.wait().fuse() => { return;
return;
},
_ = sleep(Duration::from_secs(grace_time_sec)).fuse() => {},
} }
} else { } else {
warn!("could not get PID of child process for {}", context); warn!("could not get PID of child process for {context}");
// still try to use tokio API to kill and reclaim the child process // still try to use tokio API to kill and reclaim the child process
} }
@ -33,12 +29,23 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64)
// as before, the only expected error is that the child process already terminated // as before, the only expected error is that the child process already terminated
// but we still need to reclaim it if that's the case. // but we still need to reclaim it if that's the case.
warn!("process for {context} didn't exit {grace_time_sec}s after SIGTERM, sending SIGKILL");
let _ = child.start_kill(); let _ = child.start_kill();
futures::select! { match timeout(
_ = child.wait().fuse() => {} Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC),
_ = sleep(Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC)).fuse() => { child.wait(),
error!("child process of {} did not terminate", context); )
} .await
{
Ok(_) => {}
Err(_) => match child.id() {
Some(id) => {
error!("child process of {context} did not terminate. PID: {id}");
}
None => {
error!("child process of {context} did not terminate");
}
},
} }
} }