First shot of "virtual stream" plugin

This commit is contained in:
ppom 2025-10-03 12:00:00 +02:00
commit f08762c3f3
No known key found for this signature in database
9 changed files with 295 additions and 6 deletions

10
Cargo.lock generated
View file

@ -1068,6 +1068,16 @@ version = "0.1.0"
dependencies = [
"remoc",
"serde",
"tokio",
]
[[package]]
name = "reaction-plugin-virtual"
version = "0.1.0"
dependencies = [
"reaction-plugin",
"remoc",
"tokio",
]
[[package]]

View file

@ -50,14 +50,14 @@ jrsonnet-evaluator = "0.4.2"
thiserror = "1.0.63"
# Async runtime & helpers
futures = "0.3.30"
tokio = { version = "1.40.0", features = ["full", "tracing"] }
tokio = { workspace = true, features = ["full", "tracing"] }
tokio-util = { version = "0.7.12", features = ["codec"] }
# Async logging
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
# Reaction plugin system
remoc = { workspace = true }
reaction-plugin = { path = "reaction-plugin" }
reaction-plugin = { path = "plugins/reaction-plugin" }
[build-dependencies]
clap = { version = "4.5.4", features = ["derive"] }
@ -74,7 +74,9 @@ assert_cmd = "2.0.17"
predicates = "3.1.3"
[workspace]
members = ["plugins/reaction-plugin", "plugins/reaction-plugin-virtual"]
[workspace.dependencies]
remoc = { version = "0.18.3" }
serde = { version = "1.0.203", features = ["derive"] }
tokio = { version = "1.40.0" }

View file

@ -0,0 +1,9 @@
[package]
name = "reaction-plugin-virtual"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }
remoc.workspace = true
reaction-plugin.path = "../reaction-plugin"

View file

