diff --git a/src/daemon/filter/mod.rs b/src/daemon/filter/mod.rs index 9d2c322..ab98579 100644 --- a/src/daemon/filter/mod.rs +++ b/src/daemon/filter/mod.rs @@ -54,7 +54,7 @@ pub enum React { #[allow(clippy::unwrap_used)] impl FilterManager { - pub fn new( + pub async fn new( filter: &'static Filter, exec_limit: Option>, shutdown: ShutdownToken, @@ -65,15 +65,15 @@ impl FilterManager { filter, exec_limit, shutdown, - state: Arc::new(Mutex::new(State::new(filter, db, now)?)), + state: Arc::new(Mutex::new(State::new(filter, db, now).await?)), }; this.clear_past_triggers_and_schedule_future_actions(now); Ok(this) } - pub fn handle_line(&self, line: &str, now: Time) -> React { + pub async fn handle_line(&self, line: &str, now: Time) -> React { if let Some(match_) = self.filter.get_match(line) { - if self.handle_match(match_, now) { + if self.handle_match(match_, now).await { React::Trigger } else { React::Match @@ -83,7 +83,7 @@ impl FilterManager { } } - fn handle_match(&self, m: Match, now: Time) -> bool { + async fn handle_match(&self, m: Match, now: Time) -> bool { #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = self.state.lock().unwrap(); state.clear_past_matches(now); @@ -100,7 +100,7 @@ impl FilterManager { Some(retry) => { state.add_match(m.clone(), now); // Number of stored times for this match >= configured retry for this filter - state.get_times(&m) >= retry as usize + state.get_times(&m).await >= retry as usize } }; @@ -109,7 +109,7 @@ impl FilterManager { let actions_left = if Duplicate::Extend == self.filter.duplicate { // Get number of actions left from last trigger state - .remove_trigger(&m) + .remove_trigger(&m).await // Only one entry in the map because Duplicate::Extend .and_then(|map| map.first_key_value().map(|(_, n)| n.clone())) } else { @@ -122,7 +122,7 @@ impl FilterManager { trigger } - pub fn handle_trigger( + pub async fn handle_trigger( &self, patterns: BTreeMap, String>, now: Time, @@ -138,7 +138,7 @@ impl FilterManager { Ok(()) } - pub fn handle_order( + pub async fn handle_order( &self, patterns: &BTreeMap, Regex>, order: Order, @@ -241,12 +241,12 @@ impl FilterManager { /// Schedule execution for a given Match. /// We check first if the trigger is still here /// because pending actions can be flushed. - fn schedule_exec( + async fn schedule_exec<'a>( &self, m: Match, t: Time, now: Time, - state: &mut MutexGuard, + state: &'a mut MutexGuard<'_, State>, startup: bool, actions_left: Option, ) { @@ -268,7 +268,7 @@ impl FilterManager { let m = m.clone(); if exec_time <= now { - if state.decrement_trigger(&m, t, false) { + if state.decrement_trigger(&m, t, false).await { exec_now(&self.exec_limit, self.shutdown.clone(), action, m); } } else { @@ -289,7 +289,7 @@ impl FilterManager { if !exiting || action.on_exit { #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = this.state.lock().unwrap(); - if state.decrement_trigger(&m, t, exiting) { + if state.decrement_trigger(&m, t, exiting).await { exec_now(&this.exec_limit, this.shutdown, action, m); } } @@ -298,7 +298,7 @@ impl FilterManager { } } - fn clear_past_triggers_and_schedule_future_actions(&self, now: Time) { + async fn clear_past_triggers_and_schedule_future_actions(&self, now: Time) { let longuest_action_duration = self.filter.longuest_action_duration; let number_of_actions = self .filter @@ -349,7 +349,7 @@ impl FilterManager { } } -fn exec_now( +async fn exec_now( exec_limit: &Option>, shutdown: ShutdownToken, action: &'static Action, diff --git a/src/daemon/filter/state.rs b/src/daemon/filter/state.rs index ec7e9ab..a04ba90 100644 --- a/src/daemon/filter/state.rs +++ b/src/daemon/filter/state.rs @@ -59,7 +59,11 @@ pub struct State { } impl State { - pub fn new(filter: &'static Filter, db: &mut Database, now: Time) -> Result { + pub async fn new( + filter: &'static Filter, + db: &mut Database, + now: Time, + ) -> Result { let ordered_times = db.open_tree( filter_ordered_times_db_name(filter), filter.retry_duration.unwrap_or_default(), @@ -100,13 +104,13 @@ impl State { Ok(this) } - pub fn add_match(&mut self, m: Match, t: Time) { + pub async fn add_match(&mut self, m: Match, t: Time) { let set = self.matches.entry(m.clone()).or_default(); set.insert(t); self.ordered_times.insert(t, m); } - pub fn add_trigger(&mut self, m: Match, t: Time, action_count: Option) { + pub async fn add_trigger(&mut self, m: Match, t: Time, action_count: Option) { // We record triggered filters only when there is an action with an `after` directive if self.has_after { // Add the (Match, Time) to the triggers map @@ -125,7 +129,7 @@ impl State { } // Completely remove a Match from the matches - pub fn remove_match(&mut self, m: &Match) { + pub async fn remove_match(&mut self, m: &Match) { if let Some(set) = self.matches.get(m) { for t in set { self.ordered_times.remove(t); @@ -135,12 +139,12 @@ impl State { } /// Completely remove a Match from the triggers - pub fn remove_trigger(&mut self, m: &Match) -> Option> { - self.triggers.remove(m) + pub async fn remove_trigger(&mut self, m: &Match) -> Option> { + self.triggers.remove(m).await } /// Returns whether we should still execute an action for this (Match, Time) trigger - pub fn decrement_trigger(&mut self, m: &Match, t: Time, exiting: bool) -> bool { + pub async fn decrement_trigger(&mut self, m: &Match, t: Time, exiting: bool) -> bool { // We record triggered filters only when there is an action with an `after` directive if self.has_after { let mut exec_needed = false; @@ -186,7 +190,7 @@ impl State { } } - pub fn clear_past_matches(&mut self, now: Time) { + pub async fn clear_past_matches(&mut self, now: Time) { let retry_duration = self.filter.retry_duration.unwrap_or_default(); while self .ordered_times @@ -212,14 +216,14 @@ impl State { } } - pub fn get_times(&self, m: &Match) -> usize { + pub async fn get_times(&self, m: &Match) -> usize { match self.matches.get(m) { Some(vec) => vec.len(), None => 0, } } - fn load_matches_from_ordered_times(&mut self) { + async fn load_matches_from_ordered_times(&mut self) { for (t, m) in self.ordered_times.iter() { let set = self.matches.entry(m.clone()).or_default(); set.insert(*t); @@ -342,7 +346,7 @@ mod tests { trigger_db, ])); - let state = State::new(filter, &mut db, now).unwrap(); + let state = State::new(filter, &mut db, now).await.unwrap(); assert_eq!( state.ordered_times.tree(), @@ -386,7 +390,7 @@ mod tests { let now_less_4s = now - TimeDelta::seconds(4); let mut db = TempDatabase::default().await; - let mut state = State::new(filter, &mut db, now).unwrap(); + let mut state = State::new(filter, &mut db, now).await.unwrap(); assert!(state.ordered_times.tree().is_empty()); assert!(state.matches.is_empty()); @@ -427,7 +431,7 @@ mod tests { let now = Local::now(); let mut db = TempDatabase::default().await; - let mut state = State::new(filter, &mut db, now).unwrap(); + let mut state = State::new(filter, &mut db, now).await.unwrap(); assert!(state.triggers.tree().is_empty()); @@ -437,7 +441,7 @@ mod tests { assert!(state.triggers.tree().is_empty()); // Will be called immediately after, it returns true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); } #[tokio::test] @@ -481,7 +485,7 @@ mod tests { let now_plus_1s = now + TimeDelta::seconds(1); let mut db = TempDatabase::default().await; - let mut state = State::new(filter, &mut db, now).unwrap(); + let mut state = State::new(filter, &mut db, now).await.unwrap(); assert!(state.triggers.tree().is_empty()); @@ -492,22 +496,22 @@ mod tests { &BTreeMap::from([(one.clone(), [(now, 3)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now, 2)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now, 1)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); assert!(state.triggers.tree().is_empty()); // Decrement → false - assert!(!state.decrement_trigger(&one, now, false)); + assert!(!state.decrement_trigger(&one, now, false).await); // Add unique trigger (but decrement exiting-like) state.add_trigger(one.clone(), now, None); @@ -516,28 +520,28 @@ mod tests { &BTreeMap::from([(one.clone(), [(now, 3)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, true)); + assert!(state.decrement_trigger(&one, now, true).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now, 2)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, true)); + assert!(state.decrement_trigger(&one, now, true).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now, 1)].into())]) ); // Decrement but exiting → true, does nothing - assert!(state.decrement_trigger(&one, now, true)); + assert!(state.decrement_trigger(&one, now, true).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now, 1)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); assert!(state.triggers.tree().is_empty()); // Decrement → false - assert!(!state.decrement_trigger(&one, now, false)); + assert!(!state.decrement_trigger(&one, now, false).await); // Add trigger with neighbour state.add_trigger(one.clone(), now, None); @@ -547,25 +551,25 @@ mod tests { &BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 3)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 2)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 1)].into())]) ); // Decrement → true - assert!(state.decrement_trigger(&one, now, false)); + assert!(state.decrement_trigger(&one, now, false).await); assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now_plus_1s, 3)].into())]) ); // Decrement → false - assert!(!state.decrement_trigger(&one, now, false)); + assert!(!state.decrement_trigger(&one, now, false).await); // Remove neighbour state.remove_trigger(&one); assert!(state.triggers.tree().is_empty()); diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c0b848f..c62d84f 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -66,7 +66,7 @@ pub async fn daemon( let mut filter_managers = HashMap::new(); for filter in stream.filters.values() { let manager = - FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?; + FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now).await?; filter_managers.insert(filter, manager); } state.insert(stream, filter_managers.clone()); diff --git a/src/daemon/socket.rs b/src/daemon/socket.rs index 74d0a7c..124b6c5 100644 --- a/src/daemon/socket.rs +++ b/src/daemon/socket.rs @@ -51,7 +51,7 @@ async fn open_socket(path: PathBuf) -> Result { err_str!(UnixListener::bind(path)) } -fn handle_trigger_order( +async fn handle_trigger_order( stream_name: Option, filter_name: Option, patterns: BTreeMap, String>, @@ -100,13 +100,13 @@ fn handle_trigger_order( }; let now = Local::now(); - match filter_manager.handle_trigger(patterns, now) { + match filter_manager.handle_trigger(patterns, now).await { Ok(()) => DaemonResponse::Ok(()), Err(err) => DaemonResponse::Err(err), } } -fn handle_show_or_flush_order( +async fn handle_show_or_flush_order( stream_name: Option, filter_name: Option, patterns: BTreeMap, Regex>, @@ -114,40 +114,39 @@ fn handle_show_or_flush_order( shared_state: &HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>, ) -> DaemonResponse { let now = Local::now(); - let cs: ClientStatus = shared_state + let iter = shared_state .iter() // stream filtering .filter(|(stream, _)| { stream_name.is_none() || stream_name.clone().is_some_and(|name| name == stream.name) - }) - .fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| { - let inner_map = filter_manager - .iter() - // filter filtering - .filter(|(filter, _)| { - filter_name.is_none() - || filter_name.clone().is_some_and(|name| name == filter.name) - }) - // pattern filtering - .filter(|(filter, _)| { - patterns - .iter() - .all(|(pattern, _)| filter.patterns.get(pattern).is_some()) - }) - .map(|(filter, manager)| { - ( - filter.name.to_owned(), - manager.handle_order(&patterns, order, now), - ) - }) - .collect(); - acc.insert(stream.name.to_owned(), inner_map); - acc }); + let mut cs = ClientStatus::new(); + for (stream, filter_manager) in iter { + let iter = filter_manager + .iter() + // filter filtering + .filter(|(filter, _)| { + filter_name.is_none() || filter_name.clone().is_some_and(|name| name == filter.name) + }) + // pattern filtering + .filter(|(filter, _)| { + patterns + .iter() + .all(|(pattern, _)| filter.patterns.get(pattern).is_some()) + }); + let mut inner_map = BTreeMap::new(); + for (filter, manager) in iter { + inner_map.insert( + filter.name.to_owned(), + manager.handle_order(&patterns, order, now).await, + ); + } + cs.insert(stream.name.to_owned(), inner_map); + } DaemonResponse::Order(cs) } -fn answer_order( +async fn answer_order( config: &'static Config, shared_state: &HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>, options: ClientRequest, @@ -182,7 +181,7 @@ fn answer_order( }; if let Order::Trigger = options.order { - handle_trigger_order(stream_name, filter_name, patterns, shared_state) + handle_trigger_order(stream_name, filter_name, patterns, shared_state).await } else { let patterns = match patterns .into_iter() @@ -206,6 +205,7 @@ fn answer_order( options.order, shared_state, ) + .await } } @@ -264,7 +264,7 @@ impl Socket { serde_json::from_slice(&encoded_request) ); // Process - let response = answer_order(config, &shared_state, request); + let response = answer_order(config, &shared_state, request).await; // Encode let encoded_response = or_next!("failed to serialize response", serde_json::to_string::(&response)); diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index ac2d46a..a55d449 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -173,7 +173,7 @@ impl StreamManager { Some(Ok(line)) => { let now = Local::now(); for manager in self.matching_filters(&line) { - manager.handle_line(&line, now); + manager.handle_line(&line, now).await; } } Some(Err(err)) => { diff --git a/src/treedb/mod.rs b/src/treedb/mod.rs index fc0c71d..e14db68 100644 --- a/src/treedb/mod.rs +++ b/src/treedb/mod.rs @@ -330,7 +330,7 @@ impl Deref for Tree { // Reimplement write functions impl Tree { /// Log an [`Entry`] to the [`Database`] - fn log(&mut self, k: &K, v: Option<&V>) { + async fn log(&mut self, k: &K, v: Option<&V>) { let e = Entry { tree: self.id.clone(), key: serde_json::to_value(k).expect("could not serialize key"), @@ -339,36 +339,40 @@ impl Tree { }; let tx = self.tx.clone(); // FIXME what if send fails? - tokio::spawn(async move { - let _ = tx.send(e).await; - }); + let _ = tx.send(e).await; } /// Asynchronously persisted version of [`BTreeMap::insert`] - pub fn insert(&mut self, key: K, value: V) -> Option { - self.log(&key, Some(&value)); + pub async fn insert(&mut self, key: K, value: V) -> Option { + self.log(&key, Some(&value)).await; self.tree.insert(key, value) } /// Asynchronously persisted version of [`BTreeMap::pop_first`] - pub fn pop_first(&mut self) -> Option<(K, V)> { - self.tree.pop_first().map(|(key, value)| { - self.log(&key, None); - (key, value) - }) + pub async fn pop_first(&mut self) -> Option<(K, V)> { + match self.tree.pop_first() { + Some((key, value)) => { + self.log(&key, None).await; + Some((key, value)) + } + None => None, + } } /// Asynchronously persisted version of [`BTreeMap::pop_last`] - pub fn pop_last(&mut self) -> Option<(K, V)> { - self.tree.pop_last().map(|(key, value)| { - self.log(&key, None); - (key, value) - }) + pub async fn pop_last(&mut self) -> Option<(K, V)> { + match self.tree.pop_last() { + Some((key, value)) => { + self.log(&key, None).await; + Some((key, value)) + } + None => None, + } } /// Asynchronously persisted version of [`BTreeMap::remove`] - pub fn remove(&mut self, key: &K) -> Option { - self.log(key, None); + pub async fn remove(&mut self, key: &K) -> Option { + self.log(key, None).await; self.tree.remove(key) } @@ -376,14 +380,14 @@ impl Tree { /// Returning None removes the item if it existed before. /// Asynchronously persisted. /// *API design borrowed from [`fjall::WriteTransaction::fetch_update`].* - pub fn fetch_update) -> Option>( + pub async fn fetch_update) -> Option>( &mut self, key: K, mut f: F, ) -> Option { let old_value = self.get(&key).map(|v| v.to_owned()); let new_value = f(old_value); - self.log(&key, new_value.as_ref()); + self.log(&key, new_value.as_ref()).await; if let Some(new_value) = new_value { self.tree.insert(key, new_value) } else {