cluster: add tests

- on configuration
- on sending messages to its own cluster
This commit is contained in:
ppom 2025-12-24 12:00:00 +01:00
commit 1e082086e5
No known key found for this signature in database
8 changed files with 521 additions and 17 deletions

3
TODO
View file

@ -1,5 +1,4 @@
Test what happens when a Filter's pattern Set changes (I think it's shitty)
DB: add tests on stress testing (lines should always be in order)
conf: merge filters
plugins: pipe stderr too and wrap errors in logs
plugins: provide treedb storage? omg (add an enum that's either remoc::rch::mpsc or tokio::mpsc)
plugins: pipe stderr too and wrap errors in logs. fix errors?

View file

@ -21,6 +21,9 @@ mod connection;
mod endpoint;
mod key;
#[cfg(test)]
mod tests;
#[tokio::main]
async fn main() {
let plugin = Plugin::default();
@ -63,6 +66,7 @@ fn ipv6_unspecified() -> Option<Ipv6Addr> {
#[derive(Serialize, Deserialize)]
struct NodeOption {
public_key: String,
#[serde(default)]
addresses: Vec<SocketAddr>,
}
@ -123,6 +127,16 @@ impl PluginInfo for Plugin {
let message_timeout = parse_duration(&options.message_timeout)
.map_err(|err| format!("invalid message_timeout: {err}"))?;
if options.bind_ipv4.is_none() && options.bind_ipv6.is_none() {
Err(
"At least one of bind_ipv4 and bind_ipv6 must be enabled. Unset at least one of them or set at least one of them to an IP.",
)?;
}
if options.nodes.is_empty() {
Err("At least one remote node has to be configured for a cluster")?;
}
for node in options.nodes.into_iter() {
let bytes = key::key_b64_to_bytes(&node.public_key)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
@ -182,7 +196,7 @@ impl PluginInfo for Plugin {
patterns: Vec<String>,
) -> RemoteResult<ActionImpl> {
if &action_type != "cluster_send" {
return Err("This plugin can't handle other action types than cluster".into());
return Err("This plugin can't handle other action types than 'cluster_send'".into());
}
let options: ActionOptions = serde_json::from_value(config.into())

View file

@ -0,0 +1,258 @@
use reaction_plugin::PluginInfo;
use serde_json::json;
use crate::Plugin;
use super::{PUBLIC_KEY_A, TEST_MUTEX, stream_ok, stream_ok_port};
#[tokio::test]
async fn conf_stream() {
// Minimal node configuration
let nodes = json!([{
"public_key": PUBLIC_KEY_A,
}]);
// Invalid type
assert!(
Plugin::default()
.stream_impl("stream".into(), "clust".into(), stream_ok().into(),)
.await
.is_err()
);
for (json, is_ok) in [
(
json!({
"listen_port": 2048,
"nodes": nodes,
"message_timeout": "30m",
}),
Result::is_ok as fn(&_) -> bool,
),
(
// invalid time
json!({
"listen_port": 2048,
"nodes": nodes,
"message_timeout": "30pv",
}),
Result::is_err,
),
(
json!({
"listen_port": 2048,
"bind_ipv4": "0.0.0.0",
"nodes": nodes,
"message_timeout": "30m",
}),
Result::is_ok,
),
(
json!({
"listen_port": 2048,
"bind_ipv6": "::",
"nodes": nodes,
"message_timeout": "30m",
}),
Result::is_ok,
),
(
json!({
"listen_port": 2048,
"bind_ipv4": "0.0.0.0",
"bind_ipv6": "::",
"nodes": nodes,
"message_timeout": "30m",
}),
Result::is_ok,
),
(
json!({
"listen_port": 2048,
"bind_ipv4": null,
"nodes": nodes,
"message_timeout": "30m",
}),
Result::is_ok,
),
(
json!({
"listen_port": 2048,
"bind_ipv6": null,
"nodes": nodes,
"message_timeout": "30m",
}),
Result::is_ok,
),
(
// No bind
json!({
"listen_port": 2048,
"bind_ipv4": null,
"bind_ipv6": null,
"nodes": nodes,
"message_timeout": "30m",
}),
Result::is_err,
),
(json!({}), Result::is_err),
] {
assert!(is_ok(
&Plugin::default()
.stream_impl("stream".into(), "cluster".into(), json.into())
.await
));
}
}
#[tokio::test]
async fn conf_action() {
let patterns = vec!["p1".into(), "p2".into()];
// Invalid type
assert!(
Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_sen".into(),
json!({
"send": "<p1>",
"to": "stream",
})
.into(),
patterns.clone(),
)
.await
.is_err()
);
for (json, is_ok) in [
(
json!({
"send": "<p1>",
"to": "stream",
}),
Result::is_ok as fn(&_) -> bool,
),
(
json!({
"send": "<p1>",
"to": "stream",
"self": true,
}),
Result::is_ok,
),
(
json!({
"send": "<p1>",
"to": "stream",
"self": false,
}),
Result::is_ok,
),
(
// missing to
json!({
"send": "<p1>",
}),
Result::is_err,
),
(
// missing send
json!({
"to": "stream",
}),
Result::is_err,
),
(
// invalid self
json!({
"send": "<p1>",
"to": "stream",
"self": "true",
}),
Result::is_err,
),
(json!({}), Result::is_err),
] {
assert!(is_ok(
&Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json.into(),
patterns.clone(),
)
.await
));
}
}
#[tokio::test]
async fn conf_send() {
let _lock = TEST_MUTEX.lock();
// No action is ok
let mut plugin = Plugin::default();
plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2051).into(),
)
.await
.unwrap();
let res = plugin.finish_setup().await;
eprintln!("{res:?}");
assert!(res.is_ok());
// An action is ok
let mut plugin = Plugin::default();
plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2052).into(),
)
.await
.unwrap();
plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({ "send": "message", "to": "stream" }).into(),
Vec::default(),
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
// Invalid to: option
let mut plugin = Plugin::default();
plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2053).into(),
)
.await
.unwrap();
plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({ "send": "message", "to": "stream1" }).into(),
Vec::default(),
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_err());
}

