Two nodes succeeded to exchange messages 🎉

Separated try_connect to another task, to prevent interblocking

Send a byte to the new stream so that the other can see the stream
and accept it.
This commit is contained in:
ppom 2025-11-26 12:00:00 +01:00
commit 3ed2ebd488
No known key found for this signature in database
2 changed files with 168 additions and 151 deletions

View file

@ -2,11 +2,15 @@ use std::{
collections::BTreeMap,
net::{SocketAddrV4, SocketAddrV6},
sync::Arc,
time::Duration,
};
use chrono::{DateTime, Local, Utc};
use futures::future::join_all;
use iroh::{Endpoint, PublicKey, endpoint::Connection};
use iroh::{
Endpoint, PublicKey,
endpoint::{Connection, TransportConfig, VarInt},
};
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::rch::mpsc as remocMpsc;
use tokio::sync::mpsc as tokioMpsc;
@ -18,11 +22,17 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
pub type UtcLine = Arc<(String, DateTime<Utc>)>;
pub async fn bind(stream: &StreamInit) -> Result<Endpoint, String> {
let mut transport = TransportConfig::default();
transport
.max_idle_timeout(Some(VarInt::from_u32(1).into()))
.keep_alive_interval(Some(Duration::from_millis(400)));
let mut builder = Endpoint::builder()
.secret_key(stream.secret_key.clone())
.alpns(ALPN.iter().map(|slice| slice.to_vec()).collect())
.relay_mode(iroh::RelayMode::Disabled)
.clear_discovery();
.clear_discovery()
.transport_config(transport);
if let Some(ip) = stream.bind_ipv4 {
builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port));
@ -69,13 +79,17 @@ pub fn cluster_tasks(
let (connection_endpoint2connection_txs, mut connection_endpoint2connection_rxs): (
BTreeMap<PublicKey, tokioMpsc::Sender<Connection>>,
Vec<(PublicKey, tokioMpsc::Receiver<Connection>)>,
Vec<(
PublicKey,
tokioMpsc::Sender<Connection>,
tokioMpsc::Receiver<Connection>,
)>,
) = stream
.nodes
.keys()
.map(|pk| {
let (tx, rx) = tokioMpsc::channel(1);
((pk.clone(), tx), (pk.clone(), rx))
((pk.clone(), tx.clone()), (pk.clone(), tx, rx))
})
.unzip();
@ -88,7 +102,7 @@ pub fn cluster_tasks(
);
// Spawn connection managers
while let Some((pk, connection_endpoint2connection_rx)) =
while let Some((pk, connection_endpoint2connection_tx, connection_endpoint2connection_rx)) =
connection_endpoint2connection_rxs.pop()
{
let cluster_name = stream.name.clone();
@ -102,6 +116,7 @@ pub fn cluster_tasks(
cluster_name,
endpoint_addr,
endpoint,
connection_endpoint2connection_tx,
connection_endpoint2connection_rx,
stream.message_timeout,
message_action2connection_rx,

View file

@ -1,8 +1,4 @@
use std::{
collections::VecDeque,
sync::Arc,
time::{Duration, Instant},
};
use std::{collections::VecDeque, sync::Arc};
use chrono::{DateTime, Local, TimeDelta, Utc};
use futures::FutureExt;
@ -13,21 +9,20 @@ use iroh::{
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::{Connect, rch::base};
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, time::sleep};
use tokio::{io::AsyncWriteExt, sync::mpsc};
use crate::{
cluster::{ALPN, UtcLine},
key::Show,
};
const START_TIMEOUT: Duration = Duration::from_secs(5);
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour
const TIMEOUT_FACTOR: f64 = 1.5;
// const START_TIMEOUT: Duration = Duration::from_secs(5);
// const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour
// const TIMEOUT_FACTOR: f64 = 1.5;
const PROTOCOL_VERSION: u32 = 1;
enum Event {
Tick,
LocalMessageReceived(Option<UtcLine>),
RemoteMessageReceived(Result<Option<RemoteMessage>, remoc::rch::base::RecvError>),
ConnectionReceived(Option<Connection>),
@ -50,22 +45,17 @@ struct OwnConnection {
pub struct ConnectionManager {
/// Cluster's name (for logging)
cluster_name: String,
/// The remote node we're communicating with (for logging)
node_id: String,
/// The remote node we're communicating with
remote: EndpointAddr,
/// Endpoint
endpoint: Arc<Endpoint>,
/// Ask for a connection
ask_connection: mpsc::Sender<mpsc::Receiver<()>>,
cancel_ask_connection: Option<mpsc::Sender<()>>,
/// The EndpointManager sending us a connection (whether we asked for it or not)
connection_rx: mpsc::Receiver<Connection>,
/// Our own connection (when we have one)
connection: Option<OwnConnection>,
/// Delta we'll use next time we'll try to connect to remote
delta: Option<Duration>,
/// Next instant we'll try to connect
next_try_connect: Option<Instant>,
/// Max duration before we drop pending messages to a node we can't connect to.
message_timeout: TimeDelta,
/// Message we receive from actions
@ -85,19 +75,28 @@ impl ConnectionManager {
cluster_name: String,
remote: EndpointAddr,
endpoint: Arc<Endpoint>,
connection_tx: mpsc::Sender<Connection>,
connection_rx: mpsc::Receiver<Connection>,
message_timeout: TimeDelta,
message_rx: mpsc::Receiver<UtcLine>,
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
shutdown: ShutdownController,
) -> Self {
let node_id = remote.id.show();
let (ask_connection, order_start) = mpsc::channel(1);
try_connect(
cluster_name.clone(),
remote.clone(),
endpoint.clone(),
connection_tx,
order_start,
);
Self {
cluster_name,
remote,
endpoint,
node_id,
connection: None,
delta: None,
next_try_connect: None,
ask_connection,
cancel_ask_connection: None,
connection_rx,
message_timeout,
message_rx,
@ -109,17 +108,9 @@ impl ConnectionManager {
/// Main loop
pub async fn task(mut self) {
self.try_connect().await;
self.ask_connection().await;
loop {
eprintln!("DEBUG connection: NEW LOOP!");
let tick = sleep(if self.connection.is_none() {
self.delta.unwrap_or(START_TIMEOUT)
} else {
// Still tick when we have a connection
Duration::from_secs(60)
});
tokio::pin!(tick);
let have_connection = self.connection.is_some();
let maybe_conn_rx = self
@ -137,8 +128,6 @@ impl ConnectionManager {
_ = self.shutdown.wait() => None,
// Receive a connection from EndpointManager
conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)),
// Tick when we don't have a connection
_ = tick, if !have_connection => Some(Event::Tick),
// Receive remote message when we have a connection
msg = maybe_conn_rx, if have_connection => Some(Event::RemoteMessageReceived(msg)),
// Receive a message from local Actions
@ -158,74 +147,30 @@ impl ConnectionManager {
async fn handle_event(&mut self, event: Event) {
match event {
Event::Tick => {
self.handle_tick().await;
}
Event::ConnectionReceived(connection) => {
eprintln!(
"DEBUG cluster {}: node {}: received a connection",
self.cluster_name,
self.remote.id.show(),
self.cluster_name, self.node_id,
);
self.handle_connection(connection).await;
}
Event::LocalMessageReceived(utc_line) => {
eprintln!(
"DEBUG cluster {}: node {}: received a local message",
self.cluster_name,
self.remote.id.show(),
self.cluster_name, self.node_id,
);
self.handle_local_message(utc_line, Insert::Back).await;
}
Event::RemoteMessageReceived(message) => {
eprintln!(
"DEBUG cluster {}: node {}: received a remote message",
self.cluster_name,
self.remote.id.show(),
self.cluster_name, self.node_id,
);
self.handle_remote_message(message).await;
}
}
}
/// Try connecting to a remote endpoint
/// Returns true if we have a valid connection now
async fn try_connect(&mut self) -> bool {
if self.connection.is_none() {
eprintln!(
"DEBUG cluster {}: node {}: trying to connect...",
self.cluster_name,
self.remote.id.show(),
);
match self.endpoint.connect(self.remote.clone(), ALPN[0]).await {
Ok(connection) => {
eprintln!(
"DEBUG cluster {}: node {}: created connection",
self.cluster_name,
self.remote.id.show(),
);
self.handle_connection(Some(connection)).await
}
Err(err) => {
self.try_connect_error(err.to_string());
false
}
}
} else {
true
}
}
async fn handle_tick(&mut self) {
if self
.next_try_connect
.is_some_and(|next| next > Instant::now())
{
self.try_connect().await;
}
}
async fn send_queue_messages(&mut self) {
while self.connection.is_some()
&& let Some(message) = self.message_queue.pop_front()
@ -249,8 +194,7 @@ impl ConnectionManager {
if count > 0 {
eprintln!(
"DEBUG cluster {}: node {}: dropping {count} messages that reached timeout",
self.cluster_name,
self.remote.id.show(),
self.cluster_name, self.node_id,
)
}
}
@ -262,32 +206,33 @@ impl ConnectionManager {
None => {
eprintln!(
"DEBUG cluster {}: ConnectionManager {}: quitting because EndpointManager has quit",
self.cluster_name,
self.remote.id.show()
self.cluster_name, self.node_id,
);
self.quit();
false
}
Some(connection) => {
if let Some(cancel) = self.cancel_ask_connection.take() {
let _ = cancel.send(()).await;
}
if self.connection.is_none() {
self.delta = None;
self.next_try_connect = None;
match open_channels(&connection).await {
Ok((tx, rx)) => {
self.connection = Some(OwnConnection { connection, tx, rx });
true
}
Err(err) => {
self.try_connect_error(err);
eprintln!(
"ERROR cluster {}: trying to initialize connection to node {}: {err}",
self.cluster_name, self.node_id,
);
false
}
}
} else {
eprintln!(
"WARN cluster {}: ignoring incoming connection from {}, as we already have a valid connection with it",
self.cluster_name,
self.remote.id.show()
"WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it",
self.cluster_name, self.node_id,
);
true
}
@ -295,24 +240,6 @@ impl ConnectionManager {
}
}
/// Update the state and log an error when bootstraping a new Connection
fn try_connect_error(&mut self, err: String) {
let delta = next_delta(self.delta);
self.next_try_connect = Some(Instant::now() + delta);
self.delta = Some(delta);
eprintln!(
"ERROR cluster {}: trying to connect to node {}: {err}",
self.cluster_name,
self.remote.id.show()
);
eprintln!(
"INFO cluster {}: retry connecting to node {} in {:?}",
self.cluster_name,
self.remote.id.show(),
self.delta
);
}
async fn handle_remote_message(
&mut self,
message: Result<Option<RemoteMessage>, remoc::rch::base::RecvError>,
@ -320,9 +247,8 @@ impl ConnectionManager {
match message {
Err(err) => {
eprintln!(
"WARN cluster {}: error receiving message from node {}: {err}",
self.cluster_name,
self.remote.id.show()
"WARN cluster {}: node {}: error receiving remote message: {err}",
self.cluster_name, self.node_id,
);
self.close_connection(1, b"error receiving from your stream")
.await;
@ -330,23 +256,20 @@ impl ConnectionManager {
Ok(None) => {
eprintln!(
"WARN cluster {}: node {} closed its stream",
self.cluster_name,
self.remote.id.show()
self.cluster_name, self.node_id,
);
self.close_connection(1, b"you closed your stream").await;
}
Ok(Some(RemoteMessage::Version(_))) => {
eprintln!(
"WARN cluster {}: node {} sent invalid message, ignoring",
self.cluster_name,
self.remote.id.show()
self.cluster_name, self.node_id,
);
}
Ok(Some(RemoteMessage::Quitting)) => {
eprintln!(
"INFO cluster {}: node {} is quitting, bye bye",
self.cluster_name,
self.remote.id.show()
self.cluster_name, self.node_id,
);
self.close_connection(0, b"you said you'll quit so I quit")
.await;
@ -366,9 +289,7 @@ impl ConnectionManager {
} else {
eprintln!(
"DEBUG cluster {}: node {}: sent a remote message to local stream: {}",
self.cluster_name,
self.remote.id.show(),
line.0
self.cluster_name, self.node_id, line.0
);
}
}
@ -396,8 +317,7 @@ impl ConnectionManager {
{
eprintln!(
"INFO cluster {}: connection with node {} failed: {err}",
self.cluster_name,
self.remote.id.show()
self.cluster_name, self.node_id,
);
self.message_queue.push_back(message);
self.close_connection(
@ -408,13 +328,15 @@ impl ConnectionManager {
} else {
eprintln!(
"DEBUG cluster {}: node {}: sent a local message to remote: {}",
self.cluster_name,
self.remote.id.show(),
message.0
self.cluster_name, self.node_id, message.0
);
}
}
None => {
eprintln!(
"DEBUG cluster {}: node {}: no connection, saving local message to send later: {}",
self.cluster_name, self.node_id, message.0
);
if let Insert::Front = insert {
self.message_queue.push_front(message);
} else {
@ -430,6 +352,19 @@ impl ConnectionManager {
connection.rx.close().await;
connection.connection.close(VarInt::from_u32(code), reason);
}
self.ask_connection().await;
}
async fn ask_connection(&mut self) {
let (tx, rx) = mpsc::channel(1);
self.cancel_ask_connection = Some(tx);
if let Err(err) = self.ask_connection.send(rx).await {
eprintln!(
"ERROR cluster {}: node {}: quitting because our connection initiater quitted: {}",
self.cluster_name, self.node_id, err
);
self.quit();
}
}
fn quit(&mut self) {
@ -439,21 +374,21 @@ impl ConnectionManager {
/// Compute the next wait Duration.
/// We're multiplying the Duration by [`TIMEOUT_FACTOR`] and cap it to [`MAX_TIMEOUT`].
fn next_delta(delta: Option<Duration>) -> Duration {
match delta {
None => START_TIMEOUT,
Some(delta) => {
// Multiply timeout by TIMEOUT_FACTOR
let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64);
// Cap to MAX_TIMEOUT
if delta > MAX_TIMEOUT {
MAX_TIMEOUT
} else {
delta
}
}
}
}
// fn next_delta(delta: Option<Duration>) -> Duration {
// match delta {
// None => START_TIMEOUT,
// Some(delta) => {
// // Multiply timeout by TIMEOUT_FACTOR
// let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64);
// // Cap to MAX_TIMEOUT
// if delta > MAX_TIMEOUT {
// MAX_TIMEOUT
// } else {
// delta
// }
// }
// }
// }
/// All possible communication messages
/// Set as an enum for forward compatibility
@ -472,26 +407,45 @@ pub enum RemoteMessage {
async fn open_channels(
connection: &Connection,
) -> Result<(base::Sender<RemoteMessage>, base::Receiver<RemoteMessage>), String> {
let output = connection.open_uni().await.map_err(|err| err.to_string())?;
eprintln!("DEBUG opening uni channel");
let mut output = connection.open_uni().await.map_err(|err| err.to_string())?;
let input = connection
eprintln!("DEBUG sending 1 byte in uni channel");
let res = output.write(&[0; 1]).await.map_err(|err| err.to_string())?;
eprintln!("DEBUG sent {res} byte in uni channel");
output.flush().await.map_err(|err| err.to_string())?;
eprintln!("DEBUG accepting uni channel");
let mut input = connection
.accept_uni()
.await
.map_err(|err| err.to_string())?;
eprintln!("DEBUG reading 1 byte from uni channel");
input
.read(&mut [0; 1])
.await
.map_err(|err| err.to_string())?;
eprintln!("DEBUG creating remoc channels");
let (conn, mut tx, mut rx) = Connect::io_buffered(remoc::Cfg::default(), input, output, 1024)
.await
.map_err(|err| err.to_string())?;
tokio::spawn(conn);
eprintln!("DEBUG sending version");
tx.send(RemoteMessage::Version(PROTOCOL_VERSION))
.await
.map_err(|err| err.to_string())?;
eprintln!("DEBUG receiving version");
match rx.recv().await {
// Good protocol version!
Ok(Some(RemoteMessage::Version(PROTOCOL_VERSION))) => Ok((tx, rx)),
Ok(Some(RemoteMessage::Version(PROTOCOL_VERSION))) => {
eprintln!("DEBUG version handshake complete!");
Ok((tx, rx))
}
// Errors
Ok(Some(RemoteMessage::Version(other))) => Err(format!(
"incompatible version: {other}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version."
@ -508,3 +462,51 @@ async fn open_channels(
async fn false_recv() -> Result<Option<RemoteMessage>, remoc::rch::base::RecvError> {
Ok(None)
}
fn try_connect(
cluster_name: String,
remote: EndpointAddr,
endpoint: Arc<Endpoint>,
connection_tx: mpsc::Sender<Connection>,
mut order_start: mpsc::Receiver<mpsc::Receiver<()>>,
) {
tokio::spawn(async move {
let node_id = remote.id.show();
// Each time we receive and order
while let Some(mut order_stop) = order_start.recv().await {
// Until we have a connection or we're requested to stop
let mut keep_trying = true;
while keep_trying {
eprintln!("DEBUG cluster {cluster_name}: node {node_id}: trying to connect...");
let connect = tokio::select! {
conn = endpoint.connect(remote.clone(), ALPN[0]) => Some(conn),
_ = order_stop.recv() => None,
};
if let Some(connect) = connect {
match connect {
Ok(connection) => {
eprintln!(
"DEBUG cluster {cluster_name}: node {node_id}: created connection"
);
if let Err(err) = connection_tx.send(connection).await {
eprintln!(
"DEBUG cluster {cluster_name}: node {node_id}: quitting because ConnectionManager has quit: {err}"
);
order_start.close();
}
keep_trying = false;
}
Err(err) => {
eprintln!(
"WARN cluster {cluster_name}: node {node_id}: while trying to connect: {err}"
);
}
}
} else {
// received stop order
keep_trying = false;
}
}
}
});
}