diff --git a/src/concepts/action.rs b/src/concepts/action.rs index 6c07fae..fc8ef1c 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -39,6 +39,15 @@ impl Action { pub fn name(&self) -> &str { &self.name } + + pub fn stream_name(&self) -> &str { + &self.stream_name + } + + pub fn filter_name(&self) -> &str { + &self.filter_name + } + pub fn after_duration(&self) -> Option { self.after_duration } diff --git a/src/daemon/action.rs b/src/daemon/action.rs index c86093b..92cf54d 100644 --- a/src/daemon/action.rs +++ b/src/daemon/action.rs @@ -1,10 +1,10 @@ use std::{ collections::{BTreeMap, BTreeSet}, process::Stdio, - sync::{Arc, Mutex}, + sync::Arc, }; -use chrono::{Local, TimeDelta}; +use chrono::Local; use tokio::sync::Semaphore; use tracing::{error, info}; @@ -13,43 +13,16 @@ use crate::{ protocol::Order, }; -struct State { - pending: BTreeMap>, - ordered_times: BTreeMap, -} - -impl State { - fn add_match(&mut self, m: &Match, t: &Time) { - self.pending.entry(m.clone()).or_default().insert(*t); - self.ordered_times.insert(*t, m.clone()); - } - - fn remove(&mut self, m: &Match, t: &Time) -> bool { - self.pending.entry(m.clone()).and_modify(|times| { - times.remove(t); - }); - self.ordered_times.remove(t).is_some() - } - - fn clear_past_times(&mut self, now: &Time, after: Option) { - let after = after.unwrap_or_default(); - while self - .ordered_times - .first_key_value() - .is_some_and(|(k, _)| *k + after < *now) - { - #[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some - let (_, m) = self.ordered_times.pop_first().unwrap(); - self.pending.remove(&m); - } - } -} +use super::{SledDbExt, SledTreeExt}; #[derive(Clone)] pub struct ActionManager { action: &'static Action, exec_limit: Option>, - state: Arc>, + // BTreeMap>, + pending: sled::Tree, + // BTreeMap, + ordered_times: sled::Tree, } impl ActionManager { @@ -59,20 +32,18 @@ impl ActionManager { pub fn new( action: &'static Action, - pending: BTreeMap>, exec_limit: Option>, - ) -> Self { - Self { + db: &sled::Db, + ) -> Result { + let manager = Self { action, exec_limit, - state: Arc::new(Mutex::new(State { - pending: pending.clone(), - ordered_times: pending - .into_iter() - .flat_map(|(m, times)| times.into_iter().map(move |time| (time, m.clone()))) - .collect(), - })), - } + pending: db.open_action_pending_tree(action)?, + ordered_times: db.open_action_ordered_times_tree(action)?, + }; + let now = Local::now(); + manager.clear_past_times(&now); + Ok(manager) } pub fn handle_exec(&mut self, m: Match, t: Time) { @@ -81,19 +52,16 @@ impl ActionManager { if exec_t < now { self.exec_now(m); } else { - { - #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = self.state.lock().unwrap(); - state.clear_past_times(&t, self.action.after_duration()); - state.add_match(&m, &exec_t); - } + // FIXME is clearing here buggy logic or right logic? + self.clear_past_times(&t); + self.add_match(&m, &exec_t); + let this = self.clone(); tokio::spawn(async move { let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?"); tokio::time::sleep(dur).await; #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = this.state.lock().unwrap(); - if state.remove(&m, &exec_t) { + if this.remove(&m, &exec_t) { this.exec_now(m); } }); @@ -106,11 +74,9 @@ impl ActionManager { is_match: F, ) -> BTreeMap, Vec> { #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = self.state.lock().unwrap(); - state - .pending + self.pending .clone() - .into_iter() + .iter_ext::>() // match filtering .filter(|(match_, _)| is_match(match_)) .fold(BTreeMap::default(), |mut acc, (match_, times)| { @@ -118,7 +84,7 @@ impl ActionManager { .iter() .map(|time| { if let Order::Flush = order { - if state.remove(&match_, time) { + if self.remove(&match_, time) { self.exec_now(match_.clone()); } } @@ -160,14 +126,66 @@ impl ActionManager { pub fn quit(&mut self) { if self.action.on_exit() { #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = self.state.lock().unwrap(); - for (m, times) in &state.pending { - for _ in times { - self.exec_now(m.clone()); - } - } - state.pending.clear(); - state.ordered_times.clear(); + self.pending + .iter() + .map(|elt| { + let (k, v) = elt.unwrap(); + let k: Match = bincode::deserialize(&k).unwrap(); + let v: BTreeSet