diff --git a/Cargo.toml b/Cargo.toml index 51c2a45..682e1e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ chrono = { version = "0.4.38", features = ["std", "clock", "serde"] } futures = "0.3.30" remoc = { version = "0.18.3" } serde = { version = "1.0.203", features = ["derive"] } -serde_json = "1.0.117" +serde_json = { version = "1.0.117", features = ["arbitrary_precision"] } tokio = { version = "1.40.0" } tokio-util = { version = "0.7.12" } reaction-plugin = { path = "plugins/reaction-plugin" } diff --git a/TODO b/TODO index 557559e..0cbaa1c 100644 --- a/TODO +++ b/TODO @@ -1,4 +1,5 @@ Test what happens when a Filter's pattern Set changes (I think it's shitty) DB: add tests on stress testing (lines should always be in order) +conf: merge filters plugins: pipe stderr too and wrap errors in logs plugins: provide treedb storage? omg (add an enum that's either remoc::rch::mpsc or tokio::mpsc) diff --git a/plugins/reaction-plugin-cluster/src/cluster.rs b/plugins/reaction-plugin-cluster/src/cluster.rs index ee31fb8..5992d18 100644 --- a/plugins/reaction-plugin-cluster/src/cluster.rs +++ b/plugins/reaction-plugin-cluster/src/cluster.rs @@ -2,10 +2,9 @@ use std::{ collections::BTreeMap, net::{SocketAddrV4, SocketAddrV6}, sync::Arc, - time::Duration, + time::{Duration, SystemTime, UNIX_EPOCH}, }; -use chrono::{DateTime, Local, Utc}; use futures::future::join_all; use iroh::{ Endpoint, PublicKey, @@ -19,7 +18,7 @@ use crate::{ActionInit, StreamInit, connection::ConnectionManager, endpoint::End pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()]; -pub type UtcLine = Arc<(String, DateTime)>; +pub type UtcLine = Arc<(String, Duration)>; pub async fn bind(stream: &StreamInit) -> Result { // FIXME higher timeouts and keep alive @@ -150,14 +149,15 @@ impl ActionInit { acc.replace(pattern, &m.match_[i]) }) }; - let now = Local::now(); + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); if self.self_ && let Err(err) = own_stream_tx.send((line.clone(), now.clone())).await { eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}"); } - let line = Arc::new((line, now.to_utc())); + // TODO to_utc + let line = Arc::new((line, now)); for result in join_all(nodes_tx.iter().map(|tx| tx.send(line.clone()))).await { if let Err(err) = result { eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}"); diff --git a/plugins/reaction-plugin-cluster/src/connection.rs b/plugins/reaction-plugin-cluster/src/connection.rs index 0c1ea7e..51c7697 100644 --- a/plugins/reaction-plugin-cluster/src/connection.rs +++ b/plugins/reaction-plugin-cluster/src/connection.rs @@ -1,6 +1,11 @@ -use std::{cmp::max, collections::VecDeque, io::Error as IoError, sync::Arc}; +use std::{ + cmp::max, + collections::VecDeque, + io::Error as IoError, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; -use chrono::{DateTime, Local, TimeDelta, TimeZone, Utc}; use futures::FutureExt; use iroh::{ Endpoint, EndpointAddr, @@ -24,7 +29,7 @@ use crate::{ const PROTOCOL_VERSION: u32 = 1; -type RemoteLine = (String, DateTime); +type RemoteLine = (String, Duration); type MaybeRemoteLine = Result, IoError>; enum Event { @@ -44,7 +49,7 @@ struct OwnConnection { line_tx: BufWriter, line_rx: BufReader, - next_time: Option>, + next_time: Option, next_len: Option, next_line: Option>, } @@ -67,7 +72,7 @@ impl OwnConnection { /// Send a line to peer async fn send_line(&mut self, line: RemoteLine) -> Result<(), std::io::Error> { - self.line_tx.write_i64(line.1.timestamp_micros()).await?; + self.line_tx.write_u64(line.1.as_micros() as u64).await?; self.line_tx.write_u32(line.0.len() as u32).await?; self.line_tx.write_all(line.0.as_bytes()).await?; self.line_tx.flush().await?; @@ -78,7 +83,7 @@ impl OwnConnection { /// Returns None if we don't have all data yet. async fn recv_line(&mut self) -> Result, std::io::Error> { if self.next_time.is_none() { - self.next_time = Some(Utc.timestamp_nanos(self.line_rx.read_i64().await?)); + self.next_time = Some(Duration::from_micros(self.line_rx.read_u64().await?)); } // Ok we have next_time.is_some() if self.next_len.is_none() { @@ -140,7 +145,7 @@ pub struct ConnectionManager { last_connexion_id: u64, /// Max duration before we drop pending messages to a node we can't connect to. - message_timeout: TimeDelta, + message_timeout: Duration, /// Message we receive from actions message_rx: mpsc::Receiver, /// Our queue of messages to send @@ -160,7 +165,7 @@ impl ConnectionManager { endpoint: Arc, connection_tx: mpsc::Sender, connection_rx: mpsc::Receiver, - message_timeout: TimeDelta, + message_timeout: Duration, message_rx: mpsc::Receiver, own_cluster_tx: remoc::rch::mpsc::Sender, shutdown: ShutdownController, @@ -253,7 +258,7 @@ impl ConnectionManager { } async fn drop_timeout_messages(&mut self) { - let now = Utc::now(); + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); let mut count = 0; while self .message_queue @@ -330,7 +335,8 @@ impl ConnectionManager { self.close_connection(1, b"you closed your stream").await; } Ok(Some(line)) => { - let local_time = line.1.with_timezone(&Local); + // TODO from_utc + let local_time = line.1; if let Err(err) = self.own_cluster_tx.send((line.0.clone(), local_time)).await { eprintln!( "ERROR cluster {}: could not send message to reaction stream: {err}", @@ -366,10 +372,8 @@ impl ConnectionManager { } Some(message) => match &mut self.connection { Some(connection) => { - if let Err(err) = connection - .send_line((message.0.clone(), message.1.with_timezone(&Utc))) - .await - { + // TODO to_utc + if let Err(err) = connection.send_line((message.0.clone(), message.1)).await { eprintln!( "INFO cluster {}: connection with node {} failed: {err}", self.cluster_name, self.node_id, diff --git a/plugins/reaction-plugin-cluster/src/main.rs b/plugins/reaction-plugin-cluster/src/main.rs index cd8875a..ed69cc5 100644 --- a/plugins/reaction-plugin-cluster/src/main.rs +++ b/plugins/reaction-plugin-cluster/src/main.rs @@ -1,9 +1,9 @@ use std::{ collections::{BTreeMap, BTreeSet}, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + time::Duration, }; -use chrono::TimeDelta; use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr}; use reaction_plugin::{ ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value, @@ -74,7 +74,7 @@ struct StreamInit { bind_ipv4: Option, bind_ipv6: Option, secret_key: SecretKey, - message_timeout: TimeDelta, + message_timeout: Duration, nodes: BTreeMap, tx: mpsc::Sender, } diff --git a/plugins/reaction-plugin-virtual/src/tests.rs b/plugins/reaction-plugin-virtual/src/tests.rs index baea0de..9b3dec6 100644 --- a/plugins/reaction-plugin-virtual/src/tests.rs +++ b/plugins/reaction-plugin-virtual/src/tests.rs @@ -1,4 +1,6 @@ -use reaction_plugin::{Exec, Local, PluginInfo, Value}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use reaction_plugin::{Exec, PluginInfo, Value}; use remoc::rch::oneshot; use serde_json::json; @@ -178,7 +180,7 @@ async fn run_simple() { for m in ["test1", "test2", "test3", " a a a aa a a"] { let (tx, rx) = oneshot::channel(); - let time = Local::now(); + let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); assert!( action .tx @@ -233,7 +235,7 @@ async fn run_two_actions() { assert!(plugin.finish_setup().await.is_ok()); - let time = Local::now(); + let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); let (tx, rx1) = oneshot::channel(); assert!( diff --git a/plugins/reaction-plugin/src/lib.rs b/plugins/reaction-plugin/src/lib.rs index 395db9d..73da253 100644 --- a/plugins/reaction-plugin/src/lib.rs +++ b/plugins/reaction-plugin/src/lib.rs @@ -101,9 +101,9 @@ use std::{ error::Error, fmt::Display, process::exit, + time::Duration, }; -pub use chrono::{DateTime, Local}; use remoc::{ Connect, rch, rtc::{self, Server}, @@ -258,7 +258,7 @@ impl Into for Value { } } -pub type Line = (String, DateTime); +pub type Line = (String, Duration); #[derive(Debug, Serialize, Deserialize)] pub struct StreamImpl { @@ -294,7 +294,7 @@ pub struct ActionImpl { pub struct Exec { pub match_: Vec, pub result: rch::oneshot::Sender>, - pub time: DateTime, + pub time: Duration, } /// The main loop for a plugin. diff --git a/plugins/reaction-plugin/src/time.rs b/plugins/reaction-plugin/src/time.rs index 9a2694b..d682429 100644 --- a/plugins/reaction-plugin/src/time.rs +++ b/plugins/reaction-plugin/src/time.rs @@ -1,7 +1,7 @@ -use chrono::TimeDelta; +use std::time::Duration; -/// Parses the &str argument as a TimeDelta -/// Returns Ok(TimeDelta) if successful, or Err(String). +/// Parses the &str argument as a Duration +/// Returns Ok(Duration) if successful, or Err(String). /// /// Format is defined as follows: ` ` /// - whitespace between the integer and unit is optional @@ -12,7 +12,7 @@ use chrono::TimeDelta; /// - `m` / `min` / `mins` / `minute` / `minutes` /// - `h` / `hour` / `hours` /// - `d` / `day` / `days` -pub fn parse_duration(d: &str) -> Result { +pub fn parse_duration(d: &str) -> Result { let d_trimmed = d.trim(); let chars = d_trimmed.as_bytes(); let mut value = 0; @@ -24,14 +24,14 @@ pub fn parse_duration(d: &str) -> Result { if i == 0 { return Err(format!("duration '{}' doesn't start with digits", d)); } - let ok_as = |func: fn(i64) -> TimeDelta| -> Result<_, String> { Ok(func(value as i64)) }; + let ok_as = |func: fn(u64) -> Duration| -> Result<_, String> { Ok(func(value as u64)) }; match d_trimmed[i..].trim() { - "ms" | "millis" | "millisecond" | "milliseconds" => ok_as(TimeDelta::milliseconds), - "s" | "sec" | "secs" | "second" | "seconds" => ok_as(TimeDelta::seconds), - "m" | "min" | "mins" | "minute" | "minutes" => ok_as(TimeDelta::minutes), - "h" | "hour" | "hours" => ok_as(TimeDelta::hours), - "d" | "day" | "days" => ok_as(TimeDelta::days), + "ms" | "millis" | "millisecond" | "milliseconds" => ok_as(Duration::from_millis), + "s" | "sec" | "secs" | "second" | "seconds" => ok_as(Duration::from_secs), + "m" | "min" | "mins" | "minute" | "minutes" => ok_as(Duration::from_mins), + "h" | "hour" | "hours" => ok_as(Duration::from_hours), + "d" | "day" | "days" => ok_as(|d: u64| Duration::from_hours(d * 24)), unit => Err(format!( "unit {} not recognised. must be one of s/sec/seconds, m/min/minutes, h/hours, d/days", unit @@ -42,8 +42,6 @@ pub fn parse_duration(d: &str) -> Result { #[cfg(test)] mod tests { - use chrono::TimeDelta; - use super::*; #[test] @@ -53,13 +51,13 @@ mod tests { #[test] fn parse_duration_test() { - assert_eq!(parse_duration("1s"), Ok(TimeDelta::seconds(1))); - assert_eq!(parse_duration("12s"), Ok(TimeDelta::seconds(12))); - assert_eq!(parse_duration(" 12 secs "), Ok(TimeDelta::seconds(12))); - assert_eq!(parse_duration("2m"), Ok(TimeDelta::minutes(2))); - assert_eq!(parse_duration("6 hours"), Ok(TimeDelta::hours(6))); - assert_eq!(parse_duration("1d"), Ok(TimeDelta::days(1))); - assert_eq!(parse_duration("365d"), Ok(TimeDelta::days(365))); + assert_eq!(parse_duration("1s"), Ok(Duration::from_secs(1))); + assert_eq!(parse_duration("12s"), Ok(Duration::from_secs(12))); + assert_eq!(parse_duration(" 12 secs "), Ok(Duration::from_secs(12))); + assert_eq!(parse_duration("2m"), Ok(Duration::from_mins(2))); + assert_eq!(parse_duration("6 hours"), Ok(Duration::from_hours(6))); + assert_eq!(parse_duration("1d"), Ok(Duration::from_hours(1 * 24))); + assert_eq!(parse_duration("365d"), Ok(Duration::from_hours(365 * 24))); assert!(parse_duration("d 3").is_err()); assert!(parse_duration("d3").is_err()); diff --git a/src/concepts/action.rs b/src/concepts/action.rs index cbad05c..4d5f765 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -1,6 +1,4 @@ -use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc}; - -use chrono::TimeDelta; +use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc, time::Duration}; use reaction_plugin::time::parse_duration; use serde::{Deserialize, Serialize}; @@ -19,7 +17,7 @@ pub struct Action { #[serde(skip_serializing_if = "Option::is_none")] pub after: Option, #[serde(skip)] - pub after_duration: Option, + pub after_duration: Option, #[serde( rename = "onexit", diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index d3932c6..52fc41f 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -4,9 +4,9 @@ use std::{ fmt::Display, hash::Hash, sync::Arc, + time::Duration, }; -use chrono::TimeDelta; use reaction_plugin::time::parse_duration; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -30,7 +30,7 @@ pub enum Duplicate { #[serde(deny_unknown_fields)] pub struct Filter { #[serde(skip)] - pub longuest_action_duration: TimeDelta, + pub longuest_action_duration: Duration, #[serde(skip)] pub has_ip: bool, @@ -47,7 +47,7 @@ pub struct Filter { #[serde(rename = "retryperiod", skip_serializing_if = "Option::is_none")] pub retry_period: Option, #[serde(skip)] - pub retry_duration: Option, + pub retry_duration: Option, #[serde(default)] pub duplicate: Duplicate, @@ -187,10 +187,12 @@ impl Filter { .any(|action| action.ipv4only || action.ipv6only); self.longuest_action_duration = - self.actions.values().fold(TimeDelta::seconds(0), |acc, v| { - v.after_duration - .map_or(acc, |v| if v > acc { v } else { acc }) - }); + self.actions + .values() + .fold(Duration::from_secs(0), |acc, v| { + v.after_duration + .map_or(acc, |v| if v > acc { v } else { acc }) + }); Ok(()) } @@ -480,14 +482,14 @@ pub mod tests { let name = "name".to_string(); let empty_patterns = Patterns::new(); let minute_str = "1m".to_string(); - let minute = TimeDelta::seconds(60); - let two_minutes = TimeDelta::seconds(60 * 2); + let minute = Duration::from_secs(60); + let two_minutes = Duration::from_secs(60 * 2); let two_minutes_str = "2m".to_string(); // duration 0 filter = ok_filter(); filter.setup(&name, &name, &empty_patterns).unwrap(); - assert_eq!(filter.longuest_action_duration, TimeDelta::default()); + assert_eq!(filter.longuest_action_duration, Duration::default()); let minute_action = ok_action_with_after(minute_str.clone(), &minute_str); diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index 215be2b..761f9f1 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -5,7 +5,11 @@ mod pattern; mod plugin; mod stream; -use std::fmt::Debug; +use std::{ + fmt::{self, Debug}, + ops::{Add, Deref, Sub}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; pub use action::Action; pub use config::{Config, Patterns}; @@ -16,9 +20,116 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; pub use stream::Stream; -use chrono::{DateTime, Local}; +/// [`std::time::Duration`] since [`std::time::UNIX_EPOCH`] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Time(Duration); +impl Deref for Time { + type Target = Duration; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl From for Time { + fn from(value: Duration) -> Self { + Time(value) + } +} +impl Into for Time { + fn into(self) -> Duration { + self.0 + } +} +impl Add for Time { + type Output = Time; + fn add(self, rhs: Duration) -> Self::Output { + Time(self.0 + rhs) + } +} +impl Add