diff --git a/tests/plugin_cluster.rs b/tests/plugin_cluster.rs index 176a49c..524ba89 100644 --- a/tests/plugin_cluster.rs +++ b/tests/plugin_cluster.rs @@ -1,23 +1,18 @@ -use std::{path::Path, thread::sleep, time::Duration}; +use std::{fs::read_to_string, path::Path, thread, time::Duration}; use assert_cmd::Command; use assert_fs::prelude::*; -use predicates::prelude::predicate; -use tokio::{fs::read_to_string, runtime::Handle}; // require UDP ports 9876 & 9877 to be free on 127.0.0.1 -#[tokio::test] -#[ignore = "currently failing"] // FIXME -async fn plugin_cluster() { +#[test] +fn plugin_cluster() { // First build reaction-plugin-cluster Command::new("cargo") .args(["build", "-p", "reaction-plugin-cluster"]) .unwrap(); - let config = read_to_string("tests/test-conf/test-cluster.jsonnet") - .await - .unwrap(); + let config = read_to_string("tests/test-conf/test-cluster.jsonnet").unwrap(); let secret_key_a = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw="; let public_key_a = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY="; @@ -36,19 +31,17 @@ async fn plugin_cluster() { .replace("4321", "9876"); let output_a = vec![ - "a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "", + "B a0 1", "B a0 2", "B a0 3", "B a0 4", "B b0 1", "B b0 2", "B b0 3", "B b0 4", "", ]; let output_b = vec![ - "a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "", + "A a0 1", "A a0 2", "A a0 3", "A a0 4", "A b0 1", "A b0 2", "A b0 3", "A b0 4", "", ]; - let runtime = Handle::current(); - let a_handle = runtime.spawn_blocking(|| launch_node(config_a, secret_key_a, output_a)); - let b_handle = runtime.spawn_blocking(|| launch_node(config_b, secret_key_b, output_b)); + let a_handle = thread::spawn(|| launch_node(config_a, secret_key_a, output_a)); + let b_handle = thread::spawn(|| launch_node(config_b, secret_key_b, output_b)); - let (a_res, b_res) = tokio::join!(a_handle, b_handle); - a_res.unwrap(); - b_res.unwrap(); + a_handle.join().unwrap(); + b_handle.join().unwrap(); } fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'static str>) { @@ -57,7 +50,7 @@ fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'s // Write node config tmp_dir.child("config.jsonnet").write_str(&config).unwrap(); tmp_dir - .child("plugin_data/cluster/secret_key_s1") + .child("plugin_data/cluster/secret_key_s1.txt") .write_str(my_secret) .unwrap(); @@ -67,17 +60,22 @@ fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'s .write_file(Path::new("./target/debug/reaction-plugin-cluster")) .unwrap(); - sleep(Duration::from_secs(10)); Command::cargo_bin("reaction") .unwrap() - .args(["start", "--socket", "./s", "--config", "./config.jsonnet"]) + .args([ + "start", + "--socket", + "./s", + "--config", + "./config.jsonnet", + "-l", + "DEBUG", + ]) .current_dir(tmp_dir.path()) .timeout(Duration::from_secs(5)) - // Expected exit 1: all stream exited .assert() - .code(predicate::eq(1)); + .failure(); // Expected output tmp_dir.child("log").assert(expected_output.join("\n")); - tmp_dir.child("log").write_str("").unwrap(); } diff --git a/tests/test-conf/test-cluster.jsonnet b/tests/test-conf/test-cluster.jsonnet index b4cb403..36e7d76 100644 --- a/tests/test-conf/test-cluster.jsonnet +++ b/tests/test-conf/test-cluster.jsonnet @@ -20,7 +20,7 @@ streams: { s0: { - cmd: ['bash', '-c', 'for i in $(seq 4); do echo $i; sleep 0.1; done; sleep 1.2'], + cmd: ['bash', '-c', 'sleep 1; for i in $(seq 4); do echo $i; sleep 0.1; done'], filters: { f0: { regex: ['^$'], @@ -32,6 +32,14 @@ to: 's1', }, }, + b0: { + type: 'cluster_send', + options: { + send: 'NODE b0 ', + to: 's1', + }, + after: '1s', + }, }, }, },