Cluster plugin init

- Remove PersistData utility
- Provide plugins a state directory instead, by starting them inside.
- Store the secret key as a file inside this directory.
- Use iroh's crate for base64 encoding, thus removing one dependency.
- Implement plugin's stream_impl and action_impl functions,
  creating all necessary data structures.
This commit is contained in:
ppom 2025-10-27 12:00:00 +01:00
commit 124a2827d9
No known key found for this signature in database
14 changed files with 391 additions and 276 deletions

7
Cargo.lock generated
View file

@ -2849,6 +2849,7 @@ version = "1.0.0"
dependencies = [
"remoc",
"serde",
"serde_json",
"tokio",
]
@ -2856,12 +2857,13 @@ dependencies = [
name = "reaction-plugin-cluster"
version = "0.1.0"
dependencies = [
"base64 0.22.1",
"data-encoding",
"iroh",
"rand 0.9.2",
"rand_core 0.9.3",
"reaction-plugin",
"remoc",
"serde",
"serde_json",
"tokio",
]
@ -2871,6 +2873,7 @@ version = "0.1.0"
dependencies = [
"reaction-plugin",
"remoc",
"serde_json",
"tokio",
]

View file

@ -44,7 +44,7 @@ num_cpus = "1.16.0"
regex = "1.10.4"
# Configuration languages, ser/deserialisation
serde.workspace = true
serde_json = "1.0.117"
serde_json.workspace = true
serde_yaml = "0.9.34"
jrsonnet-evaluator = "0.4.2"
# Error macro
@ -80,6 +80,7 @@ members = ["plugins/reaction-plugin", "plugins/reaction-plugin-cluster", "plugin
[workspace.dependencies]
remoc = { version = "0.18.3" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.40.0" }
[[bin]]

14
TODO
View file

@ -1,16 +1,4 @@
Test what happens when a Filter's pattern Set changes (I think it's shitty)
DB: add tests on stress testing (lines should always be in order)
plugins: pipe stderr too and wrap errors in logs
plugins: provide tree storage? omg
questionnements:
- quelle cli pour les plugins ?
- Directement en appelant le plugin ? reaction-plugin-cluster gen-id ? 🟢
→ Demande de savoir où stocker tout ça
- Via moult IPC ? reaction plugin cluster gen-id ? 🔴
→ Mais du coup c'est l'oeuf ou la poule entre avoir un serveur qui fonctionne et avoir un
- Stockage ?
- uniquement dans la db reaction
→ Faut pas que ce soit trop gros, un peu d'overhead, risque de perdre la donnée
- à part dans le configuration directory
→ Pas mal en vrai
plugins: provide treedb storage? omg (add an enum that's either remoc::rch::mpsc or tokio::mpsc)

View file

@ -4,10 +4,14 @@ version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }
remoc.workspace = true
reaction-plugin.path = "../reaction-plugin"
tokio.workspace = true
tokio.features = ["rt-multi-thread"]
remoc.workspace = true
serde.workspace = true
serde_json.workspace = true
data-encoding = "2.9.0"
iroh = "0.94.0"
base64 = "0.22.1"
rand_core = { version = "0.9.3", features = ["os_rng"] }
rand = "0.9.2"

View file

@ -1,12 +1,21 @@
use std::collections::{BTreeMap, BTreeSet};
use base64::{Engine, prelude::BASE64_STANDARD};
use iroh::{SecretKey, defaults};
use reaction_plugin::{
ActionImpl, Hello, Manifest, PersistData, PluginInfo, RemoteResult, StreamImpl, Value,
main_loop,
use std::{
collections::{BTreeMap, BTreeSet},
net::SocketAddr,
};
use remoc::{chmux::SendError, rtc};
use iroh::PublicKey;
use reaction_plugin::{
ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, main_loop,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::fs;
mod secret_key;
use secret_key::secret_key;
use crate::secret_key::{key_b64_to_bytes, key_bytes_to_b64};
#[tokio::main]
async fn main() {
@ -16,12 +25,63 @@ async fn main() {
#[derive(Default)]
struct Plugin {
data: Option<PersistData>,
streams: BTreeMap<String, StreamInit>,
actions: Vec<ActionInit>,
}
/// Stream options as defined by the user
#[derive(Serialize, Deserialize)]
struct StreamOptions {
/// The secret that permits to join the cluster.
shared_secret: Option<String>,
/// The secret that permits to join the cluster, as a file.
/// Beginning and ending whitespace will be trimmed.
shared_secret_file: Option<String>,
/// Other nodes which are part of the cluster.
nodes: Vec<NodeOption>,
}
/// Stream information before start
struct StreamInit {
shared_secret: String,
nodes: Vec<NodeInit>,
tx: mpsc::Sender<Result<String, String>>,
}
#[derive(Serialize, Deserialize)]
struct NodeOption {
public_key: String,
addresses: Vec<SocketAddr>,
}
#[derive(Serialize, Deserialize)]
struct NodeInit {
public_key: PublicKey,
addresses: Vec<SocketAddr>,
}
#[derive(Serialize, Deserialize)]
struct ActionOptions {
/// The line to send to the corresponding cluster, example: "ban \<ip\>"
send: String,
/// The name of the corresponding cluster, example: "my_cluster_stream"
to: String,
/// Whether the stream of this node also receives the line
#[serde(default, rename = "self")]
self_: bool,
}
#[derive(Serialize, Deserialize)]
struct ActionInit {
send: String,
to: String,
self_: bool,
patterns: Vec<String>,
rx: mpsc::Receiver<Exec>,
}
impl PluginInfo for Plugin {
async fn manifest(&mut self, data: PersistData) -> Result<Manifest, rtc::CallError> {
self.data = Some(data);
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError> {
Ok(Manifest {
hello: Hello::hello(),
streams: BTreeSet::from(["cluster".into()]),
@ -35,24 +95,96 @@ impl PluginInfo for Plugin {
stream_type: String,
config: Value,
) -> RemoteResult<StreamImpl> {
todo!()
if &stream_type != "cluster" {
return Err("This plugin can't handle other stream types than cluster".into());
}
let options: StreamOptions =
serde_json::from_value(config).map_err(|err| format!("invalid options: {err}"))?;
let shared_secret = if let Some(shared_secret) = options.shared_secret {
shared_secret
} else if let Some(shared_secret_file) = &options.shared_secret_file {
fs::read_to_string(shared_secret_file)
.await
.map_err(|err| {
format!("can't access shared_secret_file {shared_secret_file}: {err}")
})?
.trim()
.to_owned()
} else {
return Err("missing shared secret: either shared_secret or shared_secret_file must be provided".into());
};
let mut init_nodes = Vec::default();
for node in options.nodes.into_iter() {
let bytes = key_b64_to_bytes(&node.public_key)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
let public_key = PublicKey::from_bytes(&bytes)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
init_nodes.push(NodeInit {
public_key,
addresses: node.addresses,
});
}
let (tx, rx) = mpsc::channel(1);
let stream = StreamInit {
shared_secret,
nodes: init_nodes,
tx,
};
if let Some(_) = self.streams.insert(stream_name, stream) {
return Err("this virtual stream has already been initialized".into());
}
Ok(StreamImpl {
stream: rx,
standalone: true,
})
}
async fn action_impl(
&mut self,
stream_name: String,
filter_name: String,
action_name: String,
_stream_name: String,
_filter_name: String,
_action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<ActionImpl> {
todo!()
if &action_type != "cluster" {
return Err("This plugin can't handle other action types than cluster".into());
}
let options: ActionOptions =
serde_json::from_value(config).map_err(|err| format!("invalid options: {err}"))?;
let (tx, rx) = mpsc::channel(1);
let init_action = ActionInit {
send: options.send,
to: options.to,
self_: options.self_,
patterns,
rx,
};
self.actions.push(init_action);
Ok(ActionImpl { tx })
}
async fn finish_setup(&mut self) -> RemoteResult<()> {
let data = self.data.as_mut().unwrap();
let secret_key = secret_key(data).await;
let secret_key = secret_key().await?;
eprintln!(
"public key of this node: {}",
key_bytes_to_b64(secret_key.public().as_bytes())
);
todo!()
}
@ -60,47 +192,3 @@ impl PluginInfo for Plugin {
todo!()
}
}
async fn secret_key(data: &mut PersistData) -> SecretKey {
if let Some(key) = get_secret_key(data) {
key
} else {
let key = SecretKey::generate(&mut rand::rng());
set_secret_key(data, &key).await;
key
}
}
fn get_secret_key(data: &PersistData) -> Option<SecretKey> {
match &data.persisted_data {
Value::Object(map) => map.get("secret_key").and_then(|value| {
if let Value::String(str) = value {
let vec = BASE64_STANDARD.decode(str).ok()?;
if vec.len() != 32 {
return None;
}
let mut bytes = [0u8; 32];
for i in 0..32 {
bytes[i] = vec[i];
}
Some(SecretKey::from_bytes(&bytes))
} else {
None
}
}),
_ => None,
}
}
async fn set_secret_key(data: &mut PersistData, key: &SecretKey) {
let mut current = match &data.persisted_data {
Value::Object(map) => map.clone(),
_ => BTreeMap::default(),
};
let base64 = BASE64_STANDARD.encode(key.to_bytes());
current.insert("secret_key".into(), Value::String(base64));
data.persist_data
.send(Value::Object(current))
.await
.unwrap();
}

View file

@ -0,0 +1,70 @@
use std::io;
use data_encoding::DecodeError;
use iroh::SecretKey;
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
};
const SECRET_KEY_PATH: &str = "./secret_key.txt";
pub async fn secret_key() -> Result<SecretKey, String> {
if let Some(key) = get_secret_key().await? {
Ok(key)
} else {
let key = SecretKey::generate(&mut rand::rng());
set_secret_key(&key).await?;
Ok(key)
}
}
async fn get_secret_key() -> Result<Option<SecretKey>, String> {
let key = match fs::read_to_string(SECRET_KEY_PATH).await {
Ok(key) => Ok(key),
Err(err) => match err.kind() {
io::ErrorKind::NotFound => return Ok(None),
_ => Err(format!("can't read secret key file: {err}")),
},
}?;
let bytes = match key_b64_to_bytes(&key) {
Ok(key) => Ok(key),
Err(err) => Err(format!(
"invalid secret key read from file: {err}. Please remove the `{SECRET_KEY_PATH}` file from plugin directory."
)),
}?;
Ok(Some(SecretKey::from_bytes(&bytes)))
}
async fn set_secret_key(key: &SecretKey) -> Result<(), String> {
let secret_key = key_bytes_to_b64(&key.to_bytes());
File::options()
.mode(0o600)
.write(true)
.create(true)
.open(SECRET_KEY_PATH)
.await
.map_err(|err| format!("can't open `{SECRET_KEY_PATH}` in plugin directory: {err}"))?
.write_all(secret_key.as_bytes())
.await
.map_err(|err| format!("can't write to `{SECRET_KEY_PATH}` in plugin directory: {err}"))
}
pub fn key_b64_to_bytes(key: &str) -> Result<[u8; 32], DecodeError> {
let vec = data_encoding::BASE64URL.decode(key.as_bytes())?;
if vec.len() != 32 {
return Err(DecodeError {
position: vec.len(),
kind: data_encoding::DecodeKind::Length,
});
}
let mut bytes = [0u8; 32];
for i in 0..32 {
bytes[i] = vec[i];
}
Ok(bytes)
}
pub fn key_bytes_to_b64(key: &[u8; 32]) -> String {
data_encoding::BASE64URL.encode(key)
}

View file

@ -7,3 +7,4 @@ edition = "2024"
tokio = { workspace = true, features = ["rt-multi-thread"] }
remoc.workspace = true
reaction-plugin.path = "../reaction-plugin"
serde_json.workspace = true

View file

@ -1,10 +1,10 @@
use std::collections::{BTreeMap, BTreeSet};
use reaction_plugin::{
ActionImpl, Exec, Hello, Line, Manifest, PersistData, PluginInfo, RemoteResult, StreamImpl,
Value,
ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
@ -19,7 +19,7 @@ struct Plugin {
}
impl PluginInfo for Plugin {
async fn manifest(&mut self, _data: PersistData) -> Result<Manifest, rtc::CallError> {
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError> {
Ok(Manifest {
hello: Hello::hello(),
streams: BTreeSet::from(["virtual".into()]),
@ -59,7 +59,7 @@ impl PluginInfo for Plugin {
patterns: Vec<String>,
) -> RemoteResult<ActionImpl> {
if &action_type != "virtual" {
return Err("This plugin can't handle other stream types than virtual".into());
return Err("This plugin can't handle other action types than virtual".into());
}
let (virtual_action_init, tx) =
@ -126,6 +126,14 @@ impl VirtualStream {
}
}
#[derive(Serialize, Deserialize)]
struct ActionOptions {
/// The line to send to the corresponding virtual stream, example: "ban \<ip\>"
send: String,
/// The name of the corresponding virtual stream, example: "my_stream"
to: String,
}
struct VirtualActionInit {
stream_name: String,
filter_name: String,
@ -144,30 +152,9 @@ impl VirtualActionInit {
config: Value,
patterns: Vec<String>,
) -> Result<(Self, mpsc::Sender<Exec>), String> {
let send;
let to;
match config {
Value::Object(mut map) => {
send = match map.remove("send") {
Some(Value::String(value)) => value,
_ => return Err("`send` must be a string to send to the corresponding virtual stream, example: \"ban <ip>\"".into()),
};
to = match map.remove("to") {
Some(Value::String(value)) => value,
_ => return Err("`to` must be the name of the corresponding virtual stream, example: \"my_stream\"".into()),
};
if map.len() != 0 {
return Err(
"actions of type virtual accept only `send` and `to` options".into(),
);
}
}
_ => {
return Err("actions of type virtual require `send` and `to` options".into());
}
}
let options: ActionOptions = serde_json::from_value(config).map_err(|err| {
format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}")
})?;
let patterns = patterns
.into_iter()
@ -182,8 +169,8 @@ impl VirtualActionInit {
action_name,
rx,
patterns,
send,
to,
send: options.send,
to: options.to,
},
tx,
))

View file

@ -5,5 +5,10 @@ edition = "2024"
[dependencies]
remoc.workspace = true
serde.workspace = true
tokio = { workspace = true, features = ["io-std"] }
serde_json.workspace = true
tokio.workspace = true
tokio.features = ["io-std"]

View file

@ -35,24 +35,22 @@
//! ```
//! This can be useful if you want to provide CLI functionnality to your users.
//!
//! It will be run in its own directory, in which it should have write access.
//!
//! ## Examples
//!
//! Core plugins can be found here: <https://framagit.org/ppom/reaction/-/tree/main/plugins>
//! The "virtual" plugin is the simplest and can serve as a template.
//! You'll have to adjust dependencies versions in `Cargo.toml`.
use std::{
collections::{BTreeMap, BTreeSet},
error::Error,
fmt::Display,
};
use std::{collections::BTreeSet, error::Error, fmt::Display};
use remoc::{
Connect,
rch::{self, mpsc},
Connect, rch,
rtc::{self, Server},
};
use serde::{Deserialize, Serialize};
pub use serde_json::Value;
use tokio::io::{stdin, stdout};
/// This is the only trait that **must** be implemented by a plugin.
@ -60,7 +58,7 @@ use tokio::io::{stdin, stdout};
#[rtc::remote]
pub trait PluginInfo {
/// Return the manifest of the plugin.
async fn manifest(&mut self, data: PersistData) -> Result<Manifest, rtc::CallError>;
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError>;
/// Return one stream of a given type if it exists
async fn stream_impl(
@ -159,29 +157,6 @@ impl Hello {
}
}
/// Represents a configuration value.
/// This is not meant as an efficient type, but as a very flexible one.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Value {
Null,
Bool(bool),
Integer(i64),
Float(f64),
String(String),
Array(Vec<Value>),
Object(BTreeMap<String, Value>),
}
/// Data persisted by reaction for the plugin.
/// This is persisted as a single JSON file by reaction, so it is not suitable for big sizes of data.
#[derive(Serialize, Deserialize)]
pub struct PersistData {
/// Data persisted by the plugin in a previous run
pub persisted_data: Value,
/// Sender of data to be persisted by the plugin for a previous run
pub persist_data: mpsc::Sender<Value>,
}
#[derive(Serialize, Deserialize)]
pub struct StreamImpl {
pub stream: rch::mpsc::Receiver<Line>,

View file

@ -7,6 +7,7 @@ use std::{
use serde::{Deserialize, Serialize};
use tokio::{
fs,
process::{Child, Command},
runtime::Handle,
};
@ -51,23 +52,19 @@ impl Plugin {
if !self.path.starts_with("/") {
return Err(format!("plugin paths must be absolute: {}", self.path));
}
if self.systemd {
self.systemd_setup();
}
Ok(())
}
/// Override default options with user-defined options, when defined.
pub fn systemd_setup(&mut self) {
let mut new_options = systemd_default_options();
while let Some((option, value)) = self.systemd_options.pop_first() {
new_options.insert(option, value);
pub fn systemd_setup(&self, working_directory: &str) -> BTreeMap<String, Vec<String>> {
let mut new_options = systemd_default_options(working_directory);
for (option, value) in self.systemd_options.iter() {
new_options.insert(option.clone(), value.clone());
}
self.systemd_options = new_options;
new_options
}
pub async fn launch(&self) -> Result<Child, std::io::Error> {
pub async fn launch(&self, state_directory: &str) -> Result<Child, std::io::Error> {
// owner check
if self.check_root {
let path = self.path.clone();
@ -85,19 +82,26 @@ impl Plugin {
}
let self_uid = if self.systemd {
// Well well we want to check if we're root
#[allow(unsafe_code)]
unsafe {
nix::libc::geteuid()
}
Some(
// Well well we want to check if we're root
#[allow(unsafe_code)]
unsafe {
nix::libc::geteuid()
},
)
} else {
0
None
};
let mut command = if self.systemd && self_uid == 0 {
// Create plugin working directory (also state directory)
let plugin_working_directory = plugin_working_directory(&self.name, state_directory)?;
fs::create_dir(&plugin_working_directory).await?;
let mut command = if self_uid.is_some_and(|self_uid| self_uid == 0) {
let mut command = Command::new("run0");
// --pipe gives direct, non-emulated stdio access, for better performance.
command.arg("--pipe");
self.systemd_setup(&plugin_working_directory);
// run0 options
for (option, values) in self.systemd_options.iter() {
for value in values.iter() {
@ -110,7 +114,9 @@ impl Plugin {
if self.systemd {
warn!("Disabling systemd because reaction does not run as root");
}
Command::new(&self.path)
let mut command = Command::new(&self.path);
command.current_dir(plugin_working_directory);
command
};
command.arg("serve");
debug!(
@ -126,14 +132,31 @@ impl Plugin {
}
}
fn plugin_working_directory(plugin_name: &str, state_directory: &str) -> Result<String, Error> {
std::fs::canonicalize(format!("{state_directory}/plugin_data/{plugin_name}")).and_then(|path| {
path.to_str()
.ok_or_else(|| {
Error::new(
ErrorKind::Other,
"state_directory is not UTF-8. please run reaction at an UTF-8 named path",
)
})
.map(str::to_owned)
})
}
// TODO commented options block execution of program,
// while developping in my home directory.
// Some options may still be useful in production environments.
fn systemd_default_options() -> BTreeMap<String, Vec<String>> {
fn systemd_default_options(working_directory: &str) -> BTreeMap<String, Vec<String>> {
BTreeMap::from(
[
// No file access
("ReadWritePaths", vec![]),
// reaction slice (does nothing if inexistent)
("Slice", vec!["reaction.slice"]),
// Started in its own directory
("WorkingDirectory", vec![working_directory]),
// No file access except own directory
("ReadWritePaths", vec![working_directory]),
("ReadOnlyPaths", vec![]),
// ("NoExecPaths", vec!["/"]),
("InaccessiblePaths", vec!["/boot", "/etc"]),

View file

@ -5,10 +5,10 @@ use std::{
};
use futures::{future::join_all, FutureExt};
use reaction_plugin::{ActionImpl, Hello, PersistData, PluginInfo, PluginInfoClient, StreamImpl};
use remoc::{rch::mpsc, Connect};
use reaction_plugin::{ActionImpl, Hello, PluginInfo, PluginInfoClient, StreamImpl};
use remoc::Connect;
use serde_json::Value;
use tokio::{fs, process::Child, time::sleep};
use tokio::{process::Child, time::sleep};
use tracing::error;
use crate::{
@ -16,10 +16,6 @@ use crate::{
daemon::{utils::kill_child, ShutdownToken},
};
mod value;
use value::to_stable_value;
pub struct PluginManager {
child: Child,
shutdown: ShutdownToken,
@ -48,7 +44,7 @@ impl PluginManager {
shutdown: ShutdownToken,
) -> Result<Self, String> {
let mut child = plugin
.launch()
.launch(state_directory)
.await
.map_err(|err| format!("could not launch plugin: {err}"))?;
@ -76,12 +72,8 @@ impl PluginManager {
.map_err(|err| format!("could not retrieve initial information from plugin: {err}"))?
.ok_or("could not retrieve initial information from plugin: no data")?;
let persist_data = data_persistence(&plugin.name, state_directory, shutdown.clone())
.await
.map_err(|err| format!("error while reading plugin {} data: {err}", plugin.name))?;
let manifest = plugin_info
.manifest(persist_data)
.manifest()
.await
.map_err(|err| format!("error while getting plugin {} manifest: {err}", plugin.name))?;
@ -188,21 +180,16 @@ impl Plugins {
stream_type: String,
config: Value,
) -> Result<StreamImpl, String> {
let plugin_name = self
.streams
.get(&stream_type)
.ok_or(format!("No plugin provided a stream type '{stream_type}'"))?;
let plugin_name = self.streams.get(&stream_type).ok_or(format!(
"No plugin provided the stream type '{stream_type}'"
))?;
let plugin = self.plugins.get_mut(plugin_name).unwrap();
plugin
.stream_impl(
stream_name.into(),
stream_type.into(),
to_stable_value(config),
)
.stream_impl(stream_name.clone(), stream_type, config)
.await
.map_err(|err| format!("plugin error while initializing stream: {err}"))
.map_err(|err| format!("plugin error while initializing stream {stream_name}: {err}"))
}
pub async fn init_action_impl(
@ -214,24 +201,23 @@ impl Plugins {
config: Value,
patterns: Vec<String>,
) -> Result<ActionImpl, String> {
let plugin_name = self
.actions
.get(&action_type)
.ok_or(format!("No plugin provided a action type '{action_type}'"))?;
let plugin_name = self.actions.get(&action_type).ok_or(format!(
"No plugin provided the action type '{action_type}'"
))?;
let plugin = self.plugins.get_mut(plugin_name).unwrap();
plugin
.action_impl(
stream_name.into(),
filter_name.into(),
action_name.into(),
action_type.into(),
to_stable_value(config),
stream_name.clone(),
filter_name.clone(),
action_name.clone(),
action_type,
config,
patterns,
)
.await
.map_err(|err| format!("plugin error while initializing action: {err}"))
.map_err(|err| format!("plugin error while initializing action {stream_name}.{filter_name}.{action_name}: {err}"))
}
pub async fn finish_setup(&mut self) -> Result<(), String> {
@ -259,48 +245,3 @@ impl Plugins {
}
}
}
async fn data_persistence(
plugin_name: &str,
state_directory: &str,
shutdown: ShutdownToken,
) -> Result<PersistData, std::io::Error> {
let dir_path = format!("{state_directory}/plugin_data/");
fs::create_dir_all(&dir_path).await?;
let file_path = format!("{dir_path}/{plugin_name}.json");
let data = if fs::try_exists(&file_path).await? {
let txt = fs::read_to_string(&file_path).await?;
serde_json::from_str::<Value>(&txt)?
} else {
Value::Null
};
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(async move {
loop {
let value = tokio::select! {
_ = shutdown.wait() => break,
value = rx.recv() => value,
};
if let Ok(Some(value)) = value {
// unwrap: serializing a [`serde_json::Value`] does not fail
let json = serde_json::to_string_pretty(&value).unwrap();
if let Err(err) = fs::write(&file_path, json).await {
error!("could not store plugin data at {file_path}: {err}");
break;
}
} else {
break;
}
}
});
Ok(PersistData {
persisted_data: to_stable_value(data),
persist_data: tx,
})
}

View file

@ -1,35 +0,0 @@
use std::collections::BTreeMap;
use reaction_plugin::Value as RValue;
use serde_json::Value as JValue;
pub fn to_stable_value(val: JValue) -> RValue {
match val {
JValue::Null => RValue::Null,
JValue::Bool(b) => RValue::Bool(b),
JValue::Number(number) => {
if let Some(number) = number.as_i64() {
RValue::Integer(number)
} else if let Some(number) = number.as_f64() {
RValue::Float(number)
} else {
RValue::Null
}
}
JValue::String(s) => RValue::String(s.into()),
JValue::Array(v) => RValue::Array({
let mut vec = Vec::with_capacity(v.len());
for val in v {
vec.push(to_stable_value(val));
}
vec
}),
JValue::Object(m) => RValue::Object({
let mut map = BTreeMap::new();
for (key, val) in m {
map.insert(key.into(), to_stable_value(val));
}
map
}),
}
}

View file

@ -0,0 +1,64 @@
{
patterns: {
num: {
regex: @"[0-9]+",
},
all: {
regex: @".*"
}
},
plugins: {
cluster: {
path: "./target/debug/reaction-plugin-cluster",
check_root: false,
systemd_options: {
DynamicUser: ["false"],
}
}
},
streams: {
s0: {
cmd: ["bash", "-c", "for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2"],
filters: {
f0: {
regex: ["^<num>$"],
actions: {
a0: {
type: "virtual",
options: {
send: "a0 <num>",
to: "s1",
}
},
b0: {
type: "cluster",
options: {
send: "b0 <num>",
to: "s1",
},
after: "600ms",
},
},
},
},
},
s1: {
type: "cluster",
options: {
},
filters: {
f1: {
regex: ["^<all>$"],
actions: {
a1: {
cmd: ['sh', '-c', 'echo <all> >>./log'],
},
},
},
},
},
},
}