From 8579e308900ccd8bd3d598321a7897ed342f9145 Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 11 Nov 2024 12:00:00 +0100 Subject: [PATCH] test infrastructure, new conf's state_directory, less deps reaction's configuration now has a state_directory optional member, which is where it will save its databases. defaults to cwd. added a lot of code necessary to properly test databases. The new tests are currently failing, which is good : they'll permit to hunt down this database consistency bug. also removed some indirect dependencies via features removal, and moved test dependencies to dedicated [dev-dependencies] also small fix on an nft46.c function type and empty conf file for ccls LSP server. --- .ccls | 0 .gitignore | 2 + Cargo.lock | 115 ++++++++++++------ Cargo.toml | 9 +- helpers_c/nft46.c | 2 +- src/concepts/config.rs | 29 +++-- src/concepts/stream.rs | 10 ++ src/daemon/database/lowlevel.rs | 21 ++-- src/daemon/database/mod.rs | 62 +++++++--- src/daemon/database/tests.rs | 204 +++++++++++++++++++++++++++++--- 10 files changed, 361 insertions(+), 93 deletions(-) create mode 100644 .ccls diff --git a/.ccls b/.ccls new file mode 100644 index 0000000..e69de29 diff --git a/.gitignore b/.gitignore index 14eaadd..4f92806 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ debian-packaging/* export-go-db/export-go-db import-rust-db/target /target +/local +.ccls-cache diff --git a/Cargo.lock b/Cargo.lock index 832022e..f436b46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,6 +148,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.7.2" @@ -284,7 +290,6 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", - "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -307,34 +312,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" -[[package]] -name = "futures-executor" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - [[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.82", -] - [[package]] name = "futures-sink" version = "0.3.31" @@ -353,16 +336,22 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures-channel", "futures-core", - "futures-io", - "futures-macro", "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", - "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", ] [[package]] @@ -711,6 +700,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.88" @@ -729,6 +727,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "reaction" version = "2.0.0-rc1" @@ -741,6 +769,7 @@ dependencies = [ "futures", "jrsonnet-evaluator", "num_cpus", + "rand", "regex", "serde", "serde_json", @@ -904,15 +933,6 @@ dependencies = [ "libc", ] -[[package]] -name = "slab" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] - [[package]] name = "smallvec" version = "1.13.2" @@ -1345,3 +1365,24 @@ checksum = "fe5c30ade05e61656247b2e334a031dfd0cc466fadef865bdcdea8d537951bf1" dependencies = [ "winapi", ] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", +] diff --git a/Cargo.toml b/Cargo.toml index d287106..5c9138d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "reaction" version = "2.0.0-rc1" edition = "2021" -authors = ["ppom "] license = "AGPL-3.0" description = "Scan logs and take action" readme = "README.md" @@ -21,10 +21,9 @@ regex = "1.10.4" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" serde_yaml = "0.9.34" -tempfile = "3.12.0" thiserror = "1.0.63" timer = "0.2.0" -futures = "0.3.30" +futures = { version = "0.3.30", default-features = false, features = ["alloc"] } tokio = { version = "1.40.0", features = ["full", "tracing"] } tokio-util = { version = "0.7.12", features = ["codec"] } tracing = "0.1.40" @@ -36,3 +35,7 @@ clap_complete = "4.5.2" clap_mangen = "0.2.24" regex = "1.10.4" tracing = "0.1.40" + +[dev-dependencies] +rand = "0.8.5" +tempfile = "3.12.0" diff --git a/helpers_c/nft46.c b/helpers_c/nft46.c index fc84aa0..cb0f419 100644 --- a/helpers_c/nft46.c +++ b/helpers_c/nft46.c @@ -80,7 +80,7 @@ void adapt_args(char *tab) { exit(1); } -int exec(char *str, char **argv) { +void exec(char *str, char **argv) { argv[0] = str; execvp(str, argv); // returns only if fails diff --git a/src/concepts/config.rs b/src/concepts/config.rs index cc66443..45d8edb 100644 --- a/src/concepts/config.rs +++ b/src/concepts/config.rs @@ -16,6 +16,7 @@ use super::{Filter, Pattern, Stream}; pub type Patterns = BTreeMap>; #[derive(Clone, Debug, Deserialize)] +#[cfg_attr(test, derive(Default))] #[serde(deny_unknown_fields)] pub struct Config { patterns: Patterns, @@ -29,6 +30,9 @@ pub struct Config { start: Vec>, #[serde(default)] stop: Vec>, + + #[serde(default)] + state_directory: String, } impl Config { @@ -44,6 +48,10 @@ impl Config { self.concurrency } + pub fn state_directory(&self) -> &str { + &self.state_directory + } + pub fn filters(&self) -> Vec<&Filter> { self.streams .values() @@ -119,6 +127,15 @@ impl Config { Ok(config) } + + #[cfg(test)] + pub fn from_streams(streams: BTreeMap, dir: &str) -> Self { + Self { + streams, + state_directory: dir.to_string(), + ..Default::default() + } + } } enum Format { @@ -211,19 +228,9 @@ mod tests { use super::*; - fn default_config() -> Config { - Config { - concurrency: 0, - patterns: BTreeMap::new(), - streams: BTreeMap::new(), - start: Vec::new(), - stop: Vec::new(), - } - } - #[test] fn config_missing() { - let mut config = default_config(); + let mut config = Config::default(); assert!(config.setup().is_err()); } } diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 377968d..d8a4e47 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -5,6 +5,7 @@ use serde::Deserialize; use super::{Filter, Patterns}; #[derive(Clone, Debug, Deserialize)] +#[cfg_attr(test, derive(Default))] #[serde(deny_unknown_fields)] pub struct Stream { cmd: Vec, @@ -63,6 +64,15 @@ impl Stream { Ok(()) } + + #[cfg(test)] + pub fn from_filters(filters: BTreeMap, name: &str) -> Self { + Self { + filters, + name: name.to_string(), + ..Default::default() + } + } } impl PartialEq for Stream { diff --git a/src/daemon/database/lowlevel.rs b/src/daemon/database/lowlevel.rs index 5b0dc43..0a8a1fd 100644 --- a/src/daemon/database/lowlevel.rs +++ b/src/daemon/database/lowlevel.rs @@ -1,9 +1,5 @@ use std::{ - collections::BTreeMap, - fmt::Debug, - fs::File, - io::{self, BufReader, BufWriter, Read, Write}, - process::exit, + collections::BTreeMap, fmt::Debug, fs::File, io::{self, BufReader, BufWriter, Read, Write}, path::Path, process::exit }; use bincode::Options; @@ -16,7 +12,7 @@ use crate::{ protocol::{bincode_options, BincodeOptions}, }; -use super::{DBError, LogEntry}; +use super::{DBError, DatabaseNames, LogEntry}; // OPTIM Add a timestamp prefix to the header, to permit having // shorter timestamps? @@ -34,13 +30,13 @@ pub struct ReadDB { } impl ReadDB { - pub fn open(path: &str, config: &'static Config) -> Result, DBError> { + pub fn open(path: &Path, config: &'static Config) -> Result, DBError> { let file = match File::open(path) { Ok(file) => file, Err(err) => match err.kind() { std::io::ErrorKind::NotFound => { warn!( - "No DB found at {}. It's ok if this is the first time reaction is running.", + "No DB found at {:?}. It's ok if this is the first time reaction is running.", path ); return Ok(None); @@ -58,20 +54,21 @@ impl ReadDB { }; // Signature checking + // TODO if "failed to fill whole buffer", file is empty or almost empty. Ignore let mut signature = [0u8; 15]; ret.f .read_exact(&mut signature) .map_err(|err| DBError::Error(format!("reading database signature: {err}")))?; if DB_SIGNATURE.as_bytes()[0..13] != signature[0..13] { return Err(DBError::Error(format!( - "{path} is not a reaction database, or it is a reaction-v1.x database. + "{path:?} is not a reaction database, or it is a reaction-v1.x database. You can migrate your old database to a new one by following documented steps at https://reaction.ppom.me/migrate-to-v2 -You can also choose to delete the local {} and {} if you don't care about your old matches.", super::LOG_DB_NAME, super::FLUSH_DB_NAME +You can also choose to delete the local {} and {} if you don't care about your old matches.", DatabaseNames::LogDbName, DatabaseNames::FlushDbName ))); } if DB_SIGNATURE.as_bytes()[13..15] != signature[13..15] { return Err(DBError::Error(format!( - "{path} seem to be the database of a newer version of reaction. + "{path:?} seem to be the database of a newer version of reaction. Are you sure you're running the last version of reaction?" ))); } @@ -120,7 +117,7 @@ pub struct WriteDB { } impl WriteDB { - pub fn create(path: &str, config: &'static Config) -> Self { + pub fn create(path: &Path, config: &'static Config) -> Self { let file = match File::create(path) { Ok(file) => file, Err(err) => { diff --git a/src/daemon/database/mod.rs b/src/daemon/database/mod.rs index e597ff4..419a7e3 100644 --- a/src/daemon/database/mod.rs +++ b/src/daemon/database/mod.rs @@ -1,7 +1,8 @@ use std::{ collections::{BTreeMap, HashMap}, - fmt::Debug, + fmt::{Debug, Display}, fs, io, + path::PathBuf, process::exit, thread, }; @@ -18,9 +19,36 @@ mod tests; use lowlevel::{ReadDB, WriteDB}; -const LOG_DB_NAME: &str = "./reaction-matches.db"; -const LOG_DB_NEW_NAME: &str = "./reaction-matches.new.db"; -const FLUSH_DB_NAME: &str = "./reaction-flushes.db"; +enum DatabaseNames { + LogDbName, + LogDbNewName, + FlushDbName, +} +impl DatabaseNames { + fn basename(&self) -> &'static str { + match self { + DatabaseNames::LogDbName => "reaction-matches.db", + DatabaseNames::LogDbNewName => "reaction-matches.new.db", + DatabaseNames::FlushDbName => "reaction-flushes.db", + } + } +} +impl Display for DatabaseNames { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.basename()) + } +} + +impl Config { + fn path_of(&self, name: DatabaseNames) -> PathBuf { + if self.state_directory().is_empty() { + name.basename().into() + } else { + PathBuf::from(self.state_directory()).join(name.basename()) + } + } +} +use DatabaseNames::*; const MAX_WRITES: u32 = 500_000; @@ -132,23 +160,23 @@ fn rotate_db( config: &'static Config, matches_tx: Option>>, ) -> Result<(WriteDB, WriteDB), DBError> { - let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config)? { + let mut log_read_db = match ReadDB::open(&config.path_of(LogDbName), config)? { Some(db) => db, None => { return Ok(( - WriteDB::create(LOG_DB_NAME, config), - WriteDB::create(FLUSH_DB_NAME, config), + WriteDB::create(&config.path_of(LogDbName), config), + WriteDB::create(&config.path_of(FlushDbName), config), )); } }; - let mut flush_read_db = match ReadDB::open(FLUSH_DB_NAME, config)? { + let mut flush_read_db = match ReadDB::open(&config.path_of(FlushDbName), config)? { Some(db) => db, None => { warn!( - "Strange! Found a {} but no {}, opening /dev/null instead", - LOG_DB_NAME, FLUSH_DB_NAME + "Strange! Found a {:?} but no {:?}, opening /dev/null instead", + &config.path_of(LogDbName), &config.path_of(FlushDbName) ); - match ReadDB::open("/dev/null", config)? { + match ReadDB::open(&PathBuf::from("/dev/null"), config)? { Some(db) => db, None => { return Err(DBError::Error("/dev/null is not accessible".into())); @@ -157,7 +185,7 @@ fn rotate_db( } }; - let mut log_write_db = WriteDB::create(LOG_DB_NEW_NAME, config); + let mut log_write_db = WriteDB::create(&config.path_of(LogDbNewName), config); _rotate_db( matches_tx, @@ -169,18 +197,18 @@ fn rotate_db( drop(log_read_db); drop(flush_read_db); - if let Err(err) = fs::rename(LOG_DB_NEW_NAME, LOG_DB_NAME) { + if let Err(err) = fs::rename(&config.path_of(LogDbNewName), &config.path_of(LogDbName)) { return Err(DBError::Error(format!( "Failed to replace old DB with new one: {}", err ))); } - if let Err(err) = fs::remove_file(FLUSH_DB_NAME) { + if let Err(err) = fs::remove_file(&config.path_of(FlushDbName)) { return Err(DBError::Error(format!("Failed to delete old DB: {}", err))); } - let flush_write_db = WriteDB::create(FLUSH_DB_NAME, config); + let flush_write_db = WriteDB::create(&config.path_of(FlushDbName), config); Ok((log_write_db, flush_write_db)) } @@ -250,7 +278,11 @@ fn _rotate_db( debug!("DB sending match from DB: {:?}", entry.m); #[allow(clippy::unwrap_used)] // propagating panics is ok tx.blocking_send((entry.m.clone(), entry.t)).unwrap(); + } else { + dbg!("no tx for ", entry.f); } + } else { + dbg!("no matches_tx"); } write_or_die!(log_write_db, entry); diff --git a/src/daemon/database/tests.rs b/src/daemon/database/tests.rs index c1af360..0538675 100644 --- a/src/daemon/database/tests.rs +++ b/src/daemon/database/tests.rs @@ -1,21 +1,24 @@ #![allow(clippy::unwrap_used)] #![cfg(test)] -use chrono::Local; +use std::{collections::BTreeMap, path::PathBuf}; + +use chrono::{Local, TimeDelta}; +use rand::{ + distributions::{Alphanumeric, DistString, Uniform}, + prelude::Distribution as _, +}; +use tempfile::TempDir; +use tokio::sync::mpsc; use super::{ReadDB, WriteDB}; use crate::{ - concepts::{Config, Filter}, - daemon::database::LogEntry, + concepts::{Config, Filter, Stream}, + daemon::database::{rotate_db, DatabaseNames, LogEntry}, tests::Fixture, }; -#[test] -fn write_and_read_db() { - let config_file = Fixture::from_string( - "config.jsonnet", - " -{ +const BASIC_INNER_CONF: &str = " patterns: { num: { regex: '[0-9]+' }, }, @@ -36,8 +39,65 @@ fn write_and_read_db() { } } } +"; + +#[test] +fn db_name() { + fn eq(path: PathBuf, str: &str) { + assert_eq!(path.into_os_string().into_string().unwrap(), str); + } + + let config_file = Fixture::from_string( + "config.jsonnet", + &format!( + "{{ {}, state_directory: '/var/lib/reaction' }}", + BASIC_INNER_CONF + ), + ); + let config = Config::from_file(&config_file).unwrap(); + assert_eq!(config.state_directory(), "/var/lib/reaction"); + eq( + config.path_of(DatabaseNames::LogDbName), + "/var/lib/reaction/reaction-matches.db", + ); + eq( + config.path_of(DatabaseNames::LogDbNewName), + "/var/lib/reaction/reaction-matches.new.db", + ); + eq( + config.path_of(DatabaseNames::FlushDbName), + "/var/lib/reaction/reaction-flushes.db", + ); + + let config_file = + Fixture::from_string("config.jsonnet", &format!("{{ {} }}", BASIC_INNER_CONF)); + let config = Config::from_file(&config_file).unwrap(); + assert_eq!(config.state_directory(), ""); + eq( + config.path_of(DatabaseNames::LogDbName), + "reaction-matches.db", + ); + eq( + config.path_of(DatabaseNames::LogDbNewName), + "reaction-matches.new.db", + ); + eq( + config.path_of(DatabaseNames::FlushDbName), + "reaction-flushes.db", + ); } - ", + +#[test] +fn write_and_read_db() { + let dir = TempDir::new().unwrap(); + + let config_file = Fixture::from_string( + "config.jsonnet", + &format!( + "{{ {}, state_directory: '{}' }}", + BASIC_INNER_CONF, + dir.into_path().into_os_string().into_string().unwrap() + ), ); let config = Box::leak(Box::new(Config::from_file(&config_file).unwrap())); @@ -60,16 +120,14 @@ fn write_and_read_db() { exec: false, }; - let db_path = Fixture::empty("matches.db"); - - let mut write_db = WriteDB::create(db_path.to_str().unwrap(), config); + let mut write_db = WriteDB::create(&config.path_of(DatabaseNames::LogDbName), config); assert!(write_db.write(correct_log_entry.clone()).is_ok()); assert!(write_db.write(incorrect_log_entry).is_err()); drop(write_db); - let read_db = ReadDB::open(db_path.to_str().unwrap(), config); + let read_db = ReadDB::open(&config.path_of(DatabaseNames::LogDbName), config); assert!(read_db.is_ok()); let read_db = read_db.unwrap(); @@ -85,3 +143,121 @@ fn write_and_read_db() { let read_entry = read_db.next(); assert!(read_entry.is_none()); } + +#[test] +fn write_and_read_db_big() { + let mut rng = rand::thread_rng(); + + let streams = (0..10) + .map(|_| Alphanumeric.sample_string(&mut rng, 10)) + .collect::>() + .into_iter() + .map(|sname| { + ( + sname.clone(), + Stream::from_filters( + (0..10) + .map(|_| Alphanumeric.sample_string(&mut rng, 10)) + .map(|fname| (fname.clone(), Filter::from_name(&sname, &fname))) + .collect(), + &sname, + ), + ) + }) + .collect(); + + let dir = TempDir::new().unwrap(); + let config = Config::from_streams( + streams, + &dir.into_path().into_os_string().into_string().unwrap(), + ); + let config = Box::leak(Box::new(config)); + + let u0t5 = Uniform::new(0, 5); + // let u0t10 = Uniform::new(0, 9); + + // 300 random entries + let entries: Vec<_> = (0..300) + .map(|i| LogEntry { + // Random match of 5 Strings of size 10 + m: (0..u0t5.sample(&mut rng)) + .map(|_| Alphanumeric.sample_string(&mut rng, 10)) + .collect(), + // Random filter in config + f: config + .streams() + .iter() + // .take(u0t10.sample(&mut rng)) + .take(1) + .last() + .unwrap() + .1 + .filters() + .iter() + // .take(u0t10.sample(&mut rng)) + .take(1) + .last() + .unwrap() + .1, + // Now + incremented microsecond (avoid duplication) + t: Local::now() + TimeDelta::microseconds(i), + exec: false, + }) + .collect(); + + let mut write_db = WriteDB::create(&config.path_of(DatabaseNames::LogDbName), config); + { + let mut _flush_db = WriteDB::create(&config.path_of(DatabaseNames::FlushDbName), config); + } + + for i in 0..entries.len() { + assert!( + write_db.write(entries[i].clone()).is_ok(), + "could not write entry n°{i}" + ); + } + + drop(write_db); + + let mut log2filter_tx = BTreeMap::new(); + let mut log2filter_rx = BTreeMap::new(); + for stream in config.streams().values() { + for filter in stream.filters().values() { + let (tx, rx) = mpsc::channel(3000); + log2filter_tx.insert(filter, tx); + log2filter_rx.insert(filter, rx); + } + } + + // We clone the senders so that the channels are not closed after database rotation + let _log2filter_tx = log2filter_tx.clone(); + + let rotated = rotate_db(config, Some(log2filter_tx)); + assert!( + rotated.is_ok(), + "database rotation failed: {}", + rotated.err().unwrap() + ); + + for i in 0..entries.len() { + let entry = &entries[i]; + let rx = &mut log2filter_rx.get_mut(entry.f).unwrap(); + let read_entry = rx.try_recv(); + assert!( + read_entry.is_ok(), + "entry n°{i} is err: {}", + read_entry.err().unwrap() + ); + let (m, t) = read_entry.unwrap(); + assert_eq!(entry.m, m, "entry n°{i}'s match is incorrect"); + assert_eq!( + t.timestamp(), + entry.t.timestamp(), + "entry n°{i}'s t is incorrect", + ); + } + + for (_, rx) in &log2filter_rx { + assert!(rx.is_empty()); + } +}