Added daemon tests; parse_duration now supports milliseconds

TODO test persistence handling on FilterManager
This commit is contained in:
ppom 2025-02-25 12:00:00 +01:00
commit b448089f58
12 changed files with 587 additions and 34 deletions

View file

@ -8,7 +8,7 @@ use tokio::process::Command;
use super::parse_duration;
use super::{Match, Pattern};
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Default, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Action {
cmd: Vec<String>,
@ -149,6 +149,40 @@ impl Display for Action {
}
}
#[cfg(test)]
impl Action {
    /// Test-only constructor designed to be easy to call.
    ///
    /// Fills the fields from borrowed primitives, runs `setup` with the
    /// patterns taken from `config_patterns`, and panics on any setup error
    /// (acceptable for test-support code).
    pub fn new(
        cmd: Vec<&str>,
        after: Option<&str>,
        on_exit: bool,
        stream_name: &str,
        filter_name: &str,
        name: &str,
        config_patterns: &super::Patterns,
    ) -> Self {
        // Start from the derived Default and fill in only what the test needs.
        let mut act = Self::default();
        act.cmd = cmd.into_iter().map(|s| s.into()).collect();
        act.after = after.map(|s| s.into());
        act.on_exit = on_exit;
        act.setup(
            stream_name,
            filter_name,
            name,
            // setup() wants an owned, ordered pattern collection.
            config_patterns
                .clone()
                .into_values()
                .collect::<BTreeSet<_>>()
                .into(),
        )
        .unwrap();
        act
    }
}
#[allow(clippy::unwrap_used)]
#[cfg(test)]
pub mod tests {

View file

@ -232,6 +232,50 @@ impl Hash for Filter {
}
}
#[cfg(test)]
impl Filter {
    /// Test-only constructor designed to be easy to call.
    ///
    /// Builds a `Filter` from borrowed primitives, runs `setup`, and panics
    /// on any configuration error (acceptable in tests).
    pub fn new(
        actions: Vec<Action>,
        regex: Vec<&str>,
        retry: Option<u32>,
        retry_period: Option<&str>,
        stream_name: &str,
        name: &str,
        config_patterns: &Patterns,
    ) -> Self {
        let mut filter = Self {
            // Key each action by its own name, as the daemon does.
            actions: actions.into_iter().map(|a| (a.name().into(), a)).collect(),
            regex: regex.into_iter().map(|s| s.into()).collect(),
            retry,
            retry_period: retry_period.map(|s| s.into()),
            ..Default::default()
        };
        // `stream_name` and `name` are already `&str`; passing them directly
        // avoids a needless `&&str` double-borrow (clippy::needless_borrow).
        filter.setup(stream_name, name, config_patterns).unwrap();
        filter
    }

