cluster: add a lot of DEBUG msgs, Show trait to ease logging

This commit is contained in:
ppom 2025-11-25 12:00:00 +01:00
commit ff5200b0a0
No known key found for this signature in database
6 changed files with 109 additions and 28 deletions

View file

@ -45,6 +45,8 @@ pub fn cluster_tasks(
mut actions: Vec<ActionInit>,
shutdown: ShutdownController,
) {
eprintln!("DEBUG cluster tasks starts running");
let (message_action2connection_txs, mut message_action2connection_rxs): (
Vec<tokioMpsc::Sender<UtcLine>>,
Vec<tokioMpsc::Receiver<UtcLine>>,
@ -110,6 +112,8 @@ pub fn cluster_tasks(
.await
});
}
eprintln!("DEBUG cluster tasks finished running");
}
impl ActionInit {
@ -120,6 +124,7 @@ impl ActionInit {
own_stream_tx: remocMpsc::Sender<Line>,
) {
while let Ok(Some(m)) = self.rx.recv().await {
eprintln!("DEBUG action: received a message to send to connections");
let line = if m.match_.is_empty() {
self.send.clone()
} else {

View file

@ -17,7 +17,7 @@ use tokio::{sync::mpsc, time::sleep};
use crate::{
cluster::{ALPN, UtcLine},
secret_key::key_bytes_to_b64,
key::Show,
};
const START_TIMEOUT: Duration = Duration::from_secs(5);
@ -112,6 +112,7 @@ impl ConnectionManager {
self.try_connect().await;
loop {
eprintln!("DEBUG connection: NEW LOOP!");
let tick = sleep(if self.connection.is_none() {
self.delta.unwrap_or(START_TIMEOUT)
} else {
@ -131,16 +132,17 @@ impl ConnectionManager {
.unwrap_or(false_recv().boxed());
let event = tokio::select! {
biased;
// Quitting
_ = self.shutdown.wait() => None,
// Receive a connection from EndpointManager
conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)),
// Tick when we don't have a connection
_ = tick, if !have_connection => Some(Event::Tick),
// Receive remote message when we have a connection
msg = maybe_conn_rx, if have_connection => Some(Event::RemoteMessageReceived(msg)),
// Receive a connection from EndpointManager
conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)),
// Receive a message from local Actions
msg = self.message_rx.recv() => Some(Event::LocalMessageReceived(msg)),
// Quitting
_ = self.shutdown.wait() => None,
};
match event {
@ -160,12 +162,27 @@ impl ConnectionManager {
self.handle_tick().await;
}
Event::ConnectionReceived(connection) => {
eprintln!(
"DEBUG cluster {}: node {}: received a connection",
self.cluster_name,
self.remote.id.show(),
);
self.handle_connection(connection).await;
}
Event::LocalMessageReceived(utc_line) => {
eprintln!(
"DEBUG cluster {}: node {}: received a local message",
self.cluster_name,
self.remote.id.show(),
);
self.handle_local_message(utc_line, Insert::Back).await;
}
Event::RemoteMessageReceived(message) => {
eprintln!(
"DEBUG cluster {}: node {}: received a remote message",
self.cluster_name,
self.remote.id.show(),
);
self.handle_remote_message(message).await;
}
}
@ -175,8 +192,21 @@ impl ConnectionManager {
/// Returns true if we have a valid connection now
async fn try_connect(&mut self) -> bool {
if self.connection.is_none() {
eprintln!(
"DEBUG cluster {}: node {}: trying to connect...",
self.cluster_name,
self.remote.id.show(),
);
match self.endpoint.connect(self.remote.clone(), ALPN[0]).await {
Ok(connection) => self.handle_connection(Some(connection)).await,
Ok(connection) => {
eprintln!(
"DEBUG cluster {}: node {}: created connection",
self.cluster_name,
self.remote.id.show(),
);
self.handle_connection(Some(connection)).await
}
Err(err) => {
self.try_connect_error(err.to_string());
false
@ -220,7 +250,7 @@ impl ConnectionManager {
eprintln!(
"DEBUG cluster {}: node {}: dropping {count} messages that reached timeout",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes()),
self.remote.id.show(),
)
}
}
@ -233,7 +263,7 @@ impl ConnectionManager {
eprintln!(
"DEBUG cluster {}: ConnectionManager {}: quitting because EndpointManager has quit",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
self.quit();
false
@ -257,7 +287,7 @@ impl ConnectionManager {
eprintln!(
"WARN cluster {}: ignoring incoming connection from {}, as we already have a valid connection with it",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
true
}
@ -273,12 +303,12 @@ impl ConnectionManager {
eprintln!(
"ERROR cluster {}: trying to connect to node {}: {err}",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
eprintln!(
"INFO cluster {}: retry connecting to node {} in {:?}",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes()),
self.remote.id.show(),
self.delta
);
}
@ -292,7 +322,7 @@ impl ConnectionManager {
eprintln!(
"WARN cluster {}: error receiving message from node {}: {err}",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
self.close_connection(1, b"error receiving from your stream")
.await;
@ -301,7 +331,7 @@ impl ConnectionManager {
eprintln!(
"WARN cluster {}: node {} closed its stream",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
self.close_connection(1, b"you closed your stream").await;
}
@ -309,14 +339,14 @@ impl ConnectionManager {
eprintln!(
"WARN cluster {}: node {} sent invalid message, ignoring",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
}
Ok(Some(RemoteMessage::Quitting)) => {
eprintln!(
"INFO cluster {}: node {} is quitting, bye bye",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
self.close_connection(0, b"you said you'll quit so I quit")
.await;
@ -333,6 +363,13 @@ impl ConnectionManager {
self.cluster_name, line.0
);
self.quit();
} else {
eprintln!(
"DEBUG cluster {}: node {}: sent a remote message to local stream: {}",
self.cluster_name,
self.remote.id.show(),
line.0
);
}
}
}
@ -360,7 +397,7 @@ impl ConnectionManager {
eprintln!(
"INFO cluster {}: connection with node {} failed: {err}",
self.cluster_name,
key_bytes_to_b64(self.remote.id.as_bytes())
self.remote.id.show()
);
self.message_queue.push_back(message);
self.close_connection(
@ -368,6 +405,13 @@ impl ConnectionManager {
b"could not send a message to your channel so I quit",
)
.await;
} else {
eprintln!(
"DEBUG cluster {}: node {}: sent a local message to remote: {}",
self.cluster_name,
self.remote.id.show(),
message.0
);
}
}
None => {

View file

@ -8,6 +8,8 @@ use iroh::{
use reaction_plugin::shutdown::ShutdownController;
use tokio::sync::mpsc;
use crate::key::Show;
enum Break {
Yes,
No,
@ -66,10 +68,13 @@ impl EndpointManager {
}
async fn handle_incoming(&mut self, incoming: Incoming) -> Break {
eprintln!(
"DEBUG cluster {}: EndpointManager: receiving connection",
self.cluster_name,
);
// FIXME a malicious actor could maybe prevent a node from connecting to
// its cluster by sending lots of invalid slow connection requests?
// We could lower its priority https://docs.rs/tokio/latest/tokio/macro.select.html#fairness
// And/or moving the handshake to another task
// This function could be moved to a new 'oneshot' task instead
let remote_address = incoming.remote_address();
let remote_address_validated = incoming.remote_address_validated();
let connection = match incoming.await {
@ -118,6 +123,11 @@ impl EndpointManager {
self.shutdown.ask_shutdown();
return Break::Yes;
}
eprintln!(
"DEBUG cluster {}: EndpointManager: receiving connection from {}",
self.cluster_name,
remote_id.show(),
);
}
}

View file

@ -1,7 +1,7 @@
use std::io;
use data_encoding::DecodeError;
use iroh::SecretKey;
use iroh::{PublicKey, SecretKey};
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
@ -40,7 +40,7 @@ async fn get_secret_key(path: &str) -> Result<Option<SecretKey>, String> {
}
async fn set_secret_key(path: &str, key: &SecretKey) -> Result<(), String> {
let secret_key = key_bytes_to_b64(&key.to_bytes());
let secret_key = key.show();
File::options()
.mode(0o600)
.write(true)
@ -72,6 +72,24 @@ pub fn key_bytes_to_b64(key: &[u8; 32]) -> String {
data_encoding::BASE64URL.encode(key)
}
/// Implemented by PublicKey & SecretKey to display keys as base64 instead of hexadecimal.
/// Similar to Display/ToString
pub trait Show {
fn show(&self) -> String;
}
impl Show for PublicKey {
fn show(&self) -> String {
key_bytes_to_b64(self.as_bytes())
}
}
impl Show for SecretKey {
fn show(&self) -> String {
key_bytes_to_b64(&self.to_bytes())
}
}
#[cfg(test)]
mod tests {
use assert_fs::{
@ -81,7 +99,7 @@ mod tests {
use iroh::{PublicKey, SecretKey};
use tokio::fs::read_to_string;
use crate::secret_key::{
use crate::key::{
get_secret_key, key_b64_to_bytes, key_bytes_to_b64, secret_key_path, set_secret_key,
};

View file

@ -15,11 +15,12 @@ use serde::{Deserialize, Serialize};
mod cluster;
mod connection;
mod endpoint;
mod secret_key;
mod key;
use secret_key::{key_b64_to_bytes, key_bytes_to_b64, secret_key};
use crate::cluster::{bind, cluster_tasks};
use crate::{
cluster::{bind, cluster_tasks},
key::{Show, key_b64_to_bytes, secret_key},
};
#[tokio::main]
async fn main() {
@ -148,7 +149,7 @@ impl PluginInfo for Plugin {
let secret_key = secret_key(".", &stream_name).await?;
eprintln!(
"INFO public key of this node for cluster {stream_name}: {}",
key_bytes_to_b64(secret_key.public().as_bytes())
secret_key.public().show()
);
let (tx, rx) = mpsc::channel(1);
@ -233,6 +234,8 @@ impl PluginInfo for Plugin {
// Free containers
self.actions = Default::default();
self.streams = Default::default();
eprintln!("DEBUG finished setup.");
Ok(())
}

View file

@ -92,8 +92,9 @@ impl PluginInfo for Plugin {
}
}
}
self.streams = BTreeMap::new();
self.actions_init = Vec::new();
// Free containers
self.streams = Default::default();
self.actions_init = Default::default();
Ok(())
}