cluster: try_connect now opens the channels and performs the handshake itself

This fixes a deadlock where each node was busy initiating an outgoing
connection and was therefore unable to accept the incoming connection
from its peer.

connection_rx can now carry either a raw connection or an already-initialized
connection. Cluster startup has been refactored to take this into account and
to let ConnectionManager create this channel itself.
This commit is contained in:
ppom 2025-12-08 12:00:00 +01:00
commit fbf8c24e31
No known key found for this signature in database
3 changed files with 165 additions and 147 deletions

View file

@ -7,8 +7,8 @@ use std::{
use futures::future::join_all;
use iroh::{
Endpoint, PublicKey,
endpoint::{ConnectOptions, Connection, TransportConfig},
Endpoint,
endpoint::{ConnectOptions, TransportConfig},
};
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::rch::mpsc as remocMpsc;
@ -84,21 +84,27 @@ pub fn cluster_tasks(
let endpoint = Arc::new(endpoint);
let (connection_endpoint2connection_txs, mut connection_endpoint2connection_rxs): (
BTreeMap<PublicKey, tokioMpsc::Sender<Connection>>,
Vec<(
PublicKey,
tokioMpsc::Sender<Connection>,
tokioMpsc::Receiver<Connection>,
)>,
) = stream
.nodes
.keys()
.map(|pk| {
let (tx, rx) = tokioMpsc::channel(1);
((pk.clone(), tx.clone()), (pk.clone(), tx, rx))
})
.unzip();
let mut connection_endpoint2connection_txs = BTreeMap::new();
// Spawn connection managers
while let Some((pk, endpoint_addr)) = stream.nodes.pop_first() {
let cluster_name = stream.name.clone();
let endpoint = endpoint.clone();
let message_action2connection_rx = message_action2connection_rxs.pop().unwrap();
let stream_tx = stream.tx.clone();
let shutdown = shutdown.clone();
let (connection_manager, connection_endpoint2connection_tx) = ConnectionManager::new(
cluster_name,
endpoint_addr,
endpoint,
stream.message_timeout,
message_action2connection_rx,
stream_tx,
shutdown,
);
tokio::spawn(async move { connection_manager.task().await });
connection_endpoint2connection_txs.insert(pk, connection_endpoint2connection_tx);
}
// Spawn connection accepter
EndpointManager::new(
@ -108,33 +114,6 @@ pub fn cluster_tasks(
shutdown.clone(),
);
// Spawn connection managers
while let Some((pk, connection_endpoint2connection_tx, connection_endpoint2connection_rx)) =
connection_endpoint2connection_rxs.pop()
{
let cluster_name = stream.name.clone();
let endpoint_addr = stream.nodes.remove(&pk).unwrap();
let endpoint = endpoint.clone();
let message_action2connection_rx = message_action2connection_rxs.pop().unwrap();
let stream_tx = stream.tx.clone();
let shutdown = shutdown.clone();
tokio::spawn(async move {
ConnectionManager::new(
cluster_name,
endpoint_addr,
endpoint,
connection_endpoint2connection_tx,
connection_endpoint2connection_rx,
stream.message_timeout,
message_action2connection_rx,
stream_tx,
shutdown,
)
.task()
.await
});
}
eprintln!("DEBUG cluster tasks finished running");
}

View file

@ -16,6 +16,7 @@ use reaction_plugin::{Line, shutdown::ShutdownController};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
sync::mpsc,
time::sleep,
};
use crate::{
@ -23,10 +24,6 @@ use crate::{
key::Show,
};
// const START_TIMEOUT: Duration = Duration::from_secs(5);
// const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour
// const TIMEOUT_FACTOR: f64 = 1.5;
const PROTOCOL_VERSION: u32 = 1;
type RemoteLine = (String, Duration);
@ -35,7 +32,7 @@ type MaybeRemoteLine = Result<Option<RemoteLine>, IoError>;
enum Event {
LocalMessageReceived(Option<UtcLine>),
RemoteMessageReceived(MaybeRemoteLine),
ConnectionReceived(Option<Connection>),
ConnectionReceived(Option<ConnOrConn>),
}
enum Insert {
@ -43,8 +40,9 @@ enum Insert {
Back,
}
struct OwnConnection {
pub struct OwnConnection {
connection: Connection,
id: u64,
line_tx: BufWriter<SendStream>,
line_rx: BufReader<RecvStream>,
@ -57,11 +55,13 @@ struct OwnConnection {
impl OwnConnection {
fn new(
connection: Connection,
id: u64,
line_tx: BufWriter<SendStream>,
line_rx: BufReader<RecvStream>,
) -> Self {
Self {
connection,
id,
line_tx,
line_rx,
next_time: None,
@ -125,6 +125,11 @@ impl OwnConnection {
}
}
pub enum ConnOrConn {
Connection(Connection),
OwnConnection(OwnConnection),
}
/// Handle a remote node.
/// Manage reception and sending of messages to this node.
/// Retry failed connections.
@ -138,11 +143,11 @@ pub struct ConnectionManager {
ask_connection: mpsc::Sender<mpsc::Receiver<()>>,
cancel_ask_connection: Option<mpsc::Sender<()>>,
/// The EndpointManager sending us a connection (whether we asked for it or not)
connection_rx: mpsc::Receiver<Connection>,
connection_rx: mpsc::Receiver<ConnOrConn>,
/// Our own connection (when we have one)
connection: Option<OwnConnection>,
/// Last connexion ID, used to have a determinist way to choose between conflicting connections
last_connexion_id: u64,
last_connection_id: u64,
/// Max duration before we drop pending messages to a node we can't connect to.
message_timeout: Duration,
@ -163,44 +168,45 @@ impl ConnectionManager {
cluster_name: String,
remote: EndpointAddr,
endpoint: Arc<Endpoint>,
connection_tx: mpsc::Sender<Connection>,
connection_rx: mpsc::Receiver<Connection>,
message_timeout: Duration,
message_rx: mpsc::Receiver<UtcLine>,
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
shutdown: ShutdownController,
) -> Self {
) -> (Self, mpsc::Sender<ConnOrConn>) {
let node_id = remote.id.show();
let (connection_tx, connection_rx) = mpsc::channel(1);
let (ask_connection, order_start) = mpsc::channel(1);
try_connect(
cluster_name.clone(),
remote.clone(),
endpoint.clone(),
connection_tx,
0,
connection_tx.clone(),
order_start,
);
Self {
cluster_name,
node_id,
connection: None,
ask_connection,
cancel_ask_connection: None,
connection_rx,
last_connexion_id: 0,
message_timeout,
message_rx,
message_queue: VecDeque::default(),
own_cluster_tx,
shutdown,
}
(
Self {
cluster_name,
node_id,
connection: None,
ask_connection,
cancel_ask_connection: None,
connection_rx,
last_connection_id: 0,
message_timeout,
message_rx,
message_queue: VecDeque::default(),
own_cluster_tx,
shutdown,
},
connection_tx,
)
}
/// Main loop
pub async fn task(mut self) {
self.ask_connection().await;
loop {
eprintln!("DEBUG connection: NEW LOOP!");
let have_connection = self.connection.is_some();
let maybe_conn_rx = self
.connection
@ -278,7 +284,7 @@ impl ConnectionManager {
/// Bootstrap a new Connection
/// Returns true if we have a valid connection now
async fn handle_connection(&mut self, connection: Option<Connection>) {
async fn handle_connection(&mut self, connection: Option<ConnOrConn>) {
match connection {
None => {
eprintln!(
@ -291,27 +297,40 @@ impl ConnectionManager {
if let Some(cancel) = self.cancel_ask_connection.take() {
let _ = cancel.send(()).await;
}
match open_channels(connection, self.last_connexion_id).await {
Ok((own_connection, new_id)) => {
if self.connection.is_none() || self.last_connexion_id < new_id {
self.connection = Some(own_connection);
self.last_connexion_id = new_id;
} else {
eprintln!(
"WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater",
self.cluster_name, self.node_id,
);
}
}
Err(err) => {
let last_connection_id = self.last_connection_id;
let mut insert_connection = |own_connection: OwnConnection| {
if self
.connection
.as_ref()
.is_none_or(|old_own| old_own.id < own_connection.id)
{
self.last_connection_id = own_connection.id;
self.connection = Some(own_connection);
} else {
eprintln!(
"ERROR cluster {}: trying to initialize connection to node {}: {err}",
"WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater",
self.cluster_name, self.node_id,
);
if self.connection.is_none() {
self.ask_connection().await;
}
};
match connection {
ConnOrConn::Connection(connection) => {
match open_channels(connection, last_connection_id).await {
Ok(own_connection) => insert_connection(own_connection),
Err(err) => {
eprintln!(
"ERROR cluster {}: trying to initialize connection to node {}: {err}",
self.cluster_name, self.node_id,
);
if self.connection.is_none() {
self.ask_connection().await;
}
}
}
}
ConnOrConn::OwnConnection(own_connection) => insert_connection(own_connection),
}
}
}
@ -322,7 +341,7 @@ impl ConnectionManager {
Err(err) => {
eprintln!(
"WARN cluster {}: node {}: connection {}: error receiving remote message: {err}",
self.cluster_name, self.node_id, self.last_connexion_id
self.cluster_name, self.node_id, self.last_connection_id
);
self.close_connection(1, b"error receiving from your stream")
.await;
@ -432,30 +451,12 @@ impl ConnectionManager {
}
}
/// Compute the next wait Duration.
/// We're multiplying the Duration by [`TIMEOUT_FACTOR`] and cap it to [`MAX_TIMEOUT`].
// fn next_delta(delta: Option<Duration>) -> Duration {
// match delta {
// None => START_TIMEOUT,
// Some(delta) => {
// // Multiply timeout by TIMEOUT_FACTOR
// let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64);
// // Cap to MAX_TIMEOUT
// if delta > MAX_TIMEOUT {
// MAX_TIMEOUT
// } else {
// delta
// }
// }
// }
// }
/// Open accept one stream and create one stream.
/// This way, there is no need to know if we created or accepted the connection.
async fn open_channels(
connection: Connection,
last_connexion_id: u64,
) -> Result<(OwnConnection, u64), IoError> {
) -> Result<OwnConnection, IoError> {
eprintln!("DEBUG opening uni channel");
let mut output = BufWriter::new(connection.open_uni().await?);
@ -490,18 +491,41 @@ async fn open_channels(
eprintln!(
"DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}"
);
Ok((OwnConnection::new(connection, output, input), chosen_id))
Ok(OwnConnection::new(connection, chosen_id, output, input))
}
async fn false_recv() -> MaybeRemoteLine {
Ok(None)
}
const START_TIMEOUT: Duration = Duration::from_secs(5);
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour
const TIMEOUT_FACTOR: f64 = 1.5;
// Compute the next wait Duration.
// We're multiplying the Duration by [`TIMEOUT_FACTOR`] and cap it to [`MAX_TIMEOUT`].
fn next_delta(delta: Option<Duration>) -> Duration {
match delta {
None => START_TIMEOUT,
Some(delta) => {
// Multiply timeout by TIMEOUT_FACTOR
let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64);
// Cap to MAX_TIMEOUT
if delta > MAX_TIMEOUT {
MAX_TIMEOUT
} else {
delta
}
}
}
}
fn try_connect(
cluster_name: String,
remote: EndpointAddr,
endpoint: Arc<Endpoint>,
connection_tx: mpsc::Sender<Connection>,
last_connection_id: u64,
connection_tx: mpsc::Sender<ConnOrConn>,
mut order_start: mpsc::Receiver<mpsc::Receiver<()>>,
) {
tokio::spawn(async move {
@ -510,43 +534,61 @@ fn try_connect(
while let Some(mut order_stop) = order_start.recv().await {
// Until we have a connection or we're requested to stop
let mut keep_trying = true;
let mut delta = None;
while keep_trying {
eprintln!("DEBUG cluster {cluster_name}: node {node_id}: trying to connect...");
let connect = tokio::select! {
// conn = endpoint.connect(remote.clone(), ALPN[0]) => Some(conn),
conn = endpoint.connect_with_opts(remote.clone(), ALPN[0], connect_config()) => Some(conn),
_ = order_stop.recv() => None,
};
if let Some(connect) = connect {
match connect {
Ok(connecting) => match connecting.await {
Ok(connection) => {
eprintln!(
"DEBUG cluster {cluster_name}: node {node_id}: created connection"
);
if let Err(err) = connection_tx.send(connection).await {
if let Some(delta) = delta {
keep_trying = tokio::select! {
_ = sleep(delta) => true,
_ = order_stop.recv() => false,
};
}
if keep_trying {
eprintln!("DEBUG cluster {cluster_name}: node {node_id}: trying to connect...");
let connect = tokio::select! {
// conn = endpoint.connect(remote.clone(), ALPN[0]) => Some(conn),
conn = endpoint.connect_with_opts(remote.clone(), ALPN[0], connect_config()) => Some(conn),
_ = order_stop.recv() => None,
};
if let Some(connect) = connect {
let res = match connect {
Ok(connecting) => match connecting.await {
Ok(connection) => {
eprintln!(
"DEBUG cluster {cluster_name}: node {node_id}: quitting because ConnectionManager has quit: {err}"
"DEBUG cluster {cluster_name}: node {node_id}: created connection"
);
order_start.close();
match open_channels(connection, last_connection_id).await {
Ok(own_connection) => {
if let Err(err) = connection_tx
.send(ConnOrConn::OwnConnection(own_connection))
.await
{
eprintln!(
"DEBUG cluster {cluster_name}: node {node_id}: quitting because ConnectionManager has quit: {err}"
);
order_start.close();
}
// successfully opened connection
keep_trying = false;
Ok(())
}
Err(err) => Err(err.to_string()),
}
}
keep_trying = false;
}
Err(err) => {
eprintln!(
"WARN cluster {cluster_name}: node {node_id}: while trying to connect: {err}"
);
}
},
Err(err) => {
Err(err) => Err(err.to_string()),
},
Err(err) => Err(err.to_string()),
};
if let Err(err) = res {
eprintln!(
"WARN cluster {cluster_name}: node {node_id}: while trying to connect: {err}"
);
}
} else {
// received stop order
keep_trying = false;
}
} else {
// received stop order
keep_trying = false;
delta = Some(next_delta(delta));
}
}
}

View file

@ -1,14 +1,11 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use iroh::{
Endpoint, PublicKey,
endpoint::{Connection, Incoming},
};
use iroh::{Endpoint, PublicKey, endpoint::Incoming};
use reaction_plugin::shutdown::ShutdownController;
use tokio::sync::mpsc;
use crate::key::Show;
use crate::{connection::ConnOrConn, key::Show};
enum Break {
Yes,
@ -21,7 +18,7 @@ pub struct EndpointManager {
/// Cluster's name (for logging)
cluster_name: String,
/// Connection sender to the Connection Managers
connections_tx: BTreeMap<PublicKey, mpsc::Sender<Connection>>,
connections_tx: BTreeMap<PublicKey, mpsc::Sender<ConnOrConn>>,
/// shutdown
shutdown: ShutdownController,
}
@ -30,7 +27,7 @@ impl EndpointManager {
pub fn new(
endpoint: Arc<Endpoint>,
cluster_name: String,
connections_tx: BTreeMap<PublicKey, mpsc::Sender<Connection>>,
connections_tx: BTreeMap<PublicKey, mpsc::Sender<ConnOrConn>>,
shutdown: ShutdownController,
) {
tokio::spawn(async move {
@ -117,7 +114,7 @@ impl EndpointManager {
return Break::No;
}
Some(tx) => {
if let Err(_) = tx.send(connection).await {
if let Err(_) = tx.send(ConnOrConn::Connection(connection)).await {
eprintln!(
"DEBUG cluster {}: EndpointManager: quitting because ConnectionManager has quit",
self.cluster_name,