test infrastructure, new conf's state_directory, less deps

reaction's configuration now has an optional state_directory member,
which is where it will save its databases. Defaults to cwd.

added a lot of code necessary to properly test databases.
The new tests are currently failing, which is good: they'll make it
possible to hunt down this database consistency bug.

also removed some indirect dependencies via features removal,
and moved test dependencies to dedicated [dev-dependencies]

also a small fix to an nft46.c function's return type, and an empty
conf file for the ccls LSP server.
This commit is contained in:
ppom 2024-11-11 12:00:00 +01:00
commit 8579e30890
10 changed files with 361 additions and 93 deletions

0
.ccls Normal file
View file

2
.gitignore vendored
View file

@ -16,3 +16,5 @@ debian-packaging/*
export-go-db/export-go-db
import-rust-db/target
/target
/local
.ccls-cache

115
Cargo.lock generated
View file

@ -148,6 +148,12 @@ version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.7.2"
@ -284,7 +290,6 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -307,34 +312,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.82",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
@ -353,16 +336,22 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
@ -711,6 +700,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro2"
version = "1.0.88"
@ -729,6 +727,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "reaction"
version = "2.0.0-rc1"
@ -741,6 +769,7 @@ dependencies = [
"futures",
"jrsonnet-evaluator",
"num_cpus",
"rand",
"regex",
"serde",
"serde_json",
@ -904,15 +933,6 @@ dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.13.2"
@ -1345,3 +1365,24 @@ checksum = "fe5c30ade05e61656247b2e334a031dfd0cc466fadef865bdcdea8d537951bf1"
dependencies = [
"winapi",
]
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.82",
]

View file

@ -2,7 +2,7 @@
name = "reaction"
version = "2.0.0-rc1"
edition = "2021"
authors = ["ppom <reaction@ppom.me"]
authors = ["ppom <reaction@ppom.me>"]
license = "AGPL-3.0"
description = "Scan logs and take action"
readme = "README.md"
@ -21,10 +21,9 @@ regex = "1.10.4"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
serde_yaml = "0.9.34"
tempfile = "3.12.0"
thiserror = "1.0.63"
timer = "0.2.0"
futures = "0.3.30"
futures = { version = "0.3.30", default-features = false, features = ["alloc"] }
tokio = { version = "1.40.0", features = ["full", "tracing"] }
tokio-util = { version = "0.7.12", features = ["codec"] }
tracing = "0.1.40"
@ -36,3 +35,7 @@ clap_complete = "4.5.2"
clap_mangen = "0.2.24"
regex = "1.10.4"
tracing = "0.1.40"
[dev-dependencies]
rand = "0.8.5"
tempfile = "3.12.0"

View file

@ -80,7 +80,7 @@ void adapt_args(char *tab) {
exit(1);
}
int exec(char *str, char **argv) {
void exec(char *str, char **argv) {
argv[0] = str;
execvp(str, argv);
// returns only if fails

View file

@ -16,6 +16,7 @@ use super::{Filter, Pattern, Stream};
pub type Patterns = BTreeMap<String, Arc<Pattern>>;
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct Config {
patterns: Patterns,
@ -29,6 +30,9 @@ pub struct Config {
start: Vec<Vec<String>>,
#[serde(default)]
stop: Vec<Vec<String>>,
#[serde(default)]
state_directory: String,
}
impl Config {
@ -44,6 +48,10 @@ impl Config {
self.concurrency
}
/// Directory where reaction stores its databases.
/// An empty string (the serde default) means the current working directory.
pub fn state_directory(&self) -> &str {
    &self.state_directory
}
pub fn filters(&self) -> Vec<&Filter> {
self.streams
.values()
@ -119,6 +127,15 @@ impl Config {
Ok(config)
}
#[cfg(test)]
/// Test-only constructor: builds a minimal Config holding the given
/// streams and state_directory; every other field takes its Default value.
pub fn from_streams(streams: BTreeMap<String, Stream>, dir: &str) -> Self {
    Self {
        streams,
        state_directory: dir.to_string(),
        ..Default::default()
    }
}
}
enum Format {
@ -211,19 +228,9 @@ mod tests {
use super::*;
fn default_config() -> Config {
Config {
concurrency: 0,
patterns: BTreeMap::new(),
streams: BTreeMap::new(),
start: Vec::new(),
stop: Vec::new(),
}
}
#[test]
fn config_missing() {
let mut config = default_config();
let mut config = Config::default();
assert!(config.setup().is_err());
}
}

View file

@ -5,6 +5,7 @@ use serde::Deserialize;
use super::{Filter, Patterns};
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct Stream {
cmd: Vec<String>,
@ -63,6 +64,15 @@ impl Stream {
Ok(())
}
#[cfg(test)]
/// Test-only constructor: builds a minimal Stream with the given
/// filters and name; every other field takes its Default value.
pub fn from_filters(filters: BTreeMap<String, Filter>, name: &str) -> Self {
    Self {
        filters,
        name: name.to_string(),
        ..Default::default()
    }
}
}
impl PartialEq for Stream {

View file

@ -1,9 +1,5 @@
use std::{
collections::BTreeMap,
fmt::Debug,
fs::File,
io::{self, BufReader, BufWriter, Read, Write},
process::exit,
collections::BTreeMap, fmt::Debug, fs::File, io::{self, BufReader, BufWriter, Read, Write}, path::Path, process::exit
};
use bincode::Options;
@ -16,7 +12,7 @@ use crate::{
protocol::{bincode_options, BincodeOptions},
};
use super::{DBError, LogEntry};
use super::{DBError, DatabaseNames, LogEntry};
// OPTIM Add a timestamp prefix to the header, to permit having
// shorter timestamps?
@ -34,13 +30,13 @@ pub struct ReadDB {
}
impl ReadDB {
pub fn open(path: &str, config: &'static Config) -> Result<Option<Self>, DBError> {
pub fn open(path: &Path, config: &'static Config) -> Result<Option<Self>, DBError> {
let file = match File::open(path) {
Ok(file) => file,
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => {
warn!(
"No DB found at {}. It's ok if this is the first time reaction is running.",
"No DB found at {:?}. It's ok if this is the first time reaction is running.",
path
);
return Ok(None);
@ -58,20 +54,21 @@ impl ReadDB {
};
// Signature checking
// TODO if "failed to fill whole buffer", file is empty or almost empty. Ignore
let mut signature = [0u8; 15];
ret.f
.read_exact(&mut signature)
.map_err(|err| DBError::Error(format!("reading database signature: {err}")))?;
if DB_SIGNATURE.as_bytes()[0..13] != signature[0..13] {
return Err(DBError::Error(format!(
"{path} is not a reaction database, or it is a reaction-v1.x database.
"{path:?} is not a reaction database, or it is a reaction-v1.x database.
You can migrate your old database to a new one by following documented steps at https://reaction.ppom.me/migrate-to-v2
You can also choose to delete the local {} and {} if you don't care about your old matches.", super::LOG_DB_NAME, super::FLUSH_DB_NAME
You can also choose to delete the local {} and {} if you don't care about your old matches.", DatabaseNames::LogDbName, DatabaseNames::FlushDbName
)));
}
if DB_SIGNATURE.as_bytes()[13..15] != signature[13..15] {
return Err(DBError::Error(format!(
"{path} seem to be the database of a newer version of reaction.
"{path:?} seem to be the database of a newer version of reaction.
Are you sure you're running the last version of reaction?"
)));
}
@ -120,7 +117,7 @@ pub struct WriteDB {
}
impl WriteDB {
pub fn create(path: &str, config: &'static Config) -> Self {
pub fn create(path: &Path, config: &'static Config) -> Self {
let file = match File::create(path) {
Ok(file) => file,
Err(err) => {

View file

@ -1,7 +1,8 @@
use std::{
collections::{BTreeMap, HashMap},
fmt::Debug,
fmt::{Debug, Display},
fs, io,
path::PathBuf,
process::exit,
thread,
};
@ -18,9 +19,36 @@ mod tests;
use lowlevel::{ReadDB, WriteDB};
const LOG_DB_NAME: &str = "./reaction-matches.db";
const LOG_DB_NEW_NAME: &str = "./reaction-matches.new.db";
const FLUSH_DB_NAME: &str = "./reaction-flushes.db";
/// Logical identifiers for the on-disk database files reaction maintains.
/// The concrete file name for each variant comes from `basename()`;
/// the full path is resolved against the configured state_directory.
enum DatabaseNames {
    // Main matches log ("reaction-matches.db").
    LogDbName,
    // Replacement file written during rotation ("reaction-matches.new.db").
    LogDbNewName,
    // Flushes database ("reaction-flushes.db").
    FlushDbName,
}
impl DatabaseNames {
fn basename(&self) -> &'static str {
match self {
DatabaseNames::LogDbName => "reaction-matches.db",
DatabaseNames::LogDbNewName => "reaction-matches.new.db",
DatabaseNames::FlushDbName => "reaction-flushes.db",
}
}
}
impl Display for DatabaseNames {
    /// Renders the database's base file name (used in log/error messages).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.basename())
    }
}
impl Config {
    /// Full path of the given database: `state_directory/<basename>`,
    /// or just `<basename>` (relative to cwd) when no state_directory
    /// is configured.
    fn path_of(&self, name: DatabaseNames) -> PathBuf {
        let dir = self.state_directory();
        if dir.is_empty() {
            PathBuf::from(name.basename())
        } else {
            PathBuf::from(dir).join(name.basename())
        }
    }
}
use DatabaseNames::*;
const MAX_WRITES: u32 = 500_000;
@ -132,23 +160,23 @@ fn rotate_db(
config: &'static Config,
matches_tx: Option<BTreeMap<&Filter, mpsc::Sender<(Match, Time)>>>,
) -> Result<(WriteDB, WriteDB), DBError> {
let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config)? {
let mut log_read_db = match ReadDB::open(&config.path_of(LogDbName), config)? {
Some(db) => db,
None => {
return Ok((
WriteDB::create(LOG_DB_NAME, config),
WriteDB::create(FLUSH_DB_NAME, config),
WriteDB::create(&config.path_of(LogDbName), config),
WriteDB::create(&config.path_of(FlushDbName), config),
));
}
};
let mut flush_read_db = match ReadDB::open(FLUSH_DB_NAME, config)? {
let mut flush_read_db = match ReadDB::open(&config.path_of(FlushDbName), config)? {
Some(db) => db,
None => {
warn!(
"Strange! Found a {} but no {}, opening /dev/null instead",
LOG_DB_NAME, FLUSH_DB_NAME
"Strange! Found a {:?} but no {:?}, opening /dev/null instead",
&config.path_of(LogDbName), &config.path_of(FlushDbName)
);
match ReadDB::open("/dev/null", config)? {
match ReadDB::open(&PathBuf::from("/dev/null"), config)? {
Some(db) => db,
None => {
return Err(DBError::Error("/dev/null is not accessible".into()));
@ -157,7 +185,7 @@ fn rotate_db(
}
};
let mut log_write_db = WriteDB::create(LOG_DB_NEW_NAME, config);
let mut log_write_db = WriteDB::create(&config.path_of(LogDbNewName), config);
_rotate_db(
matches_tx,
@ -169,18 +197,18 @@ fn rotate_db(
drop(log_read_db);
drop(flush_read_db);
if let Err(err) = fs::rename(LOG_DB_NEW_NAME, LOG_DB_NAME) {
if let Err(err) = fs::rename(&config.path_of(LogDbNewName), &config.path_of(LogDbName)) {
return Err(DBError::Error(format!(
"Failed to replace old DB with new one: {}",
err
)));
}
if let Err(err) = fs::remove_file(FLUSH_DB_NAME) {
if let Err(err) = fs::remove_file(&config.path_of(FlushDbName)) {
return Err(DBError::Error(format!("Failed to delete old DB: {}", err)));
}
let flush_write_db = WriteDB::create(FLUSH_DB_NAME, config);
let flush_write_db = WriteDB::create(&config.path_of(FlushDbName), config);
Ok((log_write_db, flush_write_db))
}
@ -250,7 +278,11 @@ fn _rotate_db(
debug!("DB sending match from DB: {:?}", entry.m);
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.blocking_send((entry.m.clone(), entry.t)).unwrap();
} else {
dbg!("no tx for ", entry.f);
}
} else {
dbg!("no matches_tx");
}
write_or_die!(log_write_db, entry);

View file

@ -1,21 +1,24 @@
#![allow(clippy::unwrap_used)]
#![cfg(test)]
use chrono::Local;
use std::{collections::BTreeMap, path::PathBuf};
use chrono::{Local, TimeDelta};
use rand::{
distributions::{Alphanumeric, DistString, Uniform},
prelude::Distribution as _,
};
use tempfile::TempDir;
use tokio::sync::mpsc;
use super::{ReadDB, WriteDB};
use crate::{
concepts::{Config, Filter},
daemon::database::LogEntry,
concepts::{Config, Filter, Stream},
daemon::database::{rotate_db, DatabaseNames, LogEntry},
tests::Fixture,
};
#[test]
fn write_and_read_db() {
let config_file = Fixture::from_string(
"config.jsonnet",
"
{
const BASIC_INNER_CONF: &str = "
patterns: {
num: { regex: '[0-9]+' },
},
@ -36,8 +39,65 @@ fn write_and_read_db() {
}
}
}
";
#[test]
fn db_name() {
fn eq(path: PathBuf, str: &str) {
assert_eq!(path.into_os_string().into_string().unwrap(), str);
}
",
let config_file = Fixture::from_string(
"config.jsonnet",
&format!(
"{{ {}, state_directory: '/var/lib/reaction' }}",
BASIC_INNER_CONF
),
);
let config = Config::from_file(&config_file).unwrap();
assert_eq!(config.state_directory(), "/var/lib/reaction");
eq(
config.path_of(DatabaseNames::LogDbName),
"/var/lib/reaction/reaction-matches.db",
);
eq(
config.path_of(DatabaseNames::LogDbNewName),
"/var/lib/reaction/reaction-matches.new.db",
);
eq(
config.path_of(DatabaseNames::FlushDbName),
"/var/lib/reaction/reaction-flushes.db",
);
let config_file =
Fixture::from_string("config.jsonnet", &format!("{{ {} }}", BASIC_INNER_CONF));
let config = Config::from_file(&config_file).unwrap();
assert_eq!(config.state_directory(), "");
eq(
config.path_of(DatabaseNames::LogDbName),
"reaction-matches.db",
);
eq(
config.path_of(DatabaseNames::LogDbNewName),
"reaction-matches.new.db",
);
eq(
config.path_of(DatabaseNames::FlushDbName),
"reaction-flushes.db",
);
}
#[test]
fn write_and_read_db() {
let dir = TempDir::new().unwrap();
let config_file = Fixture::from_string(
"config.jsonnet",
&format!(
"{{ {}, state_directory: '{}' }}",
BASIC_INNER_CONF,
dir.into_path().into_os_string().into_string().unwrap()
),
);
let config = Box::leak(Box::new(Config::from_file(&config_file).unwrap()));
@ -60,16 +120,14 @@ fn write_and_read_db() {
exec: false,
};
let db_path = Fixture::empty("matches.db");
let mut write_db = WriteDB::create(db_path.to_str().unwrap(), config);
let mut write_db = WriteDB::create(&config.path_of(DatabaseNames::LogDbName), config);
assert!(write_db.write(correct_log_entry.clone()).is_ok());
assert!(write_db.write(incorrect_log_entry).is_err());
drop(write_db);
let read_db = ReadDB::open(db_path.to_str().unwrap(), config);
let read_db = ReadDB::open(&config.path_of(DatabaseNames::LogDbName), config);
assert!(read_db.is_ok());
let read_db = read_db.unwrap();
@ -85,3 +143,121 @@ fn write_and_read_db() {
let read_entry = read_db.next();
assert!(read_entry.is_none());
}
#[test]
fn write_and_read_db_big() {
    // End-to-end check: write many randomized entries, rotate the DB,
    // and verify every entry comes back out on its filter's channel.
    // NOTE(review): per the commit message this test is currently
    // failing on purpose, to help hunt the DB consistency bug.
    let mut rng = rand::thread_rng();

    // 10 streams with random 10-char names, each containing 10 filters
    // with random 10-char names.
    let streams = (0..10)
        .map(|_| Alphanumeric.sample_string(&mut rng, 10))
        .collect::<Vec<_>>()
        .into_iter()
        .map(|sname| {
            (
                sname.clone(),
                Stream::from_filters(
                    (0..10)
                        .map(|_| Alphanumeric.sample_string(&mut rng, 10))
                        .map(|fname| (fname.clone(), Filter::from_name(&sname, &fname)))
                        .collect(),
                    &sname,
                ),
            )
        })
        .collect();

    // Databases live in a throwaway temp directory.
    let dir = TempDir::new().unwrap();
    let config = Config::from_streams(
        streams,
        &dir.into_path().into_os_string().into_string().unwrap(),
    );
    // rotate_db and the DB types want a &'static Config; leaking is
    // acceptable in a test.
    let config = Box::leak(Box::new(config));

    let u0t5 = Uniform::new(0, 5);
    // let u0t10 = Uniform::new(0, 9);

    // 300 random entries
    let entries: Vec<_> = (0..300)
        .map(|i| LogEntry {
            // Random match of 0-4 Strings of size 10
            // (Uniform::new(0, 5) samples from the half-open range [0, 5))
            m: (0..u0t5.sample(&mut rng))
                .map(|_| Alphanumeric.sample_string(&mut rng, 10))
                .collect(),
            // Random filter in config
            // (currently pinned to the first stream's first filter;
            // the u0t10-based random selection is commented out)
            f: config
                .streams()
                .iter()
                // .take(u0t10.sample(&mut rng))
                .take(1)
                .last()
                .unwrap()
                .1
                .filters()
                .iter()
                // .take(u0t10.sample(&mut rng))
                .take(1)
                .last()
                .unwrap()
                .1,
            // Now + incremented microsecond (avoid duplication)
            t: Local::now() + TimeDelta::microseconds(i),
            exec: false,
        })
        .collect();

    let mut write_db = WriteDB::create(&config.path_of(DatabaseNames::LogDbName), config);
    {
        // Created then immediately dropped: we only need the flush DB
        // file to exist on disk so rotation can open it.
        let mut _flush_db = WriteDB::create(&config.path_of(DatabaseNames::FlushDbName), config);
    }
    for i in 0..entries.len() {
        assert!(
            write_db.write(entries[i].clone()).is_ok(),
            "could not write entry n°{i}"
        );
    }
    drop(write_db);

    // One mpsc channel per filter; rotation streams matches back out
    // through these.
    let mut log2filter_tx = BTreeMap::new();
    let mut log2filter_rx = BTreeMap::new();
    for stream in config.streams().values() {
        for filter in stream.filters().values() {
            let (tx, rx) = mpsc::channel(3000);
            log2filter_tx.insert(filter, tx);
            log2filter_rx.insert(filter, rx);
        }
    }
    // We clone the senders so that the channels are not closed after database rotation
    let _log2filter_tx = log2filter_tx.clone();

    let rotated = rotate_db(config, Some(log2filter_tx));
    assert!(
        rotated.is_ok(),
        "database rotation failed: {}",
        rotated.err().unwrap()
    );

    // Every written entry must be received, in write order, with its
    // match and timestamp (to the second) intact.
    for i in 0..entries.len() {
        let entry = &entries[i];
        let rx = &mut log2filter_rx.get_mut(entry.f).unwrap();
        let read_entry = rx.try_recv();
        assert!(
            read_entry.is_ok(),
            "entry n°{i} is err: {}",
            read_entry.err().unwrap()
        );
        let (m, t) = read_entry.unwrap();
        assert_eq!(entry.m, m, "entry n°{i}'s match is incorrect");
        assert_eq!(
            t.timestamp(),
            entry.t.timestamp(),
            "entry n°{i}'s t is incorrect",
        );
    }
    // ... and nothing beyond those entries may remain queued.
    for (_, rx) in &log2filter_rx {
        assert!(rx.is_empty());
    }
}