diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index f29c47b..5a4df4b 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -45,6 +45,8 @@ pub fn cluster_tasks( mut actions: Vec, shutdown: ShutdownController, ) { + eprintln!("DEBUG cluster tasks starts running"); + let (message_action2connection_txs, mut message_action2connection_rxs): ( Vec>, Vec>, @@ -110,6 +112,8 @@ pub fn cluster_tasks( .await }); } + + eprintln!("DEBUG cluster tasks finished running"); } impl ActionInit { @@ -120,6 +124,7 @@ impl ActionInit { own_stream_tx: remocMpsc::Sender, ) { while let Ok(Some(m)) = self.rx.recv().await { + eprintln!("DEBUG action: received a message to send to connections"); let line = if m.match_.is_empty() { self.send.clone() } else { diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index bbfb1ed..e5709b1 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -17,7 +17,7 @@ use tokio::{sync::mpsc, time::sleep}; use crate::{ cluster::{ALPN, UtcLine}, - secret_key::key_bytes_to_b64, + key::Show, }; const START_TIMEOUT: Duration = Duration::from_secs(5); @@ -112,6 +112,7 @@ impl ConnectionManager { self.try_connect().await; loop { + eprintln!("DEBUG connection: NEW LOOP!"); let tick = sleep(if self.connection.is_none() { self.delta.unwrap_or(START_TIMEOUT) } else { @@ -131,16 +132,17 @@ impl ConnectionManager { .unwrap_or(false_recv().boxed()); let event = tokio::select! { + biased; + // Quitting + _ = self.shutdown.wait() => None, + // Receive a connection from EndpointManager + conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)), // Tick when we don't have a connection _ = tick, if !have_connection => Some(Event::Tick), // Receive remote message when we have a connection msg = maybe_conn_rx, if have_connection => Some(Event::RemoteMessageReceived(msg)), - // Receive a connection from EndpointManager - conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)), // Receive a message from local Actions msg = self.message_rx.recv() => Some(Event::LocalMessageReceived(msg)), - // Quitting - _ = self.shutdown.wait() => None, }; match event { @@ -160,12 +162,27 @@ impl ConnectionManager { self.handle_tick().await; } Event::ConnectionReceived(connection) => { + eprintln!( + "DEBUG cluster {}: node {}: received a connection", + self.cluster_name, + self.remote.id.show(), + ); self.handle_connection(connection).await; } Event::LocalMessageReceived(utc_line) => { + eprintln!( + "DEBUG cluster {}: node {}: received a local message", + self.cluster_name, + self.remote.id.show(), + ); self.handle_local_message(utc_line, Insert::Back).await; } Event::RemoteMessageReceived(message) => { + eprintln!( + "DEBUG cluster {}: node {}: received a remote message", + self.cluster_name, + self.remote.id.show(), + ); self.handle_remote_message(message).await; } } @@ -175,8 +192,21 @@ impl ConnectionManager { /// Returns true if we have a valid connection now async fn try_connect(&mut self) -> bool { if self.connection.is_none() { + eprintln!( + "DEBUG cluster {}: node {}: trying to connect...", + self.cluster_name, + self.remote.id.show(), + ); match self.endpoint.connect(self.remote.clone(), ALPN[0]).await { - Ok(connection) => self.handle_connection(Some(connection)).await, + Ok(connection) => { + eprintln!( + "DEBUG cluster {}: node {}: created connection", + self.cluster_name, + self.remote.id.show(), + ); + + self.handle_connection(Some(connection)).await + } Err(err) => { self.try_connect_error(err.to_string()); false @@ -220,7 +250,7 @@ impl ConnectionManager { eprintln!( "DEBUG cluster {}: node {}: dropping {count} messages that reached timeout", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()), + self.remote.id.show(), ) } } @@ -233,7 +263,7 @@ impl ConnectionManager { eprintln!( "DEBUG cluster {}: ConnectionManager {}: quitting because EndpointManager has quit", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); self.quit(); false @@ -257,7 +287,7 @@ impl ConnectionManager { eprintln!( "WARN cluster {}: ignoring incoming connection from {}, as we already have a valid connection with it", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); true } @@ -273,12 +303,12 @@ impl ConnectionManager { eprintln!( "ERROR cluster {}: trying to connect to node {}: {err}", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); eprintln!( "INFO cluster {}: retry connecting to node {} in {:?}", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()), + self.remote.id.show(), self.delta ); } @@ -292,7 +322,7 @@ impl ConnectionManager { eprintln!( "WARN cluster {}: error receiving message from node {}: {err}", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); self.close_connection(1, b"error receiving from your stream") .await; @@ -301,7 +331,7 @@ impl ConnectionManager { eprintln!( "WARN cluster {}: node {} closed its stream", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); self.close_connection(1, b"you closed your stream").await; } @@ -309,14 +339,14 @@ impl ConnectionManager { eprintln!( "WARN cluster {}: node {} sent invalid message, ignoring", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); } Ok(Some(RemoteMessage::Quitting)) => { eprintln!( "INFO cluster {}: node {} is quitting, bye bye", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); self.close_connection(0, b"you said you'll quit so I quit") .await; @@ -333,6 +363,13 @@ impl ConnectionManager { self.cluster_name, line.0 ); self.quit(); + } else { + eprintln!( + "DEBUG cluster {}: node {}: sent a remote message to local stream: {}", + self.cluster_name, + self.remote.id.show(), + line.0 + ); } } } @@ -360,7 +397,7 @@ impl ConnectionManager { eprintln!( "INFO cluster {}: connection with node {} failed: {err}", self.cluster_name, - key_bytes_to_b64(self.remote.id.as_bytes()) + self.remote.id.show() ); self.message_queue.push_back(message); self.close_connection( @@ -368,6 +405,13 @@ impl ConnectionManager { b"could not send a message to your channel so I quit", ) .await; + } else { + eprintln!( + "DEBUG cluster {}: node {}: sent a local message to remote: {}", + self.cluster_name, + self.remote.id.show(), + message.0 + ); } } None => { diff --git a/plugins/reaction-plugin-cluster/src/endpoint.rs b/plugins/reaction-plugin-cluster/src/endpoint.rs index 77a7035..4101ba3 100644 --- a/plugins/reaction-plugin-cluster/src/endpoint.rs +++ b/plugins/reaction-plugin-cluster/src/endpoint.rs @@ -8,6 +8,8 @@ use iroh::{ use reaction_plugin::shutdown::ShutdownController; use tokio::sync::mpsc; +use crate::key::Show; + enum Break { Yes, No, @@ -66,10 +68,13 @@ impl EndpointManager { } async fn handle_incoming(&mut self, incoming: Incoming) -> Break { + eprintln!( + "DEBUG cluster {}: EndpointManager: receiving connection", + self.cluster_name, + ); // FIXME a malicious actor could maybe prevent a node from connecting to // its cluster by sending lots of invalid slow connection requests? - // We could lower its priority https://docs.rs/tokio/latest/tokio/macro.select.html#fairness - // And/or moving the handshake to another task + // This function could be moved to a new 'oneshot' task instead let remote_address = incoming.remote_address(); let remote_address_validated = incoming.remote_address_validated(); let connection = match incoming.await { @@ -118,6 +123,11 @@ impl EndpointManager { self.shutdown.ask_shutdown(); return Break::Yes; } + eprintln!( + "DEBUG cluster {}: EndpointManager: receiving connection from {}", + self.cluster_name, + remote_id.show(), + ); } } diff --git a/plugins/reaction-plugin-cluster/src/secret_key.rs b/plugins/reaction-plugin-cluster/src/key.rs similarity index 91% rename from plugins/reaction-plugin-cluster/src/secret_key.rs rename to plugins/reaction-plugin-cluster/src/key.rs index 6397611..db0d277 100644 --- a/plugins/reaction-plugin-cluster/src/secret_key.rs +++ b/plugins/reaction-plugin-cluster/src/key.rs @@ -1,7 +1,7 @@ use std::io; use data_encoding::DecodeError; -use iroh::SecretKey; +use iroh::{PublicKey, SecretKey}; use tokio::{ fs::{self, File}, io::AsyncWriteExt, @@ -40,7 +40,7 @@ async fn get_secret_key(path: &str) -> Result, String> { } async fn set_secret_key(path: &str, key: &SecretKey) -> Result<(), String> { - let secret_key = key_bytes_to_b64(&key.to_bytes()); + let secret_key = key.show(); File::options() .mode(0o600) .write(true) @@ -72,6 +72,24 @@ pub fn key_bytes_to_b64(key: &[u8; 32]) -> String { data_encoding::BASE64URL.encode(key) } +/// Implemented by PublicKey & SecretKey to display keys as base64 instead of hexadecimal. +/// Similar to Display/ToString +pub trait Show { + fn show(&self) -> String; +} + +impl Show for PublicKey { + fn show(&self) -> String { + key_bytes_to_b64(self.as_bytes()) + } +} + +impl Show for SecretKey { + fn show(&self) -> String { + key_bytes_to_b64(&self.to_bytes()) + } +} + #[cfg(test)] mod tests { use assert_fs::{ @@ -81,7 +99,7 @@ mod tests { use iroh::{PublicKey, SecretKey}; use tokio::fs::read_to_string; - use crate::secret_key::{ + use crate::key::{ get_secret_key, key_b64_to_bytes, key_bytes_to_b64, secret_key_path, set_secret_key, }; diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index b8e8198..cd8875a 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -15,11 +15,12 @@ use serde::{Deserialize, Serialize}; mod cluster; mod connection; mod endpoint; -mod secret_key; +mod key; -use secret_key::{key_b64_to_bytes, key_bytes_to_b64, secret_key}; - -use crate::cluster::{bind, cluster_tasks}; +use crate::{ + cluster::{bind, cluster_tasks}, + key::{Show, key_b64_to_bytes, secret_key}, +}; #[tokio::main] async fn main() { @@ -148,7 +149,7 @@ impl PluginInfo for Plugin { let secret_key = secret_key(".", &stream_name).await?; eprintln!( "INFO public key of this node for cluster {stream_name}: {}", - key_bytes_to_b64(secret_key.public().as_bytes()) + secret_key.public().show() ); let (tx, rx) = mpsc::channel(1); @@ -233,6 +234,8 @@ impl PluginInfo for Plugin { // Free containers self.actions = Default::default(); self.streams = Default::default(); + eprintln!("DEBUG finished setup."); + Ok(()) } diff --git a/plugins/reaction-plugin-virtual/src/main.rs b/plugins/reaction-plugin-virtual/src/main.rs index 8bd735c..eafb0f9 100644 --- a/plugins/reaction-plugin-virtual/src/main.rs +++ b/plugins/reaction-plugin-virtual/src/main.rs @@ -92,8 +92,9 @@ impl PluginInfo for Plugin { } } } - self.streams = BTreeMap::new(); - self.actions_init = Vec::new(); + // Free containers + self.streams = Default::default(); + self.actions_init = Default::default(); Ok(()) }