Loading plugin not on config side, but stream/action manager side

Trying to implement this on the StreamManager first.
I get lifetime errors that make no sense to me, like futures should
hold any argument with 'static.

I wonder if I should try to convert everything stabby to abi_stable &
async_ffi. I'll try this and see if it solves anything.
This commit is contained in:
ppom 2025-09-28 12:00:00 +02:00
commit fc11234f12
No known key found for this signature in database
7 changed files with 92 additions and 58 deletions

View file

@ -65,13 +65,14 @@ pub trait PluginInfo {
}
pub type BoxedPluginInfo = stabby::dynptr!(Box<dyn PluginInfo>);
pub type BoxedStreamImpl = stabby::dynptr!(Box<dyn StreamImpl>);
pub type BoxedStreamImpl = stabby::dynptr!(Box<dyn StreamImpl + Sync + Send>);
// pub type BoxedFilterImpl = stabby::dynptr!(Box<dyn FilterImpl>);
pub type BoxedActionImpl = stabby::dynptr!(Box<dyn ActionImpl>);
pub type BoxedActionImpl = stabby::dynptr!(Box<dyn ActionImpl + Sync + Send>);
#[stabby::stabby(checked)]
pub trait StreamImpl {
extern "C" fn next<'a>(&'a mut self) -> DynFuture<'a, Result<Option<Vec<u8>>, String>>;
extern "C" fn start<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>;
extern "C" fn next<'a>(&'a mut self) -> DynFuture<'a, Result<Option<String>, String>>;
extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>;
}
@ -83,6 +84,9 @@ pub trait StreamImpl {
#[stabby::stabby(checked)]
pub trait ActionImpl {
extern "C" fn next<'a>(&'a mut self) -> DynFuture<'a, Result<Option<Vec<u8>>, String>>;
extern "C" fn exec<'a>(
&'a mut self,
match_: Vec<String>,
) -> DynFuture<'a, Result<Option<Vec<u8>>, String>>;
extern "C" fn close<'a>(&'a mut self) -> DynFuture<'a, Result<(), String>>;
}

View file

