From 635d1a052daa8de7d6b8ec373779592f1d2181b2 Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 10 Oct 2025 12:00:00 +0200 Subject: [PATCH] plugin improvements - fix panic of channel(0) - cleaner plugin interface with one level of Result - standalone metadata for stream plugins - new test for plugin virtual --- plugins/reaction-plugin-virtual/src/main.rs | 52 ++++++-------- plugins/reaction-plugin/src/lib.rs | 76 ++++++++++++++++++--- src/concepts/action.rs | 1 + src/concepts/plugin.rs | 1 + src/daemon/filter/mod.rs | 3 +- src/daemon/mod.rs | 13 +++- src/daemon/plugin/mod.rs | 47 +++++++------ src/daemon/stream.rs | 30 +++++--- tests/plugin_virtual.rs | 30 ++++++++ tests/test-conf/test-virtual.jsonnet | 58 ++++++++++++++++ 10 files changed, 236 insertions(+), 75 deletions(-) create mode 100644 tests/plugin_virtual.rs create mode 100644 tests/test-conf/test-virtual.jsonnet diff --git a/plugins/reaction-plugin-virtual/src/main.rs b/plugins/reaction-plugin-virtual/src/main.rs index 330a0b2..26fdd56 100644 --- a/plugins/reaction-plugin-virtual/src/main.rs +++ b/plugins/reaction-plugin-virtual/src/main.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use reaction_plugin::{ActionImpl, Exec, Line, PluginInfo, RemoteResult, StreamImpl, Value}; -use remoc::rch::mpsc; +use remoc::{rch::mpsc, rtc}; #[tokio::main] async fn main() { @@ -16,11 +16,11 @@ struct Plugin { } impl PluginInfo for Plugin { - async fn stream_impls(&self) -> RemoteResult> { + async fn stream_impls(&self) -> Result, rtc::CallError> { Ok(vec!["virtual".into()]) } - async fn action_impls(&self) -> RemoteResult> { + async fn action_impls(&self) -> Result, rtc::CallError> { Ok(vec!["virtual".into()]) } @@ -29,25 +29,21 @@ impl PluginInfo for Plugin { stream_name: String, stream_type: String, config: Value, - ) -> RemoteResult> { + ) -> RemoteResult { if stream_type != "virtual" { - return Ok(Err( - "This plugin can't handle other stream types than virtual".into(), - )); + return Err("This plugin can't handle other stream types than virtual".into()); } - let (virtual_stream, receiver) = match VirtualStream::new(config) { - Ok(v) => v, - Err(err) => return Ok(Err(err)), - }; + let (virtual_stream, receiver) = VirtualStream::new(config)?; if let Some(_) = self.streams.insert(stream_name, virtual_stream) { - return Ok(Err( - "this virtual stream has already been initialized".into() - )); + return Err("this virtual stream has already been initialized".into()); } - Ok(Ok(StreamImpl { stream: receiver })) + Ok(StreamImpl { + stream: receiver, + standalone: false, + }) } async fn action_impl( @@ -58,24 +54,19 @@ impl PluginInfo for Plugin { action_type: String, config: Value, patterns: Vec, - ) -> RemoteResult> { + ) -> RemoteResult { if &action_type != "virtual" { - return Ok(Err( - "This plugin can't handle other stream types than virtual".into(), - )); + return Err("This plugin can't handle other stream types than virtual".into()); } let (virtual_action_init, tx) = - match VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns) { - Ok(v) => v, - Err(err) => return Ok(Err(err)), - }; + VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns)?; self.actions_init.push(virtual_action_init); - Ok(Ok(ActionImpl { tx })) + Ok(ActionImpl { tx }) } - async fn finish_setup(&mut self) -> RemoteResult> { + async fn finish_setup(&mut self) -> RemoteResult<()> { while let Some(action_init) = self.actions_init.pop() { match self.streams.get(&action_init.to) { Some(virtual_stream) => { @@ -87,20 +78,21 @@ impl PluginInfo for Plugin { }); } None => { - return Ok(Err(format!( + return Err(format!( "action {}.{}.{}: send \"{}\" matches no stream name", action_init.stream_name, action_init.filter_name, action_init.action_name, action_init.to - ))); + ) + .into()); } } } self.streams = BTreeMap::new(); self.actions_init = Vec::new(); - Ok(Ok(())) + Ok(()) } async fn close(self) -> RemoteResult<()> { @@ -126,7 +118,7 @@ impl VirtualStream { _ => return Err(CONFIG_ERROR.into()), } - let (tx, rx) = mpsc::channel(0); + let (tx, rx) = mpsc::channel(2); Ok((Self { tx }, rx)) } } @@ -179,7 +171,7 @@ impl VirtualActionInit { .map(|pattern| format!("<{pattern}>")) .collect(); - let (tx, rx) = mpsc::channel(0); + let (tx, rx) = mpsc::channel(1); Ok(( Self { stream_name, diff --git a/plugins/reaction-plugin/src/lib.rs b/plugins/reaction-plugin/src/lib.rs index caccca3..5b3af6c 100644 --- a/plugins/reaction-plugin/src/lib.rs +++ b/plugins/reaction-plugin/src/lib.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, error::Error, fmt::Display}; /// This crate defines the API between reaction's core and plugins. /// @@ -20,14 +20,24 @@ use tokio::io::{stdin, stdout}; #[rtc::remote] pub trait PluginInfo { /// Return all stream types that should be made available to reaction users - async fn stream_impls(&self) -> RemoteResult>; + /// ```jsonnet + /// { + /// streams: { + /// my_stream: { + /// type: "..." + /// # ↑ all those exposed types + /// } + /// } + /// } + /// ``` + async fn stream_impls(&self) -> Result, rtc::CallError>; /// Return one stream of a given type if it exists async fn stream_impl( &mut self, stream_name: String, stream_type: String, config: Value, - ) -> RemoteResult>; + ) -> RemoteResult; // /// Return all filter types that should be made available to reaction users // async fn filter_impls(&self) -> RemoteResult>; @@ -41,7 +51,7 @@ pub trait PluginInfo { // ) -> RemoteResult>; /// Return all action types that should be made available to reaction users - async fn action_impls(&self) -> RemoteResult>; + async fn action_impls(&self) -> Result, rtc::CallError>; /// Return one instance of a given type. async fn action_impl( &mut self, @@ -51,17 +61,15 @@ pub trait PluginInfo { action_type: String, config: Value, patterns: Vec, - ) -> RemoteResult>; + ) -> RemoteResult; /// Notify the plugin that setup is finished, permitting a last occasion to report an error /// (For example if a stream wants a companion action but it hasn't been initialized) - async fn finish_setup(&mut self) -> RemoteResult>; + async fn finish_setup(&mut self) -> RemoteResult<()>; async fn close(mut self) -> RemoteResult<()>; } -pub type RemoteResult = Result; - /// Represents a configuration value. /// This is not meant as an efficient type, but as a very flexible one. #[derive(Serialize, Deserialize)] @@ -78,6 +86,15 @@ pub enum Value { #[derive(Serialize, Deserialize)] pub struct StreamImpl { pub stream: rch::mpsc::Receiver, + /// Whether this stream works standalone, or if it needs other streams to be fed. + /// Defaults to true. + /// When false, reaction will exit if it's the last one standing. + #[serde(default = "_true")] + pub standalone: bool, +} + +fn _true() -> bool { + true } pub type Line = Result; @@ -110,7 +127,7 @@ pub async fn main_loop(plugin_info: T) { _, remoc::rch::base::Sender, remoc::rch::base::Receiver<()>, - ) = Connect::io_buffered(remoc::Cfg::default(), stdin(), stdout(), 2048) + ) = Connect::io(remoc::Cfg::default(), stdin(), stdout()) .await .unwrap(); @@ -118,3 +135,44 @@ pub async fn main_loop(plugin_info: T) { let _ = tokio::join!(tx.send(client), server.serve(), tokio::spawn(conn)); } + +// Errors + +pub type RemoteResult = Result; + +/// A Plugin Error +/// It's either a connection error or a free String for plugin-specific errors +#[derive(Debug, Serialize, Deserialize)] +pub enum RemoteError { + Remoc(rtc::CallError), + Plugin(String), +} + +impl Display for RemoteError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RemoteError::Remoc(call_error) => write!(f, "communication error: {call_error}"), + RemoteError::Plugin(err) => write!(f, "{err}"), + } + } +} + +impl Error for RemoteError {} + +impl From for RemoteError { + fn from(value: String) -> Self { + Self::Plugin(value) + } +} + +impl From<&str> for RemoteError { + fn from(value: &str) -> Self { + Self::Plugin(value.into()) + } +} + +impl From for RemoteError { + fn from(value: rtc::CallError) -> Self { + Self::Remoc(value) + } +} diff --git a/src/concepts/action.rs b/src/concepts/action.rs index 70a89bc..00754d1 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -11,6 +11,7 @@ use super::{null_value, parse_duration::*, Match, Pattern, PatternType}; #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct Action { + #[serde(default)] pub cmd: Vec, // TODO one shot time deserialization diff --git a/src/concepts/plugin.rs b/src/concepts/plugin.rs index f4da976..fb365d3 100644 --- a/src/concepts/plugin.rs +++ b/src/concepts/plugin.rs @@ -40,6 +40,7 @@ impl Plugin { Command::new(&self.path) .stdin(Stdio::piped()) .stdout(Stdio::piped()) + .env("RUST_BACKTRACE", "1") .spawn() } } diff --git a/src/daemon/filter/mod.rs b/src/daemon/filter/mod.rs index 11a4a48..a71bcd5 100644 --- a/src/daemon/filter/mod.rs +++ b/src/daemon/filter/mod.rs @@ -398,8 +398,9 @@ fn exec_now( match action_impl { Some(action_impl) => { info!( - "{action}: run {}", + "{action}: run {} [{:?}]", action.action_type.clone().unwrap_or_default(), + &m, ); // Sending action diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 94b416e..70f21cb 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -99,9 +99,16 @@ pub async fn daemon( socket.manager(config, state, shutdown.token()); // Start Stream managers - let stream_task_handles = stream_managers - .into_iter() - .map(|stream_manager| tokio::spawn(async move { stream_manager.start().await })); + let stream_task_handles = stream_managers.into_iter().filter_map(|stream_manager| { + let standalone = stream_manager.is_standalone(); + let handle = tokio::spawn(async move { stream_manager.start().await }); + // Only wait for standalone streams + if standalone { + Some(handle) + } else { + None + } + }); // Close streams when we receive a quit signal let signal_received = Arc::new(AtomicBool::new(false)); diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs index ef1b70d..9d8520c 100644 --- a/src/daemon/plugin/mod.rs +++ b/src/daemon/plugin/mod.rs @@ -25,6 +25,8 @@ pub struct PluginManager { shutdown: ShutdownToken, plugin: &'static Plugin, plugin_info: PluginInfoClient, + stream_impls: Vec, + action_impls: Vec, } impl Deref for PluginManager { @@ -53,7 +55,7 @@ impl PluginManager { _, remoc::rch::base::Sender<()>, remoc::rch::base::Receiver, - ) = Connect::io_buffered(remoc::Cfg::default(), stdout, stdin, 2048) + ) = Connect::io(remoc::Cfg::default(), stdout, stdin) .await .map_err(|err| { format!( @@ -70,11 +72,23 @@ impl PluginManager { .map_err(|err| format!("could not retrieve initial information from plugin: {err}"))? .ok_or("could not retrieve initial information from plugin: no data")?; + let stream_impls = plugin_info + .stream_impls() + .await + .map_err(|err| format!("plugin error while retrieving stream types: {err}"))?; + + let action_impls = plugin_info + .action_impls() + .await + .map_err(|err| format!("plugin error while retrieving action types: {err}"))?; + Ok(Self { child, shutdown, plugin, plugin_info, + stream_impls, + action_impls, }) } @@ -135,26 +149,18 @@ impl Plugins { let path = plugin.path.clone(); let manager = PluginManager::new(plugin, shutdown).await?; - for stream in manager - .stream_impls() - .await - .map_err(|err| format!("plugin error: {err}"))? - { + for stream in &manager.stream_impls { if let Some(path) = self.streams.insert(stream.clone().into(), path.clone()) { return Err(format!( - "plugin {path} already exposed a stream with type name '{stream}'" + "plugin {path} already exposed a stream with type name '{stream}'", )); } } - for action in manager - .action_impls() - .await - .map_err(|err| format!("plugin error: {err}"))? - { + for action in &manager.action_impls { if let Some(path) = self.actions.insert(action.clone().into(), path.clone()) { return Err(format!( - "plugin {path} already exposed a action with type name '{action}'" + "plugin {path} already exposed a action with type name '{action}'", )); } } @@ -183,7 +189,7 @@ impl Plugins { to_stable_value(config), ) .await - .map_err(|err| format!("plugin error: {err}"))? + .map_err(|err| format!("plugin error while initializing stream: {err}")) } pub async fn init_action_impl( @@ -209,9 +215,10 @@ impl Plugins { action_name.into(), action_type.into(), to_stable_value(config), + patterns, ) .await - .map_err(|err| format!("plugin error: {err}"))? + .map_err(|err| format!("plugin error while initializing action: {err}")) } pub async fn finish_setup(&mut self) -> Result<(), String> { @@ -226,15 +233,7 @@ impl Plugins { .into_iter() .zip(self.plugins.values()) .map(|(result, plugin_manager)| { - result - .map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.path)) - .map_err(|err| { - format!( - "invalid config for plugin {}: {err}", - plugin_manager.plugin.path - ) - }) - .flatten() + result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.path)) }) .collect::>() } diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index bf5c189..f9e8eec 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -95,7 +95,14 @@ impl StreamManager { }) } - pub async fn start(self) { + pub fn is_standalone(&self) -> bool { + match &self.stream_plugin { + Some(plugin) => plugin.standalone, + None => true, + } + } + + pub async fn start(mut self) { // First start FilterManagers persisted actions let now = Local::now(); join_all( @@ -115,7 +122,7 @@ impl StreamManager { } } - async fn start_plugin(mut self) { + async fn start_plugin(&mut self) { let mut plugin = self.stream_plugin.take().unwrap(); loop { @@ -128,11 +135,18 @@ impl StreamManager { return; } Err(err) => { - error!( - "impossible to read output from stream {}: {}", - self.stream.name, err - ); - return; + if err.is_final() { + error!( + "error reading from plugin stream {}: {}", + self.stream.name, err + ); + return; + } else { + error!( + "temporary error reading from plugin stream {}: {}", + self.stream.name, err + ); + } } Ok(None) => { return; @@ -141,7 +155,7 @@ impl StreamManager { } } - async fn start_cmd(self) { + async fn start_cmd(&self) { let mut child = match Command::new(&self.stream.cmd[0]) .args(&self.stream.cmd[1..]) .stdin(Stdio::null()) diff --git a/tests/plugin_virtual.rs b/tests/plugin_virtual.rs new file mode 100644 index 0000000..cf546f3 --- /dev/null +++ b/tests/plugin_virtual.rs @@ -0,0 +1,30 @@ +use std::{path::Path, time::Duration}; + +use assert_cmd::Command; +use assert_fs::prelude::*; +use predicates::prelude::predicate; + +#[test] +fn plugin_virtual() { + let tmp_dir = assert_fs::TempDir::new().unwrap(); + tmp_dir + .child("config.jsonnet") + .write_file(Path::new("tests/test-conf/test-virtual.jsonnet")) + .unwrap(); + + Command::cargo_bin("reaction") + .unwrap() + .args(["start", "--socket", "./s", "--config", "./config.jsonnet"]) + .current_dir(tmp_dir.path()) + .timeout(Duration::from_secs(5)) + // Expected exit 1: all stream exited + .assert() + .code(predicate::eq(1)); + + // Expected output + let output = [ + "a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "", + ]; + tmp_dir.child("log").assert(&output.join("\n")); + tmp_dir.child("log").write_str("").unwrap(); +} diff --git a/tests/test-conf/test-virtual.jsonnet b/tests/test-conf/test-virtual.jsonnet new file mode 100644 index 0000000..f253319 --- /dev/null +++ b/tests/test-conf/test-virtual.jsonnet @@ -0,0 +1,58 @@ +{ + patterns: { + num: { + regex: @"[0-9]+", + }, + all: { + regex: @".*" + } + }, + + plugins: [ + { + path: "/home/ppom/prg/reaction/target/debug/reaction-plugin-virtual", + } + ], + + streams: { + s0: { + cmd: ["bash", "-c", "for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2"], + filters: { + f0: { + regex: ["^$"], + actions: { + a0: { + type: "virtual", + options: { + send: "a0 ", + to: "s1", + } + }, + b0: { + type: "virtual", + options: { + send: "b0 ", + to: "s1", + }, + after: "600ms", + }, + }, + }, + }, + }, + s1: { + type: "virtual", + options: {}, + filters: { + f1: { + regex: ["^$"], + actions: { + a1: { + cmd: ['sh', '-c', 'echo >>./log'], + }, + }, + }, + }, + }, + }, +}