From 3ed2ebd4884986db7a47c3888828bc9f809d1abf Mon Sep 17 00:00:00 2001 From: ppom Date: Wed, 26 Nov 2025 12:00:00 +0100 Subject: [PATCH] =?UTF-8?q?Two=20nodes=20succeeded=20to=20exchange=20messa?= =?UTF-8?q?ges=20=F0=9F=8E=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Separated try_connect to another task, to prevent interblocking Send a byte to the new stream so that the other can see the stream and accept it. --- .../reaction-plugin-cluster/src/cluster.rs | 25 +- .../reaction-plugin-cluster/src/connection.rs | 294 +++++++++--------- 2 files changed, 168 insertions(+), 151 deletions(-) diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index 5a4df4b..9d76f25 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -2,11 +2,15 @@ use std::{ collections::BTreeMap, net::{SocketAddrV4, SocketAddrV6}, sync::Arc, + time::Duration, }; use chrono::{DateTime, Local, Utc}; use futures::future::join_all; -use iroh::{Endpoint, PublicKey, endpoint::Connection}; +use iroh::{ + Endpoint, PublicKey, + endpoint::{Connection, TransportConfig, VarInt}, +}; use reaction_plugin::{Line, shutdown::ShutdownController}; use remoc::rch::mpsc as remocMpsc; use tokio::sync::mpsc as tokioMpsc; @@ -18,11 +22,17 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; pub type UtcLine = Arc<(String, DateTime)>; pub async fn bind(stream: &StreamInit) -> Result { + let mut transport = TransportConfig::default(); + transport + .max_idle_timeout(Some(VarInt::from_u32(1).into())) + .keep_alive_interval(Some(Duration::from_millis(400))); + let mut builder = Endpoint::builder() .secret_key(stream.secret_key.clone()) .alpns(ALPN.iter().map(|slice| slice.to_vec()).collect()) .relay_mode(iroh::RelayMode::Disabled) - .clear_discovery(); + .clear_discovery() + .transport_config(transport); if let Some(ip) = stream.bind_ipv4 { builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port)); @@ -69,13 +79,17 @@ pub fn cluster_tasks( let (connection_endpoint2connection_txs, mut connection_endpoint2connection_rxs): ( BTreeMap>, - Vec<(PublicKey, tokioMpsc::Receiver)>, + Vec<( + PublicKey, + tokioMpsc::Sender, + tokioMpsc::Receiver, + )>, ) = stream .nodes .keys() .map(|pk| { let (tx, rx) = tokioMpsc::channel(1); - ((pk.clone(), tx), (pk.clone(), rx)) + ((pk.clone(), tx.clone()), (pk.clone(), tx, rx)) }) .unzip(); @@ -88,7 +102,7 @@ pub fn cluster_tasks( ); // Spawn connection managers - while let Some((pk, connection_endpoint2connection_rx)) = + while let Some((pk, connection_endpoint2connection_tx, connection_endpoint2connection_rx)) = connection_endpoint2connection_rxs.pop() { let cluster_name = stream.name.clone(); @@ -102,6 +116,7 @@ pub fn cluster_tasks( cluster_name, endpoint_addr, endpoint, + connection_endpoint2connection_tx, connection_endpoint2connection_rx, stream.message_timeout, message_action2connection_rx, diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index e5709b1..13ad2c0 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -1,8 +1,4 @@ -use std::{ - collections::VecDeque, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{collections::VecDeque, sync::Arc}; use chrono::{DateTime, Local, TimeDelta, Utc}; use futures::FutureExt; @@ -13,21 +9,20 @@ use iroh::{ use reaction_plugin::{Line, shutdown::ShutdownController}; use remoc::{Connect, rch::base}; use serde::{Deserialize, Serialize}; -use tokio::{sync::mpsc, time::sleep}; +use tokio::{io::AsyncWriteExt, sync::mpsc}; use crate::{ cluster::{ALPN, UtcLine}, key::Show, }; -const START_TIMEOUT: Duration = Duration::from_secs(5); -const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour -const TIMEOUT_FACTOR: f64 = 1.5; +// const START_TIMEOUT: Duration = Duration::from_secs(5); +// const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour +// const TIMEOUT_FACTOR: f64 = 1.5; const PROTOCOL_VERSION: u32 = 1; enum Event { - Tick, LocalMessageReceived(Option), RemoteMessageReceived(Result, remoc::rch::base::RecvError>), ConnectionReceived(Option), @@ -50,22 +45,17 @@ struct OwnConnection { pub struct ConnectionManager { /// Cluster's name (for logging) cluster_name: String, + /// The remote node we're communicating with (for logging) + node_id: String, - /// The remote node we're communicating with - remote: EndpointAddr, - /// Endpoint - endpoint: Arc, - + /// Ask for a connection + ask_connection: mpsc::Sender>, + cancel_ask_connection: Option>, /// The EndpointManager sending us a connection (whether we asked for it or not) connection_rx: mpsc::Receiver, /// Our own connection (when we have one) connection: Option, - /// Delta we'll use next time we'll try to connect to remote - delta: Option, - /// Next instant we'll try to connect - next_try_connect: Option, - /// Max duration before we drop pending messages to a node we can't connect to. message_timeout: TimeDelta, /// Message we receive from actions @@ -85,19 +75,28 @@ impl ConnectionManager { cluster_name: String, remote: EndpointAddr, endpoint: Arc, + connection_tx: mpsc::Sender, connection_rx: mpsc::Receiver, message_timeout: TimeDelta, message_rx: mpsc::Receiver, own_cluster_tx: remoc::rch::mpsc::Sender, shutdown: ShutdownController, ) -> Self { + let node_id = remote.id.show(); + let (ask_connection, order_start) = mpsc::channel(1); + try_connect( + cluster_name.clone(), + remote.clone(), + endpoint.clone(), + connection_tx, + order_start, + ); Self { cluster_name, - remote, - endpoint, + node_id, connection: None, - delta: None, - next_try_connect: None, + ask_connection, + cancel_ask_connection: None, connection_rx, message_timeout, message_rx, @@ -109,17 +108,9 @@ impl ConnectionManager { /// Main loop pub async fn task(mut self) { - self.try_connect().await; - + self.ask_connection().await; loop { eprintln!("DEBUG connection: NEW LOOP!"); - let tick = sleep(if self.connection.is_none() { - self.delta.unwrap_or(START_TIMEOUT) - } else { - // Still tick when we have a connection - Duration::from_secs(60) - }); - tokio::pin!(tick); let have_connection = self.connection.is_some(); let maybe_conn_rx = self @@ -137,8 +128,6 @@ impl ConnectionManager { _ = self.shutdown.wait() => None, // Receive a connection from EndpointManager conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)), - // Tick when we don't have a connection - _ = tick, if !have_connection => Some(Event::Tick), // Receive remote message when we have a connection msg = maybe_conn_rx, if have_connection => Some(Event::RemoteMessageReceived(msg)), // Receive a message from local Actions @@ -158,74 +147,30 @@ impl ConnectionManager { async fn handle_event(&mut self, event: Event) { match event { - Event::Tick => { - self.handle_tick().await; - } Event::ConnectionReceived(connection) => { eprintln!( "DEBUG cluster {}: node {}: received a connection", - self.cluster_name, - self.remote.id.show(), + self.cluster_name, self.node_id, ); self.handle_connection(connection).await; } Event::LocalMessageReceived(utc_line) => { eprintln!( "DEBUG cluster {}: node {}: received a local message", - self.cluster_name, - self.remote.id.show(), + self.cluster_name, self.node_id, ); self.handle_local_message(utc_line, Insert::Back).await; } Event::RemoteMessageReceived(message) => { eprintln!( "DEBUG cluster {}: node {}: received a remote message", - self.cluster_name, - self.remote.id.show(), + self.cluster_name, self.node_id, ); self.handle_remote_message(message).await; } } } - /// Try connecting to a remote endpoint - /// Returns true if we have a valid connection now - async fn try_connect(&mut self) -> bool { - if self.connection.is_none() { - eprintln!( - "DEBUG cluster {}: node {}: trying to connect...", - self.cluster_name, - self.remote.id.show(), - ); - match self.endpoint.connect(self.remote.clone(), ALPN[0]).await { - Ok(connection) => { - eprintln!( - "DEBUG cluster {}: node {}: created connection", - self.cluster_name, - self.remote.id.show(), - ); - - self.handle_connection(Some(connection)).await - } - Err(err) => { - self.try_connect_error(err.to_string()); - false - } - } - } else { - true - } - } - - async fn handle_tick(&mut self) { - if self - .next_try_connect - .is_some_and(|next| next > Instant::now()) - { - self.try_connect().await; - } - } - async fn send_queue_messages(&mut self) { while self.connection.is_some() && let Some(message) = self.message_queue.pop_front() @@ -249,8 +194,7 @@ impl ConnectionManager { if count > 0 { eprintln!( "DEBUG cluster {}: node {}: dropping {count} messages that reached timeout", - self.cluster_name, - self.remote.id.show(), + self.cluster_name, self.node_id, ) } } @@ -262,32 +206,33 @@ impl ConnectionManager { None => { eprintln!( "DEBUG cluster {}: ConnectionManager {}: quitting because EndpointManager has quit", - self.cluster_name, - self.remote.id.show() + self.cluster_name, self.node_id, ); self.quit(); false } Some(connection) => { + if let Some(cancel) = self.cancel_ask_connection.take() { + let _ = cancel.send(()).await; + } if self.connection.is_none() { - self.delta = None; - self.next_try_connect = None; - match open_channels(&connection).await { Ok((tx, rx)) => { self.connection = Some(OwnConnection { connection, tx, rx }); true } Err(err) => { - self.try_connect_error(err); + eprintln!( + "ERROR cluster {}: trying to initialize connection to node {}: {err}", + self.cluster_name, self.node_id, + ); false } } } else { eprintln!( - "WARN cluster {}: ignoring incoming connection from {}, as we already have a valid connection with it", - self.cluster_name, - self.remote.id.show() + "WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it", + self.cluster_name, self.node_id, ); true } @@ -295,24 +240,6 @@ impl ConnectionManager { } } - /// Update the state and log an error when bootstraping a new Connection - fn try_connect_error(&mut self, err: String) { - let delta = next_delta(self.delta); - self.next_try_connect = Some(Instant::now() + delta); - self.delta = Some(delta); - eprintln!( - "ERROR cluster {}: trying to connect to node {}: {err}", - self.cluster_name, - self.remote.id.show() - ); - eprintln!( - "INFO cluster {}: retry connecting to node {} in {:?}", - self.cluster_name, - self.remote.id.show(), - self.delta - ); - } - async fn handle_remote_message( &mut self, message: Result, remoc::rch::base::RecvError>, @@ -320,9 +247,8 @@ impl ConnectionManager { match message { Err(err) => { eprintln!( - "WARN cluster {}: error receiving message from node {}: {err}", - self.cluster_name, - self.remote.id.show() + "WARN cluster {}: node {}: error receiving remote message: {err}", + self.cluster_name, self.node_id, ); self.close_connection(1, b"error receiving from your stream") .await; @@ -330,23 +256,20 @@ impl ConnectionManager { Ok(None) => { eprintln!( "WARN cluster {}: node {} closed its stream", - self.cluster_name, - self.remote.id.show() + self.cluster_name, self.node_id, ); self.close_connection(1, b"you closed your stream").await; } Ok(Some(RemoteMessage::Version(_))) => { eprintln!( "WARN cluster {}: node {} sent invalid message, ignoring", - self.cluster_name, - self.remote.id.show() + self.cluster_name, self.node_id, ); } Ok(Some(RemoteMessage::Quitting)) => { eprintln!( "INFO cluster {}: node {} is quitting, bye bye", - self.cluster_name, - self.remote.id.show() + self.cluster_name, self.node_id, ); self.close_connection(0, b"you said you'll quit so I quit") .await; @@ -366,9 +289,7 @@ impl ConnectionManager { } else { eprintln!( "DEBUG cluster {}: node {}: sent a remote message to local stream: {}", - self.cluster_name, - self.remote.id.show(), - line.0 + self.cluster_name, self.node_id, line.0 ); } } @@ -396,8 +317,7 @@ impl ConnectionManager { { eprintln!( "INFO cluster {}: connection with node {} failed: {err}", - self.cluster_name, - self.remote.id.show() + self.cluster_name, self.node_id, ); self.message_queue.push_back(message); self.close_connection( @@ -408,13 +328,15 @@ impl ConnectionManager { } else { eprintln!( "DEBUG cluster {}: node {}: sent a local message to remote: {}", - self.cluster_name, - self.remote.id.show(), - message.0 + self.cluster_name, self.node_id, message.0 ); } } None => { + eprintln!( + "DEBUG cluster {}: node {}: no connection, saving local message to send later: {}", + self.cluster_name, self.node_id, message.0 + ); if let Insert::Front = insert { self.message_queue.push_front(message); } else { @@ -430,6 +352,19 @@ impl ConnectionManager { connection.rx.close().await; connection.connection.close(VarInt::from_u32(code), reason); } + self.ask_connection().await; + } + + async fn ask_connection(&mut self) { + let (tx, rx) = mpsc::channel(1); + self.cancel_ask_connection = Some(tx); + if let Err(err) = self.ask_connection.send(rx).await { + eprintln!( + "ERROR cluster {}: node {}: quitting because our connection initiater quitted: {}", + self.cluster_name, self.node_id, err + ); + self.quit(); + } } fn quit(&mut self) { @@ -439,21 +374,21 @@ impl ConnectionManager { /// Compute the next wait Duration. /// We're multiplying the Duration by [`TIMEOUT_FACTOR`] and cap it to [`MAX_TIMEOUT`]. -fn next_delta(delta: Option) -> Duration { - match delta { - None => START_TIMEOUT, - Some(delta) => { - // Multiply timeout by TIMEOUT_FACTOR - let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64); - // Cap to MAX_TIMEOUT - if delta > MAX_TIMEOUT { - MAX_TIMEOUT - } else { - delta - } - } - } -} +// fn next_delta(delta: Option) -> Duration { +// match delta { +// None => START_TIMEOUT, +// Some(delta) => { +// // Multiply timeout by TIMEOUT_FACTOR +// let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64); +// // Cap to MAX_TIMEOUT +// if delta > MAX_TIMEOUT { +// MAX_TIMEOUT +// } else { +// delta +// } +// } +// } +// } /// All possible communication messages /// Set as an enum for forward compatibility @@ -472,26 +407,45 @@ pub enum RemoteMessage { async fn open_channels( connection: &Connection, ) -> Result<(base::Sender, base::Receiver), String> { - let output = connection.open_uni().await.map_err(|err| err.to_string())?; + eprintln!("DEBUG opening uni channel"); + let mut output = connection.open_uni().await.map_err(|err| err.to_string())?; - let input = connection + eprintln!("DEBUG sending 1 byte in uni channel"); + let res = output.write(&[0; 1]).await.map_err(|err| err.to_string())?; + eprintln!("DEBUG sent {res} byte in uni channel"); + output.flush().await.map_err(|err| err.to_string())?; + + eprintln!("DEBUG accepting uni channel"); + let mut input = connection .accept_uni() .await .map_err(|err| err.to_string())?; + eprintln!("DEBUG reading 1 byte from uni channel"); + input + .read(&mut [0; 1]) + .await + .map_err(|err| err.to_string())?; + + eprintln!("DEBUG creating remoc channels"); let (conn, mut tx, mut rx) = Connect::io_buffered(remoc::Cfg::default(), input, output, 1024) .await .map_err(|err| err.to_string())?; tokio::spawn(conn); + eprintln!("DEBUG sending version"); tx.send(RemoteMessage::Version(PROTOCOL_VERSION)) .await .map_err(|err| err.to_string())?; + eprintln!("DEBUG receiving version"); match rx.recv().await { // Good protocol version! - Ok(Some(RemoteMessage::Version(PROTOCOL_VERSION))) => Ok((tx, rx)), + Ok(Some(RemoteMessage::Version(PROTOCOL_VERSION))) => { + eprintln!("DEBUG version handshake complete!"); + Ok((tx, rx)) + } // Errors Ok(Some(RemoteMessage::Version(other))) => Err(format!( "incompatible version: {other}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version." @@ -508,3 +462,51 @@ async fn open_channels( async fn false_recv() -> Result, remoc::rch::base::RecvError> { Ok(None) } + +fn try_connect( + cluster_name: String, + remote: EndpointAddr, + endpoint: Arc, + connection_tx: mpsc::Sender, + mut order_start: mpsc::Receiver>, +) { + tokio::spawn(async move { + let node_id = remote.id.show(); + // Each time we receive and order + while let Some(mut order_stop) = order_start.recv().await { + // Until we have a connection or we're requested to stop + let mut keep_trying = true; + while keep_trying { + eprintln!("DEBUG cluster {cluster_name}: node {node_id}: trying to connect..."); + let connect = tokio::select! { + conn = endpoint.connect(remote.clone(), ALPN[0]) => Some(conn), + _ = order_stop.recv() => None, + }; + if let Some(connect) = connect { + match connect { + Ok(connection) => { + eprintln!( + "DEBUG cluster {cluster_name}: node {node_id}: created connection" + ); + if let Err(err) = connection_tx.send(connection).await { + eprintln!( + "DEBUG cluster {cluster_name}: node {node_id}: quitting because ConnectionManager has quit: {err}" + ); + order_start.close(); + } + keep_trying = false; + } + Err(err) => { + eprintln!( + "WARN cluster {cluster_name}: node {node_id}: while trying to connect: {err}" + ); + } + } + } else { + // received stop order + keep_trying = false; + } + } + } + }); +}