diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index cb32566..30f60fd 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -7,8 +7,8 @@ use std::{ use futures::future::join_all; use iroh::{ - Endpoint, PublicKey, - endpoint::{ConnectOptions, Connection, TransportConfig}, + Endpoint, + endpoint::{ConnectOptions, TransportConfig}, }; use reaction_plugin::{Line, shutdown::ShutdownController}; use remoc::rch::mpsc as remocMpsc; @@ -84,21 +84,27 @@ pub fn cluster_tasks( let endpoint = Arc::new(endpoint); - let (connection_endpoint2connection_txs, mut connection_endpoint2connection_rxs): ( - BTreeMap>, - Vec<( - PublicKey, - tokioMpsc::Sender, - tokioMpsc::Receiver, - )>, - ) = stream - .nodes - .keys() - .map(|pk| { - let (tx, rx) = tokioMpsc::channel(1); - ((pk.clone(), tx.clone()), (pk.clone(), tx, rx)) - }) - .unzip(); + let mut connection_endpoint2connection_txs = BTreeMap::new(); + + // Spawn connection managers + while let Some((pk, endpoint_addr)) = stream.nodes.pop_first() { + let cluster_name = stream.name.clone(); + let endpoint = endpoint.clone(); + let message_action2connection_rx = message_action2connection_rxs.pop().unwrap(); + let stream_tx = stream.tx.clone(); + let shutdown = shutdown.clone(); + let (connection_manager, connection_endpoint2connection_tx) = ConnectionManager::new( + cluster_name, + endpoint_addr, + endpoint, + stream.message_timeout, + message_action2connection_rx, + stream_tx, + shutdown, + ); + tokio::spawn(async move { connection_manager.task().await }); + connection_endpoint2connection_txs.insert(pk, connection_endpoint2connection_tx); + } // Spawn connection accepter EndpointManager::new( @@ -108,33 +114,6 @@ pub fn cluster_tasks( shutdown.clone(), ); - // Spawn connection managers - while let Some((pk, connection_endpoint2connection_tx, connection_endpoint2connection_rx)) = - connection_endpoint2connection_rxs.pop() - { - let cluster_name = stream.name.clone(); - let endpoint_addr = stream.nodes.remove(&pk).unwrap(); - let endpoint = endpoint.clone(); - let message_action2connection_rx = message_action2connection_rxs.pop().unwrap(); - let stream_tx = stream.tx.clone(); - let shutdown = shutdown.clone(); - tokio::spawn(async move { - ConnectionManager::new( - cluster_name, - endpoint_addr, - endpoint, - connection_endpoint2connection_tx, - connection_endpoint2connection_rx, - stream.message_timeout, - message_action2connection_rx, - stream_tx, - shutdown, - ) - .task() - .await - }); - } - eprintln!("DEBUG cluster tasks finished running"); } diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index 08208ee..45d77af 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -16,6 +16,7 @@ use reaction_plugin::{Line, shutdown::ShutdownController}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, sync::mpsc, + time::sleep, }; use crate::{ @@ -23,10 +24,6 @@ use crate::{ key::Show, }; -// const START_TIMEOUT: Duration = Duration::from_secs(5); -// const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour -// const TIMEOUT_FACTOR: f64 = 1.5; - const PROTOCOL_VERSION: u32 = 1; type RemoteLine = (String, Duration); @@ -35,7 +32,7 @@ type MaybeRemoteLine = Result, IoError>; enum Event { LocalMessageReceived(Option), RemoteMessageReceived(MaybeRemoteLine), - ConnectionReceived(Option), + ConnectionReceived(Option), } enum Insert { @@ -43,8 +40,9 @@ enum Insert { Back, } -struct OwnConnection { +pub struct OwnConnection { connection: Connection, + id: u64, line_tx: BufWriter, line_rx: BufReader, @@ -57,11 +55,13 @@ struct OwnConnection { impl OwnConnection { fn new( connection: Connection, + id: u64, line_tx: BufWriter, line_rx: BufReader, ) -> Self { Self { connection, + id, line_tx, line_rx, next_time: None, @@ -125,6 +125,11 @@ impl OwnConnection { } } +pub enum ConnOrConn { + Connection(Connection), + OwnConnection(OwnConnection), +} + /// Handle a remote node. /// Manage reception and sending of messages to this node. /// Retry failed connections. @@ -138,11 +143,11 @@ pub struct ConnectionManager { ask_connection: mpsc::Sender>, cancel_ask_connection: Option>, /// The EndpointManager sending us a connection (whether we asked for it or not) - connection_rx: mpsc::Receiver, + connection_rx: mpsc::Receiver, /// Our own connection (when we have one) connection: Option, /// Last connexion ID, used to have a determinist way to choose between conflicting connections - last_connexion_id: u64, + last_connection_id: u64, /// Max duration before we drop pending messages to a node we can't connect to. message_timeout: Duration, @@ -163,44 +168,45 @@ impl ConnectionManager { cluster_name: String, remote: EndpointAddr, endpoint: Arc, - connection_tx: mpsc::Sender, - connection_rx: mpsc::Receiver, message_timeout: Duration, message_rx: mpsc::Receiver, own_cluster_tx: remoc::rch::mpsc::Sender, shutdown: ShutdownController, - ) -> Self { + ) -> (Self, mpsc::Sender) { let node_id = remote.id.show(); + let (connection_tx, connection_rx) = mpsc::channel(1); let (ask_connection, order_start) = mpsc::channel(1); try_connect( cluster_name.clone(), remote.clone(), endpoint.clone(), - connection_tx, + 0, + connection_tx.clone(), order_start, ); - Self { - cluster_name, - node_id, - connection: None, - ask_connection, - cancel_ask_connection: None, - connection_rx, - last_connexion_id: 0, - message_timeout, - message_rx, - message_queue: VecDeque::default(), - own_cluster_tx, - shutdown, - } + ( + Self { + cluster_name, + node_id, + connection: None, + ask_connection, + cancel_ask_connection: None, + connection_rx, + last_connection_id: 0, + message_timeout, + message_rx, + message_queue: VecDeque::default(), + own_cluster_tx, + shutdown, + }, + connection_tx, + ) } /// Main loop pub async fn task(mut self) { self.ask_connection().await; loop { - eprintln!("DEBUG connection: NEW LOOP!"); - let have_connection = self.connection.is_some(); let maybe_conn_rx = self .connection @@ -278,7 +284,7 @@ impl ConnectionManager { /// Bootstrap a new Connection /// Returns true if we have a valid connection now - async fn handle_connection(&mut self, connection: Option) { + async fn handle_connection(&mut self, connection: Option) { match connection { None => { eprintln!( @@ -291,27 +297,40 @@ impl ConnectionManager { if let Some(cancel) = self.cancel_ask_connection.take() { let _ = cancel.send(()).await; } - match open_channels(connection, self.last_connexion_id).await { - Ok((own_connection, new_id)) => { - if self.connection.is_none() || self.last_connexion_id < new_id { - self.connection = Some(own_connection); - self.last_connexion_id = new_id; - } else { - eprintln!( - "WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater", - self.cluster_name, self.node_id, - ); - } - } - Err(err) => { + + let last_connection_id = self.last_connection_id; + let mut insert_connection = |own_connection: OwnConnection| { + if self + .connection + .as_ref() + .is_none_or(|old_own| old_own.id < own_connection.id) + { + self.last_connection_id = own_connection.id; + self.connection = Some(own_connection); + } else { eprintln!( - "ERROR cluster {}: trying to initialize connection to node {}: {err}", + "WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater", self.cluster_name, self.node_id, ); - if self.connection.is_none() { - self.ask_connection().await; + } + }; + + match connection { + ConnOrConn::Connection(connection) => { + match open_channels(connection, last_connection_id).await { + Ok(own_connection) => insert_connection(own_connection), + Err(err) => { + eprintln!( + "ERROR cluster {}: trying to initialize connection to node {}: {err}", + self.cluster_name, self.node_id, + ); + if self.connection.is_none() { + self.ask_connection().await; + } + } } } + ConnOrConn::OwnConnection(own_connection) => insert_connection(own_connection), } } } @@ -322,7 +341,7 @@ impl ConnectionManager { Err(err) => { eprintln!( "WARN cluster {}: node {}: connection {}: error receiving remote message: {err}", - self.cluster_name, self.node_id, self.last_connexion_id + self.cluster_name, self.node_id, self.last_connection_id ); self.close_connection(1, b"error receiving from your stream") .await; @@ -432,30 +451,12 @@ impl ConnectionManager { } } -/// Compute the next wait Duration. -/// We're multiplying the Duration by [`TIMEOUT_FACTOR`] and cap it to [`MAX_TIMEOUT`]. -// fn next_delta(delta: Option) -> Duration { -// match delta { -// None => START_TIMEOUT, -// Some(delta) => { -// // Multiply timeout by TIMEOUT_FACTOR -// let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64); -// // Cap to MAX_TIMEOUT -// if delta > MAX_TIMEOUT { -// MAX_TIMEOUT -// } else { -// delta -// } -// } -// } -// } - /// Open accept one stream and create one stream. /// This way, there is no need to know if we created or accepted the connection. async fn open_channels( connection: Connection, last_connexion_id: u64, -) -> Result<(OwnConnection, u64), IoError> { +) -> Result { eprintln!("DEBUG opening uni channel"); let mut output = BufWriter::new(connection.open_uni().await?); @@ -490,18 +491,41 @@ async fn open_channels( eprintln!( "DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}" ); - Ok((OwnConnection::new(connection, output, input), chosen_id)) + Ok(OwnConnection::new(connection, chosen_id, output, input)) } async fn false_recv() -> MaybeRemoteLine { Ok(None) } +const START_TIMEOUT: Duration = Duration::from_secs(5); +const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour +const TIMEOUT_FACTOR: f64 = 1.5; + +// Compute the next wait Duration. +// We're multiplying the Duration by [`TIMEOUT_FACTOR`] and cap it to [`MAX_TIMEOUT`]. +fn next_delta(delta: Option) -> Duration { + match delta { + None => START_TIMEOUT, + Some(delta) => { + // Multiply timeout by TIMEOUT_FACTOR + let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64); + // Cap to MAX_TIMEOUT + if delta > MAX_TIMEOUT { + MAX_TIMEOUT + } else { + delta + } + } + } +} + fn try_connect( cluster_name: String, remote: EndpointAddr, endpoint: Arc, - connection_tx: mpsc::Sender, + last_connection_id: u64, + connection_tx: mpsc::Sender, mut order_start: mpsc::Receiver>, ) { tokio::spawn(async move { @@ -510,43 +534,61 @@ fn try_connect( while let Some(mut order_stop) = order_start.recv().await { // Until we have a connection or we're requested to stop let mut keep_trying = true; + let mut delta = None; + while keep_trying { - eprintln!("DEBUG cluster {cluster_name}: node {node_id}: trying to connect..."); - let connect = tokio::select! { - // conn = endpoint.connect(remote.clone(), ALPN[0]) => Some(conn), - conn = endpoint.connect_with_opts(remote.clone(), ALPN[0], connect_config()) => Some(conn), - _ = order_stop.recv() => None, - }; - if let Some(connect) = connect { - match connect { - Ok(connecting) => match connecting.await { - Ok(connection) => { - eprintln!( - "DEBUG cluster {cluster_name}: node {node_id}: created connection" - ); - if let Err(err) = connection_tx.send(connection).await { + if let Some(delta) = delta { + keep_trying = tokio::select! { + _ = sleep(delta) => true, + _ = order_stop.recv() => false, + }; + } + if keep_trying { + eprintln!("DEBUG cluster {cluster_name}: node {node_id}: trying to connect..."); + let connect = tokio::select! { + // conn = endpoint.connect(remote.clone(), ALPN[0]) => Some(conn), + conn = endpoint.connect_with_opts(remote.clone(), ALPN[0], connect_config()) => Some(conn), + _ = order_stop.recv() => None, + }; + if let Some(connect) = connect { + let res = match connect { + Ok(connecting) => match connecting.await { + Ok(connection) => { eprintln!( - "DEBUG cluster {cluster_name}: node {node_id}: quitting because ConnectionManager has quit: {err}" + "DEBUG cluster {cluster_name}: node {node_id}: created connection" ); - order_start.close(); + match open_channels(connection, last_connection_id).await { + Ok(own_connection) => { + if let Err(err) = connection_tx + .send(ConnOrConn::OwnConnection(own_connection)) + .await + { + eprintln!( + "DEBUG cluster {cluster_name}: node {node_id}: quitting because ConnectionManager has quit: {err}" + ); + order_start.close(); + } + // successfully opened connection + keep_trying = false; + Ok(()) + } + Err(err) => Err(err.to_string()), + } } - keep_trying = false; - } - Err(err) => { - eprintln!( - "WARN cluster {cluster_name}: node {node_id}: while trying to connect: {err}" - ); - } - }, - Err(err) => { + Err(err) => Err(err.to_string()), + }, + Err(err) => Err(err.to_string()), + }; + if let Err(err) = res { eprintln!( "WARN cluster {cluster_name}: node {node_id}: while trying to connect: {err}" ); } + } else { + // received stop order + keep_trying = false; } - } else { - // received stop order - keep_trying = false; + delta = Some(next_delta(delta)); } } } diff --git a/plugins/reaction-plugin-cluster/src/endpoint.rs b/plugins/reaction-plugin-cluster/src/endpoint.rs index 57b1d5a..28b6594 100644 --- a/plugins/reaction-plugin-cluster/src/endpoint.rs +++ b/plugins/reaction-plugin-cluster/src/endpoint.rs @@ -1,14 +1,11 @@ use std::collections::BTreeMap; use std::sync::Arc; -use iroh::{ - Endpoint, PublicKey, - endpoint::{Connection, Incoming}, -}; +use iroh::{Endpoint, PublicKey, endpoint::Incoming}; use reaction_plugin::shutdown::ShutdownController; use tokio::sync::mpsc; -use crate::key::Show; +use crate::{connection::ConnOrConn, key::Show}; enum Break { Yes, @@ -21,7 +18,7 @@ pub struct EndpointManager { /// Cluster's name (for logging) cluster_name: String, /// Connection sender to the Connection Managers - connections_tx: BTreeMap>, + connections_tx: BTreeMap>, /// shutdown shutdown: ShutdownController, } @@ -30,7 +27,7 @@ impl EndpointManager { pub fn new( endpoint: Arc, cluster_name: String, - connections_tx: BTreeMap>, + connections_tx: BTreeMap>, shutdown: ShutdownController, ) { tokio::spawn(async move { @@ -117,7 +114,7 @@ impl EndpointManager { return Break::No; } Some(tx) => { - if let Err(_) = tx.send(connection).await { + if let Err(_) = tx.send(ConnOrConn::Connection(connection)).await { eprintln!( "DEBUG cluster {}: EndpointManager: quitting because ConnectionManager has quit", self.cluster_name,