From b667b1a3735469631662a0405ff34979c84578b9 Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 28 Nov 2025 12:00:00 +0100 Subject: [PATCH] Get rid of remoc for peer communications I couldn't understand why all communications timed out as soon as all messages are sent with a remoc RecvError::ChMux "multiplexer terminated". So I'm getting rid of remoc (for now at least) and sending/receiving raw data over the stream. For now it panics, after the handshake complete, which is already good after only one test O:D --- .../reaction-plugin-cluster/src/connection.rs | 253 ++++++++---------- 1 file changed, 118 insertions(+), 135 deletions(-) diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index dc2a04c..44f4b46 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -1,16 +1,17 @@ -use std::{cmp::max, collections::VecDeque, sync::Arc}; +use std::{cmp::max, collections::VecDeque, io::Error as IoError, sync::Arc}; -use chrono::{DateTime, Local, TimeDelta, Utc}; +use chrono::{DateTime, Local, TimeDelta, TimeZone, Utc}; use futures::FutureExt; use iroh::{ Endpoint, EndpointAddr, - endpoint::{Connection, VarInt}, + endpoint::{Connection, RecvStream, SendStream, VarInt}, }; use rand::random_range; use reaction_plugin::{Line, shutdown::ShutdownController}; -use remoc::{Connect, rch::base}; -use serde::{Deserialize, Serialize}; -use tokio::{io::AsyncWriteExt, sync::mpsc}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, + sync::mpsc, +}; use crate::{ cluster::{ALPN, UtcLine}, @@ -23,9 +24,12 @@ use crate::{ const PROTOCOL_VERSION: u32 = 1; +type RemoteLine = (String, DateTime); +type MaybeRemoteLine = Result, IoError>; + enum Event { LocalMessageReceived(Option), - RemoteMessageReceived(Result, remoc::rch::base::RecvError>), + RemoteMessageReceived(MaybeRemoteLine), ConnectionReceived(Option), } @@ -36,8 +40,73 @@ enum Insert { struct OwnConnection { connection: Connection, - tx: base::Sender, - rx: base::Receiver, + + line_tx: BufWriter, + line_rx: BufReader, + + next_time: Option>, + next_len: Option, + next_line: Option>, +} + +impl OwnConnection { + fn new( + connection: Connection, + line_tx: BufWriter, + line_rx: BufReader, + ) -> Self { + Self { + connection, + line_tx, + line_rx, + next_time: None, + next_len: None, + next_line: None, + } + } + + /// Send a line to peer + async fn send_line(&mut self, line: RemoteLine) -> Result<(), std::io::Error> { + self.line_tx.write_i64(line.1.timestamp_micros()).await?; + self.line_tx.write_u32(line.0.len() as u32).await?; + self.line_tx.write_all(line.0.as_bytes()).await?; + self.line_tx.flush().await?; + Ok(()) + } + + /// Cancel-safe function that returns next line from peer + /// Returns None if we don't have all data yet. + async fn recv_line(&mut self) -> Result)>, std::io::Error> { + if self.next_time.is_none() { + self.next_time = Some(Utc.timestamp_nanos(self.line_rx.read_i64().await?)); + } + // Ok we have next_time.is_some() + if self.next_len.is_none() { + self.next_len = Some(self.line_rx.read_u32().await? as usize); + } + // Ok we have next_len.is_some() + + if self.next_line.is_none() { + self.next_line = Some(Vec::with_capacity(self.next_len.unwrap())); + } + let actual_len = self.next_line.as_ref().unwrap().len(); + self.line_rx + .read(&mut self.next_line.as_mut().unwrap()[actual_len..self.next_len.unwrap()]) + .await?; + // Ok we have next_line.is_some(), but don't know yet + + if self.next_line.as_ref().unwrap().len() == self.next_len.unwrap() { + // Ok we have a full line + self.next_len.take(); + let line = String::try_from(self.next_line.take().unwrap()).map_err(|err| { + std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string()) + })?; + Ok(Some((line, self.next_time.take().unwrap()))) + } else { + // Ok we don't have a full line, will be next time! + Ok(None) + } + } } /// Handle a remote node. @@ -120,7 +189,7 @@ impl ConnectionManager { let maybe_conn_rx = self .connection .as_mut() - .map(|conn| conn.rx.recv().boxed()) + .map(|conn| conn.recv_line().boxed()) // This Future will never be polled because of the if in select! // It still needs to be present because the branch will be evaluated // so we can't unwrap @@ -206,22 +275,16 @@ impl ConnectionManager { if let Some(cancel) = self.cancel_ask_connection.take() { let _ = cancel.send(()).await; } - match open_channels(&connection, self.last_connexion_id).await { - Ok((mut tx, rx, new_id)) => { + match open_channels(connection, self.last_connexion_id).await { + Ok((own_connection, new_id)) => { if self.connection.is_none() || self.last_connexion_id < new_id { - if let Some(mut conn) = self.connection.take() { - // FIXME should we do this in a separate task? - let _ = conn.tx.send(RemoteMessage::Quitting).await; - } - self.connection = Some(OwnConnection { connection, tx, rx }); + self.connection = Some(own_connection); self.last_connexion_id = new_id; } else { eprintln!( "WARN cluster {}: node {}: ignoring incoming connection, as we already have a valid connection with it and our connection id is greater", self.cluster_name, self.node_id, ); - // FIXME should we do this in a separate task? - let _ = tx.send(RemoteMessage::Quitting).await; } } Err(err) => { @@ -238,10 +301,7 @@ impl ConnectionManager { } } - async fn handle_remote_message( - &mut self, - message: Result, remoc::rch::base::RecvError>, - ) { + async fn handle_remote_message(&mut self, message: MaybeRemoteLine) { match message { Err(err) => { eprintln!( @@ -258,21 +318,7 @@ impl ConnectionManager { ); self.close_connection(1, b"you closed your stream").await; } - Ok(Some(RemoteMessage::Handshake(_, _))) => { - eprintln!( - "WARN cluster {}: node {} sent invalid message, ignoring", - self.cluster_name, self.node_id, - ); - } - Ok(Some(RemoteMessage::Quitting)) => { - eprintln!( - "INFO cluster {}: node {} is quitting, bye bye", - self.cluster_name, self.node_id, - ); - self.close_connection(0, b"you said you'll quit so I quit") - .await; - } - Ok(Some(RemoteMessage::Line(line))) => { + Ok(Some(line)) => { let local_time = line.1.with_timezone(&Local); if let Err(err) = self.own_cluster_tx.send((line.0.clone(), local_time)).await { eprintln!( @@ -310,11 +356,7 @@ impl ConnectionManager { Some(message) => match &mut self.connection { Some(connection) => { if let Err(err) = connection - .tx - .send(RemoteMessage::Line(( - message.0.clone(), - message.1.with_timezone(&Utc), - ))) + .send_line((message.0.clone(), message.1.with_timezone(&Utc))) .await { eprintln!( @@ -350,8 +392,7 @@ impl ConnectionManager { } async fn close_connection(&mut self, code: u32, reason: &[u8]) { - if let Some(mut connection) = self.connection.take() { - connection.rx.close().await; + if let Some(connection) = self.connection.take() { connection.connection.close(VarInt::from_u32(code), reason); } self.ask_connection().await; @@ -394,108 +435,50 @@ impl ConnectionManager { // } // } -/// All possible communication messages -/// Set as an enum for forward compatibility -#[derive(Serialize, Deserialize)] -pub enum RemoteMessage { - /// Must be the first message sent over, then should not be sent again - /// - /// u32 is a version number. - /// The version must be the same, or compatibility is broken. - /// - /// u64 is a connection id. - /// Each node sends a number between n and n + 1_000_000. - /// Both nodes keep the highest of both numbers. - /// n starts at 0. - /// When recreating a new connection, n starts at _last connection number_ + 1. - /// This leaves room for a minimum of 18_446_744_073_709 connections between two nodes - /// For each "shared uptime", which I hope is sufficient :p - /// Will be reset as soon as there is an instant where both nodes are down - /// - /// This protocols permits to determine on both nodes which connection to drop - /// when we have one connection and receive another. - Handshake(u32, u64), - /// A line to transmit to your stream - Line((String, DateTime)), - /// Announce the node is closing - Quitting, -} - /// Open accept one stream and create one stream. /// This way, there is no need to know if we created or accepted the connection. async fn open_channels( - connection: &Connection, + connection: Connection, last_connexion_id: u64, -) -> Result< - ( - base::Sender, - base::Receiver, - u64, - ), - String, -> { +) -> Result<(OwnConnection, u64), IoError> { eprintln!("DEBUG opening uni channel"); - let mut output = connection.open_uni().await.map_err(|err| err.to_string())?; - - eprintln!("DEBUG sending 1 byte in uni channel"); - let res = output.write(&[0; 1]).await.map_err(|err| err.to_string())?; - eprintln!("DEBUG sent {res} byte in uni channel"); - output.flush().await.map_err(|err| err.to_string())?; - - eprintln!("DEBUG accepting uni channel"); - let mut input = connection - .accept_uni() - .await - .map_err(|err| err.to_string())?; - - eprintln!("DEBUG reading 1 byte from uni channel"); - input - .read(&mut [0; 1]) - .await - .map_err(|err| err.to_string())?; - - eprintln!("DEBUG creating remoc channels"); - let (conn, mut tx, mut rx) = Connect::io_buffered(remoc::Cfg::default(), input, output, 1024) - .await - .map_err(|err| err.to_string())?; - - tokio::spawn(conn); + let mut output = BufWriter::new(connection.open_uni().await?); let our_id = random_range(last_connexion_id + 1..last_connexion_id + 1_000_000); - eprintln!("DEBUG sending version"); - tx.send(RemoteMessage::Handshake(PROTOCOL_VERSION, our_id)) - .await - .map_err(|err| err.to_string())?; + eprintln!("DEBUG sending handshake in uni channel"); + output.write_u32(PROTOCOL_VERSION).await?; + output.write_u64(our_id).await?; + output.flush().await?; - eprintln!("DEBUG receiving version"); - match rx.recv().await { - // Good protocol version! - Ok(Some(RemoteMessage::Handshake(PROTOCOL_VERSION, their_id))) => { - // FIXME Do we need to test this? If so, this function should return their_id even when error in order to retry better next time - // if their_id < last_connexion_id - // ERROR - // else - let chosen_id = max(our_id, their_id); - eprintln!( - "DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}" - ); - Ok((tx, rx, chosen_id)) - } - // Errors - Ok(Some(RemoteMessage::Handshake(other, _))) => Err(format!( - "incompatible version: {other}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version." - )), - Ok(Some(RemoteMessage::Line(_))) => Err(format!( - "incorrect protocol message: remote did not send its protocol version." - )), - Ok(Some(RemoteMessage::Quitting)) => Err("remote unexpectedly quit".into()), - Ok(None) => Err("remote unexpectedly closed its channel".into()), - Err(err) => Err(format!("could not receive message: {err}")), + eprintln!("DEBUG accepting uni channel"); + let mut input = BufReader::new(connection.accept_uni().await?); + + eprintln!("DEBUG reading handshake from uni channel"); + let their_version = input.read_u32().await?; + + if their_version != PROTOCOL_VERSION { + return Err(IoError::new( + std::io::ErrorKind::InvalidData, + format!( + "incompatible version: {their_version}. We use {PROTOCOL_VERSION}. Consider upgrading the node with the older version." + ), + )); } + + let their_id = input.read_u64().await?; + // FIXME Do we need to test this? If so, this function should return their_id even when error in order to retry better next time + // if their_id < last_connexion_id + // ERROR + // else + let chosen_id = max(our_id, their_id); + eprintln!( + "DEBUG version handshake complete: last id: {last_connexion_id}, our id: {our_id}, their id: {their_id}: chosen id: {chosen_id}" + ); + Ok((OwnConnection::new(connection, output, input), chosen_id)) } -async fn false_recv() -> Result, remoc::rch::base::RecvError> { +async fn false_recv() -> MaybeRemoteLine { Ok(None) }