From 71d26766f82ad0b43f4ef0c598f32398cf0a233b Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 15 Nov 2025 12:00:00 +0100 Subject: [PATCH] plugin: Stream plugins now pass time information along their lines This will permit the cluster to accurately receive older-than-immediate information, and it will permit potential log plugins (journald?) to go back in time at startup. --- .../reaction-plugin-cluster/src/cluster.rs | 39 ++++++++++++------- plugins/reaction-plugin-cluster/src/main.rs | 6 +-- plugins/reaction-plugin-virtual/src/main.rs | 8 ++-- plugins/reaction-plugin/src/lib.rs | 5 ++- src/daemon/stream.rs | 13 +++---- 5 files changed, 43 insertions(+), 28 deletions(-) diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index 87b283e..6e6c7f0 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -4,8 +4,9 @@ use std::{ sync::Arc, }; -use chrono::{DateTime, Local}; +use chrono::{DateTime, Local, Utc}; use iroh::{Endpoint, EndpointAddr, endpoint::Connection}; +use reaction_plugin::Line; use tokio::sync::{mpsc, oneshot}; use crate::{ActionInit, StreamInit, endpoint::EndpointManager}; @@ -14,6 +15,8 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; type ShutdownNotification = oneshot::Receiver>; +type UtcLine = Arc<(String, DateTime)>; + pub async fn bind(stream: &StreamInit) -> Result { let mut builder = Endpoint::builder() .secret_key(stream.secret_key.clone()) @@ -52,8 +55,8 @@ pub fn cluster_tasks( fn spawn_actions( mut actions: Vec, - own_cluster_tx: remoc::rch::mpsc::Sender, -) -> mpsc::Receiver> { + own_cluster_tx: remoc::rch::mpsc::Sender, +) -> mpsc::Receiver { let (nodes_tx, nodes_rx) = mpsc::channel(1); while let Some(mut action) = actions.pop() { let nodes_tx = nodes_tx.clone(); @@ -66,8 +69,8 @@ fn spawn_actions( impl ActionInit { async fn serve( &mut self, - nodes_tx: mpsc::Sender>, - own_stream_tx: remoc::rch::mpsc::Sender, + nodes_tx: mpsc::Sender, + own_stream_tx: remoc::rch::mpsc::Sender, ) { while let Ok(Some(m)) = self.rx.recv().await { let line = if m.match_.is_empty() { @@ -79,13 +82,14 @@ impl ActionInit { acc.replace(pattern, &m.match_[i]) }) }; + let now = Local::now(); if self.self_ - && let Err(err) = own_stream_tx.send(line.clone()).await + && let Err(err) = own_stream_tx.send((line.clone(), now.clone())).await { eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}"); } - let line = Arc::new(line); + let line = Arc::new((line, now.to_utc())); if let Err(err) = nodes_tx.send(line).await { eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}"); }; @@ -97,11 +101,6 @@ impl ActionInit { } } -pub struct TimeMessage { - pub message: Arc, - pub timeout: DateTime, -} - pub struct ConnectionManager { endpoint: EndpointAddr, // Ask the EndpointManager to connect @@ -111,7 +110,21 @@ pub struct ConnectionManager { // The EndpointManager sending us a connection (whether we asked for it or not) connection_rx: mpsc::Receiver, // Our queue of messages to send - queue: VecDeque, + queue: VecDeque, // Messages we send from remote nodes to our own stream own_cluster_tx: remoc::rch::mpsc::Sender, } + +#[cfg(test)] +mod tests { + use chrono::{DateTime, Local}; + + fn different_local_tz_is_ok() { + let date1: DateTime = + serde_json::from_str("2025-11-02T17:47:21.716229569+01:00").unwrap(); + let date2: DateTime = + serde_json::from_str("2025-11-02T18:47:21.716229569+02:00").unwrap(); + + assert_eq!(date1.to_utc(), date2.to_utc()); + } +} diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index 0bd5117..9802936 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -6,8 +6,8 @@ use std::{ use chrono::TimeDelta; use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr}; use reaction_plugin::{ - ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, main_loop, - parse_duration, + ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, + main_loop, parse_duration, }; use remoc::{rch::mpsc, rtc}; use serde::{Deserialize, Serialize}; @@ -69,7 +69,7 @@ struct StreamInit { secret_key: SecretKey, message_timeout: TimeDelta, nodes: BTreeMap, - tx: mpsc::Sender, + tx: mpsc::Sender, } #[derive(Serialize, Deserialize)] diff --git a/plugins/reaction-plugin-virtual/src/main.rs b/plugins/reaction-plugin-virtual/src/main.rs index 57b8610..8bd735c 100644 --- a/plugins/reaction-plugin-virtual/src/main.rs +++ b/plugins/reaction-plugin-virtual/src/main.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, BTreeSet}; use reaction_plugin::{ - ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, + ActionImpl, Exec, Hello, Line, Local, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, }; use remoc::{rch::mpsc, rtc}; use serde::{Deserialize, Serialize}; @@ -105,11 +105,11 @@ impl PluginInfo for Plugin { #[derive(Clone)] struct VirtualStream { - tx: mpsc::Sender, + tx: mpsc::Sender, } impl VirtualStream { - fn new(config: Value) -> Result<(Self, mpsc::Receiver), String> { + fn new(config: Value) -> Result<(Self, mpsc::Receiver), String> { const CONFIG_ERROR: &'static str = "streams of type virtual take no options"; match config { Value::Null => (), @@ -205,7 +205,7 @@ impl VirtualAction { acc.replace(pattern, &m.match_[i]) }) }; - let result = match self.to.tx.send(line).await { + let result = match self.to.tx.send((line, Local::now())).await { Ok(_) => Ok(()), Err(err) => Err(format!("{err}")), }; diff --git a/plugins/reaction-plugin/src/lib.rs b/plugins/reaction-plugin/src/lib.rs index f24a030..2bc436e 100644 --- a/plugins/reaction-plugin/src/lib.rs +++ b/plugins/reaction-plugin/src/lib.rs @@ -92,6 +92,7 @@ use std::{ process::exit, }; +pub use chrono::{DateTime, Local}; use remoc::{ Connect, rch, rtc::{self, Server}, @@ -245,9 +246,11 @@ impl Into for Value { } } +pub type Line = (String, DateTime); + #[derive(Serialize, Deserialize)] pub struct StreamImpl { - pub stream: rch::mpsc::Receiver, + pub stream: rch::mpsc::Receiver, /// Whether this stream works standalone, or if it needs other streams to be fed. /// Defaults to true. /// When false, reaction will exit if it's the last one standing. diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index 8834311..25375f7 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -14,7 +14,7 @@ use tokio::{ use tracing::{debug, error, info}; use crate::{ - concepts::{Filter, Stream}, + concepts::{Filter, Stream, Time}, daemon::{filter::FilterManager, plugin::Plugins, utils::kill_child}, }; @@ -129,8 +129,8 @@ impl StreamManager { loop { match plugin.stream.recv().await { - Ok(Some(line)) => { - self.handle_line(line).await; + Ok(Some((line, time))) => { + self.handle_line(line, time).await; } Err(err) => { if err.is_final() { @@ -208,7 +208,7 @@ impl StreamManager { loop { match lines.next().await { Some(Ok(line)) => { - self.handle_line(line).await; + self.handle_line(line, Local::now()).await; } Some(Err(err)) => { error!( @@ -224,10 +224,9 @@ impl StreamManager { } } - async fn handle_line(&self, line: String) { - let now = Local::now(); + async fn handle_line(&self, line: String, time: Time) { for manager in self.matching_filters(&line) { - manager.handle_line(&line, now).await; + manager.handle_line(&line, time).await; } }