@ -0,0 +1,241 @@
use std::collections::BTreeMap;
use reaction_plugin::{ActionImpl, Exec, Line, PluginInfo, RemoteResult, StreamImpl, Value};
use remoc::rch::mpsc;
#[tokio::main]
async fn main() {
let plugin = Plugin::default();
reaction_plugin::main_loop(plugin).await;
}
#[derive(Default)]
struct Plugin {
streams: BTreeMap<String, VirtualStream>,
actions_init: Vec<VirtualActionInit>,
}
impl PluginInfo for Plugin {
async fn stream_impls(&self) -> RemoteResult<Vec<String>> {
Ok(vec!["virtual".into()])
}
async fn action_impls(&self) -> RemoteResult<Vec<String>> {
Ok(vec!["virtual".into()])
}
async fn stream_impl(
&mut self,
stream_name: String,
stream_type: String,
config: Value,
) -> RemoteResult<Result<StreamImpl, String>> {
if stream_type != "virtual" {
return Ok(Err(
"This plugin can't handle other stream types than virtual".into(),
));
}
let (virtual_stream, receiver) = match VirtualStream::new(config) {
Ok(v) => v,
Err(err) => return Ok(Err(err)),
};
if let Some(_) = self.streams.insert(stream_name, virtual_stream) {
return Ok(Err(
"this virtual stream has already been initialized".into()
));
}
Ok(Ok(StreamImpl { stream: receiver }))
}
async fn action_impl(
&mut self,
stream_name: String,
filter_name: String,
action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<Result<ActionImpl, String>> {
if &action_type != "virtual" {
return Ok(Err(
"This plugin can't handle other stream types than virtual".into(),
));
}
let (virtual_action_init, tx) =
match VirtualActionInit::new(stream_name, filter_name, action_name, config, patterns) {
Ok(v) => v,
Err(err) => return Ok(Err(err)),
};
self.actions_init.push(virtual_action_init);
Ok(Ok(ActionImpl { tx }))
}
async fn finish_setup(&mut self) -> RemoteResult<Result<(), String>> {
while let Some(action_init) = self.actions_init.pop() {
match self.streams.get(&action_init.to) {
Some(virtual_stream) => {
let virtual_stream = virtual_stream.clone();
tokio::spawn(async move {
VirtualAction::from(action_init, virtual_stream)
.serve()
.await
});
}
None => {
return Ok(Err(format!(
"action {}.{}.{}: send \"{}\" matches no stream name",
action_init.stream_name,
action_init.filter_name,
action_init.action_name,
action_init.to
)));
}
}
}
self.streams = BTreeMap::new();
self.actions_init = Vec::new();
Ok(Ok(()))
}
async fn close(self) -> RemoteResult<()> {
Ok(())
}
}
#[derive(Clone)]
struct VirtualStream {
tx: mpsc::Sender<Result<String, String>>,
}
impl VirtualStream {
fn new(config: Value) -> Result<(Self, mpsc::Receiver<Line>), String> {
const CONFIG_ERROR: &'static str = "streams of type virtual take no options";
match config {
Value::Null => (),
Value::Object(map) => {
if map.len() != 0 {
return Err(CONFIG_ERROR.into());
}
}
_ => return Err(CONFIG_ERROR.into()),
}
let (tx, rx) = mpsc::channel(0);
Ok((Self { tx }, rx))
}
}
struct VirtualActionInit {
stream_name: String,
filter_name: String,
action_name: String,
rx: mpsc::Receiver<Exec>,
patterns: Vec<String>,
send: String,
to: String,
}
impl VirtualActionInit {
fn new(
stream_name: String,
filter_name: String,
action_name: String,
config: Value,
patterns: Vec<String>,
) -> Result<(Self, mpsc::Sender<Exec>), String> {
let send;
let to;
match config {
Value::Object(mut map) => {
send = match map.remove("send") {
Some(Value::String(value)) => value,
_ => return Err("`send` must be a string to send to the corresponding virtual stream, example: \"ban <ip>\"".into()),
};
to = match map.remove("to") {
Some(Value::String(value)) => value,
_ => return Err("`to` must be the name of the corresponding virtual stream, example: \"my_stream\"".into()),
};
if map.len() != 0 {
return Err(
"actions of type virtual accept only `send` and `to` options".into(),
);
}
}
_ => {
return Err("actions of type virtual require `send` and `to` options".into());
}
}
let patterns = patterns
.into_iter()
.map(|pattern| format!("<{pattern}>"))
.collect();
let (tx, rx) = mpsc::channel(0);
Ok((
Self {
stream_name,
filter_name,
action_name,
rx,
patterns,
send,
to,
},
tx,
))
}
}
struct VirtualAction {
rx: mpsc::Receiver<Exec>,
patterns: Vec<String>,
send: String,
to: VirtualStream,
}
impl VirtualAction {
fn from(action_init: VirtualActionInit, to: VirtualStream) -> VirtualAction {
VirtualAction {
rx: action_init.rx,
patterns: action_init.patterns,
send: action_init.send,
to,
}
}
async fn serve(&mut self) {
loop {
match self.rx.recv().await {
Ok(Some(m)) => {
let line = if m.match_.is_empty() {
self.send.clone()
} else {
(0..(m.match_.len()))
.zip(&self.patterns)
.fold(self.send.clone(), |acc, (i, pattern)| {
acc.replace(pattern, &m.match_[i])
})
};
let result = match self.to.tx.send(Line::Ok(line)).await {
Ok(_) => Ok(()),
Err(err) => Err(format!("{err}")),
};
m.result.send(result).unwrap();
}
Ok(None) => {
return;
}
Err(_) => panic!(),
}
}
}
}

View file

@ -6,3 +6,4 @@ edition = "2024"
[dependencies]
remoc.workspace = true
serde.workspace = true
tokio = { workspace = true, features = ["io-std"] }

View file

@ -8,8 +8,12 @@ use std::collections::BTreeMap;
/// To implement a plugin, one has to provide an implementation of [`PluginInfo`], that provides
/// the entrypoint for a plugin.
/// It permits to define 0 to n (stream, filter, action) custom types.
use remoc::{rch, rtc};
use remoc::{
Connect, rch,
rtc::{self, Server},
};
use serde::{Deserialize, Serialize};
use tokio::io::{stdin, stdout};
/// This is the only trait that **must** be implemented by a plugin.
/// It provides lists of stream, filter and action types implemented by a dynamic plugin.
@ -46,6 +50,7 @@ pub trait PluginInfo {
action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<Result<ActionImpl, String>>;
/// Notify the plugin that setup is finished, permitting a last occasion to report an error
@ -72,9 +77,11 @@ pub enum Value {
#[derive(Serialize, Deserialize)]
pub struct StreamImpl {
pub stream: rch::mpsc::Receiver<Result<String, String>>,
pub stream: rch::mpsc::Receiver<Line>,
}
pub type Line = Result<String, String>;
// #[derive(Serialize, Deserialize)]
// pub struct FilterImpl {
// pub stream: rch::lr::Sender<Exec>,
@ -88,7 +95,7 @@ pub struct StreamImpl {
#[derive(Clone, Serialize, Deserialize)]
pub struct ActionImpl {
pub sender: rch::mpsc::Sender<Exec>,
pub tx: rch::mpsc::Sender<Exec>,
}
#[derive(Serialize, Deserialize)]
@ -98,3 +105,16 @@ pub struct Exec {
}
// TODO write main function here?
pub async fn main_loop<T: PluginInfo + Send + Sync + 'static>(plugin_info: T) {
let (conn, mut tx, _rx): (
_,
remoc::rch::base::Sender<PluginInfoClient>,
remoc::rch::base::Receiver<()>,
) = Connect::io_buffered(remoc::Cfg::default(), stdin(), stdout(), 2048)
.await
.unwrap();
let (server, client) = PluginInfoServer::new(plugin_info, 1);
let _ = tokio::join!(tx.send(client), server.serve(), tokio::spawn(conn));
}

View file

@ -73,6 +73,11 @@ impl FilterManager {
action.name.clone(),
action.action_type.clone().unwrap(),
action.options.clone(),
action
.patterns
.iter()
.map(|pattern| pattern.name.clone())
.collect(),
)
.await?,
);
@ -400,7 +405,7 @@ fn exec_now(
// Sending action
let (response_tx, response_rx) = remoc::rch::oneshot::channel();
if let Err(err) = action_impl
.sender
.tx
.send(reaction_plugin::Exec {
match_: m,
result: response_tx,

View file

@ -193,6 +193,7 @@ impl Plugins {
action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
) -> Result<ActionImpl, String> {
let plugin_name = self
.actions