From f08762c3f37833cab78d96c9346cebc9fc25d07f Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 3 Oct 2025 12:00:00 +0200 Subject: [PATCH] First shot of "virtual stream" plugin --- Cargo.lock | 10 + Cargo.toml | 6 +- plugins/reaction-plugin-virtual/Cargo.toml | 9 + plugins/reaction-plugin-virtual/src/main.rs | 241 ++++++++++++++++++ .../reaction-plugin}/Cargo.lock | 0 .../reaction-plugin}/Cargo.toml | 1 + .../reaction-plugin}/src/lib.rs | 26 +- src/daemon/filter/mod.rs | 7 +- src/daemon/plugin/mod.rs | 1 + 9 files changed, 295 insertions(+), 6 deletions(-) create mode 100644 plugins/reaction-plugin-virtual/Cargo.toml create mode 100644 plugins/reaction-plugin-virtual/src/main.rs rename {reaction-plugin => plugins/reaction-plugin}/Cargo.lock (100%) rename {reaction-plugin => plugins/reaction-plugin}/Cargo.toml (71%) rename {reaction-plugin => plugins/reaction-plugin}/src/lib.rs (81%) diff --git a/Cargo.lock b/Cargo.lock index e1cfd21..ef0928a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1068,6 +1068,16 @@ version = "0.1.0" dependencies = [ "remoc", "serde", + "tokio", +] + +[[package]] +name = "reaction-plugin-virtual" +version = "0.1.0" +dependencies = [ + "reaction-plugin", + "remoc", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6ce6c63..6e953a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,14 +50,14 @@ jrsonnet-evaluator = "0.4.2" thiserror = "1.0.63" # Async runtime & helpers futures = "0.3.30" -tokio = { version = "1.40.0", features = ["full", "tracing"] } +tokio = { workspace = true, features = ["full", "tracing"] } tokio-util = { version = "0.7.12", features = ["codec"] } # Async logging tracing = "0.1.40" tracing-subscriber = "0.3.18" # Reaction plugin system remoc = { workspace = true } -reaction-plugin = { path = "reaction-plugin" } +reaction-plugin = { path = "plugins/reaction-plugin" } [build-dependencies] clap = { version = "4.5.4", features = ["derive"] } @@ -74,7 +74,9 @@ assert_cmd = "2.0.17" predicates = "3.1.3" [workspace] +members = ["plugins/reaction-plugin", "plugins/reaction-plugin-virtual"] [workspace.dependencies] remoc = { version = "0.18.3" } serde = { version = "1.0.203", features = ["derive"] } +tokio = { version = "1.40.0" } diff --git a/plugins/reaction-plugin-virtual/Cargo.toml b/plugins/reaction-plugin-virtual/Cargo.toml new file mode 100644 index 0000000..55ce41c --- /dev/null +++ b/plugins/reaction-plugin-virtual/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "reaction-plugin-virtual" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { workspace = true, features = ["rt-multi-thread"] } +remoc.workspace = true +reaction-plugin.path = "../reaction-plugin" diff --git a/plugins/reaction-plugin-virtual/src/main.rs b/plugins/reaction-plugin-virtual/src/main.rs new file mode 100644 index 0000000..330a0b2 --- /dev/null +++ b/plugins/reaction-plugin-virtual/src/main.rs @@ -0,0 +1,241 @@ +use std::collections::BTreeMap; + +use reaction_plugin::{ActionImpl, Exec, Line, PluginInfo, RemoteResult, StreamImpl, Value}; +use remoc::rch::mpsc; + +#[tokio::main] +async fn main() { + let plugin = Plugin::default(); + reaction_plugin::main_loop(plugin).await; +} + +#[derive(Default)] +struct Plugin { + streams: BTreeMap, + actions_init: Vec, +} + +impl PluginInfo for Plugin { + async fn stream_impls(&self) -> RemoteResult> { + Ok(vec!["virtual".into()]) + } + + async fn action_impls(&self) -> RemoteResult> { + Ok(vec!["virtual".into()]) + } + + async fn stream_impl( + &mut self, + stream_name: String, + stream_type: String, + config: Value, + ) -> RemoteResult> { + if stream_type != "virtual" { + return Ok(Err( + "This plugin can't handle other stream types than virtual".into(), + )); + } + + let (virtual_stream, receiver) = match VirtualStream::new(config) { + Ok(v) => v, + Err(err) => return Ok(Err(err)), + }; + + if let Some(_) = self.streams.insert(stream_name, virtual_stream) { + return Ok(Err( + "this virtual stream has already been initialized".into() + )); + } + + Ok(Ok(StreamImpl { stream: receiver })) + } + + async fn action_impl( + &mut self, + stream_name: String, + filter_name: String, + action_name: String, + action_type: String, + config: Value, + patterns: Vec, + ) -> RemoteResult> { + if &action_type != "virtual" { + return Ok(Err( + "This plugin can't handle other stream types than virtual".into(), + )); + } + + let (virtual_action_init, tx) = + match VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns) { + Ok(v) => v, + Err(err) => return Ok(Err(err)), + }; + + self.actions_init.push(virtual_action_init); + Ok(Ok(ActionImpl { tx })) + } + + async fn finish_setup(&mut self) -> RemoteResult> { + while let Some(action_init) = self.actions_init.pop() { + match self.streams.get(&action_init.to) { + Some(virtual_stream) => { + let virtual_stream = virtual_stream.clone(); + tokio::spawn(async move { + VirtualAction::from(action_init, virtual_stream) + .serve() + .await + }); + } + None => { + return Ok(Err(format!( + "action {}.{}.{}: send \"{}\" matches no stream name", + action_init.stream_name, + action_init.filter_name, + action_init.action_name, + action_init.to + ))); + } + } + } + self.streams = BTreeMap::new(); + self.actions_init = Vec::new(); + + Ok(Ok(())) + } + + async fn close(self) -> RemoteResult<()> { + Ok(()) + } +} + +#[derive(Clone)] +struct VirtualStream { + tx: mpsc::Sender>, +} + +impl VirtualStream { + fn new(config: Value) -> Result<(Self, mpsc::Receiver), String> { + const CONFIG_ERROR: &'static str = "streams of type virtual take no options"; + match config { + Value::Null => (), + Value::Object(map) => { + if map.len() != 0 { + return Err(CONFIG_ERROR.into()); + } + } + _ => return Err(CONFIG_ERROR.into()), + } + + let (tx, rx) = mpsc::channel(0); + Ok((Self { tx }, rx)) + } +} + +struct VirtualActionInit { + stream_name: String, + filter_name: String, + action_name: String, + rx: mpsc::Receiver, + patterns: Vec, + send: String, + to: String, +} + +impl VirtualActionInit { + fn new( + stream_name: String, + filter_name: String, + action_name: String, + config: Value, + patterns: Vec, + ) -> Result<(Self, mpsc::Sender), String> { + let send; + let to; + match config { + Value::Object(mut map) => { + send = match map.remove("send") { + Some(Value::String(value)) => value, + _ => return Err("`send` must be a string to send to the corresponding virtual stream, example: \"ban \"".into()), + }; + + to = match map.remove("to") { + Some(Value::String(value)) => value, + _ => return Err("`to` must be the name of the corresponding virtual stream, example: \"my_stream\"".into()), + }; + + if map.len() != 0 { + return Err( + "actions of type virtual accept only `send` and `to` options".into(), + ); + } + } + _ => { + return Err("actions of type virtual require `send` and `to` options".into()); + } + } + + let patterns = patterns + .into_iter() + .map(|pattern| format!("<{pattern}>")) + .collect(); + + let (tx, rx) = mpsc::channel(0); + Ok(( + Self { + stream_name, + filter_name, + action_name, + rx, + patterns, + send, + to, + }, + tx, + )) + } +} + +struct VirtualAction { + rx: mpsc::Receiver, + patterns: Vec, + send: String, + to: VirtualStream, +} + +impl VirtualAction { + fn from(action_init: VirtualActionInit, to: VirtualStream) -> VirtualAction { + VirtualAction { + rx: action_init.rx, + patterns: action_init.patterns, + send: action_init.send, + to, + } + } + + async fn serve(&mut self) { + loop { + match self.rx.recv().await { + Ok(Some(m)) => { + let line = if m.match_.is_empty() { + self.send.clone() + } else { + (0..(m.match_.len())) + .zip(&self.patterns) + .fold(self.send.clone(), |acc, (i, pattern)| { + acc.replace(pattern, &m.match_[i]) + }) + }; + let result = match self.to.tx.send(Line::Ok(line)).await { + Ok(_) => Ok(()), + Err(err) => Err(format!("{err}")), + }; + m.result.send(result).unwrap(); + } + Ok(None) => { + return; + } + Err(_) => panic!(), + } + } + } +} diff --git a/reaction-plugin/Cargo.lock b/plugins/reaction-plugin/Cargo.lock similarity index 100% rename from reaction-plugin/Cargo.lock rename to plugins/reaction-plugin/Cargo.lock diff --git a/reaction-plugin/Cargo.toml b/plugins/reaction-plugin/Cargo.toml similarity index 71% rename from reaction-plugin/Cargo.toml rename to plugins/reaction-plugin/Cargo.toml index 96e50e7..94f6d71 100644 --- a/reaction-plugin/Cargo.toml +++ b/plugins/reaction-plugin/Cargo.toml @@ -6,3 +6,4 @@ edition = "2024" [dependencies] remoc.workspace = true serde.workspace = true +tokio = { workspace = true, features = ["io-std"] } diff --git a/reaction-plugin/src/lib.rs b/plugins/reaction-plugin/src/lib.rs similarity index 81% rename from reaction-plugin/src/lib.rs rename to plugins/reaction-plugin/src/lib.rs index a46343e..caccca3 100644 --- a/reaction-plugin/src/lib.rs +++ b/plugins/reaction-plugin/src/lib.rs @@ -8,8 +8,12 @@ use std::collections::BTreeMap; /// To implement a plugin, one has to provide an implementation of [`PluginInfo`], that provides /// the entrypoint for a plugin. /// It permits to define 0 to n (stream, filter, action) custom types. -use remoc::{rch, rtc}; +use remoc::{ + Connect, rch, + rtc::{self, Server}, +}; use serde::{Deserialize, Serialize}; +use tokio::io::{stdin, stdout}; /// This is the only trait that **must** be implemented by a plugin. /// It provides lists of stream, filter and action types implemented by a dynamic plugin. @@ -46,6 +50,7 @@ pub trait PluginInfo { action_name: String, action_type: String, config: Value, + patterns: Vec, ) -> RemoteResult>; /// Notify the plugin that setup is finished, permitting a last occasion to report an error @@ -72,9 +77,11 @@ pub enum Value { #[derive(Serialize, Deserialize)] pub struct StreamImpl { - pub stream: rch::mpsc::Receiver>, + pub stream: rch::mpsc::Receiver, } +pub type Line = Result; + // #[derive(Serialize, Deserialize)] // pub struct FilterImpl { // pub stream: rch::lr::Sender, @@ -88,7 +95,7 @@ pub struct StreamImpl { #[derive(Clone, Serialize, Deserialize)] pub struct ActionImpl { - pub sender: rch::mpsc::Sender, + pub tx: rch::mpsc::Sender, } #[derive(Serialize, Deserialize)] @@ -98,3 +105,16 @@ pub struct Exec { } // TODO write main function here? +pub async fn main_loop(plugin_info: T) { + let (conn, mut tx, _rx): ( + _, + remoc::rch::base::Sender, + remoc::rch::base::Receiver<()>, + ) = Connect::io_buffered(remoc::Cfg::default(), stdin(), stdout(), 2048) + .await + .unwrap(); + + let (server, client) = PluginInfoServer::new(plugin_info, 1); + + let _ = tokio::join!(tx.send(client), server.serve(), tokio::spawn(conn)); +} diff --git a/src/daemon/filter/mod.rs b/src/daemon/filter/mod.rs index 9bbe66c..11a4a48 100644 --- a/src/daemon/filter/mod.rs +++ b/src/daemon/filter/mod.rs @@ -73,6 +73,11 @@ impl FilterManager { action.name.clone(), action.action_type.clone().unwrap(), action.options.clone(), + action + .patterns + .iter() + .map(|pattern| pattern.name.clone()) + .collect(), ) .await?, ); @@ -400,7 +405,7 @@ fn exec_now( // Sending action let (response_tx, response_rx) = remoc::rch::oneshot::channel(); if let Err(err) = action_impl - .sender + .tx .send(reaction_plugin::Exec { match_: m, result: response_tx, diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs index 79c2b05..ef1b70d 100644 --- a/src/daemon/plugin/mod.rs +++ b/src/daemon/plugin/mod.rs @@ -193,6 +193,7 @@ impl Plugins { action_name: String, action_type: String, config: Value, + patterns: Vec, ) -> Result { let plugin_name = self .actions