    /// Like [`Self::new`] but leaks the value to obtain a `&'static Filter`,
    /// as required by consumers that keep filters for the program's lifetime.
    /// The leak is deliberate and harmless in test-only code.
    pub fn new_static(
        actions: Vec<Action>,
        regex: Vec<&str>,
        retry: Option<u32>,
        retry_period: Option<&str>,
        stream_name: &str,
        name: &str,
        config_patterns: &Patterns,
    ) -> &'static Self {
        Box::leak(Box::new(Self::new(
            actions,
            regex,
            retry,
            retry_period,
            stream_name,
            name,
            config_patterns,
        )))
    }
}
#[allow(clippy::unwrap_used)]
#[cfg(test)]
pub mod tests {

View file

@ -16,3 +16,6 @@ use chrono::{DateTime, Local};
pub type Time = DateTime<Local>;
pub type Match = Vec<String>;
#[cfg(test)]
pub use filter::tests as filter_tests;

View file

@ -12,15 +12,14 @@ pub fn parse_duration(d: &str) -> Result<TimeDelta, String> {
if i == 0 {
return Err(format!("duration '{}' doesn't start with digits", d));
}
let ok_secs = |mul: u32| -> Result<TimeDelta, String> {
Ok(TimeDelta::seconds(mul as i64 * value as i64))
};
let ok_as = |func: fn(i64) -> TimeDelta| -> Result<_, String> { Ok(func(value as i64)) };
match d_trimmed[i..].trim() {
"s" | "sec" | "secs" | "second" | "seconds" => ok_secs(1),
"m" | "min" | "mins" | "minute" | "minutes" => ok_secs(60),
"h" | "hour" | "hours" => ok_secs(60 * 60),
"d" | "day" | "days" => ok_secs(24 * 60 * 60),
"ms" | "millis" | "millisecond" | "milliseconds" => ok_as(TimeDelta::milliseconds),
"s" | "sec" | "secs" | "second" | "seconds" => ok_as(TimeDelta::seconds),
"m" | "min" | "mins" | "minute" | "minutes" => ok_as(TimeDelta::minutes),
"h" | "hour" | "hours" => ok_as(TimeDelta::hours),
"d" | "day" | "days" => ok_as(TimeDelta::days),
unit => Err(format!(
"unit {} not recognised. must be one of s/sec/seconds, m/min/minutes, h/hours, d/days",
unit

View file

@ -116,6 +116,25 @@ impl PartialEq for Pattern {
}
}
#[cfg(test)]
impl Pattern {
    /// Test-only constructor designed to be easy to call.
    ///
    /// Builds a `Pattern` from a bare regex string and runs `setup`,
    /// returning its error instead of panicking so tests can assert on it.
    pub fn new(name: &str, regex: &str) -> Result<Self, String> {
        let mut pattern = Self {
            regex: regex.into(),
            ..Default::default()
        };
        pattern.setup(name)?;
        Ok(pattern)
    }

    /// Test-only constructor designed to be easy to call.
    /// Constructs a full [`super::Patterns`] collection with one given pattern.
    pub fn new_map(name: &str, regex: &str) -> Result<super::Patterns, String> {
        // A single-entry iterator collected into the map type used by the config.
        Ok(std::iter::once((name.into(), Self::new(name, regex)?.into())).collect())
    }
}
#[allow(clippy::unwrap_used)]
#[cfg(test)]
pub mod tests {

View file

@ -64,15 +64,6 @@ impl Stream {
Ok(())
}
#[cfg(test)]
fn from_filters(filters: BTreeMap<String, Filter>, name: &str) -> Self {
Self {
filters,
name: name.to_string(),
..Default::default()
}
}
}
impl PartialEq for Stream {

View file

@ -4,7 +4,6 @@ use std::{
sync::Arc,
};
use chrono::Local;
use regex::Regex;
use tokio::sync::Semaphore;
use tracing::{error, info};
@ -42,6 +41,7 @@ impl FilterManager {
exec_limit: Option<Arc<Semaphore>>,
shutdown: ShutdownToken,
db: &sled::Db,
now: Time,
) -> Result<Self, sled::Error> {
let manager = Self {
filter,
@ -52,20 +52,21 @@ impl FilterManager {
ordered_times: db.open_filter_ordered_times_tree(filter)?,
triggers: db.open_filter_triggers_tree(filter)?,
};
let now = Local::now();
manager.clear_past_matches(now);
manager.clear_past_triggers_and_schedule_future_actions(now);
Ok(manager)
}
pub fn handle_line(&self, line: &str) {
pub fn handle_line(&self, line: &str, now: Time) -> bool {
if let Some(match_) = self.filter.get_match(line) {
self.handle_match(match_);
self.handle_match(match_, now);
true
} else {
false
}
}
fn handle_match(&self, m: Match) {
let now = Local::now();
fn handle_match(&self, m: Match, now: Time) {
self.clear_past_matches(now);
let exec = match self.filter.retry() {
@ -80,7 +81,7 @@ impl FilterManager {
if exec {
self.remove_match(&m);
self.add_trigger(&m, now);
self.schedule_exec(m.clone(), now);
self.schedule_exec(m.clone(), now, now);
}
}
@ -88,6 +89,7 @@ impl FilterManager {
&self,
patterns: &BTreeMap<Arc<Pattern>, Regex>,
order: Order,
now: Time,
) -> BTreeMap<String, PatternStatus> {
let is_match = |match_: &Match| {
match_
@ -118,7 +120,6 @@ impl FilterManager {
})
.collect();
let now = Local::now();
for (match_, times) in self
.triggers
.iter()
@ -158,13 +159,12 @@ impl FilterManager {
/// Schedule execution for a given Action and Match.
/// We check first if the trigger is still here
/// because pending actions can be flushed.
fn schedule_exec(&self, m: Match, t: Time) {
let now = Local::now();
fn schedule_exec(&self, m: Match, t: Time, now: Time) {
for action in self.filter.actions().values() {
let exec_time = t + action.after_duration().unwrap_or_default();
let m = m.clone();
if exec_time < now {
if exec_time <= now {
if self.decrement_trigger(&m, t) {
self.exec_now(action, m);
}
@ -310,7 +310,7 @@ impl FilterManager {
// Schedule the upcoming times
for t in new_map.into_keys() {
self.schedule_exec(m.clone(), t);
self.schedule_exec(m.clone(), t, now);
}
}
}
@ -342,3 +342,358 @@ impl FilterManager {
});
}
}
#[allow(clippy::unwrap_used)]
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        fs::read_to_string,
        sync::Arc,
        time::Duration,
    };

    use chrono::{Local, TimeDelta};
    use tempfile::TempPath;
    use tokio::sync::Semaphore;

    use super::{FilterManager, SledDbExt};
    use crate::{
        concepts::{Action, Filter, Match, Pattern, Patterns, Time},
        daemon::{shutdown::ShutdownController, Tree},
    };

    /// Stage-one fixture: a temp output file for the actions to write into,
    /// plus a single-pattern (`az` -> `[a-z]+`) config. Split in two stages
    /// because the `&'static Filter` must be built between them.
    struct TestBed {
        // Keeps the temp file alive; deleted when dropped.
        pub _out_path: TempPath,
        // Path of the temp file, interpolated into action shell commands.
        pub out_file: String,
        pub az_patterns: Patterns,
    }

    impl TestBed {
        /// Creates the temp output file and the `az` pattern map.
        fn new() -> Self {
            let _out_path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
            let out_file = _out_path.to_str().unwrap().to_string();
            let az_patterns = Pattern::new_map("az", "[a-z]+").unwrap();
            Self {
                _out_path,
                out_file,
                az_patterns,
            }
        }

        /// Stage two: opens a fresh temp sled db, builds the `FilterManager`
        /// under test, and exposes the manager's three trees so assertions
        /// can inspect persisted state directly.
        fn part2(self, filter: &'static Filter, now: Time) -> TestBed2 {
            let db = crate::tests::TempDb::new();
            let controller = ShutdownController::new();
            // Single permit: tests hold it to delay execs deterministically.
            let semaphore = Arc::new(Semaphore::new(1));
            TestBed2 {
                _out_path: self._out_path,
                out_file: self.out_file,
                now,
                matches: db.open_filter_matches_tree(filter).unwrap(),
                ordered_times: db.open_filter_ordered_times_tree(filter).unwrap(),
                triggers: db.open_filter_triggers_tree(filter).unwrap(),
                manager: FilterManager::new(
                    &filter,
                    Some(semaphore.clone()),
                    controller.token(),
                    &db,
                    now,
                )
                .unwrap(),
                // db,
                // controller,
                semaphore,
            }
        }
    }

    /// Stage-two fixture: the manager under test plus direct handles on its
    /// persistent trees for white-box assertions.
    struct TestBed2 {
        pub _out_path: TempPath,
        pub out_file: String,
        // pub db: TempDb,
        // pub controller: ShutdownController,
        pub semaphore: Arc<Semaphore>,
        // The reference instant all test timestamps are derived from.
        pub now: Time,
        pub matches: Tree<Match, BTreeSet<Time>>,
        pub ordered_times: Tree<Time, Match>,
        pub triggers: Tree<Match, BTreeMap<Time, usize>>,
        pub manager: FilterManager,
    }

    impl TestBed2 {
        /// Asserts that all three persistent trees are empty, i.e. the
        /// manager left no residual state behind.
        fn assert_empty_trees(&self) {
            assert!(
                self.matches.iter().next().is_none(),
                "matches must be empty"
            );
            assert!(
                self.ordered_times.iter().next().is_none(),
                "ordered_times must be empty"
            );
            assert!(
                self.triggers.iter().next().is_none(),
                "triggers must be empty"
            );
        }
    }

    /// retry=3 within 2s: two matches accumulate, the third triggers both an
    /// immediate action and a 100ms-delayed one. Timing relies on short
    /// real-time sleeps, so this test is sensitive to scheduler latency.
    #[tokio::test]
    async fn three_matches_then_action_then_delayed_action() {
        let bed = TestBed::new();
        let filter = Filter::new_static(
            vec![
                Action::new(
                    vec!["sh", "-c", &format!("echo a1 <az> >> {}", &bed.out_file)],
                    None,
                    false,
                    "test",
                    "test",
                    "a1",
                    &bed.az_patterns,
                ),
                Action::new(
                    vec!["sh", "-c", &format!("echo a2 <az> >> {}", &bed.out_file)],
                    Some("100ms"),
                    false,
                    "test",
                    "test",
                    "a2",
                    &bed.az_patterns,
                ),
            ],
            vec!["test <az>"],
            Some(3),
            Some("2s"),
            "test",
            "test",
            &bed.az_patterns,
        );
        let bed = bed.part2(filter, Local::now());
        let now = bed.now;
        let now1s = bed.now + TimeDelta::seconds(1);
        let now2s = bed.now + TimeDelta::seconds(2);
        // No match
        assert!(!bed.manager.handle_line("test 131", now));
        bed.assert_empty_trees();
        // First match
        assert!(bed.manager.handle_line("test one", now));
        let one = vec!["one".to_string()];
        assert_eq!(
            bed.matches.as_map(),
            BTreeMap::from([(one.clone(), BTreeSet::from([now]))]),
            "the match has been added to matches"
        );
        assert_eq!(
            bed.ordered_times.as_map(),
            BTreeMap::from([(now, one.clone())]),
            "the match has been added to ordered_times"
        );
        assert!(
            bed.triggers.iter().next().is_none(),
            "triggers is still empty"
        );
        // Second match
        assert!(bed.manager.handle_line("test one", now1s));
        assert_eq!(
            bed.matches.as_map(),
            BTreeMap::from([(one.clone(), BTreeSet::from([now, now1s]))]),
            "a second match is present in matches"
        );
        assert_eq!(
            bed.ordered_times.as_map(),
            BTreeMap::from([(now, one.clone()), (now1s, one.clone())]),
            "a second match is present in ordered_times"
        );
        assert!(
            bed.triggers.iter().next().is_none(),
            "triggers is still empty"
        );
        // Third match, exec. Holding the semaphore blocks the exec task so we
        // can observe the intermediate tree state first.
        let _block = bed.semaphore.acquire().await.unwrap();
        bed.manager.handle_line("test one", now2s);
        assert!(
            bed.matches.iter().next().is_none(),
            "matches are emptied after trigger"
        );
        assert!(
            bed.ordered_times.iter().next().is_none(),
            "ordered_times are emptied after trigger"
        );
        assert_eq!(
            bed.triggers.as_map(),
            BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]),
            "triggers now contain the triggered match with 1 action left" // 1 and not 2 because the decrement_trigger() doesn't wait for the semaphore
        );
        drop(_block);
        // Now the first action executes
        tokio::time::sleep(Duration::from_millis(40)).await;
        // Check first action
        assert_eq!(
            bed.triggers.as_map(),
            BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]),
            "triggers still contain the triggered match with 1 action left"
        );
        assert_eq!(
            "a1 one\n",
            &read_to_string(&bed.out_file).unwrap(),
            "the output file contains the result of the first action"
        );
        // Now the second action executes
        tokio::time::sleep(Duration::from_millis(100)).await;
        // Check second action
        assert!(
            bed.triggers.iter().next().is_none(),
            "triggers are empty again"
        );
        assert_eq!(
            "a1 one\na2 one\n",
            &read_to_string(&bed.out_file).unwrap(),
            "the output file contains the result of the 2 actions"
        );
        bed.assert_empty_trees();
    }

    /// No retry configured: a single match fires the single immediate action
    /// and leaves no state behind in any tree.
    #[tokio::test]
    async fn one_match_one_action() {
        let bed = TestBed::new();
        let filter = Filter::new_static(
            vec![Action::new(
                vec!["sh", "-c", &format!("echo a1 <az> >> {}", &bed.out_file)],
                None,
                false,
                "test",
                "test",
                "a1",
                &bed.az_patterns,
            )],
            vec!["test <az>"],
            None,
            None,
            "test",
            "test",
            &bed.az_patterns,
        );
        let bed = bed.part2(filter, Local::now());
        let now = bed.now;
        // No match
        assert!(!bed.manager.handle_line("test 131", now));
        assert!(
            bed.matches.iter().next().is_none(),
            "matches must be initially empty"
        );
        assert!(
            bed.ordered_times.iter().next().is_none(),
            "ordered_times must be initially empty"
        );
        assert!(
            bed.triggers.iter().next().is_none(),
            "triggers must be initially empty"
        );
        // match
        assert!(bed.manager.handle_line("test one", now));
        assert!(bed.matches.iter().next().is_none(), "matches stay empty");
        assert!(
            bed.ordered_times.iter().next().is_none(),
            "ordered_times stay empty"
        );
        assert!(bed.triggers.iter().next().is_none(), "triggers stay empty");
        // the action executes
        tokio::time::sleep(Duration::from_millis(40)).await;
        assert_eq!(
            "a1 one\n",
            &read_to_string(&bed.out_file).unwrap(),
            "the output file contains the result of the first action"
        );
        bed.assert_empty_trees();
    }

    /// A single match with a 100ms-delayed action: the trigger stays recorded
    /// until the delay elapses, then the action runs and the trees empty out.
    #[tokio::test]
    async fn one_match_one_delayed_action() {
        let bed = TestBed::new();
        let filter = Filter::new_static(
            vec![Action::new(
                vec!["sh", "-c", &format!("echo a1 <az> >> {}", &bed.out_file)],
                Some("100ms"),
                false,
                "test",
                "test",
                "a1",
                &bed.az_patterns,
            )],
            vec!["test <az>"],
            None,
            None,
            "test",
            "test",
            &bed.az_patterns,
        );
        let bed = bed.part2(filter, Local::now());
        let now = bed.now;
        // No match
        assert!(!bed.manager.handle_line("test 131", now));
        assert!(
            bed.matches.iter().next().is_none(),
            "matches must be initially empty"
        );
        assert!(
            bed.ordered_times.iter().next().is_none(),
            "ordered_times must be initially empty"
        );
        assert!(
            bed.triggers.iter().next().is_none(),
            "triggers must be initially empty"
        );
        // Match
        let one = vec!["one".to_string()];
        assert!(bed.manager.handle_line("test one", now));
        assert!(bed.matches.iter().next().is_none(), "matches stay empty");
        assert!(
            bed.ordered_times.iter().next().is_none(),
            "ordered_times stay empty"
        );
        assert_eq!(
            bed.triggers.as_map(),
            BTreeMap::from([(one.clone(), BTreeMap::from([(now, 1)]))]),
            "triggers still contain the triggered match with 1 action left"
        );
        assert_eq!(
            "",
            &read_to_string(&bed.out_file).unwrap(),
            "the output file is empty"
        );
        // The action executes
        tokio::time::sleep(Duration::from_millis(140)).await;
        assert!(
            bed.triggers.iter().next().is_none(),
            "triggers are empty again"
        );
        assert_eq!(
            "a1 one\n",
            &read_to_string(&bed.out_file).unwrap(),
            "the output file contains the result of the action"
        );
        bed.assert_empty_trees();
    }

    // TODO test persistence, i.e. FilterManagers created with a non-empty db
}

View file

@ -8,6 +8,7 @@ use std::{
},
};
use chrono::Local;
use tokio::{
select,
signal::unix::{signal, SignalKind},
@ -61,11 +62,13 @@ pub async fn daemon(
db.cleanup_unused_trees(config);
// Filter managers
let now = Local::now();
let mut state = HashMap::new();
for stream in config.streams().values() {
let mut filter_managers = HashMap::new();
for filter in stream.filters().values() {
let manager = FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &db)?;
let manager =
FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &db, now)?;
filter_managers.insert(filter, manager);
}
state.insert(stream, filter_managers.clone());

View file

@ -94,14 +94,14 @@ impl SledDbExt for sled::Db {
/// business logic.
/// Key and value types must be [`serde::Serialize`] and [`serde::Deserialize`].
#[derive(Clone)]
pub struct Tree<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> {
pub struct Tree<K: Serialize + DeserializeOwned + Ord, V: Serialize + DeserializeOwned> {
tree: sled::Tree,
_k_marker: PhantomData<K>,
_v_marker: PhantomData<V>,
}
#[allow(clippy::unwrap_used)]
impl<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> Tree<K, V> {
impl<K: Serialize + DeserializeOwned + Ord, V: Serialize + DeserializeOwned> Tree<K, V> {
fn new(tree: sled::Tree) -> Self {
Self {
tree,
@ -182,4 +182,84 @@ impl<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned> Tree<K, V
(k, v)
})
}
#[cfg(test)]
pub fn as_map(&self) -> BTreeMap<K, V> {
self.iter().collect()
}
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use chrono::{Local, TimeDelta};

    use super::SledDbExt;
    use crate::{concepts::filter_tests::ok_filter, tests::TempDb};

    /// Exercises the typed `Tree` wrapper's CRUD surface (insert, get, remove,
    /// fetch_and_update, as_map) against a throwaway sled db. The assertions
    /// form one stateful sequence: order matters.
    #[test]
    fn tree_crud() {
        let filter = ok_filter();
        let db = TempDb::new();
        let triggers = db.open_filter_triggers_tree(&filter).unwrap();
        // Freshly opened tree is empty.
        assert_eq!(BTreeMap::default(), triggers.as_map());
        let now = Local::now();
        let then = now + TimeDelta::seconds(2);
        // Two distinct keys (Match values) and their trigger-count payloads.
        let k1 = vec!["a".into()];
        let k2 = vec!["a".into(), "b".into()];
        let v1 = BTreeMap::from([(now, 4)]);
        let v2 = BTreeMap::from([(then, 2)]);
        // Expected full-tree snapshots after each mutation.
        let map_1 = BTreeMap::from([(k1.clone(), v1.clone())]);
        let map_2 = BTreeMap::from([(k2.clone(), v2.clone())]);
        let map_1_2 = BTreeMap::from([(k1.clone(), v1.clone()), (k2.clone(), v2.clone())]);
        // Insert first entry.
        triggers.insert(&k1, &v1).unwrap();
        assert_eq!(triggers.as_map(), map_1);
        assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone()));
        assert_eq!(triggers.get(&k2).unwrap(), None);
        // Insert second entry.
        triggers.insert(&k2, &v2).unwrap();
        assert_eq!(triggers.as_map(), map_1_2);
        assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone()));
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // Remove first entry; the old value is returned.
        assert_eq!(triggers.remove(&k1), Some(v1.clone()));
        assert_eq!(triggers.as_map(), map_2);
        assert_eq!(triggers.get(&k1).unwrap(), None);
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // Add back via fetch_and_update (starts from the default empty map).
        triggers
            .fetch_and_update(&k1, |map| {
                let mut map = map.unwrap_or_default();
                map.insert(now, 4);
                Some(map)
            })
            .unwrap();
        assert_eq!(triggers.as_map(), map_1_2);
        assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone()));
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // Remove: returning None from the closure deletes the entry.
        triggers
            .fetch_and_update(&k1, |map| match map {
                Some(_) => None,
                None => Some(v1.clone()),
            })
            .unwrap();
        assert_eq!(triggers.as_map(), map_2);
        assert_eq!(triggers.get(&k1).unwrap(), None);
        assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone()));
        // Remove
        triggers.fetch_and_update(&k2, |_| None).unwrap();
        assert_eq!(triggers.as_map(), BTreeMap::default());
        assert_eq!(triggers.get(&k1).unwrap(), None);
        assert_eq!(triggers.get(&k2).unwrap(), None);
    }
}

View file

@ -6,6 +6,7 @@ use std::{
sync::Arc,
};
use chrono::Local;
use futures::{SinkExt, StreamExt};
use regex::Regex;
use tokio::net::UnixListener;
@ -87,6 +88,7 @@ fn answer_order(
})
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()?;
let now = Local::now();
let cs: ClientStatus = shared_state
.iter()
// stream filtering
@ -115,7 +117,7 @@ fn answer_order(
.map(|(filter, manager)| {
(
filter.name().to_owned(),
manager.handle_order(&patterns, options.order),
manager.handle_order(&patterns, options.order, now),
)
})
.collect();

View file

@ -1,5 +1,6 @@
use std::{collections::HashMap, process::Stdio, task::Poll, time::Duration};
use chrono::Local;
use futures::{FutureExt, StreamExt};
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
@ -125,8 +126,9 @@ async fn handle_io(
loop {
match lines.next().await {
Some(Ok(line)) => {
let now = Local::now();
for manager in filter_managers.values() {
manager.handle_line(&line);
manager.handle_line(&line, now);
}
}
Some(Err(err)) => {

View file

@ -55,3 +55,24 @@ impl Deref for Fixture {
self.path.deref()
}
}
/// Test-support wrapper: a sled database stored in a temporary directory.
pub struct TempDb {
    // Open handle to the sled database living inside `_tempdir`.
    db: sled::Db,
    // Held only to keep the temporary directory alive for the db's lifetime;
    // tempfile removes it on drop.
    _tempdir: TempDir,
}
impl TempDb {
    /// Opens a fresh sled database inside a brand-new temporary directory.
    /// Panics on failure, which is acceptable for test-support code.
    pub fn new() -> Self {
        let tempdir = TempDir::new().unwrap();
        let db = sled::open(tempdir.path()).unwrap();
        Self {
            db,
            _tempdir: tempdir,
        }
    }
}
impl Deref for TempDb {
    type Target = sled::Db;

    /// Lets a `TempDb` be used anywhere a `&sled::Db` is expected.
    fn deref(&self) -> &Self::Target {
        &self.db
    }
}