diff --git a/TODO b/TODO index 0cbaa1c..a6aa816 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,4 @@ Test what happens when a Filter's pattern Set changes (I think it's shitty) DB: add tests on stress testing (lines should always be in order) conf: merge filters -plugins: pipe stderr too and wrap errors in logs -plugins: provide treedb storage? omg (add an enum that's either remoc::rch::mpsc or tokio::mpsc) +plugins: pipe stderr too and wrap errors in logs. fix errors? diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index 6ec3645..fd52fb1 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -21,6 +21,9 @@ mod connection; mod endpoint; mod key; +#[cfg(test)] +mod tests; + #[tokio::main] async fn main() { let plugin = Plugin::default(); @@ -63,6 +66,7 @@ fn ipv6_unspecified() -> Option { #[derive(Serialize, Deserialize)] struct NodeOption { public_key: String, + #[serde(default)] addresses: Vec, } @@ -123,6 +127,16 @@ impl PluginInfo for Plugin { let message_timeout = parse_duration(&options.message_timeout) .map_err(|err| format!("invalid message_timeout: {err}"))?; + if options.bind_ipv4.is_none() && options.bind_ipv6.is_none() { + Err( + "At least one of bind_ipv4 and bind_ipv6 must be enabled. Unset at least one of them or set at least one of them to an IP.", + )?; + } + + if options.nodes.is_empty() { + Err("At least one remote node has to be configured for a cluster")?; + } + for node in options.nodes.into_iter() { let bytes = key::key_b64_to_bytes(&node.public_key) .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; @@ -182,7 +196,7 @@ impl PluginInfo for Plugin { patterns: Vec, ) -> RemoteResult { if &action_type != "cluster_send" { - return Err("This plugin can't handle other action types than cluster".into()); + return Err("This plugin can't handle other action types than 'cluster_send'".into()); } let options: ActionOptions = serde_json::from_value(config.into()) diff --git a/plugins/reaction-plugin-cluster/src/tests/conf.rs b/plugins/reaction-plugin-cluster/src/tests/conf.rs new file mode 100644 index 0000000..b40dfb4 --- /dev/null +++ b/plugins/reaction-plugin-cluster/src/tests/conf.rs @@ -0,0 +1,258 @@ +use reaction_plugin::PluginInfo; +use serde_json::json; + +use crate::Plugin; + +use super::{PUBLIC_KEY_A, TEST_MUTEX, stream_ok, stream_ok_port}; + +#[tokio::test] +async fn conf_stream() { + // Minimal node configuration + let nodes = json!([{ + "public_key": PUBLIC_KEY_A, + }]); + + // Invalid type + assert!( + Plugin::default() + .stream_impl("stream".into(), "clust".into(), stream_ok().into(),) + .await + .is_err() + ); + + for (json, is_ok) in [ + ( + json!({ + "listen_port": 2048, + "nodes": nodes, + "message_timeout": "30m", + }), + Result::is_ok as fn(&_) -> bool, + ), + ( + // invalid time + json!({ + "listen_port": 2048, + "nodes": nodes, + "message_timeout": "30pv", + }), + Result::is_err, + ), + ( + json!({ + "listen_port": 2048, + "bind_ipv4": "0.0.0.0", + "nodes": nodes, + "message_timeout": "30m", + }), + Result::is_ok, + ), + ( + json!({ + "listen_port": 2048, + "bind_ipv6": "::", + "nodes": nodes, + "message_timeout": "30m", + }), + Result::is_ok, + ), + ( + json!({ + "listen_port": 2048, + "bind_ipv4": "0.0.0.0", + "bind_ipv6": "::", + "nodes": nodes, + "message_timeout": "30m", + }), + Result::is_ok, + ), + ( + json!({ + "listen_port": 2048, + "bind_ipv4": null, + "nodes": nodes, + "message_timeout": "30m", + }), + Result::is_ok, + ), + ( + json!({ + "listen_port": 2048, + "bind_ipv6": null, + "nodes": nodes, + "message_timeout": "30m", + }), + Result::is_ok, + ), + ( + // No bind + json!({ + "listen_port": 2048, + "bind_ipv4": null, + "bind_ipv6": null, + "nodes": nodes, + "message_timeout": "30m", + }), + Result::is_err, + ), + (json!({}), Result::is_err), + ] { + assert!(is_ok( + &Plugin::default() + .stream_impl("stream".into(), "cluster".into(), json.into()) + .await + )); + } +} + +#[tokio::test] +async fn conf_action() { + let patterns = vec!["p1".into(), "p2".into()]; + + // Invalid type + assert!( + Plugin::default() + .action_impl( + "stream".into(), + "filter".into(), + "action".into(), + "cluster_sen".into(), + json!({ + "send": "", + "to": "stream", + }) + .into(), + patterns.clone(), + ) + .await + .is_err() + ); + + for (json, is_ok) in [ + ( + json!({ + "send": "", + "to": "stream", + }), + Result::is_ok as fn(&_) -> bool, + ), + ( + json!({ + "send": "", + "to": "stream", + "self": true, + }), + Result::is_ok, + ), + ( + json!({ + "send": "", + "to": "stream", + "self": false, + }), + Result::is_ok, + ), + ( + // missing to + json!({ + "send": "", + }), + Result::is_err, + ), + ( + // missing send + json!({ + "to": "stream", + }), + Result::is_err, + ), + ( + // invalid self + json!({ + "send": "", + "to": "stream", + "self": "true", + }), + Result::is_err, + ), + (json!({}), Result::is_err), + ] { + assert!(is_ok( + &Plugin::default() + .action_impl( + "stream".into(), + "filter".into(), + "action".into(), + "cluster_send".into(), + json.into(), + patterns.clone(), + ) + .await + )); + } +} + +#[tokio::test] +async fn conf_send() { + let _lock = TEST_MUTEX.lock(); + + // No action is ok + let mut plugin = Plugin::default(); + plugin + .stream_impl( + "stream".into(), + "cluster".into(), + stream_ok_port(2051).into(), + ) + .await + .unwrap(); + let res = plugin.finish_setup().await; + eprintln!("{res:?}"); + assert!(res.is_ok()); + + // An action is ok + let mut plugin = Plugin::default(); + plugin + .stream_impl( + "stream".into(), + "cluster".into(), + stream_ok_port(2052).into(), + ) + .await + .unwrap(); + plugin + .action_impl( + "stream".into(), + "filter".into(), + "action".into(), + "cluster_send".into(), + json!({ "send": "message", "to": "stream" }).into(), + Vec::default(), + ) + .await + .unwrap(); + assert!(plugin.finish_setup().await.is_ok()); + + // Invalid to: option + let mut plugin = Plugin::default(); + plugin + .stream_impl( + "stream".into(), + "cluster".into(), + stream_ok_port(2053).into(), + ) + .await + .unwrap(); + plugin + .action_impl( + "stream".into(), + "filter".into(), + "action".into(), + "cluster_send".into(), + json!({ "send": "message", "to": "stream1" }).into(), + Vec::default(), + ) + .await + .unwrap(); + assert!(plugin.finish_setup().await.is_err()); +} diff --git a/plugins/reaction-plugin-cluster/src/tests/e2e.rs b/plugins/reaction-plugin-cluster/src/tests/e2e.rs new file mode 100644 index 0000000..f9092ee --- /dev/null +++ b/plugins/reaction-plugin-cluster/src/tests/e2e.rs @@ -0,0 +1,81 @@ +use std::{env::set_current_dir, time::Duration}; + +use assert_fs::TempDir; +use reaction_plugin::{Exec, PluginInfo}; +use serde_json::json; +use tokio::time::timeout; +use treedb::time::now; + +use crate::{ + Plugin, + tests::{PUBLIC_KEY_B, TEST_MUTEX}, +}; + +#[tokio::test] +async fn two_nodes_simultaneous_startup() { + let _lock = TEST_MUTEX.lock(); + let dir = TempDir::new().unwrap(); + set_current_dir(dir).unwrap(); + + let mut plugin = Plugin::default(); + let mut stream = plugin + .stream_impl( + "stream".into(), + "cluster".into(), + json!({ + "listen_port": 2054, + "nodes": [{ + "public_key": PUBLIC_KEY_B, + }] + }) + .into(), + ) + .await + .unwrap(); + assert!(stream.standalone); + + let action = plugin + .action_impl( + "stream".into(), + "filter".into(), + "action".into(), + "cluster_send".into(), + json!({ + "send": "message ", + "to": "stream", + "self": false, + }) + .into(), + vec!["test".into()], + ) + .await + .unwrap(); + assert!(plugin.finish_setup().await.is_ok()); + + for m in ["test1", "test2", "test3", " a a a aa a a"] { + let time = now().into(); + assert!( + action + .tx + .send(Exec { + match_: vec![m.into()], + time, + }) + .await + .is_ok() + ); + if self_ { + assert_eq!( + stream.stream.recv().await.unwrap().unwrap(), + (format!("message {m}"), time), + ); + } else { + // Don't receive anything + assert!( + timeout(Duration::from_millis(100), stream.stream.recv()) + .await + .is_err() + ); + } + } +} diff --git a/plugins/reaction-plugin-cluster/src/tests/mod.rs b/plugins/reaction-plugin-cluster/src/tests/mod.rs new file mode 100644 index 0000000..5eb65d5 --- /dev/null +++ b/plugins/reaction-plugin-cluster/src/tests/mod.rs @@ -0,0 +1,29 @@ +use std::sync::{LazyLock, Mutex}; + +use serde_json::json; + +mod conf; +mod e2e; + +const SECRET_KEY_A: &str = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw="; +const PUBLIC_KEY_A: &str = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY="; + +const SECRET_KEY_B: &str = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY="; +const PUBLIC_KEY_B: &str = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc="; + +// Tests that spawn a database in current directory must be run one at a time +static TEST_MUTEX: LazyLock> = LazyLock::new(|| Mutex::new(())); + +fn stream_ok_port(port: u16) -> serde_json::Value { + json!({ + "listen_port": port, + "nodes": [{ + "public_key": PUBLIC_KEY_A, + }], + "message_timeout": "30m", + }) +} + +fn stream_ok() -> serde_json::Value { + stream_ok_port(2048) +} diff --git a/plugins/reaction-plugin-cluster/src/tests/self.rs b/plugins/reaction-plugin-cluster/src/tests/self.rs new file mode 100644 index 0000000..9b9dc0c --- /dev/null +++ b/plugins/reaction-plugin-cluster/src/tests/self.rs @@ -0,0 +1,75 @@ +use std::time::Duration; + +use reaction_plugin::{Exec, PluginInfo}; +use serde_json::json; +use tokio::time::timeout; +use treedb::time::now; + +use crate::Plugin; + +use super::{TEST_MUTEX, stream_ok_port}; + +#[tokio::test] +async fn run_with_self() { + let _lock = TEST_MUTEX.lock(); + let dir = TempDir::new().unwrap(); + set_current_dir(dir).unwrap(); + + for self_ in [true, false] { + let mut plugin = Plugin::default(); + let mut stream = plugin + .stream_impl( + "stream".into(), + "cluster".into(), + stream_ok_port(2052).into(), + ) + .await + .unwrap(); + assert!(stream.standalone); + + let action = plugin + .action_impl( + "stream".into(), + "filter".into(), + "action".into(), + "cluster_send".into(), + json!({ + "send": "message ", + "to": "stream", + "self": self_, + }) + .into(), + vec!["test".into()], + ) + .await + .unwrap(); + assert!(plugin.finish_setup().await.is_ok()); + + for m in ["test1", "test2", "test3", " a a a aa a a"] { + let time = now().into(); + assert!( + action + .tx + .send(Exec { + match_: vec![m.into()], + time, + }) + .await + .is_ok() + ); + if self_ { + assert_eq!( + stream.stream.recv().await.unwrap().unwrap(), + (format!("message {m}"), time), + ); + } else { + // Don't receive anything + assert!( + timeout(Duration::from_millis(100), stream.stream.recv()) + .await + .is_err() + ); + } + } + } +} diff --git a/plugins/reaction-plugin-virtual/src/tests.rs b/plugins/reaction-plugin-virtual/src/tests.rs index e1170c5..b08f416 100644 --- a/plugins/reaction-plugin-virtual/src/tests.rs +++ b/plugins/reaction-plugin-virtual/src/tests.rs @@ -145,6 +145,9 @@ async fn conf_send() { assert!(plugin.finish_setup().await.is_err()); } +// Let's allow empty streams for now. +// I guess it can be useful to have manual only actions. +// // #[tokio::test] // async fn conf_empty_stream() { // let mut plugin = Plugin::default(); diff --git a/tests/plugin_cluster.rs b/tests/plugin_cluster.rs index 524ba89..3f43107 100644 --- a/tests/plugin_cluster.rs +++ b/tests/plugin_cluster.rs @@ -3,10 +3,16 @@ use std::{fs::read_to_string, path::Path, thread, time::Duration}; use assert_cmd::Command; use assert_fs::prelude::*; -// require UDP ports 9876 & 9877 to be free on 127.0.0.1 +const SECRET_KEY_A: &str = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw="; +const PUBLIC_KEY_A: &str = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY="; + +const SECRET_KEY_B: &str = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY="; +const PUBLIC_KEY_B: &str = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc="; + +// require UDP ports 9876-9879 to be free on 127.0.0.1 #[test] -fn plugin_cluster() { +fn plugin_cluster_same_startup() { // First build reaction-plugin-cluster Command::new("cargo") .args(["build", "-p", "reaction-plugin-cluster"]) @@ -14,18 +20,13 @@ fn plugin_cluster() { let config = read_to_string("tests/test-conf/test-cluster.jsonnet").unwrap(); - let secret_key_a = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw="; - let public_key_a = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY="; - let secret_key_b = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY="; - let public_key_b = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc="; - let config_a = config - .replace("PUBLIC_KEY", public_key_b) + .replace("PUBLIC_KEY", PUBLIC_KEY_B) .replace("NODE", "A") .replace("1234", "9876") .replace("4321", "9877"); let config_b = config - .replace("PUBLIC_KEY", public_key_a) + .replace("PUBLIC_KEY", PUBLIC_KEY_A) .replace("NODE", "B") .replace("1234", "9877") .replace("4321", "9876"); @@ -37,8 +38,47 @@ fn plugin_cluster() { "A a0 1", "A a0 2", "A a0 3", "A a0 4", "A b0 1", "A b0 2", "A b0 3", "A b0 4", "", ]; - let a_handle = thread::spawn(|| launch_node(config_a, secret_key_a, output_a)); - let b_handle = thread::spawn(|| launch_node(config_b, secret_key_b, output_b)); + let a_handle = thread::spawn(|| launch_node(config_a, SECRET_KEY_A, output_a)); + let b_handle = thread::spawn(|| launch_node(config_b, SECRET_KEY_B, output_b)); + + a_handle.join().unwrap(); + b_handle.join().unwrap(); +} + +#[test] +fn plugin_cluster_different_startup() { + // First build reaction-plugin-cluster + Command::new("cargo") + .args(["build", "-p", "reaction-plugin-cluster"]) + .unwrap(); + + let config = read_to_string("tests/test-conf/test-cluster.jsonnet").unwrap(); + + let config_a = config + .replace("PUBLIC_KEY", PUBLIC_KEY_B) + .replace("NODE", "A") + .replace("1234", "9878") + .replace("4321", "9879"); + let config_b = config + .replace("PUBLIC_KEY", PUBLIC_KEY_A) + .replace("NODE", "B") + .replace("1234", "9879") + .replace("4321", "9878"); + + let output_a = vec![ + "B a0 1", "B a0 2", "B a0 3", "B a0 4", "B b0 1", "B b0 2", "B b0 3", "B b0 4", "", + ]; + let output_b = vec![ + "A a0 1", "A a0 2", "A a0 3", "A a0 4", "A b0 1", "A b0 2", "A b0 3", "A b0 4", "", + ]; + + let a_handle = thread::spawn(|| launch_node(config_a, SECRET_KEY_A, output_a)); + let b_handle = thread::spawn(|| { + thread::sleep(Duration::from_secs(2)); + launch_node(config_b, SECRET_KEY_B, output_b); + }); + + // thread::sleep(Duration::from_secs(60)); a_handle.join().unwrap(); b_handle.join().unwrap(); @@ -60,7 +100,7 @@ fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'s .write_file(Path::new("./target/debug/reaction-plugin-cluster")) .unwrap(); - Command::cargo_bin("reaction") + let output = Command::cargo_bin("reaction") .unwrap() .args([ "start", @@ -73,8 +113,13 @@ fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'s ]) .current_dir(tmp_dir.path()) .timeout(Duration::from_secs(5)) - .assert() - .failure(); + .output() + .unwrap(); + + println!( + "command output:\n{}", + String::from_utf8(output.stdout).unwrap() + ); // Expected output tmp_dir.child("log").assert(expected_output.join("\n"));