Split database lowlevel code; sync_channel(1) for better perf; flush db

This commit is contained in:
ppom 2024-09-22 12:00:00 +02:00
commit 0bac011ab1
6 changed files with 225 additions and 170 deletions

View file

@ -44,7 +44,7 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
let (match_tx, match_rx) = channel();
let (exec_tx, exec_rx) = channel();
let (log_tx, log_rx) = channel();
let (log_tx, log_rx) = sync_channel(1);
let matches_manager_thread_handle = {
let match_tx_matches = match_tx.clone();
@ -110,8 +110,6 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
let stop_ok = config.stop();
// TODO flush DB
// TODO remove socket
exit(match !signal_received.load(Ordering::SeqCst) && stop_ok {

View file

@ -0,0 +1,199 @@
use std::{
collections::BTreeMap,
fmt::Debug,
fs::File,
io::{self, BufWriter, Write},
process::exit,
sync::Arc,
};
use chrono::{DateTime, Local};
use log::{debug, error, warn};
use serde::{Deserialize, Serialize};
use crate::{
config::Config,
filter::Filter,
messages::{LogEntry, Match},
};
use super::DBError;
// Maps each filter to its integer id; used when serializing entries (write path).
type DatabaseWriteHeader = BTreeMap<Arc<Filter>, usize>;
// Maps an integer id back to its filter; used when deserializing entries (read path).
type DatabaseReadHeader = BTreeMap<usize, Arc<Filter>>;
// Size of the reusable (de)serialization scratch buffer: 10 MiB.
const BUFFER_MAX_SIZE: usize = 10 * 1024 * 1024;
// Signature written at the start of the file to identify a reaction database.
const DB_SIGNATURE: &str = "reaction-db-v01";
/// Read-only handle over an on-disk reaction database.
/// Iterating over it yields the stored log entries in order.
pub struct ReadDB {
    // Underlying database file, read sequentially.
    f: File,
    // id -> filter header, read from the start of the file in `open`.
    h: DatabaseReadHeader,
    // Reusable scratch buffer handed to postcard for deserialization.
    buf: Vec<u8>,
}
impl ReadDB {
    /// Opens an existing database at `path`.
    ///
    /// Returns `Ok(None)` when no file exists yet (expected on the first
    /// run), otherwise validates the signature and reads the id -> filter
    /// header before handing back a ready-to-iterate handle.
    pub fn open(path: &str) -> Result<Option<Self>, DBError> {
        let file = match File::open(path) {
            Ok(file) => file,
            Err(err) => match err.kind() {
                // A missing file is not an error: nothing has been persisted yet.
                std::io::ErrorKind::NotFound => {
                    warn!(
                        "No DB found at {}. It's ok if this is the first time reaction is running.",
                        path
                    );
                    return Ok(None);
                }
                _ => {
                    return Err(DBError::Error(format!("Could not open database: {}", err)));
                }
            },
        };
        let mut ret = ReadDB {
            f: file,
            h: BTreeMap::new(),
            buf: vec![0; BUFFER_MAX_SIZE],
        };
        // The file must start with the expected signature string.
        match ret.read::<&str>() {
            Ok(DB_SIGNATURE) => Ok(()),
            Ok(_) => Err(DBError::Error("database is not a reaction database".into())),
            Err(err) => Err(DBError::Error(format!("reading database signature: {err}"))),
        }?;
        // Then the header mapping integer ids back to filters.
        ret.h = ret
            .read()
            .map_err(|err| DBError::Error(format!("while reading database header: {err}")))?;
        Ok(Some(ret))
    }

    /// Deserializes one `T` from the current file position.
    ///
    /// The `'a` lifetime ties borrowed deserialization targets (e.g. `&str`
    /// for the signature) to the scratch buffer they point into.
    fn read<'a, T: Deserialize<'a> + Debug>(&'a mut self) -> Result<T, postcard::Error> {
        let (decoded, _) = postcard::from_io::<T, _>((&mut self.f, &mut self.buf))?;
        debug!("reading this: {:?}", &decoded);
        Ok(decoded)
    }
}
impl Iterator for ReadDB {
type Item = Result<LogEntry, DBError>;
fn next(&mut self) -> Option<Self::Item> {
match self.read::<ComputedLogEntry>() {
// FIXME why we got a default item instead of an error or something?
// How do we really know we reached the end?
Ok(item) => {
if item.t == 0 {
None
} else {
Some(item.to(&self.h))
}
}
Err(err) => match err {
postcard::Error::DeserializeUnexpectedEnd => None,
_ => Some(Err(err.into())),
},
}
}
}
/// Append-only write handle over an on-disk reaction database.
pub struct WriteDB {
    // Buffered database file; call `flush` to push bytes down to the OS.
    f: BufWriter<File>,
    // filter -> id header used to encode entries compactly.
    h: DatabaseWriteHeader,
    // Reusable scratch buffer handed to postcard for serialization.
    buf: Vec<u8>,
}
impl WriteDB {
    /// Creates a fresh database file at `path`, writing the signature and
    /// the filter header derived from `config`.
    ///
    /// Exits the process on any I/O error: without a working database the
    /// daemon cannot persist its state.
    pub fn create(path: &str, config: &Arc<Config>) -> Self {
        let file = File::create(path).unwrap_or_else(|err| {
            error!("Failed to create DB: {}", err);
            exit(1);
        });
        let mut db = WriteDB {
            f: BufWriter::new(file),
            h: BTreeMap::new(),
            buf: vec![0; BUFFER_MAX_SIZE],
        };
        // Signature first, so readers can recognize the file format.
        if let Err(err) = db._write(DB_SIGNATURE) {
            error!("Failed to write to DB: {}", err);
            exit(1);
        }
        // Persist the id -> filter mapping the read side will need...
        let read_header: DatabaseReadHeader =
            config.filters().into_iter().enumerate().collect();
        if let Err(err) = db._write(&read_header) {
            error!("Failed to write to DB: {}", err);
            exit(1);
        }
        // ...and keep the inverted filter -> id mapping for encoding entries.
        db.h = read_header
            .into_iter()
            .map(|(id, filter)| (filter, id))
            .collect();
        db
    }

    /// Serializes `entry` (filter replaced by its id) and appends it.
    pub fn write(&mut self, entry: LogEntry) -> Result<(), DBError> {
        self._write(ComputedLogEntry::from(entry, &self.h)?)
    }

    /// Low-level write: postcard-encodes `data` into the scratch buffer and
    /// appends the encoded bytes to the buffered file.
    fn _write<T: Serialize + std::fmt::Debug>(&mut self, data: T) -> Result<(), DBError> {
        let encoded = postcard::to_slice(&data, &mut self.buf)?;
        debug!("writing this: {:?}, {:?}", &data, &encoded);
        self.f.write_all(encoded)?;
        Ok(())
    }

    /// Flushes buffered bytes down to the OS.
    pub fn flush(&mut self) -> io::Result<()> {
        self.f.flush()
    }
}
/// On-disk representation of a `LogEntry`: the `Arc<Filter>` is replaced by
/// its integer id from the header, and the timestamp by unix seconds.
#[derive(Debug, Serialize, Deserialize)]
struct ComputedLogEntry {
    // The match itself.
    pub m: Match,
    // Filter id (key into the database read header).
    pub f: usize,
    // Unix timestamp in seconds; 0 doubles as the end-of-data sentinel
    // used by the reader's Iterator impl.
    pub t: i64,
    // NOTE(review): presumably "was the action executed" — confirm against LogEntry.
    pub exec: bool,
}
impl ComputedLogEntry {
fn from(value: LogEntry, header: &DatabaseWriteHeader) -> Result<Self, DBError> {
let filter = value.f;
match header.get(&filter) {
Some(f) => Ok(ComputedLogEntry {
m: value.m,
f: *f,
t: value.t.timestamp(),
exec: value.exec,
}),
None => Err(DBError::InvalidFilterError(filter.to_string())),
}
}
fn to(self, header: &DatabaseReadHeader) -> Result<LogEntry, DBError> {
match header.get(&self.f) {
Some(f) => Ok(LogEntry {
m: self.m,
f: f.clone(),
t: DateTime::from_timestamp(self.t, 0)
.unwrap()
.with_timezone(&Local),
exec: self.exec,
}),
None => Err(DBError::InvalidFilterError(self.f.to_string())),
}
}
}

View file

@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, HashMap},
fs::{self, File},
io::{self, Write},
fmt::Debug,
fs, io,
process::exit,
sync::{
mpsc::{Receiver, Sender},
@ -10,10 +10,9 @@ use std::{
thread,
};
use chrono::{DateTime, Local, TimeDelta};
use chrono::{Local, TimeDelta};
use log::{debug, error, info, warn};
use postcard::{from_io, Error};
use serde::{Deserialize, Serialize};
use postcard;
use thiserror::Error;
use crate::{
@ -23,131 +22,18 @@ use crate::{
messages::{LogEntry, Match, Time},
};
mod lowlevel;
mod tests;
use lowlevel::{ReadDB, WriteDB};
const LOG_DB_NAME: &str = "./reaction-matches.db";
const LOG_DB_NEW_NAME: &str = "./reaction-matches.new.db";
const FLUSH_DB_NAME: &str = "./reaction-flushes.db";
const MAX_WRITES: u32 = 500_000;
const BUFFER_MAX_SIZE: usize = 10 * 1024 * 1024;
struct ReadDB {
f: File,
h: DatabaseReadHeader,
}
impl ReadDB {
fn open(path: &str) -> Result<Option<Self>, DBError> {
let mut file = match File::open(path) {
Ok(file) => file,
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => {
warn!(
"No DB found at {}. It's ok if this is the first time reaction is running.",
path
);
return Ok(None);
}
_ => {
return Err(DBError::Error(format!("Could not open database: {}", err)));
}
},
};
let mut buf: Vec<u8> = vec![0; BUFFER_MAX_SIZE];
let (database_header, _) = from_io((&mut file, &mut buf))?;
Ok(Some(ReadDB {
f: file,
h: database_header,
}))
}
}
impl Iterator for ReadDB {
type Item = Result<LogEntry, DBError>;
fn next(&mut self) -> Option<Self::Item> {
let mut buf: Vec<u8> = vec![0; BUFFER_MAX_SIZE];
let result = from_io::<ComputedLogEntry, _>((&mut self.f, &mut buf));
match result {
// FIXME why we got a default item instead of an error or something?
// How do we really know we reached the end?
Ok((item, _)) => {
if item.t == 0 {
None
} else {
Some(item.to(&self.h))
}
}
Err(err) => match err {
Error::DeserializeUnexpectedEnd => None,
_ => Some(Err(err.into())),
},
}
}
}
struct WriteDB {
f: File,
h: DatabaseWriteHeader,
buf: Vec<u8>,
}
impl WriteDB {
fn create(path: &str, config: &Arc<Config>) -> Self {
let file = match File::create(path) {
Ok(file) => file,
Err(err) => {
error!("Failed to create DB: {}", err);
exit(1);
}
};
let mut ret = WriteDB {
f: file,
h: BTreeMap::new(),
buf: vec![0; BUFFER_MAX_SIZE],
};
let database_read_header: DatabaseReadHeader =
config.filters().into_iter().enumerate().collect();
if let Err(err) = ret._write(&database_read_header) {
error!("Failed to write to DB: {}", err);
exit(1);
}
let database_write_header = database_read_header
.into_iter()
.map(|(i, name)| (name, i))
.collect();
ret.h = database_write_header;
ret
}
fn write(&mut self, entry: LogEntry) -> Result<(), DBError> {
let computed = ComputedLogEntry::from(entry, &self.h)?;
self._write(computed)
}
fn _write<T: Serialize + std::fmt::Debug>(&mut self, data: T) -> Result<(), DBError> {
let encoded = postcard::to_slice(&data, &mut self.buf)?;
debug!("writing this: {:?}, {:?}", &data, &encoded);
self.f.write_all(encoded)?;
// clear
for i in 0..self.buf.len() {
self.buf[i] = 0;
}
Ok(())
}
}
#[derive(Error, Debug)]
enum DBError {
pub enum DBError {
#[error("invalid filter: {0}")]
InvalidFilterError(String),
#[error("decode error: {0}")]
@ -157,46 +43,6 @@ enum DBError {
#[error("{0}")]
Error(String),
}
type DatabaseWriteHeader = BTreeMap<Arc<Filter>, usize>;
type DatabaseReadHeader = BTreeMap<usize, Arc<Filter>>;
#[derive(Debug, Serialize, Deserialize)]
struct ComputedLogEntry {
pub m: Match,
pub f: usize,
pub t: i64,
pub exec: bool,
}
impl ComputedLogEntry {
fn from(value: LogEntry, header: &DatabaseWriteHeader) -> Result<Self, DBError> {
let filter = value.f;
match header.get(&filter) {
Some(f) => Ok(ComputedLogEntry {
m: value.m,
f: *f,
t: value.t.timestamp(),
exec: value.exec,
}),
None => Err(DBError::InvalidFilterError(filter.to_string())),
}
}
fn to(self, header: &DatabaseReadHeader) -> Result<LogEntry, DBError> {
match header.get(&self.f) {
Some(f) => Ok(LogEntry {
m: self.m,
f: f.clone(),
t: DateTime::from_timestamp(self.t, 0)
.unwrap()
.with_timezone(&Local),
exec: self.exec,
}),
None => Err(DBError::InvalidFilterError(self.f.to_string())),
}
}
}
#[derive(Clone)]
pub enum DatabaseManagerInput {
Log(LogEntry),
@ -213,6 +59,14 @@ macro_rules! write_or_die {
}
};
}
macro_rules! flush_or_die {
($db:expr) => {
if let Err(err) = $db.flush() {
error!("Could not write to DB: {}", err);
exit(1);
}
};
}
/// First rotates the database, then spawns the database thread
pub fn database_manager(
@ -238,6 +92,8 @@ pub fn database_manager(
cpt += 1;
if cpt == MAX_WRITES {
cpt = 0;
flush_or_die!(log_db);
flush_or_die!(flush_db);
drop(log_db);
drop(flush_db);
(log_db, flush_db) = match rotate_db(&config, None) {
@ -254,6 +110,8 @@ pub fn database_manager(
}
};
}
flush_or_die!(log_db);
flush_or_die!(flush_db);
})
}
@ -272,6 +130,7 @@ fn rotate_db(
info!("Rotated database");
res
}
fn _rotate_db(
config: &Arc<Config>,
matches_tx: &Option<Sender<MatchManagerInput>>,

View file

@ -337,9 +337,7 @@ pub mod tests {
// duration 120
filter = ok_filter();
filter
.actions
.insert(two_minutes_str, two_minutes_action);
filter.actions.insert(two_minutes_str, two_minutes_action);
filter.actions.insert(minute_str, minute_action);
filter.setup(&name, &name, &empty_patterns).unwrap();
assert_eq!(filter.longuest_action_duration, two_minutes);

View file

@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, BTreeSet},
sync::{
mpsc::{Receiver, Sender},
mpsc::{Receiver, Sender, SyncSender},
Arc,
},
};
@ -77,7 +77,7 @@ pub fn matches_manager(
match_rx: Receiver<MatchManagerInput>,
match_tx: Sender<MatchManagerInput>,
action_tx: Sender<ExecsManagerInput>,
log_tx: Sender<DatabaseManagerInput>,
log_tx: SyncSender<DatabaseManagerInput>,
) {
let mut matches: MatchesMap = BTreeMap::new();

View file

@ -131,6 +131,7 @@ impl fmt::Display for Format {
// Structs
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct NamedRegex {
pub regex: Regex,