From 40c6202cd43fc14f7418b0320e0b1da6a5711b9f Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 14 Nov 2025 12:00:00 +0100 Subject: [PATCH] WIP switch to one task per connection --- .../reaction-plugin-cluster/src/cluster.rs | 178 ++++++++++-------- 1 file changed, 99 insertions(+), 79 deletions(-) diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index dd17465..c32d21a 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -1,6 +1,8 @@ use std::{ - collections::BTreeMap, + collections::{BTreeMap, VecDeque}, net::{SocketAddrV4, SocketAddrV6}, + sync::Arc, + time::Instant, }; use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::Connection}; @@ -13,86 +15,104 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; type ShutdownNotification = oneshot::Receiver>; -pub struct Cluster { +pub async fn bind(stream: &StreamInit) -> Result { + let mut builder = Endpoint::builder() + .secret_key(stream.secret_key.clone()) + .alpns(ALPN.iter().map(|slice| slice.to_vec()).collect()) + .relay_mode(iroh::RelayMode::Disabled) + .clear_discovery(); + + if let Some(ip) = stream.bind_ipv4 { + builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port)); + } + if let Some(ip) = stream.bind_ipv6 { + builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0)); + } + + builder.bind().await.map_err(|err| { + format!( + "Could not create socket address for cluster {}: {err}", + stream.name + ) + }) +} + +pub fn cluster_tasks( + endpoint: Endpoint, stream: StreamInit, actions: Vec, shutdown: ShutdownNotification, - connections: BTreeMap, - endpoint_addr_tx: mpsc::Sender, +) { + let messages_from_actions = spawn_actions(actions, stream.tx.clone()); + + let (endpoint_addr_tx, connection_rx) = + EndpointManager::new(endpoint, stream.name.clone(), stream.nodes.len()); + + // TODO create ConnectionManagers and connect them to EndpointManager +} + +fn spawn_actions( + mut actions: Vec, + own_cluster_tx: remoc::rch::mpsc::Sender, +) -> mpsc::Receiver> { + let (nodes_tx, nodes_rx) = mpsc::channel(1); + while let Some(mut action) = actions.pop() { + let nodes_tx = nodes_tx.clone(); + let own_cluster_tx = own_cluster_tx.clone(); + tokio::spawn(async move { action.serve(nodes_tx, own_cluster_tx).await }); + } + nodes_rx +} + +impl ActionInit { + async fn serve( + &mut self, + nodes_tx: mpsc::Sender>, + own_stream_tx: remoc::rch::mpsc::Sender, + ) { + while let Ok(Some(m)) = self.rx.recv().await { + let line = if m.match_.is_empty() { + self.send.clone() + } else { + (0..(m.match_.len())) + .zip(&self.patterns) + .fold(self.send.clone(), |acc, (i, pattern)| { + acc.replace(pattern, &m.match_[i]) + }) + }; + if self.self_ + && let Err(err) = own_stream_tx.send(line.clone()).await + { + eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}"); + } + + let line = Arc::new(line); + if let Err(err) = nodes_tx.send(line).await { + eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}"); + }; + + if let Err(err) = m.result.send(Ok(())) { + eprintln!("ERROR while responding to reaction action: {err}"); + } + } + } +} + +pub struct TimeMessage { + pub message: Arc, + pub timeout: Instant, +} + +pub struct ConnectionManager { + endpoint: EndpointAddr, + // Ask the EndpointManager to connect + ask_connection: mpsc::Sender, + // Our own connection (when we have one) + connection: Option, + // The EndpointManager sending us a connection (whether we asked for it or not) connection_rx: mpsc::Receiver, -} - -impl Cluster { - pub async fn new( - stream: StreamInit, - actions: Vec, - shutdown: ShutdownNotification, - ) -> Result<(), String> { - let mut builder = Endpoint::builder() - .secret_key(stream.secret_key.clone()) - .alpns(ALPN.iter().map(|slice| slice.to_vec()).collect()) - .relay_mode(iroh::RelayMode::Disabled) - .clear_discovery(); - - if let Some(ip) = stream.bind_ipv4 { - builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port)); - } - if let Some(ip) = stream.bind_ipv6 { - builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0)); - } - - let endpoint = builder.bind().await.map_err(|err| { - format!( - "Could not create socket address for cluster {}: {err}", - stream.name - ) - })?; - - let (endpoint_addr_tx, connection_rx) = - EndpointManager::new(endpoint, stream.name.clone(), stream.nodes.len()); - - let this = Self { - // No connection for now - connections: Default::default(), - // Values passed as-is - stream, - actions, - shutdown, - endpoint_addr_tx, - connection_rx, - }; - tokio::spawn(async move { this.task().await }); - - Ok(()) - } - - async fn task(mut self) { - // Ask connections for all nodes - for node in self.stream.nodes.values() { - self.endpoint_addr_tx.send(node.clone()).await.unwrap(); - } - let action_rx = self.spawn_actions(); - - // Ok donc là il faut : - // - Que j'ai une queue par noeud - // - Que chaque élément de la queue puisse timeout - // - Que j'envoie les messages de mes actions dans toutes les queues - // - Que j'écoute les messages de mes pairs et que je les renvoie à mon stream - } - - fn spawn_actions(&mut self) -> mpsc::Receiver<(Exec, bool)> { - let (tx, rx) = mpsc::channel(1); - while let Some(mut action) = self.actions.pop() { - let tx = tx.clone(); - tokio::spawn(async move { - while let Ok(Some(exec)) = action.rx.recv().await { - if let Err(err) = tx.send((exec, action.self_)).await { - eprintln!("ERROR while queueing action in cluster: {err}"); - break; - } - } - }); - } - rx - } + // Our queue of messages to send + queue: VecDeque, + // Messages we send from remote nodes to our own stream + own_cluster_tx: remoc::rch::mpsc::Sender, }