fix database deserialization and use BufWriter

Deserialization was failing because Filter's Deserialize and Serialize
derives had different serde attributes, so the written and expected
layouts did not match.
Filter names are now (de)serialized directly as (String, String)
from/to the database.

Using BufWriter greatly improves write performance.
Flushes are added where necessary so buffered data actually reaches disk.
ppom 2024-09-24 12:00:00 +02:00
commit 6b52b03025
9 changed files with 74 additions and 97 deletions
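
As context for the BufWriter change (not part of the commit): a minimal sketch of the pattern, assuming a throwaway /tmp path. Buffered writes amortize syscalls, but data only reliably reaches the file once the buffer is flushed, hence the added flushes.

use std::fs::File;
use std::io::{BufWriter, Write};

fn main() -> std::io::Result<()> {
    let file = File::create("/tmp/example.db")?;
    // Small writes land in an in-memory buffer; the file only sees
    // large chunks, avoiding one syscall per entry.
    let mut writer = BufWriter::new(file);
    for i in 0..10_000 {
        writer.write_all(format!("entry {i}\n").as_bytes())?;
    }
    // Dropping a BufWriter flushes too, but swallows errors;
    // an explicit flush surfaces them.
    writer.flush()?;
    Ok(())
}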

View file

@@ -1,4 +1,6 @@
 ---
+concurrency: 32
+
 patterns:
   num:
     regex: '[0-9]{3}'
@@ -9,12 +11,12 @@ patterns:
 streams:
   tailDown1:
-    cmd: [ 'sh', '-c', 'sleep 2; seq 1000100 | while read i; do echo found $i; done' ]
+    cmd: [ 'sh', '-c', 'sleep 2; seq 10001 | while read i; do echo found $i; done' ]
     filters:
       find:
         regex:
          - '^found <num>'
-        retry: 480
+        retry: 9
         retryperiod: 1m
         actions:
           damn:
@@ -68,48 +70,3 @@ streams:
             cmd: [ 'sleep', '0.0<num>' ]
             after: 1m
             onexit: false
-  # tailDown2:
-  #   cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo prout $i; done' ]
-  #   filters:
-  #     findIP:
-  #       regex:
-  #         - '^prout <num>'
-  #       retry: 500
-  #       retryperiod: 1m
-  #       actions:
-  #         damn:
-  #           cmd: [ 'sleep', '0.0<num>' ]
-  #         undamn:
-  #           cmd: [ 'sleep', '0.0<num>' ]
-  #           after: 1m
-  #           onexit: false
-  # tailDown3:
-  #   cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $i; done' ]
-  #   filters:
-  #     findIP:
-  #       regex:
-  #         - '^nanana <num>'
-  #       retry: 500
-  #       retryperiod: 2m
-  #       actions:
-  #         damn:
-  #           cmd: [ 'sleep', '0.0<num>' ]
-  #         undamn:
-  #           cmd: [ 'sleep', '0.0<num>' ]
-  #           after: 1m
-  #           onexit: false
-  # tailDown4:
-  #   cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $i; done' ]
-  #   filters:
-  #     findIP:
-  #       regex:
-  #         - '^nomatch <num>$'
-  #       retry: 500
-  #       retryperiod: 2m
-  #       actions:
-  #         damn:
-  #           cmd: [ 'sleep', '0.0<num>' ]
-  #         undamn:
-  #           cmd: [ 'sleep', '0.0<num>' ]
-  #           after: 1m
-  #           onexit: false

View file

@@ -47,6 +47,12 @@ impl Config {
             .collect()
     }
 
+    pub fn get_filter(&self, name: &(String, String)) -> Option<&Arc<Filter>> {
+        self.streams
+            .get(&name.0)
+            .and_then(|stream| stream.get_filter(&name.1))
+    }
+
     pub fn setup(&mut self) -> Result<(), String> {
         if self.concurrency == 0 {
             self.concurrency = num_cpus::get();

View file

@@ -2,7 +2,7 @@ use std::{
     collections::BTreeMap,
     fmt::Debug,
     fs::File,
-    io::{self, BufWriter, Write},
+    io::{self, BufReader, BufWriter, Write},
     process::exit,
     sync::Arc,
 };
@@ -19,20 +19,25 @@ use crate::{
 use super::DBError;
 
-type DatabaseWriteHeader = BTreeMap<Arc<Filter>, usize>;
-type DatabaseReadHeader = BTreeMap<usize, Arc<Filter>>;
+// OPTIM Add a timestamp prefix to the header, to permit shorter timestamps?
+// It could save 1-4 bytes per entry; not sure it's worth it
+// FIXME put the signature in the header?
+type DatabaseHeader = BTreeMap<usize, (String, String)>;
+type ReadHeader = BTreeMap<usize, Arc<Filter>>;
+type WriteHeader = BTreeMap<Arc<Filter>, usize>;
 
 const BUFFER_MAX_SIZE: usize = 10 * 1024 * 1024;
 const DB_SIGNATURE: &str = "reaction-db-v01";
 
 pub struct ReadDB {
-    f: File,
-    h: DatabaseReadHeader,
+    f: BufReader<File>,
+    h: ReadHeader,
     buf: Vec<u8>,
 }
 
 impl ReadDB {
-    pub fn open(path: &str) -> Result<Option<Self>, DBError> {
+    pub fn open(path: &str, config: &Arc<Config>) -> Result<Option<Self>, DBError> {
         let file = match File::open(path) {
             Ok(file) => file,
             Err(err) => match err.kind() {
@@ -50,7 +55,7 @@ impl ReadDB {
         };
 
         let mut ret = ReadDB {
-            f: file,
+            f: BufReader::new(file),
             h: BTreeMap::new(),
             buf: vec![0; BUFFER_MAX_SIZE],
         };
@@ -61,10 +66,15 @@
             Err(err) => Err(DBError::Error(format!("reading database signature: {err}"))),
         }?;
 
-        ret.h = ret
-            .read()
+        let db_header = ret
+            .read::<DatabaseHeader>()
             .map_err(|err| DBError::Error(format!("while reading database header: {err}")))?;
+        ret.h = db_header
+            .iter()
+            .filter_map(|(key, name)| config.get_filter(name).map(|filter| (*key, filter.clone())))
+            .collect();
 
         Ok(Some(ret))
     }
@@ -82,13 +92,9 @@ impl Iterator for ReadDB {
         match self.read::<ComputedLogEntry>() {
             // FIXME why do we get a default item instead of an error or something?
             // How do we really know we reached the end?
-            Ok(item) => {
-                if item.t == 0 {
-                    None
-                } else {
-                    Some(item.to(&self.h))
-                }
-            }
+            // For now, checking if the time is 0
+            Ok(ComputedLogEntry { t: 0, .. }) => None,
+            Ok(item) => Some(item.to(&self.h)),
             Err(err) => match err {
                 postcard::Error::DeserializeUnexpectedEnd => None,
                 _ => Some(Err(err.into())),
@@ -99,7 +105,7 @@
 pub struct WriteDB {
     f: BufWriter<File>,
-    h: DatabaseWriteHeader,
+    h: WriteHeader,
     buf: Vec<u8>,
 }
@@ -124,20 +130,25 @@ impl WriteDB {
             exit(1);
         }
 
-        let database_read_header: DatabaseReadHeader =
-            config.filters().into_iter().enumerate().collect();
+        let database_header: DatabaseHeader = config
+            .filters()
+            .into_iter()
+            .map(|f| f.full_name())
+            .enumerate()
+            .collect();
 
-        if let Err(err) = ret._write(&database_read_header) {
+        if let Err(err) = ret._write(&database_header) {
             error!("Failed to write to DB: {}", err);
             exit(1);
         }
 
-        let database_write_header = database_read_header
+        ret.h = config
+            .filters()
             .into_iter()
-            .map(|(i, name)| (name, i))
+            .enumerate()
+            .map(|(key, filter)| (filter, key))
             .collect();
-        ret.h = database_write_header;
 
         ret
     }
@@ -171,19 +182,19 @@ struct ComputedLogEntry {
 }
 
 impl ComputedLogEntry {
-    fn from(value: LogEntry, header: &DatabaseWriteHeader) -> Result<Self, DBError> {
-        let filter = value.f;
-        match header.get(&filter) {
+    #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
+    fn from(value: LogEntry, header: &WriteHeader) -> Result<Self, DBError> {
+        match header.get(&value.f) {
             Some(f) => Ok(ComputedLogEntry {
                 m: value.m,
                 f: *f,
                 t: value.t.timestamp(),
                 exec: value.exec,
             }),
-            None => Err(DBError::InvalidFilterError(filter.to_string())),
+            None => Err(DBError::InvalidFilterError(value.f.to_string())),
         }
     }
 
-    fn to(self, header: &DatabaseReadHeader) -> Result<LogEntry, DBError> {
+    fn to(self, header: &ReadHeader) -> Result<LogEntry, DBError> {
         match header.get(&self.f) {
             Some(f) => Ok(LogEntry {
                 m: self.m,

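To make the new header layout concrete, a minimal standalone sketch of the roundtrip (postcard, as the diff itself uses): the on-disk header maps numeric keys to plain (stream, filter) name pairs, and the reader resolves those names back to Arc<Filter> through Config::get_filter.

use std::collections::BTreeMap;

// Same shape as DatabaseHeader in the diff.
type DatabaseHeader = BTreeMap<usize, (String, String)>;

fn main() -> Result<(), postcard::Error> {
    let mut header: DatabaseHeader = BTreeMap::new();
    header.insert(0, ("tailDown1".into(), "find".into()));

    // Write side: plain names are serialized, no Arc<Filter> involved.
    let mut buf = vec![0u8; 1024];
    let used = postcard::to_slice(&header, &mut buf)?;

    // Read side: names come back as (String, String); ReadDB::open then
    // maps each pair to an Arc<Filter> via config.get_filter(name).
    let decoded: DatabaseHeader = postcard::from_bytes(used)?;
    assert_eq!(header, decoded);
    Ok(())
}
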
View file

@@ -12,7 +12,6 @@ use std::{
 use chrono::{Local, TimeDelta};
 use log::{debug, error, info, warn};
-use postcard;
 use thiserror::Error;
 
 use crate::{
@@ -32,6 +31,7 @@ const LOG_DB_NEW_NAME: &str = "./reaction-matches.new.db";
 const FLUSH_DB_NAME: &str = "./reaction-flushes.db";
 const MAX_WRITES: u32 = 500_000;
 
 #[derive(Error, Debug)]
 pub enum DBError {
     #[error("invalid filter: {0}")]
@@ -43,6 +43,7 @@ pub enum DBError {
     #[error("{0}")]
     Error(String),
 }
 
 #[derive(Clone)]
 pub enum DatabaseManagerInput {
     Log(LogEntry),
@@ -135,7 +136,7 @@ fn _rotate_db(
     config: &Arc<Config>,
     matches_tx: &Option<Sender<MatchManagerInput>>,
 ) -> Result<(WriteDB, WriteDB), DBError> {
-    let mut log_read_db = match ReadDB::open(LOG_DB_NAME)? {
+    let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config)? {
         Some(db) => db,
         None => {
             return Ok((
@@ -144,14 +145,14 @@
             ));
         }
     };
-    let mut flush_read_db = match ReadDB::open(FLUSH_DB_NAME)? {
+    let mut flush_read_db = match ReadDB::open(FLUSH_DB_NAME, config)? {
         Some(db) => db,
         None => {
             warn!(
                 "Strange! Found a {} but no {}, opening /dev/null instead",
                 LOG_DB_NAME, FLUSH_DB_NAME
             );
-            match ReadDB::open("/dev/null")? {
+            match ReadDB::open("/dev/null", config)? {
                 Some(db) => db,
                 None => {
                     return Err(DBError::Error("/dev/null is not accessible".into()));
@@ -198,6 +199,7 @@ fn __rotate_db(
     let mut millisecond_disambiguation_counter: u32 = 0;
 
     // Read flushes
+    #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
     let mut flushes: BTreeMap<Arc<Filter>, BTreeMap<Match, Time>> = BTreeMap::new();
     for flush_entry in flush_read_db {
         match flush_entry {

View file

@@ -69,7 +69,7 @@ fn write_and_read_db() {
     drop(write_db);
 
-    let read_db = ReadDB::open(db_path.to_str().unwrap());
+    let read_db = ReadDB::open(db_path.to_str().unwrap(), &config);
     assert!(read_db.is_ok());
     let read_db = read_db.unwrap();

View file

@@ -96,6 +96,7 @@ pub fn execs_manager(
         }
     };
 
+    #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
     let mut execs: ExecsMap = BTreeMap::new();
     let timer = MessageTimer::new(exec_tx);

View file

@@ -8,7 +8,7 @@ use std::{
 use chrono::TimeDelta;
 use log::info;
 use regex::Regex;
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
 
 use crate::{
     action::Action,
@@ -21,15 +21,13 @@ use crate::{
 // Only names are serialized
 // Only computed fields are not deserialized
-#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+#[derive(Clone, Debug, Default, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct Filter {
-    #[serde(skip_serializing)]
     actions: BTreeMap<String, Arc<Action>>,
     #[serde(skip)]
     longuest_action_duration: TimeDelta,
-    #[serde(skip_serializing)]
     regex: Vec<String>,
     #[serde(skip)]
     compiled_regex: Vec<Regex>,
@@ -38,16 +36,15 @@ pub struct Filter {
     #[serde(skip)]
     patterns: Arc<BTreeSet<Arc<Pattern>>>,
-    #[serde(skip_serializing)]
     retry: Option<u32>,
-    #[serde(skip_serializing, rename = "retryperiod")]
+    #[serde(rename = "retryperiod")]
     retry_period: Option<String>,
     #[serde(skip)]
     retry_duration: Option<TimeDelta>,
-    #[serde(skip_deserializing)]
+    #[serde(skip)]
     name: String,
-    #[serde(skip_deserializing)]
+    #[serde(skip)]
     stream_name: String,
 }
@@ -61,6 +58,10 @@ impl Filter {
         }
     }
 
+    pub fn full_name(&self) -> (String, String) {
+        (self.stream_name.clone(), self.name.clone())
+    }
+
     pub fn retry(&self) -> Option<u32> {
         self.retry
     }
@@ -73,6 +74,7 @@ impl Filter {
         self.longuest_action_duration
     }
 
+    #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
     pub fn patterns(&self) -> &BTreeSet<Arc<Pattern>> {
         &self.patterns
     }
@@ -123,6 +125,7 @@ impl Filter {
             return Err("no regex configured".into());
         }
 
+        #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
         let mut new_patterns = BTreeSet::new();
         let mut first = true;
         for regex in &self.regex {

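To illustrate the bug the commit message describes, a standalone sketch with a hypothetical Record type (not the real Filter): in a non-self-describing format like postcard, a field marked skip_serializing but not skip_deserializing is omitted on write yet still expected on read, so the roundtrip fails. Symmetric #[serde(skip)], or serializing a plain representation like (String, String), avoids the mismatch.

use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    name: String,
    // Asymmetric: skipped when writing, still expected when reading.
    #[serde(skip_serializing)]
    retry: Option<u32>,
}

fn main() {
    let rec = Record { name: "find".into(), retry: Some(9) };

    let mut buf = [0u8; 64];
    let bytes = postcard::to_slice(&rec, &mut buf).unwrap();

    // The bytes contain only `name`; Deserialize still tries to read
    // `retry` afterwards and runs off the end of the input.
    let back: Result<Record, _> = postcard::from_bytes(bytes);
    assert!(back.is_err()); // postcard::Error::DeserializeUnexpectedEnd
}
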
View file

@@ -79,6 +79,7 @@ pub fn matches_manager(
     action_tx: Sender<ExecsManagerInput>,
     log_tx: SyncSender<DatabaseManagerInput>,
 ) {
+    #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
     let mut matches: MatchesMap = BTreeMap::new();
     let timer = MessageTimer::new(match_tx);

View file

@@ -29,6 +29,10 @@ impl Stream {
         &self.filters
     }
 
+    pub fn get_filter(&self, filter_name: &str) -> Option<&Arc<Filter>> {
+        self.filters.get(filter_name)
+    }
+
     pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> {
         self._setup(name, patterns)
             .map_err(|msg| format!("stream {}: {}", name, msg))
@@ -94,17 +98,11 @@ impl Stream {
         let _ = child_tx.send(Some(child));
         drop(child_tx);
 
-        // let mut line: String = "".into();
-        // while let Ok(nb_chars) = stdout.read_line(&mut line) {
         for line in stdout.lines() {
             let line = match line {
                 Ok(line) => line,
                 Err(_) => break,
             };
-            // if nb_chars == 0 {
-            //     break;
-            // }
-            // line.pop(); // remove trailing newline
 
             for filter in self.filters.values() {
                 if let Some(match_) = filter.get_match(&line) {
@@ -117,10 +115,8 @@
                         .unwrap();
                 }
             }
-            // line.clear();
         }
 
-        info!("stream {} exited", self.name);
+        error!("stream {} exited", self.name);
     }
 }
}