diff --git a/reaction-plugin/src/lib.rs b/reaction-plugin/src/lib.rs index 36095d3..17abf77 100644 --- a/reaction-plugin/src/lib.rs +++ b/reaction-plugin/src/lib.rs @@ -65,13 +65,14 @@ pub trait PluginInfo { } pub type BoxedPluginInfo = stabby::dynptr!(Box); -pub type BoxedStreamImpl = stabby::dynptr!(Box); +pub type BoxedStreamImpl = stabby::dynptr!(Box); // pub type BoxedFilterImpl = stabby::dynptr!(Box); -pub type BoxedActionImpl = stabby::dynptr!(Box); +pub type BoxedActionImpl = stabby::dynptr!(Box); #[stabby::stabby(checked)] pub trait StreamImpl { - extern "C" fn next<'a>(&'a mut self) -> DynFuture<'a, Result>, String>>; + extern "C" fn start<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; + extern "C" fn next<'a>(&'a mut self) -> DynFuture<'a, Result, String>>; extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; } @@ -83,6 +84,9 @@ pub trait StreamImpl { #[stabby::stabby(checked)] pub trait ActionImpl { - extern "C" fn next<'a>(&'a mut self) -> DynFuture<'a, Result>, String>>; + extern "C" fn exec<'a>( + &'a mut self, + match_: Vec, + ) -> DynFuture<'a, Result>, String>>; extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>; } diff --git a/src/concepts/action.rs b/src/concepts/action.rs index 772fb48..98dc5f8 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -6,8 +6,6 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::process::Command; -use crate::plugin::Plugins; - use super::{null_value, parse_duration::*, Match, Pattern, PatternType}; #[derive(Clone, Debug, Default, Deserialize, Serialize)] @@ -60,6 +58,12 @@ fn is_false(b: &bool) -> bool { } impl Action { + fn is_plugin(&self) -> bool { + self.action_type + .as_ref() + .is_some_and(|action_type| action_type != "cmd") + } + pub fn setup( &mut self, stream_name: &str, @@ -90,11 +94,7 @@ impl Action { return Err("character '.' is not allowed in filter name".into()); } - if self - .action_type - .as_ref() - .is_none_or(|stream_type| stream_type == "cmd") - { + if !self.is_plugin() { if self.cmd.is_empty() { return Err("cmd is empty".into()); } @@ -103,7 +103,7 @@ impl Action { } } else { if !self.cmd.is_empty() { - return Err("can't define cmd and a plugin type".into()); + return Err("can't define a cmd and a plugin type".into()); } } @@ -136,11 +136,6 @@ impl Action { Ok(()) } - pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { - // TODO self setup - Ok(()) - } - // TODO test pub fn exec(&self, match_: &Match) -> Command { let computed_command = if self.patterns.is_empty() { diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index fdb656d..cf5e0d4 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -10,8 +10,6 @@ use chrono::TimeDelta; use regex::Regex; use serde::{Deserialize, Serialize}; -use crate::plugin::Plugins; - use super::{parse_duration, Action, Match, Pattern, PatternType, Patterns}; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] @@ -196,13 +194,6 @@ impl Filter { Ok(()) } - pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { - for (_, action) in &mut self.actions { - action.plugin_setup(plugins)?; - } - Ok(()) - } - pub fn get_match(&self, line: &str) -> Option { for regex in &self.compiled_regex { if let Some(matches) = regex.captures(line) { diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 56e070e..40c78d0 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -1,11 +1,8 @@ use std::{cmp::Ordering, collections::BTreeMap, hash::Hash}; -use reaction_plugin::BoxedStreamImpl; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::plugin::Plugins; - use super::{merge_attrs, null_value, Filter, Patterns}; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -46,7 +43,7 @@ impl Stream { Ok(()) } - fn is_plugin(&self) -> bool { + pub fn is_plugin(&self) -> bool { self.stream_type .as_ref() .is_some_and(|stream_type| stream_type != "cmd") @@ -90,19 +87,6 @@ impl Stream { Ok(()) } - - // FIXME Nan faut pas que je fasse ça là en fait, ça doit se passer côté StreamManager en fait - // j'pense - pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> { - if self.is_plugin() { - plugins.init_stream_impl(self.name, self.stream_type, self.options); - } - - for (_, filter) in &mut self.filters { - filter.plugin_setup(plugins)?; - } - Ok(()) - } } impl PartialEq for Stream { diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 547c9b8..c396b30 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -16,7 +16,7 @@ use tokio::{ }; use tracing::{debug, info}; -use crate::{concepts::Config, treedb::Database}; +use crate::{concepts::Config, plugin::Plugins, treedb::Database}; use filter::FilterManager; pub use filter::React; pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken}; @@ -35,15 +35,11 @@ pub async fn daemon( config_path: PathBuf, socket: PathBuf, ) -> Result<(), Box> { - // Je dois - // 1. Fusionner toute la config - // 2. Charger tous les plugins - // 3. Setup la config, avec les plugins - // 4. Supprimer la struct des plugins - // → En fait nan, les plugins c'est pas du static, c'est live, faut que ça vivent dans le - // daemon! Au même endroit que les Command sont lancées en fait ! let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?)); + let mut plugins = Plugins::default(); + plugins.import(&config.plugin_directories).await?; + // Cancellation Token let shutdown = ShutdownController::new(); @@ -83,6 +79,7 @@ pub async fn daemon( stream, filter_managers, shutdown.token(), + &mut plugins, )?); } (state, stream_managers) diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index a55d449..37f0328 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -6,6 +6,7 @@ use std::{ use chrono::Local; use futures::{FutureExt, Stream as AsyncStream, StreamExt}; +use reaction_plugin::{BoxedStreamImpl, StreamImplDynMut}; use regex::RegexSet; use tokio::{ io::{AsyncBufReadExt, BufReader}, @@ -17,6 +18,7 @@ use tracing::{error, info, warn}; use crate::{ concepts::{Filter, Stream}, daemon::filter::FilterManager, + plugin::Plugins, }; use super::shutdown::ShutdownToken; @@ -52,6 +54,7 @@ pub struct StreamManager { compiled_regex_set: RegexSet, regex_index_to_filter_manager: Vec, stream: &'static Stream, + stream_plugin: Option<&'static mut BoxedStreamImpl>, shutdown: ShutdownToken, } @@ -60,7 +63,8 @@ impl StreamManager { stream: &'static Stream, filter_managers: HashMap<&'static Filter, FilterManager>, shutdown: ShutdownToken, - ) -> Result { + plugins: &mut Plugins, + ) -> Result { let all_regexes: BTreeMap<_, _> = filter_managers .iter() .flat_map(|(filter, filter_manager)| { @@ -71,16 +75,71 @@ impl StreamManager { }) .collect(); + let stream_plugin = if stream.is_plugin() { + Some(Box::leak(Box::new(plugins.init_stream_impl( + stream.name.clone(), + stream.stream_type.clone().unwrap(), + stream.options.clone(), + )?))) + } else { + None + }; + Ok(StreamManager { - compiled_regex_set: RegexSet::new(all_regexes.keys())?, + compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?, regex_index_to_filter_manager: all_regexes.into_values().collect(), stream, + stream_plugin, shutdown, }) } pub async fn start(self) { info!("{}: start {:?}", self.stream.name, self.stream.cmd); + + if self.stream_plugin.is_some() { + self.start_plugin().await + } else { + self.start_cmd().await + } + } + + async fn start_plugin(self) { + let plugin = self.stream_plugin.unwrap(); + + { + let result = plugin.start().await; + if result.is_err() { + error!( + "could not execute stream {}: {}", + self.stream.name, + result.unwrap_err() + ); + return; + } + } + + loop { + let result = plugin.next().await; + let result = if result.is_ok() { + let option = result.unwrap(); + if option.is_some() { + self.handle_line(option.unwrap().to_string()).await; + } else { + return; + } + } else { + error!( + "impossible to read output from stream {}: {}", + self.stream.name, + result.unwrap_err() + ); + return; + }; + } + } + + async fn start_cmd(self) { let mut child = match Command::new(&self.stream.cmd[0]) .args(&self.stream.cmd[1..]) .stdin(Stdio::null()) @@ -171,10 +230,7 @@ impl StreamManager { loop { match lines.next().await { Some(Ok(line)) => { - let now = Local::now(); - for manager in self.matching_filters(&line) { - manager.handle_line(&line, now).await; - } + self.handle_line(line).await; } Some(Err(err)) => { error!( @@ -190,6 +246,13 @@ impl StreamManager { } } + async fn handle_line(&self, line: String) { + let now = Local::now(); + for manager in self.matching_filters(&line) { + manager.handle_line(&line, now).await; + } + } + fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> { let matches = self.compiled_regex_set.matches(line); matches diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs index e1e3b8c..958a2cc 100644 --- a/src/plugin/mod.rs +++ b/src/plugin/mod.rs @@ -19,7 +19,7 @@ pub struct Plugins { } impl Plugins { - pub async fn import(&mut self, plugin_directories: Vec) -> Result<(), String> { + pub async fn import(&mut self, plugin_directories: &Vec) -> Result<(), String> { for plugin_directory in plugin_directories { let mut dir_entries = read_dir(&plugin_directory).await.map_err(|err| { format!("Error reading plugin directory {plugin_directory}: {err}")