View file

@ -0,0 +1,81 @@
use std::{env::set_current_dir, time::Duration};
use assert_fs::TempDir;
use reaction_plugin::{Exec, PluginInfo};
use serde_json::json;
use tokio::time::timeout;
use treedb::time::now;
use crate::{
Plugin,
tests::{PUBLIC_KEY_B, TEST_MUTEX},
};
#[tokio::test]
async fn two_nodes_simultaneous_startup() {
let _lock = TEST_MUTEX.lock();
let dir = TempDir::new().unwrap();
set_current_dir(dir).unwrap();
let mut plugin = Plugin::default();
let mut stream = plugin
.stream_impl(
"stream".into(),
"cluster".into(),
json!({
"listen_port": 2054,
"nodes": [{
"public_key": PUBLIC_KEY_B,
}]
})
.into(),
)
.await
.unwrap();
assert!(stream.standalone);
let action = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({
"send": "message <test>",
"to": "stream",
"self": false,
})
.into(),
vec!["test".into()],
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
for m in ["test1", "test2", "test3", " a a a aa a a"] {
let time = now().into();
assert!(
action
.tx
.send(Exec {
match_: vec![m.into()],
time,
})
.await
.is_ok()
);
if self_ {
assert_eq!(
stream.stream.recv().await.unwrap().unwrap(),
(format!("message {m}"), time),
);
} else {
// Don't receive anything
assert!(
timeout(Duration::from_millis(100), stream.stream.recv())
.await
.is_err()
);
}
}
}

View file

@ -0,0 +1,29 @@
use std::sync::{LazyLock, Mutex};
use serde_json::json;
mod conf;
mod e2e;
const SECRET_KEY_A: &str = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=";
const PUBLIC_KEY_A: &str = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=";
const SECRET_KEY_B: &str = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=";
const PUBLIC_KEY_B: &str = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc=";
// Tests that spawn a database in current directory must be run one at a time
static TEST_MUTEX: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
fn stream_ok_port(port: u16) -> serde_json::Value {
json!({
"listen_port": port,
"nodes": [{
"public_key": PUBLIC_KEY_A,
}],
"message_timeout": "30m",
})
}
fn stream_ok() -> serde_json::Value {
stream_ok_port(2048)
}

View file

@ -0,0 +1,75 @@
use std::time::Duration;
use reaction_plugin::{Exec, PluginInfo};
use serde_json::json;
use tokio::time::timeout;
use treedb::time::now;
use crate::Plugin;
use super::{TEST_MUTEX, stream_ok_port};
#[tokio::test]
async fn run_with_self() {
let _lock = TEST_MUTEX.lock();
let dir = TempDir::new().unwrap();
set_current_dir(dir).unwrap();
for self_ in [true, false] {
let mut plugin = Plugin::default();
let mut stream = plugin
.stream_impl(
"stream".into(),
"cluster".into(),
stream_ok_port(2052).into(),
)
.await
.unwrap();
assert!(stream.standalone);
let action = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"cluster_send".into(),
json!({
"send": "message <test>",
"to": "stream",
"self": self_,
})
.into(),
vec!["test".into()],
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
for m in ["test1", "test2", "test3", " a a a aa a a"] {
let time = now().into();
assert!(
action
.tx
.send(Exec {
match_: vec![m.into()],
time,
})
.await
.is_ok()
);
if self_ {
assert_eq!(
stream.stream.recv().await.unwrap().unwrap(),
(format!("message {m}"), time),
);
} else {
// Don't receive anything
assert!(
timeout(Duration::from_millis(100), stream.stream.recv())
.await
.is_err()
);
}
}
}
}

