Get rid of remoc for peer communications

I couldn't understand why all communications timed out as soon as all
messages are sent with a remoc RecvError::ChMux "multiplexer terminated".

So I'm getting rid of remoc (for now at least) and sending/receiving
raw data over the stream.

For now it panics, after the handshake complete, which is already good
after only one test O:D
This commit is contained in:
ppom 2025-11-28 12:00:00 +01:00
commit b667b1a373
No known key found for this signature in database

View file

@ -1,16 +1,17 @@
use std::{cmp::max, collections::VecDeque, sync::Arc};
use std::{cmp::max, collections::VecDeque, io::Error as IoError, sync::Arc};
use chrono::{DateTime, Local, TimeDelta, Utc};
use chrono::{DateTime, Local, TimeDelta, TimeZone, Utc};
use futures::FutureExt;
use iroh::{
Endpoint, EndpointAddr,
endpoint::{Connection, VarInt},
endpoint::{Connection, RecvStream, SendStream, VarInt},
};
use rand::random_range;
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::{Connect, rch::base};
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, sync::mpsc};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
sync::mpsc,
};
use crate::{
cluster::{ALPN, UtcLine},
@ -23,9 +24,12 @@ use crate::{
const PROTOCOL_VERSION: u32 = 1;
type RemoteLine = (String, DateTime<Utc>);
type MaybeRemoteLine = Result<Option<RemoteLine>, IoError>;
enum Event {
LocalMessageReceived(Option<UtcLine>),
RemoteMessageReceived(Result<Option<RemoteMessage>, remoc::rch::base::RecvError>),
RemoteMessageReceived(MaybeRemoteLine),
ConnectionReceived(Option<Connection>),
}
@ -36,8 +40,73 @@ enum Insert {
struct OwnConnection {
connection: Connection,
tx: base::Sender<RemoteMessage>,
rx: base::Receiver<RemoteMessage>,
line_tx: BufWriter<SendStream>,
line_rx: BufReader<RecvStream>,
next_time: Option<DateTime<Utc>>,
next_len: Option<usize>,
next_line: Option<Vec<u8>>,
}
impl OwnConnection {
fn new(
connection: Connection,
line_tx: BufWriter<SendStream>,
line_rx: BufReader<RecvStream>,
) -> Self {
Self {
connection,
line_tx,
line_rx,
next_time: None,
next_len: None,
next_line: None,
}
}
/// Send a line to peer
async fn send_line(&mut self, line: RemoteLine) -> Result<(), std::io::Error> {
self.line_tx.write_i64(line.1.timestamp_micros()).await?;
self.line_tx.write_u32(line.0.len() as u32).await?;
self.line_tx.write_all(line.0.as_bytes()).await?;
self.line_tx.flush().await?;
Ok(())
}
/// Cancel-safe function that returns next line from peer
/// Returns None if we don't have all data yet.
async fn recv_line(&mut self) -> Result<Option<(String, DateTime<Utc>)>, std::io::Error> {
if self.next_time.is_none() {
self.next_time = Some(Utc.timestamp_nanos(self.line_rx.read_i64().await?));
}
// Ok we have next_time.is_some()
if self.next_len.is_none() {
self.next_len = Some(self.line_rx.read_u32().await? as usize);
}
// Ok we have next_len.is_some()
if self.next_line.is_none() {
self.next_line = Some(Vec::with_capacity(self.next_len.unwrap()));
}
let actual_len = self.next_line.as_ref().unwrap().len();
self.line_rx
.read(&mut self.next_line.as_mut().unwrap()[actual_len..self.next_len.unwrap()])
.await?;
// Ok we have next_line.is_some(), but don't know yet
if self.next_line.as_ref().unwrap().len() == self.next_len.unwrap() {
// Ok we have a full line
self.next_len.take();
let line = String::try_from(self.next_line.take().unwrap()).map_err(|err| {
std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string())
})?;
Ok(Some((line, self.next_time.take().unwrap())))
} else {
// Ok we don't have a full line, will be next time!
Ok(None)
}
}
}
/// Handle a remote node.
@ -120,7 +189,7 @@ impl ConnectionManager {
let maybe_conn_rx = self
.connection
.as_mut()
.map(|conn| conn.rx.recv().boxed())
.map(|conn| conn.recv_line().boxed())
// This Future will never be polled because of the if in select!
// It still needs to be present because the branch will be evaluated
// so we can't unwrap
@ -206,22 +275,16 @@ impl ConnectionManager {
if let Some(cancel) = self.cancel_ask_connection.take() {
let _ = cancel.send(()).await;
}
match open_channels(&connection, self.last_connexion_id).await {
Ok((mut tx, rx, new_id)) => {
match open_channels(connection, self.last_connexion_id).await {
Ok((own_connection, new_id)) => {
if self.connection.is_none() || self.last_connexion_id < new_id {
if let Some(mut conn) = self.connection.take() {
// FIXME should we do this in a separate task?
let _ = conn.tx.send(RemoteMessage::Quitting).await;
}
self.connection = Some(OwnConnection { connection, tx, rx });
self.connection = Some(own_connection);
self.last_connexion_id = new_id;
} else {
eprintln!(
"WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater",
self.cluster_name, self.node_id,
);
// FIXME should we do this in a separate task?
let _ = tx.send(RemoteMessage::Quitting).await;
}
}
Err(err) => {
@ -238,10 +301,7 @@ impl ConnectionManager {
}
}
async fn handle_remote_message(
&mut self,
message: Result<Option<RemoteMessage>, remoc::rch::base::RecvError>,
) {
async fn handle_remote_message(&mut self, message: MaybeRemoteLine) {
match message {
Err(err) => {
eprintln!(
@ -258,21 +318,7 @@ impl ConnectionManager {
);
self.close_connection(1, b"you closed your stream").await;
}
Ok(Some(RemoteMessage::Handshake(_, _))) => {
eprintln!(
"WARN cluster {}: node {} sent invalid message, ignoring",
self.cluster_name, self.node_id,
);
}
Ok(Some(RemoteMessage::Quitting)) => {
eprintln!(
"INFO cluster {}: node {} is quitting, bye bye",
self.cluster_name, self.node_id,
);
self.close_connection(0, b"you said you'll quit so I quit")
.await;
}
Ok(Some(RemoteMessage::Line(line))) => {
Ok(Some(line)) => {
let local_time = line.1.with_timezone(&Local);
if let Err(err) = self.own_cluster_tx.send((line.0.clone(), local_time)).await {
eprintln!(
@ -310,11 +356,7 @@ impl ConnectionManager {
Some(message) => match &mut self.connection {
Some(connection) => {
if let Err(err) = connection
.tx
.send(RemoteMessage::Line((
message.0.clone(),
message.1.with_timezone(&Utc),
)))
.send_line((message.0.clone(), message.1.with_timezone(&Utc)))
.await
{
eprintln!(
@ -350,8 +392,7 @@ impl ConnectionManager {
}
async fn close_connection(&mut self, code: u32, reason: &[u8]) {
if let Some(mut connection) = self.connection.take() {
connection.rx.close().await;
if let Some(connection) = self.connection.take() {
connection.connection.close(VarInt::from_u32(code), reason);
}
self.ask_connection().await;
@ -394,108 +435,50 @@ impl ConnectionManager {
// }
// }
/// All possible communication messages
/// Set as an enum for forward compatibility
#[derive(Serialize, Deserialize)]
pub enum RemoteMessage {
/// Must be the first message sent over, then should not be sent again
///
/// u32 is a version number.
/// The version must be the same, or compatibility is broken.
///
/// u64 is a connection id.
/// Each node sends a number between n and n + 1_000_000.
/// Both nodes keep the highest of both numbers.
/// n starts at 0.
/// When recreating a new connection, n starts at _last connection number_ + 1.
/// This leaves room for a minimum of 18_446_744_073_709 connections between two nodes
/// For each "shared uptime", which I hope is sufficient :p
/// Will be reset as soon as there is an instant where both nodes are down
///
/// This protocols permits to determine on both nodes which connection to drop
/// when we have one connection and receive another.
Handshake(u32, u64),
/// A line to transmit to your stream
Line((String, DateTime<Utc>)),
/// Announce the node is closing
Quitting,
}
/// Open accept one stream and create one stream.
/// This way, there is no need to know if we created or accepted the connection.
async fn open_channels(
connection: &Connection,
connection: Connection,
last_connexion_id: u64,
) -> Result<
(
base::Sender<RemoteMessage>,
base::Receiver<RemoteMessage>,
u64,
),
String,
> {
) -> Result<(OwnConnection, u64), IoError> {
eprintln!("DEBUG opening uni channel");
let mut output = connection.open_uni().await.map_err(|err| err.to_string())?;
eprintln!("DEBUG sending 1 byte in uni channel");
let res = output.write(&[0; 1]).await.map_err(|err| err.to_string())?;
eprintln!("DEBUG sent {res} byte in uni channel");
output.flush().await.map_err(|err| err.to_string())?;
eprintln!("DEBUG accepting uni channel");
let mut input = connection
.accept_uni()
.await
.map_err(|err| err.to_string())?;
eprintln!("DEBUG reading 1 byte from uni channel");
input
.read(&mut [0; 1])
.await
.map_err(|err| err.to_string())?;
eprintln!("DEBUG creating remoc channels");
let (conn, mut tx, mut rx) = Connect::io_buffered(remoc::Cfg::default(), input, output, 1024)
.await
.map_err(|err| err.to_string())?;
tokio::spawn(conn);
let mut output = BufWriter::new(connection.open_uni().await?);
let our_id = random_range(last_connexion_id + 1..last_connexion_id + 1_000_000);
eprintln!("DEBUG sending version");
tx.send(RemoteMessage::Handshake(PROTOCOL_VERSION, our_id))
.await
.map_err(|err| err.to_string())?;
eprintln!("DEBUG sending handshake in uni channel");
output.write_u32(PROTOCOL_VERSION).await?;
output.write_u64(our_id).await?;
output.flush().await?;
eprintln!("DEBUG receiving version");
match rx.recv().await {
// Good protocol version!
Ok(Some(RemoteMessage::Handshake(PROTOCOL_VERSION, their_id))) => {
// FIXME Do we need to test this? If so, this function should return their_id even when error in order to retry better next time
// if their_id < last_connexion_id
// ERROR
// else
let chosen_id = max(our_id, their_id);
eprintln!(
"DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}"
);
Ok((tx, rx, chosen_id))
}
// Errors
Ok(Some(RemoteMessage::Handshake(other, _))) => Err(format!(
"incompatible version: {other}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version."
)),
Ok(Some(RemoteMessage::Line(_))) => Err(format!(
"incorrect protocol message: remote did not send its protocol version."
)),
Ok(Some(RemoteMessage::Quitting)) => Err("remote unexpectedly quit".into()),
Ok(None) => Err("remote unexpectedly closed its channel".into()),
Err(err) => Err(format!("could not receive message: {err}")),
eprintln!("DEBUG accepting uni channel");
let mut input = BufReader::new(connection.accept_uni().await?);
eprintln!("DEBUG reading handshake from uni channel");
let their_version = input.read_u32().await?;
if their_version != PROTOCOL_VERSION {
return Err(IoError::new(
std::io::ErrorKind::InvalidData,
format!(
"incompatible version: {their_version}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version."
),
));
}
let their_id = input.read_u64().await?;
// FIXME Do we need to test this? If so, this function should return their_id even when error in order to retry better next time
// if their_id < last_connexion_id
// ERROR
// else
let chosen_id = max(our_id, their_id);
eprintln!(
"DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}"
);
Ok((OwnConnection::new(connection, output, input), chosen_id))
}
async fn false_recv() -> Result<Option<RemoteMessage>, remoc::rch::base::RecvError> {
async fn false_recv() -> MaybeRemoteLine {
Ok(None)
}