diff --git a/Cargo.lock b/Cargo.lock index bb17bba..6722383 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2859,6 +2859,7 @@ dependencies = [ name = "reaction-plugin-cluster" version = "0.1.0" dependencies = [ + "assert_fs", "chrono", "data-encoding", "futures", diff --git a/Cargo.toml b/Cargo.toml index 822a086..51c2a45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ tracing = "0.1.40" [dev-dependencies] rand = "0.8.5" tempfile = "3.12.0" -assert_fs = "1.1.3" +assert_fs.workspace = true assert_cmd = "2.0.17" predicates = "3.1.3" @@ -78,6 +78,7 @@ predicates = "3.1.3" members = ["plugins/reaction-plugin", "plugins/reaction-plugin-cluster", "plugins/reaction-plugin-virtual"] [workspace.dependencies] +assert_fs = "1.1.3" chrono = { version = "0.4.38", features = ["std", "clock", "serde"] } futures = "0.3.30" remoc = { version = "0.18.3" } diff --git a/plugins/reaction-plugin-cluster/Cargo.toml b/plugins/reaction-plugin-cluster/Cargo.toml index 4ba08a5..fe78995 100644 --- a/plugins/reaction-plugin-cluster/Cargo.toml +++ b/plugins/reaction-plugin-cluster/Cargo.toml @@ -15,5 +15,8 @@ tokio.workspace = true tokio.features = ["rt-multi-thread"] data-encoding = "2.9.0" -iroh = "0.94.0" +iroh = { version = "0.94.0", default-features = false } rand = "0.9.2" + +[dev-dependencies] +assert_fs.workspace = true diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index 02d792a..bbfb1ed 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -5,6 +5,7 @@ use std::{ }; use chrono::{DateTime, Local, TimeDelta, Utc}; +use futures::FutureExt; use iroh::{ Endpoint, EndpointAddr, endpoint::{Connection, VarInt}, @@ -14,7 +15,10 @@ use remoc::{Connect, rch::base}; use serde::{Deserialize, Serialize}; use tokio::{sync::mpsc, time::sleep}; -use crate::cluster::{ALPN, UtcLine}; +use crate::{ + cluster::{ALPN, UtcLine}, + secret_key::key_bytes_to_b64, +}; const START_TIMEOUT: Duration = Duration::from_secs(5); const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour @@ -117,13 +121,20 @@ impl ConnectionManager { tokio::pin!(tick); let have_connection = self.connection.is_some(); - let maybe_conn_rx = self.connection.as_mut().map(|conn| conn.rx.recv()); + let maybe_conn_rx = self + .connection + .as_mut() + .map(|conn| conn.rx.recv().boxed()) + // This Future will never be polled because of the if in select! + // It still needs to be present because the branch will be evaluated + // so we can't unwrap + .unwrap_or(false_recv().boxed()); let event = tokio::select! { // Tick when we don't have a connection _ = tick, if !have_connection => Some(Event::Tick), // Receive remote message when we have a connection - msg = maybe_conn_rx.unwrap(), if have_connection => Some(Event::RemoteMessageReceived(msg)), + msg = maybe_conn_rx, if have_connection => Some(Event::RemoteMessageReceived(msg)), // Receive a connection from EndpointManager conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)), // Receive a message from local Actions @@ -208,7 +219,8 @@ impl ConnectionManager { if count > 0 { eprintln!( "DEBUG cluster {}: node {}: dropping {count} messages that reached timeout", - self.cluster_name, self.remote.id, + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()), ) } } @@ -220,7 +232,8 @@ impl ConnectionManager { None => { eprintln!( "DEBUG cluster {}: ConnectionManager {}: quitting because EndpointManager has quit", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); self.quit(); false @@ -243,7 +256,8 @@ impl ConnectionManager { } else { eprintln!( "WARN cluster {}: ignoring incoming connection from {}, as we already have a valid connection with it", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); true } @@ -258,11 +272,14 @@ impl ConnectionManager { self.delta = Some(delta); eprintln!( "ERROR cluster {}: trying to connect to node {}: {err}", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); eprintln!( "INFO cluster {}: retry connecting to node {} in {:?}", - self.cluster_name, self.remote.id, self.delta + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()), + self.delta ); } @@ -274,7 +291,8 @@ impl ConnectionManager { Err(err) => { eprintln!( "WARN cluster {}: error receiving message from node {}: {err}", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); self.close_connection(1, b"error receiving from your stream") .await; @@ -282,20 +300,23 @@ impl ConnectionManager { Ok(None) => { eprintln!( "WARN cluster {}: node {} closed its stream", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); self.close_connection(1, b"you closed your stream").await; } Ok(Some(RemoteMessage::Version(_))) => { eprintln!( "WARN cluster {}: node {} sent invalid message, ignoring", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); } Ok(Some(RemoteMessage::Quitting)) => { eprintln!( "INFO cluster {}: node {} is quitting, bye bye", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); self.close_connection(0, b"you said you'll quit so I quit") .await; @@ -338,7 +359,8 @@ impl ConnectionManager { { eprintln!( "INFO cluster {}: connection with node {} failed: {err}", - self.cluster_name, self.remote.id + self.cluster_name, + key_bytes_to_b64(self.remote.id.as_bytes()) ); self.message_queue.push_back(message); self.close_connection( @@ -438,3 +460,7 @@ async fn open_channels( Err(err) => Err(format!("could not receive message: {err}")), } } + +async fn false_recv() -> Result, remoc::rch::base::RecvError> { + Ok(None) +} diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index b27e335..b8e8198 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -145,7 +145,7 @@ impl PluginInfo for Plugin { ); } - let secret_key = secret_key(&stream_name).await?; + let secret_key = secret_key(".", &stream_name).await?; eprintln!( "INFO public key of this node for cluster {stream_name}: {}", key_bytes_to_b64(secret_key.public().as_bytes()) @@ -183,7 +183,7 @@ impl PluginInfo for Plugin { config: Value, patterns: Vec, ) -> RemoteResult { - if &action_type != "cluster" { + if &action_type != "cluster_send" { return Err("This plugin can't handle other action types than cluster".into()); } diff --git a/plugins/reaction-plugin-cluster/src/secret_key.rs b/plugins/reaction-plugin-cluster/src/secret_key.rs index 5545b86..6397611 100644 --- a/plugins/reaction-plugin-cluster/src/secret_key.rs +++ b/plugins/reaction-plugin-cluster/src/secret_key.rs @@ -7,22 +7,23 @@ use tokio::{ io::AsyncWriteExt, }; -fn secret_key_path(cluster_name: &str) -> String { - format!("./secret_key_{cluster_name}.txt") +fn secret_key_path(dir: &str, cluster_name: &str) -> String { + format!("{dir}/secret_key_{cluster_name}.txt") } -pub async fn secret_key(cluster_name: &str) -> Result { - if let Some(key) = get_secret_key(cluster_name).await? { +pub async fn secret_key(dir: &str, cluster_name: &str) -> Result { + let path = secret_key_path(dir, cluster_name); + if let Some(key) = get_secret_key(&path).await? { Ok(key) } else { let key = SecretKey::generate(&mut rand::rng()); - set_secret_key(cluster_name, &key).await?; + set_secret_key(&path, &key).await?; Ok(key) } } -async fn get_secret_key(cluster_name: &str) -> Result, String> { - let key = match fs::read_to_string(secret_key_path(cluster_name)).await { +async fn get_secret_key(path: &str) -> Result, String> { + let key = match fs::read_to_string(path).await { Ok(key) => Ok(key), Err(err) => match err.kind() { io::ErrorKind::NotFound => return Ok(None), @@ -32,26 +33,24 @@ async fn get_secret_key(cluster_name: &str) -> Result, String> let bytes = match key_b64_to_bytes(&key) { Ok(key) => Ok(key), Err(err) => Err(format!( - "invalid secret key read from file: {err}. Please remove the `{}` file from plugin directory.", - secret_key_path(cluster_name), + "invalid secret key read from file: {err}. Please remove the `{path}` file from plugin directory.", )), }?; Ok(Some(SecretKey::from_bytes(&bytes))) } -async fn set_secret_key(cluster_name: &str, key: &SecretKey) -> Result<(), String> { +async fn set_secret_key(path: &str, key: &SecretKey) -> Result<(), String> { let secret_key = key_bytes_to_b64(&key.to_bytes()); - let secret_key_path = secret_key_path(cluster_name); File::options() .mode(0o600) .write(true) .create(true) - .open(&secret_key_path) + .open(path) .await - .map_err(|err| format!("can't open `{secret_key_path}` in plugin directory: {err}"))? + .map_err(|err| format!("can't open `{path}` in plugin directory: {err}"))? .write_all(secret_key.as_bytes()) .await - .map_err(|err| format!("can't write to `{secret_key_path}` in plugin directory: {err}")) + .map_err(|err| format!("can't write to `{path}` in plugin directory: {err}")) } pub fn key_b64_to_bytes(key: &str) -> Result<[u8; 32], DecodeError> { @@ -72,3 +71,100 @@ pub fn key_b64_to_bytes(key: &str) -> Result<[u8; 32], DecodeError> { pub fn key_bytes_to_b64(key: &[u8; 32]) -> String { data_encoding::BASE64URL.encode(key) } + +#[cfg(test)] +mod tests { + use assert_fs::{ + TempDir, + prelude::{FileWriteStr, PathChild}, + }; + use iroh::{PublicKey, SecretKey}; + use tokio::fs::read_to_string; + + use crate::secret_key::{ + get_secret_key, key_b64_to_bytes, key_bytes_to_b64, secret_key_path, set_secret_key, + }; + + #[test] + fn secret_key_encode_decode() { + for (secret_key, public_key) in [ + ( + "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=", + "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY=", + ), + ( + "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=", + "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc=", + ), + ] { + assert_eq!( + secret_key, + &key_bytes_to_b64(&key_b64_to_bytes(secret_key).unwrap()) + ); + assert_eq!( + public_key, + &key_bytes_to_b64(&key_b64_to_bytes(public_key).unwrap()) + ); + + let secret_key_parsed = SecretKey::from_bytes(&key_b64_to_bytes(secret_key).unwrap()); + let public_key_parsed = + PublicKey::from_bytes(&key_b64_to_bytes(public_key).unwrap()).unwrap(); + + assert_eq!(secret_key_parsed.public(), public_key_parsed); + } + } + + #[tokio::test] + async fn secret_key_get() { + let tmp_dir = TempDir::new().unwrap(); + let tmp_dir_str = tmp_dir.to_str().unwrap(); + for (secret_key, cluster_name) in [ + ("g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw=", "my_cluster"), + ("5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY=", "name"), + ] { + tmp_dir + .child(&format!("secret_key_{cluster_name}.txt")) + .write_str(secret_key) + .unwrap(); + + let secret_key_parsed = SecretKey::from_bytes(&key_b64_to_bytes(secret_key).unwrap()); + + let path = secret_key_path(tmp_dir_str, cluster_name); + let secret_key_from_file = get_secret_key(&path).await.unwrap(); + + assert_eq!( + secret_key_parsed.to_bytes(), + secret_key_from_file.unwrap().to_bytes() + ) + } + + assert_eq!( + Ok(None), + get_secret_key(&format!("{tmp_dir_str}/non_existent")) + .await + // Can't compare secret keys so we map to bytes + // even if we don't want one + .map(|opt| opt.map(|pk| pk.to_bytes())) + ); + // Will fail if we're root, but who runs this as root?? + assert!( + get_secret_key(&format!("/root/non_existent")) + .await + .is_err() + ); + } + + #[tokio::test] + async fn secret_key_set() { + let tmp_dir = TempDir::new().unwrap(); + let tmp_dir_str = tmp_dir.to_str().unwrap(); + + let path = format!("{tmp_dir_str}/secret"); + let key = SecretKey::generate(&mut rand::rng()); + + assert!(set_secret_key(&path, &key).await.is_ok()); + let read_file = read_to_string(&path).await; + assert!(read_file.is_ok()); + assert_eq!(read_file.unwrap(), key_bytes_to_b64(&key.to_bytes())); + } +} diff --git a/tests/plugin_cluster.rs b/tests/plugin_cluster.rs new file mode 100644 index 0000000..08fc1a1 --- /dev/null +++ b/tests/plugin_cluster.rs @@ -0,0 +1,82 @@ +use std::{path::Path, thread::sleep, time::Duration}; + +use assert_cmd::Command; +use assert_fs::prelude::*; +use predicates::prelude::predicate; +use tokio::{fs::read_to_string, runtime::Handle}; + +// require UDP ports 9876 & 9877 to be free on 127.0.0.1 + +#[tokio::test] +async fn plugin_cluster() { + // First build reaction-plugin-cluster + Command::new("cargo") + .args(["build", "-p", "reaction-plugin-cluster"]) + .unwrap(); + + let config = read_to_string("tests/test-conf/test-cluster.jsonnet") + .await + .unwrap(); + + let secret_key_a = "g7U1LPq2cgGSyk6CH_v1QpoXowSFKVQ8IcFljd_ZKGw="; + let public_key_a = "HhVh7ghqpXM9375HZ82OOeB504HBSS25wgug-1vUggY="; + let secret_key_b = "5EgRjwIpqd60IXWCGg5dFTtxkI-0fS1PlhoIhUjh1eY="; + let public_key_b = "LPSQ9pS7m_5vvNC-fhoBNeL2-eS2Fd6aO4ImSnXp3lc="; + + let config_a = config + .replace("PUBLIC_KEY", public_key_b) + .replace("NODE", "A") + .replace("1234", "9876") + .replace("4321", "9877"); + let config_b = config + .replace("PUBLIC_KEY", public_key_a) + .replace("NODE", "B") + .replace("1234", "9877") + .replace("4321", "9876"); + + let output_a = vec![ + "a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "", + ]; + let output_b = vec![ + "a0 1", "a0 2", "a0 3", "a0 4", "b0 1", "b0 2", "b0 3", "b0 4", "", + ]; + + let runtime = Handle::current(); + let a_handle = runtime.spawn_blocking(|| launch_node(config_a, secret_key_a, output_a)); + let b_handle = runtime.spawn_blocking(|| launch_node(config_b, secret_key_b, output_b)); + + let (a_res, b_res) = tokio::join!(a_handle, b_handle); + a_res.unwrap(); + b_res.unwrap(); +} + +fn launch_node(config: String, my_secret: &'static str, expected_output: Vec<&'static str>) { + let tmp_dir = assert_fs::TempDir::new().unwrap(); + + // Write node config + tmp_dir.child("config.jsonnet").write_str(&config).unwrap(); + tmp_dir + .child("plugin_data/cluster/secret_key_s1") + .write_str(my_secret) + .unwrap(); + + // Copy cluster plugin + tmp_dir + .child("./target/debug/reaction-plugin-cluster") + .write_file(Path::new("./target/debug/reaction-plugin-cluster")) + .unwrap(); + + sleep(Duration::from_secs(10)); + Command::cargo_bin("reaction") + .unwrap() + .args(["start", "--socket", "./s", "--config", "./config.jsonnet"]) + .current_dir(tmp_dir.path()) + .timeout(Duration::from_secs(5)) + // Expected exit 1: all stream exited + .assert() + .code(predicate::eq(1)); + + // Expected output + tmp_dir.child("log").assert(expected_output.join("\n")); + tmp_dir.child("log").write_str("").unwrap(); +} diff --git a/tests/test-conf/test-cluster.jsonnet b/tests/test-conf/test-cluster.jsonnet index a635c73..b4cb403 100644 --- a/tests/test-conf/test-cluster.jsonnet +++ b/tests/test-conf/test-cluster.jsonnet @@ -28,18 +28,10 @@ a0: { type: 'cluster_send', options: { - send: 'a0 ', + send: 'NODE a0 ', to: 's1', }, }, - b0: { - type: 'cluster_send', - options: { - send: 'b0 ', - to: 's1', - }, - after: '600ms', - }, }, }, }, @@ -47,12 +39,14 @@ s1: { type: 'cluster', options: { - listen_port: 9000, - shared_secret: '', - nodes: { - publickey: '', - addresses: ['127.0.0.1:9001'], - }, + listen_port: 1234, + bind_ipv4: '127.0.0.1', + bind_ipv6: null, + message_timeout: '30s', + nodes: [{ + public_key: 'PUBLIC_KEY', + addresses: ['127.0.0.1:4321'], + }], }, filters: { f1: {