diff --git a/TODO b/TODO index a6aa816..2b6071f 100644 --- a/TODO +++ b/TODO @@ -1,4 +1,3 @@ Test what happens when a Filter's pattern Set changes (I think it's shitty) DB: add tests on stress testing (lines should always be in order) conf: merge filters -plugins: pipe stderr too and wrap errors in logs. fix errors? diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs index aeaf1a9..b9e7b88 100644 --- a/src/daemon/plugin/mod.rs +++ b/src/daemon/plugin/mod.rs @@ -1,16 +1,18 @@ use std::{ collections::{BTreeMap, BTreeSet}, + io, ops::{Deref, DerefMut}, + process::ExitStatus, time::Duration, }; -use futures::{FutureExt, StreamExt, future::join_all}; +use futures::{StreamExt, future::join_all}; use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl}; use remoc::Connect; use serde_json::Value; use tokio::{ process::{Child, ChildStderr}, - time::sleep, + time::timeout, }; use tracing::{error, info}; @@ -53,8 +55,8 @@ impl PluginManager { { let stderr = child.stderr.take().unwrap(); - let shutdown = shutdown.clone(); - tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone(), shutdown).await }); + // let shutdown = shutdown.clone(); + tokio::spawn(async move { handle_stderr(stderr, plugin.name.clone()).await }); } let stdin = child.stdin.take().unwrap(); @@ -114,36 +116,61 @@ impl PluginManager { const PLUGIN_STOP_GRACE_TIME: u64 = 15; // wait either for the child process to exit on its own or for the shutdown signal - futures::select! { - _ = self.child.wait().fuse() => { - error!("plugin {} exited: its command returned.", self.plugin.name); + tokio::select! { + status = self.child.wait() => { + self.print_exit(status); return; } - _ = self.shutdown.wait().fuse() => {} + _ = self.shutdown.wait() => {} } - futures::select! 
{ - _ = self.plugin_info.close().fuse() => { - return; - }, - _ = sleep(Duration::from_secs(PLUGIN_STOP_GRACE_TIME)).fuse() => { - error!("plugin {} did not respond to close request in time, killing", self.plugin.name) - }, + match timeout( + Duration::from_secs(PLUGIN_STOP_GRACE_TIME), + self.plugin_info.close(), + ) + .await + { + Ok(Ok(())) => (), + Ok(Err(err)) => { + error!("plugin {}: {err}", self.plugin.name); + } + // got timeout + Err(_) => { + error!( + "plugin {} did not respond to close request in time, killing", + self.plugin.name + ); + kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await; + } } + } - kill_child(self.child, format!("plugin {}", self.plugin.name), 5).await; + fn print_exit(&self, status: io::Result<ExitStatus>) { + match status { + Ok(status) => match status.code() { + Some(code) => { + error!( + "plugin {}: process exited. exit code: {}", + self.plugin.name, code + ); + } + None => { + error!("plugin {}: process exited.", self.plugin.name); + } + }, + Err(err) => { + error!("plugin {}: process exited. {err}", self.plugin.name); + } + } } } -async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: ShutdownToken) { +async fn handle_stderr(stderr: ChildStderr, plugin_name: String) { + // read lines until the stderr stream ends or a read error occurs let lines = reader_to_stream(stderr); tokio::pin!(lines); loop { - let event = tokio::select! { - line = lines.next() => line, - _ = shutdown.wait() => None, - }; - match event { + match lines.next().await { Some(Ok(line)) => { // sad: I can't factorize this because the tracing::event! macro // requires its log level to be a constant. 
@@ -161,7 +188,7 @@ async fn handle_stderr(stderr: ChildStderr, plugin_name: String, shutdown: Shutd } } Some(Err(err)) => { - error!("while trying to read plugin {plugin_name} stderr: {err}"); + tracing::error!("while trying to read plugin {plugin_name} stderr: {err}"); break; } None => break, @@ -240,7 +267,12 @@ impl Plugins { plugin .stream_impl(stream_name.clone(), stream_type, config.into()) .await - .map_err(|err| format!("plugin error while initializing stream {stream_name}: {err}")) + .map_err(|err| { + format!( + "plugin error while initializing stream {stream_name}: {}", + err.to_string().replace('\n', " ") + ) + }) } pub async fn init_action_impl( @@ -274,7 +306,9 @@ impl Plugins { patterns, ) .await - .map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {err}")) + .map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {}", + err.to_string().replace('\n', " ") + )) } pub async fn finish_setup(&mut self) -> Result<(), String> { @@ -289,7 +323,13 @@ impl Plugins { .into_iter() .zip(self.plugins.values()) .try_for_each(|(result, plugin_manager)| { - result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.name)) + result.map_err(|err| { + format!( + "plugin {}: {}", + plugin_manager.plugin.name, + err.to_string().replace('\n', " ") + ) + }) }) } diff --git a/src/daemon/utils.rs b/src/daemon/utils.rs index 0ef3986..4ec94ae 100644 --- a/src/daemon/utils.rs +++ b/src/daemon/utils.rs @@ -1,7 +1,6 @@ use std::time::Duration; -use futures::FutureExt; -use tokio::{process::Child, time::sleep}; +use tokio::{process::Child, time::timeout}; use tracing::{error, warn}; pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) { @@ -15,14 +14,11 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) // but we still need to reclaim it with Child::wait let _ = nix::sys::signal::kill(pid, 
nix::sys::signal::SIGTERM); - futures::select! { - _ = child.wait().fuse() => { - return; - }, - _ = sleep(Duration::from_secs(grace_time_sec)).fuse() => {}, + if let Ok(_) = timeout(Duration::from_secs(grace_time_sec), child.wait()).await { + return; } } else { - warn!("could not get PID of child process for {}", context); + warn!("could not get PID of child process for {context}"); // still try to use tokio API to kill and reclaim the child process } @@ -33,12 +29,23 @@ pub async fn kill_child(mut child: Child, context: String, grace_time_sec: u64) // as before, the only expected error is that the child process already terminated // but we still need to reclaim it if that's the case. + warn!("process for {context} didn't exit {grace_time_sec}s after SIGTERM, sending SIGKILL"); let _ = child.start_kill(); - futures::select! { - _ = child.wait().fuse() => {} - _ = sleep(Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC)).fuse() => { - error!("child process of {} did not terminate", context); - } + match timeout( + Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC), + child.wait(), + ) + .await + { + Ok(_) => {} + Err(_) => match child.id() { + Some(id) => { + error!("child process of {context} did not terminate. PID: {id}"); + } + None => { + error!("child process of {context} did not terminate"); + } + }, } }