Tests on a cluster of 2 nodes

This commit is contained in:
ppom 2025-12-24 12:00:00 +01:00
commit 5bfcf318c7
No known key found for this signature in database
3 changed files with 78 additions and 43 deletions

View file

@ -1,38 +1,37 @@
use std::{env::set_current_dir, time::Duration};
use assert_fs::TempDir;
use reaction_plugin::{Exec, PluginInfo};
use serde_json::json;
use reaction_plugin::{ActionImpl, Exec, PluginInfo, RemoteResult, StreamImpl};
use serde_json::{Value, json};
use tokio::time::timeout;
use treedb::time::now;
use crate::{
Plugin,
tests::{PUBLIC_KEY_B, TEST_MUTEX},
tests::{PUBLIC_KEY_A, PUBLIC_KEY_B, TEST_MUTEX},
};
#[tokio::test]
async fn two_nodes_simultaneous_startup() {
let _lock = TEST_MUTEX.lock();
let dir = TempDir::new().unwrap();
set_current_dir(dir).unwrap();
let mut plugin = Plugin::default();
let mut stream = plugin
// TODO
// random stream name
// write secret key at correct file path
// pass a complete list of pubkey, privkey, port and generate according confs
async fn stream_action(
plugin: &mut Plugin,
name: &str,
port: u16,
nodes: Value,
) -> RemoteResult<(StreamImpl, ActionImpl)> {
let stream = plugin
.stream_impl(
"stream".into(),
"cluster".into(),
json!({
"listen_port": 2054,
"nodes": [{
"public_key": PUBLIC_KEY_B,
}]
"listen_port": port,
"nodes": nodes,
})
.into(),
)
.await
.unwrap();
assert!(stream.standalone);
.await?;
let action = plugin
.action_impl(
@ -41,40 +40,74 @@ async fn two_nodes_simultaneous_startup() {
"action".into(),
"cluster_send".into(),
json!({
"send": "message <test>",
"send": format!("from {name}: <test>"),
"to": "stream",
"self": false,
})
.into(),
vec!["test".into()],
)
.await
.unwrap();
assert!(plugin.finish_setup().await.is_ok());
.await?;
for m in ["test1", "test2", "test3", " a a a aa a a"] {
Ok((stream, action))
}
#[tokio::test]
async fn two_nodes_simultaneous_startup() {
let _lock = TEST_MUTEX.lock();
let dir = TempDir::new().unwrap();
set_current_dir(dir).unwrap();
let mut plugin_a = Plugin::default();
let (mut stream_a, action_a) = stream_action(
&mut plugin_a,
"a",
2054,
json!([{
"public_key": PUBLIC_KEY_B,
"addresses": ["127.0.0.1:2055"],
}]),
)
.await
.unwrap();
plugin_a.finish_setup().await.unwrap();
let mut plugin_b = Plugin::default();
let (mut stream_b, action_b) = stream_action(
&mut plugin_b,
"a",
2055,
json!([{
"public_key": PUBLIC_KEY_A,
"addresses": ["127.0.0.1:2054"],
}]),
)
.await
.unwrap();
plugin_b.finish_setup().await.unwrap();
for m in ["test1", "test2", "test3"] {
let time = now().into();
assert!(
action
.tx
.send(Exec {
match_: vec![m.into()],
time,
})
.await
.is_ok()
);
if self_ {
assert_eq!(
stream.stream.recv().await.unwrap().unwrap(),
(format!("message {m}"), time),
);
} else {
// Don't receive anything
for (stream, action, from) in [
(&mut stream_a, &action_b, "a"),
(&mut stream_b, &action_a, "b"),
] {
assert!(
action
.tx
.send(Exec {
match_: vec![m.into()],
time,
})
.await
.is_ok()
);
assert_eq!(
timeout(Duration::from_millis(100), stream.stream.recv())
.await
.is_err()
.unwrap()
.unwrap(),
Some((format!("from {from}: {m}"), time)),
);
}
}

View file

@ -4,6 +4,7 @@ use serde_json::json;
mod conf;
mod e2e;
mod self_;
const SECRET_KEY_A: &str = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=";
const PUBLIC_KEY_A: &str = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=";

View file

@ -1,5 +1,6 @@
use std::time::Duration;
use std::{env::set_current_dir, time::Duration};
use assert_fs::TempDir;
use reaction_plugin::{Exec, PluginInfo};
use serde_json::json;
use tokio::time::timeout;