cluster initialization

- Actions are connected to Cluster,
- Separate task to (re)initialize connections
This commit is contained in:
ppom 2025-11-03 12:00:00 +01:00
commit 983eff13eb
No known key found for this signature in database
3 changed files with 297 additions and 57 deletions

View file

@ -1,36 +1,107 @@
use std::net::{SocketAddrV4, SocketAddrV6};
use std::{
collections::BTreeMap,
net::{SocketAddrV4, SocketAddrV6},
sync::Arc,
};
use iroh::Endpoint;
use reaction_plugin::RemoteResult;
use iroh::{Endpoint, EndpointAddr, EndpointId, endpoint::Connection};
use reaction_plugin::Exec;
use tokio::sync::{mpsc, oneshot};
use crate::Plugin;
use crate::{ActionInit, StreamInit, connection::ConnectionInitializer};
const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
impl Plugin {
pub async fn endpoint_init(&mut self) -> RemoteResult<()> {
// while let Some((stream_name, stream)) = self.streams.pop_first() {
for (stream_name, stream) in &self.streams {
let mut builder = Endpoint::builder()
.secret_key(stream.secret_key.clone())
.alpns(ALPN.iter().map(|slice| slice.to_vec()).collect())
.relay_mode(iroh::RelayMode::Disabled)
.clear_discovery();
type ShutdownNotification = oneshot::Receiver<oneshot::Sender<()>>;
if let Some(ip) = stream.bind_ipv4 {
builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port));
}
if let Some(ip) = stream.bind_ipv6 {
builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0));
}
pub struct Cluster {
endpoint: Arc<Endpoint>,
stream: StreamInit,
actions: Vec<ActionInit>,
shutdown: ShutdownNotification,
connections: BTreeMap<EndpointId, Connection>,
endpoint_addr_tx: mpsc::Sender<EndpointAddr>,
connection_rx: mpsc::Receiver<Connection>,
}
let endpoint = builder.bind().await.map_err(|err| {
format!("Could not create socket address for cluster {stream_name}: {err}")
})?;
self.endpoints.insert(stream_name.clone(), endpoint);
impl Cluster {
pub async fn new(
stream: StreamInit,
actions: Vec<ActionInit>,
shutdown: ShutdownNotification,
) -> Result<(), String> {
let mut builder = Endpoint::builder()
.secret_key(stream.secret_key.clone())
.alpns(ALPN.iter().map(|slice| slice.to_vec()).collect())
.relay_mode(iroh::RelayMode::Disabled)
.clear_discovery();
if let Some(ip) = stream.bind_ipv4 {
builder = builder.bind_addr_v4(SocketAddrV4::new(ip, stream.listen_port));
}
// We have no use of those parameters anymore
self.streams = Default::default();
if let Some(ip) = stream.bind_ipv6 {
builder = builder.bind_addr_v6(SocketAddrV6::new(ip, stream.listen_port, 0, 0));
}
let endpoint = builder.bind().await.map_err(|err| {
format!(
"Could not create socket address for cluster {}: {err}",
stream.name
)
})?;
let endpoint = Arc::new(endpoint);
let (endpoint_addr_tx, connection_rx) =
ConnectionInitializer::new(endpoint.clone(), stream.name.clone(), stream.nodes.len());
for node in stream.nodes.values() {
endpoint_addr_tx.send(node.clone()).await.unwrap();
}
let this = Self {
// No connection for now
connections: Default::default(),
// Values passed as-is
stream,
actions,
endpoint,
shutdown,
endpoint_addr_tx,
connection_rx,
};
tokio::spawn(async move { this.task().await });
Ok(())
}
async fn task(mut self) {
let action_rx = self.spawn_actions();
// Ok donc là il faut :
// - Que je réessaie plus tard les connections qui ont raté
// - Que j'accepte des nouvelles connections
// - Que j'ai une queue par noeud
// - Que chaque élément de la queue puisse timeout
// - Que j'envoie les messages de mes actions dans toutes les queues
//
// - Que j'écoute les messages de mes pairs et que je les renvoie à mon stream
//
// Et que je gère l'authentification en début de connection
}
fn spawn_actions(&mut self) -> mpsc::Receiver<(Exec, bool)> {
let (tx, rx) = mpsc::channel(1);
while let Some(mut action) = self.actions.pop() {
let tx = tx.clone();
tokio::spawn(async move {
while let Ok(Some(exec)) = action.rx.recv().await {
if let Err(err) = tx.send((exec, action.self_)).await {
eprintln!("ERROR while queueing action in cluster: {err}");
break;
}
}
});
}
rx
}
}

