cluster: use treedb for message queue persistence

This commit is contained in:
ppom 2025-12-15 12:00:00 +01:00
commit 2095009fa9
No known key found for this signature in database
5 changed files with 129 additions and 79 deletions

1
Cargo.lock generated
View file

@ -2871,6 +2871,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"treedb",
]
[[package]]

View file

@ -13,6 +13,7 @@ serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio.features = ["rt-multi-thread"]
treedb.workspace = true
data-encoding = "2.9.0"
iroh = { version = "0.94.0", default-features = false }

View file

@ -2,7 +2,7 @@ use std::{
collections::BTreeMap,
net::{SocketAddrV4, SocketAddrV6},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
time::Duration,
};
use futures::future::join_all;
@ -13,12 +13,16 @@ use iroh::{
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::rch::mpsc as remocMpsc;
use tokio::sync::mpsc as tokioMpsc;
use treedb::{
Database,
time::{Time, now},
};
use crate::{ActionInit, StreamInit, connection::ConnectionManager, endpoint::EndpointManager};
pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
pub type UtcLine = Arc<(String, Duration)>;
pub type UtcLine = (Arc<String>, Time);
pub fn transport_config() -> TransportConfig {
// FIXME higher timeouts and keep alive
@ -56,12 +60,13 @@ pub async fn bind(stream: &StreamInit) -> Result<Endpoint, String> {
})
}
pub fn cluster_tasks(
pub async fn cluster_tasks(
endpoint: Endpoint,
mut stream: StreamInit,
mut actions: Vec<ActionInit>,
db: &mut Database,
shutdown: ShutdownController,
) {
) -> Result<(), String> {
eprintln!("DEBUG cluster tasks starts running");
let (message_action2connection_txs, mut message_action2connection_rxs): (
@ -100,8 +105,10 @@ pub fn cluster_tasks(
stream.message_timeout,
message_action2connection_rx,
stream_tx,
db,
shutdown,
);
)
.await?;
tokio::spawn(async move { connection_manager.task().await });
connection_endpoint2connection_txs.insert(pk, connection_endpoint2connection_tx);
}
@ -115,6 +122,7 @@ pub fn cluster_tasks(
);
eprintln!("DEBUG cluster tasks finished running");
Ok(())
}
impl ActionInit {
@ -127,15 +135,15 @@ impl ActionInit {
while let Ok(Some(m)) = self.rx.recv().await {
eprintln!("DEBUG action: received a message to send to connections");
let line = self.send.line(m.match_);
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let now = now();
if self.self_
&& let Err(err) = own_stream_tx.send((line.clone(), now.clone())).await
&& let Err(err) = own_stream_tx.send((line.clone(), now.into())).await
{
eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}");
}
// TODO to_utc
let line = Arc::new((line, now));
let line = (Arc::new(line), now);
for result in join_all(nodes_tx.iter().map(|tx| tx.send(line.clone()))).await {
if let Err(err) = result {
eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}");

View file

@ -1,10 +1,4 @@
use std::{
cmp::max,
collections::VecDeque,
io::Error as IoError,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use std::{cmp::max, io::Error as IoError, sync::Arc, time::Duration};
use futures::FutureExt;
use iroh::{
@ -18,6 +12,11 @@ use tokio::{
sync::mpsc,
time::sleep,
};
use treedb::{
Database, Tree,
helpers::{to_string, to_time},
time::{Time, now},
};
use crate::{
cluster::{ALPN, UtcLine, connect_config},
@ -26,7 +25,11 @@ use crate::{
const PROTOCOL_VERSION: u32 = 1;
type RemoteLine = (String, Duration);
// Close codes handed to `close_connection`: each pair is `(code, reason)`,
// where the code is converted with `VarInt::from_u32` and the reason bytes
// are passed along unchanged to the connection's `close` call.

/// Used when receiving a message from the remote stream returned an error.
const CLOSE_RECV: (u32, &[u8]) = (1, b"error receiving from your stream");
/// Used when the remote node closed its stream (the receive yielded `Ok(None)`).
const CLOSE_CLOSED: (u32, &[u8]) = (2, b"you closed your stream");
/// Used when sending a line to the remote node failed.
const CLOSE_SEND: (u32, &[u8]) = (3, b"could not send a message to your channel so I quit");
type RemoteLine = (String, Time);
type MaybeRemoteLine = Result<Option<RemoteLine>, IoError>;
enum Event {
@ -35,11 +38,6 @@ enum Event {
ConnectionReceived(Option<ConnOrConn>),
}
enum Insert {
Front,
Back,
}
pub struct OwnConnection {
connection: Connection,
id: u64,
@ -47,7 +45,8 @@ pub struct OwnConnection {
line_tx: BufWriter<SendStream>,
line_rx: BufReader<RecvStream>,
next_time: Option<Duration>,
next_time_secs: Option<u64>,
next_time_nanos: Option<u32>,
next_len: Option<usize>,
next_line: Option<Vec<u8>>,
}
@ -64,17 +63,19 @@ impl OwnConnection {
id,
line_tx,
line_rx,
next_time: None,
next_time_secs: None,
next_time_nanos: None,
next_len: None,
next_line: None,
}
}
/// Send a line to peer
async fn send_line(&mut self, line: RemoteLine) -> Result<(), std::io::Error> {
self.line_tx.write_u64(line.1.as_micros() as u64).await?;
self.line_tx.write_u32(line.0.len() as u32).await?;
self.line_tx.write_all(line.0.as_bytes()).await?;
async fn send_line(&mut self, line: &String, time: &Time) -> Result<(), std::io::Error> {
self.line_tx.write_u64(time.as_secs()).await?;
self.line_tx.write_u32(time.subsec_nanos()).await?;
self.line_tx.write_u32(line.len() as u32).await?;
self.line_tx.write_all(line.as_bytes()).await?;
self.line_tx.flush().await?;
Ok(())
}
@ -82,10 +83,12 @@ impl OwnConnection {
/// Cancel-safe function that returns next line from peer
/// Returns None if we don't have all data yet.
async fn recv_line(&mut self) -> Result<Option<RemoteLine>, std::io::Error> {
if self.next_time.is_none() {
self.next_time = Some(Duration::from_micros(self.line_rx.read_u64().await?));
if self.next_time_secs.is_none() {
self.next_time_secs = Some(self.line_rx.read_u64().await?);
}
if self.next_time_nanos.is_none() {
self.next_time_nanos = Some(self.line_rx.read_u32().await?);
}
// Ok we have next_time_secs.is_some() and next_time_nanos.is_some()
if self.next_len.is_none() {
self.next_len = Some(self.line_rx.read_u32().await? as usize);
}
@ -117,7 +120,11 @@ impl OwnConnection {
let line = String::try_from(self.next_line.take().unwrap()).map_err(|err| {
std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string())
})?;
Ok(Some((line, self.next_time.take().unwrap())))
let time = Time::new(
self.next_time_secs.take().unwrap(),
self.next_time_nanos.take().unwrap(),
);
Ok(Some((line, time)))
} else {
// Ok we don't have a full line, will be next time!
Ok(None)
@ -154,7 +161,7 @@ pub struct ConnectionManager {
/// Message we receive from actions
message_rx: mpsc::Receiver<UtcLine>,
/// Our queue of messages to send
message_queue: VecDeque<UtcLine>,
message_queue: Tree<Time, Arc<String>>,
/// Messages we send from remote nodes to our own stream
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
@ -164,16 +171,26 @@ pub struct ConnectionManager {
}
impl ConnectionManager {
pub fn new(
pub async fn new(
cluster_name: String,
remote: EndpointAddr,
endpoint: Arc<Endpoint>,
message_timeout: Duration,
message_rx: mpsc::Receiver<UtcLine>,
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
db: &mut Database,
shutdown: ShutdownController,
) -> (Self, mpsc::Sender<ConnOrConn>) {
) -> Result<(Self, mpsc::Sender<ConnOrConn>), String> {
let node_id = remote.id.show();
let message_queue = db
.open_tree(
format!("message_queue_{}_{}", endpoint.id().show(), node_id),
message_timeout,
|(key, value)| Ok((to_time(&key)?, Arc::new(to_string(&value)?))),
)
.await?;
let (connection_tx, connection_rx) = mpsc::channel(1);
let (ask_connection, order_start) = mpsc::channel(1);
try_connect(
@ -184,7 +201,7 @@ impl ConnectionManager {
connection_tx.clone(),
order_start,
);
(
Ok((
Self {
cluster_name,
node_id,
@ -195,12 +212,12 @@ impl ConnectionManager {
last_connection_id: 0,
message_timeout,
message_rx,
message_queue: VecDeque::default(),
message_queue,
own_cluster_tx,
shutdown,
},
connection_tx,
)
))
}
/// Main loop
@ -246,7 +263,7 @@ impl ConnectionManager {
self.handle_connection(connection).await;
}
Event::LocalMessageReceived(utc_line) => {
self.handle_local_message(utc_line, Insert::Back).await;
self.handle_local_message(utc_line).await;
}
Event::RemoteMessageReceived(message) => {
self.handle_remote_message(message).await;
@ -255,24 +272,41 @@ impl ConnectionManager {
}
async fn send_queue_messages(&mut self) {
while self.connection.is_some()
&& let Some(message) = self.message_queue.pop_front()
while let Some(connection) = &mut self.connection
&& let Some((time, line)) = self
.message_queue
.first_key_value()
.map(|(k, v)| (k.clone(), v.clone()))
{
self.handle_local_message(Some(message), Insert::Front)
.await;
if let Err(err) = connection.send_line(&line, &time).await {
eprintln!(
"INFO cluster {}: connection with node {} failed: {err}",
self.cluster_name, self.node_id,
);
self.close_connection(CLOSE_SEND).await;
} else {
self.message_queue.remove(&time).await;
eprintln!(
"DEBUG cluster {}: node {}: sent a local message to remote: {}",
self.cluster_name, self.node_id, line
);
}
}
}
async fn drop_timeout_messages(&mut self) {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let now = now();
let mut count = 0;
while self
.message_queue
.front()
.is_some_and(|element| element.1 + self.message_timeout < now)
{
self.message_queue.pop_front();
count += 1;
loop {
// We have a next key and it reached timeout
if let Some(next_key) = self.message_queue.first_key_value().map(|kv| kv.0.clone())
&& next_key + self.message_timeout < now
{
self.message_queue.remove(&next_key).await;
count += 1;
} else {
break;
}
}
if count > 0 {
eprintln!(
@ -343,20 +377,23 @@ impl ConnectionManager {
"WARN cluster {}: node {}: connection {}: error receiving remote message: {err}",
self.cluster_name, self.node_id, self.last_connection_id
);
self.close_connection(1, b"error receiving from your stream")
.await;
self.close_connection(CLOSE_RECV).await;
}
Ok(None) => {
eprintln!(
"WARN cluster {}: node {} closed its stream",
self.cluster_name, self.node_id,
);
self.close_connection(1, b"you closed your stream").await;
self.close_connection(CLOSE_CLOSED).await;
}
Ok(Some(line)) => {
// TODO from_utc
let local_time = line.1;
if let Err(err) = self.own_cluster_tx.send((line.0.clone(), local_time)).await {
if let Err(err) = self
.own_cluster_tx
.send((line.0.clone(), local_time.into()))
.await
{
eprintln!(
"ERROR cluster {}: could not send message to reaction stream: {err}",
self.cluster_name
@ -376,7 +413,7 @@ impl ConnectionManager {
}
}
async fn handle_local_message(&mut self, message: Option<UtcLine>, insert: Insert) {
async fn handle_local_message(&mut self, message: Option<UtcLine>) {
eprintln!(
"DEBUG cluster {}: node {}: received a local message",
self.cluster_name, self.node_id,
@ -392,17 +429,13 @@ impl ConnectionManager {
Some(message) => match &mut self.connection {
Some(connection) => {
// TODO to_utc
if let Err(err) = connection.send_line((message.0.clone(), message.1)).await {
if let Err(err) = connection.send_line(&message.0, &message.1).await {
eprintln!(
"INFO cluster {}: connection with node {} failed: {err}",
self.cluster_name, self.node_id,
);
self.message_queue.push_back(message);
self.close_connection(
0,
b"could not send a message to your channel so I quit",
)
.await;
self.message_queue.insert(message.1, message.0).await;
self.close_connection(CLOSE_SEND).await;
} else {
eprintln!(
"DEBUG cluster {}: node {}: sent a local message to remote: {}",
@ -415,19 +448,17 @@ impl ConnectionManager {
"DEBUG cluster {}: node {}: no connection, saving local message to send later: {}",
self.cluster_name, self.node_id, message.0
);
if let Insert::Front = insert {
self.message_queue.push_front(message);
} else {
self.message_queue.push_back(message);
}
self.message_queue.insert(message.1, message.0).await;
}
},
}
}
async fn close_connection(&mut self, code: u32, reason: &[u8]) {
async fn close_connection(&mut self, code: (u32, &[u8])) {
if let Some(connection) = self.connection.take() {
connection.connection.close(VarInt::from_u32(code), reason);
connection
.connection
.close(VarInt::from_u32(code.0), code.1);
}
self.ask_connection().await;
}

View file

@ -1,6 +1,7 @@
use std::{
collections::{BTreeMap, BTreeSet},
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
path::PathBuf,
time::Duration,
};
@ -11,17 +12,15 @@ use reaction_plugin::{
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
use treedb::Database;
use crate::key::Show;
mod cluster;
mod connection;
mod endpoint;
mod key;
use crate::{
cluster::{bind, cluster_tasks},
key::{Show, key_b64_to_bytes, secret_key},
};
#[tokio::main]
async fn main() {
let plugin = Plugin::default();
@ -125,7 +124,7 @@ impl PluginInfo for Plugin {
.map_err(|err| format!("invalid message_timeout: {err}"))?;
for node in options.nodes.into_iter() {
let bytes = key_b64_to_bytes(&node.public_key)
let bytes = key::key_b64_to_bytes(&node.public_key)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
let public_key = PublicKey::from_bytes(&bytes)
@ -144,7 +143,7 @@ impl PluginInfo for Plugin {
);
}
let secret_key = secret_key(".", &stream_name).await?;
let secret_key = key::secret_key(".", &stream_name).await?;
eprintln!(
"INFO public key of this node for cluster {stream_name}: {}",
secret_key.public().show()
@ -207,14 +206,24 @@ impl PluginInfo for Plugin {
}
async fn finish_setup(&mut self) -> RemoteResult<()> {
let mut db = {
let path = PathBuf::from(".");
let (cancellation_token, task_tracker_token) = self.cluster_shutdown.token().split();
Database::open(&path, cancellation_token, task_tracker_token)
.await
.map_err(|err| format!("Can't open database: {err}"))?
};
while let Some((stream_name, stream)) = self.streams.pop_first() {
let endpoint = bind(&stream).await?;
cluster_tasks(
let endpoint = cluster::bind(&stream).await?;
cluster::cluster_tasks(
endpoint,
stream,
self.actions.remove(&stream_name).unwrap_or_default(),
&mut db,
self.cluster_shutdown.clone(),
);
)
.await?;
}
// Check there is no action left
if !self.actions.is_empty() {