This commit is contained in:
ppom 2025-09-07 12:00:00 +02:00
commit aec3bb54ed
No known key found for this signature in database
4 changed files with 138 additions and 110 deletions

View file

@ -710,7 +710,7 @@ mod patternip_tests {
);
let bed = bed.part2(filter, Local::now(), None).await;
assert_eq!(
bed.manager.handle_line(&line, Local::now()),
bed.manager.handle_line(&line, Local::now()).await,
React::Trigger,
"line: {line}"
);

View file

@ -3,14 +3,10 @@ pub mod tests;
mod state;
use std::{
collections::BTreeMap,
process::Stdio,
sync::{Arc, Mutex, MutexGuard},
};
use std::{collections::BTreeMap, process::Stdio, sync::Arc};
use regex::Regex;
use tokio::sync::Semaphore;
use tokio::sync::{Mutex, MutexGuard, Semaphore};
use tracing::{error, info};
use crate::{
@ -67,7 +63,8 @@ impl FilterManager {
shutdown,
state: Arc::new(Mutex::new(State::new(filter, db, now).await?)),
};
this.clear_past_triggers_and_schedule_future_actions(now);
this.clear_past_triggers_and_schedule_future_actions(now)
.await;
Ok(this)
}
@ -85,8 +82,8 @@ impl FilterManager {
async fn handle_match(&self, m: Match, now: Time) -> bool {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
state.clear_past_matches(now);
let mut state = self.state.lock().await;
state.clear_past_matches(now).await;
// if Duplicate::Ignore and already triggered, skip
if state.triggers.contains_key(&m) && Duplicate::Ignore == self.filter.duplicate {
@ -98,25 +95,27 @@ impl FilterManager {
let trigger = match self.filter.retry {
None => true,
Some(retry) => {
state.add_match(m.clone(), now);
state.add_match(m.clone(), now).await;
// Number of stored times for this match >= configured retry for this filter
state.get_times(&m).await >= retry as usize
}
};
if trigger {
state.remove_match(&m);
state.remove_match(&m).await;
let actions_left = if Duplicate::Extend == self.filter.duplicate {
// Get number of actions left from last trigger
state
.remove_trigger(&m).await
.remove_trigger(&m)
.await
// Only one entry in the map because Duplicate::Extend
.and_then(|map| map.first_key_value().map(|(_, n)| n.clone()))
} else {
None
};
state.add_trigger(m.clone(), now, actions_left);
self.schedule_exec(m, now, now, &mut state, false, actions_left);
state.add_trigger(m.clone(), now, actions_left).await;
self.schedule_exec(m, now, now, &mut state, false, actions_left)
.await;
}
trigger
@ -130,10 +129,11 @@ impl FilterManager {
let match_ = self.filter.get_match_from_patterns(patterns)?;
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
state.remove_match(&match_);
state.add_trigger(match_.clone(), now, None);
self.schedule_exec(match_, now, now, &mut state, false, None);
let mut state = self.state.lock().await;
state.remove_match(&match_).await;
state.add_trigger(match_.clone(), now, None).await;
self.schedule_exec(match_, now, now, &mut state, false, None)
.await;
Ok(())
}
@ -155,7 +155,7 @@ impl FilterManager {
};
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock().await;
let mut cs: BTreeMap<_, _> = {
let cloned_matches = state
@ -167,27 +167,26 @@ impl FilterManager {
.cloned()
.collect::<Vec<_>>();
cloned_matches
.into_iter()
.map(|match_| {
// mutable State required here
if let Order::Flush = order {
state.remove_match(&match_);
}
let matches = state
.matches
.get(&match_)
.map(|times| times.len())
.unwrap_or(0);
(
match_,
PatternStatus {
matches,
..Default::default()
},
)
})
.collect()
let mut cs = BTreeMap::new();
for match_ in cloned_matches {
// mutable State required here
if let Order::Flush = order {
state.remove_match(&match_).await;
}
let matches = state
.matches
.get(&match_)
.map(|times| times.len())
.unwrap_or(0);
cs.insert(
match_,
PatternStatus {
matches,
..Default::default()
},
);
}
cs
};
let cloned_triggers = state
@ -203,7 +202,7 @@ impl FilterManager {
let map = state.triggers.get(&m).unwrap().clone();
if let Order::Flush = order {
state.remove_trigger(&m);
state.remove_trigger(&m).await;
}
for (t, remaining) in map {
@ -227,7 +226,7 @@ impl FilterManager {
self.shutdown.clone(),
action,
m.clone(),
);
).await;
}
}
}
@ -269,7 +268,7 @@ impl FilterManager {
if exec_time <= now {
if state.decrement_trigger(&m, t, false).await {
exec_now(&self.exec_limit, self.shutdown.clone(), action, m);
exec_now(&self.exec_limit, self.shutdown.clone(), action, m).await;
}
} else {
let this = self.clone();
@ -288,9 +287,9 @@ impl FilterManager {
// Exec action if triggered hasn't been already flushed
if !exiting || action.on_exit {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = this.state.lock().unwrap();
let mut state = this.state.lock().await;
if state.decrement_trigger(&m, t, exiting).await {
exec_now(&this.exec_limit, this.shutdown, action, m);
exec_now(&this.exec_limit, this.shutdown, action, m).await;
}
}
});
@ -309,7 +308,7 @@ impl FilterManager {
.count() as u64;
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock().await;
let cloned_triggers = state
.triggers
@ -327,7 +326,7 @@ impl FilterManager {
.collect();
if map.is_empty() {
state.triggers.remove(&m);
state.triggers.remove(&m).await;
} else {
// Filter duplicates
// unwrap is fine because map is not empty (see if)
@ -339,10 +338,11 @@ impl FilterManager {
// No filtering
Duplicate::Rerun => map,
};
state.triggers.insert(m.clone(), map.clone());
state.triggers.insert(m.clone(), map.clone()).await;
for (t, _) in map {
// Schedule the upcoming times
self.schedule_exec(m.clone(), t, now, &mut state, true, None);
self.schedule_exec(m.clone(), t, now, &mut state, true, None)
.await;
}
}
}

View file

@ -89,7 +89,7 @@ impl State {
map
}
})
});
}).await;
}
}
let mut this = Self {
@ -99,15 +99,15 @@ impl State {
ordered_times,
triggers,
};
this.clear_past_matches(now);
this.load_matches_from_ordered_times();
this.clear_past_matches(now).await;
this.load_matches_from_ordered_times().await;
Ok(this)
}
pub async fn add_match(&mut self, m: Match, t: Time) {
let set = self.matches.entry(m.clone()).or_default();
set.insert(t);
self.ordered_times.insert(t, m);
self.ordered_times.insert(t, m).await;
}
pub async fn add_trigger(&mut self, m: Match, t: Time, action_count: Option<u64>) {
@ -124,7 +124,7 @@ impl State {
value
}
})
});
}).await;
}
}
@ -132,7 +132,7 @@ impl State {
pub async fn remove_match(&mut self, m: &Match) {
if let Some(set) = self.matches.get(m) {
for t in set {
self.ordered_times.remove(t);
self.ordered_times.remove(t).await;
}
self.matches.remove(m);
}
@ -167,7 +167,7 @@ impl State {
Some(map)
}
})
});
}).await;
}
// else don't do anything
// Because that will remove the entry in the DB, and make
@ -181,7 +181,7 @@ impl State {
map.insert(mt.t, count - 1);
map
})
});
}).await;
}
}
exec_needed
@ -203,7 +203,7 @@ impl State {
let (t, m) = self.ordered_times.first_key_value().unwrap();
(*t, m.clone())
};
self.ordered_times.remove(&t);
self.ordered_times.remove(&t).await;
if let Some(set) = self.matches.get(&m) {
let mut set = set.clone();
set.remove(&t);
@ -396,7 +396,7 @@ mod tests {
assert!(state.matches.is_empty());
// Add non-previously added match
state.add_match(one.clone(), now_less_1s);
state.add_match(one.clone(), now_less_1s).await;
assert_eq!(
state.ordered_times.tree(),
&BTreeMap::from([(now_less_1s, one.clone()),])
@ -407,7 +407,7 @@ mod tests {
);
// Add previously added match
state.add_match(one.clone(), now_less_4s);
state.add_match(one.clone(), now_less_4s).await;
assert_eq!(
state.ordered_times.tree(),
&BTreeMap::from([(now_less_1s, one.clone()), (now_less_4s, one.clone())])
@ -418,7 +418,7 @@ mod tests {
);
// Remove added match
state.remove_match(&one);
state.remove_match(&one).await;
assert!(state.ordered_times.tree().is_empty());
assert!(state.matches.is_empty());
}
@ -436,7 +436,7 @@ mod tests {
assert!(state.triggers.tree().is_empty());
// Add unique trigger
state.add_trigger(one.clone(), now, None);
state.add_trigger(one.clone(), now, None).await;
// Nothing is really added
assert!(state.triggers.tree().is_empty());
@ -490,7 +490,7 @@ mod tests {
assert!(state.triggers.tree().is_empty());
// Add unique trigger
state.add_trigger(one.clone(), now, None);
state.add_trigger(one.clone(), now, None).await;
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now, 3)].into())])
@ -514,7 +514,7 @@ mod tests {
assert!(!state.decrement_trigger(&one, now, false).await);
// Add unique trigger (but decrement exiting-like)
state.add_trigger(one.clone(), now, None);
state.add_trigger(one.clone(), now, None).await;
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now, 3)].into())])
@ -544,8 +544,8 @@ mod tests {
assert!(!state.decrement_trigger(&one, now, false).await);
// Add trigger with neighbour
state.add_trigger(one.clone(), now, None);
state.add_trigger(one.clone(), now_plus_1s, None);
state.add_trigger(one.clone(), now, None).await;
state.add_trigger(one.clone(), now_plus_1s, None).await;
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 3)].into())])
@ -571,18 +571,18 @@ mod tests {
// Decrement → false
assert!(!state.decrement_trigger(&one, now, false).await);
// Remove neighbour
state.remove_trigger(&one);
state.remove_trigger(&one).await;
assert!(state.triggers.tree().is_empty());
// Add two neighbour triggers
state.add_trigger(one.clone(), now, None);
state.add_trigger(one.clone(), now_plus_1s, None);
state.add_trigger(one.clone(), now, None).await;
state.add_trigger(one.clone(), now_plus_1s, None).await;
assert_eq!(
state.triggers.tree(),
&BTreeMap::from([(one.clone(), [(now_plus_1s, 3), (now, 3)].into())])
);
// Remove them
state.remove_trigger(&one);
state.remove_trigger(&one).await;
assert!(state.triggers.tree().is_empty());
}
}

