From 2095009fa96cc734eaa10bda23764dbad93c520a Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 15 Dec 2025 12:00:00 +0100 Subject: [PATCH] cluster: use treedb for message queue persistance --- Cargo.lock | 1 + plugins/reaction-plugin-cluster/Cargo.toml | 1 + .../reaction-plugin-cluster/src/cluster.rs | 24 ++- .../reaction-plugin-cluster/src/connection.rs | 153 +++++++++++------- plugins/reaction-plugin-cluster/src/main.rs | 29 ++-- 5 files changed, 129 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8cd50e..0441c18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2871,6 +2871,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "treedb", ] [[package]] diff --git a/plugins/reaction-plugin-cluster/Cargo.toml b/plugins/reaction-plugin-cluster/Cargo.toml index fe78995..8854a52 100644 --- a/plugins/reaction-plugin-cluster/Cargo.toml +++ b/plugins/reaction-plugin-cluster/Cargo.toml @@ -13,6 +13,7 @@ serde.workspace = true serde_json.workspace = true tokio.workspace = true tokio.features = ["rt-multi-thread"] +treedb.workspace = true data-encoding = "2.9.0" iroh = { version = "0.94.0", default-features = false } diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index ba7c33b..29a13e6 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -2,7 +2,7 @@ use std::{ collections::BTreeMap, net::{SocketAddrV4, SocketAddrV6}, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::Duration, }; use futures::future::join_all; @@ -13,12 +13,16 @@ use iroh::{ use reaction_plugin::{Line, shutdown::ShutdownController}; use remoc::rch::mpsc as remocMpsc; use tokio::sync::mpsc as tokioMpsc; +use treedb::{ + Database, + time::{Time, now}, +}; use crate::{ActionInit, StreamInit, connection::ConnectionManager, endpoint::EndpointManager}; pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; -pub type UtcLine = Arc<(String, Duration)>; +pub type UtcLine = (Arc, Time); pub fn transport_config() -> TransportConfig { // FIXME higher timeouts and keep alive @@ -56,12 +60,13 @@ pub async fn bind(stream: &StreamInit) -> Result { }) } -pub fn cluster_tasks( +pub async fn cluster_tasks( endpoint: Endpoint, mut stream: StreamInit, mut actions: Vec, + db: &mut Database, shutdown: ShutdownController, -) { +) -> Result<(), String> { eprintln!("DEBUG cluster tasks starts running"); let (message_action2connection_txs, mut message_action2connection_rxs): ( @@ -100,8 +105,10 @@ pub fn cluster_tasks( stream.message_timeout, message_action2connection_rx, stream_tx, + db, shutdown, - ); + ) + .await?; tokio::spawn(async move { connection_manager.task().await }); connection_endpoint2connection_txs.insert(pk, connection_endpoint2connection_tx); } @@ -115,6 +122,7 @@ pub fn cluster_tasks( ); eprintln!("DEBUG cluster tasks finished running"); + Ok(()) } impl ActionInit { @@ -127,15 +135,15 @@ impl ActionInit { while let Ok(Some(m)) = self.rx.recv().await { eprintln!("DEBUG action: received a message to send to connections"); let line = self.send.line(m.match_); - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let now = now(); if self.self_ - && let Err(err) = own_stream_tx.send((line.clone(), now.clone())).await + && let Err(err) = own_stream_tx.send((line.clone(), now.into())).await { eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}"); } // TODO to_utc - let line = Arc::new((line, now)); + let line = (Arc::new(line), now); for result in join_all(nodes_tx.iter().map(|tx| tx.send(line.clone()))).await { if let Err(err) = result { eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}"); diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index 45d77af..c9da512 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -1,10 +1,4 @@ -use std::{ - cmp::max, - collections::VecDeque, - io::Error as IoError, - sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; +use std::{cmp::max, io::Error as IoError, sync::Arc, time::Duration}; use futures::FutureExt; use iroh::{ @@ -18,6 +12,11 @@ use tokio::{ sync::mpsc, time::sleep, }; +use treedb::{ + Database, Tree, + helpers::{to_string, to_time}, + time::{Time, now}, +}; use crate::{ cluster::{ALPN, UtcLine, connect_config}, @@ -26,7 +25,11 @@ use crate::{ const PROTOCOL_VERSION: u32 = 1; -type RemoteLine = (String, Duration); +const CLOSE_RECV: (u32, &[u8]) = (1, b"error receiving from your stream"); +const CLOSE_CLOSED: (u32, &[u8]) = (2, b"you closed your stream"); +const CLOSE_SEND: (u32, &[u8]) = (3, b"could not send a message to your channel so I quit"); + +type RemoteLine = (String, Time); type MaybeRemoteLine = Result, IoError>; enum Event { @@ -35,11 +38,6 @@ enum Event { ConnectionReceived(Option), } -enum Insert { - Front, - Back, -} - pub struct OwnConnection { connection: Connection, id: u64, @@ -47,7 +45,8 @@ pub struct OwnConnection { line_tx: BufWriter, line_rx: BufReader, - next_time: Option, + next_time_secs: Option, + next_time_nanos: Option, next_len: Option, next_line: Option>, } @@ -64,17 +63,19 @@ impl OwnConnection { id, line_tx, line_rx, - next_time: None, + next_time_secs: None, + next_time_nanos: None, next_len: None, next_line: None, } } /// Send a line to peer - async fn send_line(&mut self, line: RemoteLine) -> Result<(), std::io::Error> { - self.line_tx.write_u64(line.1.as_micros() as u64).await?; - self.line_tx.write_u32(line.0.len() as u32).await?; - self.line_tx.write_all(line.0.as_bytes()).await?; + async fn send_line(&mut self, line: &String, time: &Time) -> Result<(), std::io::Error> { + self.line_tx.write_u64(time.as_secs()).await?; + self.line_tx.write_u32(time.subsec_nanos()).await?; + self.line_tx.write_u32(line.len() as u32).await?; + self.line_tx.write_all(line.as_bytes()).await?; self.line_tx.flush().await?; Ok(()) } @@ -82,10 +83,12 @@ impl OwnConnection { /// Cancel-safe function that returns next line from peer /// Returns None if we don't have all data yet. async fn recv_line(&mut self) -> Result, std::io::Error> { - if self.next_time.is_none() { - self.next_time = Some(Duration::from_micros(self.line_rx.read_u64().await?)); + if self.next_time_secs.is_none() { + self.next_time_secs = Some(self.line_rx.read_u64().await?); + } + if self.next_time_nanos.is_none() { + self.next_time_nanos = Some(self.line_rx.read_u32().await?); } - // Ok we have next_time.is_some() if self.next_len.is_none() { self.next_len = Some(self.line_rx.read_u32().await? as usize); } @@ -117,7 +120,11 @@ impl OwnConnection { let line = String::try_from(self.next_line.take().unwrap()).map_err(|err| { std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string()) })?; - Ok(Some((line, self.next_time.take().unwrap()))) + let time = Time::new( + self.next_time_secs.take().unwrap(), + self.next_time_nanos.take().unwrap(), + ); + Ok(Some((line, time))) } else { // Ok we don't have a full line, will be next time! Ok(None) @@ -154,7 +161,7 @@ pub struct ConnectionManager { /// Message we receive from actions message_rx: mpsc::Receiver, /// Our queue of messages to send - message_queue: VecDeque, + message_queue: Tree>, /// Messages we send from remote nodes to our own stream own_cluster_tx: remoc::rch::mpsc::Sender, @@ -164,16 +171,26 @@ pub struct ConnectionManager { } impl ConnectionManager { - pub fn new( + pub async fn new( cluster_name: String, remote: EndpointAddr, endpoint: Arc, message_timeout: Duration, message_rx: mpsc::Receiver, own_cluster_tx: remoc::rch::mpsc::Sender, + db: &mut Database, shutdown: ShutdownController, - ) -> (Self, mpsc::Sender) { + ) -> Result<(Self, mpsc::Sender), String> { let node_id = remote.id.show(); + + let message_queue = db + .open_tree( + format!("message_queue_{}_{}", endpoint.id().show(), node_id), + message_timeout, + |(key, value)| Ok((to_time(&key)?, Arc::new(to_string(&value)?))), + ) + .await?; + let (connection_tx, connection_rx) = mpsc::channel(1); let (ask_connection, order_start) = mpsc::channel(1); try_connect( @@ -184,7 +201,7 @@ impl ConnectionManager { connection_tx.clone(), order_start, ); - ( + Ok(( Self { cluster_name, node_id, @@ -195,12 +212,12 @@ impl ConnectionManager { last_connection_id: 0, message_timeout, message_rx, - message_queue: VecDeque::default(), + message_queue, own_cluster_tx, shutdown, }, connection_tx, - ) + )) } /// Main loop @@ -246,7 +263,7 @@ impl ConnectionManager { self.handle_connection(connection).await; } Event::LocalMessageReceived(utc_line) => { - self.handle_local_message(utc_line, Insert::Back).await; + self.handle_local_message(utc_line).await; } Event::RemoteMessageReceived(message) => { self.handle_remote_message(message).await; @@ -255,24 +272,41 @@ impl ConnectionManager { } async fn send_queue_messages(&mut self) { - while self.connection.is_some() - && let Some(message) = self.message_queue.pop_front() + while let Some(connection) = &mut self.connection + && let Some((time, line)) = self + .message_queue + .first_key_value() + .map(|(k, v)| (k.clone(), v.clone())) { - self.handle_local_message(Some(message), Insert::Front) - .await; + if let Err(err) = connection.send_line(&line, &time).await { + eprintln!( + "INFO cluster {}: connection with node {} failed: {err}", + self.cluster_name, self.node_id, + ); + self.close_connection(CLOSE_SEND).await; + } else { + self.message_queue.remove(&time).await; + eprintln!( + "DEBUG cluster {}: node {}: sent a local message to remote: {}", + self.cluster_name, self.node_id, line + ); + } } } async fn drop_timeout_messages(&mut self) { - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let now = now(); let mut count = 0; - while self - .message_queue - .front() - .is_some_and(|element| element.1 + self.message_timeout < now) - { - self.message_queue.pop_front(); - count += 1; + loop { + // We have a next key and it reached timeout + if let Some(next_key) = self.message_queue.first_key_value().map(|kv| kv.0.clone()) + && next_key + self.message_timeout < now + { + self.message_queue.remove(&next_key).await; + count += 1; + } else { + break; + } } if count > 0 { eprintln!( @@ -343,20 +377,23 @@ impl ConnectionManager { "WARN cluster {}: node {}: connection {}: error receiving remote message: {err}", self.cluster_name, self.node_id, self.last_connection_id ); - self.close_connection(1, b"error receiving from your stream") - .await; + self.close_connection(CLOSE_RECV).await; } Ok(None) => { eprintln!( "WARN cluster {}: node {} closed its stream", self.cluster_name, self.node_id, ); - self.close_connection(1, b"you closed your stream").await; + self.close_connection(CLOSE_CLOSED).await; } Ok(Some(line)) => { // TODO from_utc let local_time = line.1; - if let Err(err) = self.own_cluster_tx.send((line.0.clone(), local_time)).await { + if let Err(err) = self + .own_cluster_tx + .send((line.0.clone(), local_time.into())) + .await + { eprintln!( "ERROR cluster {}: could not send message to reaction stream: {err}", self.cluster_name @@ -376,7 +413,7 @@ impl ConnectionManager { } } - async fn handle_local_message(&mut self, message: Option, insert: Insert) { + async fn handle_local_message(&mut self, message: Option) { eprintln!( "DEBUG cluster {}: node {}: received a local message", self.cluster_name, self.node_id, @@ -392,17 +429,13 @@ impl ConnectionManager { Some(message) => match &mut self.connection { Some(connection) => { // TODO to_utc - if let Err(err) = connection.send_line((message.0.clone(), message.1)).await { + if let Err(err) = connection.send_line(&message.0, &message.1).await { eprintln!( "INFO cluster {}: connection with node {} failed: {err}", self.cluster_name, self.node_id, ); - self.message_queue.push_back(message); - self.close_connection( - 0, - b"could not send a message to your channel so I quit", - ) - .await; + self.message_queue.insert(message.1, message.0).await; + self.close_connection(CLOSE_SEND).await; } else { eprintln!( "DEBUG cluster {}: node {}: sent a local message to remote: {}", @@ -415,19 +448,17 @@ impl ConnectionManager { "DEBUG cluster {}: node {}: no connection, saving local message to send later: {}", self.cluster_name, self.node_id, message.0 ); - if let Insert::Front = insert { - self.message_queue.push_front(message); - } else { - self.message_queue.push_back(message); - } + self.message_queue.insert(message.1, message.0).await; } }, } } - async fn close_connection(&mut self, code: u32, reason: &[u8]) { + async fn close_connection(&mut self, code: (u32, &[u8])) { if let Some(connection) = self.connection.take() { - connection.connection.close(VarInt::from_u32(code), reason); + connection + .connection + .close(VarInt::from_u32(code.0), code.1); } self.ask_connection().await; } diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index 026ad31..6ec3645 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -1,6 +1,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + path::PathBuf, time::Duration, }; @@ -11,17 +12,15 @@ use reaction_plugin::{ }; use remoc::{rch::mpsc, rtc}; use serde::{Deserialize, Serialize}; +use treedb::Database; + +use crate::key::Show; mod cluster; mod connection; mod endpoint; mod key; -use crate::{ - cluster::{bind, cluster_tasks}, - key::{Show, key_b64_to_bytes, secret_key}, -}; - #[tokio::main] async fn main() { let plugin = Plugin::default(); @@ -125,7 +124,7 @@ impl PluginInfo for Plugin { .map_err(|err| format!("invalid message_timeout: {err}"))?; for node in options.nodes.into_iter() { - let bytes = key_b64_to_bytes(&node.public_key) + let bytes = key::key_b64_to_bytes(&node.public_key) .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; let public_key = PublicKey::from_bytes(&bytes) @@ -144,7 +143,7 @@ impl PluginInfo for Plugin { ); } - let secret_key = secret_key(".", &stream_name).await?; + let secret_key = key::secret_key(".", &stream_name).await?; eprintln!( "INFO public key of this node for cluster {stream_name}: {}", secret_key.public().show() @@ -207,14 +206,24 @@ impl PluginInfo for Plugin { } async fn finish_setup(&mut self) -> RemoteResult<()> { + let mut db = { + let path = PathBuf::from("."); + let (cancellation_token, task_tracker_token) = self.cluster_shutdown.token().split(); + Database::open(&path, cancellation_token, task_tracker_token) + .await + .map_err(|err| format!("Can't open database: {err}"))? + }; + while let Some((stream_name, stream)) = self.streams.pop_first() { - let endpoint = bind(&stream).await?; - cluster_tasks( + let endpoint = cluster::bind(&stream).await?; + cluster::cluster_tasks( endpoint, stream, self.actions.remove(&stream_name).unwrap_or_default(), + &mut db, self.cluster_shutdown.clone(), - ); + ) + .await?; } // Check there is no action left if !self.actions.is_empty() {