diff --git a/rust/src/action.rs b/rust/src/action.rs index f95de7b..184aef9 100644 --- a/rust/src/action.rs +++ b/rust/src/action.rs @@ -161,20 +161,20 @@ pub mod tests { } } - pub fn ok_action() -> Arc { + pub fn ok_action() -> Action { let mut action = default_action(); action.cmd = vec!["command".into()]; - Arc::new(action) + action } - pub fn ok_action_with_after(d: String, name: &str) -> Arc { + pub fn ok_action_with_after(d: String, name: &str) -> Action { let mut action = default_action(); action.cmd = vec!["command".into()]; action.after = Some(d); action .setup("", "", name, Arc::new(BTreeSet::default())) .unwrap(); - Arc::new(action) + action } #[test] @@ -198,11 +198,11 @@ pub mod tests { assert!(action.setup(&name, &name, &name, patterns.clone()).is_err()); // command ok - action = ok_action().as_ref().clone(); + action = ok_action(); assert!(action.setup(&name, &name, &name, patterns.clone()).is_ok()); // command ok - action = ok_action().as_ref().clone(); + action = ok_action(); action.cmd.push("arg1".into()); assert!(action.setup(&name, &name, &name, patterns.clone()).is_ok()); } diff --git a/rust/src/config.rs b/rust/src/config.rs index 4347cd0..38a2b18 100644 --- a/rust/src/config.rs +++ b/rust/src/config.rs @@ -40,14 +40,14 @@ impl Config { self.concurrency } - pub fn filters(&self) -> Vec> { + pub fn filters(&self) -> Vec<&Filter> { self.streams .values() - .flat_map(|stream| stream.filters().values().cloned()) + .flat_map(|stream| stream.filters().values()) .collect() } - pub fn get_filter(&self, name: &(String, String)) -> Option<&Arc> { + pub fn get_filter(&self, name: &(String, String)) -> Option<&Filter> { self.streams .get(&name.0) .and_then(|stream| stream.get_filter(&name.1)) diff --git a/rust/src/daemon.rs b/rust/src/daemon.rs index 399bd22..5339c27 100644 --- a/rust/src/daemon.rs +++ b/rust/src/daemon.rs @@ -26,8 +26,8 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) { exit(1); } - let config = match Config::from_file(config_path) { - Ok(config) => Arc::new(config), + let config: &'static Config = match Config::from_file(config_path) { + Ok(config) => Box::leak(Box::new(config)), Err(err) => { error!("{err}"); exit(1); @@ -53,20 +53,17 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) { }; let execs_manager_thread_handle = { - let config_execs = config.clone(); let exec_tx_execs = exec_tx.clone(); - thread::spawn(move || execs_manager(config_execs, exec_rx, exec_tx_execs)) + thread::spawn(move || execs_manager(config, exec_rx, exec_tx_execs)) }; let database_manager_thread_handle = { - let config_database = config.clone(); let match_tx_database = match_tx.clone(); // The `thread::spawn` is done in the function, after database rotation is finished - database_manager(config_database, log_rx, match_tx_database) + database_manager(config, log_rx, match_tx_database) }; for stream in config.streams().values() { - let stream = stream.clone(); let match_tx = match_tx.clone(); let (child_tx, child_rx) = sync_channel(0); diff --git a/rust/src/database/lowlevel.rs b/rust/src/database/lowlevel.rs index 174de68..ab916e7 100644 --- a/rust/src/database/lowlevel.rs +++ b/rust/src/database/lowlevel.rs @@ -4,7 +4,6 @@ use std::{ fs::File, io::{self, BufReader, BufWriter, Write}, process::exit, - sync::Arc, }; use chrono::{DateTime, Local}; @@ -24,8 +23,8 @@ use super::DBError; // It may permit to win 1-4 bytes per entry, don't know if it's worth it // FIXME put signature in the header? type DatabaseHeader = BTreeMap; -type ReadHeader = BTreeMap>; -type WriteHeader = BTreeMap, usize>; +type ReadHeader = BTreeMap; +type WriteHeader = BTreeMap<&'static Filter, usize>; const BUFFER_MAX_SIZE: usize = 10 * 1024 * 1024; const DB_SIGNATURE: &str = "reaction-db-v01"; @@ -37,7 +36,7 @@ pub struct ReadDB { } impl ReadDB { - pub fn open(path: &str, config: &Arc) -> Result, DBError> { + pub fn open(path: &str, config: &'static Config) -> Result, DBError> { let file = match File::open(path) { Ok(file) => file, Err(err) => match err.kind() { @@ -72,7 +71,7 @@ impl ReadDB { ret.h = db_header .iter() - .filter_map(|(key, name)| config.get_filter(name).map(|filter| (*key, filter.clone()))) + .filter_map(|(key, name)| config.get_filter(name).map(|filter| (*key, filter))) .collect(); Ok(Some(ret)) @@ -110,7 +109,7 @@ pub struct WriteDB { } impl WriteDB { - pub fn create(path: &str, config: &Arc) -> Self { + pub fn create(path: &str, config: &'static Config) -> Self { let file = match File::create(path) { Ok(file) => file, Err(err) => { @@ -198,7 +197,7 @@ impl ComputedLogEntry { match header.get(&self.f) { Some(f) => Ok(LogEntry { m: self.m, - f: f.clone(), + f, t: DateTime::from_timestamp(self.t, 0) .unwrap() .with_timezone(&Local), diff --git a/rust/src/database/mod.rs b/rust/src/database/mod.rs index 98fdd18..facb880 100644 --- a/rust/src/database/mod.rs +++ b/rust/src/database/mod.rs @@ -3,10 +3,7 @@ use std::{ fmt::Debug, fs, io, process::exit, - sync::{ - mpsc::{Receiver, Sender}, - Arc, - }, + sync::mpsc::{Receiver, Sender}, thread, }; @@ -71,11 +68,11 @@ macro_rules! flush_or_die { /// First rotates the database, then spawns the database thread pub fn database_manager( - config: Arc, + config: &'static Config, log_rx: Receiver, matches_tx: Sender, ) -> thread::JoinHandle<()> { - let (mut log_db, mut flush_db) = match rotate_db(&config, Some(matches_tx)) { + let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) { Ok(dbs) => dbs, Err(err) => { error!("while rotating databases on start: {}", err); @@ -97,7 +94,7 @@ pub fn database_manager( flush_or_die!(flush_db); drop(log_db); drop(flush_db); - (log_db, flush_db) = match rotate_db(&config, None) { + (log_db, flush_db) = match rotate_db(config, None) { Ok(dbs) => dbs, Err(err) => { error!( @@ -117,7 +114,7 @@ pub fn database_manager( } fn rotate_db( - config: &Arc, + config: &'static Config, matches_tx: Option>, ) -> Result<(WriteDB, WriteDB), DBError> { info!("Rotating database..."); @@ -133,7 +130,7 @@ fn rotate_db( } fn _rotate_db( - config: &Arc, + config: &'static Config, matches_tx: &Option>, ) -> Result<(WriteDB, WriteDB), DBError> { let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config)? { @@ -200,7 +197,7 @@ fn __rotate_db( // Read flushes #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used - let mut flushes: BTreeMap, BTreeMap> = BTreeMap::new(); + let mut flushes: BTreeMap<&'static Filter, BTreeMap> = BTreeMap::new(); for flush_entry in flush_read_db { match flush_entry { Ok(entry) => { diff --git a/rust/src/database/tests.rs b/rust/src/database/tests.rs index 55a380a..d673818 100644 --- a/rust/src/database/tests.rs +++ b/rust/src/database/tests.rs @@ -1,7 +1,5 @@ #![cfg(test)] -use std::sync::Arc; - use chrono::Local; use crate::database::ReadDB; @@ -40,36 +38,36 @@ fn write_and_read_db() { ", ); - let config = Arc::new(Config::from_file(&config_file).unwrap()); + let config = Box::leak(Box::new(Config::from_file(&config_file).unwrap())); - let correct_filter_name = Arc::new(Filter::from_name("stream1", "filter1")); + let correct_filter_name = Box::leak(Box::new(Filter::from_name("stream1", "filter1"))); - let incorrect_filter_name = Arc::new(Filter::from_name("stream0", "filter1")); + let incorrect_filter_name = Box::leak(Box::new(Filter::from_name("stream0", "filter1"))); let correct_log_entry = LogEntry { m: vec!["match1".into()], - f: correct_filter_name.clone(), + f: correct_filter_name, t: Local::now(), exec: false, }; let incorrect_log_entry = LogEntry { m: vec!["match1".into()], - f: incorrect_filter_name.clone(), + f: incorrect_filter_name, t: Local::now(), exec: false, }; let db_path = Fixture::empty("matches.db"); - let mut write_db = WriteDB::create(db_path.to_str().unwrap(), &config); + let mut write_db = WriteDB::create(db_path.to_str().unwrap(), config); assert!(write_db.write(correct_log_entry.clone()).is_ok()); assert!(write_db.write(incorrect_log_entry).is_err()); drop(write_db); - let read_db = ReadDB::open(db_path.to_str().unwrap(), &config); + let read_db = ReadDB::open(db_path.to_str().unwrap(), config); assert!(read_db.is_ok()); let read_db = read_db.unwrap(); diff --git a/rust/src/execs.rs b/rust/src/execs.rs index aeb3f0f..da5d881 100644 --- a/rust/src/execs.rs +++ b/rust/src/execs.rs @@ -1,10 +1,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, process::Stdio, - sync::{ - mpsc::{Receiver, Sender}, - Arc, - }, + sync::mpsc::{Receiver, Sender}, }; use chrono::{DateTime, Local}; @@ -27,7 +24,7 @@ pub enum ExecsManagerInput { Stop, } -type ExecsMap = BTreeMap, BTreeMap>>>; +type ExecsMap = BTreeMap<&'static Action, BTreeMap>>>; trait ExecsMapTrait { fn add(&mut self, mat: &MAT); @@ -35,7 +32,7 @@ trait ExecsMapTrait { } impl ExecsMapTrait for ExecsMap { fn add(&mut self, mat: &MAT) { - let inner_map = self.entry(mat.a.clone()).or_default(); + let inner_map = self.entry(mat.a).or_default(); let inner_set = inner_map.entry(mat.m.clone()).or_default(); inner_set.insert(mat.t); } @@ -56,7 +53,7 @@ impl ExecsMapTrait for ExecsMap { } pub fn execs_manager( - config: Arc, + config: &'static Config, exec_rx: Receiver, exec_tx: Sender, ) { @@ -127,7 +124,7 @@ pub fn execs_manager( for _ in inner_set { exec_now(MAT { m: match_.clone(), - a: action.clone(), + a: action, t: Local::now(), }); } diff --git a/rust/src/filter.rs b/rust/src/filter.rs index 6e2fd19..2fb644a 100644 --- a/rust/src/filter.rs +++ b/rust/src/filter.rs @@ -24,7 +24,7 @@ use crate::{ #[derive(Clone, Debug, Default, Deserialize)] #[serde(deny_unknown_fields)] pub struct Filter { - actions: BTreeMap>, + actions: BTreeMap, #[serde(skip)] longuest_action_duration: TimeDelta, @@ -165,13 +165,9 @@ impl Filter { return Err("no actions configured".into()); } - let mut new_actions = BTreeMap::new(); - for (key, action) in &self.actions { - let mut new_action = action.as_ref().clone(); - new_action.setup(stream_name, name, key, self.patterns.clone())?; - new_actions.insert(key.clone(), Arc::new(new_action)); + for (key, action) in &mut self.actions { + action.setup(stream_name, name, key, self.patterns.clone())?; } - self.actions = new_actions; self.longuest_action_duration = self.actions.values().fold(TimeDelta::seconds(0), |acc, v| { @@ -206,11 +202,11 @@ impl Filter { None } - pub fn send_actions(&self, m: &Match, t: Time, tx: &Sender) { + pub fn send_actions(&'static self, m: &Match, t: Time, tx: &Sender) { for action in self.actions.values() { tx.send(ExecsManagerInput::Exec(MAT { m: m.clone(), - a: action.clone(), + a: action, t: t + action.after_duration().unwrap_or_default(), })) .unwrap(); diff --git a/rust/src/matches.rs b/rust/src/matches.rs index 8f7b63c..04e90fa 100644 --- a/rust/src/matches.rs +++ b/rust/src/matches.rs @@ -1,9 +1,6 @@ use std::{ collections::{BTreeMap, BTreeSet}, - sync::{ - mpsc::{Receiver, Sender, SyncSender}, - Arc, - }, + sync::mpsc::{Receiver, Sender, SyncSender}, }; use log::debug; @@ -26,7 +23,7 @@ pub enum MatchManagerInput { Stop, } -type MatchesMap = BTreeMap, BTreeMap>>; +type MatchesMap = BTreeMap<&'static Filter, BTreeMap>>; // This trait is needed to permit to implement methods on an external type trait MatchesMapTrait { @@ -37,7 +34,7 @@ trait MatchesMapTrait { } impl MatchesMapTrait for MatchesMap { fn add(&mut self, mft: &MFT) { - let inner_map = self.entry(mft.f.clone()).or_default(); + let inner_map = self.entry(mft.f).or_default(); let inner_set = inner_map.entry(mft.m.clone()).or_default(); inner_set.insert(mft.t); } diff --git a/rust/src/messages.rs b/rust/src/messages.rs index ed68d29..773087b 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use chrono::{DateTime, Local, TimeDelta}; use crate::{action::Action, filter::Filter}; @@ -10,21 +8,21 @@ pub type Match = Vec; #[derive(Clone)] pub struct MFT { pub m: Match, - pub f: Arc, + pub f: &'static Filter, pub t: Time, } #[derive(Clone)] pub struct MAT { pub m: Match, - pub a: Arc, + pub a: &'static Action, pub t: Time, } #[derive(Clone, Debug)] pub struct LogEntry { pub m: Match, - pub f: Arc, + pub f: &'static Filter, pub t: Time, pub exec: bool, } diff --git a/rust/src/stream.rs b/rust/src/stream.rs index c7c6dce..f0189a1 100644 --- a/rust/src/stream.rs +++ b/rust/src/stream.rs @@ -2,10 +2,7 @@ use std::{ collections::BTreeMap, io::{BufRead, BufReader}, process::{Child, Command, Stdio}, - sync::{ - mpsc::{Sender, SyncSender}, - Arc, - }, + sync::mpsc::{Sender, SyncSender}, }; use chrono::Local; @@ -18,18 +15,18 @@ use crate::{config::Patterns, filter::Filter, matches::MatchManagerInput, messag #[serde(deny_unknown_fields)] pub struct Stream { cmd: Vec, - filters: BTreeMap>, + filters: BTreeMap, #[serde(skip)] name: String, } impl Stream { - pub fn filters(&self) -> &BTreeMap> { + pub fn filters(&self) -> &BTreeMap { &self.filters } - pub fn get_filter(&self, filter_name: &str) -> Option<&Arc> { + pub fn get_filter(&self, filter_name: &str) -> Option<&Filter> { self.filters.get(filter_name) } @@ -59,19 +56,15 @@ impl Stream { return Err("no filters configured".into()); } - let mut new_filters = BTreeMap::new(); - for (key, filter) in &self.filters { - let mut new_filter = filter.as_ref().clone(); - new_filter.setup(name, key, patterns)?; - new_filters.insert(key.clone(), Arc::new(new_filter)); + for (key, filter) in &mut self.filters { + filter.setup(name, key, patterns)?; } - self.filters = new_filters; Ok(()) } pub fn manager( - &self, + &'static self, child_tx: SyncSender>, match_tx: Sender, ) { @@ -109,7 +102,7 @@ impl Stream { match_tx .send(MatchManagerInput::Match(MFT { m: match_, - f: filter.clone(), + f: filter, t: Local::now(), })) .unwrap(); @@ -138,7 +131,7 @@ pub mod tests { stream.cmd = vec!["command".into()]; stream .filters - .insert("name".into(), Arc::new(crate::filter::tests::ok_filter())); + .insert("name".into(), crate::filter::tests::ok_filter()); stream }