From dc9e577fc8138c550ad11d08e286588caafa249c Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 27 Sep 2025 12:00:00 +0200 Subject: [PATCH] First WIP iteration on the plugin system, reaction side. Delaying the implementation of plugin Filters. I'm not sure it's useful, (apart from JSON, what can be done?) and it's likely to be more painful than the rest. I'll probably just implement one custom JSON Filter like I did with Pattern's IP support. --- reaction-plugin/src/lib.rs | 32 +++---- src/concepts/action.rs | 37 ++++++-- src/concepts/config.rs | 51 ++++++----- src/concepts/filter.rs | 17 +++- src/concepts/mod.rs | 68 +++++++++++++++ src/concepts/stream.rs | 67 ++++++++++----- src/daemon/mod.rs | 10 ++- src/lib.rs | 1 + src/plugin/mod.rs | 168 +++++++++++++++++++++++++++++++++++++ src/plugin/value.rs | 34 ++++++++ 10 files changed, 418 insertions(+), 67 deletions(-) create mode 100644 src/plugin/mod.rs create mode 100644 src/plugin/value.rs diff --git a/reaction-plugin/src/lib.rs b/reaction-plugin/src/lib.rs index a1a51f7..36095d3 100644 --- a/reaction-plugin/src/lib.rs +++ b/reaction-plugin/src/lib.rs @@ -36,16 +36,16 @@ pub trait PluginInfo { config: Value, ) -> Result; - /// Return all filter types that should be made available to reaction users - extern "C" fn filter_impls(&self) -> Vec; - /// Return one instance of a given type. - extern "C" fn filter_impl( - &mut self, - stream_name: String, - filter_name: String, - filter_type: String, - config: Value, - ) -> Result; + // /// Return all filter types that should be made available to reaction users + // extern "C" fn filter_impls(&self) -> Vec; + // /// Return one instance of a given type. + // extern "C" fn filter_impl( + // &mut self, + // stream_name: String, + // filter_name: String, + // filter_type: String, + // config: Value, + // ) -> Result; /// Return all action types that should be made available to reaction users extern "C" fn action_impls(&self) -> Vec; @@ -66,7 +66,7 @@ pub trait PluginInfo { pub type BoxedPluginInfo = stabby::dynptr!(Box); pub type BoxedStreamImpl = stabby::dynptr!(Box); -pub type BoxedFilterImpl = stabby::dynptr!(Box); +// pub type BoxedFilterImpl = stabby::dynptr!(Box); pub type BoxedActionImpl = stabby::dynptr!(Box); #[stabby::stabby(checked)] @@ -75,11 +75,11 @@ pub trait StreamImpl { extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; } -#[stabby::stabby(checked)] -pub trait FilterImpl { - extern "C" fn matches<'a>(&'a mut self, line: String) -> DynFuture<'a, bool>; - extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; -} +// #[stabby::stabby(checked)] +// pub trait FilterImpl { +// extern "C" fn matches<'a>(&'a mut self, line: String) -> DynFuture<'a, bool>; +// extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; +// } #[stabby::stabby(checked)] pub trait ActionImpl { diff --git a/src/concepts/action.rs b/src/concepts/action.rs index c63724e..772fb48 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -3,10 +3,12 @@ use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc}; use chrono::TimeDelta; use serde::{Deserialize, Serialize}; +use serde_json::Value; use tokio::process::Command; -use super::{parse_duration::*, PatternType}; -use super::{Match, Pattern}; +use crate::plugin::Plugins; + +use super::{null_value, parse_duration::*, Match, Pattern, PatternType}; #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] @@ -41,6 +43,12 @@ pub struct Action { pub filter_name: String, #[serde(skip)] pub stream_name: String, + + // Plugin-specific + #[serde(default, rename = "type")] + pub action_type: Option, + #[serde(default = "null_value")] + pub options: Value, } fn set_false() -> bool { @@ -82,11 +90,21 @@ impl Action { return Err("character '.' is not allowed in filter name".into()); } - if self.cmd.is_empty() { - return Err("cmd is empty".into()); - } - if self.cmd[0].is_empty() { - return Err("cmd's first item is empty".into()); + if self + .action_type + .as_ref() + .is_none_or(|stream_type| stream_type == "cmd") + { + if self.cmd.is_empty() { + return Err("cmd is empty".into()); + } + if self.cmd[0].is_empty() { + return Err("cmd's first item is empty".into()); + } + } else { + if !self.cmd.is_empty() { + return Err("can't define cmd and a plugin type".into()); + } } if let Some(after) = &self.after { @@ -118,6 +136,11 @@ impl Action { Ok(()) } + pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { + // TODO self setup + Ok(()) + } + // TODO test pub fn exec(&self, match_: &Match) -> Command { let computed_command = if self.patterns.is_empty() { diff --git a/src/concepts/config.rs b/src/concepts/config.rs index 4bee310..2bf1296 100644 --- a/src/concepts/config.rs +++ b/src/concepts/config.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{debug, error, info, warn}; -use super::{Pattern, Stream}; +use super::{merge_attrs, Pattern, Stream}; pub type Patterns = BTreeMap>; @@ -24,6 +24,9 @@ pub struct Config { #[serde(default = "dot", skip_serializing_if = "String::is_empty")] pub state_directory: String, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub plugin_directories: Vec, + #[serde(default)] pub patterns: Patterns, @@ -73,25 +76,26 @@ impl Config { self.start.append(&mut other.start); self.stop.append(&mut other.stop); - if !(self.state_directory == dot() - || other.state_directory == dot() - || self.state_directory == other.state_directory) - { - return Err("state_directory have conflicting definitions".into()); - } - if self.state_directory == dot() { - self.state_directory = other.state_directory; - } + self.state_directory = merge_attrs( + self.state_directory.clone(), + other.state_directory, + ".".into(), + "state_directory", + )?; - if !(self.concurrency == num_cpus::get() - || other.concurrency == num_cpus::get() - || self.concurrency == other.concurrency) - { - return Err("concurrency have conflicting definitions".into()); - } - if self.concurrency == num_cpus::get() { - self.concurrency = other.concurrency; - } + self.plugin_directories = merge_attrs( + self.plugin_directories.clone(), + other.plugin_directories, + Vec::default(), + "plugin_directories", + )?; + + self.concurrency = merge_attrs( + self.concurrency, + other.concurrency, + num_cpus::get(), + "concurrency", + )?; Ok(()) } @@ -104,6 +108,15 @@ impl Config { // Nullify this useless field self._definitions = serde_json::Value::Null; + for dir in &self.plugin_directories { + if dir.is_empty() { + return Err("can't specify empty plugin directory".into()); + } + if !dir.starts_with("/") { + return Err(format!("plugin directory paths must be absolute: {dir}")); + } + } + if self.patterns.is_empty() { return Err("no patterns configured".into()); } diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index fef8391..fdb656d 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -10,8 +10,9 @@ use chrono::TimeDelta; use regex::Regex; use serde::{Deserialize, Serialize}; -use super::{parse_duration, PatternType}; -use super::{Action, Match, Pattern, Patterns}; +use crate::plugin::Plugins; + +use super::{parse_duration, Action, Match, Pattern, PatternType, Patterns}; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] pub enum Duplicate { @@ -58,6 +59,11 @@ pub struct Filter { pub name: String, #[serde(skip)] pub stream_name: String, + // // Plugin-specific + // #[serde(default, rename = "type")] + // pub filter_type: Option, + // #[serde(default = "null_value")] + // pub options: Value, } impl Filter { @@ -190,6 +196,13 @@ impl Filter { Ok(()) } + pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { + for (_, action) in &mut self.actions { + action.plugin_setup(plugins)?; + } + Ok(()) + } + pub fn get_match(&self, line: &str) -> Option { for regex in &self.compiled_regex { if let Some(matches) = regex.captures(line) { diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index 40a624c..b059b62 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -5,12 +5,15 @@ mod parse_duration; mod pattern; mod stream; +use std::fmt::Debug; + pub use action::Action; pub use config::{Config, Patterns}; pub use filter::{Duplicate, Filter}; use parse_duration::parse_duration; pub use pattern::{Pattern, PatternType}; use serde::{Deserialize, Serialize}; +use serde_json::Value; pub use stream::Stream; use chrono::{DateTime, Local}; @@ -24,5 +27,70 @@ pub struct MatchTime { pub t: Time, } +fn merge_attrs( + this: A, + other: A, + default: A, + name: &str, +) -> Result { + if !(this == default || other == default || this == other) { + return Err(format!( + "'{name}' has conflicting definitions: '{this:?}', '{other:?}'" + )); + } + if this == default { + return Ok(other); + } + return Ok(this); +} + +fn null_value() -> Value { + Value::Null +} + #[cfg(test)] pub use filter::tests as filter_tests; + +#[cfg(test)] +mod tests { + use crate::concepts::merge_attrs; + + #[test] + fn test_merge_attrs() { + assert_eq!(merge_attrs(None::, None, None, "t"), Ok(None)); + assert_eq!( + merge_attrs(Some("coucou"), None, None, "t"), + Ok(Some("coucou")) + ); + assert_eq!( + merge_attrs(None, Some("coucou"), None, "t"), + Ok(Some("coucou")) + ); + assert_eq!( + merge_attrs(Some("coucou"), Some("coucou"), None, "t"), + Ok(Some("coucou")) + ); + assert_eq!( + merge_attrs(Some("coucou"), Some("hello"), None, "t"), + Err("'t' has conflicting definitions: 'Some(\"coucou\")', 'Some(\"hello\")'".into()) + ); + + assert_eq!(merge_attrs("", "", "", "t"), Ok("")); + assert_eq!(merge_attrs("coucou", "", "", "t"), Ok("coucou")); + assert_eq!(merge_attrs("", "coucou", "", "t"), Ok("coucou")); + assert_eq!(merge_attrs("coucou", "coucou", "", "t"), Ok("coucou")); + assert_eq!( + merge_attrs("coucou", "hello", "", "t"), + Err("'t' has conflicting definitions: '\"coucou\"', '\"hello\"'".into()) + ); + + assert_eq!(merge_attrs(0, 0, 0, "t"), Ok(0)); + assert_eq!(merge_attrs(5, 0, 0, "t"), Ok(5)); + assert_eq!(merge_attrs(0, 5, 0, "t"), Ok(5)); + assert_eq!(merge_attrs(5, 5, 0, "t"), Ok(5)); + assert_eq!( + merge_attrs(5, 6, 0, "t"), + Err("'t' has conflicting definitions: '5', '6'".into()) + ); + } +} diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 011652b..56e070e 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -1,8 +1,12 @@ use std::{cmp::Ordering, collections::BTreeMap, hash::Hash}; +use reaction_plugin::BoxedStreamImpl; use serde::{Deserialize, Serialize}; +use serde_json::Value; -use super::{Filter, Patterns}; +use crate::plugin::Plugins; + +use super::{merge_attrs, null_value, Filter, Patterns}; #[derive(Clone, Debug, Deserialize, Serialize)] #[cfg_attr(test, derive(Default))] @@ -10,11 +14,18 @@ use super::{Filter, Patterns}; pub struct Stream { #[serde(default)] pub cmd: Vec, + #[serde(default)] pub filters: BTreeMap, #[serde(skip)] pub name: String, + + // Plugin-specific + #[serde(default, rename = "type")] + pub stream_type: Option, + #[serde(default = "null_value")] + pub options: Value, } impl Stream { @@ -23,13 +34,8 @@ impl Stream { } pub fn merge(&mut self, other: Stream) -> Result<(), String> { - if !(self.cmd.is_empty() || other.cmd.is_empty() || self.cmd == other.cmd) { - return Err("cmd has conflicting definitions".into()); - } - - if self.cmd.is_empty() { - self.cmd = other.cmd; - } + self.cmd = merge_attrs(self.cmd.clone(), other.cmd, Vec::default(), "cmd")?; + self.stream_type = merge_attrs(self.stream_type.clone(), other.stream_type, None, "type")?; for (key, filter) in other.filters.into_iter() { if self.filters.insert(key.clone(), filter).is_some() { @@ -40,6 +46,12 @@ impl Stream { Ok(()) } + fn is_plugin(&self) -> bool { + self.stream_type + .as_ref() + .is_some_and(|stream_type| stream_type != "cmd") + } + pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> { self._setup(name, patterns) .map_err(|msg| format!("stream {}: {}", name, msg)) @@ -55,11 +67,17 @@ impl Stream { return Err("character '.' is not allowed in stream name".into()); } - if self.cmd.is_empty() { - return Err("cmd is empty".into()); - } - if self.cmd[0].is_empty() { - return Err("cmd's first item is empty".into()); + if !self.is_plugin() { + if self.cmd.is_empty() { + return Err("cmd is empty".into()); + } + if self.cmd[0].is_empty() { + return Err("cmd's first item is empty".into()); + } + } else { + if !self.cmd.is_empty() { + return Err("can't define cmd and a plugin type".into()); + } } if self.filters.is_empty() { @@ -72,6 +90,19 @@ impl Stream { Ok(()) } + + // FIXME Nan faut pas que je fasse ça là en fait, ça doit se passer côté StreamManager en fait + // j'pense + pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { + if self.is_plugin() { + plugins.init_stream_impl(self.name, self.stream_type, self.options); + } + + for (_, filter) in &mut self.filters { + filter.plugin_setup(plugins)?; + } + Ok(()) + } } impl PartialEq for Stream { @@ -102,16 +133,8 @@ mod tests { use super::*; use crate::concepts::filter::tests::ok_filter; - fn default_stream() -> Stream { - Stream { - cmd: Vec::new(), - name: "".into(), - filters: BTreeMap::new(), - } - } - fn ok_stream() -> Stream { - let mut stream = default_stream(); + let mut stream = Stream::default(); stream.cmd = vec!["command".into()]; stream.filters.insert("name".into(), ok_filter()); stream diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c68f8e2..547c9b8 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -35,6 +35,13 @@ pub async fn daemon( config_path: PathBuf, socket: PathBuf, ) -> Result<(), Box> { + // Je dois + // 1. Fusionner toute la config + // 2. Charger tous les plugins + // 3. Setup la config, avec les plugins + // 4. Supprimer la struct des plugins + // → En fait nan, les plugins c'est pas du static, c'est live, faut que ça vivent dans le + // daemon! Au même endroit que les Command sont lancées en fait ! let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?)); // Cancellation Token @@ -66,7 +73,8 @@ pub async fn daemon( let mut filter_managers = HashMap::new(); for filter in stream.filters.values() { let manager = - FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now).await?; + FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now) + .await?; filter_managers.insert(filter, manager); } state.insert(stream, filter_managers.clone()); diff --git a/src/lib.rs b/src/lib.rs index 3619f1a..fb3cb94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod cli; pub mod client; pub mod concepts; pub mod daemon; +pub mod plugin; pub mod protocol; pub mod tests; pub mod treedb; diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs new file mode 100644 index 0000000..e1e3b8c --- /dev/null +++ b/src/plugin/mod.rs @@ -0,0 +1,168 @@ +use std::{collections::BTreeMap, path::PathBuf}; + +use reaction_plugin::{ + BoxedActionImpl, BoxedPluginInfo, BoxedStreamImpl, PluginInfoDyn, PluginInfoDynMut, +}; +use serde_json::Value; +use stabby::libloading::StabbyLibrary; +use tokio::{fs::read_dir, runtime::Handle}; +use value::to_stable_value; + +mod value; + +#[derive(Default)] +pub struct Plugins { + plugins: BTreeMap, + streams: BTreeMap, + // filters: BTreeMap, + actions: BTreeMap, +} + +impl Plugins { + pub async fn import(&mut self, plugin_directories: Vec) -> Result<(), String> { + for plugin_directory in plugin_directories { + let mut dir_entries = read_dir(&plugin_directory).await.map_err(|err| { + format!("Error reading plugin directory {plugin_directory}: {err}") + })?; + loop { + match dir_entries.next_entry().await { + Err(err) => { + return Err(format!( + "Error reading plugin directory {plugin_directory}: {err}" + )) + } + + Ok(None) => break, + + Ok(Some(entry)) => { + let filename = PathBuf::from(&plugin_directory).join(&entry.file_name()); + self.load_plugin(filename.clone()) + .await + .map_err(|err| format!("Error loading plugin {filename:?}: {err}"))?; + } + } + } + } + Ok(()) + } + + async fn load_plugin(&mut self, filename: PathBuf) -> Result<(), String> { + // TODO check ownership of file? + + let name = filename.to_string_lossy().to_string(); + // SAFETY This function is exposed by libloading as unsafe + // But we're (hopefully) gonna be safe with stabby <3 + #[allow(unsafe_code)] + let plugin = Handle::current() + .spawn_blocking(|| unsafe { libloading::Library::new(filename) }) + .await + // Join Error + .map_err(|err| err.to_string())? + // Libloading Error + .map_err(|err| err.to_string())?; + + // SAFETY This function is exposed by stabby as unsafe + // But we're (hopefully) gonna be safe <3 + #[allow(unsafe_code)] + let plugin_init = unsafe { + plugin.get_stabbied:: BoxedPluginInfo>(b"reaction_plugin") + }.map_err(|err| format!("expected entrypoint `fn reaction_plugin() -> BoxedPluginInfo` is either not present or malformed: {err}"))?; + + let plugin_info = plugin_init(); + + for stream in plugin_info.stream_impls() { + if let Some(name) = self.streams.insert(stream.clone().into(), name.clone()) { + return Err(format!( + "plugin {name} already exposed a stream with type name '{stream}'" + )); + } + } + + // for filter in plugin_info.filter_impls() { + // if let Some(name) = self.filters.insert(filter.clone().into(), name.clone()) { + // return Err(format!( + // "plugin {name} already exposed a filter with type name '{filter}'" + // )); + // } + // } + + for action in plugin_info.action_impls() { + if let Some(name) = self.actions.insert(action.clone().into(), name.clone()) { + return Err(format!( + "plugin {name} already exposed a action with type name '{action}'" + )); + } + } + + self.plugins.insert(name, plugin_info); + + Ok(()) + } + + pub fn finish_plugin_setup(self) -> Result<(), String> { + for mut plugin in self.plugins.into_values() { + // Didn't find a more elegant way to manipulate [`stabby::result::Result`] + let result = plugin.finish_setup(); + if result.is_err() { + return Err(result.unwrap_err().into()); + } + } + Ok(()) + } + + pub fn init_stream_impl( + &mut self, + stream_name: String, + stream_type: String, + config: Value, + ) -> Result { + let plugin_name = self + .streams + .get(&stream_type) + .ok_or(format!("No plugin provided a stream type '{stream_type}'"))?; + + let plugin = self.plugins.get_mut(plugin_name).unwrap(); + + let result = plugin.stream_impl( + stream_name.into(), + stream_type.into(), + to_stable_value(config), + ); + + if result.is_ok() { + Ok(result.unwrap()) + } else { + Err(result.err().unwrap().into()) + } + } + + pub fn init_action_impl( + &mut self, + stream_name: String, + filter_name: String, + action_name: String, + action_type: String, + config: Value, + ) -> Result { + let plugin_name = self + .actions + .get(&action_type) + .ok_or(format!("No plugin provided a action type '{action_type}'"))?; + + let plugin = self.plugins.get_mut(plugin_name).unwrap(); + + let result = plugin.action_impl( + stream_name.into(), + filter_name.into(), + action_name.into(), + action_type.into(), + to_stable_value(config), + ); + + if result.is_ok() { + Ok(result.unwrap()) + } else { + Err(result.err().unwrap().into()) + } + } +} diff --git a/src/plugin/value.rs b/src/plugin/value.rs new file mode 100644 index 0000000..e8b7d31 --- /dev/null +++ b/src/plugin/value.rs @@ -0,0 +1,34 @@ +use reaction_plugin::Value as RValue; +use serde_json::Value as JValue; +use stabby::{tuple::Tuple2, vec::Vec}; + +pub fn to_stable_value(val: JValue) -> RValue { + match val { + JValue::Null => RValue::Null, + JValue::Bool(b) => RValue::Bool(b), + JValue::Number(number) => { + if let Some(number) = number.as_i64() { + RValue::Integer(number) + } else if let Some(number) = number.as_f64() { + RValue::Float(number) + } else { + RValue::Null + } + } + JValue::String(s) => RValue::String(s.into()), + JValue::Array(v) => RValue::Array({ + let mut vec = Vec::with_capacity(v.len()); + for val in v { + vec.push(to_stable_value(val)); + } + vec + }), + JValue::Object(m) => RValue::Object({ + let mut map = Vec::with_capacity(m.len()); + for (key, val) in m { + map.push(Tuple2(key.into(), to_stable_value(val))); + } + map + }), + } +}