cluster: First round of fixes and tests after first run

Still not working!
This commit is contained in:
ppom 2025-11-22 12:00:00 +01:00
commit cacaa8d639
No known key found for this signature in database
8 changed files with 249 additions and 46 deletions

1
Cargo.lock generated
View file

@ -2859,6 +2859,7 @@ dependencies = [
name = "reaction-plugin-cluster"
version = "0.1.0"
dependencies = [
"assert_fs",
"chrono",
"data-encoding",
"futures",

View file

@ -70,7 +70,7 @@ tracing = "0.1.40"
[dev-dependencies]
rand = "0.8.5"
tempfile = "3.12.0"
assert_fs = "1.1.3"
assert_fs.workspace = true
assert_cmd = "2.0.17"
predicates = "3.1.3"
@ -78,6 +78,7 @@ predicates = "3.1.3"
members = ["plugins/reaction-plugin", "plugins/reaction-plugin-cluster", "plugins/reaction-plugin-virtual"]
[workspace.dependencies]
assert_fs = "1.1.3"
chrono = { version = "0.4.38", features = ["std", "clock", "serde"] }
futures = "0.3.30"
remoc = { version = "0.18.3" }

View file

@ -15,5 +15,8 @@ tokio.workspace = true
tokio.features = ["rt-multi-thread"]
data-encoding = "2.9.0"
iroh = "0.94.0"
iroh = { version = "0.94.0", default-features = false }
rand = "0.9.2"
[dev-dependencies]
assert_fs.workspace = true

View file

@ -5,6 +5,7 @@ use std::{
};
use chrono::{DateTime, Local, TimeDelta, Utc};
use futures::FutureExt;
use iroh::{
Endpoint, EndpointAddr,
endpoint::{Connection, VarInt},
@ -14,7 +15,10 @@ use remoc::{Connect, rch::base};
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, time::sleep};
use crate::cluster::{ALPN, UtcLine};
use crate::{
cluster::{ALPN, UtcLine},
secret_key::key_bytes_to_b64,
};
const START_TIMEOUT: Duration = Duration::from_secs(5);
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour
@ -117,13 +121,20 @@ impl ConnectionManager {
tokio::pin!(tick);
let have_connection = self.connection.is_some();
let maybe_conn_rx = self.connection.as_mut().map(|conn| conn.rx.recv());
let maybe_conn_rx = self
.connection
.as_mut()
.map(|conn| conn.rx.recv().boxed())
// This Future will never be polled because of the if in select!
// It still needs to be present because the branch will be evaluated
// so we can't unwrap
.unwrap_or(false_recv().boxed());
let event = tokio::select! {
// Tick when we don't have a connection
_ = tick, if !have_connection => Some(Event::Tick),
// Receive remote message when we have a connection
msg = maybe_conn_rx.unwrap(), if have_connection => Some(Event::RemoteMessageReceived(msg)),
msg = maybe_conn_rx, if have_connection => Some(Event::RemoteMessageReceived(msg)),
// Receive a connection from EndpointManager
conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)),
// Receive a message from local Actions
@ -208,7 +219,8 @@ impl ConnectionManager {
if count > 0 {
eprintln!(
"DEBUG cluster {}: node {}: dropping {count} messages that reached timeout",
self.cluster_name, self.remote.id,
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes()),
)
}
}
@ -220,7 +232,8 @@ impl ConnectionManager {
None => {
eprintln!(
"DEBUG cluster {}: ConnectionManager {}: quitting because EndpointManager has quit",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
self.quit();
false
@ -243,7 +256,8 @@ impl ConnectionManager {
} else {
eprintln!(
"WARN cluster {}: ignoring incoming connection from {}, as we already have a valid connection with it",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
true
}
@ -258,11 +272,14 @@ impl ConnectionManager {
self.delta = Some(delta);
eprintln!(
"ERROR cluster {}: trying to connect to node {}: {err}",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
eprintln!(
"INFO cluster {}: retry connecting to node {} in {:?}",
self.cluster_name, self.remote.id, self.delta
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes()),
self.delta
);
}
@ -274,7 +291,8 @@ impl ConnectionManager {
Err(err) => {
eprintln!(
"WARN cluster {}: error receiving message from node {}: {err}",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
self.close_connection(1, b"error receiving from your stream")
.await;
@ -282,20 +300,23 @@ impl ConnectionManager {
Ok(None) => {
eprintln!(
"WARN cluster {}: node {} closed its stream",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
self.close_connection(1, b"you closed your stream").await;
}
Ok(Some(RemoteMessage::Version(_))) => {
eprintln!(
"WARN cluster {}: node {} sent invalid message, ignoring",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
}
Ok(Some(RemoteMessage::Quitting)) => {
eprintln!(
"INFO cluster {}: node {} is quitting, bye bye",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
self.close_connection(0, b"you said you'll quit so I quit")
.await;
@ -338,7 +359,8 @@ impl ConnectionManager {
{
eprintln!(
"INFO cluster {}: connection with node {} failed: {err}",
self.cluster_name, self.remote.id
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
);
self.message_queue.push_back(message);
self.close_connection(
@ -438,3 +460,7 @@ async fn open_channels(
Err(err) => Err(format!("could not receive message: {err}")),
}
}
async fn false_recv() -> Result<Option<RemoteMessage>, remoc::rch::base::RecvError> {
Ok(None)
}

View file

@ -145,7 +145,7 @@ impl PluginInfo for Plugin {
);
}
let secret_key = secret_key(&stream_name).await?;
let secret_key = secret_key(".", &stream_name).await?;
eprintln!(
"INFO public key of this node for cluster {stream_name}: {}",
key_bytes_to_b64(secret_key.public().as_bytes())
@ -183,7 +183,7 @@ impl PluginInfo for Plugin {
config: Value,
patterns: Vec<String>,
) -> RemoteResult<ActionImpl> {
if &action_type != "cluster" {
if &action_type != "cluster_send" {
return Err("This plugin can't handle other action types than cluster".into());
}

View file

@ -7,22 +7,23 @@ use tokio::{
io::AsyncWriteExt,
};
fn secret_key_path(cluster_name: &str) -> String {
format!("./secret_key_{cluster_name}.txt")
fn secret_key_path(dir: &str, cluster_name: &str) -> String {
format!("{dir}/secret_key_{cluster_name}.txt")
}
pub async fn secret_key(cluster_name: &str) -> Result<SecretKey, String> {
if let Some(key) = get_secret_key(cluster_name).await? {
pub async fn secret_key(dir: &str, cluster_name: &str) -> Result<SecretKey, String> {
let path = secret_key_path(dir, cluster_name);
if let Some(key) = get_secret_key(&path).await? {
Ok(key)
} else {
let key = SecretKey::generate(&mut rand::rng());
set_secret_key(cluster_name, &key).await?;
set_secret_key(&path, &key).await?;
Ok(key)
}
}
async fn get_secret_key(cluster_name: &str) -> Result<Option<SecretKey>, String> {
let key = match fs::read_to_string(secret_key_path(cluster_name)).await {
async fn get_secret_key(path: &str) -> Result<Option<SecretKey>, String> {
let key = match fs::read_to_string(path).await {
Ok(key) => Ok(key),
Err(err) => match err.kind() {
io::ErrorKind::NotFound => return Ok(None),
@ -32,26 +33,24 @@ async fn get_secret_key(cluster_name: &str) -> Result<Option<SecretKey>, String>
let bytes = match key_b64_to_bytes(&key) {
Ok(key) => Ok(key),
Err(err) => Err(format!(
"invalid secret key read from file: {err}. Please remove the `{}` file from plugin directory.",
secret_key_path(cluster_name),
"invalid secret key read from file: {err}. Please remove the `{path}` file from plugin directory.",
)),
}?;
Ok(Some(SecretKey::from_bytes(&bytes)))
}
async fn set_secret_key(cluster_name: &str, key: &SecretKey) -> Result<(), String> {
async fn set_secret_key(path: &str, key: &SecretKey) -> Result<(), String> {
let secret_key = key_bytes_to_b64(&key.to_bytes());
let secret_key_path = secret_key_path(cluster_name);
File::options()
.mode(0o600)
.write(true)
.create(true)
.open(&secret_key_path)
.open(path)
.await
.map_err(|err| format!("can't open `{secret_key_path}` in plugin directory: {err}"))?
.map_err(|err| format!("can't open `{path}` in plugin directory: {err}"))?
.write_all(secret_key.as_bytes())
.await
.map_err(|err| format!("can't write to `{secret_key_path}` in plugin directory: {err}"))
.map_err(|err| format!("can't write to `{path}` in plugin directory: {err}"))
}
pub fn key_b64_to_bytes(key: &str) -> Result<[u8; 32], DecodeError> {
@ -72,3 +71,100 @@ pub fn key_b64_to_bytes(key: &str) -> Result<[u8; 32], DecodeError> {
pub fn key_bytes_to_b64(key: &[u8; 32]) -> String {
data_encoding::BASE64URL.encode(key)
}
#[cfg(test)]
mod tests {
use assert_fs::{
TempDir,
prelude::{FileWriteStr, PathChild},
};
use iroh::{PublicKey, SecretKey};
use tokio::fs::read_to_string;
use crate::secret_key::{
get_secret_key, key_b64_to_bytes, key_bytes_to_b64, secret_key_path, set_secret_key,
};
#[test]
fn secret_key_encode_decode() {
for (secret_key, public_key) in [
(
"g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=",
"HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=",
),
(
"5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=",
"LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc=",
),
] {
assert_eq!(
secret_key,
&key_bytes_to_b64(&key_b64_to_bytes(secret_key).unwrap())
);
assert_eq!(
public_key,
&key_bytes_to_b64(&key_b64_to_bytes(public_key).unwrap())
);
let secret_key_parsed = SecretKey::from_bytes(&key_b64_to_bytes(secret_key).unwrap());
let public_key_parsed =
PublicKey::from_bytes(&key_b64_to_bytes(public_key).unwrap()).unwrap();
assert_eq!(secret_key_parsed.public(), public_key_parsed);
}
}
#[tokio::test]
async fn secret_key_get() {
let tmp_dir = TempDir::new().unwrap();
let tmp_dir_str = tmp_dir.to_str().unwrap();
for (secret_key, cluster_name) in [
("g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=", "my_cluster"),
("5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=", "name"),
] {
tmp_dir
.child(&format!("secret_key_{cluster_name}.txt"))
.write_str(secret_key)
.unwrap();
let secret_key_parsed = SecretKey::from_bytes(&key_b64_to_bytes(secret_key).unwrap());
let path = secret_key_path(tmp_dir_str, cluster_name);
let secret_key_from_file = get_secret_key(&path).await.unwrap();
assert_eq!(
secret_key_parsed.to_bytes(),
secret_key_from_file.unwrap().to_bytes()
)
}
assert_eq!(
Ok(None),
get_secret_key(&format!("{tmp_dir_str}/non_existent"))
.await
// Can't compare secret keys so we map to bytes
// even if we don't want one
.map(|opt| opt.map(|pk| pk.to_bytes()))
);
// Will fail if we're root, but who runs this as root??
assert!(
get_secret_key(&format!("/root/non_existent"))
.await
.is_err()
);
}
#[tokio::test]
async fn secret_key_set() {
let tmp_dir = TempDir::new().unwrap();
let tmp_dir_str = tmp_dir.to_str().unwrap();
let path = format!("{tmp_dir_str}/secret");
let key = SecretKey::generate(&mut rand::rng());
assert!(set_secret_key(&path, &key).await.is_ok());
let read_file = read_to_string(&path).await;
assert!(read_file.is_ok());
assert_eq!(read_file.unwrap(), key_bytes_to_b64(&key.to_bytes()));
}
}

82
tests/plugin_cluster.rs Normal file
View file

@ -0,0 +1,82 @@
use std::{path::Path, thread::sleep, time::Duration};
use assert_cmd::Command;
use assert_fs::prelude::*;
use predicates::prelude::predicate;
use tokio::{fs::read_to_string, runtime::Handle};
// require UDP ports 9876 & 9877 to be free on 127.0.0.1
#[tokio::test]
async fn plugin_cluster() {
// First build reaction-plugin-cluster
Command::new("cargo")
.args(["build", "-p", "reaction-plugin-cluster"])
.unwrap();
let config = read_to_string("tests/test-conf/test-cluster.jsonnet")
.await
.unwrap();
let secret_key_a = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=";
let public_key_a = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=";
let secret_key_b = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=";
let public_key_b = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc=";
let config_a = config
.replace("PUBLIC_KEY", public_key_b)
.replace("NODE", "A")
.replace("1234", "9876")
.replace("4321", "9877");
let config_b = config
.replace("PUBLIC_KEY", public_key_a)
.replace("NODE", "B")
.replace("1234", "9877")
.replace("4321", "9876");
let output_a = vec![
"a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "",
];
let output_b = vec![
"a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "",
];
let runtime = Handle::current();
let a_handle = runtime.spawn_blocking(|| launch_node(config_a, secret_key_a, output_a));
let b_handle = runtime.spawn_blocking(|| launch_node(config_b, secret_key_b, output_b));
let (a_res, b_res) = tokio::join!(a_handle, b_handle);
a_res.unwrap();
b_res.unwrap();
}
fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'static str>) {
let tmp_dir = assert_fs::TempDir::new().unwrap();
// Write node config
tmp_dir.child("config.jsonnet").write_str(&config).unwrap();
tmp_dir
.child("plugin_data/cluster/secret_key_s1")
.write_str(my_secret)
.unwrap();
// Copy cluster plugin
tmp_dir
.child("./target/debug/reaction-plugin-cluster")
.write_file(Path::new("./target/debug/reaction-plugin-cluster"))
.unwrap();
sleep(Duration::from_secs(10));
Command::cargo_bin("reaction")
.unwrap()
.args(["start", "--socket", "./s", "--config", "./config.jsonnet"])
.current_dir(tmp_dir.path())
.timeout(Duration::from_secs(5))
// Expected exit 1: all stream exited
.assert()
.code(predicate::eq(1));
// Expected output
tmp_dir.child("log").assert(expected_output.join("\n"));
tmp_dir.child("log").write_str("").unwrap();
}

View file

@ -28,18 +28,10 @@
a0: {
type: 'cluster_send',
options: {
send: 'a0 <num>',
send: 'NODE a0 <num>',
to: 's1',
},
},
b0: {
type: 'cluster_send',
options: {
send: 'b0 <num>',
to: 's1',
},
after: '600ms',
},
},
},
},
@ -47,12 +39,14 @@
s1: {
type: 'cluster',
options: {
listen_port: 9000,
shared_secret: '',
nodes: {
publickey: '',
addresses: ['127.0.0.1:9001'],
},
listen_port: 1234,
bind_ipv4: '127.0.0.1',
bind_ipv6: null,
message_timeout: '30s',
nodes: [{
public_key: 'PUBLIC_KEY',
addresses: ['127.0.0.1:4321'],
}],
},
filters: {
f1: {