From 124a2827d9e96e1ee5cfe23fad7030df62556eff Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 27 Oct 2025 12:00:00 +0100 Subject: [PATCH] Cluster plugin init - Remove PersistData utility - Provide plugins a state directory instead, by starting them inside. - Store the secret key as a file inside this directory. - Use iroh's crate for base64 encoding, thus removing one dependency. - Implement plugin's stream_impl and action_impl functions, creating all necessary data structures. --- Cargo.lock | 7 +- Cargo.toml | 3 +- TODO | 14 +- plugins/reaction-plugin-cluster/Cargo.toml | 12 +- plugins/reaction-plugin-cluster/src/main.rs | 212 +++++++++++++----- .../reaction-plugin-cluster/src/secret_key.rs | 70 ++++++ plugins/reaction-plugin-virtual/Cargo.toml | 1 + plugins/reaction-plugin-virtual/src/main.rs | 47 ++-- plugins/reaction-plugin/Cargo.toml | 7 +- plugins/reaction-plugin/src/lib.rs | 37 +-- src/concepts/plugin.rs | 65 ++++-- src/daemon/plugin/mod.rs | 97 ++------ src/daemon/plugin/value.rs | 35 --- tests/test-conf/cluster-a.jsonnet | 64 ++++++ 14 files changed, 393 insertions(+), 278 deletions(-) create mode 100644 plugins/reaction-plugin-cluster/src/secret_key.rs delete mode 100644 src/daemon/plugin/value.rs create mode 100644 tests/test-conf/cluster-a.jsonnet diff --git a/Cargo.lock b/Cargo.lock index edc74cb..13d51f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2849,6 +2849,7 @@ version = "1.0.0" dependencies = [ "remoc", "serde", + "serde_json", "tokio", ] @@ -2856,12 +2857,13 @@ dependencies = [ name = "reaction-plugin-cluster" version = "0.1.0" dependencies = [ - "base64 0.22.1", + "data-encoding", "iroh", "rand 0.9.2", - "rand_core 0.9.3", "reaction-plugin", "remoc", + "serde", + "serde_json", "tokio", ] @@ -2871,6 +2873,7 @@ version = "0.1.0" dependencies = [ "reaction-plugin", "remoc", + "serde_json", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index fcec4c0..08daff8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ num_cpus = "1.16.0" regex = "1.10.4" # Configuration languages, ser/deserialisation serde.workspace = true -serde_json = "1.0.117" +serde_json.workspace = true serde_yaml = "0.9.34" jrsonnet-evaluator = "0.4.2" # Error macro @@ -80,6 +80,7 @@ members = ["plugins/reaction-plugin", "plugins/reaction-plugin-cluster", "plugin [workspace.dependencies] remoc = { version = "0.18.3" } serde = { version = "1.0.203", features = ["derive"] } +serde_json = "1.0.117" tokio = { version = "1.40.0" } [[bin]] diff --git a/TODO b/TODO index b81282c..557559e 100644 --- a/TODO +++ b/TODO @@ -1,16 +1,4 @@ Test what happens when a Filter's pattern Set changes (I think it's shitty) DB: add tests on stress testing (lines should always be in order) plugins: pipe stderr too and wrap errors in logs -plugins: provide tree storage? omg - -questionnements: -- quelle cli pour les plugins ? - - Directement en appelant le plugin ? reaction-plugin-cluster gen-id ? 🟢 - → Demande de savoir où stocker tout ça - - Via moult IPC ? reaction plugin cluster gen-id ? 🔴 - → Mais du coup c'est l'oeuf ou la poule entre avoir un serveur qui fonctionne et avoir un -- Stockage ? - - uniquement dans la db reaction - → Faut pas que ce soit trop gros, un peu d'overhead, risque de perdre la donnée - - à part dans le configuration directory - → Pas mal en vrai +plugins: provide treedb storage? omg (add an enum that's either remoc::rch::mpsc or tokio::mpsc) diff --git a/plugins/reaction-plugin-cluster/Cargo.toml b/plugins/reaction-plugin-cluster/Cargo.toml index 3d45475..902804d 100644 --- a/plugins/reaction-plugin-cluster/Cargo.toml +++ b/plugins/reaction-plugin-cluster/Cargo.toml @@ -4,10 +4,14 @@ version = "0.1.0" edition = "2024" [dependencies] -tokio = { workspace = true, features = ["rt-multi-thread"] } -remoc.workspace = true reaction-plugin.path = "../reaction-plugin" + +tokio.workspace = true +tokio.features = ["rt-multi-thread"] +remoc.workspace = true +serde.workspace = true +serde_json.workspace = true + +data-encoding = "2.9.0" iroh = "0.94.0" -base64 = "0.22.1" -rand_core = { version = "0.9.3", features = ["os_rng"] } rand = "0.9.2" diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index 0b24543..77230b1 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -1,12 +1,21 @@ -use std::collections::{BTreeMap, BTreeSet}; - -use base64::{Engine, prelude::BASE64_STANDARD}; -use iroh::{SecretKey, defaults}; -use reaction_plugin::{ - ActionImpl, Hello, Manifest, PersistData, PluginInfo, RemoteResult, StreamImpl, Value, - main_loop, +use std::{ + collections::{BTreeMap, BTreeSet}, + net::SocketAddr, }; -use remoc::{chmux::SendError, rtc}; + +use iroh::PublicKey; +use reaction_plugin::{ + ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, main_loop, +}; +use remoc::{rch::mpsc, rtc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::fs; + +mod secret_key; +use secret_key::secret_key; + +use crate::secret_key::{key_b64_to_bytes, key_bytes_to_b64}; #[tokio::main] async fn main() { @@ -16,12 +25,63 @@ async fn main() { #[derive(Default)] struct Plugin { - data: Option, + streams: BTreeMap, + actions: Vec, +} + +/// Stream options as defined by the user +#[derive(Serialize, Deserialize)] +struct StreamOptions { + /// The secret that permits to join the cluster. + shared_secret: Option, + /// The secret that permits to join the cluster, as a file. + /// Beginning and ending whitespace will be trimmed. + shared_secret_file: Option, + /// Other nodes which are part of the cluster. + nodes: Vec, +} + +/// Stream information before start +struct StreamInit { + shared_secret: String, + nodes: Vec, + tx: mpsc::Sender>, +} + +#[derive(Serialize, Deserialize)] +struct NodeOption { + public_key: String, + addresses: Vec, +} + +#[derive(Serialize, Deserialize)] +struct NodeInit { + public_key: PublicKey, + addresses: Vec, +} + +#[derive(Serialize, Deserialize)] +struct ActionOptions { + /// The line to send to the corresponding cluster, example: "ban \" + send: String, + /// The name of the corresponding cluster, example: "my_cluster_stream" + to: String, + /// Whether the stream of this node also receives the line + #[serde(default, rename = "self")] + self_: bool, +} + +#[derive(Serialize, Deserialize)] +struct ActionInit { + send: String, + to: String, + self_: bool, + patterns: Vec, + rx: mpsc::Receiver, } impl PluginInfo for Plugin { - async fn manifest(&mut self, data: PersistData) -> Result { - self.data = Some(data); + async fn manifest(&mut self) -> Result { Ok(Manifest { hello: Hello::hello(), streams: BTreeSet::from(["cluster".into()]), @@ -35,24 +95,96 @@ impl PluginInfo for Plugin { stream_type: String, config: Value, ) -> RemoteResult { - todo!() + if &stream_type != "cluster" { + return Err("This plugin can't handle other stream types than cluster".into()); + } + + let options: StreamOptions = + serde_json::from_value(config).map_err(|err| format!("invalid options: {err}"))?; + + let shared_secret = if let Some(shared_secret) = options.shared_secret { + shared_secret + } else if let Some(shared_secret_file) = &options.shared_secret_file { + fs::read_to_string(shared_secret_file) + .await + .map_err(|err| { + format!("can't access shared_secret_file {shared_secret_file}: {err}") + })? + .trim() + .to_owned() + } else { + return Err("missing shared secret: either shared_secret or shared_secret_file must be provided".into()); + }; + + let mut init_nodes = Vec::default(); + for node in options.nodes.into_iter() { + let bytes = key_b64_to_bytes(&node.public_key) + .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; + + let public_key = PublicKey::from_bytes(&bytes) + .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; + + init_nodes.push(NodeInit { + public_key, + addresses: node.addresses, + }); + } + + let (tx, rx) = mpsc::channel(1); + + let stream = StreamInit { + shared_secret, + nodes: init_nodes, + tx, + }; + + if let Some(_) = self.streams.insert(stream_name, stream) { + return Err("this virtual stream has already been initialized".into()); + } + + Ok(StreamImpl { + stream: rx, + standalone: true, + }) } async fn action_impl( &mut self, - stream_name: String, - filter_name: String, - action_name: String, + _stream_name: String, + _filter_name: String, + _action_name: String, action_type: String, config: Value, patterns: Vec, ) -> RemoteResult { - todo!() + if &action_type != "cluster" { + return Err("This plugin can't handle other action types than cluster".into()); + } + + let options: ActionOptions = + serde_json::from_value(config).map_err(|err| format!("invalid options: {err}"))?; + + let (tx, rx) = mpsc::channel(1); + + let init_action = ActionInit { + send: options.send, + to: options.to, + self_: options.self_, + patterns, + rx, + }; + + self.actions.push(init_action); + + Ok(ActionImpl { tx }) } async fn finish_setup(&mut self) -> RemoteResult<()> { - let data = self.data.as_mut().unwrap(); - let secret_key = secret_key(data).await; + let secret_key = secret_key().await?; + eprintln!( + "public key of this node: {}", + key_bytes_to_b64(secret_key.public().as_bytes()) + ); todo!() } @@ -60,47 +192,3 @@ impl PluginInfo for Plugin { todo!() } } - -async fn secret_key(data: &mut PersistData) -> SecretKey { - if let Some(key) = get_secret_key(data) { - key - } else { - let key = SecretKey::generate(&mut rand::rng()); - set_secret_key(data, &key).await; - key - } -} - -fn get_secret_key(data: &PersistData) -> Option { - match &data.persisted_data { - Value::Object(map) => map.get("secret_key").and_then(|value| { - if let Value::String(str) = value { - let vec = BASE64_STANDARD.decode(str).ok()?; - if vec.len() != 32 { - return None; - } - let mut bytes = [0u8; 32]; - for i in 0..32 { - bytes[i] = vec[i]; - } - Some(SecretKey::from_bytes(&bytes)) - } else { - None - } - }), - _ => None, - } -} - -async fn set_secret_key(data: &mut PersistData, key: &SecretKey) { - let mut current = match &data.persisted_data { - Value::Object(map) => map.clone(), - _ => BTreeMap::default(), - }; - let base64 = BASE64_STANDARD.encode(key.to_bytes()); - current.insert("secret_key".into(), Value::String(base64)); - data.persist_data - .send(Value::Object(current)) - .await - .unwrap(); -} diff --git a/plugins/reaction-plugin-cluster/src/secret_key.rs b/plugins/reaction-plugin-cluster/src/secret_key.rs new file mode 100644 index 0000000..ca7e67a --- /dev/null +++ b/plugins/reaction-plugin-cluster/src/secret_key.rs @@ -0,0 +1,70 @@ +use std::io; + +use data_encoding::DecodeError; +use iroh::SecretKey; +use tokio::{ + fs::{self, File}, + io::AsyncWriteExt, +}; + +const SECRET_KEY_PATH: &str = "./secret_key.txt"; + +pub async fn secret_key() -> Result { + if let Some(key) = get_secret_key().await? { + Ok(key) + } else { + let key = SecretKey::generate(&mut rand::rng()); + set_secret_key(&key).await?; + Ok(key) + } +} + +async fn get_secret_key() -> Result, String> { + let key = match fs::read_to_string(SECRET_KEY_PATH).await { + Ok(key) => Ok(key), + Err(err) => match err.kind() { + io::ErrorKind::NotFound => return Ok(None), + _ => Err(format!("can't read secret key file: {err}")), + }, + }?; + let bytes = match key_b64_to_bytes(&key) { + Ok(key) => Ok(key), + Err(err) => Err(format!( + "invalid secret key read from file: {err}. Please remove the `{SECRET_KEY_PATH}` file from plugin directory." + )), + }?; + Ok(Some(SecretKey::from_bytes(&bytes))) +} + +async fn set_secret_key(key: &SecretKey) -> Result<(), String> { + let secret_key = key_bytes_to_b64(&key.to_bytes()); + File::options() + .mode(0o600) + .write(true) + .create(true) + .open(SECRET_KEY_PATH) + .await + .map_err(|err| format!("can't open `{SECRET_KEY_PATH}` in plugin directory: {err}"))? + .write_all(secret_key.as_bytes()) + .await + .map_err(|err| format!("can't write to `{SECRET_KEY_PATH}` in plugin directory: {err}")) +} + +pub fn key_b64_to_bytes(key: &str) -> Result<[u8; 32], DecodeError> { + let vec = data_encoding::BASE64URL.decode(key.as_bytes())?; + if vec.len() != 32 { + return Err(DecodeError { + position: vec.len(), + kind: data_encoding::DecodeKind::Length, + }); + } + let mut bytes = [0u8; 32]; + for i in 0..32 { + bytes[i] = vec[i]; + } + Ok(bytes) +} + +pub fn key_bytes_to_b64(key: &[u8; 32]) -> String { + data_encoding::BASE64URL.encode(key) +} diff --git a/plugins/reaction-plugin-virtual/Cargo.toml b/plugins/reaction-plugin-virtual/Cargo.toml index 55ce41c..d0d65c5 100644 --- a/plugins/reaction-plugin-virtual/Cargo.toml +++ b/plugins/reaction-plugin-virtual/Cargo.toml @@ -7,3 +7,4 @@ edition = "2024" tokio = { workspace = true, features = ["rt-multi-thread"] } remoc.workspace = true reaction-plugin.path = "../reaction-plugin" +serde_json.workspace = true diff --git a/plugins/reaction-plugin-virtual/src/main.rs b/plugins/reaction-plugin-virtual/src/main.rs index 3b06cbc..5d4e7de 100644 --- a/plugins/reaction-plugin-virtual/src/main.rs +++ b/plugins/reaction-plugin-virtual/src/main.rs @@ -1,10 +1,10 @@ use std::collections::{BTreeMap, BTreeSet}; use reaction_plugin::{ - ActionImpl, Exec, Hello, Line, Manifest, PersistData, PluginInfo, RemoteResult, StreamImpl, - Value, + ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, }; use remoc::{rch::mpsc, rtc}; +use serde::{Deserialize, Serialize}; #[tokio::main] async fn main() { @@ -19,7 +19,7 @@ struct Plugin { } impl PluginInfo for Plugin { - async fn manifest(&mut self, _data: PersistData) -> Result { + async fn manifest(&mut self) -> Result { Ok(Manifest { hello: Hello::hello(), streams: BTreeSet::from(["virtual".into()]), @@ -59,7 +59,7 @@ impl PluginInfo for Plugin { patterns: Vec, ) -> RemoteResult { if &action_type != "virtual" { - return Err("This plugin can't handle other stream types than virtual".into()); + return Err("This plugin can't handle other action types than virtual".into()); } let (virtual_action_init, tx) = @@ -126,6 +126,14 @@ impl VirtualStream { } } +#[derive(Serialize, Deserialize)] +struct ActionOptions { + /// The line to send to the corresponding virtual stream, example: "ban \" + send: String, + /// The name of the corresponding virtual stream, example: "my_stream" + to: String, +} + struct VirtualActionInit { stream_name: String, filter_name: String, @@ -144,30 +152,9 @@ impl VirtualActionInit { config: Value, patterns: Vec, ) -> Result<(Self, mpsc::Sender), String> { - let send; - let to; - match config { - Value::Object(mut map) => { - send = match map.remove("send") { - Some(Value::String(value)) => value, - _ => return Err("`send` must be a string to send to the corresponding virtual stream, example: \"ban \"".into()), - }; - - to = match map.remove("to") { - Some(Value::String(value)) => value, - _ => return Err("`to` must be the name of the corresponding virtual stream, example: \"my_stream\"".into()), - }; - - if map.len() != 0 { - return Err( - "actions of type virtual accept only `send` and `to` options".into(), - ); - } - } - _ => { - return Err("actions of type virtual require `send` and `to` options".into()); - } - } + let options: ActionOptions = serde_json::from_value(config).map_err(|err| { + format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}") + })?; let patterns = patterns .into_iter() @@ -182,8 +169,8 @@ impl VirtualActionInit { action_name, rx, patterns, - send, - to, + send: options.send, + to: options.to, }, tx, )) diff --git a/plugins/reaction-plugin/Cargo.toml b/plugins/reaction-plugin/Cargo.toml index 3bd74e9..4a23e2c 100644 --- a/plugins/reaction-plugin/Cargo.toml +++ b/plugins/reaction-plugin/Cargo.toml @@ -5,5 +5,10 @@ edition = "2024" [dependencies] remoc.workspace = true + serde.workspace = true -tokio = { workspace = true, features = ["io-std"] } + +serde_json.workspace = true + +tokio.workspace = true +tokio.features = ["io-std"] diff --git a/plugins/reaction-plugin/src/lib.rs b/plugins/reaction-plugin/src/lib.rs index 0bf602e..9f092f9 100644 --- a/plugins/reaction-plugin/src/lib.rs +++ b/plugins/reaction-plugin/src/lib.rs @@ -35,24 +35,22 @@ //! ``` //! This can be useful if you want to provide CLI functionnality to your users. //! +//! It will be run in its own directory, in which it should have write access. +//! //! ## Examples //! //! Core plugins can be found here: //! The "virtual" plugin is the simplest and can serve as a template. //! You'll have to adjust dependencies versions in `Cargo.toml`. -use std::{ - collections::{BTreeMap, BTreeSet}, - error::Error, - fmt::Display, -}; +use std::{collections::BTreeSet, error::Error, fmt::Display}; use remoc::{ - Connect, - rch::{self, mpsc}, + Connect, rch, rtc::{self, Server}, }; use serde::{Deserialize, Serialize}; +pub use serde_json::Value; use tokio::io::{stdin, stdout}; /// This is the only trait that **must** be implemented by a plugin. @@ -60,7 +58,7 @@ use tokio::io::{stdin, stdout}; #[rtc::remote] pub trait PluginInfo { /// Return the manifest of the plugin. - async fn manifest(&mut self, data: PersistData) -> Result; + async fn manifest(&mut self) -> Result; /// Return one stream of a given type if it exists async fn stream_impl( @@ -159,29 +157,6 @@ impl Hello { } } -/// Represents a configuration value. -/// This is not meant as an efficient type, but as a very flexible one. -#[derive(Serialize, Deserialize, Clone, Debug)] -pub enum Value { - Null, - Bool(bool), - Integer(i64), - Float(f64), - String(String), - Array(Vec), - Object(BTreeMap), -} - -/// Data persisted by reaction for the plugin. -/// This is persisted as a single JSON file by reaction, so it is not suitable for big sizes of data. -#[derive(Serialize, Deserialize)] -pub struct PersistData { - /// Data persisted by the plugin in a previous run - pub persisted_data: Value, - /// Sender of data to be persisted by the plugin for a previous run - pub persist_data: mpsc::Sender, -} - #[derive(Serialize, Deserialize)] pub struct StreamImpl { pub stream: rch::mpsc::Receiver, diff --git a/src/concepts/plugin.rs b/src/concepts/plugin.rs index fa9b7fb..f5331a0 100644 --- a/src/concepts/plugin.rs +++ b/src/concepts/plugin.rs @@ -7,6 +7,7 @@ use std::{ use serde::{Deserialize, Serialize}; use tokio::{ + fs, process::{Child, Command}, runtime::Handle, }; @@ -51,23 +52,19 @@ impl Plugin { if !self.path.starts_with("/") { return Err(format!("plugin paths must be absolute: {}", self.path)); } - - if self.systemd { - self.systemd_setup(); - } Ok(()) } /// Override default options with user-defined options, when defined. - pub fn systemd_setup(&mut self) { - let mut new_options = systemd_default_options(); - while let Some((option, value)) = self.systemd_options.pop_first() { - new_options.insert(option, value); + pub fn systemd_setup(&self, working_directory: &str) -> BTreeMap> { + let mut new_options = systemd_default_options(working_directory); + for (option, value) in self.systemd_options.iter() { + new_options.insert(option.clone(), value.clone()); } - self.systemd_options = new_options; + new_options } - pub async fn launch(&self) -> Result { + pub async fn launch(&self, state_directory: &str) -> Result { // owner check if self.check_root { let path = self.path.clone(); @@ -85,19 +82,26 @@ impl Plugin { } let self_uid = if self.systemd { - // Well well we want to check if we're root - #[allow(unsafe_code)] - unsafe { - nix::libc::geteuid() - } + Some( + // Well well we want to check if we're root + #[allow(unsafe_code)] + unsafe { + nix::libc::geteuid() + }, + ) } else { - 0 + None }; - let mut command = if self.systemd && self_uid == 0 { + // Create plugin working directory (also state directory) + let plugin_working_directory = plugin_working_directory(&self.name, state_directory)?; + fs::create_dir(&plugin_working_directory).await?; + + let mut command = if self_uid.is_some_and(|self_uid| self_uid == 0) { let mut command = Command::new("run0"); // --pipe gives direct, non-emulated stdio access, for better performance. command.arg("--pipe"); + self.systemd_setup(&plugin_working_directory); // run0 options for (option, values) in self.systemd_options.iter() { for value in values.iter() { @@ -110,7 +114,9 @@ impl Plugin { if self.systemd { warn!("Disabling systemd because reaction does not run as root"); } - Command::new(&self.path) + let mut command = Command::new(&self.path); + command.current_dir(plugin_working_directory); + command }; command.arg("serve"); debug!( @@ -126,14 +132,31 @@ impl Plugin { } } +fn plugin_working_directory(plugin_name: &str, state_directory: &str) -> Result { + std::fs::canonicalize(format!("{state_directory}/plugin_data/{plugin_name}")).and_then(|path| { + path.to_str() + .ok_or_else(|| { + Error::new( + ErrorKind::Other, + "state_directory is not UTF-8. please run reaction at an UTF-8 named path", + ) + }) + .map(str::to_owned) + }) +} + // TODO commented options block execution of program, // while developping in my home directory. // Some options may still be useful in production environments. -fn systemd_default_options() -> BTreeMap> { +fn systemd_default_options(working_directory: &str) -> BTreeMap> { BTreeMap::from( [ - // No file access - ("ReadWritePaths", vec![]), + // reaction slice (does nothing if inexistent) + ("Slice", vec!["reaction.slice"]), + // Started in its own directory + ("WorkingDirectory", vec![working_directory]), + // No file access except own directory + ("ReadWritePaths", vec![working_directory]), ("ReadOnlyPaths", vec![]), // ("NoExecPaths", vec!["/"]), ("InaccessiblePaths", vec!["/boot", "/etc"]), diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs index 6c2762d..02fe61b 100644 --- a/src/daemon/plugin/mod.rs +++ b/src/daemon/plugin/mod.rs @@ -5,10 +5,10 @@ use std::{ }; use futures::{future::join_all, FutureExt}; -use reaction_plugin::{ActionImpl, Hello, PersistData, PluginInfo, PluginInfoClient, StreamImpl}; -use remoc::{rch::mpsc, Connect}; +use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl}; +use remoc::Connect; use serde_json::Value; -use tokio::{fs, process::Child, time::sleep}; +use tokio::{process::Child, time::sleep}; use tracing::error; use crate::{ @@ -16,10 +16,6 @@ use crate::{ daemon::{utils::kill_child, ShutdownToken}, }; -mod value; - -use value::to_stable_value; - pub struct PluginManager { child: Child, shutdown: ShutdownToken, @@ -48,7 +44,7 @@ impl PluginManager { shutdown: ShutdownToken, ) -> Result { let mut child = plugin - .launch() + .launch(state_directory) .await .map_err(|err| format!("could not launch plugin: {err}"))?; @@ -76,12 +72,8 @@ impl PluginManager { .map_err(|err| format!("could not retrieve initial information from plugin: {err}"))? .ok_or("could not retrieve initial information from plugin: no data")?; - let persist_data = data_persistence(&plugin.name, state_directory, shutdown.clone()) - .await - .map_err(|err| format!("error while reading plugin {} data: {err}", plugin.name))?; - let manifest = plugin_info - .manifest(persist_data) + .manifest() .await .map_err(|err| format!("error while getting plugin {} manifest: {err}", plugin.name))?; @@ -188,21 +180,16 @@ impl Plugins { stream_type: String, config: Value, ) -> Result { - let plugin_name = self - .streams - .get(&stream_type) - .ok_or(format!("No plugin provided a stream type '{stream_type}'"))?; + let plugin_name = self.streams.get(&stream_type).ok_or(format!( + "No plugin provided the stream type '{stream_type}'" + ))?; let plugin = self.plugins.get_mut(plugin_name).unwrap(); plugin - .stream_impl( - stream_name.into(), - stream_type.into(), - to_stable_value(config), - ) + .stream_impl(stream_name.clone(), stream_type, config) .await - .map_err(|err| format!("plugin error while initializing stream: {err}")) + .map_err(|err| format!("plugin error while initializing stream {stream_name}: {err}")) } pub async fn init_action_impl( @@ -214,24 +201,23 @@ impl Plugins { config: Value, patterns: Vec, ) -> Result { - let plugin_name = self - .actions - .get(&action_type) - .ok_or(format!("No plugin provided a action type '{action_type}'"))?; + let plugin_name = self.actions.get(&action_type).ok_or(format!( + "No plugin provided the action type '{action_type}'" + ))?; let plugin = self.plugins.get_mut(plugin_name).unwrap(); plugin .action_impl( - stream_name.into(), - filter_name.into(), - action_name.into(), - action_type.into(), - to_stable_value(config), + stream_name.clone(), + filter_name.clone(), + action_name.clone(), + action_type, + config, patterns, ) .await - .map_err(|err| format!("plugin error while initializing action: {err}")) + .map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {err}")) } pub async fn finish_setup(&mut self) -> Result<(), String> { @@ -259,48 +245,3 @@ impl Plugins { } } } - -async fn data_persistence( - plugin_name: &str, - state_directory: &str, - shutdown: ShutdownToken, -) -> Result { - let dir_path = format!("{state_directory}/plugin_data/"); - fs::create_dir_all(&dir_path).await?; - - let file_path = format!("{dir_path}/{plugin_name}.json"); - - let data = if fs::try_exists(&file_path).await? { - let txt = fs::read_to_string(&file_path).await?; - serde_json::from_str::(&txt)? - } else { - Value::Null - }; - - let (tx, mut rx) = mpsc::channel(1); - - tokio::spawn(async move { - loop { - let value = tokio::select! { - _ = shutdown.wait() => break, - value = rx.recv() => value, - }; - if let Ok(Some(value)) = value { - // unwrap: serializing a [`serde_json::Value`] does not fail - let json = serde_json::to_string_pretty(&value).unwrap(); - - if let Err(err) = fs::write(&file_path, json).await { - error!("could not store plugin data at {file_path}: {err}"); - break; - } - } else { - break; - } - } - }); - - Ok(PersistData { - persisted_data: to_stable_value(data), - persist_data: tx, - }) -} diff --git a/src/daemon/plugin/value.rs b/src/daemon/plugin/value.rs deleted file mode 100644 index e62c498..0000000 --- a/src/daemon/plugin/value.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::collections::BTreeMap; - -use reaction_plugin::Value as RValue; -use serde_json::Value as JValue; - -pub fn to_stable_value(val: JValue) -> RValue { - match val { - JValue::Null => RValue::Null, - JValue::Bool(b) => RValue::Bool(b), - JValue::Number(number) => { - if let Some(number) = number.as_i64() { - RValue::Integer(number) - } else if let Some(number) = number.as_f64() { - RValue::Float(number) - } else { - RValue::Null - } - } - JValue::String(s) => RValue::String(s.into()), - JValue::Array(v) => RValue::Array({ - let mut vec = Vec::with_capacity(v.len()); - for val in v { - vec.push(to_stable_value(val)); - } - vec - }), - JValue::Object(m) => RValue::Object({ - let mut map = BTreeMap::new(); - for (key, val) in m { - map.insert(key.into(), to_stable_value(val)); - } - map - }), - } -} diff --git a/tests/test-conf/cluster-a.jsonnet b/tests/test-conf/cluster-a.jsonnet new file mode 100644 index 0000000..ed36d95 --- /dev/null +++ b/tests/test-conf/cluster-a.jsonnet @@ -0,0 +1,64 @@ +{ + patterns: { + num: { + regex: @"[0-9]+", + }, + all: { + regex: @".*" + } + }, + + plugins: { + cluster: { + path: "./target/debug/reaction-plugin-cluster", + check_root: false, + systemd_options: { + DynamicUser: ["false"], + } + } + }, + + streams: { + s0: { + cmd: ["bash", "-c", "for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2"], + filters: { + f0: { + regex: ["^$"], + actions: { + a0: { + type: "virtual", + options: { + send: "a0 ", + to: "s1", + } + }, + b0: { + type: "cluster", + options: { + send: "b0 ", + to: "s1", + }, + after: "600ms", + }, + }, + }, + }, + }, + s1: { + type: "cluster", + options: { + + }, + filters: { + f1: { + regex: ["^$"], + actions: { + a1: { + cmd: ['sh', '-c', 'echo >>./log'], + }, + }, + }, + }, + }, + }, +}