mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 12:45:47 +01:00
plugin improvements
- fix panic of channel(0) - cleaner plugin interface with one level of Result - standalone metadata for stream plugins - new test for plugin virtual
This commit is contained in:
parent
550606801a
commit
635d1a052d
10 changed files with 236 additions and 75 deletions
|
|
@ -1,7 +1,7 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use reaction_plugin::{ActionImpl, Exec, Line, PluginInfo, RemoteResult, StreamImpl, Value};
|
||||
use remoc::rch::mpsc;
|
||||
use remoc::{rch::mpsc, rtc};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
|
|
@ -16,11 +16,11 @@ struct Plugin {
|
|||
}
|
||||
|
||||
impl PluginInfo for Plugin {
|
||||
async fn stream_impls(&self) -> RemoteResult<Vec<String>> {
|
||||
async fn stream_impls(&self) -> Result<Vec<String>, rtc::CallError> {
|
||||
Ok(vec!["virtual".into()])
|
||||
}
|
||||
|
||||
async fn action_impls(&self) -> RemoteResult<Vec<String>> {
|
||||
async fn action_impls(&self) -> Result<Vec<String>, rtc::CallError> {
|
||||
Ok(vec!["virtual".into()])
|
||||
}
|
||||
|
||||
|
|
@ -29,25 +29,21 @@ impl PluginInfo for Plugin {
|
|||
stream_name: String,
|
||||
stream_type: String,
|
||||
config: Value,
|
||||
) -> RemoteResult<Result<StreamImpl, String>> {
|
||||
) -> RemoteResult<StreamImpl> {
|
||||
if stream_type != "virtual" {
|
||||
return Ok(Err(
|
||||
"This plugin can't handle other stream types than virtual".into(),
|
||||
));
|
||||
return Err("This plugin can't handle other stream types than virtual".into());
|
||||
}
|
||||
|
||||
let (virtual_stream, receiver) = match VirtualStream::new(config) {
|
||||
Ok(v) => v,
|
||||
Err(err) => return Ok(Err(err)),
|
||||
};
|
||||
let (virtual_stream, receiver) = VirtualStream::new(config)?;
|
||||
|
||||
if let Some(_) = self.streams.insert(stream_name, virtual_stream) {
|
||||
return Ok(Err(
|
||||
"this virtual stream has already been initialized".into()
|
||||
));
|
||||
return Err("this virtual stream has already been initialized".into());
|
||||
}
|
||||
|
||||
Ok(Ok(StreamImpl { stream: receiver }))
|
||||
Ok(StreamImpl {
|
||||
stream: receiver,
|
||||
standalone: false,
|
||||
})
|
||||
}
|
||||
|
||||
async fn action_impl(
|
||||
|
|
@ -58,24 +54,19 @@ impl PluginInfo for Plugin {
|
|||
action_type: String,
|
||||
config: Value,
|
||||
patterns: Vec<String>,
|
||||
) -> RemoteResult<Result<ActionImpl, String>> {
|
||||
) -> RemoteResult<ActionImpl> {
|
||||
if &action_type != "virtual" {
|
||||
return Ok(Err(
|
||||
"This plugin can't handle other stream types than virtual".into(),
|
||||
));
|
||||
return Err("This plugin can't handle other stream types than virtual".into());
|
||||
}
|
||||
|
||||
let (virtual_action_init, tx) =
|
||||
match VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns) {
|
||||
Ok(v) => v,
|
||||
Err(err) => return Ok(Err(err)),
|
||||
};
|
||||
VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns)?;
|
||||
|
||||
self.actions_init.push(virtual_action_init);
|
||||
Ok(Ok(ActionImpl { tx }))
|
||||
Ok(ActionImpl { tx })
|
||||
}
|
||||
|
||||
async fn finish_setup(&mut self) -> RemoteResult<Result<(), String>> {
|
||||
async fn finish_setup(&mut self) -> RemoteResult<()> {
|
||||
while let Some(action_init) = self.actions_init.pop() {
|
||||
match self.streams.get(&action_init.to) {
|
||||
Some(virtual_stream) => {
|
||||
|
|
@ -87,20 +78,21 @@ impl PluginInfo for Plugin {
|
|||
});
|
||||
}
|
||||
None => {
|
||||
return Ok(Err(format!(
|
||||
return Err(format!(
|
||||
"action {}.{}.{}: send \"{}\" matches no stream name",
|
||||
action_init.stream_name,
|
||||
action_init.filter_name,
|
||||
action_init.action_name,
|
||||
action_init.to
|
||||
)));
|
||||
)
|
||||
.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
self.streams = BTreeMap::new();
|
||||
self.actions_init = Vec::new();
|
||||
|
||||
Ok(Ok(()))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn close(self) -> RemoteResult<()> {
|
||||
|
|
@ -126,7 +118,7 @@ impl VirtualStream {
|
|||
_ => return Err(CONFIG_ERROR.into()),
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::channel(0);
|
||||
let (tx, rx) = mpsc::channel(2);
|
||||
Ok((Self { tx }, rx))
|
||||
}
|
||||
}
|
||||
|
|
@ -179,7 +171,7 @@ impl VirtualActionInit {
|
|||
.map(|pattern| format!("<{pattern}>"))
|
||||
.collect();
|
||||
|
||||
let (tx, rx) = mpsc::channel(0);
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
Ok((
|
||||
Self {
|
||||
stream_name,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::{collections::BTreeMap, error::Error, fmt::Display};
|
||||
|
||||
/// This crate defines the API between reaction's core and plugins.
|
||||
///
|
||||
|
|
@ -20,14 +20,24 @@ use tokio::io::{stdin, stdout};
|
|||
#[rtc::remote]
|
||||
pub trait PluginInfo {
|
||||
/// Return all stream types that should be made available to reaction users
|
||||
async fn stream_impls(&self) -> RemoteResult<Vec<String>>;
|
||||
/// ```jsonnet
|
||||
/// {
|
||||
/// streams: {
|
||||
/// my_stream: {
|
||||
/// type: "..."
|
||||
/// # ↑ all those exposed types
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
async fn stream_impls(&self) -> Result<Vec<String>, rtc::CallError>;
|
||||
/// Return one stream of a given type if it exists
|
||||
async fn stream_impl(
|
||||
&mut self,
|
||||
stream_name: String,
|
||||
stream_type: String,
|
||||
config: Value,
|
||||
) -> RemoteResult<Result<StreamImpl, String>>;
|
||||
) -> RemoteResult<StreamImpl>;
|
||||
|
||||
// /// Return all filter types that should be made available to reaction users
|
||||
// async fn filter_impls(&self) -> RemoteResult<Vec<String>>;
|
||||
|
|
@ -41,7 +51,7 @@ pub trait PluginInfo {
|
|||
// ) -> RemoteResult<Result<FilterImpl, String>>;
|
||||
|
||||
/// Return all action types that should be made available to reaction users
|
||||
async fn action_impls(&self) -> RemoteResult<Vec<String>>;
|
||||
async fn action_impls(&self) -> Result<Vec<String>, rtc::CallError>;
|
||||
/// Return one instance of a given type.
|
||||
async fn action_impl(
|
||||
&mut self,
|
||||
|
|
@ -51,17 +61,15 @@ pub trait PluginInfo {
|
|||
action_type: String,
|
||||
config: Value,
|
||||
patterns: Vec<String>,
|
||||
) -> RemoteResult<Result<ActionImpl, String>>;
|
||||
) -> RemoteResult<ActionImpl>;
|
||||
|
||||
/// Notify the plugin that setup is finished, permitting a last occasion to report an error
|
||||
/// (For example if a stream wants a companion action but it hasn't been initialized)
|
||||
async fn finish_setup(&mut self) -> RemoteResult<Result<(), String>>;
|
||||
async fn finish_setup(&mut self) -> RemoteResult<()>;
|
||||
|
||||
async fn close(mut self) -> RemoteResult<()>;
|
||||
}
|
||||
|
||||
pub type RemoteResult<T> = Result<T, rtc::CallError>;
|
||||
|
||||
/// Represents a configuration value.
|
||||
/// This is not meant as an efficient type, but as a very flexible one.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
|
@ -78,6 +86,15 @@ pub enum Value {
|
|||
#[derive(Serialize, Deserialize)]
|
||||
pub struct StreamImpl {
|
||||
pub stream: rch::mpsc::Receiver<Line>,
|
||||
/// Whether this stream works standalone, or if it needs other streams to be fed.
|
||||
/// Defaults to true.
|
||||
/// When false, reaction will exit if it's the last one standing.
|
||||
#[serde(default = "_true")]
|
||||
pub standalone: bool,
|
||||
}
|
||||
|
||||
fn _true() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
pub type Line = Result<String, String>;
|
||||
|
|
@ -110,7 +127,7 @@ pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
|
|||
_,
|
||||
remoc::rch::base::Sender<PluginInfoClient>,
|
||||
remoc::rch::base::Receiver<()>,
|
||||
) = Connect::io_buffered(remoc::Cfg::default(), stdin(), stdout(), 2048)
|
||||
) = Connect::io(remoc::Cfg::default(), stdin(), stdout())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -118,3 +135,44 @@ pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
|
|||
|
||||
let _ = tokio::join!(tx.send(client), server.serve(), tokio::spawn(conn));
|
||||
}
|
||||
|
||||
// Errors
|
||||
|
||||
pub type RemoteResult<T> = Result<T, RemoteError>;
|
||||
|
||||
/// A Plugin Error
|
||||
/// It's either a connection error or a free String for plugin-specific errors
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum RemoteError {
|
||||
Remoc(rtc::CallError),
|
||||
Plugin(String),
|
||||
}
|
||||
|
||||
impl Display for RemoteError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
RemoteError::Remoc(call_error) => write!(f, "communication error: {call_error}"),
|
||||
RemoteError::Plugin(err) => write!(f, "{err}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for RemoteError {}
|
||||
|
||||
impl From<String> for RemoteError {
|
||||
fn from(value: String) -> Self {
|
||||
Self::Plugin(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for RemoteError {
|
||||
fn from(value: &str) -> Self {
|
||||
Self::Plugin(value.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<rtc::CallError> for RemoteError {
|
||||
fn from(value: rtc::CallError) -> Self {
|
||||
Self::Remoc(value)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use super::{null_value, parse_duration::*, Match, Pattern, PatternType};
|
|||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct Action {
|
||||
#[serde(default)]
|
||||
pub cmd: Vec<String>,
|
||||
|
||||
// TODO one shot time deserialization
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ impl Plugin {
|
|||
Command::new(&self.path)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.env("RUST_BACKTRACE", "1")
|
||||
.spawn()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -398,8 +398,9 @@ fn exec_now(
|
|||
match action_impl {
|
||||
Some(action_impl) => {
|
||||
info!(
|
||||
"{action}: run {}",
|
||||
"{action}: run {} [{:?}]",
|
||||
action.action_type.clone().unwrap_or_default(),
|
||||
&m,
|
||||
);
|
||||
|
||||
// Sending action
|
||||
|
|
|
|||
|
|
@ -99,9 +99,16 @@ pub async fn daemon(
|
|||
socket.manager(config, state, shutdown.token());
|
||||
|
||||
// Start Stream managers
|
||||
let stream_task_handles = stream_managers
|
||||
.into_iter()
|
||||
.map(|stream_manager| tokio::spawn(async move { stream_manager.start().await }));
|
||||
let stream_task_handles = stream_managers.into_iter().filter_map(|stream_manager| {
|
||||
let standalone = stream_manager.is_standalone();
|
||||
let handle = tokio::spawn(async move { stream_manager.start().await });
|
||||
// Only wait for standalone streams
|
||||
if standalone {
|
||||
Some(handle)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
// Close streams when we receive a quit signal
|
||||
let signal_received = Arc::new(AtomicBool::new(false));
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ pub struct PluginManager {
|
|||
shutdown: ShutdownToken,
|
||||
plugin: &'static Plugin,
|
||||
plugin_info: PluginInfoClient,
|
||||
stream_impls: Vec<String>,
|
||||
action_impls: Vec<String>,
|
||||
}
|
||||
|
||||
impl Deref for PluginManager {
|
||||
|
|
@ -53,7 +55,7 @@ impl PluginManager {
|
|||
_,
|
||||
remoc::rch::base::Sender<()>,
|
||||
remoc::rch::base::Receiver<PluginInfoClient>,
|
||||
) = Connect::io_buffered(remoc::Cfg::default(), stdout, stdin, 2048)
|
||||
) = Connect::io(remoc::Cfg::default(), stdout, stdin)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
|
|
@ -70,11 +72,23 @@ impl PluginManager {
|
|||
.map_err(|err| format!("could not retrieve initial information from plugin: {err}"))?
|
||||
.ok_or("could not retrieve initial information from plugin: no data")?;
|
||||
|
||||
let stream_impls = plugin_info
|
||||
.stream_impls()
|
||||
.await
|
||||
.map_err(|err| format!("plugin error while retrieving stream types: {err}"))?;
|
||||
|
||||
let action_impls = plugin_info
|
||||
.action_impls()
|
||||
.await
|
||||
.map_err(|err| format!("plugin error while retrieving action types: {err}"))?;
|
||||
|
||||
Ok(Self {
|
||||
child,
|
||||
shutdown,
|
||||
plugin,
|
||||
plugin_info,
|
||||
stream_impls,
|
||||
action_impls,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -135,26 +149,18 @@ impl Plugins {
|
|||
let path = plugin.path.clone();
|
||||
let manager = PluginManager::new(plugin, shutdown).await?;
|
||||
|
||||
for stream in manager
|
||||
.stream_impls()
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
{
|
||||
for stream in &manager.stream_impls {
|
||||
if let Some(path) = self.streams.insert(stream.clone().into(), path.clone()) {
|
||||
return Err(format!(
|
||||
"plugin {path} already exposed a stream with type name '{stream}'"
|
||||
"plugin {path} already exposed a stream with type name '{stream}'",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
for action in manager
|
||||
.action_impls()
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
{
|
||||
for action in &manager.action_impls {
|
||||
if let Some(path) = self.actions.insert(action.clone().into(), path.clone()) {
|
||||
return Err(format!(
|
||||
"plugin {path} already exposed a action with type name '{action}'"
|
||||
"plugin {path} already exposed a action with type name '{action}'",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
|
@ -183,7 +189,7 @@ impl Plugins {
|
|||
to_stable_value(config),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
.map_err(|err| format!("plugin error while initializing stream: {err}"))
|
||||
}
|
||||
|
||||
pub async fn init_action_impl(
|
||||
|
|
@ -209,9 +215,10 @@ impl Plugins {
|
|||
action_name.into(),
|
||||
action_type.into(),
|
||||
to_stable_value(config),
|
||||
patterns,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
.map_err(|err| format!("plugin error while initializing action: {err}"))
|
||||
}
|
||||
|
||||
pub async fn finish_setup(&mut self) -> Result<(), String> {
|
||||
|
|
@ -226,15 +233,7 @@ impl Plugins {
|
|||
.into_iter()
|
||||
.zip(self.plugins.values())
|
||||
.map(|(result, plugin_manager)| {
|
||||
result
|
||||
.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.path))
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"invalid config for plugin {}: {err}",
|
||||
plugin_manager.plugin.path
|
||||
)
|
||||
})
|
||||
.flatten()
|
||||
result.map_err(|err| format!("plugin {} error: {err}", plugin_manager.plugin.path))
|
||||
})
|
||||
.collect::<Result<(), String>>()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,7 +95,14 @@ impl StreamManager {
|
|||
})
|
||||
}
|
||||
|
||||
pub async fn start(self) {
|
||||
pub fn is_standalone(&self) -> bool {
|
||||
match &self.stream_plugin {
|
||||
Some(plugin) => plugin.standalone,
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(mut self) {
|
||||
// First start FilterManagers persisted actions
|
||||
let now = Local::now();
|
||||
join_all(
|
||||
|
|
@ -115,7 +122,7 @@ impl StreamManager {
|
|||
}
|
||||
}
|
||||
|
||||
async fn start_plugin(mut self) {
|
||||
async fn start_plugin(&mut self) {
|
||||
let mut plugin = self.stream_plugin.take().unwrap();
|
||||
|
||||
loop {
|
||||
|
|
@ -128,11 +135,18 @@ impl StreamManager {
|
|||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"impossible to read output from stream {}: {}",
|
||||
self.stream.name, err
|
||||
);
|
||||
return;
|
||||
if err.is_final() {
|
||||
error!(
|
||||
"error reading from plugin stream {}: {}",
|
||||
self.stream.name, err
|
||||
);
|
||||
return;
|
||||
} else {
|
||||
error!(
|
||||
"temporary error reading from plugin stream {}: {}",
|
||||
self.stream.name, err
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
return;
|
||||
|
|
@ -141,7 +155,7 @@ impl StreamManager {
|
|||
}
|
||||
}
|
||||
|
||||
async fn start_cmd(self) {
|
||||
async fn start_cmd(&self) {
|
||||
let mut child = match Command::new(&self.stream.cmd[0])
|
||||
.args(&self.stream.cmd[1..])
|
||||
.stdin(Stdio::null())
|
||||
|
|
|
|||
30
tests/plugin_virtual.rs
Normal file
30
tests/plugin_virtual.rs
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
use std::{path::Path, time::Duration};
|
||||
|
||||
use assert_cmd::Command;
|
||||
use assert_fs::prelude::*;
|
||||
use predicates::prelude::predicate;
|
||||
|
||||
#[test]
|
||||
fn plugin_virtual() {
|
||||
let tmp_dir = assert_fs::TempDir::new().unwrap();
|
||||
tmp_dir
|
||||
.child("config.jsonnet")
|
||||
.write_file(Path::new("tests/test-conf/test-virtual.jsonnet"))
|
||||
.unwrap();
|
||||
|
||||
Command::cargo_bin("reaction")
|
||||
.unwrap()
|
||||
.args(["start", "--socket", "./s", "--config", "./config.jsonnet"])
|
||||
.current_dir(tmp_dir.path())
|
||||
.timeout(Duration::from_secs(5))
|
||||
// Expected exit 1: all stream exited
|
||||
.assert()
|
||||
.code(predicate::eq(1));
|
||||
|
||||
// Expected output
|
||||
let output = [
|
||||
"a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "",
|
||||
];
|
||||
tmp_dir.child("log").assert(&output.join("\n"));
|
||||
tmp_dir.child("log").write_str("").unwrap();
|
||||
}
|
||||
58
tests/test-conf/test-virtual.jsonnet
Normal file
58
tests/test-conf/test-virtual.jsonnet
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
{
|
||||
patterns: {
|
||||
num: {
|
||||
regex: @"[0-9]+",
|
||||
},
|
||||
all: {
|
||||
regex: @".*"
|
||||
}
|
||||
},
|
||||
|
||||
plugins: [
|
||||
{
|
||||
path: "/home/ppom/prg/reaction/target/debug/reaction-plugin-virtual",
|
||||
}
|
||||
],
|
||||
|
||||
streams: {
|
||||
s0: {
|
||||
cmd: ["bash", "-c", "for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2"],
|
||||
filters: {
|
||||
f0: {
|
||||
regex: ["^<num>$"],
|
||||
actions: {
|
||||
a0: {
|
||||
type: "virtual",
|
||||
options: {
|
||||
send: "a0 <num>",
|
||||
to: "s1",
|
||||
}
|
||||
},
|
||||
b0: {
|
||||
type: "virtual",
|
||||
options: {
|
||||
send: "b0 <num>",
|
||||
to: "s1",
|
||||
},
|
||||
after: "600ms",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
s1: {
|
||||
type: "virtual",
|
||||
options: {},
|
||||
filters: {
|
||||
f1: {
|
||||
regex: ["^<all>$"],
|
||||
actions: {
|
||||
a1: {
|
||||
cmd: ['sh', '-c', 'echo <all> >>./log'],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue