From 983eff13eb90519180a034230c2ccc0eafca80e6 Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 3 Nov 2025 12:00:00 +0100 Subject: [PATCH] cluster initialization - Actions are connected to Cluster, - Separate task to (re)initialize connections --- .../reaction-plugin-cluster/src/cluster.rs | 123 ++++++++++++---- .../reaction-plugin-cluster/src/connection.rs | 136 ++++++++++++++++++ plugins/reaction-plugin-cluster/src/main.rs | 83 +++++++---- 3 files changed, 291 insertions(+), 51 deletions(-) create mode 100644 plugins/reaction-plugin-cluster/src/connection.rs diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index 4b3744a..f58372b 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -1,36 +1,107 @@ -use std::net::{SocketAddrV4, SocketAddrV6}; +use std::{ + collections::BTreeMap, + net::{SocketAddrV4, SocketAddrV6}, + sync::Arc, +}; -use iroh::Endpoint; -use reaction_plugin::RemoteResult; +use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::Connection}; +use reaction_plugin::Exec; +use tokio::sync::{mpsc, oneshot}; -use crate::Plugin; +use crate::{ActionInit, StreamInit, connection::ConnectionInitializer}; -const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; +pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; -impl Plugin { - pub async fn endpoint_init(&mut self) -> RemoteResult<()> { - // while let Some((stream_name, stream)) = self.streams.pop_first() { - for (stream_name, stream) in &self.streams { - let mut builder = Endpoint::builder() - .secret_key(stream.secret_key.clone()) - .alpns(ALPN.iter().map(|slice| slice.to_vec()).collect()) - .relay_mode(iroh::RelayMode::Disabled) - .clear_discovery(); +type ShutdownNotification = oneshot::Receiver>; - if let Some(ip) = stream.bind_ipv4 { - builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port)); - } - if let Some(ip) = stream.bind_ipv6 { - builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0)); - } +pub struct Cluster { + endpoint: Arc, + stream: StreamInit, + actions: Vec, + shutdown: ShutdownNotification, + connections: BTreeMap, + endpoint_addr_tx: mpsc::Sender, + connection_rx: mpsc::Receiver, +} - let endpoint = builder.bind().await.map_err(|err| { - format!("Could not create socket address for cluster {stream_name}: {err}") - })?; - self.endpoints.insert(stream_name.clone(), endpoint); +impl Cluster { + pub async fn new( + stream: StreamInit, + actions: Vec, + shutdown: ShutdownNotification, + ) -> Result<(), String> { + let mut builder = Endpoint::builder() + .secret_key(stream.secret_key.clone()) + .alpns(ALPN.iter().map(|slice| slice.to_vec()).collect()) + .relay_mode(iroh::RelayMode::Disabled) + .clear_discovery(); + + if let Some(ip) = stream.bind_ipv4 { + builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port)); } - // We have no use of those parameters anymore - self.streams = Default::default(); + if let Some(ip) = stream.bind_ipv6 { + builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0)); + } + + let endpoint = builder.bind().await.map_err(|err| { + format!( + "Could not create socket address for cluster {}: {err}", + stream.name + ) + })?; + let endpoint = Arc::new(endpoint); + + let (endpoint_addr_tx, connection_rx) = + ConnectionInitializer::new(endpoint.clone(), stream.name.clone(), stream.nodes.len()); + + for node in stream.nodes.values() { + endpoint_addr_tx.send(node.clone()).await.unwrap(); + } + + let this = Self { + // No connection for now + connections: Default::default(), + // Values passed as-is + stream, + actions, + endpoint, + shutdown, + endpoint_addr_tx, + connection_rx, + }; + tokio::spawn(async move { this.task().await }); + Ok(()) } + + async fn task(mut self) { + let action_rx = self.spawn_actions(); + + // Ok donc là il faut : + // - Que je réessaie plus tard les connections qui ont raté + // - Que j'accepte des nouvelles connections + // - Que j'ai une queue par noeud + // - Que chaque élément de la queue puisse timeout + // - Que j'envoie les messages de mes actions dans toutes les queues + // + // - Que j'écoute les messages de mes pairs et que je les renvoie à mon stream + // + // Et que je gère l'authentification en début de connection + } + + fn spawn_actions(&mut self) -> mpsc::Receiver<(Exec, bool)> { + let (tx, rx) = mpsc::channel(1); + while let Some(mut action) = self.actions.pop() { + let tx = tx.clone(); + tokio::spawn(async move { + while let Ok(Some(exec)) = action.rx.recv().await { + if let Err(err) = tx.send((exec, action.self_)).await { + eprintln!("ERROR while queueing action in cluster: {err}"); + break; + } + } + }); + } + rx + } } diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs new file mode 100644 index 0000000..5cb646e --- /dev/null +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -0,0 +1,136 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Duration; + +use iroh::Endpoint; +use iroh::{EndpointAddr, endpoint::Connection}; +use tokio::{ + sync::mpsc, + time::{Instant, sleep, sleep_until}, +}; + +use crate::cluster::ALPN; + +const START_TIMEOUT: Duration = Duration::from_secs(5); +const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour +const TIMEOUT_FACTOR: f64 = 1.5; + +pub struct ConnectionInitializer { + endpoint: Arc, + cluster_name: String, + retry_connections: BTreeMap, + endpoint_addr_rx: mpsc::Receiver, + connection_tx: mpsc::Sender, +} + +impl ConnectionInitializer { + pub fn new( + endpoint: Arc, + cluster_name: String, + cluster_size: usize, + ) -> (mpsc::Sender, mpsc::Receiver) { + let (tx1, rx1) = mpsc::channel(cluster_size); + let (tx2, rx2) = mpsc::channel(cluster_size); + + tokio::spawn(async move { + Self { + endpoint, + cluster_name, + retry_connections: Default::default(), + endpoint_addr_rx: rx1, + connection_tx: tx2, + } + .task() + .await + }); + + (tx1, rx2) + } + + async fn task(&mut self) { + let mut tick = sleep(Duration::default()); + loop { + // Uncomment this line and comment the select for faster development in this function + // let option = Some(self.endpoint_addr_rx.recv().await); + let option = tokio::select! { + endpoint_addr = self.endpoint_addr_rx.recv() => Some(endpoint_addr), + _ = tick => None, + }; + if let Some(option) = option { + if let Some(endpoint_addr) = option { + match self.try_connect(endpoint_addr).await { + Ok(connection) => { + if let Err(_) = self.connection_tx.send(connection).await { + // This means the main cluster loop has quit, so let's quit + break; + } + } + Err(endpoint_addr) => { + self.insert_address(endpoint_addr, START_TIMEOUT); + } + } + } else { + break; + } + } else { + if self + .retry_connections + .keys() + .next() + .is_some_and(|time| time < &Instant::now()) + { + let (_, (endpoint_addr, delta)) = self.retry_connections.pop_first().unwrap(); + match self.try_connect(endpoint_addr).await { + Ok(connection) => { + if let Err(_) = self.connection_tx.send(connection).await { + // This means the main cluster loop has quit, so let's quit + break; + } + } + Err(endpoint_addr) => { + // Multiply timeout by TIMEOUT_FACTOR + let delta = Duration::from_millis( + ((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64, + ); + // Cap to MAX_TIMEOUT + let delta = if delta > MAX_TIMEOUT { + MAX_TIMEOUT + } else { + delta + }; + self.insert_address(endpoint_addr, delta); + } + } + } + } + // Tick at next deadline + tick = sleep_until(*self.retry_connections.keys().next().unwrap()); + } + } + + fn insert_address(&mut self, endpoint_addr: EndpointAddr, delta: Duration) { + if !delta.is_zero() { + eprintln!( + "INFO cluster {}: retry connecting to node {} in {:?}", + self.cluster_name, endpoint_addr.id, delta + ); + } + let now = Instant::now(); + // Schedule this address for later + self.retry_connections + .insert(now + delta, (endpoint_addr, delta)); + } + + async fn try_connect(&self, addr: EndpointAddr) -> Result { + match self.endpoint.connect(addr.clone(), ALPN[0]).await { + Ok(connection) => Ok(connection), + Err(err) => { + eprintln!( + "ERROR cluster {}: connecting to node {}: {err}", + self.cluster_name, addr.id + ); + Err(addr) + } + } + } +} diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index 72cddb4..e40f17e 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -3,19 +3,21 @@ use std::{ net::{Ipv4Addr, Ipv6Addr, SocketAddr}, }; -use iroh::{Endpoint, PublicKey, SecretKey}; +use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr}; use reaction_plugin::{ ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, main_loop, }; use remoc::{rch::mpsc, rtc}; use serde::{Deserialize, Serialize}; -use tokio::fs; +use tokio::{fs, sync::oneshot}; mod cluster; +mod connection; mod secret_key; -use secret_key::secret_key; -use crate::secret_key::{key_b64_to_bytes, key_bytes_to_b64}; +use secret_key::{key_b64_to_bytes, key_bytes_to_b64, secret_key}; + +use crate::cluster::Cluster; #[tokio::main] async fn main() { @@ -26,8 +28,8 @@ async fn main() { #[derive(Default)] struct Plugin { streams: BTreeMap, - actions: Vec, - endpoints: BTreeMap, + actions: BTreeMap>, + cluster_shutdown: Vec>>, } /// Stream options as defined by the user @@ -61,12 +63,13 @@ fn ipv6_unspecified() -> Option { /// Stream information before start struct StreamInit { + name: String, listen_port: u16, bind_ipv4: Option, bind_ipv6: Option, shared_secret: String, secret_key: SecretKey, - nodes: Vec, + nodes: BTreeMap, tx: mpsc::Sender, } @@ -76,12 +79,6 @@ struct NodeOption { addresses: Vec, } -#[derive(Serialize, Deserialize)] -struct NodeInit { - public_key: PublicKey, - addresses: Vec, -} - #[derive(Serialize, Deserialize)] struct ActionOptions { /// The line to send to the corresponding cluster, example: "ban \" @@ -95,8 +92,8 @@ struct ActionOptions { #[derive(Serialize, Deserialize)] struct ActionInit { + name: String, send: String, - to: String, self_: bool, patterns: Vec, rx: mpsc::Receiver, @@ -138,7 +135,7 @@ impl PluginInfo for Plugin { return Err("missing shared secret: either shared_secret or shared_secret_file must be provided".into()); }; - let mut init_nodes = Vec::default(); + let mut nodes = BTreeMap::default(); for node in options.nodes.into_iter() { let bytes = key_b64_to_bytes(&node.public_key) .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; @@ -146,10 +143,17 @@ impl PluginInfo for Plugin { let public_key = PublicKey::from_bytes(&bytes) .map_err(|err| format!("invalid public key {}: {err}", node.public_key))?; - init_nodes.push(NodeInit { + nodes.insert( public_key, - addresses: node.addresses, - }); + EndpointAddr { + id: public_key, + addrs: node + .addresses + .into_iter() + .map(|addr| TransportAddr::Ip(addr)) + .collect(), + }, + ); } let secret_key = secret_key(&stream_name).await?; @@ -161,12 +165,13 @@ impl PluginInfo for Plugin { let (tx, rx) = mpsc::channel(1); let stream = StreamInit { + name: stream_name.clone(), listen_port: options.listen_port, bind_ipv4: options.bind_ipv4, bind_ipv6: options.bind_ipv6, shared_secret, secret_key, - nodes: init_nodes, + nodes, tx, }; @@ -182,9 +187,9 @@ impl PluginInfo for Plugin { async fn action_impl( &mut self, - _stream_name: String, - _filter_name: String, - _action_name: String, + stream_name: String, + filter_name: String, + action_name: String, action_type: String, config: Value, patterns: Vec, @@ -199,20 +204,48 @@ impl PluginInfo for Plugin { let (tx, rx) = mpsc::channel(1); let init_action = ActionInit { + name: format!("{}.{}.{}", stream_name, filter_name, action_name), send: options.send, - to: options.to, self_: options.self_, patterns, rx, }; - self.actions.push(init_action); + self.actions + .entry(options.to) + .or_default() + .push(init_action); Ok(ActionImpl { tx }) } async fn finish_setup(&mut self) -> RemoteResult<()> { - self.endpoint_init().await + while let Some((stream_name, stream)) = self.streams.pop_first() { + let (tx, rx) = oneshot::channel(); + self.cluster_shutdown.push(tx); + Cluster::new( + stream, + self.actions.remove(&stream_name).unwrap_or_default(), + rx, + ) + .await?; + } + // Check there is no action left + if !self.actions.is_empty() { + for (to, actions) in &self.actions { + for action in actions { + eprintln!( + "ERROR action '{}' sends 'to' unknown stream '{}'", + action.name, to + ); + } + } + return Err("at least one cluster_send action has unknown 'to'".into()); + } + // Free containers + self.actions = Default::default(); + self.streams = Default::default(); + Ok(()) } async fn close(self) -> RemoteResult<()> {