diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index fd52fb1..b931026 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -7,8 +7,8 @@ use std::{ use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr}; use reaction_plugin::{ - ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, - line::PatternLine, main_loop, shutdown::ShutdownController, time::parse_duration, + ActionConfig, ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamConfig, + StreamImpl, line::PatternLine, main_loop, shutdown::ShutdownController, time::parse_duration, }; use remoc::{rch::mpsc, rtc}; use serde::{Deserialize, Serialize}; @@ -32,8 +32,7 @@ async fn main() { #[derive(Default)] struct Plugin { - streams: BTreeMap, - actions: BTreeMap>, + init: BTreeMap)>, cluster_shutdown: ShutdownController, } @@ -108,118 +107,136 @@ impl PluginInfo for Plugin { actions: BTreeSet::from(["cluster_send".into()]), }) } - - async fn stream_impl( + async fn load_config( &mut self, - stream_name: String, - stream_type: String, - config: Value, - ) -> RemoteResult { - if &stream_type != "cluster" { - return Err("This plugin can't handle other stream types than cluster".into()); - } + streams: Vec, + actions: Vec, + ) -> RemoteResult<(Vec, Vec)> { + let mut ret_streams = Vec::with_capacity(streams.len()); + let mut ret_actions = Vec::with_capacity(actions.len()); - let options: StreamOptions = serde_json::from_value(config.into()) - .map_err(|err| format!("invalid options: {err}"))?; + for StreamConfig { + stream_name, + stream_type, + config, + } in streams + { + if &stream_type != "cluster" { + return Err("This plugin can't handle other stream types than cluster".into()); + } - let mut nodes = BTreeMap::default(); + let options: StreamOptions = serde_json::from_value(config.into()) + .map_err(|err| format!("invalid options: {err}"))?; - let message_timeout = parse_duration(&options.message_timeout) - .map_err(|err| format!("invalid message_timeout: {err}"))?; + let mut nodes = BTreeMap::default(); - if options.bind_ipv4.is_none() && options.bind_ipv6.is_none() { - Err( - "At least one of bind_ipv4 and bind_ipv6 must be enabled. Unset at least one of them or set at least one of them to an IP.", - )?; - } + let message_timeout = parse_duration(&options.message_timeout) + .map_err(|err| format!("invalid message_timeout: {err}"))?; - if options.nodes.is_empty() { - Err("At least one remote node has to be configured for a cluster")?; - } + if options.bind_ipv4.is_none() && options.bind_ipv6.is_none() { + Err( + "At least one of bind_ipv4 and bind_ipv6 must be enabled. Unset at least one of them or set at least one of them to an IP.", + )?; + } - for node in options.nodes.into_iter() { - let bytes = key::key_b64_to_bytes(&node.public_key) - .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; + if options.nodes.is_empty() { + Err("At least one remote node has to be configured for a cluster")?; + } - let public_key = PublicKey::from_bytes(&bytes) - .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; + for node in options.nodes.into_iter() { + let bytes = key::key_b64_to_bytes(&node.public_key) + .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; - nodes.insert( - public_key, - EndpointAddr { - id: public_key, - addrs: node - .addresses - .into_iter() - .map(|addr| TransportAddr::Ip(addr)) - .collect(), - }, + let public_key = PublicKey::from_bytes(&bytes) + .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; + + nodes.insert( + public_key, + EndpointAddr { + id: public_key, + addrs: node + .addresses + .into_iter() + .map(|addr| TransportAddr::Ip(addr)) + .collect(), + }, + ); + } + + let secret_key = key::secret_key(".", &stream_name).await?; + eprintln!( + "INFO public key of this node for cluster {stream_name}: {}", + secret_key.public().show() ); + + let (tx, rx) = mpsc::channel(1); + + let stream = StreamInit { + name: stream_name.clone(), + listen_port: options.listen_port, + bind_ipv4: options.bind_ipv4, + bind_ipv6: options.bind_ipv6, + secret_key, + message_timeout, + nodes, + tx, + }; + + if let Some(_) = self.init.insert(stream_name, (stream, vec![])) { + return Err("this virtual stream has already been initialized".into()); + } + + ret_streams.push(StreamImpl { + stream: rx, + standalone: true, + }) } - let secret_key = key::secret_key(".", &stream_name).await?; - eprintln!( - "INFO public key of this node for cluster {stream_name}: {}", - secret_key.public().show() - ); + for ActionConfig { + stream_name, + filter_name, + action_name, + action_type, + config, + patterns, + } in actions + { + if &action_type != "cluster_send" { + return Err( + "This plugin can't handle other action types than 'cluster_send'".into(), + ); + } - let (tx, rx) = mpsc::channel(1); + let options: ActionOptions = serde_json::from_value(config.into()) + .map_err(|err| format!("invalid options: {err}"))?; - let stream = StreamInit { - name: stream_name.clone(), - listen_port: options.listen_port, - bind_ipv4: options.bind_ipv4, - bind_ipv6: options.bind_ipv6, - secret_key, - message_timeout, - nodes, - tx, - }; + let (tx, rx) = mpsc::channel(1); - if let Some(_) = self.streams.insert(stream_name, stream) { - return Err("this virtual stream has already been initialized".into()); + let init_action = ActionInit { + name: format!("{}.{}.{}", stream_name, filter_name, action_name), + send: PatternLine::new(options.send, patterns), + self_: options.self_, + rx, + }; + + match self.init.get_mut(&options.to) { + Some((_, actions)) => actions.push(init_action), + None => { + return Err(format!( + "ERROR action '{}' sends 'to' unknown stream '{}'", + init_action.name, options.to + ) + .into()); + } + } + + ret_actions.push(ActionImpl { tx }) } - Ok(StreamImpl { - stream: rx, - standalone: true, - }) + Ok((ret_streams, ret_actions)) } - async fn action_impl( - &mut self, - stream_name: String, - filter_name: String, - action_name: String, - action_type: String, - config: Value, - patterns: Vec, - ) -> RemoteResult { - if &action_type != "cluster_send" { - return Err("This plugin can't handle other action types than 'cluster_send'".into()); - } - - let options: ActionOptions = serde_json::from_value(config.into()) - .map_err(|err| format!("invalid options: {err}"))?; - - let (tx, rx) = mpsc::channel(1); - - let init_action = ActionInit { - name: format!("{}.{}.{}", stream_name, filter_name, action_name), - send: PatternLine::new(options.send, patterns), - self_: options.self_, - rx, - }; - - self.actions - .entry(options.to) - .or_default() - .push(init_action); - - Ok(ActionImpl { tx }) - } - - async fn finish_setup(&mut self) -> RemoteResult<()> { + async fn start(&mut self) -> RemoteResult<()> { let mut db = { let path = PathBuf::from("."); let (cancellation_token, task_tracker_token) = self.cluster_shutdown.token().split(); @@ -228,33 +245,20 @@ impl PluginInfo for Plugin { .map_err(|err| format!("Can't open database: {err}"))? }; - while let Some((stream_name, stream)) = self.streams.pop_first() { + while let Some((_, (stream, actions))) = self.init.pop_first() { let endpoint = cluster::bind(&stream).await?; cluster::cluster_tasks( endpoint, stream, - self.actions.remove(&stream_name).unwrap_or_default(), + actions, &mut db, self.cluster_shutdown.clone(), ) .await?; } - // Check there is no action left - if !self.actions.is_empty() { - for (to, actions) in &self.actions { - for action in actions { - eprintln!( - "ERROR action '{}' sends 'to' unknown stream '{}'", - action.name, to - ); - } - } - return Err("at least one cluster_send action has unknown 'to'".into()); - } // Free containers - self.actions = Default::default(); - self.streams = Default::default(); - eprintln!("DEBUG finished setup."); + self.init = Default::default(); + eprintln!("DEBUG started"); Ok(()) } diff --git a/plugins/reaction-plugin-cluster/src/tests/conf.rs b/plugins/reaction-plugin-cluster/src/tests/conf.rs index 68e7d42..2da4974 100644 --- a/plugins/reaction-plugin-cluster/src/tests/conf.rs +++ b/plugins/reaction-plugin-cluster/src/tests/conf.rs @@ -1,12 +1,12 @@ use std::env::set_current_dir; use assert_fs::TempDir; -use reaction_plugin::PluginInfo; +use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig}; use serde_json::json; use crate::{Plugin, tests::insert_secret_key}; -use super::{PUBLIC_KEY_A, TEST_MUTEX, stream_ok, stream_ok_port}; +use super::{PUBLIC_KEY_A, TEST_MUTEX, stream_ok}; #[tokio::test] async fn conf_stream() { @@ -18,7 +18,14 @@ async fn conf_stream() { // Invalid type assert!( Plugin::default() - .stream_impl("stream".into(), "clust".into(), stream_ok().into(),) + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "clust".into(), + config: stream_ok().into(), + }], + vec![] + ) .await .is_err() ); @@ -102,7 +109,14 @@ async fn conf_stream() { ] { assert!(is_ok( &Plugin::default() - .stream_impl("stream".into(), "cluster".into(), json.into()) + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "cluster".into(), + config: json.into(), + }], + vec![] + ) .await )); } @@ -115,17 +129,24 @@ async fn conf_action() { // Invalid type assert!( Plugin::default() - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "cluster_sen".into(), - json!({ - "send": "", - "to": "stream", - }) - .into(), - patterns.clone(), + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "cluster".into(), + config: stream_ok().into(), + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "cluster_sen".into(), + config: json!({ + "send": "", + "to": "stream", + }) + .into(), + patterns: patterns.clone(), + }] ) .await .is_err() @@ -134,64 +155,79 @@ async fn conf_action() { for (json, is_ok) in [ ( json!({ - "send": "", - "to": "stream", + "send": "", + "to": "stream", }), - Result::is_ok as fn(&_) -> bool, + true, ), ( json!({ - "send": "", - "to": "stream", - "self": true, + "send": "", + "to": "stream", + "self": true, }), - Result::is_ok, + true, ), ( json!({ - "send": "", - "to": "stream", - "self": false, + "send": "", + "to": "stream", + "self": false, }), - Result::is_ok, + true, ), ( // missing to json!({ - "send": "", + "send": "", }), - Result::is_err, + false, ), ( // missing send json!({ - "to": "stream", + "to": "stream", }), - Result::is_err, + false, ), ( // invalid self json!({ - "send": "", - "to": "stream", - "self": "true", + "send": "", + "to": "stream", + "self": "true", }), - Result::is_err, + false, + ), + ( + // missing conf + json!({}), + false, ), - (json!({}), Result::is_err), ] { - assert!(is_ok( - &Plugin::default() - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "cluster_send".into(), - json.into(), - patterns.clone(), - ) - .await - )); + let ret = Plugin::default() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "cluster".into(), + config: stream_ok().into(), + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "cluster_send".into(), + config: json.clone().into(), + patterns: patterns.clone(), + }], + ) + .await; + + assert!( + ret.is_ok() == is_ok, + "is_ok: {is_ok}, ret: {:?}, action conf: {json:?}", + ret.map(|_| ()) + ); } } @@ -203,62 +239,55 @@ async fn conf_send() { insert_secret_key().await; // No action is ok - let mut plugin = Plugin::default(); - plugin - .stream_impl( - "stream".into(), - "cluster".into(), - stream_ok_port(2051).into(), + let res = Plugin::default() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "cluster".into(), + config: stream_ok().into(), + }], + vec![], ) - .await - .unwrap(); - let res = plugin.finish_setup().await; - eprintln!("{res:?}"); - assert!(res.is_ok()); + .await; + assert!(res.is_ok(), "{:?}", res.map(|_| ())); // An action is ok - let mut plugin = Plugin::default(); - plugin - .stream_impl( - "stream".into(), - "cluster".into(), - stream_ok_port(2052).into(), + let res = Plugin::default() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "cluster".into(), + config: stream_ok().into(), + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "cluster_send".into(), + config: json!({ "send": "message", "to": "stream" }).into(), + patterns: vec![], + }], ) - .await - .unwrap(); - plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "cluster_send".into(), - json!({ "send": "message", "to": "stream" }).into(), - Vec::default(), - ) - .await - .unwrap(); - assert!(plugin.finish_setup().await.is_ok()); + .await; + assert!(res.is_ok(), "{:?}", res.map(|_| ())); // Invalid to: option - let mut plugin = Plugin::default(); - plugin - .stream_impl( - "stream".into(), - "cluster".into(), - stream_ok_port(2053).into(), + let res = Plugin::default() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "cluster".into(), + config: stream_ok().into(), + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "cluster_send".into(), + config: json!({ "send": "message", "to": "stream1" }).into(), + patterns: vec![], + }], ) - .await - .unwrap(); - plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "cluster_send".into(), - json!({ "send": "message", "to": "stream1" }).into(), - Vec::default(), - ) - .await - .unwrap(); - assert!(plugin.finish_setup().await.is_err()); + .await; + assert!(res.is_err(), "{:?}", res.map(|_| ())); } diff --git a/plugins/reaction-plugin-cluster/src/tests/e2e.rs b/plugins/reaction-plugin-cluster/src/tests/e2e.rs index 4c25468..fbf3a28 100644 --- a/plugins/reaction-plugin-cluster/src/tests/e2e.rs +++ b/plugins/reaction-plugin-cluster/src/tests/e2e.rs @@ -1,7 +1,7 @@ use std::{env::set_current_dir, time::Duration}; use assert_fs::TempDir; -use reaction_plugin::{ActionImpl, Exec, PluginInfo, StreamImpl}; +use reaction_plugin::{ActionConfig, Exec, PluginInfo, StreamConfig}; use serde_json::json; use tokio::{fs, time::timeout}; use treedb::time::now; @@ -98,11 +98,10 @@ const POOL: [TestNode; 15] = [ ]; async fn stream_action( - plugin: &mut Plugin, name: &str, index: usize, nodes: &[TestNode], -) -> (StreamImpl, ActionImpl) { +) -> (StreamConfig, ActionConfig) { let stream_name = format!("stream_{name}"); let this_node = &nodes[index]; let other_nodes: Vec<_> = nodes @@ -120,37 +119,30 @@ async fn stream_action( .await .unwrap(); - let stream = plugin - .stream_impl( - stream_name.clone(), - "cluster".into(), - json!({ + ( + StreamConfig { + stream_name: stream_name.clone(), + stream_type: "cluster".into(), + config: json!({ "message_timeout": "30s", "listen_port": this_node.port, "nodes": other_nodes, }) .into(), - ) - .await - .unwrap(); - - let action = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "cluster_send".into(), - json!({ + }, + ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "cluster_send".into(), + config: json!({ "send": format!("from {name}: "), "to": stream_name, }) .into(), - vec!["test".into()], - ) - .await - .unwrap(); - - (stream, action) + patterns: vec!["test".into()], + }, + ) } #[tokio::test] @@ -162,19 +154,33 @@ async fn two_nodes_simultaneous_startup() { let ((mut stream_a, action_a), (mut stream_b, action_b)) = if separate_plugin { let mut plugin_a = Plugin::default(); - let a = stream_action(&mut plugin_a, "a", 0, &POOL[0..2]).await; - plugin_a.finish_setup().await.unwrap(); + let (sa, aa) = stream_action("a", 0, &POOL[0..2]).await; + let (mut streams_a, mut actions_a) = + plugin_a.load_config(vec![sa], vec![aa]).await.unwrap(); + plugin_a.start().await.unwrap(); let mut plugin_b = Plugin::default(); - let b = stream_action(&mut plugin_b, "b", 1, &POOL[0..2]).await; - plugin_b.finish_setup().await.unwrap(); - (a, b) + let (sb, ab) = stream_action("b", 1, &POOL[0..2]).await; + let (mut streams_b, mut actions_b) = + plugin_b.load_config(vec![sb], vec![ab]).await.unwrap(); + plugin_b.start().await.unwrap(); + ( + (streams_a.remove(0), actions_a.remove(0)), + (streams_b.remove(0), actions_b.remove(0)), + ) } else { let mut plugin = Plugin::default(); - let a = stream_action(&mut plugin, "a", 0, &POOL[0..2]).await; - let b = stream_action(&mut plugin, "b", 1, &POOL[0..2]).await; - plugin.finish_setup().await.unwrap(); - (a, b) + let a = stream_action("a", 0, &POOL[0..2]).await; + let b = stream_action("b", 1, &POOL[0..2]).await; + let (mut streams, mut actions) = plugin + .load_config(vec![a.0, b.0], vec![a.1, b.1]) + .await + .unwrap(); + plugin.start().await.unwrap(); + ( + (streams.remove(0), actions.remove(0)), + (streams.remove(1), actions.remove(1)), + ) }; for m in ["test1", "test2", "test3"] { @@ -238,7 +244,6 @@ async fn n_nodes_simultaneous_startup() { let mut plugin = Plugin::default(); let name = format!("n{i}"); let (stream, action) = stream_action( - &mut plugin, &name, i, &POOL[0..n] @@ -252,10 +257,14 @@ async fn n_nodes_simultaneous_startup() { .as_slice(), ) .await; - plugin.finish_setup().await.unwrap(); + let (mut stream, mut action) = plugin + .load_config(vec![stream], vec![action]) + .await + .unwrap(); + plugin.start().await.unwrap(); plugins.push(plugin); - streams.push(stream); - actions.push((action, name)); + streams.push(stream.pop().unwrap()); + actions.push((action.pop().unwrap(), name)); } for m in ["test1", "test2", "test3", "test4", "test5"] { diff --git a/plugins/reaction-plugin-cluster/src/tests/self_.rs b/plugins/reaction-plugin-cluster/src/tests/self_.rs index 530e8ee..c06d55b 100644 --- a/plugins/reaction-plugin-cluster/src/tests/self_.rs +++ b/plugins/reaction-plugin-cluster/src/tests/self_.rs @@ -1,7 +1,7 @@ use std::{env::set_current_dir, time::Duration}; use assert_fs::TempDir; -use reaction_plugin::{Exec, PluginInfo}; +use reaction_plugin::{ActionConfig, Exec, PluginInfo, StreamConfig}; use serde_json::json; use tokio::time::timeout; use treedb::time::now; @@ -19,33 +19,34 @@ async fn run_with_self() { for self_ in [true, false] { let mut plugin = Plugin::default(); - let mut stream = plugin - .stream_impl( - "stream".into(), - "cluster".into(), - stream_ok_port(2052).into(), + let (mut streams, mut actions) = plugin + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "cluster".into(), + config: stream_ok_port(2052).into(), + }], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "cluster_send".into(), + config: json!({ + "send": "message ", + "to": "stream", + "self": self_, + }) + .into(), + patterns: vec!["test".into()], + }], ) .await .unwrap(); - assert!(stream.standalone); - let action = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "cluster_send".into(), - json!({ - "send": "message ", - "to": "stream", - "self": self_, - }) - .into(), - vec!["test".into()], - ) - .await - .unwrap(); - assert!(plugin.finish_setup().await.is_ok()); + let mut stream = streams.pop().unwrap(); + let action = actions.pop().unwrap(); + assert!(stream.standalone); + assert!(plugin.start().await.is_ok()); for m in ["test1", "test2", "test3", " a a a aa a a"] { let time = now().into();