From 38d433fa6c08c230bfd91de258ec85ed65f096ab Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 10 Feb 2025 12:00:00 +0100 Subject: [PATCH] actions startup logic realized ordered_times where not needed in ActionManager so I removed them. We cleanup old actions only on startup, and we run the up-to-date ones, so it's ok to do this in O(n). Thus ordered_times is not needed. Whenever actions expire, they're run by a scheduled tokio task, so no old state is kept. Now on startup: - cleanup old actions - run still relevant ones --- src/daemon/action.rs | 87 +++++++++++++++++++++---------------------- src/daemon/filter.rs | 4 +- src/daemon/sledext.rs | 12 ------ 3 files changed, 44 insertions(+), 59 deletions(-) diff --git a/src/daemon/action.rs b/src/daemon/action.rs index 92cf54d..e262570 100644 --- a/src/daemon/action.rs +++ b/src/daemon/action.rs @@ -19,10 +19,8 @@ use super::{SledDbExt, SledTreeExt}; pub struct ActionManager { action: &'static Action, exec_limit: Option>, - // BTreeMap>, + /// sled::Tree quivalent to BTreeMap>, pending: sled::Tree, - // BTreeMap, - ordered_times: sled::Tree, } impl ActionManager { @@ -39,10 +37,8 @@ impl ActionManager { action, exec_limit, pending: db.open_action_pending_tree(action)?, - ordered_times: db.open_action_ordered_times_tree(action)?, }; - let now = Local::now(); - manager.clear_past_times(&now); + manager.clear_past_times_and_schedule_future_actions(); Ok(manager) } @@ -52,19 +48,8 @@ impl ActionManager { if exec_t < now { self.exec_now(m); } else { - // FIXME is clearing here buggy logic or right logic? - self.clear_past_times(&t); self.add_match(&m, &exec_t); - - let this = self.clone(); - tokio::spawn(async move { - let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?"); - tokio::time::sleep(dur).await; - #[allow(clippy::unwrap_used)] // propagating panics is ok - if this.remove(&m, &exec_t) { - this.exec_now(m); - } - }); + self.schedule_exec(m, exec_t, now); } } @@ -84,7 +69,7 @@ impl ActionManager { .iter() .map(|time| { if let Order::Flush = order { - if self.remove(&match_, time) { + if self.remove_match(&match_, time) { self.exec_now(match_.clone()); } } @@ -96,6 +81,20 @@ impl ActionManager { }) } + fn schedule_exec(&self, m: Match, t: Time, now: Time) { + let this = self.clone(); + tokio::spawn(async move { + let dur = (t - now).to_std().expect( + "Duration is bigger than what's supported. Did you put an enormous after duration?", + ); + tokio::time::sleep(dur).await; + #[allow(clippy::unwrap_used)] // propagating panics is ok + if this.remove_match(&m, &t) { + this.exec_now(m); + } + }); + } + fn exec_now(&self, m: Match) { let semaphore = self.exec_limit.clone(); let action = self.action; @@ -143,7 +142,6 @@ impl ActionManager { } fn add_match(&self, m: &Match, t: &Time) { - // FIXME do this in a transaction self.pending .fetch_and_update_ext::, _>(m, |set| { let mut set = set.unwrap_or_default(); @@ -151,41 +149,40 @@ impl ActionManager { Some(set) }) .unwrap(); - self.ordered_times.insert_ext::(t, m).unwrap(); } - fn remove(&self, m: &Match, t: &Time) -> bool { + fn remove_match(&self, m: &Match, t: &Time) -> bool { self.pending .fetch_and_update_ext::, _>(m, |set| { let mut set = set.unwrap_or_default(); set.remove(t); Some(set) }) - .unwrap(); - self.ordered_times.remove_ext::(t).is_some() + .unwrap() + .is_some() } - fn clear_past_times(&self, now: &Time) { - let after = self.action.after_duration().unwrap_or_default(); - while self - .ordered_times - .first_ext::() - .unwrap() - .is_some_and(|(k, _)| k + after < *now) - { - #[allow(clippy::unwrap_used)] // second unwrap: we just checked in the condition that first is_some - let (t, m) = self - .ordered_times - .pop_min_ext::() - .unwrap() - .unwrap(); - self.pending - .fetch_and_update_ext::, _>(&m, |set| { - let mut set = set.unwrap(); - set.remove(&t); - Some(set) - }) - .unwrap(); + fn clear_past_times_and_schedule_future_actions(&self) { + let now = Local::now(); + for (m, set) in self.pending.iter_ext::>() { + + // Keep only times that are still future + let new_set: BTreeSet<_> = set.into_iter().filter(|t| *t > now).collect(); + + if new_set.is_empty() { + // No upcoming time, delete the entry from the Tree + self.pending.remove_ext::>(&m); + } else { + // Insert back the upcoming times + let _ = self + .pending + .insert_ext::>(&m, &new_set); + + // Schedule the upcoming times + for t in new_set.into_iter() { + self.schedule_exec(m.clone(), t, now); + } + } } } } diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index ca8e41a..2e26033 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -17,9 +17,9 @@ use super::{action::ActionManager, SledDbExt, SledTreeExt}; pub struct FilterManager { filter: &'static Filter, action_managers: Vec, - // BTreeMap> + /// sled::Tree quivalent to BTreeMap>, matches: sled::Tree, - // BTreeMap + /// sled::Tree quivalent to BTreeMap ordered_times: sled::Tree, } diff --git a/src/daemon/sledext.rs b/src/daemon/sledext.rs index afb598b..d64673d 100644 --- a/src/daemon/sledext.rs +++ b/src/daemon/sledext.rs @@ -12,7 +12,6 @@ pub trait SledDbExt { fn open_filter_matches_tree(&self, filter: &Filter) -> Result; fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result; fn open_action_pending_tree(&self, action: &Action) -> Result; - fn open_action_ordered_times_tree(&self, action: &Action) -> Result; } impl SledDbExt for sled::Db { @@ -42,17 +41,6 @@ impl SledDbExt for sled::Db { .as_bytes(), ) } - fn open_action_ordered_times_tree(&self, action: &Action) -> Result { - self.open_tree( - format!( - "action_ordered_times_{}.{}.{}", - action.stream_name(), - action.filter_name(), - action.name() - ) - .as_bytes(), - ) - } } /// This trait just permits to have less verbose typing in the next trait