From 6b52b030259a08808b94952c9323ef223aa01f7d Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 24 Sep 2024 12:00:00 +0200 Subject: [PATCH] fix database deserializing and use BufWriter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deserializing was failing because Filter::Deserialize and Filter::Serialize had different attributes. Now directly de·serializing (String, String) from/to database Using BufWriter greatly increases performance. Adding flushes where necessary. --- rust/heavy-load.yml | 51 +++----------------------- rust/src/config.rs | 6 ++++ rust/src/database/lowlevel.rs | 67 ++++++++++++++++++++--------------- rust/src/database/mod.rs | 10 +++--- rust/src/database/tests.rs | 2 +- rust/src/execs.rs | 1 + rust/src/filter.rs | 19 +++++----- rust/src/matches.rs | 1 + rust/src/stream.rs | 14 +++----- 9 files changed, 74 insertions(+), 97 deletions(-) diff --git a/rust/heavy-load.yml b/rust/heavy-load.yml index 98a094a..331dacd 100644 --- a/rust/heavy-load.yml +++ b/rust/heavy-load.yml @@ -1,4 +1,6 @@ --- +concurrency: 32 + patterns: num: regex: '[0-9]{3}' @@ -9,12 +11,12 @@ patterns: streams: tailDown1: - cmd: [ 'sh', '-c', 'sleep 2; seq 1000100 | while read i; do echo found $i; done' ] + cmd: [ 'sh', '-c', 'sleep 2; seq 10001 | while read i; do echo found $i; done' ] filters: find: regex: - '^found ' - retry: 480 + retry: 9 retryperiod: 1m actions: damn: @@ -68,48 +70,3 @@ streams: cmd: [ 'sleep', '0.0' ] after: 1m onexit: false - # tailDown2: - # cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo prout $i; done' ] - # filters: - # findIP: - # regex: - # - '^prout ' - # retry: 500 - # retryperiod: 1m - # actions: - # damn: - # cmd: [ 'sleep', '0.0' ] - # undamn: - # cmd: [ 'sleep', '0.0' ] - # after: 1m - # onexit: false - # tailDown3: - # cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $i; done' ] - # filters: - # findIP: - # regex: - # - '^nanana ' - # retry: 500 - # retryperiod: 2m - # actions: - # damn: - # cmd: [ 'sleep', '0.0' ] - # undamn: - # cmd: [ 'sleep', '0.0' ] - # after: 1m - # onexit: false - # tailDown4: - # cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $i; done' ] - # filters: - # findIP: - # regex: - # - '^nomatch $' - # retry: 500 - # retryperiod: 2m - # actions: - # damn: - # cmd: [ 'sleep', '0.0' ] - # undamn: - # cmd: [ 'sleep', '0.0' ] - # after: 1m - # onexit: false diff --git a/rust/src/config.rs b/rust/src/config.rs index 0d485d0..4347cd0 100644 --- a/rust/src/config.rs +++ b/rust/src/config.rs @@ -47,6 +47,12 @@ impl Config { .collect() } + pub fn get_filter(&self, name: &(String, String)) -> Option<&Arc> { + self.streams + .get(&name.0) + .and_then(|stream| stream.get_filter(&name.1)) + } + pub fn setup(&mut self) -> Result<(), String> { if self.concurrency == 0 { self.concurrency = num_cpus::get(); diff --git a/rust/src/database/lowlevel.rs b/rust/src/database/lowlevel.rs index 3fe7c8b..174de68 100644 --- a/rust/src/database/lowlevel.rs +++ b/rust/src/database/lowlevel.rs @@ -2,7 +2,7 @@ use std::{ collections::BTreeMap, fmt::Debug, fs::File, - io::{self, BufWriter, Write}, + io::{self, BufReader, BufWriter, Write}, process::exit, sync::Arc, }; @@ -19,20 +19,25 @@ use crate::{ use super::DBError; -type DatabaseWriteHeader = BTreeMap, usize>; -type DatabaseReadHeader = BTreeMap>; +// OPTIM Add a timestamp prefix to the header, to permit having +// shorter timestamps? +// It may permit to win 1-4 bytes per entry, don't know if it's worth it +// FIXME put signature in the header? +type DatabaseHeader = BTreeMap; +type ReadHeader = BTreeMap>; +type WriteHeader = BTreeMap, usize>; const BUFFER_MAX_SIZE: usize = 10 * 1024 * 1024; const DB_SIGNATURE: &str = "reaction-db-v01"; pub struct ReadDB { - f: File, - h: DatabaseReadHeader, + f: BufReader, + h: ReadHeader, buf: Vec, } impl ReadDB { - pub fn open(path: &str) -> Result, DBError> { + pub fn open(path: &str, config: &Arc) -> Result, DBError> { let file = match File::open(path) { Ok(file) => file, Err(err) => match err.kind() { @@ -50,7 +55,7 @@ impl ReadDB { }; let mut ret = ReadDB { - f: file, + f: BufReader::new(file), h: BTreeMap::new(), buf: vec![0; BUFFER_MAX_SIZE], }; @@ -61,10 +66,15 @@ impl ReadDB { Err(err) => Err(DBError::Error(format!("reading database signature: {err}"))), }?; - ret.h = ret - .read() + let db_header = ret + .read::() .map_err(|err| DBError::Error(format!("while reading database header: {err}")))?; + ret.h = db_header + .iter() + .filter_map(|(key, name)| config.get_filter(name).map(|filter| (*key, filter.clone()))) + .collect(); + Ok(Some(ret)) } @@ -82,13 +92,9 @@ impl Iterator for ReadDB { match self.read::() { // FIXME why we got a default item instead of an error or something? // How do we really know we reached the end? - Ok(item) => { - if item.t == 0 { - None - } else { - Some(item.to(&self.h)) - } - } + // For now, checking if time is 0 + Ok(ComputedLogEntry { t: 0, .. }) => None, + Ok(item) => Some(item.to(&self.h)), Err(err) => match err { postcard::Error::DeserializeUnexpectedEnd => None, _ => Some(Err(err.into())), @@ -99,7 +105,7 @@ impl Iterator for ReadDB { pub struct WriteDB { f: BufWriter, - h: DatabaseWriteHeader, + h: WriteHeader, buf: Vec, } @@ -124,20 +130,25 @@ impl WriteDB { exit(1); } - let database_read_header: DatabaseReadHeader = - config.filters().into_iter().enumerate().collect(); + let database_header: DatabaseHeader = config + .filters() + .into_iter() + .map(|f| f.full_name()) + .enumerate() + .collect(); - if let Err(err) = ret._write(&database_read_header) { + if let Err(err) = ret._write(&database_header) { error!("Failed to write to DB: {}", err); exit(1); } - let database_write_header = database_read_header + ret.h = config + .filters() .into_iter() - .map(|(i, name)| (name, i)) + .enumerate() + .map(|(key, filter)| (filter, key)) .collect(); - ret.h = database_write_header; ret } @@ -171,19 +182,19 @@ struct ComputedLogEntry { } impl ComputedLogEntry { - fn from(value: LogEntry, header: &DatabaseWriteHeader) -> Result { - let filter = value.f; - match header.get(&filter) { + #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used + fn from(value: LogEntry, header: &WriteHeader) -> Result { + match header.get(&value.f) { Some(f) => Ok(ComputedLogEntry { m: value.m, f: *f, t: value.t.timestamp(), exec: value.exec, }), - None => Err(DBError::InvalidFilterError(filter.to_string())), + None => Err(DBError::InvalidFilterError(value.f.to_string())), } } - fn to(self, header: &DatabaseReadHeader) -> Result { + fn to(self, header: &ReadHeader) -> Result { match header.get(&self.f) { Some(f) => Ok(LogEntry { m: self.m, diff --git a/rust/src/database/mod.rs b/rust/src/database/mod.rs index 9db025b..98fdd18 100644 --- a/rust/src/database/mod.rs +++ b/rust/src/database/mod.rs @@ -12,7 +12,6 @@ use std::{ use chrono::{Local, TimeDelta}; use log::{debug, error, info, warn}; -use postcard; use thiserror::Error; use crate::{ @@ -32,6 +31,7 @@ const LOG_DB_NEW_NAME: &str = "./reaction-matches.new.db"; const FLUSH_DB_NAME: &str = "./reaction-flushes.db"; const MAX_WRITES: u32 = 500_000; + #[derive(Error, Debug)] pub enum DBError { #[error("invalid filter: {0}")] @@ -43,6 +43,7 @@ pub enum DBError { #[error("{0}")] Error(String), } + #[derive(Clone)] pub enum DatabaseManagerInput { Log(LogEntry), @@ -135,7 +136,7 @@ fn _rotate_db( config: &Arc, matches_tx: &Option>, ) -> Result<(WriteDB, WriteDB), DBError> { - let mut log_read_db = match ReadDB::open(LOG_DB_NAME)? { + let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config)? { Some(db) => db, None => { return Ok(( @@ -144,14 +145,14 @@ fn _rotate_db( )); } }; - let mut flush_read_db = match ReadDB::open(FLUSH_DB_NAME)? { + let mut flush_read_db = match ReadDB::open(FLUSH_DB_NAME, config)? { Some(db) => db, None => { warn!( "Strange! Found a {} but no {}, opening /dev/null instead", LOG_DB_NAME, FLUSH_DB_NAME ); - match ReadDB::open("/dev/null")? { + match ReadDB::open("/dev/null", config)? { Some(db) => db, None => { return Err(DBError::Error("/dev/null is not accessible".into())); @@ -198,6 +199,7 @@ fn __rotate_db( let mut millisecond_disambiguation_counter: u32 = 0; // Read flushes + #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut flushes: BTreeMap, BTreeMap> = BTreeMap::new(); for flush_entry in flush_read_db { match flush_entry { diff --git a/rust/src/database/tests.rs b/rust/src/database/tests.rs index 1044533..55a380a 100644 --- a/rust/src/database/tests.rs +++ b/rust/src/database/tests.rs @@ -69,7 +69,7 @@ fn write_and_read_db() { drop(write_db); - let read_db = ReadDB::open(db_path.to_str().unwrap()); + let read_db = ReadDB::open(db_path.to_str().unwrap(), &config); assert!(read_db.is_ok()); let read_db = read_db.unwrap(); diff --git a/rust/src/execs.rs b/rust/src/execs.rs index 510f881..aeb3f0f 100644 --- a/rust/src/execs.rs +++ b/rust/src/execs.rs @@ -96,6 +96,7 @@ pub fn execs_manager( } }; + #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut execs: ExecsMap = BTreeMap::new(); let timer = MessageTimer::new(exec_tx); diff --git a/rust/src/filter.rs b/rust/src/filter.rs index cb55d7e..6e2fd19 100644 --- a/rust/src/filter.rs +++ b/rust/src/filter.rs @@ -8,7 +8,7 @@ use std::{ use chrono::TimeDelta; use log::info; use regex::Regex; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use crate::{ action::Action, @@ -21,15 +21,13 @@ use crate::{ // Only names are serialized // Only computed fields are not deserialized -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize)] #[serde(deny_unknown_fields)] pub struct Filter { - #[serde(skip_serializing)] actions: BTreeMap>, #[serde(skip)] longuest_action_duration: TimeDelta, - #[serde(skip_serializing)] regex: Vec, #[serde(skip)] compiled_regex: Vec, @@ -38,16 +36,15 @@ pub struct Filter { #[serde(skip)] patterns: Arc>>, - #[serde(skip_serializing)] retry: Option, - #[serde(skip_serializing, rename = "retryperiod")] + #[serde(rename = "retryperiod")] retry_period: Option, #[serde(skip)] retry_duration: Option, - #[serde(skip_deserializing)] + #[serde(skip)] name: String, - #[serde(skip_deserializing)] + #[serde(skip)] stream_name: String, } @@ -61,6 +58,10 @@ impl Filter { } } + pub fn full_name(&self) -> (String, String) { + (self.stream_name.clone(), self.name.clone()) + } + pub fn retry(&self) -> Option { self.retry } @@ -73,6 +74,7 @@ impl Filter { self.longuest_action_duration } + #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used pub fn patterns(&self) -> &BTreeSet> { &self.patterns } @@ -123,6 +125,7 @@ impl Filter { return Err("no regex configured".into()); } + #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut new_patterns = BTreeSet::new(); let mut first = true; for regex in &self.regex { diff --git a/rust/src/matches.rs b/rust/src/matches.rs index 5d69de2..8f7b63c 100644 --- a/rust/src/matches.rs +++ b/rust/src/matches.rs @@ -79,6 +79,7 @@ pub fn matches_manager( action_tx: Sender, log_tx: SyncSender, ) { + #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut matches: MatchesMap = BTreeMap::new(); let timer = MessageTimer::new(match_tx); diff --git a/rust/src/stream.rs b/rust/src/stream.rs index 50bbc03..c7c6dce 100644 --- a/rust/src/stream.rs +++ b/rust/src/stream.rs @@ -29,6 +29,10 @@ impl Stream { &self.filters } + pub fn get_filter(&self, filter_name: &str) -> Option<&Arc> { + self.filters.get(filter_name) + } + pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> { self._setup(name, patterns) .map_err(|msg| format!("stream {}: {}", name, msg)) @@ -94,17 +98,11 @@ impl Stream { let _ = child_tx.send(Some(child)); drop(child_tx); - // let mut line: String = "".into(); - // while let Ok(nb_chars) = stdout.read_line(&mut line) { for line in stdout.lines() { let line = match line { Ok(line) => line, Err(_) => break, }; - // if nb_chars == 0 { - // break; - // } - // line.pop(); // remove trailing newline for filter in self.filters.values() { if let Some(match_) = filter.get_match(&line) { @@ -117,10 +115,8 @@ impl Stream { .unwrap(); } } - - // line.clear(); } - info!("stream {} exited", self.name); + error!("stream {} exited", self.name); } }