mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 12:45:47 +01:00
Better plugin process management
following stderr: task doesn't use shutdown anymore. It will simply follow
stderr until the end of reaction, which at worst is a negligible
memory leak if reaction continues running.
I tried closing stderr on the plugin side with a raw syscall of the file
descriptor, but reaction side doesn't see that stderr is closed.
So I can't rely on that.
Quitting when shutdown.wait() returns is too early, because that's also
what makes reaction asking for the plugin to close(), and it can print
important logs during its shutdown.
The task ignoring all the shutdown part is dead simple and is most likely
correct everytime.
updated the wording of plugin-related errors.
also replaced futures::select! { future, sleep() } with more concise and
macro-less tokio::timeout.
This commit is contained in:
parent
b2c0c19997
commit
5817d52678
3 changed files with 86 additions and 40 deletions
1
TODO
1
TODO
|
|
@ -1,4 +1,3 @@
|
|||
Test what happens when a Filter's pattern Set changes (I think it's shitty)
|
||||
DB: add tests on stress testing (lines should always be in order)
|
||||
conf: merge filters
|
||||
plugins: pipe stderr too and wrap errors in logs. fix errors?
|
||||
|
|
|
|||
|
|
@ -1,16 +1,18 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
io,
|
||||
ops::{Deref, DerefMut},
|
||||
process::ExitStatus,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{FutureExt, StreamExt, future::join_all};
|
||||
use futures::{StreamExt, future::join_all};
|
||||
use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl};
|
||||
use remoc::Connect;
|
||||
use serde_json::Value;
|
||||
use tokio::{
|
||||
process::{Child, ChildStderr},
|
||||
time::sleep,
|
||||
time::timeout,
|
||||
};
|
||||
use tracing::{error, info};
|
||||
|
||||
|
|
@ -53,8 +55,8 @@ impl PluginManager {
|
|||
|
||||
{
|
||||
let stderr = child.stderr.take().unwrap();
|
||||
let shutdown = shutdown.clone();
|
||||
tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone(), shutdown).await });
|
||||
// let shutdown = shutdown.clone();
|
||||
tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone()).await });
|
||||
}
|
||||
|
||||
let stdin = child.stdin.take().unwrap();
|
||||
|
|
@ -114,36 +116,61 @@ impl PluginManager {
|
|||
const PLUGIN_STOP_GRACE_TIME: u64 = 15;
|
||||
|
||||
// wait either for the child process to exit on its own or for the shutdown signal
|
||||
futures::select! {
|
||||
_ = self.child.wait().fuse() => {
|
||||
error!("plugin {} exited: its command returned.", self.plugin.name);
|
||||
tokio::select! {
|
||||
status = self.child.wait() => {
|
||||
self.print_exit(status);
|
||||
return;
|
||||
}
|
||||
_ = self.shutdown.wait().fuse() => {}
|
||||
_ = self.shutdown.wait() => {}
|
||||
}
|
||||
|
||||
futures::select! {
|
||||
_ = self.plugin_info.close().fuse() => {
|
||||
return;
|
||||
},
|
||||
_ = sleep(Duration::from_secs(PLUGIN_STOP_GRACE_TIME)).fuse() => {
|
||||
error!("plugin {} did not respond to close request in time, killing", self.plugin.name)
|
||||
},
|
||||
match timeout(
|
||||
Duration::from_secs(PLUGIN_STOP_GRACE_TIME),
|
||||
self.plugin_info.close(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(())) => (),
|
||||
Ok(Err(err)) => {
|
||||
error!("plugin {}: {err}", self.plugin.name);
|
||||
}
|
||||
// got timeout
|
||||
Err(_) => {
|
||||
error!(
|
||||
"plugin {} did not respond to close request in time, killing",
|
||||
self.plugin.name
|
||||
);
|
||||
kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await;
|
||||
fn print_exit(&self, status: io::Result<ExitStatus>) {
|
||||
match status {
|
||||
Ok(status) => match status.code() {
|
||||
Some(code) => {
|
||||
error!(
|
||||
"plugin {}: process exited. exit code: {}",
|
||||
self.plugin.name, code
|
||||
);
|
||||
}
|
||||
None => {
|
||||
error!("plugin {}: process exited.", self.plugin.name);
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("plugin {}: process exited. {err}", self.plugin.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: ShutdownToken) {
|
||||
async fn handle_stderr(stderr: ChildStderr, plugin_name: String) {
|
||||
// read lines until shutdown
|
||||
let lines = reader_to_stream(stderr);
|
||||
tokio::pin!(lines);
|
||||
loop {
|
||||
let event = tokio::select! {
|
||||
line = lines.next() => line,
|
||||
_ = shutdown.wait() => None,
|
||||
};
|
||||
match event {
|
||||
match lines.next().await {
|
||||
Some(Ok(line)) => {
|
||||
// sad: I can't factorize this because the tracing::event! macro
|
||||
// requires its log level to be a constant.
|
||||
|
|
@ -161,7 +188,7 @@ async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: Shutd
|
|||
}
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
error!("while trying to read plugin {plugin_name} stderr: {err}");
|
||||
tracing::error!("while trying to read plugin {plugin_name} stderr: {err}");
|
||||
break;
|
||||
}
|
||||
None => break,
|
||||
|
|
@ -240,7 +267,12 @@ impl Plugins {
|
|||
plugin
|
||||
.stream_impl(stream_name.clone(), stream_type, config.into())
|
||||
.await
|
||||
.map_err(|err| format!("plugin error while initializing stream {stream_name}: {err}"))
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"plugin error while initializing stream {stream_name}: {}",
|
||||
err.to_string().replace('\n', " ")
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn init_action_impl(
|
||||
|
|
@ -274,7 +306,9 @@ impl Plugins {
|
|||
patterns,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {err}"))
|
||||
.map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {}",
|
||||
err.to_string().replace('\n', " ")
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn finish_setup(&mut self) -> Result<(), String> {
|
||||
|
|
@ -289,7 +323,13 @@ impl Plugins {
|
|||
.into_iter()
|
||||
.zip(self.plugins.values())
|
||||
.try_for_each(|(result, plugin_manager)| {
|
||||
result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.name))
|
||||
result.map_err(|err| {
|
||||
format!(
|
||||
"plugin {}: {}",
|
||||
plugin_manager.plugin.name,
|
||||
err.to_string().replace('\n', " ")
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use futures::FutureExt;
|
||||
use tokio::{process::Child, time::sleep};
|
||||
use tokio::{process::Child, time::timeout};
|
||||
use tracing::{error, warn};
|
||||
|
||||
pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) {
|
||||
|
|
@ -15,14 +14,11 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64)
|
|||
// but we still need to reclaim it with Child::wait
|
||||
let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM);
|
||||
|
||||
futures::select! {
|
||||
_ = child.wait().fuse() => {
|
||||
return;
|
||||
},
|
||||
_ = sleep(Duration::from_secs(grace_time_sec)).fuse() => {},
|
||||
if let Ok(_) = timeout(Duration::from_secs(grace_time_sec), child.wait()).await {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
warn!("could not get PID of child process for {}", context);
|
||||
warn!("could not get PID of child process for {context}");
|
||||
// still try to use tokio API to kill and reclaim the child process
|
||||
}
|
||||
|
||||
|
|
@ -33,12 +29,23 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64)
|
|||
|
||||
// as before, the only expected error is that the child process already terminated
|
||||
// but we still need to reclaim it if that's the case.
|
||||
warn!("process for {context} didn't exit {grace_time_sec}s after SIGTERM, sending SIGKILL");
|
||||
let _ = child.start_kill();
|
||||
|
||||
futures::select! {
|
||||
_ = child.wait().fuse() => {}
|
||||
_ = sleep(Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC)).fuse() => {
|
||||
error!("child process of {} did not terminate", context);
|
||||
}
|
||||
match timeout(
|
||||
Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC),
|
||||
child.wait(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(_) => match child.id() {
|
||||
Some(id) => {
|
||||
error!("child process of {context} did not terminate. PID: {id}");
|
||||
}
|
||||
None => {
|
||||
error!("child process of {context} did not terminate");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue