WIP duplicates

- new duplicate option
- change the triggers Tree structure to keep O(log(n)) querying:
  we now need to know whether a match already has a trigger.
- triggers migration
- triggers adaptations in State & FilterManager
This commit is contained in:
ppom 2025-06-28 12:00:00 +02:00
commit 881fc76bf9
No known key found for this signature in database
7 changed files with 151 additions and 47 deletions

View file

@ -89,6 +89,8 @@ streams:
# - h / hour / hours
# - d / day / days
retryperiod: 6h
# behavior when a match occurs again while already triggered:
# extend (default), ignore, or rerun
duplicate: rerun
# actions are run by the filter when regexes are matched
actions:
# actions have a user-defined name

View file

@ -14,6 +14,17 @@ use tracing::info;
use super::parse_duration;
use super::{Action, Match, Pattern, Patterns};
/// Per-filter policy for handling a match that already has an active
/// trigger (configuration key `duplicate`, e.g. `duplicate: rerun`).
///
/// Serialized in lowercase; `rename_all` replaces the previous
/// per-variant `#[serde(rename = "...")]` attributes with the exact
/// same wire format ("extend", "ignore", "rerun").
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Duplicate {
    /// Default policy; does not re-run the actions on a duplicate match.
    #[default]
    Extend,
    /// Does not re-run the actions on a duplicate match.
    Ignore,
    /// Runs the actions again for the duplicate match.
    Rerun,
}
// Only names are serialized
// Only computed fields are not deserialized
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
@ -37,6 +48,9 @@ pub struct Filter {
#[serde(skip)]
retry_duration: Option<TimeDelta>,
#[serde(default)]
duplicate: Duplicate,
actions: BTreeMap<String, Action>,
#[serde(skip)]
@ -93,6 +107,10 @@ impl Filter {
&self.regex
}
/// Returns the configured duplicate-handling policy for this filter.
pub fn duplicate(&self) -> Duplicate {
    self.duplicate
}
/// Returns this filter's actions, keyed by their user-defined name.
pub fn actions(&self) -> &BTreeMap<String, Action> {
    &self.actions
}

View file

@ -7,7 +7,7 @@ mod stream;
pub use action::Action;
pub use config::{Config, Patterns};
pub use filter::Filter;
pub use filter::{Duplicate, Filter};
use parse_duration::parse_duration;
pub use pattern::Pattern;
use serde::{Deserialize, Serialize};

View file

@ -14,7 +14,7 @@ use tokio::sync::Semaphore;
use tracing::{error, info};
use crate::{
concepts::{Action, Filter, Match, Pattern, Time},
concepts::{Action, Duplicate, Filter, Match, Pattern, Time},
protocol::{Order, PatternStatus},
treedb::Database,
};
@ -89,7 +89,11 @@ impl FilterManager {
let mut state = self.state.lock().unwrap();
state.clear_past_matches(now);
let exec = match self.filter.retry() {
if let Duplicate::Ignore = self.filter.duplicate() {
if state.triggers.contains_key(&m) {}
}
let trigger = match self.filter.retry() {
None => true,
Some(retry) => {
state.add_match(m.clone(), now);
@ -98,6 +102,11 @@ impl FilterManager {
}
};
let exec = match self.filter.duplicate() {
Duplicate::Rerun => true,
Duplicate::Extend | Duplicate::Ignore => false,
};
if exec {
state.remove_match(&m);
state.add_trigger(m.clone(), now);
@ -179,12 +188,12 @@ impl FilterManager {
.triggers
.keys()
// match filtering
.filter(|match_| is_match(&match_.m))
.filter(|match_| is_match(&match_))
// clone necessary to drop all references to State
.cloned()
.collect::<Vec<_>>();
for mt in cloned_triggers.into_iter() {
for m in cloned_triggers.into_iter() {
// mutable State required here
// Remove the match from the triggers
if let Order::Flush = order {
@ -276,7 +285,7 @@ impl FilterManager {
.values()
// On startup, skip oneshot actions
.filter(|action| !action.oneshot())
.count();
.count() as u64;
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
@ -284,17 +293,23 @@ impl FilterManager {
let cloned_triggers = state
.triggers
.iter()
.map(|(k, v)| (k.clone(), *v))
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>();
for (mt, remaining) in cloned_triggers.into_iter() {
if remaining > 0 && mt.t + longuest_action_duration > now {
// Insert back the upcoming times
state.triggers.insert(mt.clone(), number_of_actions as u64);
// Schedule the upcoming times
self.schedule_exec(mt.m, mt.t, now, &mut state, true);
for (m, map) in cloned_triggers.into_iter() {
let mut new_map = BTreeMap::default();
for (t, remaining) in map.into_iter() {
if remaining > 0 && t + longuest_action_duration > now {
// Insert back the upcoming times
new_map.insert(t, number_of_actions);
// Schedule the upcoming times
self.schedule_exec(m.clone(), t, now, &mut state, true);
}
}
if new_map.is_empty() {
state.triggers.remove(&m);
} else {
state.triggers.remove(&mt);
state.triggers.insert(m, new_map);
}
}
}

View file

@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet};
use crate::{
concepts::{Filter, Match, MatchTime, Time},
treedb::{
helpers::{to_match, to_matchtime, to_time, to_u64},
helpers::{to_match, to_matchtime, to_time, to_timemap, to_u64},
Database, Tree,
},
};
@ -16,10 +16,18 @@ pub fn filter_ordered_times_db_name(filter: &Filter) -> String {
)
}
pub fn filter_triggers_db_name(filter: &Filter) -> String {
/// Database tree name of the legacy (pre-migration) triggers store for
/// `filter`; still opened so its entries can be migrated into the new layout.
pub fn filter_triggers_old_db_name(filter: &Filter) -> String {
    let stream = filter.stream_name();
    let name = filter.name();
    format!("filter_triggers_{}.{}", stream, name)
}
/// Database tree name of the current triggers store for `filter`.
/// The `filter_triggers2_` prefix distinguishes it from the legacy layout.
pub fn filter_triggers_db_name(filter: &Filter) -> String {
    let stream = filter.stream_name();
    let name = filter.name();
    format!("filter_triggers2_{}.{}", stream, name)
}
/// Internal state of a [`FilterManager`].
/// Holds all data on current matches and triggers.
pub struct State {
@ -37,7 +45,7 @@ pub struct State {
pub ordered_times: Tree<Time, Match>,
/// Saves all the current Triggers for this Filter
/// Persisted
pub triggers: Tree<MatchTime, u64>,
pub triggers: Tree<Match, BTreeMap<Time, u64>>,
}
impl State {
@ -47,20 +55,40 @@ impl State {
db: &mut Database,
now: Time,
) -> Result<Self, String> {
let ordered_times = db.open_tree(
filter_ordered_times_db_name(filter),
filter.retry_duration().unwrap_or_default(),
|(key, value)| Ok((to_time(&key)?, to_match(&value)?)),
)?;
let mut triggers = db.open_tree(
filter_triggers_db_name(filter),
filter.retry_duration().unwrap_or_default(),
|(key, value)| Ok((to_match(&key)?, to_timemap(&value)?)),
)?;
if triggers.is_empty() {
let old_triggers = db.open_tree(
filter_triggers_old_db_name(filter),
filter.retry_duration().unwrap_or_default(),
|(key, value)| Ok((to_matchtime(&key)?, to_u64(&value)?)),
)?;
for (mt, n) in old_triggers.iter() {
triggers.fetch_update(mt.m.clone(), |map| {
Some(match map {
None => [(mt.t, *n)].into(),
Some(mut map) => {
map.insert(mt.t, *n);
map
}
})
});
}
}
let mut this = Self {
filter,
has_after,
matches: BTreeMap::new(),
ordered_times: db.open_tree(
filter_ordered_times_db_name(filter),
filter.retry_duration().unwrap_or_default(),
|(key, value)| Ok((to_time(&key)?, to_match(&value)?)),
)?,
triggers: db.open_tree(
filter_triggers_db_name(filter),
filter.longuest_action_duration(),
|(key, value)| Ok((to_matchtime(&key)?, to_u64(&value)?)),
)?,
ordered_times,
triggers,
};
this.clear_past_matches(now);
this.load_matches_from_ordered_times();
@ -77,8 +105,16 @@ impl State {
// We record triggered filters only when there is an action with an `after` directive
if self.has_after {
// Add the (Match, Time) to the triggers map
self.triggers
.insert(MatchTime { m, t }, self.filter.actions().len() as u64);
let n = self.filter.actions().len() as u64;
self.triggers.fetch_update(m, |map| {
Some(match map {
None => [(t, n)].into(),
Some(mut value) => {
value.insert(t, n);
value
}
})
});
}
}
@ -94,9 +130,19 @@ impl State {
/// Completely remove a Match from the triggers
pub fn remove_trigger(&mut self, m: &Match, t: &Time) {
self.triggers.remove(&MatchTime {
m: m.clone(),
t: *t,
// self.triggers.remove(&MatchTime {
// m: m.clone(),
// t: *t,
// });
self.triggers.fetch_update(m.clone(), |map| {
map.and_then(|mut map| {
map.remove(t);
if map.is_empty() {
None
} else {
Some(map)
}
})
});
}
@ -106,13 +152,27 @@ impl State {
if self.has_after {
let mut exec_needed = false;
let mt = MatchTime { m: m.clone(), t };
let count = self.triggers.get(&mt);
let count = self.triggers.get(&mt.m).and_then(|map| map.get(&mt.t)).cloned();
if let Some(count) = count {
exec_needed = true;
if *count <= 1 {
self.triggers.remove(&mt);
if count <= 1 {
self.triggers.fetch_update(mt.m, |map| {
map.and_then(|mut map| {
map.remove(&mt.t);
if map.is_empty() {
None
} else {
Some(map)
}
})
});
} else {
self.triggers.insert(mt, count - 1);
self.triggers.fetch_update(mt.m, |map| {
map.and_then(|mut map| {
map.insert(mt.t, count - 1);
Some(map)
})
});
}
}
exec_needed

View file

@ -1,4 +1,4 @@
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};
use chrono::{DateTime, Local};
use serde_json::Value;
@ -15,13 +15,15 @@ pub fn to_u64(val: &Value) -> Result<u64, String> {
val.as_u64().ok_or("not a u64".into())
}
/// Parses an RFC 3339 string into a local [`Time`].
fn string_to_time(val: &str) -> Result<Time, String> {
    DateTime::parse_from_rfc3339(val)
        .map(|parsed| parsed.with_timezone(&Local))
        .map_err(|err| err.to_string())
}
/// Tries to convert a [`Value`] into a [`Time`].
/// The Value must be a JSON string holding an RFC 3339 timestamp.
pub fn to_time(val: &Value) -> Result<Time, String> {
    // Fixed error message: a failed `as_str()` means the Value is not
    // a string (the old text said "not a number").
    string_to_time(val.as_str().ok_or("not a string")?)
}
/// Tries to convert a [`Value`] into a [`Match`]
@ -51,6 +53,15 @@ pub fn to_timeset(val: &Value) -> Result<BTreeSet<Time>, String> {
.collect()
}
/// Tries to convert a [`Value`] into a [`BTreeMap<Time, u64>`].
/// The Value must be a JSON object whose keys are RFC 3339 timestamps
/// and whose values are u64 counts; fails on the first bad entry.
pub fn to_timemap(val: &Value) -> Result<BTreeMap<Time, u64>, String> {
    let entries = val.as_object().ok_or("not a map")?;
    let mut map = BTreeMap::new();
    for (key, value) in entries {
        map.insert(string_to_time(key)?, to_u64(value)?);
    }
    Ok(map)
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

View file

@ -359,16 +359,14 @@ impl<K: KeyType, V: ValueType> Tree<K, V> {
/// Returning None removes the item if it existed before.
/// Asynchronously persisted.
/// *API design borrowed from [`fjall::WriteTransaction::fetch_update`].*
pub fn fetch_update<F: FnMut(Option<&V>) -> Option<V>>(
pub fn fetch_update<F: FnMut(Option<V>) -> Option<V>>(
&mut self,
key: K,
mut f: F,
) -> Option<V> {
let old_value = self.get(&key);
let old_value = self.remove(&key);
let new_value = f(old_value);
if old_value != new_value.as_ref() {
self.log(&key, new_value.as_ref());
}
self.log(&key, new_value.as_ref());
if let Some(new_value) = new_value {
self.tree.insert(key, new_value)
} else {