diff --git a/reaction-plugin/src/lib.rs b/reaction-plugin/src/lib.rs index a1a51f7..36095d3 100644 --- a/reaction-plugin/src/lib.rs +++ b/reaction-plugin/src/lib.rs @@ -36,16 +36,16 @@ pub trait PluginInfo { config: Value, ) -> Result; - /// Return all filter types that should be made available to reaction users - extern "C" fn filter_impls(&self) -> Vec; - /// Return one instance of a given type. - extern "C" fn filter_impl( - &mut self, - stream_name: String, - filter_name: String, - filter_type: String, - config: Value, - ) -> Result; + // /// Return all filter types that should be made available to reaction users + // extern "C" fn filter_impls(&self) -> Vec; + // /// Return one instance of a given type. + // extern "C" fn filter_impl( + // &mut self, + // stream_name: String, + // filter_name: String, + // filter_type: String, + // config: Value, + // ) -> Result; /// Return all action types that should be made available to reaction users extern "C" fn action_impls(&self) -> Vec; @@ -66,7 +66,7 @@ pub trait PluginInfo { pub type BoxedPluginInfo = stabby::dynptr!(Box); pub type BoxedStreamImpl = stabby::dynptr!(Box); -pub type BoxedFilterImpl = stabby::dynptr!(Box); +// pub type BoxedFilterImpl = stabby::dynptr!(Box); pub type BoxedActionImpl = stabby::dynptr!(Box); #[stabby::stabby(checked)] @@ -75,11 +75,11 @@ pub trait StreamImpl { extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; } -#[stabby::stabby(checked)] -pub trait FilterImpl { - extern "C" fn matches<'a>(&'a mut self, line: String) -> DynFuture<'a, bool>; - extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; -} +// #[stabby::stabby(checked)] +// pub trait FilterImpl { +// extern "C" fn matches<'a>(&'a mut self, line: String) -> DynFuture<'a, bool>; +// extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; +// } #[stabby::stabby(checked)] pub trait ActionImpl { diff --git a/src/concepts/action.rs b/src/concepts/action.rs index c63724e..772fb48 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -3,10 +3,12 @@ use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc}; use chrono::TimeDelta; use serde::{Deserialize, Serialize}; +use serde_json::Value; use tokio::process::Command; -use super::{parse_duration::*, PatternType}; -use super::{Match, Pattern}; +use crate::plugin::Plugins; + +use super::{null_value, parse_duration::*, Match, Pattern, PatternType}; #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] @@ -41,6 +43,12 @@ pub struct Action { pub filter_name: String, #[serde(skip)] pub stream_name: String, + + // Plugin-specific + #[serde(default, rename = "type")] + pub action_type: Option, + #[serde(default = "null_value")] + pub options: Value, } fn set_false() -> bool { @@ -82,11 +90,21 @@ impl Action { return Err("character '.' is not allowed in filter name".into()); } - if self.cmd.is_empty() { - return Err("cmd is empty".into()); - } - if self.cmd[0].is_empty() { - return Err("cmd's first item is empty".into()); + if self + .action_type + .as_ref() + .is_none_or(|stream_type| stream_type == "cmd") + { + if self.cmd.is_empty() { + return Err("cmd is empty".into()); + } + if self.cmd[0].is_empty() { + return Err("cmd's first item is empty".into()); + } + } else { + if !self.cmd.is_empty() { + return Err("can't define cmd and a plugin type".into()); + } } if let Some(after) = &self.after { @@ -118,6 +136,11 @@ impl Action { Ok(()) } + pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { + // TODO self setup + Ok(()) + } + // TODO test pub fn exec(&self, match_: &Match) -> Command { let computed_command = if self.patterns.is_empty() { diff --git a/src/concepts/config.rs b/src/concepts/config.rs index 4bee310..2bf1296 100644 --- a/src/concepts/config.rs +++ b/src/concepts/config.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{debug, error, info, warn}; -use super::{Pattern, Stream}; +use super::{merge_attrs, Pattern, Stream}; pub type Patterns = BTreeMap>; @@ -24,6 +24,9 @@ pub struct Config { #[serde(default = "dot", skip_serializing_if = "String::is_empty")] pub state_directory: String, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub plugin_directories: Vec, + #[serde(default)] pub patterns: Patterns, @@ -73,25 +76,26 @@ impl Config { self.start.append(&mut other.start); self.stop.append(&mut other.stop); - if !(self.state_directory == dot() - || other.state_directory == dot() - || self.state_directory == other.state_directory) - { - return Err("state_directory have conflicting definitions".into()); - } - if self.state_directory == dot() { - self.state_directory = other.state_directory; - } + self.state_directory = merge_attrs( + self.state_directory.clone(), + other.state_directory, + ".".into(), + "state_directory", + )?; - if !(self.concurrency == num_cpus::get() - || other.concurrency == num_cpus::get() - || self.concurrency == other.concurrency) - { - return Err("concurrency have conflicting definitions".into()); - } - if self.concurrency == num_cpus::get() { - self.concurrency = other.concurrency; - } + self.plugin_directories = merge_attrs( + self.plugin_directories.clone(), + other.plugin_directories, + Vec::default(), + "plugin_directories", + )?; + + self.concurrency = merge_attrs( + self.concurrency, + other.concurrency, + num_cpus::get(), + "concurrency", + )?; Ok(()) } @@ -104,6 +108,15 @@ impl Config { // Nullify this useless field self._definitions = serde_json::Value::Null; + for dir in &self.plugin_directories { + if dir.is_empty() { + return Err("can't specify empty plugin directory".into()); + } + if !dir.starts_with("/") { + return Err(format!("plugin directory paths must be absolute: {dir}")); + } + } + if self.patterns.is_empty() { return Err("no patterns configured".into()); } diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index fef8391..fdb656d 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -10,8 +10,9 @@ use chrono::TimeDelta; use regex::Regex; use serde::{Deserialize, Serialize}; -use super::{parse_duration, PatternType}; -use super::{Action, Match, Pattern, Patterns}; +use crate::plugin::Plugins; + +use super::{parse_duration, Action, Match, Pattern, PatternType, Patterns}; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] pub enum Duplicate { @@ -58,6 +59,11 @@ pub struct Filter { pub name: String, #[serde(skip)] pub stream_name: String, + // // Plugin-specific + // #[serde(default, rename = "type")] + // pub filter_type: Option, + // #[serde(default = "null_value")] + // pub options: Value, } impl Filter { @@ -190,6 +196,13 @@ impl Filter { Ok(()) } + pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { + for (_, action) in &mut self.actions { + action.plugin_setup(plugins)?; + } + Ok(()) + } + pub fn get_match(&self, line: &str) -> Option { for regex in &self.compiled_regex { if let Some(matches) = regex.captures(line) { diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index 40a624c..b059b62 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -5,12 +5,15 @@ mod parse_duration; mod pattern; mod stream; +use std::fmt::Debug; + pub use action::Action; pub use config::{Config, Patterns}; pub use filter::{Duplicate, Filter}; use parse_duration::parse_duration; pub use pattern::{Pattern, PatternType}; use serde::{Deserialize, Serialize}; +use serde_json::Value; pub use stream::Stream; use chrono::{DateTime, Local}; @@ -24,5 +27,70 @@ pub struct MatchTime { pub t: Time, } +fn merge_attrs( + this: A, + other: A, + default: A, + name: &str, +) -> Result { + if !(this == default || other == default || this == other) { + return Err(format!( + "'{name}' has conflicting definitions: '{this:?}', '{other:?}'" + )); + } + if this == default { + return Ok(other); + } + return Ok(this); +} + +fn null_value() -> Value { + Value::Null +} + #[cfg(test)] pub use filter::tests as filter_tests; + +#[cfg(test)] +mod tests { + use crate::concepts::merge_attrs; + + #[test] + fn test_merge_attrs() { + assert_eq!(merge_attrs(None::, None, None, "t"), Ok(None)); + assert_eq!( + merge_attrs(Some("coucou"), None, None, "t"), + Ok(Some("coucou")) + ); + assert_eq!( + merge_attrs(None, Some("coucou"), None, "t"), + Ok(Some("coucou")) + ); + assert_eq!( + merge_attrs(Some("coucou"), Some("coucou"), None, "t"), + Ok(Some("coucou")) + ); + assert_eq!( + merge_attrs(Some("coucou"), Some("hello"), None, "t"), + Err("'t' has conflicting definitions: 'Some(\"coucou\")', 'Some(\"hello\")'".into()) + ); + + assert_eq!(merge_attrs("", "", "", "t"), Ok("")); + assert_eq!(merge_attrs("coucou", "", "", "t"), Ok("coucou")); + assert_eq!(merge_attrs("", "coucou", "", "t"), Ok("coucou")); + assert_eq!(merge_attrs("coucou", "coucou", "", "t"), Ok("coucou")); + assert_eq!( + merge_attrs("coucou", "hello", "", "t"), + Err("'t' has conflicting definitions: '\"coucou\"', '\"hello\"'".into()) + ); + + assert_eq!(merge_attrs(0, 0, 0, "t"), Ok(0)); + assert_eq!(merge_attrs(5, 0, 0, "t"), Ok(5)); + assert_eq!(merge_attrs(0, 5, 0, "t"), Ok(5)); + assert_eq!(merge_attrs(5, 5, 0, "t"), Ok(5)); + assert_eq!( + merge_attrs(5, 6, 0, "t"), + Err("'t' has conflicting definitions: '5', '6'".into()) + ); + } +} diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 011652b..56e070e 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -1,8 +1,12 @@ use std::{cmp::Ordering, collections::BTreeMap, hash::Hash}; +use reaction_plugin::BoxedStreamImpl; use serde::{Deserialize, Serialize}; +use serde_json::Value; -use super::{Filter, Patterns}; +use crate::plugin::Plugins; + +use super::{merge_attrs, null_value, Filter, Patterns}; #[derive(Clone, Debug, Deserialize, Serialize)] #[cfg_attr(test, derive(Default))] @@ -10,11 +14,18 @@ use super::{Filter, Patterns}; pub struct Stream { #[serde(default)] pub cmd: Vec, + #[serde(default)] pub filters: BTreeMap, #[serde(skip)] pub name: String, + + // Plugin-specific + #[serde(default, rename = "type")] + pub stream_type: Option, + #[serde(default = "null_value")] + pub options: Value, } impl Stream { @@ -23,13 +34,8 @@ impl Stream { } pub fn merge(&mut self, other: Stream) -> Result<(), String> { - if !(self.cmd.is_empty() || other.cmd.is_empty() || self.cmd == other.cmd) { - return Err("cmd has conflicting definitions".into()); - } - - if self.cmd.is_empty() { - self.cmd = other.cmd; - } + self.cmd = merge_attrs(self.cmd.clone(), other.cmd, Vec::default(), "cmd")?; + self.stream_type = merge_attrs(self.stream_type.clone(), other.stream_type, None, "type")?; for (key, filter) in other.filters.into_iter() { if self.filters.insert(key.clone(), filter).is_some() { @@ -40,6 +46,12 @@ impl Stream { Ok(()) } + fn is_plugin(&self) -> bool { + self.stream_type + .as_ref() + .is_some_and(|stream_type| stream_type != "cmd") + } + pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> { self._setup(name, patterns) .map_err(|msg| format!("stream {}: {}", name, msg)) @@ -55,11 +67,17 @@ impl Stream { return Err("character '.' is not allowed in stream name".into()); } - if self.cmd.is_empty() { - return Err("cmd is empty".into()); - } - if self.cmd[0].is_empty() { - return Err("cmd's first item is empty".into()); + if !self.is_plugin() { + if self.cmd.is_empty() { + return Err("cmd is empty".into()); + } + if self.cmd[0].is_empty() { + return Err("cmd's first item is empty".into()); + } + } else { + if !self.cmd.is_empty() { + return Err("can't define cmd and a plugin type".into()); + } } if self.filters.is_empty() { @@ -72,6 +90,19 @@ impl Stream { Ok(()) } + + // FIXME Nan faut pas que je fasse ça là en fait, ça doit se passer côté StreamManager en fait + // j'pense + pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { + if self.is_plugin() { + plugins.init_stream_impl(self.name, self.stream_type, self.options); + } + + for (_, filter) in &mut self.filters { + filter.plugin_setup(plugins)?; + } + Ok(()) + } } impl PartialEq for Stream { @@ -102,16 +133,8 @@ mod tests { use super::*; use crate::concepts::filter::tests::ok_filter; - fn default_stream() -> Stream { - Stream { - cmd: Vec::new(), - name: "".into(), - filters: BTreeMap::new(), - } - } - fn ok_stream() -> Stream { - let mut stream = default_stream(); + let mut stream = Stream::default(); stream.cmd = vec!["command".into()]; stream.filters.insert("name".into(), ok_filter()); stream diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c68f8e2..547c9b8 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -35,6 +35,13 @@ pub async fn daemon( config_path: PathBuf, socket: PathBuf, ) -> Result<(), Box> { + // Je dois + // 1. Fusionner toute la config + // 2. Charger tous les plugins + // 3. Setup la config, avec les plugins + // 4. Supprimer la struct des plugins + // → En fait nan, les plugins c'est pas du static, c'est live, faut que ça vivent dans le + // daemon! Au même endroit que les Command sont lancées en fait ! let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?)); // Cancellation Token @@ -66,7 +73,8 @@ pub async fn daemon( let mut filter_managers = HashMap::new(); for filter in stream.filters.values() { let manager = - FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now).await?; + FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now) + .await?; filter_managers.insert(filter, manager); } state.insert(stream, filter_managers.clone()); diff --git a/src/lib.rs b/src/lib.rs index 3619f1a..fb3cb94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod cli; pub mod client; pub mod concepts; pub mod daemon; +pub mod plugin; pub mod protocol; pub mod tests; pub mod treedb; diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs new file mode 100644 index 0000000..e1e3b8c --- /dev/null +++ b/src/plugin/mod.rs @@ -0,0 +1,168 @@ +use std::{collections::BTreeMap, path::PathBuf}; + +use reaction_plugin::{ + BoxedActionImpl, BoxedPluginInfo, BoxedStreamImpl, PluginInfoDyn, PluginInfoDynMut, +}; +use serde_json::Value; +use stabby::libloading::StabbyLibrary; +use tokio::{fs::read_dir, runtime::Handle}; +use value::to_stable_value; + +mod value; + +#[derive(Default)] +pub struct Plugins { + plugins: BTreeMap, + streams: BTreeMap, + // filters: BTreeMap, + actions: BTreeMap, +} + +impl Plugins { + pub async fn import(&mut self, plugin_directories: Vec) -> Result<(), String> { + for plugin_directory in plugin_directories { + let mut dir_entries = read_dir(&plugin_directory).await.map_err(|err| { + format!("Error reading plugin directory {plugin_directory}: {err}") + })?; + loop { + match dir_entries.next_entry().await { + Err(err) => { + return Err(format!( + "Error reading plugin directory {plugin_directory}: {err}" + )) + } + + Ok(None) => break, + + Ok(Some(entry)) => { + let filename = PathBuf::from(&plugin_directory).join(&entry.file_name()); + self.load_plugin(filename.clone()) + .await + .map_err(|err| format!("Error loading plugin {filename:?}: {err}"))?; + } + } + } + } + Ok(()) + } + + async fn load_plugin(&mut self, filename: PathBuf) -> Result<(), String> { + // TODO check ownership of file? + + let name = filename.to_string_lossy().to_string(); + // SAFETY This function is exposed by libloading as unsafe + // But we're (hopefully) gonna be safe with stabby <3 + #[allow(unsafe_code)] + let plugin = Handle::current() + .spawn_blocking(|| unsafe { libloading::Library::new(filename) }) + .await + // Join Error + .map_err(|err| err.to_string())? + // Libloading Error + .map_err(|err| err.to_string())?; + + // SAFETY This function is exposed by stabby as unsafe + // But we're (hopefully) gonna be safe <3 + #[allow(unsafe_code)] + let plugin_init = unsafe { + plugin.get_stabbied:: BoxedPluginInfo>(b"reaction_plugin") + }.map_err(|err| format!("expected entrypoint `fn reaction_plugin() -> BoxedPluginInfo` is either not present or malformed: {err}"))?; + + let plugin_info = plugin_init(); + + for stream in plugin_info.stream_impls() { + if let Some(name) = self.streams.insert(stream.clone().into(), name.clone()) { + return Err(format!( + "plugin {name} already exposed a stream with type name '{stream}'" + )); + } + } + + // for filter in plugin_info.filter_impls() { + // if let Some(name) = self.filters.insert(filter.clone().into(), name.clone()) { + // return Err(format!( + // "plugin {name} already exposed a filter with type name '{filter}'" + // )); + // } + // } + + for action in plugin_info.action_impls() { + if let Some(name) = self.actions.insert(action.clone().into(), name.clone()) { + return Err(format!( + "plugin {name} already exposed a action with type name '{action}'" + )); + } + } + + self.plugins.insert(name, plugin_info); + + Ok(()) + } + + pub fn finish_plugin_setup(self) -> Result<(), String> { + for mut plugin in self.plugins.into_values() { + // Didn't find a more elegant way to manipulate [`stabby::result::Result`] + let result = plugin.finish_setup(); + if result.is_err() { + return Err(result.unwrap_err().into()); + } + } + Ok(()) + } + + pub fn init_stream_impl( + &mut self, + stream_name: String, + stream_type: String, + config: Value, + ) -> Result { + let plugin_name = self + .streams + .get(&stream_type) + .ok_or(format!("No plugin provided a stream type '{stream_type}'"))?; + + let plugin = self.plugins.get_mut(plugin_name).unwrap(); + + let result = plugin.stream_impl( + stream_name.into(), + stream_type.into(), + to_stable_value(config), + ); + + if result.is_ok() { + Ok(result.unwrap()) + } else { + Err(result.err().unwrap().into()) + } + } + + pub fn init_action_impl( + &mut self, + stream_name: String, + filter_name: String, + action_name: String, + action_type: String, + config: Value, + ) -> Result { + let plugin_name = self + .actions + .get(&action_type) + .ok_or(format!("No plugin provided a action type '{action_type}'"))?; + + let plugin = self.plugins.get_mut(plugin_name).unwrap(); + + let result = plugin.action_impl( + stream_name.into(), + filter_name.into(), + action_name.into(), + action_type.into(), + to_stable_value(config), + ); + + if result.is_ok() { + Ok(result.unwrap()) + } else { + Err(result.err().unwrap().into()) + } + } +} diff --git a/src/plugin/value.rs b/src/plugin/value.rs new file mode 100644 index 0000000..e8b7d31 --- /dev/null +++ b/src/plugin/value.rs @@ -0,0 +1,34 @@ +use reaction_plugin::Value as RValue; +use serde_json::Value as JValue; +use stabby::{tuple::Tuple2, vec::Vec}; + +pub fn to_stable_value(val: JValue) -> RValue { + match val { + JValue::Null => RValue::Null, + JValue::Bool(b) => RValue::Bool(b), + JValue::Number(number) => { + if let Some(number) = number.as_i64() { + RValue::Integer(number) + } else if let Some(number) = number.as_f64() { + RValue::Float(number) + } else { + RValue::Null + } + } + JValue::String(s) => RValue::String(s.into()), + JValue::Array(v) => RValue::Array({ + let mut vec = Vec::with_capacity(v.len()); + for val in v { + vec.push(to_stable_value(val)); + } + vec + }), + JValue::Object(m) => RValue::Object({ + let mut map = Vec::with_capacity(m.len()); + for (key, val) in m { + map.push(Tuple2(key.into(), to_stable_value(val))); + } + map + }), + } +}