View file

@ -102,6 +102,7 @@ impl TestBed {
&mut db,
now,
)
.await
.unwrap(),
semaphore,
}
@ -119,8 +120,8 @@ pub struct TestBed2 {
}
impl TestBed2 {
pub fn assert_empty_trees(&self) {
let state = self.manager.state.lock().unwrap();
pub async fn assert_empty_trees(&self) {
let state = self.manager.state.lock().await;
assert!(state.matches.is_empty(), "matches must be empty");
assert!(
state.ordered_times.is_empty(),
@ -177,14 +178,17 @@ async fn three_matches_then_action_then_delayed_action() {
let now2s = bed.now + TimeDelta::seconds(2);
// No match
assert_eq!(bed.manager.handle_line("test 131", now), React::NoMatch);
bed.assert_empty_trees();
assert_eq!(
bed.manager.handle_line("test 131", now).await,
React::NoMatch
);
bed.assert_empty_trees().await;
// First match
let one = vec!["one".to_string()];
assert_eq!(bed.manager.handle_line("test one", now), React::Match);
assert_eq!(bed.manager.handle_line("test one", now).await, React::Match);
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert_eq!(
state.matches,
BTreeMap::from([(one.clone(), BTreeSet::from([now]))]),
@ -199,9 +203,12 @@ async fn three_matches_then_action_then_delayed_action() {
}
// Second match
assert_eq!(bed.manager.handle_line("test one", now1s), React::Match);
assert_eq!(
bed.manager.handle_line("test one", now1s).await,
React::Match
);
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert_eq!(
state.matches,
BTreeMap::from([(one.clone(), BTreeSet::from([now, now1s]))]),
@ -217,9 +224,12 @@ async fn three_matches_then_action_then_delayed_action() {
// Third match, exec
let _block = bed.semaphore.acquire().await.unwrap();
assert_eq!(bed.manager.handle_line("test one", now2s), React::Trigger);
assert_eq!(
bed.manager.handle_line("test one", now2s).await,
React::Trigger
);
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert!(
state.matches.is_empty(),
"matches are emptied after trigger"
@ -241,7 +251,7 @@ async fn three_matches_then_action_then_delayed_action() {
tokio::time::sleep(Duration::from_millis(40)).await;
// Check first action
assert_eq!(
bed.manager.state.lock().unwrap().triggers.tree(),
bed.manager.state.lock().await.triggers.tree(),
&BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]),
"triggers still contain the triggered match with 1 action left"
);
@ -255,7 +265,7 @@ async fn three_matches_then_action_then_delayed_action() {
tokio::time::sleep(Duration::from_millis(100)).await;
// Check second action
assert!(
bed.manager.state.lock().unwrap().triggers.is_empty(),
bed.manager.state.lock().await.triggers.is_empty(),
"triggers are empty again"
);
assert_eq!(
@ -264,7 +274,7 @@ async fn three_matches_then_action_then_delayed_action() {
"the output file contains the result of the 2 actions"
);
bed.assert_empty_trees();
bed.assert_empty_trees().await;
}
}
@ -296,12 +306,18 @@ async fn one_match_one_action() {
let now = bed.now;
// No match
assert_eq!(bed.manager.handle_line("test 131", now), React::NoMatch);
bed.assert_empty_trees();
assert_eq!(
bed.manager.handle_line("test 131", now).await,
React::NoMatch
);
bed.assert_empty_trees().await;
// match
assert_eq!(bed.manager.handle_line("test one", now), React::Trigger);
bed.assert_empty_trees();
assert_eq!(
bed.manager.handle_line("test one", now).await,
React::Trigger
);
bed.assert_empty_trees().await;
// the action executes
tokio::time::sleep(Duration::from_millis(40)).await;
@ -311,7 +327,7 @@ async fn one_match_one_action() {
"the output file contains the result of the first action"
);
bed.assert_empty_trees();
bed.assert_empty_trees().await;
}
}
@ -343,14 +359,20 @@ async fn one_match_one_delayed_action() {
let now = bed.now;
// No match
assert_eq!(bed.manager.handle_line("test 131", now), React::NoMatch);
bed.assert_empty_trees();
assert_eq!(
bed.manager.handle_line("test 131", now).await,
React::NoMatch
);
bed.assert_empty_trees().await;
// Match
let one = vec!["one".to_string()];
assert_eq!(bed.manager.handle_line("test one", now), React::Trigger);
assert_eq!(
bed.manager.handle_line("test one", now).await,
React::Trigger
);
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert!(state.matches.is_empty(), "matches stay empty");
assert!(state.ordered_times.is_empty(), "ordered_times stay empty");
assert_eq!(
@ -368,7 +390,7 @@ async fn one_match_one_delayed_action() {
// The action executes
tokio::time::sleep(Duration::from_millis(140)).await;
assert!(
bed.manager.state.lock().unwrap().triggers.is_empty(),
bed.manager.state.lock().await.triggers.is_empty(),
"triggers are empty again"
);
assert_eq!(
@ -377,7 +399,7 @@ async fn one_match_one_delayed_action() {
"the output file contains the result of the action"
);
bed.assert_empty_trees();
bed.assert_empty_trees().await;
}
}
@ -421,7 +443,7 @@ async fn one_db_match_one_runtime_match_one_action() {
let bed = bed.part2(filter, now, Some(db)).await;
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert_eq!(
state.matches,
BTreeMap::from([(one.clone(), BTreeSet::from([now1s]))]),
@ -436,8 +458,11 @@ async fn one_db_match_one_runtime_match_one_action() {
}
// match
assert_eq!(bed.manager.handle_line("test one", now), React::Trigger);
bed.assert_empty_trees();
assert_eq!(
bed.manager.handle_line("test one", now).await,
React::Trigger
);
bed.assert_empty_trees().await;
// the action executes
tokio::time::sleep(Duration::from_millis(40)).await;
assert_eq!(
@ -486,7 +511,7 @@ async fn one_outdated_db_match() {
// Finish setup
let bed = bed.part2(filter, now, Some(db)).await;
bed.assert_empty_trees();
bed.assert_empty_trees().await;
}
}
@ -545,6 +570,7 @@ async fn trigger_unmatched_pattern() {
.collect(),
now,
)
.await
.unwrap();
// the action executes
@ -552,7 +578,7 @@ async fn trigger_unmatched_pattern() {
// No matches, one action registered
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert!(state.matches.is_empty());
assert!(state.ordered_times.is_empty());
assert_eq!(
@ -625,6 +651,7 @@ async fn trigger_matched_pattern() {
.collect(),
now,
)
.await
.unwrap();
// the action executes
@ -632,7 +659,7 @@ async fn trigger_matched_pattern() {
// No matches, one action registered
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert!(state.matches.is_empty());
assert!(state.ordered_times.is_empty());
assert_eq!(
@ -707,7 +734,7 @@ async fn trigger_deduplication_on_start() {
// No matches, one or two action·s registered
{
let state = bed.manager.state.lock().unwrap();
let state = bed.manager.state.lock().await;
assert!(state.matches.is_empty());
assert!(state.ordered_times.is_empty());
assert_eq!(
@ -773,12 +800,12 @@ async fn multiple_triggers() {
let bed = bed.part2(filter, Local::now(), None).await;
assert_eq!(
bed.manager.handle_line("test one", Local::now()),
bed.manager.handle_line("test one", Local::now()).await,
React::Match,
"Duplicate: {dup:?}"
);
assert_eq!(
bed.manager.handle_line("test one", Local::now()),
bed.manager.handle_line("test one", Local::now()).await,
React::Trigger,
"Duplicate: {dup:?}"
);
@ -795,7 +822,7 @@ async fn multiple_triggers() {
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(
bed.manager.handle_line("test one", Local::now()),
bed.manager.handle_line("test one", Local::now()).await,
match dup {
Duplicate::Ignore => React::Match,
_ => React::Match,
@ -804,7 +831,7 @@ async fn multiple_triggers() {
);
assert_eq!(
bed.manager.handle_line("test one", Local::now()),
bed.manager.handle_line("test one", Local::now()).await,
match dup {
Duplicate::Ignore => React::Match,
_ => React::Trigger,
@ -938,7 +965,7 @@ async fn extend_trigger_multiple_after_actions() {
let bed = bed.part2(filter, Local::now(), None).await;
assert_eq!(
bed.manager.handle_line("test one", Local::now()),
bed.manager.handle_line("test one", Local::now()).await,
React::Trigger,
);
@ -952,7 +979,7 @@ async fn extend_trigger_multiple_after_actions() {
);
assert_eq!(
bed.manager.handle_line("test one", Local::now()),
bed.manager.handle_line("test one", Local::now()).await,
React::Trigger,
);
@ -1019,7 +1046,7 @@ async fn ip_specific() {
let bed = bed.part2(filter, Local::now(), None).await;
assert_eq!(
bed.manager.handle_line("test 1.2.3.4", Local::now()),
bed.manager.handle_line("test 1.2.3.4", Local::now()).await,
React::Trigger,
);
@ -1035,7 +1062,8 @@ async fn ip_specific() {
assert_eq!(
bed.manager
.handle_line("test 1:2:3:4:5:6:7:8", Local::now()),
.handle_line("test 1:2:3:4:5:6:7:8", Local::now())
.await,
React::Trigger,
);