From 3b6c3522042b329e9d366cdc6d52e7ce1e90516e Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 3 May 2025 12:00:00 +0200 Subject: [PATCH] WIP --- src/daemon/filter.rs | 144 ++++++++++++++--------- src/daemon/heedext.rs | 28 ++++- src/daemon/mod.rs | 14 +-- src/daemon/sledext.rs | 265 ------------------------------------------ 4 files changed, 118 insertions(+), 333 deletions(-) delete mode 100644 src/daemon/sledext.rs diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 489c35c..999d652 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -110,67 +110,101 @@ impl FilterManager { }; let read_txn = self.env.read_txn().unwrap(); - let mut cs: BTreeMap<_, _> = self - .matches - .iter(&read_txn) - .unwrap() - .map(|result| result.unwrap()) - // aggregate by key - .fold(BTreeMap::new(), |acc, (k, v)| { - let // TODO - - }) - // match filtering - .filter(|(match_, _)| is_match(match_)) - // FIXME aggregate keys before this map - .map(|(match_, times)| { - if let Order::Flush = order { - self.remove_match(&match_); + let mut cs = BTreeMap::new(); + { + let mut last_key: Option = None; + let mut n_times = 0; + + // let mut insert_last_key = |last_key: &Option<&Match>, n_times: usize| { + // if let Some(last_key) = last_key { + // if let Order::Flush = order { + // self.remove_match(&last_key); + // } + // let last_key = *(last_key.clone()); + // cs.insert( + // last_key, + // PatternStatus { + // matches: n_times, + // ..Default::default() + // }, + // ); + // } + // }; + + for (k, _) in self + .matches + .iter(&read_txn) + .unwrap() + .map(|result| result.unwrap()) + // match filtering + .filter(|(match_, _)| is_match(match_)) + { + if last_key.clone().is_some_and(|last_key| last_key == k) { + n_times += 1; + } else { + // insert_last_key(&last_key, n_times); + if let Some(last_key) = last_key { + if let Order::Flush = order { + self.remove_match(&last_key); + } + cs.insert( + last_key.clone(), + PatternStatus { + matches: n_times, + ..Default::default() + }, + ); + } + n_times = 0; + last_key = Some(k); } - ( - match_, + } + if let Some(last_key) = last_key { + if let Order::Flush = order { + self.remove_match(&last_key); + } + cs.insert( + last_key.clone(), PatternStatus { - // matches: times.len(), - matches: 1, + matches: n_times, ..Default::default() }, - ) - }) - .collect(); + ); + } + // insert_last_key(&last_key, n_times); + } - for (mt, times) in self + for (mt, _) in self .triggers .iter(&read_txn) .unwrap() .map(|result| result.unwrap()) // match filtering - .filter(|(match_, _)| is_match(match_)) + .filter(|(match_, _)| is_match(&match_.m)) { // Remove the match from the triggers if let Order::Flush = order { - // FIXME delete specific (Match, Time) tuple - // self.remove_trigger(&match_); + // delete specific (Match, Time) tuple + self.remove_trigger(&mt.m, &mt.t); } - let pattern_status = cs.entry(mt.clone()).or_default(); + let m = mt.m.clone(); + let pattern_status = cs.entry(m).or_default(); for action in self.filter.actions().values() { - let mut action_times = Vec::new(); - for time in times.keys() { - let action_time = *time + action.after_duration().unwrap_or_default(); - if action_time > now { - action_times.push(action_time.to_rfc3339().chars().take(19).collect()); - // Execute the action early - if let Order::Flush = order { - self.exec_now(action, mt.clone()); - } + let action_times = pattern_status + .actions + .entry(action.name().into()) + .or_default(); + + let action_time = mt.t + action.after_duration().unwrap_or_default(); + if action_time > now { + action_times.push(action_time.to_rfc3339().chars().take(19).collect()); + // Execute the action early + if let Order::Flush = order { + self.exec_now(action, mt.m.clone()); } } - if !action_times.is_empty() { - pattern_status - .actions - .insert(action.name().into(), action_times); - } } } @@ -214,7 +248,7 @@ impl FilterManager { fn add_match(&self, m: &Match, t: Time) { let mut wtxn = self.env.write_txn().unwrap(); - self.matches.put(&mut wtxn, m, &t); + self.matches.put(&mut wtxn, m, &t).unwrap(); self.ordered_times.put(&mut wtxn, &t, m).unwrap(); wtxn.commit().unwrap(); } @@ -224,8 +258,11 @@ impl FilterManager { if self.has_after { // Add the (Match, Time) to the triggers map let mut wtxn = self.env.write_txn().unwrap(); - self.triggers - .put(&mut wtxn, &MatchTime { m, t }, &self.filter.actions().len()); + self.triggers.put( + &mut wtxn, + &MatchTime { m, t }, + &(self.filter.actions().len() as u64), + ).unwrap(); wtxn.commit().unwrap(); } } @@ -237,7 +274,7 @@ impl FilterManager { if let Some(iter) = self.matches.get_duplicates(&rtxn, m).unwrap() { for t in iter { let t = t.unwrap().1; - self.ordered_times.delete(&mut wtxn, &t); + self.ordered_times.delete(&mut wtxn, &t).unwrap(); } } self.matches.delete(&mut wtxn, m).unwrap(); @@ -254,7 +291,7 @@ impl FilterManager { m: m.clone(), t: *t, }, - ); + ).unwrap(); wtxn.commit().unwrap(); } @@ -269,7 +306,7 @@ impl FilterManager { if let Some(count) = count { exec_needed = true; if count <= 1 { - self.triggers.delete(&mut wtxn, &mt); + self.triggers.delete(&mut wtxn, &mt).unwrap(); } wtxn.commit().unwrap(); } @@ -291,8 +328,8 @@ impl FilterManager { #[allow(clippy::unwrap_used)] // second unwrap: we just checked in the condition that first is_some let (t, m) = self.ordered_times.first(&wtxn).unwrap().unwrap(); - self.ordered_times.delete(&mut wtxn, &t); - self.matches.delete_one_duplicate(&mut wtxn, &m, &t); + self.ordered_times.delete(&mut wtxn, &t).unwrap(); + self.matches.delete_one_duplicate(&mut wtxn, &m, &t).unwrap(); wtxn.commit().unwrap(); wtxn = self.env.write_txn().unwrap(); } @@ -314,13 +351,14 @@ impl FilterManager { let rtxn = self.env.read_txn().unwrap(); let mut wtxn = self.env.write_txn().unwrap(); for (mt, remaining) in self.triggers.iter(&rtxn).unwrap().map(|elt| elt.unwrap()) { - if mt.t + longuest_action_duration > now { + if remaining > 0 && mt.t + longuest_action_duration > now { // Insert back the upcoming times - self.triggers.put(&mut wtxn, &mt, &number_of_actions); + self.triggers + .put(&mut wtxn, &mt, &(number_of_actions as u64)).unwrap(); // Schedule the upcoming times self.schedule_exec(mt.m, mt.t, now); } else { - self.triggers.delete(&mut wtxn, &mt); + self.triggers.delete(&mut wtxn, &mt).unwrap(); } } wtxn.commit().unwrap(); diff --git a/src/daemon/heedext.rs b/src/daemon/heedext.rs index 95dc96a..c12eaac 100644 --- a/src/daemon/heedext.rs +++ b/src/daemon/heedext.rs @@ -1,7 +1,9 @@ -use std::collections::BTreeSet; +use std::{collections::BTreeSet, fs::create_dir}; use heed::{ - byteorder::LittleEndian, types::{DecodeIgnore, SerdeBincode, Str, U64}, Database, DatabaseFlags, Result + byteorder::LittleEndian, + types::{DecodeIgnore, SerdeBincode, Str, U64}, + Database, DatabaseFlags, Env, Result, }; use serde::{Deserialize, Serialize}; @@ -13,6 +15,28 @@ pub struct MatchTime { pub t: Time, } +pub fn open_db(config: &Config) -> std::result::Result { + let lmdb_dir = format!("{}/lmdb", config.state_directory()); + create_dir(&lmdb_dir).map_err(); + // We have no choice but to use unsafe, as the library exposes unsafe code + #[allow(unsafe_code)] + let env = unsafe { heed::EnvOpenOptions::new().open(lmdb_dir) }.map_err(|err| { + format!( + "while opening database at {}: {}", + config.state_directory(), + err + ) + })?; + env.cleanup_unused_dbs(config).map_err(|err| { + format!( + "while cleaning database on startup at {}: {}", + config.state_directory(), + err + ) + })?; + Ok(env) +} + /// This trait permits to manage in a single place what are the names of [`heed::Database`]s in /// reaction. It streamlines [`heed::Database`]s opening so that we can reliably open the same /// Databases in multiple places. diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index da448f8..121900e 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -26,7 +26,6 @@ use stream::stream_manager; mod filter; mod heedext; mod shutdown; -mod sledext; mod socket; mod stream; @@ -52,18 +51,7 @@ pub async fn daemon( }; // Open Database - #[allow(unsafe_code)] // We have no choice but to use unsafe, - // as the library exposes unsafe code - let env = - unsafe { heed::EnvOpenOptions::new().open(format!("{}/lmdb", config.state_directory())) } - .map_err(|err| { - format!( - "while opening database at {}: {}", - config.state_directory(), - err - ) - })?; - env.cleanup_unused_dbs(config); + let env = open_db(config)?; // Filter managers let now = Local::now(); diff --git a/src/daemon/sledext.rs b/src/daemon/sledext.rs deleted file mode 100644 index 927d0da..0000000 --- a/src/daemon/sledext.rs +++ /dev/null @@ -1,265 +0,0 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - marker::PhantomData, -}; - -use serde::{de::DeserializeOwned, Serialize}; - -use sled::Result; - -use crate::concepts::{Config, Filter, Match, Time}; - -/// This trait permits to manage in a single place what are the names of [`sled::Tree`]s in -/// reaction. It streamlines [`sled::Tree`]s opening so that we can reliably open the same Trees in -/// multiple places. -/// It also permits to manage the cleanup of unused trees. -pub trait SledDbExt { - fn open_filter_matches_tree(&self, filter: &Filter) -> Result>>; - fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result>; - fn open_filter_triggers_tree( - &self, - filter: &Filter, - ) -> Result>>; - - fn cleanup_unused_trees(&self, config: &Config); -} - -fn filter_matches_tree_name(filter: &Filter) -> String { - format!("filter_matches_{}.{}", filter.stream_name(), filter.name()) -} - -fn filter_ordered_times_tree_name(filter: &Filter) -> String { - format!( - "filter_ordered_times_{}.{}", - filter.stream_name(), - filter.name() - ) -} - -fn filter_triggers_tree_name(filter: &Filter) -> String { - format!("filter_triggers_{}.{}", filter.stream_name(), filter.name()) -} - -impl SledDbExt for sled::Db { - fn open_filter_matches_tree(&self, filter: &Filter) -> Result>> { - self.open_tree(filter_matches_tree_name(filter).as_bytes()) - .map(Tree::new) - } - - fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result> { - self.open_tree(filter_ordered_times_tree_name(filter).as_bytes()) - .map(Tree::new) - } - - fn open_filter_triggers_tree( - &self, - filter: &Filter, - ) -> Result>> { - self.open_tree(filter_triggers_tree_name(filter).as_bytes()) - .map(Tree::new) - } - - fn cleanup_unused_trees(&self, config: &Config) { - let valid_tree_names: BTreeSet<_> = config - .streams() - .values() - // for each filter - .flat_map(|stream| stream.filters().values()) - .flat_map(|filter| { - [ - filter_matches_tree_name(filter), - filter_ordered_times_tree_name(filter), - filter_triggers_tree_name(filter), - ] - }) - // plus sled's default map - .chain(std::iter::once("__sled__default".into())) - // convert as IVec which is sled's binary type - .map(|string| sled::IVec::from(string.as_bytes())) - .collect(); - - // Remove trees that are not in the list of valid trees - for outdated_tree in self - .tree_names() - .into_iter() - .filter(|tree_name| !valid_tree_names.contains(tree_name)) - { - self.drop_tree(outdated_tree) - .expect("Fatal error while cleaning DB on startup"); - } - } -} - -/// This [`sled::Tree`] wrapper permits to have typed Trees and avoid handling the de/serialization in -/// business logic. -/// Key and value types must be [`serde::Serialize`] and [`serde::Deserialize`]. -#[derive(Clone)] -pub struct Tree { - tree: sled::Tree, - _k_marker: PhantomData, - _v_marker: PhantomData, -} - -#[allow(clippy::unwrap_used)] -impl Tree { - fn new(tree: sled::Tree) -> Self { - Self { - tree, - _k_marker: PhantomData::, - _v_marker: PhantomData::, - } - } - - pub fn get(&self, k: &K) -> Result> { - let k = bincode::serialize(k).unwrap(); - Ok(self.tree.get(k)?.map(|v| bincode::deserialize(&v).unwrap())) - } - - pub fn remove(&self, key: &K) -> Option { - let key = bincode::serialize(key).unwrap(); - self.tree - .remove(key) - .unwrap() - .map(|value| bincode::deserialize(&value).unwrap()) - } - - pub fn first(&self) -> Result> { - let option = self.tree.first()?; - match option { - None => Ok(None), - Some((k, v)) => { - let k: K = bincode::deserialize(&k).unwrap(); - let v: V = bincode::deserialize(&v).unwrap(); - Ok(Some((k, v))) - } - } - } - - pub fn pop_min(&self) -> Result> { - let option = self.tree.pop_min()?; - match option { - None => Ok(None), - Some((k, v)) => { - let k: K = bincode::deserialize(&k).unwrap(); - let v: V = bincode::deserialize(&v).unwrap(); - Ok(Some((k, v))) - } - } - } - - pub fn fetch_and_update(&self, k: &K, mut f: F) -> Result> - where - F: FnMut(Option) -> Option, - { - let k = bincode::serialize(&k).unwrap(); - let f = |v: Option<&[u8]>| -> Option> { - let v = v.map(|v| bincode::deserialize(v).unwrap()); - f(v).map(|v| bincode::serialize(&v).unwrap()) - }; - Ok(self - .tree - .fetch_and_update(k, f)? - .map(|v| bincode::deserialize::(&v).unwrap())) - } - - pub fn insert(&self, k: &K, v: &V) -> Result> { - let k = bincode::serialize(k).unwrap(); - let v = bincode::serialize(v).unwrap(); - Ok(self - .tree - .insert(k, v)? - .map(|v| bincode::deserialize(&v).unwrap())) - } - - // The lifetime annotations permit to decouple the lifetime of self - // from the limetime of the Iterator - #[allow(clippy::needless_lifetimes)] // I find this clearer with 2 lifetimes - pub fn iter<'a, 'b>(&'a self) -> impl Iterator + 'b { - self.tree.iter().map(|elt| { - let (k, v) = elt.unwrap(); - let k: K = bincode::deserialize(&k).unwrap(); - let v: V = bincode::deserialize(&v).unwrap(); - (k, v) - }) - } - - #[cfg(test)] - pub fn as_map(&self) -> BTreeMap { - self.iter().collect() - } -} - -#[cfg(test)] -mod tests { - use std::collections::BTreeMap; - - use chrono::{Local, TimeDelta}; - - use super::SledDbExt; - use crate::{concepts::filter_tests::ok_filter, tests::TempDb}; - - #[test] - fn tree_crud() { - let filter = ok_filter(); - let db = TempDb::default(); - let triggers = db.open_filter_triggers_tree(&filter).unwrap(); - assert_eq!(BTreeMap::default(), triggers.as_map()); - - let now = Local::now(); - let then = now + TimeDelta::seconds(2); - - let k1 = vec!["a".into()]; - let k2 = vec!["a".into(), "b".into()]; - - let v1 = BTreeMap::from([(now, 4)]); - let v2 = BTreeMap::from([(then, 2)]); - - let map_1 = BTreeMap::from([(k1.clone(), v1.clone())]); - let map_2 = BTreeMap::from([(k2.clone(), v2.clone())]); - let map_1_2 = BTreeMap::from([(k1.clone(), v1.clone()), (k2.clone(), v2.clone())]); - - triggers.insert(&k1, &v1).unwrap(); - assert_eq!(triggers.as_map(), map_1); - assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone())); - assert_eq!(triggers.get(&k2).unwrap(), None); - - triggers.insert(&k2, &v2).unwrap(); - assert_eq!(triggers.as_map(), map_1_2); - assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone())); - assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); - - assert_eq!(triggers.remove(&k1), Some(v1.clone())); - assert_eq!(triggers.as_map(), map_2); - assert_eq!(triggers.get(&k1).unwrap(), None); - assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); - - // Add back - triggers - .fetch_and_update(&k1, |map| { - let mut map = map.unwrap_or_default(); - map.insert(now, 4); - Some(map) - }) - .unwrap(); - assert_eq!(triggers.as_map(), map_1_2); - assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone())); - assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); - - // Remove - triggers - .fetch_and_update(&k1, |map| match map { - Some(_) => None, - None => Some(v1.clone()), - }) - .unwrap(); - assert_eq!(triggers.as_map(), map_2); - assert_eq!(triggers.get(&k1).unwrap(), None); - assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); - - // Remove - triggers.fetch_and_update(&k2, |_| None).unwrap(); - assert_eq!(triggers.as_map(), BTreeMap::default()); - assert_eq!(triggers.get(&k1).unwrap(), None); - assert_eq!(triggers.get(&k2).unwrap(), None); - } -}