mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 20:55:47 +01:00
Adapt Config and plugin loading
daemon::Stream integration TBD
This commit is contained in:
parent
a99dea4421
commit
d887acf27e
12 changed files with 253 additions and 198 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -1051,6 +1051,7 @@ dependencies = [
|
|||
"reaction-plugin",
|
||||
"regex",
|
||||
"remoc",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
"tempfile",
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ num_cpus = "1.16.0"
|
|||
# Regex matching
|
||||
regex = "1.10.4"
|
||||
# Configuration languages, ser/deserialisation
|
||||
serde.workspace = true
|
||||
serde_json = "1.0.117"
|
||||
serde_yaml = "0.9.34"
|
||||
jrsonnet-evaluator = "0.4.2"
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
/// This crate defines the API between reaction's core and plugins.
|
||||
///
|
||||
/// It's based on [`remoc`], which permits to multiplex channels and remote objects/functions/trait
|
||||
|
|
@ -63,7 +65,7 @@ pub enum Value {
|
|||
Float(f64),
|
||||
String(String),
|
||||
Array(Vec<Value>),
|
||||
Object(Vec<(String, Value)>),
|
||||
Object(BTreeMap<String, Value>),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
|
@ -92,3 +94,5 @@ pub struct Exec {
|
|||
pub match_: Vec<String>,
|
||||
pub result: rch::oneshot::Sender<String>,
|
||||
}
|
||||
|
||||
// TODO write main function here?
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
|||
use thiserror::Error;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use super::{merge_attrs, Pattern, Stream};
|
||||
use super::{merge_attrs, Pattern, Plugin, Stream};
|
||||
|
||||
pub type Patterns = BTreeMap<String, Arc<Pattern>>;
|
||||
|
||||
|
|
@ -25,7 +25,7 @@ pub struct Config {
|
|||
pub state_directory: String,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub plugin_directories: Vec<String>,
|
||||
pub plugins: Vec<Plugin>,
|
||||
|
||||
#[serde(default)]
|
||||
pub patterns: Patterns,
|
||||
|
|
@ -83,13 +83,6 @@ impl Config {
|
|||
"state_directory",
|
||||
)?;
|
||||
|
||||
self.plugin_directories = merge_attrs(
|
||||
self.plugin_directories.clone(),
|
||||
other.plugin_directories,
|
||||
Vec::default(),
|
||||
"plugin_directories",
|
||||
)?;
|
||||
|
||||
self.concurrency = merge_attrs(
|
||||
self.concurrency,
|
||||
other.concurrency,
|
||||
|
|
@ -108,13 +101,8 @@ impl Config {
|
|||
// Nullify this useless field
|
||||
self._definitions = serde_json::Value::Null;
|
||||
|
||||
for dir in &self.plugin_directories {
|
||||
if dir.is_empty() {
|
||||
return Err("can't specify empty plugin directory".into());
|
||||
}
|
||||
if !dir.starts_with("/") {
|
||||
return Err(format!("plugin directory paths must be absolute: {dir}"));
|
||||
}
|
||||
for plugin in &mut self.plugins {
|
||||
plugin.setup()?;
|
||||
}
|
||||
|
||||
if self.patterns.is_empty() {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ mod config;
|
|||
mod filter;
|
||||
mod parse_duration;
|
||||
mod pattern;
|
||||
mod plugin;
|
||||
mod stream;
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
|
@ -12,6 +13,7 @@ pub use config::{Config, Patterns};
|
|||
pub use filter::{Duplicate, Filter};
|
||||
use parse_duration::parse_duration;
|
||||
pub use pattern::{Pattern, PatternType};
|
||||
pub use plugin::{Plugin};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
pub use stream::Stream;
|
||||
|
|
|
|||
45
src/concepts/plugin.rs
Normal file
45
src/concepts/plugin.rs
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
use std::{collections::BTreeMap, process::Stdio};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::process::{Child, Command};
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[cfg_attr(test, derive(Default))]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct Plugin {
|
||||
pub path: String,
|
||||
/// If empty, defaults to root
|
||||
pub file_owner: Option<String>,
|
||||
/// If empty, defaults to root
|
||||
pub exec_user: Option<String>,
|
||||
/// If empty, hash is not performed
|
||||
pub sha256: Option<String>,
|
||||
/// Option for `run0`. Do not provide User.
|
||||
pub systemd_options: Option<BTreeMap<String, Vec<String>>>,
|
||||
}
|
||||
|
||||
// NOTE
|
||||
// `run0` can be used for security customisation.
|
||||
// with the --pipe option, raw stdio fd are transmitted to the underlying command, so there is no overhead.
|
||||
|
||||
impl Plugin {
|
||||
pub fn setup(&mut self) -> Result<(), String> {
|
||||
if self.path.is_empty() {
|
||||
return Err("can't specify empty plugin path".into());
|
||||
}
|
||||
if !self.path.starts_with("/") {
|
||||
return Err(format!("plugin paths must be absolute: {}", self.path));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn launch(&self) -> Result<Child, std::io::Error> {
|
||||
// TODO owner check
|
||||
// TODO hash check
|
||||
// TODO run0 options
|
||||
Command::new(&self.path)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()
|
||||
}
|
||||
}
|
||||
|
|
@ -16,9 +16,10 @@ use tokio::{
|
|||
};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::{concepts::Config, plugin::Plugins, treedb::Database};
|
||||
use crate::{concepts::Config, treedb::Database};
|
||||
use filter::FilterManager;
|
||||
pub use filter::React;
|
||||
use plugin::Plugins;
|
||||
pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken};
|
||||
use socket::Socket;
|
||||
use stream::StreamManager;
|
||||
|
|
@ -27,6 +28,7 @@ use stream::StreamManager;
|
|||
pub use filter::tests;
|
||||
|
||||
mod filter;
|
||||
mod plugin;
|
||||
mod shutdown;
|
||||
mod socket;
|
||||
mod stream;
|
||||
|
|
@ -37,8 +39,7 @@ pub async fn daemon(
|
|||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?));
|
||||
|
||||
let mut plugins = Plugins::default();
|
||||
plugins.import(&config.plugin_directories).await?;
|
||||
let mut plugins = Plugins::new(&config.plugins).await?;
|
||||
|
||||
// Cancellation Token
|
||||
let shutdown = ShutdownController::new();
|
||||
|
|
|
|||
182
src/daemon/plugin/mod.rs
Normal file
182
src/daemon/plugin/mod.rs
Normal file
|
|
@ -0,0 +1,182 @@
|
|||
use std::{
|
||||
collections::BTreeMap,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
|
||||
use reaction_plugin::{ActionImpl, PluginInfo, PluginInfoClient, StreamImpl};
|
||||
use remoc::Connect;
|
||||
use serde_json::Value;
|
||||
use tokio::process::Child;
|
||||
|
||||
use crate::concepts::Plugin;
|
||||
|
||||
mod value;
|
||||
|
||||
use value::to_stable_value;
|
||||
|
||||
pub struct PluginManager {
|
||||
child: Child,
|
||||
plugin: &'static Plugin,
|
||||
plugin_info: PluginInfoClient,
|
||||
}
|
||||
|
||||
impl Deref for PluginManager {
|
||||
type Target = PluginInfoClient;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.plugin_info
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for PluginManager {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.plugin_info
|
||||
}
|
||||
}
|
||||
|
||||
impl PluginManager {
|
||||
async fn new(plugin: &'static Plugin) -> Result<Self, String> {
|
||||
let mut child = plugin
|
||||
.launch()
|
||||
.map_err(|err| format!("could not launch plugin: {err}"))?;
|
||||
|
||||
let stdin = child.stdin.take().unwrap();
|
||||
let stdout = child.stdout.take().unwrap();
|
||||
|
||||
let (conn, _tx, mut rx): (
|
||||
_,
|
||||
remoc::rch::base::Sender<()>,
|
||||
remoc::rch::base::Receiver<PluginInfoClient>,
|
||||
) = Connect::io_buffered(remoc::Cfg::default(), stdout, stdin, 2048)
|
||||
.await
|
||||
.map_err(|err| format!("could not init communication with plugin: {err}"))?;
|
||||
|
||||
tokio::spawn(conn);
|
||||
|
||||
let plugin_info = rx
|
||||
.recv()
|
||||
.await
|
||||
.map_err(|err| format!("could not retrieve initial information from plugin: {err}"))?
|
||||
.ok_or("could not retrieve initial information from plugin: no data")?;
|
||||
|
||||
Ok(Self {
|
||||
child,
|
||||
plugin,
|
||||
plugin_info,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Plugins {
|
||||
plugins: BTreeMap<String, PluginManager>,
|
||||
streams: BTreeMap<String, String>,
|
||||
actions: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
impl Plugins {
|
||||
pub async fn new(plugins: &'static Vec<Plugin>) -> Result<Self, String> {
|
||||
let mut this = Self::default();
|
||||
|
||||
for plugin in plugins {
|
||||
let path = plugin.path.clone();
|
||||
this.load_plugin(&plugin)
|
||||
.await
|
||||
.map_err(|err| format!("plugin {path}: {err}]"))?;
|
||||
}
|
||||
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
async fn load_plugin(&mut self, plugin: &'static Plugin) -> Result<(), String> {
|
||||
let path = plugin.path.clone();
|
||||
let manager = PluginManager::new(plugin).await?;
|
||||
|
||||
for stream in manager
|
||||
.stream_impls()
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
{
|
||||
if let Some(path) = self.streams.insert(stream.clone().into(), path.clone()) {
|
||||
return Err(format!(
|
||||
"plugin {path} already exposed a stream with type name '{stream}'"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
for action in manager
|
||||
.action_impls()
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
{
|
||||
if let Some(path) = self.actions.insert(action.clone().into(), path.clone()) {
|
||||
return Err(format!(
|
||||
"plugin {path} already exposed a action with type name '{action}'"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
self.plugins.insert(path, manager);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn finish_plugin_setup(self) -> Result<(), String> {
|
||||
for mut plugin in self.plugins.into_values() {
|
||||
plugin
|
||||
.finish_setup()
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
.map_err(|err| format!("invalid config for plugin: {err}"))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn init_stream_impl(
|
||||
&mut self,
|
||||
stream_name: String,
|
||||
stream_type: String,
|
||||
config: Value,
|
||||
) -> Result<StreamImpl, String> {
|
||||
let plugin_name = self
|
||||
.streams
|
||||
.get(&stream_type)
|
||||
.ok_or(format!("No plugin provided a stream type '{stream_type}'"))?;
|
||||
|
||||
let plugin = self.plugins.get_mut(plugin_name).unwrap();
|
||||
|
||||
plugin
|
||||
.stream_impl(
|
||||
stream_name.into(),
|
||||
stream_type.into(),
|
||||
to_stable_value(config),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
}
|
||||
|
||||
pub async fn init_action_impl(
|
||||
&mut self,
|
||||
stream_name: String,
|
||||
filter_name: String,
|
||||
action_name: String,
|
||||
action_type: String,
|
||||
config: Value,
|
||||
) -> Result<ActionImpl, String> {
|
||||
let plugin_name = self
|
||||
.actions
|
||||
.get(&action_type)
|
||||
.ok_or(format!("No plugin provided a action type '{action_type}'"))?;
|
||||
|
||||
let plugin = self.plugins.get_mut(plugin_name).unwrap();
|
||||
|
||||
plugin
|
||||
.action_impl(
|
||||
stream_name.into(),
|
||||
filter_name.into(),
|
||||
action_name.into(),
|
||||
action_type.into(),
|
||||
to_stable_value(config),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("plugin error: {err}"))?
|
||||
}
|
||||
}
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use reaction_plugin::Value as RValue;
|
||||
use serde_json::Value as JValue;
|
||||
use stabby::{tuple::Tuple2, vec::Vec};
|
||||
|
||||
pub fn to_stable_value(val: JValue) -> RValue {
|
||||
match val {
|
||||
|
|
@ -24,9 +25,9 @@ pub fn to_stable_value(val: JValue) -> RValue {
|
|||
vec
|
||||
}),
|
||||
JValue::Object(m) => RValue::Object({
|
||||
let mut map = Vec::with_capacity(m.len());
|
||||
let mut map = BTreeMap::new();
|
||||
for (key, val) in m {
|
||||
map.push(Tuple2(key.into(), to_stable_value(val)));
|
||||
map.insert(key.into(), to_stable_value(val));
|
||||
}
|
||||
map
|
||||
}),
|
||||
|
|
@ -6,7 +6,7 @@ use std::{
|
|||
|
||||
use chrono::Local;
|
||||
use futures::{FutureExt, Stream as AsyncStream, StreamExt};
|
||||
use reaction_plugin::{BoxedStreamImpl, StreamImplDynMut};
|
||||
use reaction_plugin::StreamImpl;
|
||||
use regex::RegexSet;
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, BufReader},
|
||||
|
|
@ -17,8 +17,7 @@ use tracing::{error, info, warn};
|
|||
|
||||
use crate::{
|
||||
concepts::{Filter, Stream},
|
||||
daemon::filter::FilterManager,
|
||||
plugin::Plugins,
|
||||
daemon::{filter::FilterManager, plugin::Plugins},
|
||||
};
|
||||
|
||||
use super::shutdown::ShutdownToken;
|
||||
|
|
@ -54,7 +53,7 @@ pub struct StreamManager {
|
|||
compiled_regex_set: RegexSet,
|
||||
regex_index_to_filter_manager: Vec<FilterManager>,
|
||||
stream: &'static Stream,
|
||||
stream_plugin: Option<&'static mut BoxedStreamImpl>,
|
||||
stream_plugin: Option<StreamImpl>,
|
||||
shutdown: ShutdownToken,
|
||||
}
|
||||
|
||||
|
|
@ -76,11 +75,11 @@ impl StreamManager {
|
|||
.collect();
|
||||
|
||||
let stream_plugin = if stream.is_plugin() {
|
||||
Some(Box::leak(Box::new(plugins.init_stream_impl(
|
||||
Some(plugins.init_stream_impl(
|
||||
stream.name.clone(),
|
||||
stream.stream_type.clone().unwrap(),
|
||||
stream.options.clone(),
|
||||
)?)))
|
||||
)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ pub mod cli;
|
|||
pub mod client;
|
||||
pub mod concepts;
|
||||
pub mod daemon;
|
||||
pub mod plugin;
|
||||
pub mod protocol;
|
||||
pub mod tests;
|
||||
pub mod treedb;
|
||||
|
|
|
|||
|
|
@ -1,168 +0,0 @@
|
|||
use std::{collections::BTreeMap, path::PathBuf};
|
||||
|
||||
use reaction_plugin::{
|
||||
BoxedActionImpl, BoxedPluginInfo, BoxedStreamImpl, PluginInfoDyn, PluginInfoDynMut,
|
||||
};
|
||||
use serde_json::Value;
|
||||
use stabby::libloading::StabbyLibrary;
|
||||
use tokio::{fs::read_dir, runtime::Handle};
|
||||
use value::to_stable_value;
|
||||
|
||||
mod value;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Plugins {
|
||||
plugins: BTreeMap<String, BoxedPluginInfo>,
|
||||
streams: BTreeMap<String, String>,
|
||||
// filters: BTreeMap<String, String>,
|
||||
actions: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
impl Plugins {
|
||||
pub async fn import(&mut self, plugin_directories: &Vec<String>) -> Result<(), String> {
|
||||
for plugin_directory in plugin_directories {
|
||||
let mut dir_entries = read_dir(&plugin_directory).await.map_err(|err| {
|
||||
format!("Error reading plugin directory {plugin_directory}: {err}")
|
||||
})?;
|
||||
loop {
|
||||
match dir_entries.next_entry().await {
|
||||
Err(err) => {
|
||||
return Err(format!(
|
||||
"Error reading plugin directory {plugin_directory}: {err}"
|
||||
))
|
||||
}
|
||||
|
||||
Ok(None) => break,
|
||||
|
||||
Ok(Some(entry)) => {
|
||||
let filename = PathBuf::from(&plugin_directory).join(&entry.file_name());
|
||||
self.load_plugin(filename.clone())
|
||||
.await
|
||||
.map_err(|err| format!("Error loading plugin {filename:?}: {err}"))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_plugin(&mut self, filename: PathBuf) -> Result<(), String> {
|
||||
// TODO check ownership of file?
|
||||
|
||||
let name = filename.to_string_lossy().to_string();
|
||||
// SAFETY This function is exposed by libloading as unsafe
|
||||
// But we're (hopefully) gonna be safe with stabby <3
|
||||
#[allow(unsafe_code)]
|
||||
let plugin = Handle::current()
|
||||
.spawn_blocking(|| unsafe { libloading::Library::new(filename) })
|
||||
.await
|
||||
// Join Error
|
||||
.map_err(|err| err.to_string())?
|
||||
// Libloading Error
|
||||
.map_err(|err| err.to_string())?;
|
||||
|
||||
// SAFETY This function is exposed by stabby as unsafe
|
||||
// But we're (hopefully) gonna be safe <3
|
||||
#[allow(unsafe_code)]
|
||||
let plugin_init = unsafe {
|
||||
plugin.get_stabbied::<extern "C" fn() -> BoxedPluginInfo>(b"reaction_plugin")
|
||||
}.map_err(|err| format!("expected entrypoint `fn reaction_plugin() -> BoxedPluginInfo` is either not present or malformed: {err}"))?;
|
||||
|
||||
let plugin_info = plugin_init();
|
||||
|
||||
for stream in plugin_info.stream_impls() {
|
||||
if let Some(name) = self.streams.insert(stream.clone().into(), name.clone()) {
|
||||
return Err(format!(
|
||||
"plugin {name} already exposed a stream with type name '{stream}'"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// for filter in plugin_info.filter_impls() {
|
||||
// if let Some(name) = self.filters.insert(filter.clone().into(), name.clone()) {
|
||||
// return Err(format!(
|
||||
// "plugin {name} already exposed a filter with type name '{filter}'"
|
||||
// ));
|
||||
// }
|
||||
// }
|
||||
|
||||
for action in plugin_info.action_impls() {
|
||||
if let Some(name) = self.actions.insert(action.clone().into(), name.clone()) {
|
||||
return Err(format!(
|
||||
"plugin {name} already exposed a action with type name '{action}'"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
self.plugins.insert(name, plugin_info);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finish_plugin_setup(self) -> Result<(), String> {
|
||||
for mut plugin in self.plugins.into_values() {
|
||||
// Didn't find a more elegant way to manipulate [`stabby::result::Result`]
|
||||
let result = plugin.finish_setup();
|
||||
if result.is_err() {
|
||||
return Err(result.unwrap_err().into());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init_stream_impl(
|
||||
&mut self,
|
||||
stream_name: String,
|
||||
stream_type: String,
|
||||
config: Value,
|
||||
) -> Result<BoxedStreamImpl, String> {
|
||||
let plugin_name = self
|
||||
.streams
|
||||
.get(&stream_type)
|
||||
.ok_or(format!("No plugin provided a stream type '{stream_type}'"))?;
|
||||
|
||||
let plugin = self.plugins.get_mut(plugin_name).unwrap();
|
||||
|
||||
let result = plugin.stream_impl(
|
||||
stream_name.into(),
|
||||
stream_type.into(),
|
||||
to_stable_value(config),
|
||||
);
|
||||
|
||||
if result.is_ok() {
|
||||
Ok(result.unwrap())
|
||||
} else {
|
||||
Err(result.err().unwrap().into())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_action_impl(
|
||||
&mut self,
|
||||
stream_name: String,
|
||||
filter_name: String,
|
||||
action_name: String,
|
||||
action_type: String,
|
||||
config: Value,
|
||||
) -> Result<BoxedActionImpl, String> {
|
||||
let plugin_name = self
|
||||
.actions
|
||||
.get(&action_type)
|
||||
.ok_or(format!("No plugin provided a action type '{action_type}'"))?;
|
||||
|
||||
let plugin = self.plugins.get_mut(plugin_name).unwrap();
|
||||
|
||||
let result = plugin.action_impl(
|
||||
stream_name.into(),
|
||||
filter_name.into(),
|
||||
action_name.into(),
|
||||
action_type.into(),
|
||||
to_stable_value(config),
|
||||
);
|
||||
|
||||
if result.is_ok() {
|
||||
Ok(result.unwrap())
|
||||
} else {
|
||||
Err(result.err().unwrap().into())
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue