cluster: adapt to plugin interface change

This commit is contained in:
ppom 2026-02-09 12:00:00 +01:00
commit ae28cfbb31
No known key found for this signature in database
4 changed files with 316 additions and 273 deletions

View file

@ -7,8 +7,8 @@ use std::{
use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr};
use reaction_plugin::{
ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value,
line::PatternLine, main_loop, shutdown::ShutdownController, time::parse_duration,
ActionConfig, ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamConfig,
StreamImpl, line::PatternLine, main_loop, shutdown::ShutdownController, time::parse_duration,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
@ -32,8 +32,7 @@ async fn main() {
#[derive(Default)]
struct Plugin {
streams: BTreeMap<String, StreamInit>,
actions: BTreeMap<String, Vec<ActionInit>>,
init: BTreeMap<String, (StreamInit, Vec<ActionInit>)>,
cluster_shutdown: ShutdownController,
}
@ -108,118 +107,136 @@ impl PluginInfo for Plugin {
actions: BTreeSet::from(["cluster_send".into()]),
})
}
async fn stream_impl(
async fn load_config(
&mut self,
stream_name: String,
stream_type: String,
config: Value,
) -> RemoteResult<StreamImpl> {
if &stream_type != "cluster" {
return Err("This plugin can't handle other stream types than cluster".into());
}
streams: Vec<StreamConfig>,
actions: Vec<ActionConfig>,
) -> RemoteResult<(Vec<StreamImpl>, Vec<ActionImpl>)> {
let mut ret_streams = Vec::with_capacity(streams.len());
let mut ret_actions = Vec::with_capacity(actions.len());
let options: StreamOptions = serde_json::from_value(config.into())
.map_err(|err| format!("invalid options: {err}"))?;
for StreamConfig {
stream_name,
stream_type,
config,
} in streams
{
if &stream_type != "cluster" {
return Err("This plugin can't handle other stream types than cluster".into());
}
let mut nodes = BTreeMap::default();
let options: StreamOptions = serde_json::from_value(config.into())
.map_err(|err| format!("invalid options: {err}"))?;
let message_timeout = parse_duration(&options.message_timeout)
.map_err(|err| format!("invalid message_timeout: {err}"))?;
let mut nodes = BTreeMap::default();
if options.bind_ipv4.is_none() && options.bind_ipv6.is_none() {
Err(
"At least one of bind_ipv4 and bind_ipv6 must be enabled. Unset at least one of them or set at least one of them to an IP.",
)?;
}
let message_timeout = parse_duration(&options.message_timeout)
.map_err(|err| format!("invalid message_timeout: {err}"))?;
if options.nodes.is_empty() {
Err("At least one remote node has to be configured for a cluster")?;
}
if options.bind_ipv4.is_none() && options.bind_ipv6.is_none() {
Err(
"At least one of bind_ipv4 and bind_ipv6 must be enabled. Unset at least one of them or set at least one of them to an IP.",
)?;
}
for node in options.nodes.into_iter() {
let bytes = key::key_b64_to_bytes(&node.public_key)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
if options.nodes.is_empty() {
Err("At least one remote node has to be configured for a cluster")?;
}
let public_key = PublicKey::from_bytes(&bytes)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
for node in options.nodes.into_iter() {
let bytes = key::key_b64_to_bytes(&node.public_key)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
nodes.insert(
public_key,
EndpointAddr {
id: public_key,
addrs: node
.addresses
.into_iter()
.map(|addr| TransportAddr::Ip(addr))
.collect(),
},
let public_key = PublicKey::from_bytes(&bytes)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
nodes.insert(
public_key,
EndpointAddr {
id: public_key,
addrs: node
.addresses
.into_iter()
.map(|addr| TransportAddr::Ip(addr))
.collect(),
},
);
}
let secret_key = key::secret_key(".", &stream_name).await?;
eprintln!(
"INFO public key of this node for cluster {stream_name}: {}",
secret_key.public().show()
);
let (tx, rx) = mpsc::channel(1);
let stream = StreamInit {
name: stream_name.clone(),
listen_port: options.listen_port,
bind_ipv4: options.bind_ipv4,
bind_ipv6: options.bind_ipv6,
secret_key,
message_timeout,
nodes,
tx,
};
if let Some(_) = self.init.insert(stream_name, (stream, vec![])) {
return Err("this virtual stream has already been initialized".into());
}
ret_streams.push(StreamImpl {
stream: rx,
standalone: true,
})
}
let secret_key = key::secret_key(".", &stream_name).await?;
eprintln!(
"INFO public key of this node for cluster {stream_name}: {}",
secret_key.public().show()
);
for ActionConfig {
stream_name,
filter_name,
action_name,
action_type,
config,
patterns,
} in actions
{
if &action_type != "cluster_send" {
return Err(
"This plugin can't handle other action types than 'cluster_send'".into(),
);
}
let (tx, rx) = mpsc::channel(1);
let options: ActionOptions = serde_json::from_value(config.into())
.map_err(|err| format!("invalid options: {err}"))?;
let stream = StreamInit {
name: stream_name.clone(),
listen_port: options.listen_port,
bind_ipv4: options.bind_ipv4,
bind_ipv6: options.bind_ipv6,
secret_key,
message_timeout,
nodes,
tx,
};
let (tx, rx) = mpsc::channel(1);
if let Some(_) = self.streams.insert(stream_name, stream) {
return Err("this virtual stream has already been initialized".into());
let init_action = ActionInit {
name: format!("{}.{}.{}", stream_name, filter_name, action_name),
send: PatternLine::new(options.send, patterns),
self_: options.self_,
rx,
};
match self.init.get_mut(&options.to) {
Some((_, actions)) => actions.push(init_action),
None => {
return Err(format!(
"ERROR action '{}' sends 'to' unknown stream '{}'",
init_action.name, options.to
)
.into());
}
}
ret_actions.push(ActionImpl { tx })
}
Ok(StreamImpl {
stream: rx,
standalone: true,
})
Ok((ret_streams, ret_actions))
}
async fn action_impl(
&mut self,
stream_name: String,
filter_name: String,
action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<ActionImpl> {
if &action_type != "cluster_send" {
return Err("This plugin can't handle other action types than 'cluster_send'".into());
}
let options: ActionOptions = serde_json::from_value(config.into())
.map_err(|err| format!("invalid options: {err}"))?;
let (tx, rx) = mpsc::channel(1);
let init_action = ActionInit {
name: format!("{}.{}.{}", stream_name, filter_name, action_name),
send: PatternLine::new(options.send, patterns),
self_: options.self_,
rx,
};
self.actions
.entry(options.to)
.or_default()
.push(init_action);
Ok(ActionImpl { tx })
}
async fn finish_setup(&mut self) -> RemoteResult<()> {
async fn start(&mut self) -> RemoteResult<()> {
let mut db = {
let path = PathBuf::from(".");
let (cancellation_token, task_tracker_token) = self.cluster_shutdown.token().split();
@ -228,33 +245,20 @@ impl PluginInfo for Plugin {
.map_err(|err| format!("Can't open database: {err}"))?
};
while let Some((stream_name, stream)) = self.streams.pop_first() {
while let Some((_, (stream, actions))) = self.init.pop_first() {
let endpoint = cluster::bind(&stream).await?;
cluster::cluster_tasks(
endpoint,
stream,
self.actions.remove(&stream_name).unwrap_or_default(),
actions,
&mut db,
self.cluster_shutdown.clone(),
)
.await?;
}
// Check there is no action left
if !self.actions.is_empty() {
for (to, actions) in &self.actions {
for action in actions {
eprintln!(
"ERROR action '{}' sends 'to' unknown stream '{}'",
action.name, to
);
}
}
return Err("at least one cluster_send action has unknown 'to'".into());
}
// Free containers
self.actions = Default::default();
self.streams = Default::default();
eprintln!("DEBUG finished setup.");
self.init = Default::default();
eprintln!("DEBUG started");
Ok(())
}

View file

@ -1,12 +1,12 @@
use std::env::set_current_dir;
use assert_fs::TempDir;
use reaction_plugin::PluginInfo;
use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig};
use serde_json::json;
use crate::{Plugin, tests::insert_secret_key};
use super::{PUBLIC_KEY_A, TEST_MUTEX, stream_ok, stream_ok_port};
use super::{PUBLIC_KEY_A, TEST_MUTEX, stream_ok};
#[tokio::test]
async fn conf_stream() {
@ -18,7 +18,14 @@ async fn conf_stream() {
// Invalid type
assert!(
Plugin::default()
.stream_impl("stream".into(), "clust".into(), stream_ok().into(),)
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "clust".into(),
config: stream_ok().into(),
}],
vec![]
)
.await
.is_err()
);
@ -102,7 +109,14 @@ async fn conf_stream() {
] {
assert!(is_ok(
&Plugin::default()
.stream_impl("stream".into(), "cluster".into(), json.into())
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "cluster".into(),
config: json.into(),
}],
vec![]
)
.await
));
}
@ -115,17 +129,24 @@ async fn conf_action() {
// Invalid type
assert!(
Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_sen".into(),
json!({
"send": "<p1>",
"to": "stream",
})
.into(),
patterns.clone(),
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "cluster".into(),
config: stream_ok().into(),
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "cluster_sen".into(),
config: json!({
"send": "<p1>",
"to": "stream",
})
.into(),
patterns: patterns.clone(),
}]
)
.await
.is_err()
@ -134,64 +155,79 @@ async fn conf_action() {
for (json, is_ok) in [
(
json!({
"send": "<p1>",
"to": "stream",
"send": "<p1>",
"to": "stream",
}),
Result::is_ok as fn(&_) -> bool,
true,
),
(
json!({
"send": "<p1>",
"to": "stream",
"self": true,
"send": "<p1>",
"to": "stream",
"self": true,
}),
Result::is_ok,
true,
),
(
json!({
"send": "<p1>",
"to": "stream",
"self": false,
"send": "<p1>",
"to": "stream",
"self": false,
}),
Result::is_ok,
true,
),
(
// missing to
json!({
"send": "<p1>",
"send": "<p1>",
}),
Result::is_err,
false,
),
(
// missing send
json!({
"to": "stream",
"to": "stream",
}),
Result::is_err,
false,
),
(
// invalid self
json!({
"send": "<p1>",
"to": "stream",
"self": "true",
"send": "<p1>",
"to": "stream",
"self": "true",
}),
Result::is_err,
false,
),
(
// missing conf
json!({}),
false,
),
(json!({}), Result::is_err),
] {
assert!(is_ok(
&Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json.into(),
patterns.clone(),
)
.await
));
let ret = Plugin::default()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "cluster".into(),
config: stream_ok().into(),
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "cluster_send".into(),
config: json.clone().into(),
patterns: patterns.clone(),
}],
)
.await;
assert!(
ret.is_ok() == is_ok,
"is_ok: {is_ok}, ret: {:?}, action conf: {json:?}",
ret.map(|_| ())
);
}
}
@ -203,62 +239,55 @@ async fn conf_send() {
insert_secret_key().await;
// No action is ok
let mut plugin = Plugin::default();
plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2051).into(),
let res = Plugin::default()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "cluster".into(),
config: stream_ok().into(),
}],
vec![],
)
.await
.unwrap();
let res = plugin.finish_setup().await;
eprintln!("{res:?}");
assert!(res.is_ok());
.await;
assert!(res.is_ok(), "{:?}", res.map(|_| ()));
// An action is ok
let mut plugin = Plugin::default();
plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2052).into(),
let res = Plugin::default()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "cluster".into(),
config: stream_ok().into(),
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "cluster_send".into(),
config: json!({ "send": "message", "to": "stream" }).into(),
patterns: vec![],
}],
)
.await
.unwrap();
plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({ "send": "message", "to": "stream" }).into(),
Vec::default(),
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
.await;
assert!(res.is_ok(), "{:?}", res.map(|_| ()));
// Invalid to: option
let mut plugin = Plugin::default();
plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2053).into(),
let res = Plugin::default()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "cluster".into(),
config: stream_ok().into(),
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "cluster_send".into(),
config: json!({ "send": "message", "to": "stream1" }).into(),
patterns: vec![],
}],
)
.await
.unwrap();
plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({ "send": "message", "to": "stream1" }).into(),
Vec::default(),
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_err());
.await;
assert!(res.is_err(), "{:?}", res.map(|_| ()));
}

View file

@ -1,7 +1,7 @@
use std::{env::set_current_dir, time::Duration};
use assert_fs::TempDir;
use reaction_plugin::{ActionImpl, Exec, PluginInfo, StreamImpl};
use reaction_plugin::{ActionConfig, Exec, PluginInfo, StreamConfig};
use serde_json::json;
use tokio::{fs, time::timeout};
use treedb::time::now;
@ -98,11 +98,10 @@ const POOL: [TestNode; 15] = [
];
async fn stream_action(
plugin: &mut Plugin,
name: &str,
index: usize,
nodes: &[TestNode],
) -> (StreamImpl, ActionImpl) {
) -> (StreamConfig, ActionConfig) {
let stream_name = format!("stream_{name}");
let this_node = &nodes[index];
let other_nodes: Vec<_> = nodes
@ -120,37 +119,30 @@ async fn stream_action(
.await
.unwrap();
let stream = plugin
.stream_impl(
stream_name.clone(),
"cluster".into(),
json!({
(
StreamConfig {
stream_name: stream_name.clone(),
stream_type: "cluster".into(),
config: json!({
"message_timeout": "30s",
"listen_port": this_node.port,
"nodes": other_nodes,
})
.into(),
)
.await
.unwrap();
let action = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({
},
ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "cluster_send".into(),
config: json!({
"send": format!("from {name}: <test>"),
"to": stream_name,
})
.into(),
vec!["test".into()],
)
.await
.unwrap();
(stream, action)
patterns: vec!["test".into()],
},
)
}
#[tokio::test]
@ -162,19 +154,33 @@ async fn two_nodes_simultaneous_startup() {
let ((mut stream_a, action_a), (mut stream_b, action_b)) = if separate_plugin {
let mut plugin_a = Plugin::default();
let a = stream_action(&mut plugin_a, "a", 0, &POOL[0..2]).await;
plugin_a.finish_setup().await.unwrap();
let (sa, aa) = stream_action("a", 0, &POOL[0..2]).await;
let (mut streams_a, mut actions_a) =
plugin_a.load_config(vec![sa], vec![aa]).await.unwrap();
plugin_a.start().await.unwrap();
let mut plugin_b = Plugin::default();
let b = stream_action(&mut plugin_b, "b", 1, &POOL[0..2]).await;
plugin_b.finish_setup().await.unwrap();
(a, b)
let (sb, ab) = stream_action("b", 1, &POOL[0..2]).await;
let (mut streams_b, mut actions_b) =
plugin_b.load_config(vec![sb], vec![ab]).await.unwrap();
plugin_b.start().await.unwrap();
(
(streams_a.remove(0), actions_a.remove(0)),
(streams_b.remove(0), actions_b.remove(0)),
)
} else {
let mut plugin = Plugin::default();
let a = stream_action(&mut plugin, "a", 0, &POOL[0..2]).await;
let b = stream_action(&mut plugin, "b", 1, &POOL[0..2]).await;
plugin.finish_setup().await.unwrap();
(a, b)
let a = stream_action("a", 0, &POOL[0..2]).await;
let b = stream_action("b", 1, &POOL[0..2]).await;
let (mut streams, mut actions) = plugin
.load_config(vec![a.0, b.0], vec![a.1, b.1])
.await
.unwrap();
plugin.start().await.unwrap();
(
(streams.remove(0), actions.remove(0)),
(streams.remove(1), actions.remove(1)),
)
};
for m in ["test1", "test2", "test3"] {
@ -238,7 +244,6 @@ async fn n_nodes_simultaneous_startup() {
let mut plugin = Plugin::default();
let name = format!("n{i}");
let (stream, action) = stream_action(
&mut plugin,
&name,
i,
&POOL[0..n]
@ -252,10 +257,14 @@ async fn n_nodes_simultaneous_startup() {
.as_slice(),
)
.await;
plugin.finish_setup().await.unwrap();
let (mut stream, mut action) = plugin
.load_config(vec![stream], vec![action])
.await
.unwrap();
plugin.start().await.unwrap();
plugins.push(plugin);
streams.push(stream);
actions.push((action, name));
streams.push(stream.pop().unwrap());
actions.push((action.pop().unwrap(), name));
}
for m in ["test1", "test2", "test3", "test4", "test5"] {

View file

@ -1,7 +1,7 @@
use std::{env::set_current_dir, time::Duration};
use assert_fs::TempDir;
use reaction_plugin::{Exec, PluginInfo};
use reaction_plugin::{ActionConfig, Exec, PluginInfo, StreamConfig};
use serde_json::json;
use tokio::time::timeout;
use treedb::time::now;
@ -19,33 +19,34 @@ async fn run_with_self() {
for self_ in [true, false] {
let mut plugin = Plugin::default();
let mut stream = plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2052).into(),
let (mut streams, mut actions) = plugin
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "cluster".into(),
config: stream_ok_port(2052).into(),
}],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "cluster_send".into(),
config: json!({
"send": "message <test>",
"to": "stream",
"self": self_,
})
.into(),
patterns: vec!["test".into()],
}],
)
.await
.unwrap();
assert!(stream.standalone);
let action = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({
"send": "message <test>",
"to": "stream",
"self": self_,
})
.into(),
vec!["test".into()],
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
let mut stream = streams.pop().unwrap();
let action = actions.pop().unwrap();
assert!(stream.standalone);
assert!(plugin.start().await.is_ok());
for m in ["test1", "test2", "test3", " a a a aa a a"] {
let time = now().into();