actions startup logic

realized ordered_times where not needed in ActionManager so I removed
them. We cleanup old actions only on startup, and we run the up-to-date
ones, so it's ok to do this in O(n). Thus ordered_times is not needed.
Whenever actions expire, they're run by a scheduled tokio task, so no
old state is kept.

Now on startup:
- cleanup old actions
- run still relevant ones
This commit is contained in:
ppom 2025-02-10 12:00:00 +01:00
commit 38d433fa6c
3 changed files with 44 additions and 59 deletions

View file

@ -19,10 +19,8 @@ use super::{SledDbExt, SledTreeExt};
pub struct ActionManager {
action: &'static Action,
exec_limit: Option<Arc<Semaphore>>,
// BTreeMap<Match, BTreeSet<Time>>,
/// sled::Tree quivalent to BTreeMap<Match, BTreeSet<Time>>,
pending: sled::Tree,
// BTreeMap<Time, Match>,
ordered_times: sled::Tree,
}
impl ActionManager {
@ -39,10 +37,8 @@ impl ActionManager {
action,
exec_limit,
pending: db.open_action_pending_tree(action)?,
ordered_times: db.open_action_ordered_times_tree(action)?,
};
let now = Local::now();
manager.clear_past_times(&now);
manager.clear_past_times_and_schedule_future_actions();
Ok(manager)
}
@ -52,19 +48,8 @@ impl ActionManager {
if exec_t < now {
self.exec_now(m);
} else {
// FIXME is clearing here buggy logic or right logic?
self.clear_past_times(&t);
self.add_match(&m, &exec_t);
let this = self.clone();
tokio::spawn(async move {
let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?");
tokio::time::sleep(dur).await;
#[allow(clippy::unwrap_used)] // propagating panics is ok
if this.remove(&m, &exec_t) {
this.exec_now(m);
}
});
self.schedule_exec(m, exec_t, now);
}
}
@ -84,7 +69,7 @@ impl ActionManager {
.iter()
.map(|time| {
if let Order::Flush = order {
if self.remove(&match_, time) {
if self.remove_match(&match_, time) {
self.exec_now(match_.clone());
}
}
@ -96,6 +81,20 @@ impl ActionManager {
})
}
fn schedule_exec(&self, m: Match, t: Time, now: Time) {
let this = self.clone();
tokio::spawn(async move {
let dur = (t - now).to_std().expect(
"Duration is bigger than what's supported. Did you put an enormous after duration?",
);
tokio::time::sleep(dur).await;
#[allow(clippy::unwrap_used)] // propagating panics is ok
if this.remove_match(&m, &t) {
this.exec_now(m);
}
});
}
fn exec_now(&self, m: Match) {
let semaphore = self.exec_limit.clone();
let action = self.action;
@ -143,7 +142,6 @@ impl ActionManager {
}
fn add_match(&self, m: &Match, t: &Time) {
// FIXME do this in a transaction
self.pending
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(m, |set| {
let mut set = set.unwrap_or_default();
@ -151,41 +149,40 @@ impl ActionManager {
Some(set)
})
.unwrap();
self.ordered_times.insert_ext::<Time, Match>(t, m).unwrap();
}
fn remove(&self, m: &Match, t: &Time) -> bool {
fn remove_match(&self, m: &Match, t: &Time) -> bool {
self.pending
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(m, |set| {
let mut set = set.unwrap_or_default();
set.remove(t);
Some(set)
})
.unwrap();
self.ordered_times.remove_ext::<Time, Match>(t).is_some()
.unwrap()
.is_some()
}
fn clear_past_times(&self, now: &Time) {
let after = self.action.after_duration().unwrap_or_default();
while self
.ordered_times
.first_ext::<Time, Match>()
.unwrap()
.is_some_and(|(k, _)| k + after < *now)
{
#[allow(clippy::unwrap_used)] // second unwrap: we just checked in the condition that first is_some
let (t, m) = self
.ordered_times
.pop_min_ext::<Time, Match>()
.unwrap()
.unwrap();
self.pending
.fetch_and_update_ext::<Match, BTreeSet<Time>, _>(&m, |set| {
let mut set = set.unwrap();
set.remove(&t);
Some(set)
})
.unwrap();
fn clear_past_times_and_schedule_future_actions(&self) {
let now = Local::now();
for (m, set) in self.pending.iter_ext::<Match, BTreeSet<Time>>() {
// Keep only times that are still future
let new_set: BTreeSet<_> = set.into_iter().filter(|t| *t > now).collect();
if new_set.is_empty() {
// No upcoming time, delete the entry from the Tree
self.pending.remove_ext::<Match, BTreeSet<Time>>(&m);
} else {
// Insert back the upcoming times
let _ = self
.pending
.insert_ext::<Match, BTreeSet<Time>>(&m, &new_set);
// Schedule the upcoming times
for t in new_set.into_iter() {
self.schedule_exec(m.clone(), t, now);
}
}
}
}
}

View file

@ -17,9 +17,9 @@ use super::{action::ActionManager, SledDbExt, SledTreeExt};
pub struct FilterManager {
filter: &'static Filter,
action_managers: Vec<ActionManager>,
// BTreeMap<Match, BTreeSet<Time>>
/// sled::Tree quivalent to BTreeMap<Match, BTreeSet<Time>>,
matches: sled::Tree,
// BTreeMap<Time, Match>
/// sled::Tree quivalent to BTreeMap<Time, Match>
ordered_times: sled::Tree,
}

View file

@ -12,7 +12,6 @@ pub trait SledDbExt {
fn open_filter_matches_tree(&self, filter: &Filter) -> Result<sled::Tree>;
fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result<sled::Tree>;
fn open_action_pending_tree(&self, action: &Action) -> Result<sled::Tree>;
fn open_action_ordered_times_tree(&self, action: &Action) -> Result<sled::Tree>;
}
impl SledDbExt for sled::Db {
@ -42,17 +41,6 @@ impl SledDbExt for sled::Db {
.as_bytes(),
)
}
fn open_action_ordered_times_tree(&self, action: &Action) -> Result<sled::Tree> {
self.open_tree(
format!(
"action_ordered_times_{}.{}.{}",
action.stream_name(),
action.filter_name(),
action.name()
)
.as_bytes(),
)
}
}
/// This trait just permits to have less verbose typing in the next trait