cluster: finish first draft

finish ConnectionManager main loop
handle local & remote messages, maintain local queue
This commit is contained in:
ppom 2025-11-20 12:00:00 +01:00
commit 43fdd3a877
No known key found for this signature in database
5 changed files with 282 additions and 67 deletions

1
TODO
View file

@ -2,4 +2,3 @@ Test what happens when a Filter's pattern Set changes (I think it's shitty)
DB: add tests on stress testing (lines should always be in order)
plugins: pipe stderr too and wrap errors in logs
plugins: provide treedb storage? omg (add an enum that's either remoc::rch::mpsc or tokio::mpsc)
plugin cluster: provide a stream of refused connections?

View file

@ -7,7 +7,7 @@ use std::{
use chrono::{DateTime, Local, Utc};
use futures::future::join_all;
use iroh::{Endpoint, PublicKey, endpoint::Connection};
use reaction_plugin::{Line, shutdown::ShutdownToken};
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::rch::mpsc as remocMpsc;
use tokio::sync::mpsc as tokioMpsc;
@ -43,7 +43,7 @@ pub fn cluster_tasks(
endpoint: Endpoint,
mut stream: StreamInit,
mut actions: Vec<ActionInit>,
shutdown: ShutdownToken,
shutdown: ShutdownController,
) {
let (message_action2connection_txs, mut message_action2connection_rxs): (
Vec<tokioMpsc::Sender<UtcLine>>,
@ -94,6 +94,7 @@ pub fn cluster_tasks(
let endpoint = endpoint.clone();
let message_action2connection_rx = message_action2connection_rxs.pop().unwrap();
let stream_tx = stream.tx.clone();
let shutdown = shutdown.clone();
tokio::spawn(async move {
ConnectionManager::new(
cluster_name,
@ -103,6 +104,7 @@ pub fn cluster_tasks(
stream.message_timeout,
message_action2connection_rx,
stream_tx,
shutdown,
)
.task()
.await

View file

@ -1,14 +1,18 @@
use std::{collections::VecDeque, sync::Arc, time::Duration};
use std::{
collections::VecDeque,
sync::Arc,
time::{Duration, Instant},
};
use chrono::TimeDelta;
use iroh::{Endpoint, EndpointAddr, endpoint::Connection};
use reaction_plugin::Line;
use chrono::{DateTime, Local, TimeDelta, Utc};
use iroh::{
Endpoint, EndpointAddr,
endpoint::{Connection, VarInt},
};
use reaction_plugin::{Line, shutdown::ShutdownController};
use remoc::{Connect, rch::base};
use serde::{Deserialize, Serialize};
use tokio::{
sync::mpsc,
time::{Sleep, sleep},
};
use tokio::{sync::mpsc, time::sleep};
use crate::cluster::{ALPN, UtcLine};
@ -20,9 +24,14 @@ const PROTOCOL_VERSION: u32 = 1;
enum Event {
Tick,
LocalMessageReceived(UtcLine),
RemoteMessageReceived(RemoteMessage),
ConnectionReceived(Connection),
LocalMessageReceived(Option<UtcLine>),
RemoteMessageReceived(Result<Option<RemoteMessage>, remoc::rch::base::RecvError>),
ConnectionReceived(Option<Connection>),
}
/// Which end of the pending-message queue a message goes to when it has to be queued.
enum Insert {
/// Put the message at the head of the queue (used when re-sending a message that was
/// just popped from the head).
Front,
/// Append the message at the tail of the queue (used for freshly received local messages).
Back,
}
struct OwnConnection {
@ -31,6 +40,9 @@ struct OwnConnection {
rx: base::Receiver<RemoteMessage>,
}
/// Handle a remote node.
/// Manage reception and sending of messages to this node.
/// Retry failed connections.
pub struct ConnectionManager {
/// Cluster's name (for logging)
cluster_name: String,
@ -46,9 +58,9 @@ pub struct ConnectionManager {
connection: Option<OwnConnection>,
/// Delta we'll use next time we'll try to connect to remote
delta: Duration,
/// When this Future resolves, we'll retry connecting to remote
tick: Option<Sleep>,
delta: Option<Duration>,
/// Next instant we'll try to connect
next_try_connect: Option<Instant>,
/// Max duration before we drop pending messages to a node we can't connect to.
message_timeout: TimeDelta,
@ -59,6 +71,9 @@ pub struct ConnectionManager {
/// Messages we send from remote nodes to our own stream
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
/// shutdown
shutdown: ShutdownController,
}
impl ConnectionManager {
@ -70,48 +85,77 @@ impl ConnectionManager {
message_timeout: TimeDelta,
message_rx: mpsc::Receiver<UtcLine>,
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
shutdown: ShutdownController,
) -> Self {
Self {
cluster_name,
remote,
endpoint,
connection: None,
delta: Duration::default(),
tick: None,
delta: None,
next_try_connect: None,
connection_rx,
message_timeout,
message_rx,
message_queue: VecDeque::default(),
own_cluster_tx,
}
}
pub async fn task(mut self) {
self.try_connect().await;
loop {
// TODO event
let event = Event::Tick;
self.handle_event(event).await;
shutdown,
}
}
/// Main loop
///
/// Tries to connect once, then repeatedly waits for the next event:
/// - a retry tick (only while we have no connection),
/// - a message from the remote node (only while we have a connection),
/// - an incoming connection handed over by the EndpointManager,
/// - a message from the local actions,
/// - a shutdown request, which ends the loop.
///
/// After each handled event, the pending queue is flushed to the remote node if
/// possible and messages older than `message_timeout` are dropped.
pub async fn task(mut self) {
    self.try_connect().await;

    loop {
        // NOTE(review): the `select!` guard below disables the tick while we are
        // connected, so the 60 s value is currently never awaited.
        let tick = sleep(if self.connection.is_none() {
            self.delta.unwrap_or(START_TIMEOUT)
        } else {
            // Still tick when we have a connection
            Duration::from_secs(60)
        });
        tokio::pin!(tick);

        let have_connection = self.connection.is_some();
        let maybe_conn_rx = self.connection.as_mut().map(|conn| conn.rx.recv());
        // `select!` evaluates every branch expression, even for branches whose
        // `if` guard is false; only the polling is skipped. Unwrapping the
        // Option directly in the branch would therefore panic whenever there is
        // no connection. The async block is lazy, and falls back to a future
        // that never resolves when there is nothing to receive from.
        let remote_message = async {
            match maybe_conn_rx {
                Some(recv) => recv.await,
                None => std::future::pending().await,
            }
        };

        let event = tokio::select! {
            // Tick when we don't have a connection
            _ = tick, if !have_connection => Some(Event::Tick),
            // Receive remote message when we have a connection
            msg = remote_message, if have_connection => Some(Event::RemoteMessageReceived(msg)),
            // Receive a connection from EndpointManager
            conn = self.connection_rx.recv() => Some(Event::ConnectionReceived(conn)),
            // Receive a message from local Actions
            msg = self.message_rx.recv() => Some(Event::LocalMessageReceived(msg)),
            // Quitting
            _ = self.shutdown.wait() => None,
        };

        match event {
            Some(event) => {
                self.handle_event(event).await;
                self.send_queue_messages().await;
                self.drop_timeout_messages().await;
            }
            None => break,
        }
    }
}
async fn handle_event(&mut self, event: Event) {
match event {
Event::Tick => {
// TODO
self.try_connect().await;
self.handle_tick().await;
}
Event::ConnectionReceived(connection) => {
// TODO
self.handle_connection(connection).await;
}
Event::LocalMessageReceived(utc_line) => {
// TODO
self.handle_local_message(utc_line, Insert::Back).await;
}
Event::RemoteMessageReceived(remote_message) => {
// TODO
Event::RemoteMessageReceived(message) => {
self.handle_remote_message(message).await;
}
}
}
@ -121,7 +165,7 @@ impl ConnectionManager {
async fn try_connect(&mut self) -> bool {
if self.connection.is_none() {
match self.endpoint.connect(self.remote.clone(), ALPN[0]).await {
Ok(connection) => self.handle_connection(connection).await,
Ok(connection) => self.handle_connection(Some(connection)).await,
Err(err) => {
self.try_connect_error(err.to_string());
false
@ -132,30 +176,88 @@ impl ConnectionManager {
}
}
/// React to a retry tick: try to (re)connect to the remote node once the
/// scheduled retry instant has been reached.
///
/// A missing `next_try_connect` means no retry is being delayed (for instance
/// right after a connection was closed), so we try immediately. The connection
/// attempt is only skipped while the scheduled instant is still in the future.
async fn handle_tick(&mut self) {
    let now = Instant::now();
    let too_early = self.next_try_connect.is_some_and(|next| next > now);
    if !too_early {
        self.try_connect().await;
    }
}
async fn send_queue_messages(&mut self) {
while self.connection.is_some()
&& let Some(message) = self.message_queue.pop_front()
{
self.handle_local_message(Some(message), Insert::Front)
.await;
}
}
/// Discard queued messages whose timestamp plus `message_timeout` is already in the past.
///
/// Only the head of the queue is inspected: removal stops at the first message
/// that has not expired yet. A debug line is printed when at least one message
/// was removed.
async fn drop_timeout_messages(&mut self) {
    let now = Utc::now();
    let mut count = 0;

    while let Some(oldest) = self.message_queue.front() {
        let expired = oldest.1 + self.message_timeout < now;
        if !expired {
            break;
        }
        self.message_queue.pop_front();
        count += 1;
    }

    if count == 0 {
        return;
    }
    eprintln!(
        "DEBUG cluster {}: node {}: dropping {count} messages that reached timeout",
        self.cluster_name, self.remote.id,
    )
}
/// Bootstrap a new Connection
/// Returns true if we have a valid connection now
async fn handle_connection(&mut self, connection: Connection) -> bool {
self.delta = Duration::default();
self.tick = None;
match open_channels(&connection).await {
Ok((tx, rx)) => {
self.connection = Some(OwnConnection { connection, tx, rx });
true
}
Err(err) => {
self.try_connect_error(err);
/// `connection` is `None` when the channel that delivers incoming connections
/// has been closed; in that case a shutdown is requested and `false` is returned.
///
/// When we already hold a connection, the new one is ignored and `true` is returned.
///
/// Otherwise the remoc channels are opened on top of the connection. The retry
/// backoff (`delta`, `next_try_connect`) is reset only once this has succeeded,
/// so that a peer which accepts connections but fails the channel handshake
/// keeps being retried with an increasing delay instead of the initial one.
async fn handle_connection(&mut self, connection: Option<Connection>) -> bool {
    let Some(connection) = connection else {
        eprintln!(
            "DEBUG cluster {}: ConnectionManager {}: quitting because EndpointManager has quit",
            self.cluster_name, self.remote.id
        );
        self.quit();
        return false;
    };

    if self.connection.is_some() {
        eprintln!(
            "WARN cluster {}: ignoring incoming connection from {}, as we already have a valid connection with it",
            self.cluster_name, self.remote.id
        );
        return true;
    }

    match open_channels(&connection).await {
        Ok((tx, rx)) => {
            // The connection is fully usable: forget any pending retry.
            self.delta = None;
            self.next_try_connect = None;
            self.connection = Some(OwnConnection { connection, tx, rx });
            true
        }
        Err(err) => {
            self.try_connect_error(err);
            false
        }
    }
}
/// Update the state and log an error when bootstrapping a new Connection
async fn try_connect_error(&mut self, err: String) {
self.delta = next_delta(self.delta);
self.tick = Some(sleep(self.delta));
fn try_connect_error(&mut self, err: String) {
let delta = next_delta(self.delta);
self.next_try_connect = Some(Instant::now() + delta);
self.delta = Some(delta);
eprintln!(
"ERROR cluster {}: node {}: {err}",
"ERROR cluster {}: trying to connect to node {}: {err}",
self.cluster_name, self.remote.id
);
eprintln!(
@ -163,18 +265,127 @@ impl ConnectionManager {
self.cluster_name, self.remote.id, self.delta
);
}
/// Process the outcome of a receive on the remote node's channel.
///
/// - Receive error or closed stream: log a warning and close the connection with code 1.
/// - `Version`: unexpected at this point, logged and ignored.
/// - `Quitting`: log and close the connection with code 0.
/// - `Line`: convert the timestamp to local time and forward the line to our own
///   stream; if forwarding fails, log it and request a shutdown.
async fn handle_remote_message(
    &mut self,
    message: Result<Option<RemoteMessage>, remoc::rch::base::RecvError>,
) {
    // First deal with the transport-level outcomes, keeping only real messages.
    let remote_message = match message {
        Ok(Some(remote_message)) => remote_message,
        Ok(None) => {
            eprintln!(
                "WARN cluster {}: node {} closed its stream",
                self.cluster_name, self.remote.id
            );
            self.close_connection(1, b"you closed your stream").await;
            return;
        }
        Err(err) => {
            eprintln!(
                "WARN cluster {}: error receiving message from node {}: {err}",
                self.cluster_name, self.remote.id
            );
            self.close_connection(1, b"error receiving from your stream")
                .await;
            return;
        }
    };

    match remote_message {
        RemoteMessage::Version(_) => {
            eprintln!(
                "WARN cluster {}: node {} sent invalid message, ignoring",
                self.cluster_name, self.remote.id
            );
        }
        RemoteMessage::Quitting => {
            eprintln!(
                "INFO cluster {}: node {} is quitting, bye bye",
                self.cluster_name, self.remote.id
            );
            self.close_connection(0, b"you said you'll quit so I quit")
                .await;
        }
        RemoteMessage::Line((text, utc_time)) => {
            let local_time = utc_time.with_timezone(&Local);
            // The text is cloned because it is still needed for the log below
            // when forwarding fails.
            let forwarded = self.own_cluster_tx.send((text.clone(), local_time)).await;
            if let Err(err) = forwarded {
                eprintln!(
                    "ERROR cluster {}: could not send message to reaction stream: {err}",
                    self.cluster_name
                );
                eprintln!(
                    "INFO cluster {}: line that can't be sent: {}",
                    self.cluster_name, text
                );
                self.quit();
            }
        }
    }
}
/// Handle a message coming from the local actions (or re-sent from the pending queue).
///
/// - `None` means the local message channel is closed: a shutdown is requested.
/// - With a connection, the message is sent to the remote node. If sending
///   fails, the message is put back in the pending queue and the connection is closed.
/// - Without a connection, the message is stored in the pending queue.
///
/// `insert` selects the end of the queue used whenever the message has to be
/// queued, including after a failed send. This keeps the queue ordered when the
/// message was just popped from its head (`Insert::Front`).
async fn handle_local_message(&mut self, message: Option<UtcLine>, insert: Insert) {
    let Some(message) = message else {
        eprintln!(
            "INFO cluster {}: no action remaining, quitting",
            self.cluster_name
        );
        self.quit();
        return;
    };

    let mut connection_failed = false;
    if let Some(connection) = &mut self.connection {
        // The text is cloned because the message must be kept for re-queueing
        // if the send fails.
        let line = RemoteMessage::Line((message.0.clone(), message.1.with_timezone(&Utc)));
        match connection.tx.send(line).await {
            Ok(_) => return,
            Err(err) => {
                eprintln!(
                    "INFO cluster {}: connection with node {} failed: {err}",
                    self.cluster_name, self.remote.id
                );
                connection_failed = true;
            }
        }
    }

    // Either there is no connection or the send failed: keep the message,
    // at the position requested by the caller.
    match insert {
        Insert::Front => self.message_queue.push_front(message),
        Insert::Back => self.message_queue.push_back(message),
    }

    if connection_failed {
        self.close_connection(0, b"could not send a message to your channel so I quit")
            .await;
    }
}
/// Tear down the current connection, if there is one.
///
/// The connection is taken out of `self` (leaving `None`), its receiving
/// channel is closed, then the underlying connection is closed with the given
/// application `code` and `reason`. Does nothing when there is no connection.
async fn close_connection(&mut self, code: u32, reason: &[u8]) {
    let Some(mut current) = self.connection.take() else {
        return;
    };
    current.rx.close().await;
    let error_code = VarInt::from_u32(code);
    current.connection.close(error_code, reason);
}
/// Request a shutdown through the shutdown controller held by this manager.
fn quit(&mut self) {
self.shutdown.ask_shutdown();
}
}
/// Compute the next wait Duration.
/// We're multiplying the Duration by [`TIMEOUT_FACTOR`] and cap it to [`MAX_TIMEOUT`].
fn next_delta(delta: Duration) -> Duration {
// Multiply timeout by TIMEOUT_FACTOR
let delta = Duration::from_millis(((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64);
// Cap to MAX_TIMEOUT
if delta > MAX_TIMEOUT {
MAX_TIMEOUT
} else {
delta
fn next_delta(delta: Option<Duration>) -> Duration {
    // No previous delay: start the backoff at START_TIMEOUT.
    let Some(previous) = delta else {
        return START_TIMEOUT;
    };
    // Scale the previous delay (millisecond resolution) by TIMEOUT_FACTOR...
    let scaled = Duration::from_millis(((previous.as_millis() as f64) * TIMEOUT_FACTOR) as u64);
    // ...and never exceed MAX_TIMEOUT.
    scaled.min(MAX_TIMEOUT)
}
@ -185,7 +396,7 @@ pub enum RemoteMessage {
/// Must be the first message sent over, then should not be sent again
Version(u32),
/// A line to transmit to your stream
Line(UtcLine),
Line((String, DateTime<Utc>)),
/// Announce the node is closing
Quitting,
}
@ -195,23 +406,22 @@ pub enum RemoteMessage {
async fn open_channels(
connection: &Connection,
) -> Result<(base::Sender<RemoteMessage>, base::Receiver<RemoteMessage>), String> {
let output = connection
.open_uni()
.await
.map_err(|err| format!("{err}"))?;
let output = connection.open_uni().await.map_err(|err| err.to_string())?;
let input = connection
.accept_uni()
.await
.map_err(|err| format!("{err}"))?;
.map_err(|err| err.to_string())?;
let (conn, mut tx, mut rx) = Connect::io_buffered(remoc::Cfg::default(), input, output, 1024)
.await
.map_err(|err| format!("{err}"))?;
.map_err(|err| err.to_string())?;
tokio::spawn(conn);
tx.send(RemoteMessage::Version(PROTOCOL_VERSION)).await;
tx.send(RemoteMessage::Version(PROTOCOL_VERSION))
.await
.map_err(|err| err.to_string())?;
match rx.recv().await {
// Good protocol version!

View file

@ -5,7 +5,7 @@ use iroh::{
Endpoint, PublicKey,
endpoint::{Connection, Incoming},
};
use reaction_plugin::shutdown::ShutdownToken;
use reaction_plugin::shutdown::ShutdownController;
use tokio::sync::mpsc;
enum Break {
@ -21,7 +21,7 @@ pub struct EndpointManager {
/// Connection sender to the Connection Managers
connections_tx: BTreeMap<PublicKey, mpsc::Sender<Connection>>,
/// shutdown
shutdown: ShutdownToken,
shutdown: ShutdownController,
}
impl EndpointManager {
@ -29,7 +29,7 @@ impl EndpointManager {
endpoint: Arc<Endpoint>,
cluster_name: String,
connections_tx: BTreeMap<PublicKey, mpsc::Sender<Connection>>,
shutdown: ShutdownToken,
shutdown: ShutdownController,
) {
tokio::spawn(async move {
Self {
@ -111,7 +111,11 @@ impl EndpointManager {
}
Some(tx) => {
if let Err(_) = tx.send(connection).await {
// This means the main cluster loop has exited, so let's quit
eprintln!(
"DEBUG cluster {}: EndpointManager: quitting because ConnectionManager has quit",
self.cluster_name,
);
self.shutdown.ask_shutdown();
return Break::Yes;
}
}

View file

@ -215,7 +215,7 @@ impl PluginInfo for Plugin {
endpoint,
stream,
self.actions.remove(&stream_name).unwrap_or_default(),
self.cluster_shutdown.token(),
self.cluster_shutdown.clone(),
);
}
// Check there is no action left
@ -238,7 +238,7 @@ impl PluginInfo for Plugin {
async fn close(self) -> RemoteResult<()> {
self.cluster_shutdown.ask_shutdown();
self.cluster_shutdown.wait_shutdown().await;
self.cluster_shutdown.wait_all_task_shutdown().await;
Ok(())
}
}