Reduce usage of chrono

TODO: handle migrations
This commit is contained in:
ppom 2025-12-06 12:00:00 +01:00
commit 79d85c1df1
No known key found for this signature in database
21 changed files with 440 additions and 292 deletions

View file

@ -83,7 +83,7 @@ chrono = { version = "0.4.38", features = ["std", "clock", "serde"] }
futures = "0.3.30"
remoc = { version = "0.18.3" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
serde_json = { version = "1.0.117", features = ["arbitrary_precision"] }
tokio = { version = "1.40.0" }
tokio-util = { version = "0.7.12" }
reaction-plugin = { path = "plugins/reaction-plugin" }

1
TODO
View file

@ -1,4 +1,5 @@
Test what happens when a Filter's pattern Set changes (I think it's shitty)
DB: add tests on stress testing (lines should always be in order)
conf: merge filters
plugins: pipe stderr too and wrap errors in logs
plugins: provide treedb storage? omg (add an enum that's either remoc::rch::mpsc or tokio::mpsc)

View file

@ -2,10 +2,9 @@ use std::{
collections::BTreeMap,
net::{SocketAddrV4, SocketAddrV6},
sync::Arc,
time::Duration,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use chrono::{DateTime, Local, Utc};
use futures::future::join_all;
use iroh::{
Endpoint, PublicKey,
@ -19,7 +18,7 @@ use crate::{ActionInit, StreamInit, connection::ConnectionManager, endpoint::End
pub const ALPN: [&[u8]; 1] = ["reaction_cluster_1".as_bytes()];
pub type UtcLine = Arc<(String, DateTime<Utc>)>;
pub type UtcLine = Arc<(String, Duration)>;
pub async fn bind(stream: &StreamInit) -> Result<Endpoint, String> {
// FIXME higher timeouts and keep alive
@ -150,14 +149,15 @@ impl ActionInit {
acc.replace(pattern, &m.match_[i])
})
};
let now = Local::now();
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
if self.self_
&& let Err(err) = own_stream_tx.send((line.clone(), now.clone())).await
{
eprintln!("ERROR while queueing message to be sent to own cluster stream: {err}");
}
let line = Arc::new((line, now.to_utc()));
// TODO to_utc
let line = Arc::new((line, now));
for result in join_all(nodes_tx.iter().map(|tx| tx.send(line.clone()))).await {
if let Err(err) = result {
eprintln!("ERROR while queueing message to be sent to cluster nodes: {err}");

View file

@ -1,6 +1,11 @@
use std::{cmp::max, collections::VecDeque, io::Error as IoError, sync::Arc};
use std::{
cmp::max,
collections::VecDeque,
io::Error as IoError,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use chrono::{DateTime, Local, TimeDelta, TimeZone, Utc};
use futures::FutureExt;
use iroh::{
Endpoint, EndpointAddr,
@ -24,7 +29,7 @@ use crate::{
const PROTOCOL_VERSION: u32 = 1;
type RemoteLine = (String, DateTime<Utc>);
type RemoteLine = (String, Duration);
type MaybeRemoteLine = Result<Option<RemoteLine>, IoError>;
enum Event {
@ -44,7 +49,7 @@ struct OwnConnection {
line_tx: BufWriter<SendStream>,
line_rx: BufReader<RecvStream>,
next_time: Option<DateTime<Utc>>,
next_time: Option<Duration>,
next_len: Option<usize>,
next_line: Option<Vec<u8>>,
}
@ -67,7 +72,7 @@ impl OwnConnection {
/// Send a line to peer
async fn send_line(&mut self, line: RemoteLine) -> Result<(), std::io::Error> {
self.line_tx.write_i64(line.1.timestamp_micros()).await?;
self.line_tx.write_u64(line.1.as_micros() as u64).await?;
self.line_tx.write_u32(line.0.len() as u32).await?;
self.line_tx.write_all(line.0.as_bytes()).await?;
self.line_tx.flush().await?;
@ -78,7 +83,7 @@ impl OwnConnection {
/// Returns None if we don't have all data yet.
async fn recv_line(&mut self) -> Result<Option<RemoteLine>, std::io::Error> {
if self.next_time.is_none() {
self.next_time = Some(Utc.timestamp_nanos(self.line_rx.read_i64().await?));
self.next_time = Some(Duration::from_micros(self.line_rx.read_u64().await?));
}
// Ok we have next_time.is_some()
if self.next_len.is_none() {
@ -140,7 +145,7 @@ pub struct ConnectionManager {
last_connexion_id: u64,
/// Max duration before we drop pending messages to a node we can't connect to.
message_timeout: TimeDelta,
message_timeout: Duration,
/// Message we receive from actions
message_rx: mpsc::Receiver<UtcLine>,
/// Our queue of messages to send
@ -160,7 +165,7 @@ impl ConnectionManager {
endpoint: Arc<Endpoint>,
connection_tx: mpsc::Sender<Connection>,
connection_rx: mpsc::Receiver<Connection>,
message_timeout: TimeDelta,
message_timeout: Duration,
message_rx: mpsc::Receiver<UtcLine>,
own_cluster_tx: remoc::rch::mpsc::Sender<Line>,
shutdown: ShutdownController,
@ -253,7 +258,7 @@ impl ConnectionManager {
}
async fn drop_timeout_messages(&mut self) {
let now = Utc::now();
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let mut count = 0;
while self
.message_queue
@ -330,7 +335,8 @@ impl ConnectionManager {
self.close_connection(1, b"you closed your stream").await;
}
Ok(Some(line)) => {
let local_time = line.1.with_timezone(&Local);
// TODO from_utc
let local_time = line.1;
if let Err(err) = self.own_cluster_tx.send((line.0.clone(), local_time)).await {
eprintln!(
"ERROR cluster {}: could not send message to reaction stream: {err}",
@ -366,10 +372,8 @@ impl ConnectionManager {
}
Some(message) => match &mut self.connection {
Some(connection) => {
if let Err(err) = connection
.send_line((message.0.clone(), message.1.with_timezone(&Utc)))
.await
{
// TODO to_utc
if let Err(err) = connection.send_line((message.0.clone(), message.1)).await {
eprintln!(
"INFO cluster {}: connection with node {} failed: {err}",
self.cluster_name, self.node_id,

View file

@ -1,9 +1,9 @@
use std::{
collections::{BTreeMap, BTreeSet},
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
time::Duration,
};
use chrono::TimeDelta;
use iroh::{EndpointAddr, PublicKey, SecretKey, TransportAddr};
use reaction_plugin::{
ActionImpl, Exec, Hello, Line, Manifest, PluginInfo, RemoteResult, StreamImpl, Value,
@ -74,7 +74,7 @@ struct StreamInit {
bind_ipv4: Option<Ipv4Addr>,
bind_ipv6: Option<Ipv6Addr>,
secret_key: SecretKey,
message_timeout: TimeDelta,
message_timeout: Duration,
nodes: BTreeMap<PublicKey, EndpointAddr>,
tx: mpsc::Sender<Line>,
}

View file

@ -1,4 +1,6 @@
use reaction_plugin::{Exec, Local, PluginInfo, Value};
use std::time::{SystemTime, UNIX_EPOCH};
use reaction_plugin::{Exec, PluginInfo, Value};
use remoc::rch::oneshot;
use serde_json::json;
@ -178,7 +180,7 @@ async fn run_simple() {
for m in ["test1", "test2", "test3", " a a a aa a a"] {
let (tx, rx) = oneshot::channel();
let time = Local::now();
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
assert!(
action
.tx
@ -233,7 +235,7 @@ async fn run_two_actions() {
assert!(plugin.finish_setup().await.is_ok());
let time = Local::now();
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let (tx, rx1) = oneshot::channel();
assert!(

View file

@ -101,9 +101,9 @@ use std::{
error::Error,
fmt::Display,
process::exit,
time::Duration,
};
pub use chrono::{DateTime, Local};
use remoc::{
Connect, rch,
rtc::{self, Server},
@ -258,7 +258,7 @@ impl Into<JValue> for Value {
}
}
pub type Line = (String, DateTime<Local>);
pub type Line = (String, Duration);
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamImpl {
@ -294,7 +294,7 @@ pub struct ActionImpl {
pub struct Exec {
pub match_: Vec<String>,
pub result: rch::oneshot::Sender<Result<(), String>>,
pub time: DateTime<Local>,
pub time: Duration,
}
/// The main loop for a plugin.

View file

@ -1,7 +1,7 @@
use chrono::TimeDelta;
use std::time::Duration;
/// Parses the &str argument as a TimeDelta
/// Returns Ok(TimeDelta) if successful, or Err(String).
/// Parses the &str argument as a Duration
/// Returns Ok(Duration) if successful, or Err(String).
///
/// Format is defined as follows: `<integer> <unit>`
/// - whitespace between the integer and unit is optional
@ -12,7 +12,7 @@ use chrono::TimeDelta;
/// - `m` / `min` / `mins` / `minute` / `minutes`
/// - `h` / `hour` / `hours`
/// - `d` / `day` / `days`
pub fn parse_duration(d: &str) -> Result<TimeDelta, String> {
pub fn parse_duration(d: &str) -> Result<Duration, String> {
let d_trimmed = d.trim();
let chars = d_trimmed.as_bytes();
let mut value = 0;
@ -24,14 +24,14 @@ pub fn parse_duration(d: &str) -> Result<TimeDelta, String> {
if i == 0 {
return Err(format!("duration '{}' doesn't start with digits", d));
}
let ok_as = |func: fn(i64) -> TimeDelta| -> Result<_, String> { Ok(func(value as i64)) };
let ok_as = |func: fn(u64) -> Duration| -> Result<_, String> { Ok(func(value as u64)) };
match d_trimmed[i..].trim() {
"ms" | "millis" | "millisecond" | "milliseconds" => ok_as(TimeDelta::milliseconds),
"s" | "sec" | "secs" | "second" | "seconds" => ok_as(TimeDelta::seconds),
"m" | "min" | "mins" | "minute" | "minutes" => ok_as(TimeDelta::minutes),
"h" | "hour" | "hours" => ok_as(TimeDelta::hours),
"d" | "day" | "days" => ok_as(TimeDelta::days),
"ms" | "millis" | "millisecond" | "milliseconds" => ok_as(Duration::from_millis),
"s" | "sec" | "secs" | "second" | "seconds" => ok_as(Duration::from_secs),
"m" | "min" | "mins" | "minute" | "minutes" => ok_as(Duration::from_mins),
"h" | "hour" | "hours" => ok_as(Duration::from_hours),
"d" | "day" | "days" => ok_as(|d: u64| Duration::from_hours(d * 24)),
unit => Err(format!(
"unit {} not recognised. must be one of s/sec/seconds, m/min/minutes, h/hours, d/days",
unit
@ -42,8 +42,6 @@ pub fn parse_duration(d: &str) -> Result<TimeDelta, String> {
#[cfg(test)]
mod tests {
use chrono::TimeDelta;
use super::*;
#[test]
@ -53,13 +51,13 @@ mod tests {
#[test]
fn parse_duration_test() {
assert_eq!(parse_duration("1s"), Ok(TimeDelta::seconds(1)));
assert_eq!(parse_duration("12s"), Ok(TimeDelta::seconds(12)));
assert_eq!(parse_duration(" 12 secs "), Ok(TimeDelta::seconds(12)));
assert_eq!(parse_duration("2m"), Ok(TimeDelta::minutes(2)));
assert_eq!(parse_duration("6 hours"), Ok(TimeDelta::hours(6)));
assert_eq!(parse_duration("1d"), Ok(TimeDelta::days(1)));
assert_eq!(parse_duration("365d"), Ok(TimeDelta::days(365)));
assert_eq!(parse_duration("1s"), Ok(Duration::from_secs(1)));
assert_eq!(parse_duration("12s"), Ok(Duration::from_secs(12)));
assert_eq!(parse_duration(" 12 secs "), Ok(Duration::from_secs(12)));
assert_eq!(parse_duration("2m"), Ok(Duration::from_mins(2)));
assert_eq!(parse_duration("6 hours"), Ok(Duration::from_hours(6)));
assert_eq!(parse_duration("1d"), Ok(Duration::from_hours(1 * 24)));
assert_eq!(parse_duration("365d"), Ok(Duration::from_hours(365 * 24)));
assert!(parse_duration("d 3").is_err());
assert!(parse_duration("d3").is_err());

View file

@ -1,6 +1,4 @@
use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc};
use chrono::TimeDelta;
use std::{cmp::Ordering, collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
use reaction_plugin::time::parse_duration;
use serde::{Deserialize, Serialize};
@ -19,7 +17,7 @@ pub struct Action {
#[serde(skip_serializing_if = "Option::is_none")]
pub after: Option<String>,
#[serde(skip)]
pub after_duration: Option<TimeDelta>,
pub after_duration: Option<Duration>,
#[serde(
rename = "onexit",

View file

@ -4,9 +4,9 @@ use std::{
fmt::Display,
hash::Hash,
sync::Arc,
time::Duration,
};
use chrono::TimeDelta;
use reaction_plugin::time::parse_duration;
use regex::Regex;
use serde::{Deserialize, Serialize};
@ -30,7 +30,7 @@ pub enum Duplicate {
#[serde(deny_unknown_fields)]
pub struct Filter {
#[serde(skip)]
pub longuest_action_duration: TimeDelta,
pub longuest_action_duration: Duration,
#[serde(skip)]
pub has_ip: bool,
@ -47,7 +47,7 @@ pub struct Filter {
#[serde(rename = "retryperiod", skip_serializing_if = "Option::is_none")]
pub retry_period: Option<String>,
#[serde(skip)]
pub retry_duration: Option<TimeDelta>,
pub retry_duration: Option<Duration>,
#[serde(default)]
pub duplicate: Duplicate,
@ -187,10 +187,12 @@ impl Filter {
.any(|action| action.ipv4only || action.ipv6only);
self.longuest_action_duration =
self.actions.values().fold(TimeDelta::seconds(0), |acc, v| {
v.after_duration
.map_or(acc, |v| if v > acc { v } else { acc })
});
self.actions
.values()
.fold(Duration::from_secs(0), |acc, v| {
v.after_duration
.map_or(acc, |v| if v > acc { v } else { acc })
});
Ok(())
}
@ -480,14 +482,14 @@ pub mod tests {
let name = "name".to_string();
let empty_patterns = Patterns::new();
let minute_str = "1m".to_string();
let minute = TimeDelta::seconds(60);
let two_minutes = TimeDelta::seconds(60 * 2);
let minute = Duration::from_secs(60);
let two_minutes = Duration::from_secs(60 * 2);
let two_minutes_str = "2m".to_string();
// duration 0
filter = ok_filter();
filter.setup(&name, &name, &empty_patterns).unwrap();
assert_eq!(filter.longuest_action_duration, TimeDelta::default());
assert_eq!(filter.longuest_action_duration, Duration::default());
let minute_action = ok_action_with_after(minute_str.clone(), &minute_str);

View file

@ -5,7 +5,11 @@ mod pattern;
mod plugin;
mod stream;
use std::fmt::Debug;
use std::{
fmt::{self, Debug},
ops::{Add, Deref, Sub},
time::{Duration, SystemTime, UNIX_EPOCH},
};
pub use action::Action;
pub use config::{Config, Patterns};
@ -16,9 +20,116 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
pub use stream::Stream;
use chrono::{DateTime, Local};
/// [`std::time::Duration`] since [`std::time::UNIX_EPOCH`]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Time(Duration);
impl Deref for Time {
type Target = Duration;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<Duration> for Time {
fn from(value: Duration) -> Self {
Time(value)
}
}
impl Into<Duration> for Time {
fn into(self) -> Duration {
self.0
}
}
impl Add<Duration> for Time {
type Output = Time;
fn add(self, rhs: Duration) -> Self::Output {
Time(self.0 + rhs)
}
}
impl Add<Time> for Time {
type Output = Time;
fn add(self, rhs: Time) -> Self::Output {
Time(self.0 + rhs.0)
}
}
impl Sub<Duration> for Time {
type Output = Time;
fn sub(self, rhs: Duration) -> Self::Output {
Time(self.0 - rhs)
}
}
impl Sub<Time> for Time {
type Output = Time;
fn sub(self, rhs: Time) -> Self::Output {
Time(self.0 - rhs.0)
}
}
impl Serialize for Time {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.as_nanos().to_string().serialize(serializer)
}
}
struct TimeVisitor;
impl<'de> serde::de::Visitor<'de> for TimeVisitor {
type Value = Time;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "a string representing nanoseconds")
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
match s.parse::<u128>() {
Ok(nanos) => Ok(Time(Duration::new(
(nanos / 1_000_000_000) as u64,
(nanos % 1_000_000_000) as u32,
))),
Err(_) => Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(s),
&self,
)),
}
}
}
impl<'de> Deserialize<'de> for Time {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_str(TimeVisitor)
}
}
impl Time {
pub fn new(secs: u64, nanos: u32) -> Time {
Time(Duration::new(secs, nanos))
}
pub fn from_hours(hours: u64) -> Time {
Time(Duration::from_hours(hours))
}
pub fn from_mins(mins: u64) -> Time {
Time(Duration::from_mins(mins))
}
pub fn from_secs(secs: u64) -> Time {
Time(Duration::from_secs(secs))
}
pub fn from_millis(millis: u64) -> Time {
Time(Duration::from_millis(millis))
}
pub fn from_nanos(nanos: u64) -> Time {
Time(Duration::from_nanos(nanos))
}
}
pub fn now() -> Time {
Time(SystemTime::now().duration_since(UNIX_EPOCH).unwrap())
}
pub type Time = DateTime<Local>;
pub type Match = Vec<String>;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]

View file

@ -189,7 +189,9 @@ impl PatternIp {
let cidr_normalized = Cidr::from_str(cidr)?;
let cidr_normalized_string = cidr_normalized.to_string();
if &cidr_normalized_string != cidr {
warn!("CIDR {cidr} should be rewritten in its normalized form: {cidr_normalized_string}");
warn!(
"CIDR {cidr} should be rewritten in its normalized form: {cidr_normalized_string}"
);
}
self.ignore_cidr_normalized.push(cidr_normalized);
}
@ -289,12 +291,11 @@ impl PatternIp {
mod patternip_tests {
use std::net::{Ipv4Addr, Ipv6Addr};
use chrono::Local;
use tokio::{fs::read_to_string, task::JoinSet};
use crate::{
concepts::{Action, Duplicate, Filter, Pattern},
daemon::{tests::TestBed, React},
concepts::{Action, Duplicate, Filter, Pattern, now},
daemon::{React, tests::TestBed},
};
use super::{Cidr, PatternIp, PatternType};
@ -708,9 +709,9 @@ mod patternip_tests {
Duplicate::Ignore,
&bed.ip_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let bed = bed.part2(filter, now(), None).await;
assert_eq!(
bed.manager.handle_line(&line, Local::now()).await,
bed.manager.handle_line(&line, now()).await,
React::Trigger,
"line: {line}"
);

View file

@ -5,6 +5,7 @@ mod state;
use std::{collections::BTreeMap, process::Stdio, sync::Arc};
use chrono::TimeZone;
use reaction_plugin::{ActionImpl, shutdown::ShutdownToken};
use regex::Regex;
use tokio::sync::{Mutex, MutexGuard, Semaphore};
@ -238,12 +239,23 @@ impl FilterManager {
for action in self.filter.filtered_actions_from_match(&m) {
let action_time = t + action.after_duration.unwrap_or_default();
if action_time > now {
// Pretty print time
let time = chrono::Local
.timestamp_opt(
action_time.as_secs() as i64,
action_time.subsec_nanos(),
)
.unwrap()
.to_rfc3339()
.chars()
.take(19)
.collect();
// Insert action
pattern_status
.actions
.entry(action.name.clone())
.or_default()
.push(action_time.to_rfc3339().chars().take(19).collect());
.push(time);
// Execute the action early
if let Order::Flush = order {
@ -297,15 +309,11 @@ impl FilterManager {
let this = self.clone();
let action_impl = self.action_plugins.get(&action.name).cloned();
tokio::spawn(async move {
let dur = (exec_time - now)
.to_std()
// Could cause an error if t + after < now
// In this case, 0 is fine
.unwrap_or_default();
let dur = exec_time - now;
// Wait either for end of sleep
// or reaction exiting
let exiting = tokio::select! {
_ = tokio::time::sleep(dur) => false,
_ = tokio::time::sleep(dur.into()) => false,
_ = this.shutdown.wait() => true,
};
// Exec action if triggered hasn't been already flushed
@ -414,7 +422,7 @@ fn exec_now(
.send(reaction_plugin::Exec {
match_: m,
result: response_tx,
time: t,
time: t.into(),
})
.await
{

View file

@ -3,8 +3,8 @@ use std::collections::{BTreeMap, BTreeSet};
use crate::{
concepts::{Filter, Match, MatchTime, Time},
treedb::{
helpers::{to_match, to_matchtime, to_time, to_timemap, to_u64},
Database, Tree,
helpers::{to_match, to_matchtime, to_time, to_timemap, to_u64},
},
};
@ -41,12 +41,12 @@ pub struct State {
/// I'm pretty confident that Time will always be unique, because it has enough precision.
/// See this code that gives different times, even in a minimal loop:
/// ```rust
/// use chrono::{Local};
/// use reaction::concepts::now;
///
/// let mut res = vec![];
/// for _ in 0..10 {
/// let now = Local::now();
/// res.push(format!("Now: {now}"));
/// let now = now();
/// res.push(format!("Now: {}", now.as_nanos()));
/// }
/// for s in res {
/// println!("{s}");
@ -172,11 +172,7 @@ impl State {
.fetch_update(mt.m, |map| {
map.and_then(|mut map| {
map.remove(&mt.t);
if map.is_empty() {
None
} else {
Some(map)
}
if map.is_empty() { None } else { Some(map) }
})
})
.await;
@ -249,11 +245,10 @@ impl State {
mod tests {
use std::collections::{BTreeMap, HashMap};
use chrono::{DateTime, Local, TimeDelta};
use serde_json::json;
use crate::{
concepts::{filter_tests::ok_filter, Action, Duplicate, Filter, Pattern},
concepts::{Action, Duplicate, Filter, Pattern, Time, filter_tests::ok_filter, now},
daemon::filter::state::State,
tests::TempDatabase,
};
@ -285,15 +280,16 @@ mod tests {
&patterns,
);
let now = DateTime::parse_from_rfc3339("2025-07-10T12:35:00.000+00:00")
.unwrap()
.with_timezone(&Local);
let now_plus_1m = now + TimeDelta::minutes(1);
let now_plus_1m01 = now_plus_1m + TimeDelta::seconds(1);
let now_less_1m = now - TimeDelta::minutes(1);
let now_less_1s = now - TimeDelta::seconds(1);
let now_less_4s = now - TimeDelta::seconds(4);
let now_less_5s = now - TimeDelta::seconds(5);
let now = Time::from_secs(1234567);
// DateTime::parse_from_rfc3339("2025-07-10T12:35:00.000+00:00")
// .unwrap()
// .with_timezone(&Local);
let now_plus_1m = now + Time::from_mins(1);
let now_plus_1m01 = now_plus_1m + Time::from_secs(1);
let now_less_1m = now - Time::from_mins(1);
let now_less_1s = now - Time::from_secs(1);
let now_less_4s = now - Time::from_secs(4);
let now_less_5s = now - Time::from_secs(5);
let triggers = [
// format v1
@ -332,10 +328,10 @@ mod tests {
json!(["one"]),
json!({
// Will stay
now_plus_1m.to_rfc3339(): 1,
now_less_1s.to_rfc3339(): 1,
now_plus_1m.as_nanos().to_string(): 1,
now_less_1s.as_nanos().to_string(): 1,
// Will not get cleaned because it's FilterManager's task
now_less_5s.to_rfc3339(): 1,
now_less_5s.as_nanos().to_string(): 1,
}),
)]),
),
@ -347,13 +343,13 @@ mod tests {
"filter_ordered_times_s1.f1".into(),
HashMap::from([
// Will stay
(now_plus_1m.to_rfc3339().into(), ["one"].into()),
(now_plus_1m01.to_rfc3339().into(), ["one"].into()),
(now_less_1s.to_rfc3339().into(), ["two"].into()), // stays because retry: 2s
(now_plus_1m.as_nanos().to_string().into(), ["one"].into()),
(now_plus_1m01.as_nanos().to_string().into(), ["one"].into()),
(now_less_1s.as_nanos().to_string().into(), ["two"].into()), // stays because retry: 2s
// Will get cleaned
(now_less_4s.to_rfc3339().into(), ["two"].into()),
(now_less_5s.to_rfc3339().into(), ["three"].into()),
(now_less_1m.to_rfc3339().into(), ["two"].into()),
(now_less_4s.as_nanos().to_string().into(), ["two"].into()),
(now_less_5s.as_nanos().to_string().into(), ["three"].into()),
(now_less_1m.as_nanos().to_string().into(), ["two"].into()),
]),
),
trigger_db,
@ -397,11 +393,9 @@ mod tests {
let one = vec!["one".into()];
let now = DateTime::parse_from_rfc3339("2025-07-10T12:35:00.000+00:00")
.unwrap()
.with_timezone(&Local);
let now_less_1s = now - TimeDelta::seconds(1);
let now_less_4s = now - TimeDelta::seconds(4);
let now = Time::from_secs(1234567);
let now_less_1s = now - Time::from_secs(1);
let now_less_4s = now - Time::from_secs(4);
let mut db = TempDatabase::default().await;
let mut state = State::new(filter, &mut db, now).await.unwrap();
@ -442,7 +436,7 @@ mod tests {
let filter = Box::leak(Box::new(ok_filter()));
let one = vec!["one".into()];
let now = Local::now();
let now = now();
let mut db = TempDatabase::default().await;
let mut state = State::new(filter, &mut db, now).await.unwrap();
@ -495,8 +489,8 @@ mod tests {
);
let one = vec!["one".into()];
let now = Local::now();
let now_plus_1s = now + TimeDelta::seconds(1);
let now = now();
let now_plus_1s = now + Time::from_secs(1);
let mut db = TempDatabase::default().await;
let mut state = State::new(filter, &mut db, now).await.unwrap();

View file

@ -5,7 +5,6 @@ use std::{
time::Duration,
};
use chrono::{Local, TimeDelta};
use reaction_plugin::shutdown::ShutdownController;
use serde_json::json;
use tempfile::TempPath;
@ -16,7 +15,7 @@ use super::{
state::{filter_ordered_times_db_name, filter_triggers_db_name},
};
use crate::{
concepts::{Action, Duplicate, Filter, Pattern, Patterns, Time},
concepts::{Action, Duplicate, Filter, Pattern, Patterns, Time, now},
daemon::plugin::Plugins,
tests::TempDatabase,
};
@ -175,11 +174,12 @@ async fn three_matches_then_action_then_delayed_action() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let now = now();
let bed = bed.part2(filter, now, None).await;
let now = bed.now;
let now1s = bed.now + TimeDelta::seconds(1);
let now2s = bed.now + TimeDelta::seconds(2);
let now1s = bed.now + Time::from_secs(1);
let now2s = bed.now + Time::from_secs(2);
// No match
assert_eq!(
@ -306,8 +306,8 @@ async fn one_match_one_action() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let now = bed.now;
let now = now();
let bed = bed.part2(filter, now, None).await;
// No match
assert_eq!(
@ -359,8 +359,8 @@ async fn one_match_one_delayed_action() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let now = bed.now;
let now = now();
let bed = bed.part2(filter, now, None).await;
// No match
assert_eq!(
@ -432,13 +432,13 @@ async fn one_db_match_one_runtime_match_one_action() {
);
// Pre-add match
let now = Local::now();
let now = now();
let one = vec!["one".to_string()];
let now1s = now - TimeDelta::seconds(1);
let now1s = now - Time::from_secs(1);
let db = TempDatabase::from_loaded_db(HashMap::from([(
filter_ordered_times_db_name(filter),
HashMap::from([(now1s.to_rfc3339().into(), one.clone().into())]),
HashMap::from([(now1s.as_nanos().to_string().into(), one.clone().into())]),
)]))
.await;
@ -501,13 +501,13 @@ async fn one_outdated_db_match() {
);
// Pre-add match
let now = Local::now();
let now = now();
let one = vec!["one".to_string()];
let now1s = now - TimeDelta::milliseconds(1001);
let now1s = now - Time::from_millis(1001);
let db = TempDatabase::from_loaded_db(HashMap::from([(
filter_ordered_times_db_name(filter),
HashMap::from([(now1s.to_rfc3339().into(), one.clone().into())]),
HashMap::from([(now1s.as_nanos().to_string().into(), one.clone().into())]),
)]))
.await;
@ -558,7 +558,7 @@ async fn trigger_unmatched_pattern() {
&bed.az_patterns,
);
let now = Local::now();
let now = now();
let one = vec!["one".to_string()];
let bed = bed.part2(filter, now, None).await;
@ -632,13 +632,13 @@ async fn trigger_matched_pattern() {
&bed.az_patterns,
);
let now = Local::now();
let now1s = now - TimeDelta::milliseconds(10);
let now = now();
let now1s = now - Time::from_millis(10);
let one = vec!["one".to_string()];
let db = TempDatabase::from_loaded_db(HashMap::from([(
filter_ordered_times_db_name(filter),
HashMap::from([(now1s.to_rfc3339().into(), one.clone().into())]),
HashMap::from([(now1s.as_nanos().to_string().into(), one.clone().into())]),
)]))
.await;
let bed = bed.part2(filter, now, Some(db)).await;
@ -713,9 +713,9 @@ async fn trigger_deduplication_on_start() {
&bed.az_patterns,
);
let now = Local::now();
let now1s = now - TimeDelta::milliseconds(1000);
let now2s = now - TimeDelta::milliseconds(1030);
let now = now();
let now1s = now - Time::from_millis(1000);
let now2s = now - Time::from_millis(1030);
let one = vec!["one".to_string()];
let db = TempDatabase::from_loaded_db(HashMap::from([(
@ -723,8 +723,8 @@ async fn trigger_deduplication_on_start() {
HashMap::from([(
one.clone().into(),
json!({
now1s.to_rfc3339(): 1,
now2s.to_rfc3339(): 1,
now1s.as_nanos().to_string(): 1,
now2s.as_nanos().to_string(): 1,
}),
)]),
)]))
@ -799,15 +799,17 @@ async fn multiple_triggers() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let now = now();
let bed = bed.part2(filter, now, None).await;
assert_eq!(
bed.manager.handle_line("test one", Local::now()).await,
bed.manager.handle_line("test one", now).await,
React::Match,
"Duplicate: {dup:?}"
);
let now = crate::concepts::now();
assert_eq!(
bed.manager.handle_line("test one", Local::now()).await,
bed.manager.handle_line("test one", now).await,
React::Trigger,
"Duplicate: {dup:?}"
);
@ -823,8 +825,9 @@ async fn multiple_triggers() {
tokio::time::sleep(Duration::from_millis(50)).await;
let now = crate::concepts::now();
assert_eq!(
bed.manager.handle_line("test one", Local::now()).await,
bed.manager.handle_line("test one", now).await,
match dup {
Duplicate::Ignore => React::Match,
_ => React::Match,
@ -832,8 +835,9 @@ async fn multiple_triggers() {
"Duplicate: {dup:?}"
);
let now = crate::concepts::now();
assert_eq!(
bed.manager.handle_line("test one", Local::now()).await,
bed.manager.handle_line("test one", now).await,
match dup {
Duplicate::Ignore => React::Match,
_ => React::Trigger,
@ -964,10 +968,12 @@ async fn extend_trigger_multiple_after_actions() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let now = now();
let bed = bed.part2(filter, now, None).await;
let now = crate::concepts::now();
assert_eq!(
bed.manager.handle_line("test one", Local::now()).await,
bed.manager.handle_line("test one", now).await,
React::Trigger,
);
@ -980,8 +986,9 @@ async fn extend_trigger_multiple_after_actions() {
"Sleep: {second_match_duration:?}"
);
let now = crate::concepts::now();
assert_eq!(
bed.manager.handle_line("test one", Local::now()).await,
bed.manager.handle_line("test one", now).await,
React::Trigger,
);
@ -1045,10 +1052,12 @@ async fn ip_specific() {
&bed.ip_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let now = now();
let bed = bed.part2(filter, now, None).await;
let now = crate::concepts::now();
assert_eq!(
bed.manager.handle_line("test 1.2.3.4", Local::now()).await,
bed.manager.handle_line("test 1.2.3.4", now).await,
React::Trigger,
);
@ -1062,10 +1071,9 @@ async fn ip_specific() {
bed.reset_out_file().await;
let now = crate::concepts::now();
assert_eq!(
bed.manager
.handle_line("test 1:2:3:4:5:6:7:8", Local::now())
.await,
bed.manager.handle_line("test 1:2:3:4:5:6:7:8", now).await,
React::Trigger,
);

View file

@ -8,7 +8,6 @@ use std::{
},
};
use chrono::Local;
use futures::future::join_all;
use reaction_plugin::shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken};
use tokio::{
@ -18,7 +17,10 @@ use tokio::{
};
use tracing::{debug, error, info};
use crate::{concepts::Config, treedb::Database};
use crate::{
concepts::{Config, now},
treedb::Database,
};
use filter::FilterManager;
pub use filter::React;
use plugin::Plugins;
@ -124,7 +126,7 @@ async fn daemon_start(
};
// Filter managers
let now = Local::now();
let now = now();
let mut state = HashMap::new();
let mut stream_managers = Vec::new();
for stream in config.streams.values() {

View file

@ -4,7 +4,6 @@ use std::{
sync::Arc,
};
use chrono::Local;
use futures::{SinkExt, StreamExt};
use reaction_plugin::shutdown::ShutdownToken;
use regex::Regex;
@ -16,7 +15,7 @@ use tokio_util::{
use tracing::{error, warn};
use crate::{
concepts::{Config, Filter, Pattern, Stream},
concepts::{Config, Filter, Pattern, Stream, now},
protocol::{ClientRequest, ClientStatus, DaemonResponse, Order},
};
@ -100,8 +99,7 @@ async fn handle_trigger_order(
}
};
let now = Local::now();
match filter_manager.handle_trigger(patterns, now).await {
match filter_manager.handle_trigger(patterns, now()).await {
Ok(()) => DaemonResponse::Ok(()),
Err(err) => DaemonResponse::Err(err),
}
@ -114,7 +112,7 @@ async fn handle_show_or_flush_order(
order: Order,
shared_state: &HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>,
) -> DaemonResponse {
let now = Local::now();
let now = now();
let iter = shared_state
.iter()
// stream filtering

View file

@ -3,7 +3,6 @@ use std::{
process::Stdio,
};
use chrono::Local;
use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all};
use reaction_plugin::{StreamImpl, shutdown::ShutdownToken};
use regex::RegexSet;
@ -14,7 +13,7 @@ use tokio::{
use tracing::{debug, error, info};
use crate::{
concepts::{Filter, Stream, Time},
concepts::{Filter, Stream, Time, now},
daemon::{filter::FilterManager, plugin::Plugins, utils::kill_child},
};
@ -104,7 +103,7 @@ impl StreamManager {
pub async fn start(mut self) {
// First start FilterManagers persisted actions
let now = Local::now();
let now = now();
join_all(
self.regex_index_to_filter_manager
.iter()
@ -128,7 +127,7 @@ impl StreamManager {
loop {
match plugin.stream.recv().await {
Ok(Some((line, time))) => {
self.handle_line(line, time).await;
self.handle_line(line, time.into()).await;
}
Err(err) => {
if err.is_final() {
@ -206,7 +205,8 @@ impl StreamManager {
loop {
match lines.next().await {
Some(Ok(line)) => {
self.handle_line(line, Local::now()).await;
let now = now();
self.handle_line(line, now).await;
}
Some(Err(err)) => {
error!(

View file

@ -1,6 +1,8 @@
use std::collections::{BTreeMap, BTreeSet};
use std::{
collections::{BTreeMap, BTreeSet},
time::Duration,
};
use chrono::{DateTime, Local};
use serde_json::Value;
use crate::concepts::{Match, MatchTime, Time};
@ -15,15 +17,18 @@ pub fn to_u64(val: &Value) -> Result<u64, String> {
val.as_u64().ok_or("not a u64".into())
}
fn string_to_time(val: &str) -> Result<Time, String> {
Ok(DateTime::parse_from_rfc3339(val)
.map_err(|err| err.to_string())?
.with_timezone(&Local))
pub fn string_to_time(val: &str) -> Result<Time, String> {
let nanos: u128 = val.parse().map_err(|_| "not a number")?;
Ok(Duration::new(
(nanos / 1_000_000_000) as u64,
(nanos % 1_000_000_000) as u32,
)
.into())
}
/// Tries to convert a [`Value`] into a [`Time`]
pub fn to_time(val: &Value) -> Result<Time, String> {
string_to_time(val.as_str().ok_or("not a datetime")?)
string_to_time(val.as_str().ok_or("not a string number")?)
}
/// Tries to convert a [`Value`] into a [`Match`]
@ -66,7 +71,6 @@ pub fn to_timemap(val: &Value) -> Result<BTreeMap<Time, u64>, String> {
mod tests {
use std::collections::BTreeMap;
use chrono::TimeZone;
use serde_json::Map;
use super::*;
@ -96,15 +100,12 @@ mod tests {
#[test]
fn test_to_time() {
assert_eq!(
to_time(&"1970-01-01T01:02:03.456+01:00".into()).unwrap(),
Local.timestamp_millis_opt(123456).unwrap(),
);
assert_eq!(to_time(&"123456".into()).unwrap(), Time::from_nanos(123456),);
assert!(to_time(&(u64::MAX.into())).is_err());
assert!(to_time(&(["ploup"].into())).is_err());
assert!(to_time(&(true.into())).is_err());
assert!(to_time(&(12345.into())).is_err());
// assert!(to_time(&(12345.into())).is_err());
assert!(to_time(&(None::<String>.into())).is_err());
}
@ -126,22 +127,14 @@ mod tests {
#[test]
fn test_to_timeset() {
assert_eq!(
to_timeset(&(["1970-01-01T01:20:34.567+01:00"].into())),
Ok(BTreeSet::from([Local
.timestamp_millis_opt(1234567)
.unwrap()]))
to_timeset(&Value::from([Value::from("123456789")])),
Ok(BTreeSet::from([Time::from_nanos(123456789)]))
);
assert_eq!(
to_timeset(
&([
"1970-01-01T01:00:00.008+01:00",
"1970-01-01T01:02:03.456+01:00"
]
.into())
),
to_timeset(&Value::from([Value::from("8"), Value::from("123456")])),
Ok(BTreeSet::from([
Local.timestamp_millis_opt(8).unwrap(),
Local.timestamp_millis_opt(123456).unwrap()
Time::from_nanos(8),
Time::from_nanos(123456),
]))
);
assert!(to_timeset(&[Value::from("plip"), Value::from(10)].into()).is_err());
@ -159,40 +152,48 @@ mod tests {
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), "1970-01-01T04:25:45.678+01:00".into()),
("t".into(), "12345678".into()),
])
.into_iter()
))),
Ok(MatchTime {
m: vec!["plip".into(), "ploup".into()],
t: Local.timestamp_millis_opt(12345678).unwrap(),
t: Time::from_nanos(12345678),
})
);
assert!(to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), ["plip", "ploup"].into()),]).into_iter()
)))
.is_err());
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), ["plip", "ploup"].into()),]).into_iter()
)))
.is_err()
);
assert!(to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("t".into(), 12345678.into()),]).into_iter()
)))
.is_err());
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("t".into(), 12345678.into()),]).into_iter()
)))
.is_err()
);
assert!(to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), "ploup".into()), ("t".into(), 12345678.into()),])
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([("m".into(), "ploup".into()), ("t".into(), 12345678.into()),])
.into_iter()
)))
.is_err()
);
assert!(
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), [1234567].into()),
])
.into_iter()
)))
.is_err());
assert!(to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), [1234567].into()),
])
.into_iter()
)))
.is_err());
)))
.is_err()
);
assert!(to_timeset(&([""].into())).is_err());
assert!(to_timeset(&(["ploup"].into())).is_err());
@ -203,26 +204,25 @@ mod tests {
#[test]
fn test_to_timemap() {
let time1 = "2025-07-10T12:35:00.000+02:00";
let time1_t = DateTime::parse_from_rfc3339(time1)
.unwrap()
.with_timezone(&Local);
let time2 = "2026-08-11T12:36:01.000+02:00";
let time2_t = DateTime::parse_from_rfc3339(time2)
.unwrap()
.with_timezone(&Local);
let time1 = 1234567;
let time1_t = Time::from_nanos(time1);
let time2 = 123456789;
let time2_t = Time::from_nanos(time2);
assert_eq!(
to_timemap(&Value::from_iter([(time2, 1)])),
to_timemap(&Value::from_iter([(time2.to_string(), 1)])),
Ok(BTreeMap::from([(time2_t, 1)]))
);
assert_eq!(
to_timemap(&Value::from_iter([(time1, 4), (time2, 0)])),
Ok(BTreeMap::from([(time1_t, 4), (time2_t, 0)]))
to_timemap(&Value::from_iter([
(time1.to_string(), 4),
(time2.to_string(), 0)
])),
Ok(BTreeMap::from([(time1_t.into(), 4), (time2_t.into(), 0)]))
);
assert!(to_timemap(&Value::from_iter([("1", time2)])).is_err());
assert!(to_timemap(&Value::from_iter([(time2, time2)])).is_err());
assert!(to_timemap(&Value::from_iter([("1-1", time2)])).is_err());
// assert!(to_timemap(&Value::from_iter([(time2.to_string(), time2)])).is_err());
assert!(to_timemap(&Value::from_iter([(time2)])).is_err());
assert!(to_timemap(&Value::from_iter([(1)])).is_err());

View file

@ -17,7 +17,6 @@ use std::{
time::Duration,
};
use chrono::{Local, TimeDelta};
use reaction_plugin::shutdown::ShutdownToken;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;
@ -27,7 +26,7 @@ use tokio::{
time::{MissedTickBehavior, interval},
};
use crate::concepts::{Config, Time};
use crate::concepts::{Config, Time, now};
pub mod helpers;
@ -320,7 +319,7 @@ pub struct Tree<K: KeyType, V: ValueType> {
/// This property permits the database rotation to be `O(n)` in time and `O(1)` in RAM space,
/// `n` being the number of write operations from the last rotation plus the number of new
/// operations.
entry_timeout: TimeDelta,
entry_timeout: Duration,
/// The inner BTreeMap
tree: BTreeMap<K, V>,
/// The sender that permits to asynchronously send write operations to database
@ -335,7 +334,7 @@ impl Database {
pub async fn open_tree<K: KeyType, V: ValueType, F>(
&mut self,
name: String,
entry_timeout: TimeDelta,
entry_timeout: Duration,
map_f: F,
) -> Result<Tree<K, V>, String>
where
@ -404,11 +403,12 @@ impl<K: KeyType, V: ValueType> Deref for Tree<K, V> {
impl<K: KeyType, V: ValueType> Tree<K, V> {
/// Log an [`Entry`] to the [`Database`]
async fn log(&mut self, k: &K, v: Option<&V>) {
let now = now();
let e = Entry {
tree: self.id.clone(),
key: serde_json::to_value(k).expect("could not serialize key"),
value: v.map(|v| serde_json::to_value(v).expect("could not serialize value")),
expiry: Local::now() + self.entry_timeout,
expiry: now + self.entry_timeout,
};
let tx = self.tx.clone();
// FIXME what if send fails?
@ -476,15 +476,15 @@ mod tests {
collections::{BTreeMap, BTreeSet, HashMap},
io::Error as IoError,
path::Path,
time::Duration,
};
use chrono::{Local, TimeDelta};
use reaction_plugin::shutdown::ShutdownController;
use serde_json::Value;
use tempfile::{NamedTempFile, TempDir};
use tokio::fs::{File, write};
use crate::concepts::Config;
use crate::concepts::{Config, Time, now};
use super::{
DB_NAME, Database, DatabaseManager, Entry, KeyType, LoadedDB, Tree, ValueType, helpers::*,
@ -546,10 +546,10 @@ mod tests {
#[tokio::test]
async fn test_rotate_db() {
let now = Local::now();
let now = now();
let expired = now - TimeDelta::seconds(2);
let valid = now + TimeDelta::seconds(2);
let expired = now - Time::from_secs(2);
let valid = now + Time::from_secs(2);
let entries = [
Entry {
@ -647,15 +647,16 @@ mod tests {
#[tokio::test]
async fn test_open_tree() {
let now = Local::now();
let now2 = now + TimeDelta::milliseconds(2);
let now3 = now + TimeDelta::milliseconds(3);
let now = now();
let now_ms = now.to_rfc3339();
let now2_ms = now2.to_rfc3339();
let now3_ms = now3.to_rfc3339();
let now2 = now + Time::from_millis(2);
let now3 = now + Time::from_millis(3);
let valid = now + TimeDelta::seconds(2);
// let now_ms = now.as_nanos().to_string();
// let now2_ms = now2.as_nanos().to_string();
// let now3_ms = now3.as_nanos().to_string();
let valid = now + Time::from_secs(2);
let ip127 = vec!["127.0.0.1".to_string()];
let ip1 = vec!["1.1.1.1".to_string()];
@ -663,44 +664,50 @@ mod tests {
let entries = [
Entry {
tree: "time-match".into(),
key: now_ms.clone().into(),
key: now.as_nanos().to_string().into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "time-match".into(),
key: now2_ms.clone().into(),
key: now2.as_nanos().to_string().into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "time-match".into(),
key: now3_ms.clone().into(),
key: now3.as_nanos().to_string().into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "time-match".into(),
key: now2_ms.clone().into(),
key: now2.as_nanos().to_string().into(),
value: Some(ip127.clone().into()),
expiry: valid,
},
Entry {
tree: "match-timeset".into(),
key: ip127.clone().into(),
value: Some([Value::String(now_ms)].into()),
value: Some([Value::String(now.as_nanos().to_string())].into()),
expiry: valid,
},
Entry {
tree: "match-timeset".into(),
key: ip1.clone().into(),
value: Some([Value::String(now2_ms.clone())].into()),
value: Some([Value::String(now2.as_nanos().to_string())].into()),
expiry: valid,
},
Entry {
tree: "match-timeset".into(),
key: ip1.clone().into(),
value: Some([Value::String(now2_ms.clone()), now3_ms.into()].into()),
value: Some(
[
Value::String(now2.as_nanos().to_string()),
Value::String(now3.as_nanos().to_string()),
]
.into(),
),
expiry: valid,
},
];
@ -721,7 +728,7 @@ mod tests {
let time_match = database
.open_tree(
"time-match".into(),
TimeDelta::seconds(2),
Duration::from_secs(2),
|(key, value)| Ok((to_time(&key)?, to_match(&value)?)),
)
.await
@ -738,7 +745,7 @@ mod tests {
let match_timeset = database
.open_tree(
"match-timeset".into(),
TimeDelta::hours(2),
Duration::from_hours(2),
|(key, value)| Ok((to_match(&key)?, to_timeset(&value)?)),
)
.await
@ -754,7 +761,7 @@ mod tests {
let unknown_tree = database
.open_tree(
"unknown_tree".into(),
TimeDelta::hours(2),
Duration::from_hours(2),
|(key, value)| Ok((to_match(&key)?, to_timeset(&value)?)),
)
.await

View file

@ -1,6 +1,9 @@
use std::{collections::HashMap, io::Error as IoError};
use std::{
collections::HashMap,
io::Error as IoError,
time::{SystemTime, UNIX_EPOCH},
};
use chrono::{Local, TimeZone};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
@ -10,6 +13,8 @@ use tokio::{
};
use tracing::error;
use crate::concepts::Time;
use super::{Entry, LoadedDB};
const DB_TREE_ID: u64 = 0;
@ -43,7 +48,7 @@ struct WriteEntry<'a> {
#[serde(rename = "v")]
pub value: &'a Option<Value>,
#[serde(rename = "e")]
pub expiry: i64,
pub expiry: u64,
}
/// Entry in custom database format, just read from database
@ -56,7 +61,7 @@ struct ReadEntry {
#[serde(rename = "v")]
pub value: Option<Value>,
#[serde(rename = "e")]
pub expiry: i64,
pub expiry: u64,
}
/// Permits to write entries in a database.
@ -109,7 +114,7 @@ impl WriteDB {
tree: tree_id,
key: &entry.key,
value: &entry.value,
expiry: entry.expiry.timestamp_millis(),
expiry: entry.expiry.as_millis() as u64,
})
.await
.map(|bytes_written| bytes_written + written)
@ -176,12 +181,14 @@ impl ReadDB {
Ok(Some(entry)) => {
// Add back in new DB
match write_db.write_entry(&entry).await {
Ok(_) => (),
Err(err) => match err {
SerdeOrIoError::IO(err) => return Err(err),
SerdeOrIoError::Serde(err) => error!("serde should be able to serialize an entry just deserialized: {err}"),
}
}
Ok(_) => (),
Err(err) => match err {
SerdeOrIoError::IO(err) => return Err(err),
SerdeOrIoError::Serde(err) => error!(
"serde should be able to serialize an entry just deserialized: {err}"
),
},
}
// Insert data in RAM
if load_db {
let map: &mut HashMap<Value, Value> =
@ -199,7 +206,10 @@ impl ReadDB {
}
async fn next(&mut self) -> Result<Option<Entry>, DatabaseError> {
let now = Local::now().timestamp_millis();
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
// Loop until we get a non-special value
let raw_entry = loop {
self.buffer.clear();
@ -239,7 +249,7 @@ impl ReadDB {
tree: tree.to_owned(),
key: raw_entry.key,
value: raw_entry.value,
expiry: Local.timestamp_millis_opt(raw_entry.expiry).unwrap(),
expiry: Time::from_millis(raw_entry.expiry),
})),
None => Err(DatabaseError::MissingKeyId(raw_entry.tree)),
}
@ -250,22 +260,24 @@ impl ReadDB {
mod tests {
use std::collections::HashMap;
use chrono::{Local, TimeDelta, TimeZone};
use serde_json::Value;
use tempfile::NamedTempFile;
use tokio::fs::{read, write, File};
use tokio::fs::{File, read, write};
use crate::treedb::{
raw::{DatabaseError, ReadDB, WriteDB, DB_TREE_ID},
Entry,
use crate::{
concepts::{Time, now},
treedb::{
Entry,
raw::{DB_TREE_ID, DatabaseError, ReadDB, WriteDB},
},
};
#[tokio::test]
async fn write_db_write_entry() {
let now = Local::now();
let expired = now - TimeDelta::seconds(2);
let expired_ts = expired.timestamp_millis();
// let valid = now + TimeDelta::seconds(2);
let now = now();
let expired = now - Time::from_secs(2);
let expired_ts = expired.as_millis();
// let valid = now + Time::from_secs(2);
// let valid_ts = valid.timestamp_millis();
let path = NamedTempFile::new().unwrap().into_temp_path();
@ -289,21 +301,23 @@ mod tests {
assert_eq!(
contents,
format!("{{\"t\":0,\"k\":1,\"v\":\"yooo\",\"e\":0}}\n{{\"t\":1,\"k\":\"key1\",\"v\":\"value1\",\"e\":{expired_ts}}}\n")
format!(
"{{\"t\":0,\"k\":1,\"v\":\"yooo\",\"e\":0}}\n{{\"t\":1,\"k\":\"key1\",\"v\":\"value1\",\"e\":{expired_ts}}}\n"
)
);
}
#[tokio::test]
async fn read_db_next() {
let now = Local::now();
let now = now();
let expired = now - TimeDelta::seconds(2);
let expired_ts = expired.timestamp_millis();
let expired = now - Time::from_secs(2);
let expired_ts = expired.as_millis();
let valid = now + TimeDelta::seconds(2);
let valid_ts = valid.timestamp_millis();
let valid = now + Time::from_secs(2);
let valid_ts = valid.as_millis();
// Truncate to millisecond precision
let valid = Local.timestamp_millis_opt(valid_ts).unwrap();
let valid = Time::new(valid.as_secs(), valid.subsec_millis() * 1_000_000);
let path = NamedTempFile::new().unwrap().into_temp_path();
@ -343,13 +357,13 @@ mod tests {
#[tokio::test]
async fn read_db_read() {
let now = Local::now();
let now = now();
let expired = now - TimeDelta::seconds(2);
let expired_ts = expired.timestamp_millis();
let expired = now - Time::from_secs(2);
let expired_ts = expired.as_millis();
let valid = now + TimeDelta::seconds(2);
let valid_ts = valid.timestamp_millis();
let valid = now + Time::from_secs(2);
let valid_ts = valid.as_millis();
let read_path = NamedTempFile::new().unwrap().into_temp_path();
let write_path = NamedTempFile::new().unwrap().into_temp_path();
@ -422,13 +436,13 @@ mod tests {
#[tokio::test]
async fn write_then_read_1000() {
// Generate entries
let now = Local::now();
let now = now();
let entries: Vec<_> = (0..1000)
.map(|i| Entry {
tree: format!("tree{}", i % 4),
key: format!("key{}", i % 10).into(),
value: Some(format!("value{}", i % 10).into()),
expiry: now + TimeDelta::seconds((i % 4) - 1),
expiry: now + Time::from_secs(i % 4) - Time::from_secs(1),
})
.collect();
@ -438,7 +452,7 @@ mod tests {
tree: format!("tree{}", i % 4),
key: format!("key{}", i % 10).into(),
value: None,
expiry: now + TimeDelta::seconds(i % 4),
expiry: now + Time::from_secs(i % 4),
})
.collect();