diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index 9d76f25..ee31fb8 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -22,10 +22,11 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; pub type UtcLine = Arc<(String, DateTime)>; pub async fn bind(stream: &StreamInit) -> Result { + // FIXME higher timeouts and keep alive let mut transport = TransportConfig::default(); transport - .max_idle_timeout(Some(VarInt::from_u32(1).into())) - .keep_alive_interval(Some(Duration::from_millis(400))); + .max_idle_timeout(Some(VarInt::from_u32(2).into())) + .keep_alive_interval(Some(Duration::from_millis(200))); let mut builder = Endpoint::builder() .secret_key(stream.secret_key.clone()) diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index 13ad2c0..dc2a04c 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, sync::Arc}; +use std::{cmp::max, collections::VecDeque, sync::Arc}; use chrono::{DateTime, Local, TimeDelta, Utc}; use futures::FutureExt; @@ -6,6 +6,7 @@ use iroh::{ Endpoint, EndpointAddr, endpoint::{Connection, VarInt}, }; +use rand::random_range; use reaction_plugin::{Line, shutdown::ShutdownController}; use remoc::{Connect, rch::base}; use serde::{Deserialize, Serialize}; @@ -55,6 +56,8 @@ pub struct ConnectionManager { connection_rx: mpsc::Receiver, /// Our own connection (when we have one) connection: Option, + /// Last connexion ID, used to have a determinist way to choose between conflicting connections + last_connexion_id: u64, /// Max duration before we drop pending messages to a node we can't connect to. message_timeout: TimeDelta, @@ -98,6 +101,7 @@ impl ConnectionManager { ask_connection, cancel_ask_connection: None, connection_rx, + last_connexion_id: 0, message_timeout, message_rx, message_queue: VecDeque::default(), @@ -148,24 +152,12 @@ impl ConnectionManager { async fn handle_event(&mut self, event: Event) { match event { Event::ConnectionReceived(connection) => { - eprintln!( - "DEBUG cluster {}: node {}: received a connection", - self.cluster_name, self.node_id, - ); self.handle_connection(connection).await; } Event::LocalMessageReceived(utc_line) => { - eprintln!( - "DEBUG cluster {}: node {}: received a local message", - self.cluster_name, self.node_id, - ); self.handle_local_message(utc_line, Insert::Back).await; } Event::RemoteMessageReceived(message) => { - eprintln!( - "DEBUG cluster {}: node {}: received a remote message", - self.cluster_name, self.node_id, - ); self.handle_remote_message(message).await; } } @@ -201,7 +193,7 @@ impl ConnectionManager { /// Bootstrap a new Connection /// Returns true if we have a valid connection now - async fn handle_connection(&mut self, connection: Option) -> bool { + async fn handle_connection(&mut self, connection: Option) { match connection { None => { eprintln!( @@ -209,32 +201,38 @@ impl ConnectionManager { self.cluster_name, self.node_id, ); self.quit(); - false } Some(connection) => { if let Some(cancel) = self.cancel_ask_connection.take() { let _ = cancel.send(()).await; } - if self.connection.is_none() { - match open_channels(&connection).await { - Ok((tx, rx)) => { + match open_channels(&connection, self.last_connexion_id).await { + Ok((mut tx, rx, new_id)) => { + if self.connection.is_none() || self.last_connexion_id < new_id { + if let Some(mut conn) = self.connection.take() { + // FIXME should we do this in a separate task? + let _ = conn.tx.send(RemoteMessage::Quitting).await; + } self.connection = Some(OwnConnection { connection, tx, rx }); - true - } - Err(err) => { + self.last_connexion_id = new_id; + } else { eprintln!( - "ERROR cluster {}: trying to initialize connection to node {}: {err}", + "WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater", self.cluster_name, self.node_id, ); - false + // FIXME should we do this in a separate task? + let _ = tx.send(RemoteMessage::Quitting).await; + } + } + Err(err) => { + eprintln!( + "ERROR cluster {}: trying to initialize connection to node {}: {err}", + self.cluster_name, self.node_id, + ); + if self.connection.is_none() { + self.ask_connection().await; } } - } else { - eprintln!( - "WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it", - self.cluster_name, self.node_id, - ); - true } } } @@ -247,8 +245,8 @@ impl ConnectionManager { match message { Err(err) => { eprintln!( - "WARN cluster {}: node {}: error receiving remote message: {err}", - self.cluster_name, self.node_id, + "WARN cluster {}: node {}: connection {}: error receiving remote message: {err}", + self.cluster_name, self.node_id, self.last_connexion_id ); self.close_connection(1, b"error receiving from your stream") .await; @@ -260,7 +258,7 @@ impl ConnectionManager { ); self.close_connection(1, b"you closed your stream").await; } - Ok(Some(RemoteMessage::Version(_))) => { + Ok(Some(RemoteMessage::Handshake(_, _))) => { eprintln!( "WARN cluster {}: node {} sent invalid message, ignoring", self.cluster_name, self.node_id, @@ -297,6 +295,10 @@ impl ConnectionManager { } async fn handle_local_message(&mut self, message: Option, insert: Insert) { + eprintln!( + "DEBUG cluster {}: node {}: received a local message", + self.cluster_name, self.node_id, + ); match message { None => { eprintln!( @@ -356,6 +358,7 @@ impl ConnectionManager { } async fn ask_connection(&mut self) { + // if self.node_id.starts_with('H') { let (tx, rx) = mpsc::channel(1); self.cancel_ask_connection = Some(tx); if let Err(err) = self.ask_connection.send(rx).await { @@ -365,6 +368,7 @@ impl ConnectionManager { ); self.quit(); } + // } } fn quit(&mut self) { @@ -395,7 +399,22 @@ impl ConnectionManager { #[derive(Serialize, Deserialize)] pub enum RemoteMessage { /// Must be the first message sent over, then should not be sent again - Version(u32), + /// + /// u32 is a version number. + /// The version must be the same, or compatibility is broken. + /// + /// u64 is a connection id. + /// Each node sends a number between n and n + 1_000_000. + /// Both nodes keep the highest of both numbers. + /// n starts at 0. + /// When recreating a new connection, n starts at _last connection number_ + 1. + /// This leaves room for a minimum of 18_446_744_073_709 connections between two nodes + /// For each "shared uptime", which I hope is sufficient :p + /// Will be reset as soon as there is an instant where both nodes are down + /// + /// This protocols permits to determine on both nodes which connection to drop + /// when we have one connection and receive another. + Handshake(u32, u64), /// A line to transmit to your stream Line((String, DateTime)), /// Announce the node is closing @@ -406,7 +425,15 @@ pub enum RemoteMessage { /// This way, there is no need to know if we created or accepted the connection. async fn open_channels( connection: &Connection, -) -> Result<(base::Sender, base::Receiver), String> { + last_connexion_id: u64, +) -> Result< + ( + base::Sender, + base::Receiver, + u64, + ), + String, +> { eprintln!("DEBUG opening uni channel"); let mut output = connection.open_uni().await.map_err(|err| err.to_string())?; @@ -434,20 +461,29 @@ async fn open_channels( tokio::spawn(conn); + let our_id = random_range(last_connexion_id + 1..last_connexion_id + 1_000_000); + eprintln!("DEBUG sending version"); - tx.send(RemoteMessage::Version(PROTOCOL_VERSION)) + tx.send(RemoteMessage::Handshake(PROTOCOL_VERSION, our_id)) .await .map_err(|err| err.to_string())?; eprintln!("DEBUG receiving version"); match rx.recv().await { // Good protocol version! - Ok(Some(RemoteMessage::Version(PROTOCOL_VERSION))) => { - eprintln!("DEBUG version handshake complete!"); - Ok((tx, rx)) + Ok(Some(RemoteMessage::Handshake(PROTOCOL_VERSION, their_id))) => { + // FIXME Do we need to test this? If so, this function should return their_id even when error in order to retry better next time + // if their_id < last_connexion_id + // ERROR + // else + let chosen_id = max(our_id, their_id); + eprintln!( + "DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}" + ); + Ok((tx, rx, chosen_id)) } // Errors - Ok(Some(RemoteMessage::Version(other))) => Err(format!( + Ok(Some(RemoteMessage::Handshake(other, _))) => Err(format!( "incompatible version: {other}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version." )), Ok(Some(RemoteMessage::Line(_))) => Err(format!(