diff --git a/src/concepts/pattern/ip/mod.rs b/src/concepts/pattern/ip/mod.rs index 3bf727c..ba62dd9 100644 --- a/src/concepts/pattern/ip/mod.rs +++ b/src/concepts/pattern/ip/mod.rs @@ -710,7 +710,7 @@ mod patternip_tests { ); let bed = bed.part2(filter, Local::now(), None).await; assert_eq!( - bed.manager.handle_line(&line, Local::now()), + bed.manager.handle_line(&line, Local::now()).await, React::Trigger, "line: {line}" ); diff --git a/src/daemon/filter/mod.rs b/src/daemon/filter/mod.rs index ab98579..3f3cfdc 100644 --- a/src/daemon/filter/mod.rs +++ b/src/daemon/filter/mod.rs @@ -3,14 +3,10 @@ pub mod tests; mod state; -use std::{ - collections::BTreeMap, - process::Stdio, - sync::{Arc, Mutex, MutexGuard}, -}; +use std::{collections::BTreeMap, process::Stdio, sync::Arc}; use regex::Regex; -use tokio::sync::Semaphore; +use tokio::sync::{Mutex, MutexGuard, Semaphore}; use tracing::{error, info}; use crate::{ @@ -67,7 +63,8 @@ impl FilterManager { shutdown, state: Arc::new(Mutex::new(State::new(filter, db, now).await?)), }; - this.clear_past_triggers_and_schedule_future_actions(now); + this.clear_past_triggers_and_schedule_future_actions(now) + .await; Ok(this) } @@ -85,8 +82,8 @@ impl FilterManager { async fn handle_match(&self, m: Match, now: Time) -> bool { #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = self.state.lock().unwrap(); - state.clear_past_matches(now); + let mut state = self.state.lock().await; + state.clear_past_matches(now).await; // if Duplicate::Ignore and already triggered, skip if state.triggers.contains_key(&m) && Duplicate::Ignore == self.filter.duplicate { @@ -98,25 +95,27 @@ impl FilterManager { let trigger = match self.filter.retry { None => true, Some(retry) => { - state.add_match(m.clone(), now); + state.add_match(m.clone(), now).await; // Number of stored times for this match >= configured retry for this filter state.get_times(&m).await >= retry as usize } }; if trigger { - state.remove_match(&m); + state.remove_match(&m).await; let actions_left = if Duplicate::Extend == self.filter.duplicate { // Get number of actions left from last trigger state - .remove_trigger(&m).await + .remove_trigger(&m) + .await // Only one entry in the map because Duplicate::Extend .and_then(|map| map.first_key_value().map(|(_, n)| n.clone())) } else { None }; - state.add_trigger(m.clone(), now, actions_left); - self.schedule_exec(m, now, now, &mut state, false, actions_left); + state.add_trigger(m.clone(), now, actions_left).await; + self.schedule_exec(m, now, now, &mut state, false, actions_left) + .await; } trigger @@ -130,10 +129,11 @@ impl FilterManager { let match_ = self.filter.get_match_from_patterns(patterns)?; #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = self.state.lock().unwrap(); - state.remove_match(&match_); - state.add_trigger(match_.clone(), now, None); - self.schedule_exec(match_, now, now, &mut state, false, None); + let mut state = self.state.lock().await; + state.remove_match(&match_).await; + state.add_trigger(match_.clone(), now, None).await; + self.schedule_exec(match_, now, now, &mut state, false, None) + .await; Ok(()) } @@ -155,7 +155,7 @@ impl FilterManager { }; #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = self.state.lock().unwrap(); + let mut state = self.state.lock().await; let mut cs: BTreeMap<_, _> = { let cloned_matches = state @@ -167,27 +167,26 @@ impl FilterManager { .cloned() .collect::>(); - cloned_matches - .into_iter() - .map(|match_| { - // mutable State required here - if let Order::Flush = order { - state.remove_match(&match_); - } - let matches = state - .matches - .get(&match_) - .map(|times| times.len()) - .unwrap_or(0); - ( - match_, - PatternStatus { - matches, - ..Default::default() - }, - ) - }) - .collect() + let mut cs = BTreeMap::new(); + for match_ in cloned_matches { + // mutable State required here + if let Order::Flush = order { + state.remove_match(&match_).await; + } + let matches = state + .matches + .get(&match_) + .map(|times| times.len()) + .unwrap_or(0); + cs.insert( + match_, + PatternStatus { + matches, + ..Default::default() + }, + ); + } + cs }; let cloned_triggers = state @@ -203,7 +202,7 @@ impl FilterManager { let map = state.triggers.get(&m).unwrap().clone(); if let Order::Flush = order { - state.remove_trigger(&m); + state.remove_trigger(&m).await; } for (t, remaining) in map { @@ -227,7 +226,7 @@ impl FilterManager { self.shutdown.clone(), action, m.clone(), - ); + ).await; } } } @@ -269,7 +268,7 @@ impl FilterManager { if exec_time <= now { if state.decrement_trigger(&m, t, false).await { - exec_now(&self.exec_limit, self.shutdown.clone(), action, m); + exec_now(&self.exec_limit, self.shutdown.clone(), action, m).await; } } else { let this = self.clone(); @@ -288,9 +287,9 @@ impl FilterManager { // Exec action if triggered hasn't been already flushed if !exiting || action.on_exit { #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = this.state.lock().unwrap(); + let mut state = this.state.lock().await; if state.decrement_trigger(&m, t, exiting).await { - exec_now(&this.exec_limit, this.shutdown, action, m); + exec_now(&this.exec_limit, this.shutdown, action, m).await; } } }); @@ -309,7 +308,7 @@ impl FilterManager { .count() as u64; #[allow(clippy::unwrap_used)] // propagating panics is ok - let mut state = self.state.lock().unwrap(); + let mut state = self.state.lock().await; let cloned_triggers = state .triggers @@ -327,7 +326,7 @@ impl FilterManager { .collect(); if map.is_empty() { - state.triggers.remove(&m); + state.triggers.remove(&m).await; } else { // Filter duplicates // unwrap is fine because map is not empty (see if) @@ -339,10 +338,11 @@ impl FilterManager { // No filtering Duplicate::Rerun => map, }; - state.triggers.insert(m.clone(), map.clone()); + state.triggers.insert(m.clone(), map.clone()).await; for (t, _) in map { // Schedule the upcoming times - self.schedule_exec(m.clone(), t, now, &mut state, true, None); + self.schedule_exec(m.clone(), t, now, &mut state, true, None) + .await; } } } diff --git a/src/daemon/filter/state.rs b/src/daemon/filter/state.rs index a04ba90..e74ba79 100644 --- a/src/daemon/filter/state.rs +++ b/src/daemon/filter/state.rs @@ -89,7 +89,7 @@ impl State { map } }) - }); + }).await; } } let mut this = Self { @@ -99,15 +99,15 @@ impl State { ordered_times, triggers, }; - this.clear_past_matches(now); - this.load_matches_from_ordered_times(); + this.clear_past_matches(now).await; + this.load_matches_from_ordered_times().await; Ok(this) } pub async fn add_match(&mut self, m: Match, t: Time) { let set = self.matches.entry(m.clone()).or_default(); set.insert(t); - self.ordered_times.insert(t, m); + self.ordered_times.insert(t, m).await; } pub async fn add_trigger(&mut self, m: Match, t: Time, action_count: Option) { @@ -124,7 +124,7 @@ impl State { value } }) - }); + }).await; } } @@ -132,7 +132,7 @@ impl State { pub async fn remove_match(&mut self, m: &Match) { if let Some(set) = self.matches.get(m) { for t in set { - self.ordered_times.remove(t); + self.ordered_times.remove(t).await; } self.matches.remove(m); } @@ -167,7 +167,7 @@ impl State { Some(map) } }) - }); + }).await; } // else don't do anything // Because that will remove the entry in the DB, and make @@ -181,7 +181,7 @@ impl State { map.insert(mt.t, count - 1); map }) - }); + }).await; } } exec_needed @@ -203,7 +203,7 @@ impl State { let (t, m) = self.ordered_times.first_key_value().unwrap(); (*t, m.clone()) }; - self.ordered_times.remove(&t); + self.ordered_times.remove(&t).await; if let Some(set) = self.matches.get(&m) { let mut set = set.clone(); set.remove(&t); @@ -396,7 +396,7 @@ mod tests { assert!(state.matches.is_empty()); // Add non-previously added match - state.add_match(one.clone(), now_less_1s); + state.add_match(one.clone(), now_less_1s).await; assert_eq!( state.ordered_times.tree(), &BTreeMap::from([(now_less_1s, one.clone()),]) @@ -407,7 +407,7 @@ mod tests { ); // Add previously added match - state.add_match(one.clone(), now_less_4s); + state.add_match(one.clone(), now_less_4s).await; assert_eq!( state.ordered_times.tree(), &BTreeMap::from([(now_less_1s, one.clone()), (now_less_4s, one.clone())]) @@ -418,7 +418,7 @@ mod tests { ); // Remove added match - state.remove_match(&one); + state.remove_match(&one).await; assert!(state.ordered_times.tree().is_empty()); assert!(state.matches.is_empty()); } @@ -436,7 +436,7 @@ mod tests { assert!(state.triggers.tree().is_empty()); // Add unique trigger - state.add_trigger(one.clone(), now, None); + state.add_trigger(one.clone(), now, None).await; // Nothing is really added assert!(state.triggers.tree().is_empty()); @@ -490,7 +490,7 @@ mod tests { assert!(state.triggers.tree().is_empty()); // Add unique trigger - state.add_trigger(one.clone(), now, None); + state.add_trigger(one.clone(), now, None).await; assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now, 3)].into())]) @@ -514,7 +514,7 @@ mod tests { assert!(!state.decrement_trigger(&one, now, false).await); // Add unique trigger (but decrement exiting-like) - state.add_trigger(one.clone(), now, None); + state.add_trigger(one.clone(), now, None).await; assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now, 3)].into())]) @@ -544,8 +544,8 @@ mod tests { assert!(!state.decrement_trigger(&one, now, false).await); // Add trigger with neighbour - state.add_trigger(one.clone(), now, None); - state.add_trigger(one.clone(), now_plus_1s, None); + state.add_trigger(one.clone(), now, None).await; + state.add_trigger(one.clone(), now_plus_1s, None).await; assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 3)].into())]) @@ -571,18 +571,18 @@ mod tests { // Decrement → false assert!(!state.decrement_trigger(&one, now, false).await); // Remove neighbour - state.remove_trigger(&one); + state.remove_trigger(&one).await; assert!(state.triggers.tree().is_empty()); // Add two neighbour triggers - state.add_trigger(one.clone(), now, None); - state.add_trigger(one.clone(), now_plus_1s, None); + state.add_trigger(one.clone(), now, None).await; + state.add_trigger(one.clone(), now_plus_1s, None).await; assert_eq!( state.triggers.tree(), &BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 3)].into())]) ); // Remove them - state.remove_trigger(&one); + state.remove_trigger(&one).await; assert!(state.triggers.tree().is_empty()); } } diff --git a/src/daemon/filter/tests.rs b/src/daemon/filter/tests.rs index 936d728..1b7a72c 100644 --- a/src/daemon/filter/tests.rs +++ b/src/daemon/filter/tests.rs @@ -102,6 +102,7 @@ impl TestBed { &mut db, now, ) + .await .unwrap(), semaphore, } @@ -119,8 +120,8 @@ pub struct TestBed2 { } impl TestBed2 { - pub fn assert_empty_trees(&self) { - let state = self.manager.state.lock().unwrap(); + pub async fn assert_empty_trees(&self) { + let state = self.manager.state.lock().await; assert!(state.matches.is_empty(), "matches must be empty"); assert!( state.ordered_times.is_empty(), @@ -177,14 +178,17 @@ async fn three_matches_then_action_then_delayed_action() { let now2s = bed.now + TimeDelta::seconds(2); // No match - assert_eq!(bed.manager.handle_line("test 131", now), React::NoMatch); - bed.assert_empty_trees(); + assert_eq!( + bed.manager.handle_line("test 131", now).await, + React::NoMatch + ); + bed.assert_empty_trees().await; // First match let one = vec!["one".to_string()]; - assert_eq!(bed.manager.handle_line("test one", now), React::Match); + assert_eq!(bed.manager.handle_line("test one", now).await, React::Match); { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert_eq!( state.matches, BTreeMap::from([(one.clone(), BTreeSet::from([now]))]), @@ -199,9 +203,12 @@ async fn three_matches_then_action_then_delayed_action() { } // Second match - assert_eq!(bed.manager.handle_line("test one", now1s), React::Match); + assert_eq!( + bed.manager.handle_line("test one", now1s).await, + React::Match + ); { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert_eq!( state.matches, BTreeMap::from([(one.clone(), BTreeSet::from([now, now1s]))]), @@ -217,9 +224,12 @@ async fn three_matches_then_action_then_delayed_action() { // Third match, exec let _block = bed.semaphore.acquire().await.unwrap(); - assert_eq!(bed.manager.handle_line("test one", now2s), React::Trigger); + assert_eq!( + bed.manager.handle_line("test one", now2s).await, + React::Trigger + ); { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert!( state.matches.is_empty(), "matches are emptied after trigger" @@ -241,7 +251,7 @@ async fn three_matches_then_action_then_delayed_action() { tokio::time::sleep(Duration::from_millis(40)).await; // Check first action assert_eq!( - bed.manager.state.lock().unwrap().triggers.tree(), + bed.manager.state.lock().await.triggers.tree(), &BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]), "triggers still contain the triggered match with 1 action left" ); @@ -255,7 +265,7 @@ async fn three_matches_then_action_then_delayed_action() { tokio::time::sleep(Duration::from_millis(100)).await; // Check second action assert!( - bed.manager.state.lock().unwrap().triggers.is_empty(), + bed.manager.state.lock().await.triggers.is_empty(), "triggers are empty again" ); assert_eq!( @@ -264,7 +274,7 @@ async fn three_matches_then_action_then_delayed_action() { "the output file contains the result of the 2 actions" ); - bed.assert_empty_trees(); + bed.assert_empty_trees().await; } } @@ -296,12 +306,18 @@ async fn one_match_one_action() { let now = bed.now; // No match - assert_eq!(bed.manager.handle_line("test 131", now), React::NoMatch); - bed.assert_empty_trees(); + assert_eq!( + bed.manager.handle_line("test 131", now).await, + React::NoMatch + ); + bed.assert_empty_trees().await; // match - assert_eq!(bed.manager.handle_line("test one", now), React::Trigger); - bed.assert_empty_trees(); + assert_eq!( + bed.manager.handle_line("test one", now).await, + React::Trigger + ); + bed.assert_empty_trees().await; // the action executes tokio::time::sleep(Duration::from_millis(40)).await; @@ -311,7 +327,7 @@ async fn one_match_one_action() { "the output file contains the result of the first action" ); - bed.assert_empty_trees(); + bed.assert_empty_trees().await; } } @@ -343,14 +359,20 @@ async fn one_match_one_delayed_action() { let now = bed.now; // No match - assert_eq!(bed.manager.handle_line("test 131", now), React::NoMatch); - bed.assert_empty_trees(); + assert_eq!( + bed.manager.handle_line("test 131", now).await, + React::NoMatch + ); + bed.assert_empty_trees().await; // Match let one = vec!["one".to_string()]; - assert_eq!(bed.manager.handle_line("test one", now), React::Trigger); + assert_eq!( + bed.manager.handle_line("test one", now).await, + React::Trigger + ); { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert!(state.matches.is_empty(), "matches stay empty"); assert!(state.ordered_times.is_empty(), "ordered_times stay empty"); assert_eq!( @@ -368,7 +390,7 @@ async fn one_match_one_delayed_action() { // The action executes tokio::time::sleep(Duration::from_millis(140)).await; assert!( - bed.manager.state.lock().unwrap().triggers.is_empty(), + bed.manager.state.lock().await.triggers.is_empty(), "triggers are empty again" ); assert_eq!( @@ -377,7 +399,7 @@ async fn one_match_one_delayed_action() { "the output file contains the result of the action" ); - bed.assert_empty_trees(); + bed.assert_empty_trees().await; } } @@ -421,7 +443,7 @@ async fn one_db_match_one_runtime_match_one_action() { let bed = bed.part2(filter, now, Some(db)).await; { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert_eq!( state.matches, BTreeMap::from([(one.clone(), BTreeSet::from([now1s]))]), @@ -436,8 +458,11 @@ async fn one_db_match_one_runtime_match_one_action() { } // match - assert_eq!(bed.manager.handle_line("test one", now), React::Trigger); - bed.assert_empty_trees(); + assert_eq!( + bed.manager.handle_line("test one", now).await, + React::Trigger + ); + bed.assert_empty_trees().await; // the action executes tokio::time::sleep(Duration::from_millis(40)).await; assert_eq!( @@ -486,7 +511,7 @@ async fn one_outdated_db_match() { // Finish setup let bed = bed.part2(filter, now, Some(db)).await; - bed.assert_empty_trees(); + bed.assert_empty_trees().await; } } @@ -545,6 +570,7 @@ async fn trigger_unmatched_pattern() { .collect(), now, ) + .await .unwrap(); // the action executes @@ -552,7 +578,7 @@ async fn trigger_unmatched_pattern() { // No matches, one action registered { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert!(state.matches.is_empty()); assert!(state.ordered_times.is_empty()); assert_eq!( @@ -625,6 +651,7 @@ async fn trigger_matched_pattern() { .collect(), now, ) + .await .unwrap(); // the action executes @@ -632,7 +659,7 @@ async fn trigger_matched_pattern() { // No matches, one action registered { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert!(state.matches.is_empty()); assert!(state.ordered_times.is_empty()); assert_eq!( @@ -707,7 +734,7 @@ async fn trigger_deduplication_on_start() { // No matches, one or two action·s registered { - let state = bed.manager.state.lock().unwrap(); + let state = bed.manager.state.lock().await; assert!(state.matches.is_empty()); assert!(state.ordered_times.is_empty()); assert_eq!( @@ -773,12 +800,12 @@ async fn multiple_triggers() { let bed = bed.part2(filter, Local::now(), None).await; assert_eq!( - bed.manager.handle_line("test one", Local::now()), + bed.manager.handle_line("test one", Local::now()).await, React::Match, "Duplicate: {dup:?}" ); assert_eq!( - bed.manager.handle_line("test one", Local::now()), + bed.manager.handle_line("test one", Local::now()).await, React::Trigger, "Duplicate: {dup:?}" ); @@ -795,7 +822,7 @@ async fn multiple_triggers() { tokio::time::sleep(Duration::from_millis(50)).await; assert_eq!( - bed.manager.handle_line("test one", Local::now()), + bed.manager.handle_line("test one", Local::now()).await, match dup { Duplicate::Ignore => React::Match, _ => React::Match, @@ -804,7 +831,7 @@ async fn multiple_triggers() { ); assert_eq!( - bed.manager.handle_line("test one", Local::now()), + bed.manager.handle_line("test one", Local::now()).await, match dup { Duplicate::Ignore => React::Match, _ => React::Trigger, @@ -938,7 +965,7 @@ async fn extend_trigger_multiple_after_actions() { let bed = bed.part2(filter, Local::now(), None).await; assert_eq!( - bed.manager.handle_line("test one", Local::now()), + bed.manager.handle_line("test one", Local::now()).await, React::Trigger, ); @@ -952,7 +979,7 @@ async fn extend_trigger_multiple_after_actions() { ); assert_eq!( - bed.manager.handle_line("test one", Local::now()), + bed.manager.handle_line("test one", Local::now()).await, React::Trigger, ); @@ -1019,7 +1046,7 @@ async fn ip_specific() { let bed = bed.part2(filter, Local::now(), None).await; assert_eq!( - bed.manager.handle_line("test 1.2.3.4", Local::now()), + bed.manager.handle_line("test 1.2.3.4", Local::now()).await, React::Trigger, ); @@ -1035,7 +1062,8 @@ async fn ip_specific() { assert_eq!( bed.manager - .handle_line("test 1:2:3:4:5:6:7:8", Local::now()), + .handle_line("test 1:2:3:4:5:6:7:8", Local::now()) + .await, React::Trigger, );