use sled in action.rs

- Fix remove logic in clear_past_times
- add iter_ext() in SledTreeExt
- add SledDbExt for opening correct trees
This commit is contained in:
ppom 2025-02-07 12:00:00 +01:00
commit f5dd36eec1
5 changed files with 186 additions and 91 deletions

View file

@ -39,6 +39,15 @@ impl Action {
pub fn name(&self) -> &str {
&self.name
}
pub fn stream_name(&self) -> &str {
&self.stream_name
}
pub fn filter_name(&self) -> &str {
&self.filter_name
}
pub fn after_duration(&self) -> Option<TimeDelta> {
self.after_duration
}

View file

@ -1,10 +1,10 @@
use std::{
collections::{BTreeMap, BTreeSet},
process::Stdio,
sync::{Arc, Mutex},
sync::Arc,
};
use chrono::{Local, TimeDelta};
use chrono::Local;
use tokio::sync::Semaphore;
use tracing::{error, info};
@ -13,43 +13,16 @@ use crate::{
protocol::Order,
};
struct State {
pending: BTreeMap<Match, BTreeSet<Time>>,
ordered_times: BTreeMap<Time, Match>,
}
impl State {
fn add_match(&mut self, m: &Match, t: &Time) {
self.pending.entry(m.clone()).or_default().insert(*t);
self.ordered_times.insert(*t, m.clone());
}
fn remove(&mut self, m: &Match, t: &Time) -> bool {
self.pending.entry(m.clone()).and_modify(|times| {
times.remove(t);
});
self.ordered_times.remove(t).is_some()
}
fn clear_past_times(&mut self, now: &Time, after: Option<TimeDelta>) {
let after = after.unwrap_or_default();
while self
.ordered_times
.first_key_value()
.is_some_and(|(k, _)| *k + after < *now)
{
#[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some
let (_, m) = self.ordered_times.pop_first().unwrap();
self.pending.remove(&m);
}
}
}
use super::{SledDbExt, SledTreeExt};
#[derive(Clone)]
pub struct ActionManager {
action: &'static Action,
exec_limit: Option<Arc<Semaphore>>,
state: Arc<Mutex<State>>,
// BTreeMap<Match, BTreeSet<Time>>,
pending: sled::Tree,
// BTreeMap<Time, Match>,
ordered_times: sled::Tree,
}
impl ActionManager {
@ -59,20 +32,18 @@ impl ActionManager {
pub fn new(
action: &'static Action,
pending: BTreeMap<Match, BTreeSet<Time>>,
exec_limit: Option<Arc<Semaphore>>,
) -> Self {
Self {
db: &sled::Db,
) -> Result<Self, sled::Error> {
let manager = Self {
action,
exec_limit,
state: Arc::new(Mutex::new(State {
pending: pending.clone(),
ordered_times: pending
.into_iter()
.flat_map(|(m, times)| times.into_iter().map(move |time| (time, m.clone())))
.collect(),
})),
}
pending: db.open_action_pending_tree(action)?,
ordered_times: db.open_action_ordered_times_tree(action)?,
};
let now = Local::now();
manager.clear_past_times(&now);
Ok(manager)
}
pub fn handle_exec(&mut self, m: Match, t: Time) {
@ -81,19 +52,16 @@ impl ActionManager {
if exec_t < now {
self.exec_now(m);
} else {
{
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
state.clear_past_times(&t, self.action.after_duration());
state.add_match(&m, &exec_t);
}
// FIXME is clearing here buggy logic or right logic?
self.clear_past_times(&t);
self.add_match(&m, &exec_t);
let this = self.clone();
tokio::spawn(async move {
let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?");
tokio::time::sleep(dur).await;
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = this.state.lock().unwrap();
if state.remove(&m, &exec_t) {
if this.remove(&m, &exec_t) {
this.exec_now(m);
}
});
@ -106,11 +74,9 @@ impl ActionManager {
is_match: F,
) -> BTreeMap<Vec<String>, Vec<String>> {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
state
.pending
self.pending
.clone()
.into_iter()
.iter_ext::<Match, BTreeSet<Time>>()
// match filtering
.filter(|(match_, _)| is_match(match_))
.fold(BTreeMap::default(), |mut acc, (match_, times)| {
@ -118,7 +84,7 @@ impl ActionManager {
.iter()
.map(|time| {
if let Order::Flush = order {
if state.remove(&match_, time) {
if self.remove(&match_, time) {
self.exec_now(match_.clone());
}
}
@ -160,14 +126,66 @@ impl ActionManager {
pub fn quit(&mut self) {
if self.action.on_exit() {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
for (m, times) in &state.pending {
for _ in times {
self.exec_now(m.clone());
}
}
state.pending.clear();
state.ordered_times.clear();
self.pending
.iter()
.map(|elt| {
let (k, v) = elt.unwrap();
let k: Match = bincode::deserialize(&k).unwrap();
let v: BTreeSet<Time> = bincode::deserialize(&v).unwrap();
(k, v)
})
.for_each(|(m, times)| {
for _ in times {
self.exec_now(m.clone());
}
});
}
}
fn add_match(&self, m: &Match, t: &Time) {
// FIXME do this in a transaction
self.pending
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(m, |set| {
let mut set = set.unwrap_or_default();
set.insert(*t);
Some(set)
})
.unwrap();
self.ordered_times.insert_ext::<Time, Match>(t, m).unwrap();
}
fn remove(&self, m: &Match, t: &Time) -> bool {
self.pending
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(m, |set| {
let mut set = set.unwrap_or_default();
set.remove(t);
Some(set)
})
.unwrap();
self.ordered_times.remove_ext::<Time, Match>(t).is_some()
}
fn clear_past_times(&self, now: &Time) {
let after = self.action.after_duration().unwrap_or_default();
while self
.ordered_times
.first_ext::<Time, Match>()
.unwrap()
.is_some_and(|(k, _)| k + after < *now)
{
#[allow(clippy::unwrap_used)] // second unwrap: we just checked in the condition that first is_some
let (t, m) = self
.ordered_times
.pop_min_ext::<Time, Match>()
.unwrap()
.unwrap();
self.pending
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(&m, |set| {
let mut set = set.unwrap();
set.remove(&t);
Some(set)
})
.unwrap();
}
}
}

View file

@ -12,7 +12,7 @@ use crate::{
protocol::{Order, PatternStatus},
};
use super::{action::ActionManager, SledTreeExt};
use super::{action::ActionManager, SledDbExt, SledTreeExt};
pub struct FilterManager {
filter: &'static Filter,
@ -30,19 +30,15 @@ impl FilterManager {
exec_limit: Option<Arc<Semaphore>>,
db: &sled::Db,
) -> Result<Self, sled::Error> {
let mut manager = Self {
let manager = Self {
filter,
action_managers: filter
.actions()
.values()
.map(|action| ActionManager::new(action, BTreeMap::default(), exec_limit.clone()))
.collect(),
matches: db.open_tree(
format!("matches_{}.{}", filter.stream_name(), filter.name()).as_bytes(),
)?,
ordered_times: db.open_tree(
format!("ordered_times_{}.{}", filter.stream_name(), filter.name()).as_bytes(),
)?,
.map(|action| ActionManager::new(action, exec_limit.clone(), db))
.collect::<Result<Vec<_>, _>>()?,
matches: db.open_filter_matches_tree(filter)?,
ordered_times: db.open_filter_ordered_times_tree(filter)?,
};
let now = Local::now();
manager.clear_past_times(now);
@ -97,15 +93,9 @@ impl FilterManager {
.all(|(a_match, regex)| regex.is_match(a_match))
};
let cs: BTreeMap<_, _> = self
.matches
.iter()
.map(|elt| {
let (k, v) = elt.unwrap();
let k: Match = bincode::deserialize(&k).unwrap();
let v: BTreeSet<Time> = bincode::deserialize(&v).unwrap();
(k, v)
})
let matches = self.matches.clone();
let cs: BTreeMap<_, _> = matches
.iter_ext::<Match, BTreeSet<Time>>()
// match filtering
.filter(|(match_, _)| is_match(match_))
.map(|(match_, times)| {
@ -136,6 +126,7 @@ impl FilterManager {
}
fn add_match(&mut self, m: &Match, t: Time) {
// FIXME do this in a transaction
self.matches
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(m, |set| {
let mut set = set.unwrap_or_default();
@ -147,6 +138,7 @@ impl FilterManager {
}
fn remove_match(&mut self, m: &Match) {
// FIXME do this in a transaction
if let Some(times) = self.matches.remove_ext::<Match, BTreeSet<Time>>(m) {
for t in times {
self.ordered_times.remove_ext::<Time, Match>(&t);
@ -154,21 +146,28 @@ impl FilterManager {
}
}
fn clear_past_times(&mut self, now: Time) {
fn clear_past_times(&self, now: Time) {
let retry_duration = self.filter.retry_duration().unwrap_or_default();
while self
.ordered_times
.first_ext::<Time, Match>()
.unwrap()
.is_some_and(|(k, _)| k + retry_duration < now)
.is_some_and(|(t, _)| t + retry_duration < now)
{
#[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some
let (_, m) = self
// FIXME do this in a transaction
#[allow(clippy::unwrap_used)] // second unwrap: we just checked in the condition that first is_some
let (t, m) = self
.ordered_times
.pop_min_ext::<Time, Match>()
.unwrap()
.unwrap();
self.matches.remove_ext::<Match, BTreeSet<Time>>(&m);
self.matches
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(&m, |set| {
let mut set = set.unwrap();
set.remove(&t);
Some(set)
})
.unwrap();
}
}

View file

@ -17,7 +17,7 @@ use tracing::info;
use crate::concepts::{Config, Filter, Stream};
use filter::FilterManager;
use sledext::SledTreeExt;
use sledext::*;
use socket::socket_manager;
use stream::stream_manager;

View file

@ -2,9 +2,68 @@ use serde::{de::DeserializeOwned, Serialize};
use sled::Result;
use crate::concepts::{Action, Filter};
/// This trait permits to streamline how to open the correct [`Tree`]s
/// so that we can reliably open the same Trees in multiple places
///
/// [`Tree`]: https://docs.rs/sled/latest/sled/struct.Tree.html
pub trait SledDbExt {
fn open_filter_matches_tree(&self, filter: &Filter) -> Result<sled::Tree>;
fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<sled::Tree>;
fn open_action_pending_tree(&self, action: &Action) -> Result<sled::Tree>;
fn open_action_ordered_times_tree(&self, action: &Action) -> Result<sled::Tree>;
}
impl SledDbExt for sled::Db {
fn open_filter_matches_tree(&self, filter: &Filter) -> Result<sled::Tree> {
self.open_tree(
format!("filter_matches_{}.{}", filter.stream_name(), filter.name()).as_bytes(),
)
}
fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<sled::Tree> {
self.open_tree(
format!(
"filter_ordered_times_{}.{}",
filter.stream_name(),
filter.name()
)
.as_bytes(),
)
}
fn open_action_pending_tree(&self, action: &Action) -> Result<sled::Tree> {
self.open_tree(
format!(
"action_pending_{}.{}.{}",
action.stream_name(),
action.filter_name(),
action.name()
)
.as_bytes(),
)
}
fn open_action_ordered_times_tree(&self, action: &Action) -> Result<sled::Tree> {
self.open_tree(
format!(
"action_ordered_times_{}.{}.{}",
action.stream_name(),
action.filter_name(),
action.name()
)
.as_bytes(),
)
}
}
/// This trait just permits to have less verbose typing in the next trait
pub trait KV: Serialize + DeserializeOwned {}
impl<T> KV for T where T: Serialize + DeserializeOwned {}
/// This trait permits to have typed Trees and avoid handling the de/serialization in business logic.
/// key and value types must be [`Serialize`] and [`Deserialize`].
///
/// [`Serialize`]: https://docs.rs/serde/latest/serde/trait.Serialize.html
/// [`Deserialize`]: https://docs.rs/serde/latest/serde/trait.Deserialize.html
pub trait SledTreeExt {
fn get_ext<K: KV, V: KV>(&self, k: &K) -> Result<Option<V>>;
fn remove_ext<K: KV, V: KV>(&self, k: &K) -> Option<V>;
@ -14,6 +73,7 @@ pub trait SledTreeExt {
where
F: FnMut(Option<V>) -> Option<V>;
fn insert_ext<K: KV, V: KV>(&self, k: &K, v: &V) -> Result<Option<V>>;
fn iter_ext<K: KV, V: KV>(&self) -> impl Iterator<Item = (K, V)>;
}
#[allow(clippy::unwrap_used)]
@ -75,4 +135,13 @@ impl SledTreeExt for sled::Tree {
.insert(k, v)?
.map(|v| bincode::deserialize(&v).unwrap()))
}
fn iter_ext<K: KV, V: KV>(&self) -> impl Iterator<Item = (K, V)> {
self.iter().map(|elt| {
let (k, v) = elt.unwrap();
let k: K = bincode::deserialize(&k).unwrap();
let v: V = bincode::deserialize(&v).unwrap();
(k, v)
})
}
}