WIP custom Time

Permitting custom de/serialiazing
This commit is contained in:
ppom 2025-06-03 12:00:00 +02:00
commit f1f4c2ea2c
No known key found for this signature in database
9 changed files with 227 additions and 87 deletions

View file

@ -4,6 +4,7 @@ mod filter;
mod parse_duration;
mod pattern;
mod stream;
mod time;
pub use action::Action;
pub use config::{Config, Patterns};
@ -12,10 +13,8 @@ use parse_duration::parse_duration;
pub use pattern::Pattern;
use serde::{Deserialize, Serialize};
pub use stream::Stream;
pub use time::{Time, TimeDelta};
use chrono::{DateTime, Local};
pub type Time = DateTime<Local>;
pub type Match = Vec<String>;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]

147
src/concepts/time.rs Normal file
View file

@ -0,0 +1,147 @@
use std::ops::{Add, Deref, DerefMut, Sub};
use chrono::{DateTime, Local, TimeZone};
use serde::{de::Visitor, Deserialize, Serialize};
pub use chrono::TimeDelta;
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Time(DateTime<Local>);
impl Time {
pub fn now() -> Time {
Time(Local::now())
}
pub fn timestamp_micros(value: i64) -> Result<Time, String> {
match Local.timestamp_micros(value) {
chrono::offset::LocalResult::Single(time) => Ok(Time(time)),
chrono::offset::LocalResult::Ambiguous(a, b) => Err(format!(
"number can't represent a UNIX timestamp in microseconds. is {a} or {b}"
)),
chrono::offset::LocalResult::None => {
Err("number can't represent a UNIX timestamp in microseconds".into())
}
}
}
}
impl Default for Time {
fn default() -> Self {
Time(Local.timestamp_opt(0, 0).unwrap())
}
}
impl From<DateTime<Local>> for Time {
fn from(value: DateTime<Local>) -> Self {
Time(value)
}
}
impl Deref for Time {
type Target = DateTime<Local>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Time {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl Add<TimeDelta> for Time {
type Output = Time;
fn add(self, rhs: TimeDelta) -> Self::Output {
Time(self.0 + rhs)
}
}
impl Sub<Time> for Time {
type Output = TimeDelta;
fn sub(self, rhs: Time) -> Self::Output {
self.0 - rhs.0
}
}
impl Sub<TimeDelta> for Time {
type Output = Time;
fn sub(self, rhs: TimeDelta) -> Self::Output {
Time(self.0 - rhs)
}
}
impl Serialize for Time {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_i64(DateTime::timestamp_micros(&self.0))
}
}
struct TimeVisitor;
impl<'de> Visitor<'de> for TimeVisitor {
type Value = Time;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("an i64 representing a UNIX timestamp in microseconds")
}
fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Time::timestamp_micros(value).map_err(E::custom)
}
}
impl<'de> Deserialize<'de> for Time {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_i64(TimeVisitor)
}
}
#[cfg(test)]
mod tests {
use chrono::Datelike;
use super::Time;
#[test]
fn time_creation() {
let t0 = Time::timestamp_micros(0);
assert!(t0.is_ok());
assert_eq!(t0.unwrap().year(), 1970);
}
#[test]
fn time_serialize() {
let t0 = Time::timestamp_micros(0).unwrap();
let json = serde_json::to_string(&t0).unwrap();
assert_eq!(json, "0");
let t1 = Time::timestamp_micros(12345678912345).unwrap();
let json = serde_json::to_string(&t1).unwrap();
assert_eq!(json, "12345678912345");
}
#[test]
fn time_deserialize() {
// let t0 = "0";
// let des: Time = serde_json::from_str(&t0).unwrap();
// assert_eq!(des, Time::timestamp_micros(0).unwrap());
let t1 = "12345678912345";
let des: Time = serde_json::from_str(&t1).unwrap();
assert_eq!(des, Time::timestamp_micros(12345678912345).unwrap());
}
}

View file

@ -5,13 +5,12 @@ use std::{
time::Duration,
};
use chrono::{Local, TimeDelta};
use tempfile::TempPath;
use tokio::sync::Semaphore;
use super::{filter_ordered_times_db_name, FilterManager, React};
use crate::{
concepts::{Action, Filter, MatchTime, Pattern, Patterns, Time},
concepts::{Action, Filter, MatchTime, Pattern, Patterns, Time, TimeDelta},
daemon::shutdown::ShutdownController,
tests::TempDatabase,
};
@ -111,7 +110,7 @@ async fn three_matches_then_action_then_delayed_action() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let bed = bed.part2(filter, Time::now(), None).await;
let now = bed.now;
let now1s = bed.now + TimeDelta::seconds(1);
@ -241,7 +240,7 @@ async fn one_match_one_action() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let bed = bed.part2(filter, Time::now(), None).await;
let now = bed.now;
// No match
@ -284,7 +283,7 @@ async fn one_match_one_delayed_action() {
&bed.az_patterns,
);
let bed = bed.part2(filter, Local::now(), None).await;
let bed = bed.part2(filter, Time::now(), None).await;
let now = bed.now;
// No match
@ -355,7 +354,7 @@ async fn one_db_match_one_runtime_match_one_action() {
let mut db = TempDatabase::default().await;
// Pre-add match
let now = Local::now();
let now = Time::now();
let one = vec!["one".to_string()];
let now1s = now - TimeDelta::seconds(1);
@ -418,7 +417,7 @@ async fn one_outdated_db_match() {
let mut db = TempDatabase::default().await;
// Pre-add match
let now = Local::now();
let now = Time::now();
let one = vec!["one".to_string()];
let now1s = now - TimeDelta::milliseconds(1001);

View file

@ -8,7 +8,6 @@ use std::{
},
};
use chrono::Local;
use tokio::{
select,
signal::unix::{signal, SignalKind},
@ -16,7 +15,7 @@ use tokio::{
};
use tracing::{debug, info};
use crate::{concepts::Config, treedb::Database};
use crate::{concepts::{Config, Time}, treedb::Database};
use filter::FilterManager;
pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken};
use socket::socket_manager;
@ -52,7 +51,7 @@ pub async fn daemon(
let mut db = Database::open(config).await?;
// Filter managers
let now = Local::now();
let now = Time::now();
let mut state = HashMap::new();
for stream in config.streams().values() {
let mut filter_managers = HashMap::new();

View file

@ -6,7 +6,6 @@ use std::{
sync::Arc,
};
use chrono::Local;
use futures::{SinkExt, StreamExt};
use regex::Regex;
use tokio::net::UnixListener;
@ -17,7 +16,7 @@ use tokio_util::{
use tracing::{error, warn};
use crate::{
concepts::{Config, Filter, Pattern, Stream},
concepts::{Config, Filter, Pattern, Stream, Time},
protocol::{ClientRequest, ClientStatus, DaemonResponse},
};
@ -88,7 +87,7 @@ fn answer_order(
})
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()?;
let now = Local::now();
let now = Time::now();
let cs: ClientStatus = shared_state
.iter()
// stream filtering

View file

@ -1,6 +1,5 @@
use std::{collections::HashMap, process::Stdio, task::Poll, time::Duration};
use chrono::Local;
use futures::{FutureExt, StreamExt};
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
@ -11,7 +10,7 @@ use tokio::{
use tracing::{error, info, warn};
use crate::{
concepts::{Filter, Stream},
concepts::{Filter, Stream, Time},
daemon::filter::FilterManager,
};
@ -126,7 +125,7 @@ async fn handle_io(
loop {
match lines.next().await {
Some(Ok(line)) => {
let now = Local::now();
let now = Time::now();
for manager in filter_managers.values() {
manager.handle_line(&line, now);
}

View file

@ -1,6 +1,5 @@
use std::collections::BTreeSet;
use chrono::{DateTime, Local};
use serde_json::Value;
use crate::concepts::{Match, MatchTime, Time};
@ -17,11 +16,7 @@ pub fn to_u64(val: &Value) -> Result<u64, String> {
/// Tries to convert a [`Value`] into a [`Time`]
pub fn to_time(val: &Value) -> Result<Time, String> {
Ok(
DateTime::parse_from_rfc3339(val.as_str().ok_or("not a number")?)
.map_err(|err| err.to_string())?
.with_timezone(&Local),
)
Ok(Time::timestamp_micros(val.as_i64().ok_or("not an i64")?)?)
}
/// Tries to convert a [`Value`] into a [`Match`]
@ -55,7 +50,6 @@ pub fn to_timeset(val: &Value) -> Result<BTreeSet<Time>, String> {
mod tests {
use std::collections::BTreeMap;
use chrono::TimeZone;
use serde_json::Map;
use super::*;
@ -86,14 +80,14 @@ mod tests {
#[test]
fn test_to_time() {
assert_eq!(
to_time(&"1970-01-01T01:02:03.456+01:00".into()).unwrap(),
Local.timestamp_millis_opt(123456).unwrap(),
to_time(&1234567890.into()).unwrap(),
Time::timestamp_micros(1234567890).unwrap(),
);
assert!(to_time(&(u64::MAX.into())).is_err());
assert!(to_time(&i64::MAX.into()).is_err());
assert!(to_time(&(["ploup"].into())).is_err());
assert!(to_time(&(true.into())).is_err());
assert!(to_time(&(12345.into())).is_err());
assert!(to_time(&("12345".into())).is_err());
assert!(to_time(&(None::<String>.into())).is_err());
}
@ -115,22 +109,14 @@ mod tests {
#[test]
fn test_to_timeset() {
assert_eq!(
to_timeset(&(["1970-01-01T01:20:34.567+01:00"].into())),
Ok(BTreeSet::from([Local
.timestamp_millis_opt(1234567)
.unwrap()]))
to_timeset(&([123456789].into())),
Ok(BTreeSet::from([Time::timestamp_micros(123456789).unwrap()]))
);
assert_eq!(
to_timeset(
&([
"1970-01-01T01:00:00.008+01:00",
"1970-01-01T01:02:03.456+01:00"
]
.into())
),
to_timeset(&([8, 123456].into())),
Ok(BTreeSet::from([
Local.timestamp_millis_opt(8).unwrap(),
Local.timestamp_millis_opt(123456).unwrap()
Time::timestamp_micros(8).unwrap(),
Time::timestamp_micros(123456).unwrap()
]))
);
assert!(to_timeset(&[Value::from("plip"), Value::from(10)].into()).is_err());
@ -148,13 +134,13 @@ mod tests {
to_matchtime(&Value::Object(Map::from_iter(
BTreeMap::from([
("m".into(), ["plip", "ploup"].into()),
("t".into(), "1970-01-01T04:25:45.678+01:00".into()),
("t".into(), 12345678.into()),
])
.into_iter()
))),
Ok(MatchTime {
m: vec!["plip".into(), "ploup".into()],
t: Local.timestamp_millis_opt(12345678).unwrap(),
t: Time::timestamp_micros(12345678).unwrap(),
})
);

View file

@ -17,7 +17,6 @@ use std::{
time::Duration,
};
use chrono::{Local, TimeDelta};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use tokio::{
@ -27,7 +26,7 @@ use tokio::{
};
use crate::{
concepts::{Config, Time},
concepts::{Config, Time, TimeDelta},
daemon::ShutdownToken,
};
@ -316,7 +315,7 @@ impl<K: KeyType, V: ValueType> Tree<K, V> {
tree: self.id.clone(),
key: serde_json::to_value(k).expect("could not serialize key"),
value: v.map(|v| serde_json::to_value(v).expect("could not serialize value")),
expiry: Local::now() + self.entry_timeout,
expiry: Time::now() + self.entry_timeout,
};
let tx = self.tx.clone();
// FIXME what if send fails?
@ -384,12 +383,11 @@ mod tests {
path::Path,
};
use chrono::{Local, TimeDelta};
use serde_json::Value;
use tempfile::{NamedTempFile, TempDir};
use tokio::fs::{write, File};
use crate::concepts::Config;
use crate::concepts::{Config, Time, TimeDelta};
use super::{
helpers::*, raw::WriteDB, rotate_db, Database, Entry, KeyType, LoadedDB, Tree, ValueType,
@ -438,7 +436,7 @@ mod tests {
#[tokio::test]
async fn test_rotate_db() {
let now = Local::now();
let now = Time::now();
let expired = now - TimeDelta::seconds(2);
let valid = now + TimeDelta::seconds(2);
@ -539,7 +537,7 @@ mod tests {
#[tokio::test]
async fn test_open_tree() {
let now = Local::now();
let now = Time::now();
let now2 = now + TimeDelta::milliseconds(2);
let now3 = now + TimeDelta::milliseconds(3);

View file

@ -3,7 +3,6 @@ use std::{
io::{Error as IoError, Write},
};
use chrono::{Local, TimeZone};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
@ -13,6 +12,8 @@ use tokio::{
};
use tracing::error;
use crate::concepts::Time;
use super::{Entry, LoadedDB};
const DB_TREE_ID: u64 = 0;
@ -46,7 +47,7 @@ struct WriteEntry<'a> {
#[serde(rename = "v")]
pub value: &'a Option<Value>,
#[serde(rename = "e")]
pub expiry: i64,
pub expiry: Time,
}
/// Entry in custom database format, just read from database
@ -59,7 +60,7 @@ struct ReadEntry {
#[serde(rename = "v")]
pub value: Option<Value>,
#[serde(rename = "e")]
pub expiry: i64,
pub expiry: Time,
}
/// Permits to write entries in a database.
@ -101,7 +102,7 @@ impl WriteDB {
key: &id.into(),
value: &Some(entry.tree.clone().into()),
// Expiry is not used for special entries
expiry: 0,
expiry: Time::timestamp_micros(0).unwrap(),
})
.await?;
@ -112,7 +113,7 @@ impl WriteDB {
tree: tree_id,
key: &entry.key,
value: &entry.value,
expiry: entry.expiry.timestamp_millis(),
expiry: entry.expiry,
})
.await
.map(|bytes_written| bytes_written + written)
@ -205,7 +206,7 @@ impl ReadDB {
}
async fn next(&mut self) -> Result<Option<Entry>, DatabaseError> {
let now = Local::now().timestamp_millis();
let now = Time::now();
// Loop until we get a non-special value
let raw_entry = loop {
self.buffer.clear();
@ -245,7 +246,7 @@ impl ReadDB {
tree: tree.to_owned(),
key: raw_entry.key,
value: raw_entry.value,
expiry: Local.timestamp_millis_opt(raw_entry.expiry).unwrap(),
expiry: raw_entry.expiry,
})),
None => Err(DatabaseError::MissingKeyId(raw_entry.tree)),
}
@ -295,23 +296,25 @@ impl Write for Buffer {
mod tests {
use std::collections::HashMap;
use chrono::{Local, TimeDelta, TimeZone};
use serde_json::Value;
use tempfile::NamedTempFile;
use tokio::fs::{read, write, File};
use crate::treedb::{
raw::{DatabaseError, ReadDB, WriteDB, DB_TREE_ID},
Entry,
use crate::{
concepts::{Time, TimeDelta},
treedb::{
raw::{DatabaseError, ReadDB, WriteDB, DB_TREE_ID},
Entry,
},
};
#[tokio::test]
async fn write_db_write_entry() {
let now = Local::now();
let now = Time::now();
let expired = now - TimeDelta::seconds(2);
let expired_ts = expired.timestamp_millis();
let expired_ts = expired.timestamp_micros();
// let valid = now + TimeDelta::seconds(2);
// let valid_ts = valid.timestamp_millis();
// let valid_ts = valid.timestamp_micros();
let path = NamedTempFile::new().unwrap().into_temp_path();
@ -340,28 +343,39 @@ mod tests {
#[tokio::test]
async fn read_db_next() {
let now = Local::now();
let now = Time::now();
let expired = now - TimeDelta::seconds(2);
let expired_ts = expired.timestamp_millis();
let expired_ts = expired.timestamp_micros();
let valid = now + TimeDelta::seconds(2);
let valid_ts = valid.timestamp_millis();
// Truncate to millisecond precision
let valid = Local.timestamp_millis_opt(valid_ts).unwrap();
let valid_ts = valid.timestamp_micros();
// Truncate to microsecond precision
let valid = Time::timestamp_micros(valid_ts).unwrap();
let default = Time::default().timestamp_micros();
let path = NamedTempFile::new().unwrap().into_temp_path();
println!(
r#"{{"t": {DB_TREE_ID}, "k": 1, "v": "test_tree", "e": {default}}}
{{"t": 1, "k": "key1", "v": 1, "e": {expired_ts}}}
{{"t": 1, "k": "key2", "v": 2, "e": {valid_ts}}}
malformed entry: not json
{{"t": "tree cant be string", "k": "key2", "v": 2, "e": {valid_ts}}}
{{"t": 1, "k": "missing quote, "v": 2, "e": {valid_ts}}}
{{"t": 3, "k": "missing key id", "v": 2, "e": {valid_ts}}}"#
);
write(
&path,
format!(
"{{\"t\": {DB_TREE_ID}, \"k\": 1, \"v\": \"test_tree\", \"e\": 0}}
{{\"t\": 1, \"k\": \"key1\", \"v\": 1, \"e\": {expired_ts}}}
{{\"t\": 1, \"k\": \"key2\", \"v\": 2, \"e\": {valid_ts}}}
r#"{{"t": {DB_TREE_ID}, "k": 1, "v": "test_tree", "e": {default}}}
{{"t": 1, "k": "key1", "v": 1, "e": {expired_ts}}}
{{"t": 1, "k": "key2", "v": 2, "e": {valid_ts}}}
malformed entry: not json
{{\"t\": \"tree cant be string\", \"k\": \"key2\", \"v\": 2, \"e\": {valid_ts}}}
{{\"t\": 1, \"k\": \"missing quote, \"v\": 2, \"e\": {valid_ts}}}
{{\"t\": 3, \"k\": \"missing key id\", \"v\": 2, \"e\": {valid_ts}}}"
{{"t": "tree cant be string", "k": "key2", "v": 2, "e": {valid_ts}}}
{{"t": 1, "k": "missing quote, "v": 2, "e": {valid_ts}}}
{{"t": 3, "k": "missing key id", "v": 2, "e": {valid_ts}}}"#
),
)
.await
@ -400,13 +414,13 @@ mod tests {
#[tokio::test]
async fn read_db_read() {
let now = Local::now();
let now = Time::now();
let expired = now - TimeDelta::seconds(2);
let expired_ts = expired.timestamp_millis();
let expired_ts = expired.timestamp_micros();
let valid = now + TimeDelta::seconds(2);
let valid_ts = valid.timestamp_millis();
let valid_ts = valid.timestamp_micros();
let read_path = NamedTempFile::new().unwrap().into_temp_path();
let write_path = NamedTempFile::new().unwrap().into_temp_path();
@ -414,13 +428,13 @@ mod tests {
write(
&read_path,
format!(
"{{\"t\": {DB_TREE_ID}, \"k\": 1, \"v\": \"test_tree\", \"e\": 0}}
{{\"t\": 1, \"k\": \"key1\", \"v\": 1, \"e\": {expired_ts}}}
{{\"t\": 1, \"k\": \"key2\", \"v\": 2, \"e\": {valid_ts}}}
r#"{{"t": {DB_TREE_ID}, "k": 1, "v": "test_tree", "e": 0}}
{{"t": 1, "k": "key1", "v": 1, "e": {expired_ts}}}
{{"t": 1, "k": "key2", "v": 2, "e": {valid_ts}}}
malformed entry: not json
{{\"t\": \"tree cant be string\", \"k\": \"key2\", \"v\": 2, \"e\": {valid_ts}}}
{{\"t\": 1, \"k\": \"missing quote, \"v\": 2, \"e\": {valid_ts}}}
{{\"t\": 3, \"k\": \"missing key id\", \"v\": 2, \"e\": {valid_ts}}}"
{{"t": "tree cant be string", "k": "key2", "v": 2, "e": {valid_ts}}}
{{"t": 1, "k": "missing quote, "v": 2, "e": {valid_ts}}}
{{"t": 3, "k": "missing key id", "v": 2, "e": {valid_ts}}}"#
),
)
.await
@ -479,7 +493,7 @@ mod tests {
#[tokio::test]
async fn write_then_read_1000() {
// Generate entries
let now = Local::now();
let now = Time::now();
let entries: Vec<_> = (0..1000)
.map(|i| Entry {
tree: format!("tree{}", i % 4),