Tests passing, but no error found

The tests are finally passing, but nothing has been fixed in the real code, so the bug has not been found yet.
ppom 2024-11-18 12:00:00 +01:00
commit 7c3116b7c9
4 changed files with 164 additions and 47 deletions

View file

@@ -53,6 +53,16 @@ impl Filter {
         }
     }
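+    // Test-only constructor: builds a Filter for the given stream and filter
+    // names with the supplied patterns, leaving all other fields at defaults.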
+    #[cfg(test)]
+    pub fn from_name_and_patterns(stream_name: &str, filter_name: &str, patterns: Vec<Pattern>) -> Filter {
+        Filter {
+            stream_name: stream_name.into(),
+            name: filter_name.into(),
+            patterns: Arc::new(patterns.into_iter().map(|p| Arc::new(p)).collect()),
+            ..Filter::default()
+        }
+    }
     pub fn name(&self) -> &str {
         &self.name
     }
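For orientation, a minimal sketch (not part of the commit) of how this test-only constructor composes with the Pattern helper added in the next file; the function name and the "ssh", "failed-login" and "user" strings are invented for illustration:

    #[cfg(test)]
    fn sketch() {
        // Hypothetical usage inside a test of this crate.
        let filter = Filter::from_name_and_patterns(
            "ssh",
            "failed-login",
            vec![Pattern::from_name("user")],
        );
        assert_eq!(filter.name(), "failed-login");
        assert_eq!(filter.patterns().len(), 1);
    }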

View file

@@ -4,6 +4,7 @@ use regex::Regex;
 use serde::Deserialize;
 #[derive(Clone, Debug, Deserialize)]
+#[cfg_attr(test, derive(Default))]
 #[serde(deny_unknown_fields)]
 pub struct Pattern {
     pub regex: String,
@@ -23,6 +24,13 @@ pub struct Pattern {
 }
 impl Pattern {
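+    // Test-only constructor: a Pattern with only its name set.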
+    #[cfg(test)]
+    pub fn from_name(name: &str) -> Pattern {
+        Pattern {
+            name: name.into(),
+            ..Pattern::default()
+        }
+    }
     pub fn setup(&mut self, name: &str) -> Result<(), String> {
         self._setup(name)
             .map_err(|msg| format!("pattern {}: {}", name, msg))

View file

@@ -174,7 +174,8 @@ fn rotate_db(
         None => {
             warn!(
                 "Strange! Found a {:?} but no {:?}, opening /dev/null instead",
-                &config.path_of(LogDbName), &config.path_of(FlushDbName)
+                &config.path_of(LogDbName),
+                &config.path_of(FlushDbName)
             );
             match ReadDB::open(&PathBuf::from("/dev/null"), config)? {
                 Some(db) => db,
@@ -243,6 +244,7 @@ fn _rotate_db(
         Ok(mut entry) => {
             // Check if number of patterns is in sync
             if entry.m.len() != entry.f.patterns().len() {
+                debug!("DB ignoring entry: nb of patterns in filter not in sync with nb of matches: {:?}", entry);
                 continue;
             }
@@ -274,15 +276,12 @@ fn _rotate_db(
         }
         if let Some(matches_tx) = &matches_tx {
-            if let Some(tx) = matches_tx.get(entry.f) {
-                debug!("DB sending match from DB: {:?}", entry.m);
-                #[allow(clippy::unwrap_used)] // propagating panics is ok
-                tx.blocking_send((entry.m.clone(), entry.t)).unwrap();
-            } else {
-                dbg!("no tx for ", entry.f);
-            }
-        } else {
-            dbg!("no matches_tx");
+            let tx = matches_tx
+                .get(entry.f)
+                .expect("each filter should have an associated channel Sender");
+            debug!("DB sending match from DB: {:?}", entry.m);
+            #[allow(clippy::unwrap_used)] // propagating panics is ok
+            tx.blocking_send((entry.m.clone(), entry.t)).unwrap();
         }
         write_or_die!(log_write_db, entry);
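Note the behavior change in the last hunk: a filter without a registered Sender used to be reported with dbg! and silently skipped; it now panics via expect, on the stated invariant that every filter has an associated channel Sender.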

View file

@@ -7,13 +7,15 @@ use chrono::{Local, TimeDelta};
 use rand::{
     distributions::{Alphanumeric, DistString, Uniform},
     prelude::Distribution as _,
+    rngs::ThreadRng,
 };
 use tempfile::TempDir;
 use tokio::sync::mpsc;
+use tracing::Level;
 use super::{ReadDB, WriteDB};
 use crate::{
-    concepts::{Config, Filter, Stream},
+    concepts::{Config, Filter, Pattern, Stream},
     daemon::database::{rotate_db, DatabaseNames, LogEntry},
     tests::Fixture,
 };
@@ -144,28 +146,91 @@ fn write_and_read_db() {
     assert!(read_entry.is_none());
 }
-#[test]
-fn write_and_read_db_big() {
-    let mut rng = rand::thread_rng();
-    let streams = (0..10)
-        .map(|_| Alphanumeric.sample_string(&mut rng, 10))
+fn generate_random_streams(
+    rng: &mut ThreadRng,
+    nb_streams: usize,
+    nb_filters_per_stream: usize,
+) -> BTreeMap<String, Stream> {
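+    // Second RNG: presumably needed because rng is already mutably captured
+    // by the sampling closures below and cannot be borrowed a second time.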
+    let mut rng2 = rand::thread_rng();
+    let nb_patterns = Uniform::new(0, 4);
+    (0..nb_streams)
+        .map(|_| Alphanumeric.sample_string(rng, 10))
         .collect::<Vec<_>>()
         .into_iter()
         .map(|sname| {
             (
                 sname.clone(),
                 Stream::from_filters(
-                    (0..10)
-                        .map(|_| Alphanumeric.sample_string(&mut rng, 10))
-                        .map(|fname| (fname.clone(), Filter::from_name(&sname, &fname)))
+                    (0..nb_filters_per_stream)
+                        .map(|_| Alphanumeric.sample_string(rng, 10))
+                        .map(|fname| {
+                            (
+                                fname.clone(),
+                                Filter::from_name_and_patterns(
+                                    &sname,
+                                    &fname,
+                                    (0..nb_patterns.sample(&mut rng2))
+                                        .map(|_| Pattern::default())
+                                        .collect(),
+                                ),
+                            )
+                        })
                         .collect(),
                     &sname,
                 ),
             )
         })
-        .collect();
+        .collect()
+}
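+// Build nb_entries LogEntries. Stream/filter randomization is still commented
+// out, so every entry uses the first filter of the first stream; the unused
+// _nb_streams and _nb_filters_per_stream parameters are kept for that purpose.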
+fn generate_random_entries(
+    rng: &mut ThreadRng,
+    config: &'static Config,
+    nb_entries: usize,
+    _nb_streams: usize,
+    _nb_filters_per_stream: usize,
+) -> Vec<LogEntry> {
+    // let rand_strings = Uniform::new(0, 5);
+    // let rand_streams = Uniform::new(0, nb_streams);
+    // let rand_filters = Uniform::new(0, nb_filters_per_stream);
+    (0..nb_entries)
+        .map(|i| {
+            let f = config
+                .streams()
+                .iter()
+                // .take(rand_streams.sample(rng))
+                .take(1)
+                .last()
+                .unwrap()
+                .1
+                .filters()
+                .iter()
+                // .take(rand_filters.sample(rng))
+                .take(1)
+                .last()
+                .unwrap()
+                .1;
+            LogEntry {
+                // Random match of n Strings of size 10
+                m: (0..f.patterns().len())
+                    .map(|_| Alphanumeric.sample_string(rng, 10))
+                    .collect(),
+                // Random filter in config
+                f,
+                // Now + incremented microsecond (avoid duplication)
+                t: Local::now() + TimeDelta::microseconds(i as i64),
+                exec: false,
+            }
+        })
+        .collect()
+}
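+// Round-trip test: write the random entries, reopen the DB, and check that
+// every entry is read back unchanged and in order.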
+#[test]
+fn write_and_read_db_big() {
+    let mut rng = rand::thread_rng();
+    let streams = generate_random_streams(&mut rng, 10, 10);
     let dir = TempDir::new().unwrap();
     let config = Config::from_streams(
         streams,
@@ -173,37 +238,65 @@ fn write_and_read_db_big() {
     );
     let config = Box::leak(Box::new(config));
-    let u0t5 = Uniform::new(0, 5);
-    // let u0t10 = Uniform::new(0, 9);
+    let entries = generate_random_entries(&mut rng, config, 300, 10, 10);
     // 300 random entries
-    let entries: Vec<_> = (0..300)
-        .map(|i| LogEntry {
-            // Random match of 5 Strings of size 10
-            m: (0..u0t5.sample(&mut rng))
-                .map(|_| Alphanumeric.sample_string(&mut rng, 10))
-                .collect(),
-            // Random filter in config
-            f: config
-                .streams()
-                .iter()
-                // .take(u0t10.sample(&mut rng))
-                .take(1)
-                .last()
-                .unwrap()
-                .1
-                .filters()
-                .iter()
-                // .take(u0t10.sample(&mut rng))
-                .take(1)
-                .last()
-                .unwrap()
-                .1,
-            // Now + incremented microsecond (avoid duplication)
-            t: Local::now() + TimeDelta::microseconds(i),
-            exec: false,
-        })
-        .collect();
+    let mut write_db = WriteDB::create(&config.path_of(DatabaseNames::LogDbName), config);
+    for i in 0..entries.len() {
+        assert!(
+            write_db.write(entries[i].clone()).is_ok(),
+            "could not write entry n°{i}"
+        );
+    }
+    drop(write_db);
+    let read_db = ReadDB::open(&config.path_of(DatabaseNames::LogDbName), config);
+    assert!(read_db.is_ok());
+    let read_db = read_db.unwrap();
+    assert!(read_db.is_some());
+    let mut read_db = read_db.unwrap();
+    for i in 0..entries.len() {
+        let read_entry = read_db.next();
+        assert!(read_entry.is_some(), "entry n°{i} is none");
+        let read_entry = read_entry.unwrap();
+        assert!(
+            read_entry.is_ok(),
+            "entry n°{i} is err: {}",
+            read_entry.err().unwrap()
+        );
+        let read_entry = read_entry.unwrap();
+        let entry = &entries[i];
+        assert_eq!(entry.m, read_entry.m, "entry n°{i}'s match is incorrect");
+        assert_eq!(
+            entry.t.timestamp(),
+            read_entry.t.timestamp(),
+            "entry n°{i}'s t is incorrect",
+        );
+    }
+    let read_entry = read_db.next();
+    assert!(
+        read_entry.is_none(),
+        "entry left at end of db: {:?}",
+        read_entry
+    );
+}
+#[test]
+fn rotate_db_big() {
+    let mut rng = rand::thread_rng();
+    let streams = generate_random_streams(&mut rng, 10, 10);
+    let dir = TempDir::new().unwrap();
+    let config = Config::from_streams(
+        streams,
+        &dir.into_path().into_os_string().into_string().unwrap(),
+    );
+    let config = Box::leak(Box::new(config));
+    let entries = generate_random_entries(&mut rng, config, 300, 10, 10);
+    let mut write_db = WriteDB::create(&config.path_of(DatabaseNames::LogDbName), config);
+    {
@@ -232,6 +325,13 @@ fn write_and_read_db_big() {
     // We clone the senders so that the channels are not closed after database rotation
     let _log2filter_tx = log2filter_tx.clone();
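+    // Log at TRACE level during the test so that the debug! output added to
+    // the database code is visible while hunting the bug.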
+    tracing_subscriber::fmt::fmt()
+        .without_time()
+        .with_target(false)
+        .with_max_level(Level::TRACE)
+        .try_init()
+        .unwrap();
     let rotated = rotate_db(config, Some(log2filter_tx));
     assert!(
         rotated.is_ok(),