WIP switch to one task per connection

This commit is contained in:
ppom 2025-11-14 12:00:00 +01:00
commit 40c6202cd4
No known key found for this signature in database

View file

@ -1,6 +1,8 @@
use std::{
collections::BTreeMap,
collections::{BTreeMap, VecDeque},
net::{SocketAddrV4, SocketAddrV6},
sync::Arc,
time::Instant,
};
use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::Connection};
@ -13,86 +15,104 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
type ShutdownNotification = oneshot::Receiver<oneshot::Sender<()>>;
pub struct Cluster {
pub async fn bind(stream: &StreamInit) -> Result<Endpoint, String> {
let mut builder = Endpoint::builder()
.secret_key(stream.secret_key.clone())
.alpns(ALPN.iter().map(|slice| slice.to_vec()).collect())
.relay_mode(iroh::RelayMode::Disabled)
.clear_discovery();
if let Some(ip) = stream.bind_ipv4 {
builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port));
}
if let Some(ip) = stream.bind_ipv6 {
builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0));
}
builder.bind().await.map_err(|err| {
format!(
"Could not create socket address for cluster {}: {err}",
stream.name
)
})
}
pub fn cluster_tasks(
endpoint: Endpoint,
stream: StreamInit,
actions: Vec<ActionInit>,
shutdown: ShutdownNotification,
connections: BTreeMap<EndpointId, Connection>,
endpoint_addr_tx: mpsc::Sender<EndpointAddr>,
) {
let messages_from_actions = spawn_actions(actions, stream.tx.clone());
let (endpoint_addr_tx, connection_rx) =
EndpointManager::new(endpoint, stream.name.clone(), stream.nodes.len());
// TODO create ConnectionManagers and connect them to EndpointManager
}
fn spawn_actions(
mut actions: Vec<ActionInit>,
own_cluster_tx: remoc::rch::mpsc::Sender<String>,
) -> mpsc::Receiver<Arc<String>> {
let (nodes_tx, nodes_rx) = mpsc::channel(1);
while let Some(mut action) = actions.pop() {
let nodes_tx = nodes_tx.clone();
let own_cluster_tx = own_cluster_tx.clone();
tokio::spawn(async move { action.serve(nodes_tx, own_cluster_tx).await });
}
nodes_rx
}
impl ActionInit {
async fn serve(
&mut self,
nodes_tx: mpsc::Sender<Arc<String>>,
own_stream_tx: remoc::rch::mpsc::Sender<String>,
) {
while let Ok(Some(m)) = self.rx.recv().await {
let line = if m.match_.is_empty() {
self.send.clone()
} else {
(0..(m.match_.len()))
.zip(&self.patterns)
.fold(self.send.clone(), |acc, (i, pattern)| {
acc.replace(pattern, &m.match_[i])
})
};
if self.self_
&& let Err(err) = own_stream_tx.send(line.clone()).await
{
eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}");
}
let line = Arc::new(line);
if let Err(err) = nodes_tx.send(line).await {
eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}");
};
if let Err(err) = m.result.send(Ok(())) {
eprintln!("ERROR while responding to reaction action: {err}");
}
}
}
}
pub struct TimeMessage {
pub message: Arc<String>,
pub timeout: Instant,
}
pub struct ConnectionManager {
endpoint: EndpointAddr,
// Ask the EndpointManager to connect
ask_connection: mpsc::Sender<EndpointAddr>,
// Our own connection (when we have one)
connection: Option<Connection>,
// The EndpointManager sending us a connection (whether we asked for it or not)
connection_rx: mpsc::Receiver<Connection>,
}
impl Cluster {
pub async fn new(
stream: StreamInit,
actions: Vec<ActionInit>,
shutdown: ShutdownNotification,
) -> Result<(), String> {
let mut builder = Endpoint::builder()
.secret_key(stream.secret_key.clone())
.alpns(ALPN.iter().map(|slice| slice.to_vec()).collect())
.relay_mode(iroh::RelayMode::Disabled)
.clear_discovery();
if let Some(ip) = stream.bind_ipv4 {
builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port));
}
if let Some(ip) = stream.bind_ipv6 {
builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0));
}
let endpoint = builder.bind().await.map_err(|err| {
format!(
"Could not create socket address for cluster {}: {err}",
stream.name
)
})?;
let (endpoint_addr_tx, connection_rx) =
EndpointManager::new(endpoint, stream.name.clone(), stream.nodes.len());
let this = Self {
// No connection for now
connections: Default::default(),
// Values passed as-is
stream,
actions,
shutdown,
endpoint_addr_tx,
connection_rx,
};
tokio::spawn(async move { this.task().await });
Ok(())
}
async fn task(mut self) {
// Ask connections for all nodes
for node in self.stream.nodes.values() {
self.endpoint_addr_tx.send(node.clone()).await.unwrap();
}
let action_rx = self.spawn_actions();
// Ok donc là il faut :
// - Que j'ai une queue par noeud
// - Que chaque élément de la queue puisse timeout
// - Que j'envoie les messages de mes actions dans toutes les queues
// - Que j'écoute les messages de mes pairs et que je les renvoie à mon stream
}
fn spawn_actions(&mut self) -> mpsc::Receiver<(Exec, bool)> {
let (tx, rx) = mpsc::channel(1);
while let Some(mut action) = self.actions.pop() {
let tx = tx.clone();
tokio::spawn(async move {
while let Ok(Some(exec)) = action.rx.recv().await {
if let Err(err) = tx.send((exec, action.self_)).await {
eprintln!("ERROR while queueing action in cluster: {err}");
break;
}
}
});
}
rx
}
// Our queue of messages to send
queue: VecDeque<TimeMessage>,
// Messages we send from remote nodes to our own stream
own_cluster_tx: remoc::rch::mpsc::Sender<String>,
}