@ -6,8 +6,6 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::process::Command;
use crate::plugin::Plugins;
use super::{null_value, parse_duration::*, Match, Pattern, PatternType};
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
@ -60,6 +58,12 @@ fn is_false(b: &bool) -> bool {
}
impl Action {
fn is_plugin(&self) -> bool {
self.action_type
.as_ref()
.is_some_and(|action_type| action_type != "cmd")
}
pub fn setup(
&mut self,
stream_name: &str,
@ -90,11 +94,7 @@ impl Action {
return Err("character '.' is not allowed in filter name".into());
}
if self
.action_type
.as_ref()
.is_none_or(|stream_type| stream_type == "cmd")
{
if !self.is_plugin() {
if self.cmd.is_empty() {
return Err("cmd is empty".into());
}
@ -103,7 +103,7 @@ impl Action {
}
} else {
if !self.cmd.is_empty() {
return Err("can't define cmd and a plugin type".into());
return Err("can't define a cmd and a plugin type".into());
}
}
@ -136,11 +136,6 @@ impl Action {
Ok(())
}
pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> {
// TODO self setup
Ok(())
}
// TODO test
pub fn exec(&self, match_: &Match) -> Command {
let computed_command = if self.patterns.is_empty() {

View file

@ -10,8 +10,6 @@ use chrono::TimeDelta;
use regex::Regex;
use serde::{Deserialize, Serialize};
use crate::plugin::Plugins;
use super::{parse_duration, Action, Match, Pattern, PatternType, Patterns};
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
@ -196,13 +194,6 @@ impl Filter {
Ok(())
}
pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> {
for (_, action) in &mut self.actions {
action.plugin_setup(plugins)?;
}
Ok(())
}
pub fn get_match(&self, line: &str) -> Option<Match> {
for regex in &self.compiled_regex {
if let Some(matches) = regex.captures(line) {

View file

@ -1,11 +1,8 @@
use std::{cmp::Ordering, collections::BTreeMap, hash::Hash};
use reaction_plugin::BoxedStreamImpl;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::plugin::Plugins;
use super::{merge_attrs, null_value, Filter, Patterns};
#[derive(Clone, Debug, Deserialize, Serialize)]
@ -46,7 +43,7 @@ impl Stream {
Ok(())
}
fn is_plugin(&self) -> bool {
pub fn is_plugin(&self) -> bool {
self.stream_type
.as_ref()
.is_some_and(|stream_type| stream_type != "cmd")
@ -90,19 +87,6 @@ impl Stream {
Ok(())
}
// FIXME Nan faut pas que je fasse ça là en fait, ça doit se passer côté StreamManager en fait
// j'pense
pub fn plugin_setup(&mut self, plugins: &mut Plugins) -> Result<(), String> {
if self.is_plugin() {
plugins.init_stream_impl(self.name, self.stream_type, self.options);
}
for (_, filter) in &mut self.filters {
filter.plugin_setup(plugins)?;
}
Ok(())
}
}
impl PartialEq for Stream {

View file

@ -16,7 +16,7 @@ use tokio::{
};
use tracing::{debug, info};
use crate::{concepts::Config, treedb::Database};
use crate::{concepts::Config, plugin::Plugins, treedb::Database};
use filter::FilterManager;
pub use filter::React;
pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken};
@ -35,15 +35,11 @@ pub async fn daemon(
config_path: PathBuf,
socket: PathBuf,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// Je dois
// 1. Fusionner toute la config
// 2. Charger tous les plugins
// 3. Setup la config, avec les plugins
// 4. Supprimer la struct des plugins
// → En fait nan, les plugins c'est pas du static, c'est live, faut que ça vivent dans le
// daemon! Au même endroit que les Command sont lancées en fait !
let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?));
let mut plugins = Plugins::default();
plugins.import(&config.plugin_directories).await?;
// Cancellation Token
let shutdown = ShutdownController::new();
@ -83,6 +79,7 @@ pub async fn daemon(
stream,
filter_managers,
shutdown.token(),
&mut plugins,
)?);
}
(state, stream_managers)

View file

@ -6,6 +6,7 @@ use std::{
use chrono::Local;
use futures::{FutureExt, Stream as AsyncStream, StreamExt};
use reaction_plugin::{BoxedStreamImpl, StreamImplDynMut};
use regex::RegexSet;
use tokio::{
io::{AsyncBufReadExt, BufReader},
@ -17,6 +18,7 @@ use tracing::{error, info, warn};
use crate::{
concepts::{Filter, Stream},
daemon::filter::FilterManager,
plugin::Plugins,
};
use super::shutdown::ShutdownToken;
@ -52,6 +54,7 @@ pub struct StreamManager {
compiled_regex_set: RegexSet,
regex_index_to_filter_manager: Vec<FilterManager>,
stream: &'static Stream,
stream_plugin: Option<&'static mut BoxedStreamImpl>,
shutdown: ShutdownToken,
}
@ -60,7 +63,8 @@ impl StreamManager {
stream: &'static Stream,
filter_managers: HashMap<&'static Filter, FilterManager>,
shutdown: ShutdownToken,
) -> Result<Self, regex::Error> {
plugins: &mut Plugins,
) -> Result<Self, String> {
let all_regexes: BTreeMap<_, _> = filter_managers
.iter()
.flat_map(|(filter, filter_manager)| {
@ -71,16 +75,71 @@ impl StreamManager {
})
.collect();
let stream_plugin = if stream.is_plugin() {
Some(Box::leak(Box::new(plugins.init_stream_impl(
stream.name.clone(),
stream.stream_type.clone().unwrap(),
stream.options.clone(),
)?)))
} else {
None
};
Ok(StreamManager {
compiled_regex_set: RegexSet::new(all_regexes.keys())?,
compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?,
regex_index_to_filter_manager: all_regexes.into_values().collect(),
stream,
stream_plugin,
shutdown,
})
}
pub async fn start(self) {
info!("{}: start {:?}", self.stream.name, self.stream.cmd);
if self.stream_plugin.is_some() {
self.start_plugin().await
} else {
self.start_cmd().await
}
}
async fn start_plugin(self) {
let plugin = self.stream_plugin.unwrap();
{
let result = plugin.start().await;
if result.is_err() {
error!(
"could not execute stream {}: {}",
self.stream.name,
result.unwrap_err()
);
return;
}
}
loop {
let result = plugin.next().await;
let result = if result.is_ok() {
let option = result.unwrap();
if option.is_some() {
self.handle_line(option.unwrap().to_string()).await;
} else {
return;
}
} else {
error!(
"impossible to read output from stream {}: {}",
self.stream.name,
result.unwrap_err()
);
return;
};
}
}
async fn start_cmd(self) {
let mut child = match Command::new(&self.stream.cmd[0])
.args(&self.stream.cmd[1..])
.stdin(Stdio::null())
@ -171,10 +230,7 @@ impl StreamManager {
loop {
match lines.next().await {
Some(Ok(line)) => {
let now = Local::now();
for manager in self.matching_filters(&line) {
manager.handle_line(&line, now).await;
}
self.handle_line(line).await;
}
Some(Err(err)) => {
error!(
@ -190,6 +246,13 @@ impl StreamManager {
}
}
async fn handle_line(&self, line: String) {
let now = Local::now();
for manager in self.matching_filters(&line) {
manager.handle_line(&line, now).await;
}
}
fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> {
let matches = self.compiled_regex_set.matches(line);
matches

View file

@ -19,7 +19,7 @@ pub struct Plugins {
}
impl Plugins {
pub async fn import(&mut self, plugin_directories: Vec<String>) -> Result<(), String> {
pub async fn import(&mut self, plugin_directories: &Vec<String>) -> Result<(), String> {
for plugin_directory in plugin_directories {
let mut dir_entries = read_dir(&plugin_directory).await.map_err(|err| {
format!("Error reading plugin directory {plugin_directory}: {err}")