View file

@ -0,0 +1,136 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use iroh::Endpoint;
use iroh::{EndpointAddr, endpoint::Connection};
use tokio::{
sync::mpsc,
time::{Instant, sleep, sleep_until},
};
use crate::cluster::ALPN;
const START_TIMEOUT: Duration = Duration::from_secs(5);
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour
const TIMEOUT_FACTOR: f64 = 1.5;
pub struct ConnectionInitializer {
endpoint: Arc<Endpoint>,
cluster_name: String,
retry_connections: BTreeMap<Instant, (EndpointAddr, Duration)>,
endpoint_addr_rx: mpsc::Receiver<EndpointAddr>,
connection_tx: mpsc::Sender<Connection>,
}
impl ConnectionInitializer {
pub fn new(
endpoint: Arc<Endpoint>,
cluster_name: String,
cluster_size: usize,
) -> (mpsc::Sender<EndpointAddr>, mpsc::Receiver<Connection>) {
let (tx1, rx1) = mpsc::channel(cluster_size);
let (tx2, rx2) = mpsc::channel(cluster_size);
tokio::spawn(async move {
Self {
endpoint,
cluster_name,
retry_connections: Default::default(),
endpoint_addr_rx: rx1,
connection_tx: tx2,
}
.task()
.await
});
(tx1, rx2)
}
async fn task(&mut self) {
let mut tick = sleep(Duration::default());
loop {
// Uncomment this line and comment the select for faster development in this function
// let option = Some(self.endpoint_addr_rx.recv().await);
let option = tokio::select! {
endpoint_addr = self.endpoint_addr_rx.recv() => Some(endpoint_addr),
_ = tick => None,
};
if let Some(option) = option {
if let Some(endpoint_addr) = option {
match self.try_connect(endpoint_addr).await {
Ok(connection) => {
if let Err(_) = self.connection_tx.send(connection).await {
// This means the main cluster loop has quit, so let's quit
break;
}
}
Err(endpoint_addr) => {
self.insert_address(endpoint_addr, START_TIMEOUT);
}
}
} else {
break;
}
} else {
if self
.retry_connections
.keys()
.next()
.is_some_and(|time| time < &Instant::now())
{
let (_, (endpoint_addr, delta)) = self.retry_connections.pop_first().unwrap();
match self.try_connect(endpoint_addr).await {
Ok(connection) => {
if let Err(_) = self.connection_tx.send(connection).await {
// This means the main cluster loop has quit, so let's quit
break;
}
}
Err(endpoint_addr) => {
// Multiply timeout by TIMEOUT_FACTOR
let delta = Duration::from_millis(
((delta.as_millis() as f64) * TIMEOUT_FACTOR) as u64,
);
// Cap to MAX_TIMEOUT
let delta = if delta > MAX_TIMEOUT {
MAX_TIMEOUT
} else {
delta
};
self.insert_address(endpoint_addr, delta);
}
}
}
}
// Tick at next deadline
tick = sleep_until(*self.retry_connections.keys().next().unwrap());
}
}
fn insert_address(&mut self, endpoint_addr: EndpointAddr, delta: Duration) {
if !delta.is_zero() {
eprintln!(
"INFO cluster {}: retry connecting to node {} in {:?}",
self.cluster_name, endpoint_addr.id, delta
);
}
let now = Instant::now();
// Schedule this address for later
self.retry_connections
.insert(now + delta, (endpoint_addr, delta));
}
async fn try_connect(&self, addr: EndpointAddr) -> Result<Connection, EndpointAddr> {
match self.endpoint.connect(addr.clone(), ALPN[0]).await {
Ok(connection) => Ok(connection),
Err(err) => {
eprintln!(
"ERROR cluster {}: connecting to node {}: {err}",
self.cluster_name, addr.id
);
Err(addr)
}
}
}
}

View file

