reaction/tests/plugin_cluster.rs
2025-12-07 12:00:00 +01:00

82 lines
2.7 KiB
Rust

use std::{path::Path, thread::sleep, time::Duration};
use assert_cmd::Command;
use assert_fs::prelude::*;
use predicates::prelude::predicate;
use tokio::{fs::read_to_string, runtime::Handle};
// require UDP ports 9876 & 9877 to be free on 127.0.0.1
#[tokio::test]
async fn plugin_cluster() {
// First build reaction-plugin-cluster
Command::new("cargo")
.args(["build", "-p", "reaction-plugin-cluster"])
.unwrap();
let config = read_to_string("tests/test-conf/test-cluster.jsonnet")
.await
.unwrap();
let secret_key_a = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=";
let public_key_a = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=";
let secret_key_b = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=";
let public_key_b = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc=";
let config_a = config
.replace("PUBLIC_KEY", public_key_b)
.replace("NODE", "A")
.replace("1234", "9876")
.replace("4321", "9877");
let config_b = config
.replace("PUBLIC_KEY", public_key_a)
.replace("NODE", "B")
.replace("1234", "9877")
.replace("4321", "9876");
let output_a = vec![
"a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "",
];
let output_b = vec![
"a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "",
];
let runtime = Handle::current();
let a_handle = runtime.spawn_blocking(|| launch_node(config_a, secret_key_a, output_a));
let b_handle = runtime.spawn_blocking(|| launch_node(config_b, secret_key_b, output_b));
let (a_res, b_res) = tokio::join!(a_handle, b_handle);
a_res.unwrap();
b_res.unwrap();
}
fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'static str>) {
let tmp_dir = assert_fs::TempDir::new().unwrap();
// Write node config
tmp_dir.child("config.jsonnet").write_str(&config).unwrap();
tmp_dir
.child("plugin_data/cluster/secret_key_s1")
.write_str(my_secret)
.unwrap();
// Copy cluster plugin
tmp_dir
.child("./target/debug/reaction-plugin-cluster")
.write_file(Path::new("./target/debug/reaction-plugin-cluster"))
.unwrap();
sleep(Duration::from_secs(10));
Command::cargo_bin("reaction")
.unwrap()
.args(["start", "--socket", "./s", "--config", "./config.jsonnet"])
.current_dir(tmp_dir.path())
.timeout(Duration::from_secs(5))
// Expected exit 1: all stream exited
.assert()
.code(predicate::eq(1));
// Expected output
tmp_dir.child("log").assert(expected_output.join("\n"));
tmp_dir.child("log").write_str("").unwrap();
}