View file

@ -145,6 +145,9 @@ async fn conf_send() {
assert!(plugin.finish_setup().await.is_err());
}
// Let's allow empty streams for now.
// I guess it can be useful to have manual only actions.
//
// #[tokio::test]
// async fn conf_empty_stream() {
// let mut plugin = Plugin::default();

View file

@ -3,10 +3,16 @@ use std::{fs::read_to_string, path::Path, thread, time::Duration};
use assert_cmd::Command;
use assert_fs::prelude::*;
// require UDP ports 9876 & 9877 to be free on 127.0.0.1
const SECRET_KEY_A: &str = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=";
const PUBLIC_KEY_A: &str = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=";
const SECRET_KEY_B: &str = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=";
const PUBLIC_KEY_B: &str = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc=";
// require UDP ports 9876-9879 to be free on 127.0.0.1
#[test]
fn plugin_cluster() {
fn plugin_cluster_same_startup() {
// First build reaction-plugin-cluster
Command::new("cargo")
.args(["build", "-p", "reaction-plugin-cluster"])
@ -14,18 +20,13 @@ fn plugin_cluster() {
let config = read_to_string("tests/test-conf/test-cluster.jsonnet").unwrap();
let secret_key_a = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=";
let public_key_a = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=";
let secret_key_b = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=";
let public_key_b = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc=";
let config_a = config
.replace("PUBLIC_KEY", public_key_b)
.replace("PUBLIC_KEY", PUBLIC_KEY_B)
.replace("NODE", "A")
.replace("1234", "9876")
.replace("4321", "9877");
let config_b = config
.replace("PUBLIC_KEY", public_key_a)
.replace("PUBLIC_KEY", PUBLIC_KEY_A)
.replace("NODE", "B")
.replace("1234", "9877")
.replace("4321", "9876");
@ -37,8 +38,47 @@ fn plugin_cluster() {
"A a0 1", "A a0 2", "A a0 3", "A a0 4", "A b0 1", "A b0 2", "A b0 3", "A b0 4", "",
];
let a_handle = thread::spawn(|| launch_node(config_a, secret_key_a, output_a));
let b_handle = thread::spawn(|| launch_node(config_b, secret_key_b, output_b));
let a_handle = thread::spawn(|| launch_node(config_a, SECRET_KEY_A, output_a));
let b_handle = thread::spawn(|| launch_node(config_b, SECRET_KEY_B, output_b));
a_handle.join().unwrap();
b_handle.join().unwrap();
}
#[test]
fn plugin_cluster_different_startup() {
// First build reaction-plugin-cluster
Command::new("cargo")
.args(["build", "-p", "reaction-plugin-cluster"])
.unwrap();
let config = read_to_string("tests/test-conf/test-cluster.jsonnet").unwrap();
let config_a = config
.replace("PUBLIC_KEY", PUBLIC_KEY_B)
.replace("NODE", "A")
.replace("1234", "9878")
.replace("4321", "9879");
let config_b = config
.replace("PUBLIC_KEY", PUBLIC_KEY_A)
.replace("NODE", "B")
.replace("1234", "9879")
.replace("4321", "9878");
let output_a = vec![
"B a0 1", "B a0 2", "B a0 3", "B a0 4", "B b0 1", "B b0 2", "B b0 3", "B b0 4", "",
];
let output_b = vec![
"A a0 1", "A a0 2", "A a0 3", "A a0 4", "A b0 1", "A b0 2", "A b0 3", "A b0 4", "",
];
let a_handle = thread::spawn(|| launch_node(config_a, SECRET_KEY_A, output_a));
let b_handle = thread::spawn(|| {
thread::sleep(Duration::from_secs(2));
launch_node(config_b, SECRET_KEY_B, output_b);
});
// thread::sleep(Duration::from_secs(60));
a_handle.join().unwrap();
b_handle.join().unwrap();
@ -60,7 +100,7 @@ fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'s
.write_file(Path::new("./target/debug/reaction-plugin-cluster"))
.unwrap();
Command::cargo_bin("reaction")
let output = Command::cargo_bin("reaction")
.unwrap()
.args([
"start",
@ -73,8 +113,13 @@ fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'s
])
.current_dir(tmp_dir.path())
.timeout(Duration::from_secs(5))
.assert()
.failure();
.output()
.unwrap();
println!(
"command output:\n{}",
String::from_utf8(output.stdout).unwrap()
);
// Expected output
tmp_dir.child("log").assert(expected_output.join("\n"));