plugin: Stream plugins now pass time information along their lines

This will permit the cluster to accurately receive older-than-immediate
information, and it will permit potential log plugins (journald?) to go
back in time at startup.
This commit is contained in:
ppom 2025-11-15 12:00:00 +01:00
commit 71d26766f8
No known key found for this signature in database
5 changed files with 43 additions and 28 deletions

View file

@ -4,8 +4,9 @@ use std::{
sync::Arc,
};
use chrono::{DateTime, Local};
use chrono::{DateTime, Local, Utc};
use iroh::{Endpoint, EndpointAddr, endpoint::Connection};
use reaction_plugin::Line;
use tokio::sync::{mpsc, oneshot};
use crate::{ActionInit, StreamInit, endpoint::EndpointManager};
@ -14,6 +15,8 @@ pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
type ShutdownNotification = oneshot::Receiver<oneshot::Sender<()>>;
type UtcLine = Arc<(String, DateTime<Utc>)>;
pub async fn bind(stream: &StreamInit) -> Result<Endpoint, String> {
let mut builder = Endpoint::builder()
.secret_key(stream.secret_key.clone())
@ -52,8 +55,8 @@ pub fn cluster_tasks(
fn spawn_actions(
mut actions: Vec<ActionInit>,
own_cluster_tx: remoc::rch::mpsc::Sender<String>,
) -> mpsc::Receiver<Arc<String>> {
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
) -> mpsc::Receiver<UtcLine> {
let (nodes_tx, nodes_rx) = mpsc::channel(1);
while let Some(mut action) = actions.pop() {
let nodes_tx = nodes_tx.clone();
@ -66,8 +69,8 @@ fn spawn_actions(
impl ActionInit {
async fn serve(
&mut self,
nodes_tx: mpsc::Sender<Arc<String>>,
own_stream_tx: remoc::rch::mpsc::Sender<String>,
nodes_tx: mpsc::Sender<UtcLine>,
own_stream_tx: remoc::rch::mpsc::Sender<Line>,
) {
while let Ok(Some(m)) = self.rx.recv().await {
let line = if m.match_.is_empty() {
@ -79,13 +82,14 @@ impl ActionInit {
acc.replace(pattern, &m.match_[i])
})
};
let now = Local::now();
if self.self_
&& let Err(err) = own_stream_tx.send(line.clone()).await
&& let Err(err) = own_stream_tx.send((line.clone(), now.clone())).await
{
eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}");
}
let line = Arc::new(line);
let line = Arc::new((line, now.to_utc()));
if let Err(err) = nodes_tx.send(line).await {
eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}");
};
@ -97,11 +101,6 @@ impl ActionInit {
}
}
pub struct TimeMessage {
pub message: Arc<String>,
pub timeout: DateTime<Local>,
}
pub struct ConnectionManager {
endpoint: EndpointAddr,
// Ask the EndpointManager to connect
@ -111,7 +110,21 @@ pub struct ConnectionManager {
// The EndpointManager sending us a connection (whether we asked for it or not)
connection_rx: mpsc::Receiver<Connection>,
// Our queue of messages to send
queue: VecDeque<TimeMessage>,
queue: VecDeque<UtcLine>,
// Messages we send from remote nodes to our own stream
own_cluster_tx: remoc::rch::mpsc::Sender<String>,
}
#[cfg(test)]
mod tests {
use chrono::{DateTime, Local};
fn different_local_tz_is_ok() {
let date1: DateTime<Local> =
serde_json::from_str("2025-11-02T17:47:21.716229569+01:00").unwrap();
let date2: DateTime<Local> =
serde_json::from_str("2025-11-02T18:47:21.716229569+02:00").unwrap();
assert_eq!(date1.to_utc(), date2.to_utc());
}
}

View file

@ -6,8 +6,8 @@ use std::{
use chrono::TimeDelta;
use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr};
use reaction_plugin::{
ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, main_loop,
parse_duration,
ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value,
main_loop, parse_duration,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
@ -69,7 +69,7 @@ struct StreamInit {
secret_key: SecretKey,
message_timeout: TimeDelta,
nodes: BTreeMap<PublicKey, EndpointAddr>,
tx: mpsc::Sender<String>,
tx: mpsc::Sender<Line>,
}
#[derive(Serialize, Deserialize)]

View file

@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use reaction_plugin::{
ActionImpl, Exec, Hello, Manifest, PluginInfo, RemoteResult, StreamImpl, Value,
ActionImpl, Exec, Hello, Line, Local, Manifest, PluginInfo, RemoteResult, StreamImpl, Value,
};
use remoc::{rch::mpsc, rtc};
use serde::{Deserialize, Serialize};
@ -105,11 +105,11 @@ impl PluginInfo for Plugin {
#[derive(Clone)]
struct VirtualStream {
tx: mpsc::Sender<String>,
tx: mpsc::Sender<Line>,
}
impl VirtualStream {
fn new(config: Value) -> Result<(Self, mpsc::Receiver<String>), String> {
fn new(config: Value) -> Result<(Self, mpsc::Receiver<Line>), String> {
const CONFIG_ERROR: &'static str = "streams of type virtual take no options";
match config {
Value::Null => (),
@ -205,7 +205,7 @@ impl VirtualAction {
acc.replace(pattern, &m.match_[i])
})
};
let result = match self.to.tx.send(line).await {
let result = match self.to.tx.send((line, Local::now())).await {
Ok(_) => Ok(()),
Err(err) => Err(format!("{err}")),
};

View file

@ -92,6 +92,7 @@ use std::{
process::exit,
};
pub use chrono::{DateTime, Local};
use remoc::{
Connect, rch,
rtc::{self, Server},
@ -245,9 +246,11 @@ impl Into<JValue> for Value {
}
}
pub type Line = (String, DateTime<Local>);
#[derive(Serialize, Deserialize)]
pub struct StreamImpl {
pub stream: rch::mpsc::Receiver<String>,
pub stream: rch::mpsc::Receiver<Line>,
/// Whether this stream works standalone, or if it needs other streams to be fed.
/// Defaults to true.
/// When false, reaction will exit if it's the last one standing.

View file

@ -14,7 +14,7 @@ use tokio::{
use tracing::{debug, error, info};
use crate::{
concepts::{Filter, Stream},
concepts::{Filter, Stream, Time},
daemon::{filter::FilterManager, plugin::Plugins, utils::kill_child},
};
@ -129,8 +129,8 @@ impl StreamManager {
loop {
match plugin.stream.recv().await {
Ok(Some(line)) => {
self.handle_line(line).await;
Ok(Some((line, time))) => {
self.handle_line(line, time).await;
}
Err(err) => {
if err.is_final() {
@ -208,7 +208,7 @@ impl StreamManager {
loop {
match lines.next().await {
Some(Ok(line)) => {
self.handle_line(line).await;
self.handle_line(line, Local::now()).await;
}
Some(Err(err)) => {
error!(
@ -224,10 +224,9 @@ impl StreamManager {
}
}
async fn handle_line(&self, line: String) {
let now = Local::now();
async fn handle_line(&self, line: String, time: Time) {
for manager in self.matching_filters(&line) {
manager.handle_line(&line, now).await;
manager.handle_line(&line, time).await;
}
}