mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 12:45:47 +01:00
Transform SledTreeExt into a struct that wraps sled::Tree
This permits to have a stronger type system, as the type of a Tree is then decided at construction, not for every function call. This also makes the code clearer, with much less type annotations.
This commit is contained in:
parent
d7824faf3d
commit
a238c7411f
2 changed files with 87 additions and 68 deletions
|
|
@ -14,20 +14,25 @@ use crate::{
|
|||
protocol::{Order, PatternStatus},
|
||||
};
|
||||
|
||||
use super::{shutdown::ShutdownToken, SledDbExt, SledTreeExt};
|
||||
use super::{shutdown::ShutdownToken, SledDbExt, Tree};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FilterManager {
|
||||
/// the Filter managed
|
||||
filter: &'static Filter,
|
||||
/// Has the filter at least an action with an after directive?
|
||||
has_after: bool,
|
||||
/// Permits to limit concurrency of actions execution
|
||||
exec_limit: Option<Arc<Semaphore>>,
|
||||
/// Permits to run pending actions on shutdown
|
||||
shutdown: ShutdownToken,
|
||||
/// sled::Tree equivalent to BTreeMap<Match, BTreeSet<Time>>
|
||||
matches: sled::Tree,
|
||||
/// sled::Tree equivalent to BTreeMap<Time, Match>
|
||||
ordered_times: sled::Tree,
|
||||
/// sled::Tree equivalent to BTreeMap<Match, BTreeMap<Time, usize>>
|
||||
triggers: sled::Tree,
|
||||
/// Saves all the current Matches for this Filter
|
||||
matches: Tree<Match, BTreeSet<Time>>,
|
||||
/// Alternative view of the current Matches for O(1) cleaning of old Matches
|
||||
/// without added async Tasks to remove them
|
||||
ordered_times: Tree<Time, Match>,
|
||||
/// Saves all the current Triggers for this Filter
|
||||
triggers: Tree<Match, BTreeMap<Time, usize>>,
|
||||
}
|
||||
|
||||
#[allow(clippy::unwrap_used)]
|
||||
|
|
@ -96,7 +101,7 @@ impl FilterManager {
|
|||
|
||||
let mut cs: BTreeMap<_, _> = self
|
||||
.matches
|
||||
.iter_ext::<Match, BTreeSet<Time>>()
|
||||
.iter()
|
||||
// match filtering
|
||||
.filter(|(match_, _)| is_match(match_))
|
||||
.map(|(match_, times)| {
|
||||
|
|
@ -116,7 +121,7 @@ impl FilterManager {
|
|||
let now = Local::now();
|
||||
for (match_, times) in self
|
||||
.triggers
|
||||
.iter_ext::<Match, BTreeMap<Time, usize>>()
|
||||
.iter()
|
||||
// match filtering
|
||||
.filter(|(match_, _)| is_match(match_))
|
||||
{
|
||||
|
|
@ -189,13 +194,13 @@ impl FilterManager {
|
|||
fn add_match(&self, m: &Match, t: Time) {
|
||||
// FIXME do this in a transaction
|
||||
self.matches
|
||||
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(m, |set| {
|
||||
.fetch_and_update(m, |set| {
|
||||
let mut set = set.unwrap_or_default();
|
||||
set.insert(t);
|
||||
Some(set)
|
||||
})
|
||||
.unwrap();
|
||||
self.ordered_times.insert_ext::<Time, Match>(&t, m).unwrap();
|
||||
self.ordered_times.insert(&t, m).unwrap();
|
||||
}
|
||||
|
||||
fn add_trigger(&self, m: &Match, t: Time) {
|
||||
|
|
@ -203,7 +208,7 @@ impl FilterManager {
|
|||
if self.has_after {
|
||||
// Add the (Match, Time) to the triggers map
|
||||
self.triggers
|
||||
.fetch_and_update_ext::<Match, BTreeMap<Time, usize>, _>(m, |map| {
|
||||
.fetch_and_update(m, |map| {
|
||||
let mut map = map.unwrap_or_default();
|
||||
map.insert(t, self.filter.actions().len());
|
||||
Some(map)
|
||||
|
|
@ -215,9 +220,9 @@ impl FilterManager {
|
|||
// Completely remove a Match from the matches
|
||||
fn remove_match(&self, m: &Match) {
|
||||
// FIXME do this in a transaction
|
||||
if let Some(times) = self.matches.remove_ext::<Match, BTreeSet<Time>>(m) {
|
||||
if let Some(times) = self.matches.remove(m) {
|
||||
for t in times {
|
||||
self.ordered_times.remove_ext::<Time, Match>(&t);
|
||||
self.ordered_times.remove(&t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -225,7 +230,7 @@ impl FilterManager {
|
|||
/// Completely remove a Match from the triggers
|
||||
fn remove_trigger(&self, m: &Match) {
|
||||
// FIXME do this in a transaction
|
||||
self.triggers.remove_ext::<Match, BTreeMap<Time, usize>>(m);
|
||||
self.triggers.remove(m);
|
||||
}
|
||||
|
||||
/// Returns whether we should still execute an action for this (Match, Time) trigger
|
||||
|
|
@ -234,7 +239,7 @@ impl FilterManager {
|
|||
if self.has_after {
|
||||
let mut exec_needed = false;
|
||||
self.triggers
|
||||
.fetch_and_update_ext::<Match, BTreeMap<Time, usize>, _>(&m, |map| {
|
||||
.fetch_and_update(&m, |map| {
|
||||
map.map(|mut map| {
|
||||
if let Some(counter) = map.get(&t) {
|
||||
exec_needed = true;
|
||||
|
|
@ -261,7 +266,7 @@ impl FilterManager {
|
|||
let retry_duration = self.filter.retry_duration().unwrap_or_default();
|
||||
while self
|
||||
.ordered_times
|
||||
.first_ext::<Time, Match>()
|
||||
.first()
|
||||
.unwrap()
|
||||
.is_some_and(|(t, _)| t + retry_duration < now)
|
||||
{
|
||||
|
|
@ -270,11 +275,11 @@ impl FilterManager {
|
|||
// second unwrap: we just checked in the condition that first is_some
|
||||
let (t, m) = self
|
||||
.ordered_times
|
||||
.pop_min_ext::<Time, Match>()
|
||||
.pop_min()
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
self.matches
|
||||
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(&m, |set| {
|
||||
.fetch_and_update(&m, |set| {
|
||||
let mut set = set.unwrap();
|
||||
set.remove(&t);
|
||||
Some(set)
|
||||
|
|
@ -285,7 +290,7 @@ impl FilterManager {
|
|||
|
||||
fn get_times(&self, m: &Match) -> usize {
|
||||
self.matches
|
||||
.get_ext::<Match, BTreeSet<Time>>(m)
|
||||
.get(m)
|
||||
.unwrap()
|
||||
.map(|v| v.len())
|
||||
.unwrap_or(0)
|
||||
|
|
@ -295,7 +300,7 @@ impl FilterManager {
|
|||
let longuest_action_duration = self.filter.longuest_action_duration();
|
||||
let number_of_actions = self.filter.actions().len();
|
||||
|
||||
for (m, map) in self.triggers.iter_ext::<Match, BTreeMap<Time, usize>>() {
|
||||
for (m, map) in self.triggers.iter() {
|
||||
let new_map: BTreeMap<_, _> = map
|
||||
.into_iter()
|
||||
// Keep only times that are still relevant
|
||||
|
|
@ -306,12 +311,12 @@ impl FilterManager {
|
|||
|
||||
if new_map.is_empty() {
|
||||
// No upcoming time, delete the entry from the Tree
|
||||
self.triggers.remove_ext::<Match, BTreeMap<Time, usize>>(&m);
|
||||
self.triggers.remove(&m);
|
||||
} else {
|
||||
// Insert back the upcoming times
|
||||
let _ = self
|
||||
.triggers
|
||||
.insert_ext::<Match, BTreeMap<Time, usize>>(&m, &new_map);
|
||||
.insert(&m, &new_map);
|
||||
|
||||
// Schedule the upcoming times
|
||||
for t in new_map.into_keys() {
|
||||
|
|
|
|||
|
|
@ -1,19 +1,26 @@
|
|||
use std::collections::BTreeSet;
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
marker::PhantomData,
|
||||
};
|
||||
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
use sled::Result;
|
||||
|
||||
use crate::concepts::{Config, Filter};
|
||||
use crate::concepts::{Config, Filter, Match, Time};
|
||||
|
||||
/// This trait permits to streamline how to open the correct [`Tree`]s
|
||||
/// so that we can reliably open the same Trees in multiple places
|
||||
///
|
||||
/// [`Tree`]: https://docs.rs/sled/latest/sled/struct.Tree.html
|
||||
/// This trait permits to manage in a single place what are the names of [`sled::Tree`]s in
|
||||
/// reaction. It streamlines [`sled::Tree`]s opening so that we can reliably open the same Trees in
|
||||
/// multiple places.
|
||||
/// It also permits to manage the cleanup of unused trees.
|
||||
pub trait SledDbExt {
|
||||
fn open_filter_matches_tree(&self, filter: &Filter) -> Result<sled::Tree>;
|
||||
fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<sled::Tree>;
|
||||
fn open_filter_triggers_tree(&self, filter: &Filter) -> Result<sled::Tree>;
|
||||
fn open_filter_matches_tree(&self, filter: &Filter) -> Result<Tree<Match, BTreeSet<Time>>>;
|
||||
fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<Tree<Time, Match>>;
|
||||
fn open_filter_triggers_tree(
|
||||
&self,
|
||||
filter: &Filter,
|
||||
) -> Result<Tree<Match, BTreeMap<Time, usize>>>;
|
||||
|
||||
fn cleanup_unused_trees(&self, config: &Config);
|
||||
}
|
||||
|
||||
|
|
@ -34,14 +41,22 @@ fn filter_triggers_tree_name(filter: &Filter) -> String {
|
|||
}
|
||||
|
||||
impl SledDbExt for sled::Db {
|
||||
fn open_filter_matches_tree(&self, filter: &Filter) -> Result<sled::Tree> {
|
||||
fn open_filter_matches_tree(&self, filter: &Filter) -> Result<Tree<Match, BTreeSet<Time>>> {
|
||||
self.open_tree(filter_matches_tree_name(filter).as_bytes())
|
||||
.map(|tree| Tree::new(tree))
|
||||
}
|
||||
fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<sled::Tree> {
|
||||
|
||||
fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<Tree<Time, Match>> {
|
||||
self.open_tree(filter_ordered_times_tree_name(filter).as_bytes())
|
||||
.map(|tree| Tree::new(tree))
|
||||
}
|
||||
fn open_filter_triggers_tree(&self, filter: &Filter) -> Result<sled::Tree> {
|
||||
|
||||
fn open_filter_triggers_tree(
|
||||
&self,
|
||||
filter: &Filter,
|
||||
) -> Result<Tree<Match, BTreeMap<Time, usize>>> {
|
||||
self.open_tree(filter_triggers_tree_name(filter).as_bytes())
|
||||
.map(|tree| Tree::new(tree))
|
||||
}
|
||||
|
||||
fn cleanup_unused_trees(&self, config: &Config) {
|
||||
|
|
@ -75,44 +90,41 @@ impl SledDbExt for sled::Db {
|
|||
}
|
||||
}
|
||||
|
||||
/// This trait just permits to have less verbose typing in the next trait
|
||||
pub trait KV: Serialize + DeserializeOwned {}
|
||||
impl<T> KV for T where T: Serialize + DeserializeOwned {}
|
||||
|
||||
/// This trait permits to have typed Trees and avoid handling the de/serialization in business logic.
|
||||
/// key and value types must be [`Serialize`] and [`Deserialize`].
|
||||
///
|
||||
/// [`Serialize`]: https://docs.rs/serde/latest/serde/trait.Serialize.html
|
||||
/// [`Deserialize`]: https://docs.rs/serde/latest/serde/trait.Deserialize.html
|
||||
pub trait SledTreeExt {
|
||||
fn get_ext<K: KV, V: KV>(&self, k: &K) -> Result<Option<V>>;
|
||||
fn remove_ext<K: KV, V: KV>(&self, k: &K) -> Option<V>;
|
||||
fn first_ext<K: KV, V: KV>(&self) -> Result<Option<(K, V)>>;
|
||||
fn pop_min_ext<K: KV, V: KV>(&self) -> Result<Option<(K, V)>>;
|
||||
fn fetch_and_update_ext<K: KV, V: KV, F>(&self, k: &K, f: F) -> Result<Option<V>>
|
||||
where
|
||||
F: FnMut(Option<V>) -> Option<V>;
|
||||
fn insert_ext<K: KV, V: KV>(&self, k: &K, v: &V) -> Result<Option<V>>;
|
||||
#[allow(clippy::needless_lifetimes)] // I find this clearer with 2 lifetimes
|
||||
fn iter_ext<'a, 'b, K: KV, V: KV>(&'a self) -> impl Iterator<Item = (K, V)> + 'b;
|
||||
/// This [`sled::Tree`] wrapper permits to have typed Trees and avoid handling the de/serialization in
|
||||
/// business logic.
|
||||
/// Key and value types must be [`serde::Serialize`] and [`serde::Deserialize`].
|
||||
#[derive(Clone)]
|
||||
pub struct Tree<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> {
|
||||
tree: sled::Tree,
|
||||
_k_marker: PhantomData<K>,
|
||||
_v_marker: PhantomData<V>,
|
||||
}
|
||||
|
||||
#[allow(clippy::unwrap_used)]
|
||||
impl SledTreeExt for sled::Tree {
|
||||
fn get_ext<K: KV, V: KV>(&self, k: &K) -> Result<Option<V>> {
|
||||
let k = bincode::serialize(k).unwrap();
|
||||
Ok(self.get(k)?.map(|v| bincode::deserialize(&v).unwrap()))
|
||||
impl<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> Tree<K, V> {
|
||||
fn new(tree: sled::Tree) -> Self {
|
||||
Self {
|
||||
tree,
|
||||
_k_marker: PhantomData::<K>,
|
||||
_v_marker: PhantomData::<V>,
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_ext<K: KV, V: KV>(&self, key: &K) -> Option<V> {
|
||||
pub fn get(&self, k: &K) -> Result<Option<V>> {
|
||||
let k = bincode::serialize(k).unwrap();
|
||||
Ok(self.tree.get(k)?.map(|v| bincode::deserialize(&v).unwrap()))
|
||||
}
|
||||
|
||||
pub fn remove(&self, key: &K) -> Option<V> {
|
||||
let key = bincode::serialize(key).unwrap();
|
||||
self.remove(key)
|
||||
self.tree
|
||||
.remove(key)
|
||||
.unwrap()
|
||||
.map(|value| bincode::deserialize(&value).unwrap())
|
||||
}
|
||||
|
||||
fn first_ext<K: KV, V: KV>(&self) -> Result<Option<(K, V)>> {
|
||||
let option = self.first()?;
|
||||
pub fn first(&self) -> Result<Option<(K, V)>> {
|
||||
let option = self.tree.first()?;
|
||||
match option {
|
||||
None => Ok(None),
|
||||
Some((k, v)) => {
|
||||
|
|
@ -123,8 +135,8 @@ impl SledTreeExt for sled::Tree {
|
|||
}
|
||||
}
|
||||
|
||||
fn pop_min_ext<K: KV, V: KV>(&self) -> Result<Option<(K, V)>> {
|
||||
let option = self.pop_min()?;
|
||||
pub fn pop_min(&self) -> Result<Option<(K, V)>> {
|
||||
let option = self.tree.pop_min()?;
|
||||
match option {
|
||||
None => Ok(None),
|
||||
Some((k, v)) => {
|
||||
|
|
@ -135,7 +147,7 @@ impl SledTreeExt for sled::Tree {
|
|||
}
|
||||
}
|
||||
|
||||
fn fetch_and_update_ext<K: KV, V: KV, F>(&self, k: &K, mut f: F) -> Result<Option<V>>
|
||||
pub fn fetch_and_update<F>(&self, k: &K, mut f: F) -> Result<Option<V>>
|
||||
where
|
||||
F: FnMut(Option<V>) -> Option<V>,
|
||||
{
|
||||
|
|
@ -145,14 +157,16 @@ impl SledTreeExt for sled::Tree {
|
|||
f(v).map(|v| bincode::serialize(&v).unwrap())
|
||||
};
|
||||
Ok(self
|
||||
.tree
|
||||
.fetch_and_update(k, f)?
|
||||
.map(|v| bincode::deserialize::<V>(&v).unwrap()))
|
||||
}
|
||||
|
||||
fn insert_ext<K: KV, V: KV>(&self, k: &K, v: &V) -> Result<Option<V>> {
|
||||
pub fn insert(&self, k: &K, v: &V) -> Result<Option<V>> {
|
||||
let k = bincode::serialize(k).unwrap();
|
||||
let v = bincode::serialize(v).unwrap();
|
||||
Ok(self
|
||||
.tree
|
||||
.insert(k, v)?
|
||||
.map(|v| bincode::deserialize(&v).unwrap()))
|
||||
}
|
||||
|
|
@ -160,8 +174,8 @@ impl SledTreeExt for sled::Tree {
|
|||
// The lifetime annotations permit to decouple the lifetime of self
|
||||
// from the limetime of the Iterator
|
||||
#[allow(clippy::needless_lifetimes)] // I find this clearer with 2 lifetimes
|
||||
fn iter_ext<'a, 'b, K: KV, V: KV>(&'a self) -> impl Iterator<Item = (K, V)> + 'b {
|
||||
self.iter().map(|elt| {
|
||||
pub fn iter<'a, 'b>(&'a self) -> impl Iterator<Item = (K, V)> + 'b {
|
||||
self.tree.iter().map(|elt| {
|
||||
let (k, v) = elt.unwrap();
|
||||
let k: K = bincode::deserialize(&k).unwrap();
|
||||
let v: V = bincode::deserialize(&v).unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue