diff --git a/Cargo.lock b/Cargo.lock index a71929d..e1cfd21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1051,6 +1051,7 @@ dependencies = [ "reaction-plugin", "regex", "remoc", + "serde", "serde_json", "serde_yaml", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 07bd33c..6ce6c63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ num_cpus = "1.16.0" # Regex matching regex = "1.10.4" # Configuration languages, ser/deserialisation +serde.workspace = true serde_json = "1.0.117" serde_yaml = "0.9.34" jrsonnet-evaluator = "0.4.2" diff --git a/reaction-plugin/src/lib.rs b/reaction-plugin/src/lib.rs index bd9696e..10b7f49 100644 --- a/reaction-plugin/src/lib.rs +++ b/reaction-plugin/src/lib.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + /// This crate defines the API between reaction's core and plugins. /// /// It's based on [`remoc`], which permits to multiplex channels and remote objects/functions/trait @@ -63,7 +65,7 @@ pub enum Value { Float(f64), String(String), Array(Vec), - Object(Vec<(String, Value)>), + Object(BTreeMap), } #[derive(Serialize, Deserialize)] @@ -92,3 +94,5 @@ pub struct Exec { pub match_: Vec, pub result: rch::oneshot::Sender, } + +// TODO write main function here? diff --git a/src/concepts/config.rs b/src/concepts/config.rs index 2bf1296..94e0df7 100644 --- a/src/concepts/config.rs +++ b/src/concepts/config.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{debug, error, info, warn}; -use super::{merge_attrs, Pattern, Stream}; +use super::{merge_attrs, Pattern, Plugin, Stream}; pub type Patterns = BTreeMap>; @@ -25,7 +25,7 @@ pub struct Config { pub state_directory: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub plugin_directories: Vec, + pub plugins: Vec, #[serde(default)] pub patterns: Patterns, @@ -83,13 +83,6 @@ impl Config { "state_directory", )?; - self.plugin_directories = merge_attrs( - self.plugin_directories.clone(), - other.plugin_directories, - Vec::default(), - "plugin_directories", - )?; - self.concurrency = merge_attrs( self.concurrency, other.concurrency, @@ -108,13 +101,8 @@ impl Config { // Nullify this useless field self._definitions = serde_json::Value::Null; - for dir in &self.plugin_directories { - if dir.is_empty() { - return Err("can't specify empty plugin directory".into()); - } - if !dir.starts_with("/") { - return Err(format!("plugin directory paths must be absolute: {dir}")); - } + for plugin in &mut self.plugins { + plugin.setup()?; } if self.patterns.is_empty() { diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index b059b62..44b999d 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -3,6 +3,7 @@ mod config; mod filter; mod parse_duration; mod pattern; +mod plugin; mod stream; use std::fmt::Debug; @@ -12,6 +13,7 @@ pub use config::{Config, Patterns}; pub use filter::{Duplicate, Filter}; use parse_duration::parse_duration; pub use pattern::{Pattern, PatternType}; +pub use plugin::{Plugin}; use serde::{Deserialize, Serialize}; use serde_json::Value; pub use stream::Stream; diff --git a/src/concepts/plugin.rs b/src/concepts/plugin.rs new file mode 100644 index 0000000..f4da976 --- /dev/null +++ b/src/concepts/plugin.rs @@ -0,0 +1,45 @@ +use std::{collections::BTreeMap, process::Stdio}; + +use serde::{Deserialize, Serialize}; +use tokio::process::{Child, Command}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[cfg_attr(test, derive(Default))] +#[serde(deny_unknown_fields)] +pub struct Plugin { + pub path: String, + /// If empty, defaults to root + pub file_owner: Option, + /// If empty, defaults to root + pub exec_user: Option, + /// If empty, hash is not performed + pub sha256: Option, + /// Option for `run0`. Do not provide User. + pub systemd_options: Option>>, +} + +// NOTE +// `run0` can be used for security customisation. +// with the --pipe option, raw stdio fd are transmitted to the underlying command, so there is no overhead. + +impl Plugin { + pub fn setup(&mut self) -> Result<(), String> { + if self.path.is_empty() { + return Err("can't specify empty plugin path".into()); + } + if !self.path.starts_with("/") { + return Err(format!("plugin paths must be absolute: {}", self.path)); + } + Ok(()) + } + + pub fn launch(&self) -> Result { + // TODO owner check + // TODO hash check + // TODO run0 options + Command::new(&self.path) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + } +} diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c396b30..80307ce 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -16,9 +16,10 @@ use tokio::{ }; use tracing::{debug, info}; -use crate::{concepts::Config, plugin::Plugins, treedb::Database}; +use crate::{concepts::Config, treedb::Database}; use filter::FilterManager; pub use filter::React; +use plugin::Plugins; pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken}; use socket::Socket; use stream::StreamManager; @@ -27,6 +28,7 @@ use stream::StreamManager; pub use filter::tests; mod filter; +mod plugin; mod shutdown; mod socket; mod stream; @@ -37,8 +39,7 @@ pub async fn daemon( ) -> Result<(), Box> { let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?)); - let mut plugins = Plugins::default(); - plugins.import(&config.plugin_directories).await?; + let mut plugins = Plugins::new(&config.plugins).await?; // Cancellation Token let shutdown = ShutdownController::new(); diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs new file mode 100644 index 0000000..a9c4e9b --- /dev/null +++ b/src/daemon/plugin/mod.rs @@ -0,0 +1,182 @@ +use std::{ + collections::BTreeMap, + ops::{Deref, DerefMut}, +}; + +use reaction_plugin::{ActionImpl, PluginInfo, PluginInfoClient, StreamImpl}; +use remoc::Connect; +use serde_json::Value; +use tokio::process::Child; + +use crate::concepts::Plugin; + +mod value; + +use value::to_stable_value; + +pub struct PluginManager { + child: Child, + plugin: &'static Plugin, + plugin_info: PluginInfoClient, +} + +impl Deref for PluginManager { + type Target = PluginInfoClient; + fn deref(&self) -> &Self::Target { + &self.plugin_info + } +} + +impl DerefMut for PluginManager { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.plugin_info + } +} + +impl PluginManager { + async fn new(plugin: &'static Plugin) -> Result { + let mut child = plugin + .launch() + .map_err(|err| format!("could not launch plugin: {err}"))?; + + let stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + + let (conn, _tx, mut rx): ( + _, + remoc::rch::base::Sender<()>, + remoc::rch::base::Receiver, + ) = Connect::io_buffered(remoc::Cfg::default(), stdout, stdin, 2048) + .await + .map_err(|err| format!("could not init communication with plugin: {err}"))?; + + tokio::spawn(conn); + + let plugin_info = rx + .recv() + .await + .map_err(|err| format!("could not retrieve initial information from plugin: {err}"))? + .ok_or("could not retrieve initial information from plugin: no data")?; + + Ok(Self { + child, + plugin, + plugin_info, + }) + } +} + +#[derive(Default)] +pub struct Plugins { + plugins: BTreeMap, + streams: BTreeMap, + actions: BTreeMap, +} + +impl Plugins { + pub async fn new(plugins: &'static Vec) -> Result { + let mut this = Self::default(); + + for plugin in plugins { + let path = plugin.path.clone(); + this.load_plugin(&plugin) + .await + .map_err(|err| format!("plugin {path}: {err}]"))?; + } + + Ok(this) + } + + async fn load_plugin(&mut self, plugin: &'static Plugin) -> Result<(), String> { + let path = plugin.path.clone(); + let manager = PluginManager::new(plugin).await?; + + for stream in manager + .stream_impls() + .await + .map_err(|err| format!("plugin error: {err}"))? + { + if let Some(path) = self.streams.insert(stream.clone().into(), path.clone()) { + return Err(format!( + "plugin {path} already exposed a stream with type name '{stream}'" + )); + } + } + + for action in manager + .action_impls() + .await + .map_err(|err| format!("plugin error: {err}"))? + { + if let Some(path) = self.actions.insert(action.clone().into(), path.clone()) { + return Err(format!( + "plugin {path} already exposed a action with type name '{action}'" + )); + } + } + + self.plugins.insert(path, manager); + Ok(()) + } + + pub async fn finish_plugin_setup(self) -> Result<(), String> { + for mut plugin in self.plugins.into_values() { + plugin + .finish_setup() + .await + .map_err(|err| format!("plugin error: {err}"))? + .map_err(|err| format!("invalid config for plugin: {err}"))?; + } + Ok(()) + } + + pub async fn init_stream_impl( + &mut self, + stream_name: String, + stream_type: String, + config: Value, + ) -> Result { + let plugin_name = self + .streams + .get(&stream_type) + .ok_or(format!("No plugin provided a stream type '{stream_type}'"))?; + + let plugin = self.plugins.get_mut(plugin_name).unwrap(); + + plugin + .stream_impl( + stream_name.into(), + stream_type.into(), + to_stable_value(config), + ) + .await + .map_err(|err| format!("plugin error: {err}"))? + } + + pub async fn init_action_impl( + &mut self, + stream_name: String, + filter_name: String, + action_name: String, + action_type: String, + config: Value, + ) -> Result { + let plugin_name = self + .actions + .get(&action_type) + .ok_or(format!("No plugin provided a action type '{action_type}'"))?; + + let plugin = self.plugins.get_mut(plugin_name).unwrap(); + + plugin + .action_impl( + stream_name.into(), + filter_name.into(), + action_name.into(), + action_type.into(), + to_stable_value(config), + ) + .await + .map_err(|err| format!("plugin error: {err}"))? + } +} diff --git a/src/plugin/value.rs b/src/daemon/plugin/value.rs similarity index 85% rename from src/plugin/value.rs rename to src/daemon/plugin/value.rs index e8b7d31..e62c498 100644 --- a/src/plugin/value.rs +++ b/src/daemon/plugin/value.rs @@ -1,6 +1,7 @@ +use std::collections::BTreeMap; + use reaction_plugin::Value as RValue; use serde_json::Value as JValue; -use stabby::{tuple::Tuple2, vec::Vec}; pub fn to_stable_value(val: JValue) -> RValue { match val { @@ -24,9 +25,9 @@ pub fn to_stable_value(val: JValue) -> RValue { vec }), JValue::Object(m) => RValue::Object({ - let mut map = Vec::with_capacity(m.len()); + let mut map = BTreeMap::new(); for (key, val) in m { - map.push(Tuple2(key.into(), to_stable_value(val))); + map.insert(key.into(), to_stable_value(val)); } map }), diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index 37f0328..f5b2630 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -6,7 +6,7 @@ use std::{ use chrono::Local; use futures::{FutureExt, Stream as AsyncStream, StreamExt}; -use reaction_plugin::{BoxedStreamImpl, StreamImplDynMut}; +use reaction_plugin::StreamImpl; use regex::RegexSet; use tokio::{ io::{AsyncBufReadExt, BufReader}, @@ -17,8 +17,7 @@ use tracing::{error, info, warn}; use crate::{ concepts::{Filter, Stream}, - daemon::filter::FilterManager, - plugin::Plugins, + daemon::{filter::FilterManager, plugin::Plugins}, }; use super::shutdown::ShutdownToken; @@ -54,7 +53,7 @@ pub struct StreamManager { compiled_regex_set: RegexSet, regex_index_to_filter_manager: Vec, stream: &'static Stream, - stream_plugin: Option<&'static mut BoxedStreamImpl>, + stream_plugin: Option, shutdown: ShutdownToken, } @@ -76,11 +75,11 @@ impl StreamManager { .collect(); let stream_plugin = if stream.is_plugin() { - Some(Box::leak(Box::new(plugins.init_stream_impl( + Some(plugins.init_stream_impl( stream.name.clone(), stream.stream_type.clone().unwrap(), stream.options.clone(), - )?))) + )?) } else { None }; diff --git a/src/lib.rs b/src/lib.rs index fb3cb94..3619f1a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,6 @@ pub mod cli; pub mod client; pub mod concepts; pub mod daemon; -pub mod plugin; pub mod protocol; pub mod tests; pub mod treedb; diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs deleted file mode 100644 index 958a2cc..0000000 --- a/src/plugin/mod.rs +++ /dev/null @@ -1,168 +0,0 @@ -use std::{collections::BTreeMap, path::PathBuf}; - -use reaction_plugin::{ - BoxedActionImpl, BoxedPluginInfo, BoxedStreamImpl, PluginInfoDyn, PluginInfoDynMut, -}; -use serde_json::Value; -use stabby::libloading::StabbyLibrary; -use tokio::{fs::read_dir, runtime::Handle}; -use value::to_stable_value; - -mod value; - -#[derive(Default)] -pub struct Plugins { - plugins: BTreeMap, - streams: BTreeMap, - // filters: BTreeMap, - actions: BTreeMap, -} - -impl Plugins { - pub async fn import(&mut self, plugin_directories: &Vec) -> Result<(), String> { - for plugin_directory in plugin_directories { - let mut dir_entries = read_dir(&plugin_directory).await.map_err(|err| { - format!("Error reading plugin directory {plugin_directory}: {err}") - })?; - loop { - match dir_entries.next_entry().await { - Err(err) => { - return Err(format!( - "Error reading plugin directory {plugin_directory}: {err}" - )) - } - - Ok(None) => break, - - Ok(Some(entry)) => { - let filename = PathBuf::from(&plugin_directory).join(&entry.file_name()); - self.load_plugin(filename.clone()) - .await - .map_err(|err| format!("Error loading plugin {filename:?}: {err}"))?; - } - } - } - } - Ok(()) - } - - async fn load_plugin(&mut self, filename: PathBuf) -> Result<(), String> { - // TODO check ownership of file? - - let name = filename.to_string_lossy().to_string(); - // SAFETY This function is exposed by libloading as unsafe - // But we're (hopefully) gonna be safe with stabby <3 - #[allow(unsafe_code)] - let plugin = Handle::current() - .spawn_blocking(|| unsafe { libloading::Library::new(filename) }) - .await - // Join Error - .map_err(|err| err.to_string())? - // Libloading Error - .map_err(|err| err.to_string())?; - - // SAFETY This function is exposed by stabby as unsafe - // But we're (hopefully) gonna be safe <3 - #[allow(unsafe_code)] - let plugin_init = unsafe { - plugin.get_stabbied:: BoxedPluginInfo>(b"reaction_plugin") - }.map_err(|err| format!("expected entrypoint `fn reaction_plugin() -> BoxedPluginInfo` is either not present or malformed: {err}"))?; - - let plugin_info = plugin_init(); - - for stream in plugin_info.stream_impls() { - if let Some(name) = self.streams.insert(stream.clone().into(), name.clone()) { - return Err(format!( - "plugin {name} already exposed a stream with type name '{stream}'" - )); - } - } - - // for filter in plugin_info.filter_impls() { - // if let Some(name) = self.filters.insert(filter.clone().into(), name.clone()) { - // return Err(format!( - // "plugin {name} already exposed a filter with type name '{filter}'" - // )); - // } - // } - - for action in plugin_info.action_impls() { - if let Some(name) = self.actions.insert(action.clone().into(), name.clone()) { - return Err(format!( - "plugin {name} already exposed a action with type name '{action}'" - )); - } - } - - self.plugins.insert(name, plugin_info); - - Ok(()) - } - - pub fn finish_plugin_setup(self) -> Result<(), String> { - for mut plugin in self.plugins.into_values() { - // Didn't find a more elegant way to manipulate [`stabby::result::Result`] - let result = plugin.finish_setup(); - if result.is_err() { - return Err(result.unwrap_err().into()); - } - } - Ok(()) - } - - pub fn init_stream_impl( - &mut self, - stream_name: String, - stream_type: String, - config: Value, - ) -> Result { - let plugin_name = self - .streams - .get(&stream_type) - .ok_or(format!("No plugin provided a stream type '{stream_type}'"))?; - - let plugin = self.plugins.get_mut(plugin_name).unwrap(); - - let result = plugin.stream_impl( - stream_name.into(), - stream_type.into(), - to_stable_value(config), - ); - - if result.is_ok() { - Ok(result.unwrap()) - } else { - Err(result.err().unwrap().into()) - } - } - - pub fn init_action_impl( - &mut self, - stream_name: String, - filter_name: String, - action_name: String, - action_type: String, - config: Value, - ) -> Result { - let plugin_name = self - .actions - .get(&action_type) - .ok_or(format!("No plugin provided a action type '{action_type}'"))?; - - let plugin = self.plugins.get_mut(plugin_name).unwrap(); - - let result = plugin.action_impl( - stream_name.into(), - filter_name.into(), - action_name.into(), - action_type.into(), - to_stable_value(config), - ); - - if result.is_ok() { - Ok(result.unwrap()) - } else { - Err(result.err().unwrap().into()) - } - } -}