From 881fc76bf9eefc73c8a5394d72b070a5b24c0492 Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 28 Jun 2025 12:00:00 +0200 Subject: [PATCH] WIP duplicates - new duplicate option - change triggers Tree structure to keep O(log(n)) querying: now we need to know if a match already has a trigger. - triggers migration - triggers adaptations in State & FilterManager --- config/example.yml | 2 + src/concepts/filter.rs | 18 +++++++ src/concepts/mod.rs | 2 +- src/daemon/filter/mod.rs | 41 ++++++++++----- src/daemon/filter/state.rs | 104 +++++++++++++++++++++++++++++-------- src/treedb/helpers.rs | 23 +++++--- src/treedb/mod.rs | 8 ++- 7 files changed, 151 insertions(+), 47 deletions(-) diff --git a/config/example.yml b/config/example.yml index 4bdf39e..74c0cde 100644 --- a/config/example.yml +++ b/config/example.yml @@ -89,6 +89,8 @@ streams: # - h / hour / hours # - d / day / days retryperiod: 6h + # duplicates! + duplicate: rerun # actions are run by the filter when regexes are matched actions: # actions have a user-defined name diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index 15792ef..7f87f66 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -14,6 +14,17 @@ use tracing::info; use super::parse_duration; use super::{Action, Match, Pattern, Patterns}; +#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)] +pub enum Duplicate { + #[default] + #[serde(rename = "extend")] + Extend, + #[serde(rename = "ignore")] + Ignore, + #[serde(rename = "rerun")] + Rerun, +} + // Only names are serialized // Only computed fields are not deserialized #[derive(Clone, Debug, Default, Deserialize, Serialize)] @@ -37,6 +48,9 @@ pub struct Filter { #[serde(skip)] retry_duration: Option, + #[serde(default)] + duplicate: Duplicate, + actions: BTreeMap, #[serde(skip)] @@ -93,6 +107,10 @@ impl Filter { &self.regex } + pub fn duplicate(&self) -> Duplicate { + self.duplicate + } + pub fn actions(&self) -> &BTreeMap { &self.actions } diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index a163286..b4a785e 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -7,7 +7,7 @@ mod stream; pub use action::Action; pub use config::{Config, Patterns}; -pub use filter::Filter; +pub use filter::{Duplicate, Filter}; use parse_duration::parse_duration; pub use pattern::Pattern; use serde::{Deserialize, Serialize}; diff --git a/src/daemon/filter/mod.rs b/src/daemon/filter/mod.rs index 918bd7d..117bc30 100644 --- a/src/daemon/filter/mod.rs +++ b/src/daemon/filter/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::Semaphore; use tracing::{error, info}; use crate::{ - concepts::{Action, Filter, Match, Pattern, Time}, + concepts::{Action, Duplicate, Filter, Match, Pattern, Time}, protocol::{Order, PatternStatus}, treedb::Database, }; @@ -89,7 +89,11 @@ impl FilterManager { let mut state = self.state.lock().unwrap(); state.clear_past_matches(now); - let exec = match self.filter.retry() { + if let Duplicate::Ignore = self.filter.duplicate() { + if state.triggers.contains_key(&m) {} + } + + let trigger = match self.filter.retry() { None => true, Some(retry) => { state.add_match(m.clone(), now); @@ -98,6 +102,11 @@ impl FilterManager { } }; + let exec = match self.filter.duplicate() { + Duplicate::Rerun => true, + Duplicate::Extend | Duplicate::Ignore => false, + }; + if exec { state.remove_match(&m); state.add_trigger(m.clone(), now); @@ -179,12 +188,12 @@ impl FilterManager { .triggers .keys() // match filtering - .filter(|match_| is_match(&match_.m)) + .filter(|match_| is_match(&match_)) // clone necessary to drop all references to State .cloned() .collect::>(); - for mt in cloned_triggers.into_iter() { + for m in cloned_triggers.into_iter() { // mutable State required here // Remove the match from the triggers if let Order::Flush = order { @@ -276,7 +285,7 @@ impl FilterManager { .values() // On startup, skip oneshot actions .filter(|action| !action.oneshot()) - .count(); + .count() as u64; #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = self.state.lock().unwrap(); @@ -284,17 +293,23 @@ impl FilterManager { let cloned_triggers = state .triggers .iter() - .map(|(k, v)| (k.clone(), *v)) + .map(|(k, v)| (k.clone(), v.clone())) .collect::>(); - for (mt, remaining) in cloned_triggers.into_iter() { - if remaining > 0 && mt.t + longuest_action_duration > now { - // Insert back the upcoming times - state.triggers.insert(mt.clone(), number_of_actions as u64); - // Schedule the upcoming times - self.schedule_exec(mt.m, mt.t, now, &mut state, true); + for (m, map) in cloned_triggers.into_iter() { + let mut new_map = BTreeMap::default(); + for (t, remaining) in map.into_iter() { + if remaining > 0 && t + longuest_action_duration > now { + // Insert back the upcoming times + new_map.insert(t, number_of_actions); + // Schedule the upcoming times + self.schedule_exec(m.clone(), t, now, &mut state, true); + } + } + if new_map.is_empty() { + state.triggers.remove(&m); } else { - state.triggers.remove(&mt); + state.triggers.insert(m, new_map); } } } diff --git a/src/daemon/filter/state.rs b/src/daemon/filter/state.rs index 5cb98ce..f72b9c0 100644 --- a/src/daemon/filter/state.rs +++ b/src/daemon/filter/state.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use crate::{ concepts::{Filter, Match, MatchTime, Time}, treedb::{ - helpers::{to_match, to_matchtime, to_time, to_u64}, + helpers::{to_match, to_matchtime, to_time, to_timemap, to_u64}, Database, Tree, }, }; @@ -16,10 +16,18 @@ pub fn filter_ordered_times_db_name(filter: &Filter) -> String { ) } -pub fn filter_triggers_db_name(filter: &Filter) -> String { +pub fn filter_triggers_old_db_name(filter: &Filter) -> String { format!("filter_triggers_{}.{}", filter.stream_name(), filter.name()) } +pub fn filter_triggers_db_name(filter: &Filter) -> String { + format!( + "filter_triggers2_{}.{}", + filter.stream_name(), + filter.name() + ) +} + /// Internal state of a [`FilterManager`]. /// Holds all data on current matches and triggers. pub struct State { @@ -37,7 +45,7 @@ pub struct State { pub ordered_times: Tree, /// Saves all the current Triggers for this Filter /// Persisted - pub triggers: Tree, + pub triggers: Tree>, } impl State { @@ -47,20 +55,40 @@ impl State { db: &mut Database, now: Time, ) -> Result { + let ordered_times = db.open_tree( + filter_ordered_times_db_name(filter), + filter.retry_duration().unwrap_or_default(), + |(key, value)| Ok((to_time(&key)?, to_match(&value)?)), + )?; + let mut triggers = db.open_tree( + filter_triggers_db_name(filter), + filter.retry_duration().unwrap_or_default(), + |(key, value)| Ok((to_match(&key)?, to_timemap(&value)?)), + )?; + if triggers.is_empty() { + let old_triggers = db.open_tree( + filter_triggers_old_db_name(filter), + filter.retry_duration().unwrap_or_default(), + |(key, value)| Ok((to_matchtime(&key)?, to_u64(&value)?)), + )?; + for (mt, n) in old_triggers.iter() { + triggers.fetch_update(mt.m.clone(), |map| { + Some(match map { + None => [(mt.t, *n)].into(), + Some(mut map) => { + map.insert(mt.t, *n); + map + } + }) + }); + } + } let mut this = Self { filter, has_after, matches: BTreeMap::new(), - ordered_times: db.open_tree( - filter_ordered_times_db_name(filter), - filter.retry_duration().unwrap_or_default(), - |(key, value)| Ok((to_time(&key)?, to_match(&value)?)), - )?, - triggers: db.open_tree( - filter_triggers_db_name(filter), - filter.longuest_action_duration(), - |(key, value)| Ok((to_matchtime(&key)?, to_u64(&value)?)), - )?, + ordered_times, + triggers, }; this.clear_past_matches(now); this.load_matches_from_ordered_times(); @@ -77,8 +105,16 @@ impl State { // We record triggered filters only when there is an action with an `after` directive if self.has_after { // Add the (Match, Time) to the triggers map - self.triggers - .insert(MatchTime { m, t }, self.filter.actions().len() as u64); + let n = self.filter.actions().len() as u64; + self.triggers.fetch_update(m, |map| { + Some(match map { + None => [(t, n)].into(), + Some(mut value) => { + value.insert(t, n); + value + } + }) + }); } } @@ -94,9 +130,19 @@ impl State { /// Completely remove a Match from the triggers pub fn remove_trigger(&mut self, m: &Match, t: &Time) { - self.triggers.remove(&MatchTime { - m: m.clone(), - t: *t, + // self.triggers.remove(&MatchTime { + // m: m.clone(), + // t: *t, + // }); + self.triggers.fetch_update(m.clone(), |map| { + map.and_then(|mut map| { + map.remove(t); + if map.is_empty() { + None + } else { + Some(map) + } + }) }); } @@ -106,13 +152,27 @@ impl State { if self.has_after { let mut exec_needed = false; let mt = MatchTime { m: m.clone(), t }; - let count = self.triggers.get(&mt); + let count = self.triggers.get(&mt.m).and_then(|map| map.get(&mt.t)).cloned(); if let Some(count) = count { exec_needed = true; - if *count <= 1 { - self.triggers.remove(&mt); + if count <= 1 { + self.triggers.fetch_update(mt.m, |map| { + map.and_then(|mut map| { + map.remove(&mt.t); + if map.is_empty() { + None + } else { + Some(map) + } + }) + }); } else { - self.triggers.insert(mt, count - 1); + self.triggers.fetch_update(mt.m, |map| { + map.and_then(|mut map| { + map.insert(mt.t, count - 1); + Some(map) + }) + }); } } exec_needed diff --git a/src/treedb/helpers.rs b/src/treedb/helpers.rs index 7833957..ae4db1d 100644 --- a/src/treedb/helpers.rs +++ b/src/treedb/helpers.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use chrono::{DateTime, Local}; use serde_json::Value; @@ -15,13 +15,15 @@ pub fn to_u64(val: &Value) -> Result { val.as_u64().ok_or("not a u64".into()) } +fn string_to_time(val: &str) -> Result { + Ok(DateTime::parse_from_rfc3339(val) + .map_err(|err| err.to_string())? + .with_timezone(&Local)) +} + /// Tries to convert a [`Value`] into a [`Time`] pub fn to_time(val: &Value) -> Result { - Ok( - DateTime::parse_from_rfc3339(val.as_str().ok_or("not a number")?) - .map_err(|err| err.to_string())? - .with_timezone(&Local), - ) + Ok(string_to_time(val.as_str().ok_or("not a number")?)?) } /// Tries to convert a [`Value`] into a [`Match`] @@ -51,6 +53,15 @@ pub fn to_timeset(val: &Value) -> Result, String> { .collect() } +/// Tries to convert a [`Value`] into a [`BTreeMap`] +pub fn to_timemap(val: &Value) -> Result, String> { + val.as_object() + .ok_or("not a map")? + .iter() + .map(|(key, value)| Ok((string_to_time(key)?, to_u64(value)?))) + .collect() +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; diff --git a/src/treedb/mod.rs b/src/treedb/mod.rs index 8809169..f012cf4 100644 --- a/src/treedb/mod.rs +++ b/src/treedb/mod.rs @@ -359,16 +359,14 @@ impl Tree { /// Returning None removes the item if it existed before. /// Asynchronously persisted. /// *API design borrowed from [`fjall::WriteTransaction::fetch_update`].* - pub fn fetch_update) -> Option>( + pub fn fetch_update) -> Option>( &mut self, key: K, mut f: F, ) -> Option { - let old_value = self.get(&key); + let old_value = self.remove(&key); let new_value = f(old_value); - if old_value != new_value.as_ref() { - self.log(&key, new_value.as_ref()); - } + self.log(&key, new_value.as_ref()); if let Some(new_value) = new_value { self.tree.insert(key, new_value) } else {