@ -3,19 +3,21 @@ use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
};
use iroh::{Endpoint, PublicKey, SecretKey};
use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr};
use reaction_plugin::{
ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, main_loop,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::{fs, sync::oneshot};
mod cluster;
mod connection;
mod secret_key;
use secret_key::secret_key;
use crate::secret_key::{key_b64_to_bytes, key_bytes_to_b64};
use secret_key::{key_b64_to_bytes, key_bytes_to_b64, secret_key};
use crate::cluster::Cluster;
#[tokio::main]
async fn main() {
@ -26,8 +28,8 @@ async fn main() {
#[derive(Default)]
struct Plugin {
streams: BTreeMap<String, StreamInit>,
actions: Vec<ActionInit>,
endpoints: BTreeMap<String, Endpoint>,
actions: BTreeMap<String, Vec<ActionInit>>,
cluster_shutdown: Vec<oneshot::Sender<oneshot::Sender<()>>>,
}
/// Stream options as defined by the user
@ -61,12 +63,13 @@ fn ipv6_unspecified() -> Option<Ipv6Addr> {
/// Stream information before start
struct StreamInit {
name: String,
listen_port: u16,
bind_ipv4: Option<Ipv4Addr>,
bind_ipv6: Option<Ipv6Addr>,
shared_secret: String,
secret_key: SecretKey,
nodes: Vec<NodeInit>,
nodes: BTreeMap<PublicKey, EndpointAddr>,
tx: mpsc::Sender<String>,
}
@ -76,12 +79,6 @@ struct NodeOption {
addresses: Vec<SocketAddr>,
}
#[derive(Serialize, Deserialize)]
struct NodeInit {
public_key: PublicKey,
addresses: Vec<SocketAddr>,
}
#[derive(Serialize, Deserialize)]
struct ActionOptions {
/// The line to send to the corresponding cluster, example: "ban \<ip\>"
@ -95,8 +92,8 @@ struct ActionOptions {
#[derive(Serialize, Deserialize)]
struct ActionInit {
name: String,
send: String,
to: String,
self_: bool,
patterns: Vec<String>,
rx: mpsc::Receiver<Exec>,
@ -138,7 +135,7 @@ impl PluginInfo for Plugin {
return Err("missing shared secret: either shared_secret or shared_secret_file must be provided".into());
};
let mut init_nodes = Vec::default();
let mut nodes = BTreeMap::default();
for node in options.nodes.into_iter() {
let bytes = key_b64_to_bytes(&node.public_key)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
@ -146,10 +143,17 @@ impl PluginInfo for Plugin {
let public_key = PublicKey::from_bytes(&bytes)
.map_err(|err| format!("invalid public key {}: {err}", node.public_key))?;
init_nodes.push(NodeInit {
nodes.insert(
public_key,
addresses: node.addresses,
});
EndpointAddr {
id: public_key,
addrs: node
.addresses
.into_iter()
.map(|addr| TransportAddr::Ip(addr))
.collect(),
},
);
}
let secret_key = secret_key(&stream_name).await?;
@ -161,12 +165,13 @@ impl PluginInfo for Plugin {
let (tx, rx) = mpsc::channel(1);
let stream = StreamInit {
name: stream_name.clone(),
listen_port: options.listen_port,
bind_ipv4: options.bind_ipv4,
bind_ipv6: options.bind_ipv6,
shared_secret,
secret_key,
nodes: init_nodes,
nodes,
tx,
};
@ -182,9 +187,9 @@ impl PluginInfo for Plugin {
async fn action_impl(
&mut self,
_stream_name: String,
_filter_name: String,
_action_name: String,
stream_name: String,
filter_name: String,
action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
@ -199,20 +204,48 @@ impl PluginInfo for Plugin {
let (tx, rx) = mpsc::channel(1);
let init_action = ActionInit {
name: format!("{}.{}.{}", stream_name, filter_name, action_name),
send: options.send,
to: options.to,
self_: options.self_,
patterns,
rx,
};
self.actions.push(init_action);
self.actions
.entry(options.to)
.or_default()
.push(init_action);
Ok(ActionImpl { tx })
}
async fn finish_setup(&mut self) -> RemoteResult<()> {
self.endpoint_init().await
while let Some((stream_name, stream)) = self.streams.pop_first() {
let (tx, rx) = oneshot::channel();
self.cluster_shutdown.push(tx);
Cluster::new(
stream,
self.actions.remove(&stream_name).unwrap_or_default(),
rx,
)
.await?;
}
// Check there is no action left
if !self.actions.is_empty() {
for (to, actions) in &self.actions {
for action in actions {
eprintln!(
"ERROR action '{}' sends 'to' unknown stream '{}'",
action.name, to
);
}
}
return Err("at least one cluster_send action has unknown 'to'".into());
}
// Free containers
self.actions = Default::default();
self.streams = Default::default();
Ok(())
}
async fn close(self) -> RemoteResult<()> {