plugin improvements

- fix panic of channel(0)
- cleaner plugin interface with one level of Result
- standalone metadata for stream plugins
- new test for plugin virtual
This commit is contained in:
ppom 2025-10-10 12:00:00 +02:00
commit 7cbf482e4d
No known key found for this signature in database
10 changed files with 236 additions and 75 deletions

View file

@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use reaction_plugin::{ActionImpl, Exec, Line, PluginInfo, RemoteResult, StreamImpl, Value};
use remoc::rch::mpsc;
use remoc::{rch::mpsc, rtc};
#[tokio::main]
async fn main() {
@ -16,11 +16,11 @@ struct Plugin {
}
impl PluginInfo for Plugin {
async fn stream_impls(&self) -> RemoteResult<Vec<String>> {
async fn stream_impls(&self) -> Result<Vec<String>, rtc::CallError> {
Ok(vec!["virtual".into()])
}
async fn action_impls(&self) -> RemoteResult<Vec<String>> {
async fn action_impls(&self) -> Result<Vec<String>, rtc::CallError> {
Ok(vec!["virtual".into()])
}
@ -29,25 +29,21 @@ impl PluginInfo for Plugin {
stream_name: String,
stream_type: String,
config: Value,
) -> RemoteResult<Result<StreamImpl, String>> {
) -> RemoteResult<StreamImpl> {
if stream_type != "virtual" {
return Ok(Err(
"This plugin can't handle other stream types than virtual".into(),
));
return Err("This plugin can't handle other stream types than virtual".into());
}
let (virtual_stream, receiver) = match VirtualStream::new(config) {
Ok(v) => v,
Err(err) => return Ok(Err(err)),
};
let (virtual_stream, receiver) = VirtualStream::new(config)?;
if let Some(_) = self.streams.insert(stream_name, virtual_stream) {
return Ok(Err(
"this virtual stream has already been initialized".into()
));
return Err("this virtual stream has already been initialized".into());
}
Ok(Ok(StreamImpl { stream: receiver }))
Ok(StreamImpl {
stream: receiver,
standalone: false,
})
}
async fn action_impl(
@ -58,24 +54,19 @@ impl PluginInfo for Plugin {
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<Result<ActionImpl, String>> {
) -> RemoteResult<ActionImpl> {
if &action_type != "virtual" {
return Ok(Err(
"This plugin can't handle other stream types than virtual".into(),
));
return Err("This plugin can't handle other stream types than virtual".into());
}
let (virtual_action_init, tx) =
match VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns) {
Ok(v) => v,
Err(err) => return Ok(Err(err)),
};
VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns)?;
self.actions_init.push(virtual_action_init);
Ok(Ok(ActionImpl { tx }))
Ok(ActionImpl { tx })
}
async fn finish_setup(&mut self) -> RemoteResult<Result<(), String>> {
async fn finish_setup(&mut self) -> RemoteResult<()> {
while let Some(action_init) = self.actions_init.pop() {
match self.streams.get(&action_init.to) {
Some(virtual_stream) => {
@ -87,20 +78,21 @@ impl PluginInfo for Plugin {
});
}
None => {
return Ok(Err(format!(
return Err(format!(
"action {}.{}.{}: send \"{}\" matches no stream name",
action_init.stream_name,
action_init.filter_name,
action_init.action_name,
action_init.to
)));
)
.into());
}
}
}
self.streams = BTreeMap::new();
self.actions_init = Vec::new();
Ok(Ok(()))
Ok(())
}
async fn close(self) -> RemoteResult<()> {
@ -126,7 +118,7 @@ impl VirtualStream {
_ => return Err(CONFIG_ERROR.into()),
}
let (tx, rx) = mpsc::channel(0);
let (tx, rx) = mpsc::channel(2);
Ok((Self { tx }, rx))
}
}
@ -179,7 +171,7 @@ impl VirtualActionInit {
.map(|pattern| format!("<{pattern}>"))
.collect();
let (tx, rx) = mpsc::channel(0);
let (tx, rx) = mpsc::channel(1);
Ok((
Self {
stream_name,

View file

@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, error::Error, fmt::Display};
/// This crate defines the API between reaction's core and plugins.
///
@ -20,14 +20,24 @@ use tokio::io::{stdin, stdout};
#[rtc::remote]
pub trait PluginInfo {
/// Return all stream types that should be made available to reaction users
async fn stream_impls(&self) -> RemoteResult<Vec<String>>;
/// ```jsonnet
/// {
/// streams: {
/// my_stream: {
/// type: "..."
/// # ↑ all those exposed types
/// }
/// }
/// }
/// ```
async fn stream_impls(&self) -> Result<Vec<String>, rtc::CallError>;
/// Return one stream of a given type if it exists
async fn stream_impl(
&mut self,
stream_name: String,
stream_type: String,
config: Value,
) -> RemoteResult<Result<StreamImpl, String>>;
) -> RemoteResult<StreamImpl>;
// /// Return all filter types that should be made available to reaction users
// async fn filter_impls(&self) -> RemoteResult<Vec<String>>;
@ -41,7 +51,7 @@ pub trait PluginInfo {
// ) -> RemoteResult<Result<FilterImpl, String>>;
/// Return all action types that should be made available to reaction users
async fn action_impls(&self) -> RemoteResult<Vec<String>>;
async fn action_impls(&self) -> Result<Vec<String>, rtc::CallError>;
/// Return one instance of a given type.
async fn action_impl(
&mut self,
@ -51,17 +61,15 @@ pub trait PluginInfo {
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<Result<ActionImpl, String>>;
) -> RemoteResult<ActionImpl>;
/// Notify the plugin that setup is finished, permitting a last occasion to report an error
/// (For example if a stream wants a companion action but it hasn't been initialized)
async fn finish_setup(&mut self) -> RemoteResult<Result<(), String>>;
async fn finish_setup(&mut self) -> RemoteResult<()>;
async fn close(mut self) -> RemoteResult<()>;
}
pub type RemoteResult<T> = Result<T, rtc::CallError>;
/// Represents a configuration value.
/// This is not meant as an efficient type, but as a very flexible one.
#[derive(Serialize, Deserialize)]
@ -78,6 +86,15 @@ pub enum Value {
#[derive(Serialize, Deserialize)]
pub struct StreamImpl {
pub stream: rch::mpsc::Receiver<Line>,
/// Whether this stream works standalone, or if it needs other streams to be fed.
/// Defaults to true.
/// When false, reaction will exit if it's the last one standing.
#[serde(default = "_true")]
pub standalone: bool,
}
fn _true() -> bool {
true
}
pub type Line = Result<String, String>;
@ -110,7 +127,7 @@ pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
_,
remoc::rch::base::Sender<PluginInfoClient>,
remoc::rch::base::Receiver<()>,
) = Connect::io_buffered(remoc::Cfg::default(), stdin(), stdout(), 2048)
) = Connect::io(remoc::Cfg::default(), stdin(), stdout())
.await
.unwrap();
@ -118,3 +135,44 @@ pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
let _ = tokio::join!(tx.send(client), server.serve(), tokio::spawn(conn));
}
// Errors
pub type RemoteResult<T> = Result<T, RemoteError>;
/// A Plugin Error
/// It's either a connection error or a free String for plugin-specific errors
#[derive(Debug, Serialize, Deserialize)]
pub enum RemoteError {
Remoc(rtc::CallError),
Plugin(String),
}
impl Display for RemoteError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RemoteError::Remoc(call_error) => write!(f, "communication error: {call_error}"),
RemoteError::Plugin(err) => write!(f, "{err}"),
}
}
}
impl Error for RemoteError {}
impl From<String> for RemoteError {
fn from(value: String) -> Self {
Self::Plugin(value)
}
}
impl From<&str> for RemoteError {
fn from(value: &str) -> Self {
Self::Plugin(value.into())
}
}
impl From<rtc::CallError> for RemoteError {
fn from(value: rtc::CallError) -> Self {
Self::Remoc(value)
}
}

View file

@ -11,6 +11,7 @@ use super::{null_value, parse_duration::*, Match, Pattern, PatternType};
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Action {
#[serde(default)]
pub cmd: Vec<String>,
// TODO one shot time deserialization

View file

@ -40,6 +40,7 @@ impl Plugin {
Command::new(&self.path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.env("RUST_BACKTRACE", "1")
.spawn()
}
}

View file

@ -398,8 +398,9 @@ fn exec_now(
match action_impl {
Some(action_impl) => {
info!(
"{action}: run {}",
"{action}: run {} [{:?}]",
action.action_type.clone().unwrap_or_default(),
&m,
);
// Sending action

View file

@ -99,9 +99,16 @@ pub async fn daemon(
socket.manager(config, state, shutdown.token());
// Start Stream managers
let stream_task_handles = stream_managers
.into_iter()
.map(|stream_manager| tokio::spawn(async move { stream_manager.start().await }));
let stream_task_handles = stream_managers.into_iter().filter_map(|stream_manager| {
let standalone = stream_manager.is_standalone();
let handle = tokio::spawn(async move { stream_manager.start().await });
// Only wait for standalone streams
if standalone {
Some(handle)
} else {
None
}
});
// Close streams when we receive a quit signal
let signal_received = Arc::new(AtomicBool::new(false));

View file

@ -25,6 +25,8 @@ pub struct PluginManager {
shutdown: ShutdownToken,
plugin: &'static Plugin,
plugin_info: PluginInfoClient,
stream_impls: Vec<String>,
action_impls: Vec<String>,
}
impl Deref for PluginManager {
@ -53,7 +55,7 @@ impl PluginManager {
_,
remoc::rch::base::Sender<()>,
remoc::rch::base::Receiver<PluginInfoClient>,
) = Connect::io_buffered(remoc::Cfg::default(), stdout, stdin, 2048)
) = Connect::io(remoc::Cfg::default(), stdout, stdin)
.await
.map_err(|err| {
format!(
@ -70,11 +72,23 @@ impl PluginManager {
.map_err(|err| format!("could not retrieve initial information from plugin: {err}"))?
.ok_or("could not retrieve initial information from plugin: no data")?;
let stream_impls = plugin_info
.stream_impls()
.await
.map_err(|err| format!("plugin error while retrieving stream types: {err}"))?;
let action_impls = plugin_info
.action_impls()
.await
.map_err(|err| format!("plugin error while retrieving action types: {err}"))?;
Ok(Self {
child,
shutdown,
plugin,
plugin_info,
stream_impls,
action_impls,
})
}
@ -135,26 +149,18 @@ impl Plugins {
let path = plugin.path.clone();
let manager = PluginManager::new(plugin, shutdown).await?;
for stream in manager
.stream_impls()
.await
.map_err(|err| format!("plugin error: {err}"))?
{
for stream in &manager.stream_impls {
if let Some(path) = self.streams.insert(stream.clone().into(), path.clone()) {
return Err(format!(
"plugin {path} already exposed a stream with type name '{stream}'"
"plugin {path} already exposed a stream with type name '{stream}'",
));
}
}
for action in manager
.action_impls()
.await
.map_err(|err| format!("plugin error: {err}"))?
{
for action in &manager.action_impls {
if let Some(path) = self.actions.insert(action.clone().into(), path.clone()) {
return Err(format!(
"plugin {path} already exposed a action with type name '{action}'"
"plugin {path} already exposed a action with type name '{action}'",
));
}
}
@ -183,7 +189,7 @@ impl Plugins {
to_stable_value(config),
)
.await
.map_err(|err| format!("plugin error: {err}"))?
.map_err(|err| format!("plugin error while initializing stream: {err}"))
}
pub async fn init_action_impl(
@ -209,9 +215,10 @@ impl Plugins {
action_name.into(),
action_type.into(),
to_stable_value(config),
patterns,
)
.await
.map_err(|err| format!("plugin error: {err}"))?
.map_err(|err| format!("plugin error while initializing action: {err}"))
}
pub async fn finish_setup(&mut self) -> Result<(), String> {
@ -226,15 +233,7 @@ impl Plugins {
.into_iter()
.zip(self.plugins.values())
.map(|(result, plugin_manager)| {
result
.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.path))
.map_err(|err| {
format!(
"invalid config for plugin {}: {err}",
plugin_manager.plugin.path
)
})
.flatten()
result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.path))
})
.collect::<Result<(), String>>()
}

View file

@ -95,7 +95,14 @@ impl StreamManager {
})
}
pub async fn start(self) {
pub fn is_standalone(&self) -> bool {
match &self.stream_plugin {
Some(plugin) => plugin.standalone,
None => true,
}
}
pub async fn start(mut self) {
// First start FilterManagers persisted actions
let now = Local::now();
join_all(
@ -115,7 +122,7 @@ impl StreamManager {
}
}
async fn start_plugin(mut self) {
async fn start_plugin(&mut self) {
let mut plugin = self.stream_plugin.take().unwrap();
loop {
@ -128,11 +135,18 @@ impl StreamManager {
return;
}
Err(err) => {
error!(
"impossible to read output from stream {}: {}",
self.stream.name, err
);
return;
if err.is_final() {
error!(
"error reading from plugin stream {}: {}",
self.stream.name, err
);
return;
} else {
error!(
"temporary error reading from plugin stream {}: {}",
self.stream.name, err
);
}
}
Ok(None) => {
return;
@ -141,7 +155,7 @@ impl StreamManager {
}
}
async fn start_cmd(self) {
async fn start_cmd(&self) {
let mut child = match Command::new(&self.stream.cmd[0])
.args(&self.stream.cmd[1..])
.stdin(Stdio::null())

30
tests/plugin_virtual.rs Normal file
View file

@ -0,0 +1,30 @@
use std::{path::Path, time::Duration};
use assert_cmd::Command;
use assert_fs::prelude::*;
use predicates::prelude::predicate;
#[test]
fn plugin_virtual() {
let tmp_dir = assert_fs::TempDir::new().unwrap();
tmp_dir
.child("config.jsonnet")
.write_file(Path::new("tests/test-conf/test-virtual.jsonnet"))
.unwrap();
Command::cargo_bin("reaction")
.unwrap()
.args(["start", "--socket", "./s", "--config", "./config.jsonnet"])
.current_dir(tmp_dir.path())
.timeout(Duration::from_secs(5))
// Expected exit 1: all stream exited
.assert()
.code(predicate::eq(1));
// Expected output
let output = [
"a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "",
];
tmp_dir.child("log").assert(&output.join("\n"));
tmp_dir.child("log").write_str("").unwrap();
}

View file

@ -0,0 +1,58 @@
{
patterns: {
num: {
regex: @"[0-9]+",
},
all: {
regex: @".*"
}
},
plugins: [
{
path: "/home/ppom/prg/reaction/target/debug/reaction-plugin-virtual",
}
],
streams: {
s0: {
cmd: ["bash", "-c", "for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2"],
filters: {
f0: {
regex: ["^<num>$"],
actions: {
a0: {
type: "virtual",
options: {
send: "a0 <num>",
to: "s1",
}
},
b0: {
type: "virtual",
options: {
send: "b0 <num>",
to: "s1",
},
after: "600ms",
},
},
},
},
},
s1: {
type: "virtual",
options: {},
filters: {
f1: {
regex: ["^<all>$"],
actions: {
a1: {
cmd: ['sh', '-c', 'echo <all> >>./log'],
},
},
},
},
},
},
}