Connections have ids, to fix simultaneous connections races

This commit is contained in:
ppom 2025-11-28 12:00:00 +01:00
commit 83ac520d27
No known key found for this signature in database
2 changed files with 78 additions and 41 deletions

View file

@ -22,10 +22,11 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
pub type UtcLine = Arc<(String, DateTime<Utc>)>;
pub async fn bind(stream: &StreamInit) -> Result<Endpoint, String> {
// FIXME higher timeouts and keep alive
let mut transport = TransportConfig::default();
transport
.max_idle_timeout(Some(VarInt::from_u32(1).into()))
.keep_alive_interval(Some(Duration::from_millis(400)));
.max_idle_timeout(Some(VarInt::from_u32(2).into()))
.keep_alive_interval(Some(Duration::from_millis(200)));
let mut builder = Endpoint::builder()
.secret_key(stream.secret_key.clone())

View file

@ -1,4 +1,4 @@
use std::{collections::VecDeque, sync::Arc};
use std::{cmp::max, collections::VecDeque, sync::Arc};
use chrono::{DateTime, Local, TimeDelta, Utc};
use futures::FutureExt;
@ -6,6 +6,7 @@ use iroh::{
Endpoint, EndpointAddr,
endpoint::{Connection, VarInt},
};
use rand::random_range;
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::{Connect, rch::base};
use serde::{Deserialize, Serialize};
@ -55,6 +56,8 @@ pub struct ConnectionManager {
connection_rx: mpsc::Receiver<Connection>,
/// Our own connection (when we have one)
connection: Option<OwnConnection>,
/// Last connexion ID, used to have a determinist way to choose between conflicting connections
last_connexion_id: u64,
/// Max duration before we drop pending messages to a node we can't connect to.
message_timeout: TimeDelta,
@ -98,6 +101,7 @@ impl ConnectionManager {
ask_connection,
cancel_ask_connection: None,
connection_rx,
last_connexion_id: 0,
message_timeout,
message_rx,
message_queue: VecDeque::default(),
@ -148,24 +152,12 @@ impl ConnectionManager {
async fn handle_event(&mut self, event: Event) {
match event {
Event::ConnectionReceived(connection) => {
eprintln!(
"DEBUG cluster {}: node {}: received a connection",
self.cluster_name, self.node_id,
);
self.handle_connection(connection).await;
}
Event::LocalMessageReceived(utc_line) => {
eprintln!(
"DEBUG cluster {}: node {}: received a local message",
self.cluster_name, self.node_id,
);
self.handle_local_message(utc_line, Insert::Back).await;
}
Event::RemoteMessageReceived(message) => {
eprintln!(
"DEBUG cluster {}: node {}: received a remote message",
self.cluster_name, self.node_id,
);
self.handle_remote_message(message).await;
}
}
@ -201,7 +193,7 @@ impl ConnectionManager {
/// Bootstrap a new Connection
/// Returns true if we have a valid connection now
async fn handle_connection(&mut self, connection: Option<Connection>) -> bool {
async fn handle_connection(&mut self, connection: Option<Connection>) {
match connection {
None => {
eprintln!(
@ -209,32 +201,38 @@ impl ConnectionManager {
self.cluster_name, self.node_id,
);
self.quit();
false
}
Some(connection) => {
if let Some(cancel) = self.cancel_ask_connection.take() {
let _ = cancel.send(()).await;
}
if self.connection.is_none() {
match open_channels(&connection).await {
Ok((tx, rx)) => {
match open_channels(&connection, self.last_connexion_id).await {
Ok((mut tx, rx, new_id)) => {
if self.connection.is_none() || self.last_connexion_id < new_id {
if let Some(mut conn) = self.connection.take() {
// FIXME should we do this in a separate task?
let _ = conn.tx.send(RemoteMessage::Quitting).await;
}
self.connection = Some(OwnConnection { connection, tx, rx });
true
}
Err(err) => {
self.last_connexion_id = new_id;
} else {
eprintln!(
"ERROR cluster {}: trying to initialize connection to node {}: {err}",
"WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater",
self.cluster_name, self.node_id,
);
false
// FIXME should we do this in a separate task?
let _ = tx.send(RemoteMessage::Quitting).await;
}
}
Err(err) => {
eprintln!(
"ERROR cluster {}: trying to initialize connection to node {}: {err}",
self.cluster_name, self.node_id,
);
if self.connection.is_none() {
self.ask_connection().await;
}
}
} else {
eprintln!(
"WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it",
self.cluster_name, self.node_id,
);
true
}
}
}
@ -247,8 +245,8 @@ impl ConnectionManager {
match message {
Err(err) => {
eprintln!(
"WARN cluster {}: node {}: error receiving remote message: {err}",
self.cluster_name, self.node_id,
"WARN cluster {}: node {}: connection {}: error receiving remote message: {err}",
self.cluster_name, self.node_id, self.last_connexion_id
);
self.close_connection(1, b"error receiving from your stream")
.await;
@ -260,7 +258,7 @@ impl ConnectionManager {
);
self.close_connection(1, b"you closed your stream").await;
}
Ok(Some(RemoteMessage::Version(_))) => {
Ok(Some(RemoteMessage::Handshake(_, _))) => {
eprintln!(
"WARN cluster {}: node {} sent invalid message, ignoring",
self.cluster_name, self.node_id,
@ -297,6 +295,10 @@ impl ConnectionManager {
}
async fn handle_local_message(&mut self, message: Option<UtcLine>, insert: Insert) {
eprintln!(
"DEBUG cluster {}: node {}: received a local message",
self.cluster_name, self.node_id,
);
match message {
None => {
eprintln!(
@ -356,6 +358,7 @@ impl ConnectionManager {
}
async fn ask_connection(&mut self) {
// if self.node_id.starts_with('H') {
let (tx, rx) = mpsc::channel(1);
self.cancel_ask_connection = Some(tx);
if let Err(err) = self.ask_connection.send(rx).await {
@ -365,6 +368,7 @@ impl ConnectionManager {
);
self.quit();
}
// }
}
fn quit(&mut self) {
@ -395,7 +399,22 @@ impl ConnectionManager {
#[derive(Serialize, Deserialize)]
pub enum RemoteMessage {
/// Must be the first message sent over, then should not be sent again
Version(u32),
///
/// u32 is a version number.
/// The version must be the same, or compatibility is broken.
///
/// u64 is a connection id.
/// Each node sends a number between n and n + 1_000_000.
/// Both nodes keep the highest of both numbers.
/// n starts at 0.
/// When recreating a new connection, n starts at _last connection number_ + 1.
/// This leaves room for a minimum of 18_446_744_073_709 connections between two nodes
/// For each "shared uptime", which I hope is sufficient :p
/// Will be reset as soon as there is an instant where both nodes are down
///
/// This protocols permits to determine on both nodes which connection to drop
/// when we have one connection and receive another.
Handshake(u32, u64),
/// A line to transmit to your stream
Line((String, DateTime<Utc>)),
/// Announce the node is closing
@ -406,7 +425,15 @@ pub enum RemoteMessage {
/// This way, there is no need to know if we created or accepted the connection.
async fn open_channels(
connection: &Connection,
) -> Result<(base::Sender<RemoteMessage>, base::Receiver<RemoteMessage>), String> {
last_connexion_id: u64,
) -> Result<
(
base::Sender<RemoteMessage>,
base::Receiver<RemoteMessage>,
u64,
),
String,
> {
eprintln!("DEBUG opening uni channel");
let mut output = connection.open_uni().await.map_err(|err| err.to_string())?;
@ -434,20 +461,29 @@ async fn open_channels(
tokio::spawn(conn);
let our_id = random_range(last_connexion_id + 1..last_connexion_id + 1_000_000);
eprintln!("DEBUG sending version");
tx.send(RemoteMessage::Version(PROTOCOL_VERSION))
tx.send(RemoteMessage::Handshake(PROTOCOL_VERSION, our_id))
.await
.map_err(|err| err.to_string())?;
eprintln!("DEBUG receiving version");
match rx.recv().await {
// Good protocol version!
Ok(Some(RemoteMessage::Version(PROTOCOL_VERSION))) => {
eprintln!("DEBUG version handshake complete!");
Ok((tx, rx))
Ok(Some(RemoteMessage::Handshake(PROTOCOL_VERSION, their_id))) => {
// FIXME Do we need to test this? If so, this function should return their_id even when error in order to retry better next time
// if their_id < last_connexion_id
// ERROR
// else
let chosen_id = max(our_id, their_id);
eprintln!(
"DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}"
);
Ok((tx, rx, chosen_id))
}
// Errors
Ok(Some(RemoteMessage::Version(other))) => Err(format!(
Ok(Some(RemoteMessage::Handshake(other, _))) => Err(format!(
"incompatible version: {other}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version."
)),
Ok(Some(RemoteMessage::Line(_))) => Err(format!(