This commit is contained in:
ppom 2025-05-03 12:00:00 +02:00
commit 3b6c352204
No known key found for this signature in database
4 changed files with 117 additions and 332 deletions

View file

@ -110,67 +110,101 @@ impl FilterManager {
};
let read_txn = self.env.read_txn().unwrap();
let mut cs: BTreeMap<_, _> = self
.matches
.iter(&read_txn)
.unwrap()
.map(|result| result.unwrap())
// aggregate by key
.fold(BTreeMap::new(), |acc, (k, v)| {
let // TODO
})
// match filtering
.filter(|(match_, _)| is_match(match_))
// FIXME aggregate keys before this map
.map(|(match_, times)| {
if let Order::Flush = order {
self.remove_match(&match_);
let mut cs = BTreeMap::new();
{
let mut last_key: Option<Match> = None;
let mut n_times = 0;
// let mut insert_last_key = |last_key: &Option<&Match>, n_times: usize| {
// if let Some(last_key) = last_key {
// if let Order::Flush = order {
// self.remove_match(&last_key);
// }
// let last_key = *(last_key.clone());
// cs.insert(
// last_key,
// PatternStatus {
// matches: n_times,
// ..Default::default()
// },
// );
// }
// };
for (k, _) in self
.matches
.iter(&read_txn)
.unwrap()
.map(|result| result.unwrap())
// match filtering
.filter(|(match_, _)| is_match(match_))
{
if last_key.clone().is_some_and(|last_key| last_key == k) {
n_times += 1;
} else {
// insert_last_key(&last_key, n_times);
if let Some(last_key) = last_key {
if let Order::Flush = order {
self.remove_match(&last_key);
}
cs.insert(
last_key.clone(),
PatternStatus {
matches: n_times,
..Default::default()
},
);
}
n_times = 0;
last_key = Some(k);
}
(
match_,
}
if let Some(last_key) = last_key {
if let Order::Flush = order {
self.remove_match(&last_key);
}
cs.insert(
last_key.clone(),
PatternStatus {
// matches: times.len(),
matches: 1,
matches: n_times,
..Default::default()
},
)
})
.collect();
);
}
// insert_last_key(&last_key, n_times);
}
for (mt, times) in self
for (mt, _) in self
.triggers
.iter(&read_txn)
.unwrap()
.map(|result| result.unwrap())
// match filtering
.filter(|(match_, _)| is_match(match_))
.filter(|(match_, _)| is_match(&match_.m))
{
// Remove the match from the triggers
if let Order::Flush = order {
// FIXME delete specific (Match, Time) tuple
// self.remove_trigger(&match_);
// delete specific (Match, Time) tuple
self.remove_trigger(&mt.m, &mt.t);
}
let pattern_status = cs.entry(mt.clone()).or_default();
let m = mt.m.clone();
let pattern_status = cs.entry(m).or_default();
for action in self.filter.actions().values() {
let mut action_times = Vec::new();
for time in times.keys() {
let action_time = *time + action.after_duration().unwrap_or_default();
if action_time > now {
action_times.push(action_time.to_rfc3339().chars().take(19).collect());
// Execute the action early
if let Order::Flush = order {
self.exec_now(action, mt.clone());
}
let action_times = pattern_status
.actions
.entry(action.name().into())
.or_default();
let action_time = mt.t + action.after_duration().unwrap_or_default();
if action_time > now {
action_times.push(action_time.to_rfc3339().chars().take(19).collect());
// Execute the action early
if let Order::Flush = order {
self.exec_now(action, mt.m.clone());
}
}
if !action_times.is_empty() {
pattern_status
.actions
.insert(action.name().into(), action_times);
}
}
}
@ -214,7 +248,7 @@ impl FilterManager {
fn add_match(&self, m: &Match, t: Time) {
let mut wtxn = self.env.write_txn().unwrap();
self.matches.put(&mut wtxn, m, &t);
self.matches.put(&mut wtxn, m, &t).unwrap();
self.ordered_times.put(&mut wtxn, &t, m).unwrap();
wtxn.commit().unwrap();
}
@ -224,8 +258,11 @@ impl FilterManager {
if self.has_after {
// Add the (Match, Time) to the triggers map
let mut wtxn = self.env.write_txn().unwrap();
self.triggers
.put(&mut wtxn, &MatchTime { m, t }, &self.filter.actions().len());
self.triggers.put(
&mut wtxn,
&MatchTime { m, t },
&(self.filter.actions().len() as u64),
).unwrap();
wtxn.commit().unwrap();
}
}
@ -237,7 +274,7 @@ impl FilterManager {
if let Some(iter) = self.matches.get_duplicates(&rtxn, m).unwrap() {
for t in iter {
let t = t.unwrap().1;
self.ordered_times.delete(&mut wtxn, &t);
self.ordered_times.delete(&mut wtxn, &t).unwrap();
}
}
self.matches.delete(&mut wtxn, m).unwrap();
@ -254,7 +291,7 @@ impl FilterManager {
m: m.clone(),
t: *t,
},
);
).unwrap();
wtxn.commit().unwrap();
}
@ -269,7 +306,7 @@ impl FilterManager {
if let Some(count) = count {
exec_needed = true;
if count <= 1 {
self.triggers.delete(&mut wtxn, &mt);
self.triggers.delete(&mut wtxn, &mt).unwrap();
}
wtxn.commit().unwrap();
}
@ -291,8 +328,8 @@ impl FilterManager {
#[allow(clippy::unwrap_used)]
// second unwrap: we just checked in the condition that first is_some
let (t, m) = self.ordered_times.first(&wtxn).unwrap().unwrap();
self.ordered_times.delete(&mut wtxn, &t);
self.matches.delete_one_duplicate(&mut wtxn, &m, &t);
self.ordered_times.delete(&mut wtxn, &t).unwrap();
self.matches.delete_one_duplicate(&mut wtxn, &m, &t).unwrap();
wtxn.commit().unwrap();
wtxn = self.env.write_txn().unwrap();
}
@ -314,13 +351,14 @@ impl FilterManager {
let rtxn = self.env.read_txn().unwrap();
let mut wtxn = self.env.write_txn().unwrap();
for (mt, remaining) in self.triggers.iter(&rtxn).unwrap().map(|elt| elt.unwrap()) {
if mt.t + longuest_action_duration > now {
if remaining > 0 && mt.t + longuest_action_duration > now {
// Insert back the upcoming times
self.triggers.put(&mut wtxn, &mt, &number_of_actions);
self.triggers
.put(&mut wtxn, &mt, &(number_of_actions as u64)).unwrap();
// Schedule the upcoming times
self.schedule_exec(mt.m, mt.t, now);
} else {
self.triggers.delete(&mut wtxn, &mt);
self.triggers.delete(&mut wtxn, &mt).unwrap();
}
}
wtxn.commit().unwrap();

View file

@ -1,7 +1,9 @@
use std::collections::BTreeSet;
use std::{collections::BTreeSet, fs::create_dir};
use heed::{
byteorder::LittleEndian, types::{DecodeIgnore, SerdeBincode, Str, U64}, Database, DatabaseFlags, Result
byteorder::LittleEndian,
types::{DecodeIgnore, SerdeBincode, Str, U64},
Database, DatabaseFlags, Env, Result,
};
use serde::{Deserialize, Serialize};
@ -13,6 +15,28 @@ pub struct MatchTime {
pub t: Time,
}
pub fn open_db(config: &Config) -> std::result::Result<Env, String> {
let lmdb_dir = format!("{}/lmdb", config.state_directory());
create_dir(&lmdb_dir).map_err();
// We have no choice but to use unsafe, as the library exposes unsafe code
#[allow(unsafe_code)]
let env = unsafe { heed::EnvOpenOptions::new().open(lmdb_dir) }.map_err(|err| {
format!(
"while opening database at {}: {}",
config.state_directory(),
err
)
})?;
env.cleanup_unused_dbs(config).map_err(|err| {
format!(
"while cleaning database on startup at {}: {}",
config.state_directory(),
err
)
})?;
Ok(env)
}
/// This trait permits to manage in a single place what are the names of [`heed::Database`]s in
/// reaction. It streamlines [`heed::Database`]s opening so that we can reliably open the same
/// Databases in multiple places.

View file

@ -26,7 +26,6 @@ use stream::stream_manager;
mod filter;
mod heedext;
mod shutdown;
mod sledext;
mod socket;
mod stream;
@ -52,18 +51,7 @@ pub async fn daemon(
};
// Open Database
#[allow(unsafe_code)] // We have no choice but to use unsafe,
// as the library exposes unsafe code
let env =
unsafe { heed::EnvOpenOptions::new().open(format!("{}/lmdb", config.state_directory())) }
.map_err(|err| {
format!(
"while opening database at {}: {}",
config.state_directory(),
err
)
})?;
env.cleanup_unused_dbs(config);
let env = open_db(config)?;
// Filter managers
let now = Local::now();

View file

@ -1,265 +0,0 @@
use std::{
collections::{BTreeMap, BTreeSet},
marker::PhantomData,
};
use serde::{de::DeserializeOwned, Serialize};
use sled::Result;
use crate::concepts::{Config, Filter, Match, Time};
/// This trait permits to manage in a single place what are the names of [`sled::Tree`]s in
/// reaction. It streamlines [`sled::Tree`]s opening so that we can reliably open the same Trees in
/// multiple places.
/// It also permits to manage the cleanup of unused trees.
pub trait SledDbExt {
    /// Opens the per-filter tree mapping a [`Match`] to the set of [`Time`]s it occurred at.
    fn open_filter_matches_tree(&self, filter: &Filter) -> Result<Tree<Match, BTreeSet<Time>>>;
    /// Opens the per-filter tree mapping a [`Time`] back to the [`Match`] seen at that time.
    fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<Tree<Time, Match>>;
    /// Opens the per-filter tree mapping a [`Match`] to a time -> count map
    /// (presumably the number of actions still pending per trigger time —
    /// confirm against FilterManager usage).
    fn open_filter_triggers_tree(
        &self,
        filter: &Filter,
    ) -> Result<Tree<Match, BTreeMap<Time, usize>>>;
    /// Drops every tree whose name does not belong to a filter in `config`.
    fn cleanup_unused_trees(&self, config: &Config);
}
/// Tree name for a filter's matches: `filter_matches_<stream>.<filter>`.
fn filter_matches_tree_name(filter: &Filter) -> String {
    format!(
        "filter_matches_{stream}.{name}",
        stream = filter.stream_name(),
        name = filter.name()
    )
}
/// Tree name for a filter's ordered times: `filter_ordered_times_<stream>.<filter>`.
fn filter_ordered_times_tree_name(filter: &Filter) -> String {
    format!(
        "filter_ordered_times_{stream}.{name}",
        stream = filter.stream_name(),
        name = filter.name()
    )
}
/// Tree name for a filter's triggers: `filter_triggers_<stream>.<filter>`.
fn filter_triggers_tree_name(filter: &Filter) -> String {
    format!(
        "filter_triggers_{stream}.{name}",
        stream = filter.stream_name(),
        name = filter.name()
    )
}
impl SledDbExt for sled::Db {
    /// Opens (creating if absent) the matches tree for `filter` and wraps it in a typed [`Tree`].
    fn open_filter_matches_tree(&self, filter: &Filter) -> Result<Tree<Match, BTreeSet<Time>>> {
        self.open_tree(filter_matches_tree_name(filter).as_bytes())
            .map(Tree::new)
    }
    /// Opens (creating if absent) the ordered-times tree for `filter`.
    fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<Tree<Time, Match>> {
        self.open_tree(filter_ordered_times_tree_name(filter).as_bytes())
            .map(Tree::new)
    }
    /// Opens (creating if absent) the triggers tree for `filter`.
    fn open_filter_triggers_tree(
        &self,
        filter: &Filter,
    ) -> Result<Tree<Match, BTreeMap<Time, usize>>> {
        self.open_tree(filter_triggers_tree_name(filter).as_bytes())
            .map(Tree::new)
    }
    /// Drops every tree whose name is not derivable from the current config,
    /// so renamed/removed filters do not leave stale data behind.
    fn cleanup_unused_trees(&self, config: &Config) {
        // Build the full set of tree names the configuration requires.
        let valid_tree_names: BTreeSet<_> = config
            .streams()
            .values()
            // for each filter
            .flat_map(|stream| stream.filters().values())
            .flat_map(|filter| {
                [
                    filter_matches_tree_name(filter),
                    filter_ordered_times_tree_name(filter),
                    filter_triggers_tree_name(filter),
                ]
            })
            // plus sled's default map
            .chain(std::iter::once("__sled__default".into()))
            // convert as IVec which is sled's binary type
            .map(|string| sled::IVec::from(string.as_bytes()))
            .collect();
        // Remove trees that are not in the list of valid trees
        for outdated_tree in self
            .tree_names()
            .into_iter()
            .filter(|tree_name| !valid_tree_names.contains(tree_name))
        {
            // A failed drop leaves the DB inconsistent, so dying here is deliberate.
            self.drop_tree(outdated_tree)
                .expect("Fatal error while cleaning DB on startup");
        }
    }
}
/// This [`sled::Tree`] wrapper permits to have typed Trees and avoid handling the de/serialization in
/// business logic.
/// Key and value types must be [`serde::Serialize`] and [`serde::Deserialize`].
#[derive(Clone)]
pub struct Tree<K: Serialize + DeserializeOwned + Ord, V: Serialize + DeserializeOwned> {
    // Underlying untyped sled tree; keys and values are bincode-encoded bytes.
    tree: sled::Tree,
    // Zero-sized markers binding the key/value types to this wrapper.
    _k_marker: PhantomData<K>,
    _v_marker: PhantomData<V>,
}
#[allow(clippy::unwrap_used)]
impl<K: Serialize + DeserializeOwned + Ord, V: Serialize + DeserializeOwned> Tree<K, V> {
    /// Wraps an untyped [`sled::Tree`] into this typed view.
    fn new(tree: sled::Tree) -> Self {
        Self {
            tree,
            _k_marker: PhantomData::<K>,
            _v_marker: PhantomData::<V>,
        }
    }
    /// Returns the value stored under `k`, if any.
    pub fn get(&self, k: &K) -> Result<Option<V>> {
        let k = bincode::serialize(k).unwrap();
        Ok(self.tree.get(k)?.map(|v| bincode::deserialize(&v).unwrap()))
    }
    /// Removes `key`, returning the value it held, if any.
    // NOTE(review): unlike `get`, sled errors here are unwrapped rather than
    // propagated through the return type.
    pub fn remove(&self, key: &K) -> Option<V> {
        let key = bincode::serialize(key).unwrap();
        self.tree
            .remove(key)
            .unwrap()
            .map(|value| bincode::deserialize(&value).unwrap())
    }
    /// Returns, without removing it, the first entry in sled's key order
    /// (byte order of the bincode-encoded keys).
    pub fn first(&self) -> Result<Option<(K, V)>> {
        let option = self.tree.first()?;
        match option {
            None => Ok(None),
            Some((k, v)) => {
                let k: K = bincode::deserialize(&k).unwrap();
                let v: V = bincode::deserialize(&v).unwrap();
                Ok(Some((k, v)))
            }
        }
    }
    /// Removes and returns the first entry in sled's key order.
    pub fn pop_min(&self) -> Result<Option<(K, V)>> {
        let option = self.tree.pop_min()?;
        match option {
            None => Ok(None),
            Some((k, v)) => {
                let k: K = bincode::deserialize(&k).unwrap();
                let v: V = bincode::deserialize(&v).unwrap();
                Ok(Some((k, v)))
            }
        }
    }
    /// Applies `f` to the value under `k` (`None` means absent on input,
    /// delete on output) and returns the previous value.
    pub fn fetch_and_update<F>(&self, k: &K, mut f: F) -> Result<Option<V>>
    where
        F: FnMut(Option<V>) -> Option<V>,
    {
        let k = bincode::serialize(&k).unwrap();
        // Bridge the typed closure to sled's byte-slice closure.
        let f = |v: Option<&[u8]>| -> Option<Vec<u8>> {
            let v = v.map(|v| bincode::deserialize(v).unwrap());
            f(v).map(|v| bincode::serialize(&v).unwrap())
        };
        Ok(self
            .tree
            .fetch_and_update(k, f)?
            .map(|v| bincode::deserialize::<V>(&v).unwrap()))
    }
    /// Inserts `v` under `k`, returning the value it replaced, if any.
    pub fn insert(&self, k: &K, v: &V) -> Result<Option<V>> {
        let k = bincode::serialize(k).unwrap();
        let v = bincode::serialize(v).unwrap();
        Ok(self
            .tree
            .insert(k, v)?
            .map(|v| bincode::deserialize(&v).unwrap()))
    }
    /// Iterates over all entries, decoded, in sled's key order.
    /// Panics mid-iteration if sled reports an error or decoding fails.
    // The lifetime annotations permit to decouple the lifetime of self
    // from the lifetime of the Iterator
    #[allow(clippy::needless_lifetimes)] // I find this clearer with 2 lifetimes
    pub fn iter<'a, 'b>(&'a self) -> impl Iterator<Item = (K, V)> + 'b {
        self.tree.iter().map(|elt| {
            let (k, v) = elt.unwrap();
            let k: K = bincode::deserialize(&k).unwrap();
            let v: V = bincode::deserialize(&v).unwrap();
            (k, v)
        })
    }
    /// Test helper: collects the whole tree into an ordered map.
    #[cfg(test)]
    pub fn as_map(&self) -> BTreeMap<K, V> {
        self.iter().collect()
    }
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use chrono::{Local, TimeDelta};
    use super::SledDbExt;
    use crate::{concepts::filter_tests::ok_filter, tests::TempDb};
    /// Round-trips insert/get/remove/fetch_and_update on a typed triggers Tree
    /// backed by a temporary sled database.
    #[test]
    fn tree_crud() {
        let filter = ok_filter();
        let db = TempDb::default();
        let triggers = db.open_filter_triggers_tree(&filter).unwrap();
        // Freshly opened tree is empty.
        assert_eq!(BTreeMap::default(), triggers.as_map());
        let now = Local::now();
        let then = now + TimeDelta::seconds(2);
        let k1 = vec!["a".into()];
        let k2 = vec!["a".into(), "b".into()];
        let v1 = BTreeMap::from([(now, 4)]);
        let v2 = BTreeMap::from([(then, 2)]);
        let map_1 = BTreeMap::from([(k1.clone(), v1.clone())]);
        let map_2 = BTreeMap::from([(k2.clone(), v2.clone())]);
        let map_1_2 = BTreeMap::from([(k1.clone(), v1.clone()), (k2.clone(), v2.clone())]);
        // insert + get
        triggers.insert(&k1, &v1).unwrap();
        assert_eq!(triggers.as_map(), map_1);
        assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone()));
        assert_eq!(triggers.get(&k2).unwrap(), None);
        triggers.insert(&k2, &v2).unwrap();
        assert_eq!(triggers.as_map(), map_1_2);
        assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone()));
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // remove returns the removed value
        assert_eq!(triggers.remove(&k1), Some(v1.clone()));
        assert_eq!(triggers.as_map(), map_2);
        assert_eq!(triggers.get(&k1).unwrap(), None);
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // Add back
        triggers
            .fetch_and_update(&k1, |map| {
                let mut map = map.unwrap_or_default();
                map.insert(now, 4);
                Some(map)
            })
            .unwrap();
        assert_eq!(triggers.as_map(), map_1_2);
        assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone()));
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // Remove
        triggers
            .fetch_and_update(&k1, |map| match map {
                Some(_) => None,
                None => Some(v1.clone()),
            })
            .unwrap();
        assert_eq!(triggers.as_map(), map_2);
        assert_eq!(triggers.get(&k1).unwrap(), None);
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // Remove
        triggers.fetch_and_update(&k2, |_| None).unwrap();
        assert_eq!(triggers.as_map(), BTreeMap::default());
        assert_eq!(triggers.get(&k1).unwrap(), None);
        assert_eq!(triggers.get(&k2).unwrap(), None);
    }
}
}