Adapt Config and plugin loading

daemon::Stream integration TBD
This commit is contained in:
ppom 2025-10-03 12:00:00 +02:00
commit ba65ea3879
No known key found for this signature in database
12 changed files with 253 additions and 198 deletions

1
Cargo.lock generated
View file

@ -1051,6 +1051,7 @@ dependencies = [
"reaction-plugin",
"regex",
"remoc",
"serde",
"serde_json",
"serde_yaml",
"tempfile",

View file

@ -42,6 +42,7 @@ num_cpus = "1.16.0"
# Regex matching
regex = "1.10.4"
# Configuration languages, ser/deserialisation
serde.workspace = true
serde_json = "1.0.117"
serde_yaml = "0.9.34"
jrsonnet-evaluator = "0.4.2"

View file

@ -1,3 +1,5 @@
use std::collections::BTreeMap;
/// This crate defines the API between reaction's core and plugins.
///
/// It's based on [`remoc`], which permits to multiplex channels and remote objects/functions/trait
@ -63,7 +65,7 @@ pub enum Value {
Float(f64),
String(String),
Array(Vec<Value>),
Object(Vec<(String, Value)>),
Object(BTreeMap<String, Value>),
}
#[derive(Serialize, Deserialize)]
@ -92,3 +94,5 @@ pub struct Exec {
pub match_: Vec<String>,
pub result: rch::oneshot::Sender<String>,
}
// TODO write main function here?

View file

@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, error, info, warn};
use super::{merge_attrs, Pattern, Stream};
use super::{merge_attrs, Pattern, Plugin, Stream};
pub type Patterns = BTreeMap<String, Arc<Pattern>>;
@ -25,7 +25,7 @@ pub struct Config {
pub state_directory: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub plugin_directories: Vec<String>,
pub plugins: Vec<Plugin>,
#[serde(default)]
pub patterns: Patterns,
@ -83,13 +83,6 @@ impl Config {
"state_directory",
)?;
self.plugin_directories = merge_attrs(
self.plugin_directories.clone(),
other.plugin_directories,
Vec::default(),
"plugin_directories",
)?;
self.concurrency = merge_attrs(
self.concurrency,
other.concurrency,
@ -108,13 +101,8 @@ impl Config {
// Nullify this useless field
self._definitions = serde_json::Value::Null;
for dir in &self.plugin_directories {
if dir.is_empty() {
return Err("can't specify empty plugin directory".into());
}
if !dir.starts_with("/") {
return Err(format!("plugin directory paths must be absolute: {dir}"));
}
for plugin in &mut self.plugins {
plugin.setup()?;
}
if self.patterns.is_empty() {

View file

@ -3,6 +3,7 @@ mod config;
mod filter;
mod parse_duration;
mod pattern;
mod plugin;
mod stream;
use std::fmt::Debug;
@ -12,6 +13,7 @@ pub use config::{Config, Patterns};
pub use filter::{Duplicate, Filter};
use parse_duration::parse_duration;
pub use pattern::{Pattern, PatternType};
pub use plugin::{Plugin};
use serde::{Deserialize, Serialize};
use serde_json::Value;
pub use stream::Stream;

45
src/concepts/plugin.rs Normal file
View file

@ -0,0 +1,45 @@
use std::{collections::BTreeMap, process::Stdio};
use serde::{Deserialize, Serialize};
use tokio::process::{Child, Command};
#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct Plugin {
pub path: String,
/// If empty, defaults to root
pub file_owner: Option<String>,
/// If empty, defaults to root
pub exec_user: Option<String>,
/// If empty, hash is not performed
pub sha256: Option<String>,
/// Option for `run0`. Do not provide User.
pub systemd_options: Option<BTreeMap<String, Vec<String>>>,
}
// NOTE
// `run0` can be used for security customisation.
// with the --pipe option, raw stdio fd are transmitted to the underlying command, so there is no overhead.
impl Plugin {
pub fn setup(&mut self) -> Result<(), String> {
if self.path.is_empty() {
return Err("can't specify empty plugin path".into());
}
if !self.path.starts_with("/") {
return Err(format!("plugin paths must be absolute: {}", self.path));
}
Ok(())
}
pub fn launch(&self) -> Result<Child, std::io::Error> {
// TODO owner check
// TODO hash check
// TODO run0 options
Command::new(&self.path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
}
}

View file

@ -16,9 +16,10 @@ use tokio::{
};
use tracing::{debug, info};
use crate::{concepts::Config, plugin::Plugins, treedb::Database};
use crate::{concepts::Config, treedb::Database};
use filter::FilterManager;
pub use filter::React;
use plugin::Plugins;
pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken};
use socket::Socket;
use stream::StreamManager;
@ -27,6 +28,7 @@ use stream::StreamManager;
pub use filter::tests;
mod filter;
mod plugin;
mod shutdown;
mod socket;
mod stream;
@ -37,8 +39,7 @@ pub async fn daemon(
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?));
let mut plugins = Plugins::default();
plugins.import(&config.plugin_directories).await?;
let mut plugins = Plugins::new(&config.plugins).await?;
// Cancellation Token
let shutdown = ShutdownController::new();

182
src/daemon/plugin/mod.rs Normal file
View file

@ -0,0 +1,182 @@
use std::{
collections::BTreeMap,
ops::{Deref, DerefMut},
};
use reaction_plugin::{ActionImpl, PluginInfo, PluginInfoClient, StreamImpl};
use remoc::Connect;
use serde_json::Value;
use tokio::process::Child;
use crate::concepts::Plugin;
mod value;
use value::to_stable_value;
pub struct PluginManager {
child: Child,
plugin: &'static Plugin,
plugin_info: PluginInfoClient,
}
impl Deref for PluginManager {
type Target = PluginInfoClient;
fn deref(&self) -> &Self::Target {
&self.plugin_info
}
}
impl DerefMut for PluginManager {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.plugin_info
}
}
impl PluginManager {
async fn new(plugin: &'static Plugin) -> Result<Self, String> {
let mut child = plugin
.launch()
.map_err(|err| format!("could not launch plugin: {err}"))?;
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let (conn, _tx, mut rx): (
_,
remoc::rch::base::Sender<()>,
remoc::rch::base::Receiver<PluginInfoClient>,
) = Connect::io_buffered(remoc::Cfg::default(), stdout, stdin, 2048)
.await
.map_err(|err| format!("could not init communication with plugin: {err}"))?;
tokio::spawn(conn);
let plugin_info = rx
.recv()
.await
.map_err(|err| format!("could not retrieve initial information from plugin: {err}"))?
.ok_or("could not retrieve initial information from plugin: no data")?;
Ok(Self {
child,
plugin,
plugin_info,
})
}
}
#[derive(Default)]
pub struct Plugins {
plugins: BTreeMap<String, PluginManager>,
streams: BTreeMap<String, String>,
actions: BTreeMap<String, String>,
}
impl Plugins {
pub async fn new(plugins: &'static Vec<Plugin>) -> Result<Self, String> {
let mut this = Self::default();
for plugin in plugins {
let path = plugin.path.clone();
this.load_plugin(&plugin)
.await
.map_err(|err| format!("plugin {path}: {err}]"))?;
}
Ok(this)
}
async fn load_plugin(&mut self, plugin: &'static Plugin) -> Result<(), String> {
let path = plugin.path.clone();
let manager = PluginManager::new(plugin).await?;
for stream in manager
.stream_impls()
.await
.map_err(|err| format!("plugin error: {err}"))?
{
if let Some(path) = self.streams.insert(stream.clone().into(), path.clone()) {
return Err(format!(
"plugin {path} already exposed a stream with type name '{stream}'"
));
}
}
for action in manager
.action_impls()
.await
.map_err(|err| format!("plugin error: {err}"))?
{
if let Some(path) = self.actions.insert(action.clone().into(), path.clone()) {
return Err(format!(
"plugin {path} already exposed a action with type name '{action}'"
));
}
}
self.plugins.insert(path, manager);
Ok(())
}
pub async fn finish_plugin_setup(self) -> Result<(), String> {
for mut plugin in self.plugins.into_values() {
plugin
.finish_setup()
.await
.map_err(|err| format!("plugin error: {err}"))?
.map_err(|err| format!("invalid config for plugin: {err}"))?;
}
Ok(())
}
pub async fn init_stream_impl(
&mut self,
stream_name: String,
stream_type: String,
config: Value,
) -> Result<StreamImpl, String> {
let plugin_name = self
.streams
.get(&stream_type)
.ok_or(format!("No plugin provided a stream type '{stream_type}'"))?;
let plugin = self.plugins.get_mut(plugin_name).unwrap();
plugin
.stream_impl(
stream_name.into(),
stream_type.into(),
to_stable_value(config),
)
.await
.map_err(|err| format!("plugin error: {err}"))?
}
pub async fn init_action_impl(
&mut self,
stream_name: String,
filter_name: String,
action_name: String,
action_type: String,
config: Value,
) -> Result<ActionImpl, String> {
let plugin_name = self
.actions
.get(&action_type)
.ok_or(format!("No plugin provided a action type '{action_type}'"))?;
let plugin = self.plugins.get_mut(plugin_name).unwrap();
plugin
.action_impl(
stream_name.into(),
filter_name.into(),
action_name.into(),
action_type.into(),
to_stable_value(config),
)
.await
.map_err(|err| format!("plugin error: {err}"))?
}
}

View file

@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use reaction_plugin::Value as RValue;
use serde_json::Value as JValue;
use stabby::{tuple::Tuple2, vec::Vec};
pub fn to_stable_value(val: JValue) -> RValue {
match val {
@ -24,9 +25,9 @@ pub fn to_stable_value(val: JValue) -> RValue {
vec
}),
JValue::Object(m) => RValue::Object({
let mut map = Vec::with_capacity(m.len());
let mut map = BTreeMap::new();
for (key, val) in m {
map.push(Tuple2(key.into(), to_stable_value(val)));
map.insert(key.into(), to_stable_value(val));
}
map
}),

View file

@ -6,7 +6,7 @@ use std::{
use chrono::Local;
use futures::{FutureExt, Stream as AsyncStream, StreamExt};
use reaction_plugin::{BoxedStreamImpl, StreamImplDynMut};
use reaction_plugin::StreamImpl;
use regex::RegexSet;
use tokio::{
io::{AsyncBufReadExt, BufReader},
@ -17,8 +17,7 @@ use tracing::{error, info, warn};
use crate::{
concepts::{Filter, Stream},
daemon::filter::FilterManager,
plugin::Plugins,
daemon::{filter::FilterManager, plugin::Plugins},
};
use super::shutdown::ShutdownToken;
@ -54,7 +53,7 @@ pub struct StreamManager {
compiled_regex_set: RegexSet,
regex_index_to_filter_manager: Vec<FilterManager>,
stream: &'static Stream,
stream_plugin: Option<&'static mut BoxedStreamImpl>,
stream_plugin: Option<StreamImpl>,
shutdown: ShutdownToken,
}
@ -76,11 +75,11 @@ impl StreamManager {
.collect();
let stream_plugin = if stream.is_plugin() {
Some(Box::leak(Box::new(plugins.init_stream_impl(
Some(plugins.init_stream_impl(
stream.name.clone(),
stream.stream_type.clone().unwrap(),
stream.options.clone(),
)?)))
)?)
} else {
None
};

View file

@ -13,7 +13,6 @@ pub mod cli;
pub mod client;
pub mod concepts;
pub mod daemon;
pub mod plugin;
pub mod protocol;
pub mod tests;
pub mod treedb;

View file

@ -1,168 +0,0 @@
use std::{collections::BTreeMap, path::PathBuf};
use reaction_plugin::{
BoxedActionImpl, BoxedPluginInfo, BoxedStreamImpl, PluginInfoDyn, PluginInfoDynMut,
};
use serde_json::Value;
use stabby::libloading::StabbyLibrary;
use tokio::{fs::read_dir, runtime::Handle};
use value::to_stable_value;
mod value;
#[derive(Default)]
pub struct Plugins {
plugins: BTreeMap<String, BoxedPluginInfo>,
streams: BTreeMap<String, String>,
// filters: BTreeMap<String, String>,
actions: BTreeMap<String, String>,
}
impl Plugins {
pub async fn import(&mut self, plugin_directories: &Vec<String>) -> Result<(), String> {
for plugin_directory in plugin_directories {
let mut dir_entries = read_dir(&plugin_directory).await.map_err(|err| {
format!("Error reading plugin directory {plugin_directory}: {err}")
})?;
loop {
match dir_entries.next_entry().await {
Err(err) => {
return Err(format!(
"Error reading plugin directory {plugin_directory}: {err}"
))
}
Ok(None) => break,
Ok(Some(entry)) => {
let filename = PathBuf::from(&plugin_directory).join(&entry.file_name());
self.load_plugin(filename.clone())
.await
.map_err(|err| format!("Error loading plugin {filename:?}: {err}"))?;
}
}
}
}
Ok(())
}
async fn load_plugin(&mut self, filename: PathBuf) -> Result<(), String> {
// TODO check ownership of file?
let name = filename.to_string_lossy().to_string();
// SAFETY This function is exposed by libloading as unsafe
// But we're (hopefully) gonna be safe with stabby <3
#[allow(unsafe_code)]
let plugin = Handle::current()
.spawn_blocking(|| unsafe { libloading::Library::new(filename) })
.await
// Join Error
.map_err(|err| err.to_string())?
// Libloading Error
.map_err(|err| err.to_string())?;
// SAFETY This function is exposed by stabby as unsafe
// But we're (hopefully) gonna be safe <3
#[allow(unsafe_code)]
let plugin_init = unsafe {
plugin.get_stabbied::<extern "C" fn() -> BoxedPluginInfo>(b"reaction_plugin")
}.map_err(|err| format!("expected entrypoint `fn reaction_plugin() -> BoxedPluginInfo` is either not present or malformed: {err}"))?;
let plugin_info = plugin_init();
for stream in plugin_info.stream_impls() {
if let Some(name) = self.streams.insert(stream.clone().into(), name.clone()) {
return Err(format!(
"plugin {name} already exposed a stream with type name '{stream}'"
));
}
}
// for filter in plugin_info.filter_impls() {
// if let Some(name) = self.filters.insert(filter.clone().into(), name.clone()) {
// return Err(format!(
// "plugin {name} already exposed a filter with type name '{filter}'"
// ));
// }
// }
for action in plugin_info.action_impls() {
if let Some(name) = self.actions.insert(action.clone().into(), name.clone()) {
return Err(format!(
"plugin {name} already exposed a action with type name '{action}'"
));
}
}
self.plugins.insert(name, plugin_info);
Ok(())
}
pub fn finish_plugin_setup(self) -> Result<(), String> {
for mut plugin in self.plugins.into_values() {
// Didn't find a more elegant way to manipulate [`stabby::result::Result`]
let result = plugin.finish_setup();
if result.is_err() {
return Err(result.unwrap_err().into());
}
}
Ok(())
}
pub fn init_stream_impl(
&mut self,
stream_name: String,
stream_type: String,
config: Value,
) -> Result<BoxedStreamImpl, String> {
let plugin_name = self
.streams
.get(&stream_type)
.ok_or(format!("No plugin provided a stream type '{stream_type}'"))?;
let plugin = self.plugins.get_mut(plugin_name).unwrap();
let result = plugin.stream_impl(
stream_name.into(),
stream_type.into(),
to_stable_value(config),
);
if result.is_ok() {
Ok(result.unwrap())
} else {
Err(result.err().unwrap().into())
}
}
pub fn init_action_impl(
&mut self,
stream_name: String,
filter_name: String,
action_name: String,
action_type: String,
config: Value,
) -> Result<BoxedActionImpl, String> {
let plugin_name = self
.actions
.get(&action_type)
.ok_or(format!("No plugin provided a action type '{action_type}'"))?;
let plugin = self.plugins.get_mut(plugin_name).unwrap();
let result = plugin.action_impl(
stream_name.into(),
filter_name.into(),
action_name.into(),
action_type.into(),
to_stable_value(config),
);
if result.is_ok() {
Ok(result.unwrap())
} else {
Err(result.err().unwrap().into())
}
}
}