mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 12:45:47 +01:00
Box::leak global Config into a &'static ref instead of Arcs everywhere
This commit is contained in:
parent
feb863670e
commit
d30d03bae8
11 changed files with 58 additions and 86 deletions
|
|
@ -161,20 +161,20 @@ pub mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn ok_action() -> Arc<Action> {
|
||||
pub fn ok_action() -> Action {
|
||||
let mut action = default_action();
|
||||
action.cmd = vec!["command".into()];
|
||||
Arc::new(action)
|
||||
action
|
||||
}
|
||||
|
||||
pub fn ok_action_with_after(d: String, name: &str) -> Arc<Action> {
|
||||
pub fn ok_action_with_after(d: String, name: &str) -> Action {
|
||||
let mut action = default_action();
|
||||
action.cmd = vec!["command".into()];
|
||||
action.after = Some(d);
|
||||
action
|
||||
.setup("", "", name, Arc::new(BTreeSet::default()))
|
||||
.unwrap();
|
||||
Arc::new(action)
|
||||
action
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -198,11 +198,11 @@ pub mod tests {
|
|||
assert!(action.setup(&name, &name, &name, patterns.clone()).is_err());
|
||||
|
||||
// command ok
|
||||
action = ok_action().as_ref().clone();
|
||||
action = ok_action();
|
||||
assert!(action.setup(&name, &name, &name, patterns.clone()).is_ok());
|
||||
|
||||
// command ok
|
||||
action = ok_action().as_ref().clone();
|
||||
action = ok_action();
|
||||
action.cmd.push("arg1".into());
|
||||
assert!(action.setup(&name, &name, &name, patterns.clone()).is_ok());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,14 +40,14 @@ impl Config {
|
|||
self.concurrency
|
||||
}
|
||||
|
||||
pub fn filters(&self) -> Vec<Arc<Filter>> {
|
||||
pub fn filters(&self) -> Vec<&Filter> {
|
||||
self.streams
|
||||
.values()
|
||||
.flat_map(|stream| stream.filters().values().cloned())
|
||||
.flat_map(|stream| stream.filters().values())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn get_filter(&self, name: &(String, String)) -> Option<&Arc<Filter>> {
|
||||
pub fn get_filter(&self, name: &(String, String)) -> Option<&Filter> {
|
||||
self.streams
|
||||
.get(&name.0)
|
||||
.and_then(|stream| stream.get_filter(&name.1))
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
let config = match Config::from_file(config_path) {
|
||||
Ok(config) => Arc::new(config),
|
||||
let config: &'static Config = match Config::from_file(config_path) {
|
||||
Ok(config) => Box::leak(Box::new(config)),
|
||||
Err(err) => {
|
||||
error!("{err}");
|
||||
exit(1);
|
||||
|
|
@ -53,20 +53,17 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
|
|||
};
|
||||
|
||||
let execs_manager_thread_handle = {
|
||||
let config_execs = config.clone();
|
||||
let exec_tx_execs = exec_tx.clone();
|
||||
thread::spawn(move || execs_manager(config_execs, exec_rx, exec_tx_execs))
|
||||
thread::spawn(move || execs_manager(config, exec_rx, exec_tx_execs))
|
||||
};
|
||||
|
||||
let database_manager_thread_handle = {
|
||||
let config_database = config.clone();
|
||||
let match_tx_database = match_tx.clone();
|
||||
// The `thread::spawn` is done in the function, after database rotation is finished
|
||||
database_manager(config_database, log_rx, match_tx_database)
|
||||
database_manager(config, log_rx, match_tx_database)
|
||||
};
|
||||
|
||||
for stream in config.streams().values() {
|
||||
let stream = stream.clone();
|
||||
let match_tx = match_tx.clone();
|
||||
let (child_tx, child_rx) = sync_channel(0);
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ use std::{
|
|||
fs::File,
|
||||
io::{self, BufReader, BufWriter, Write},
|
||||
process::exit,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use chrono::{DateTime, Local};
|
||||
|
|
@ -24,8 +23,8 @@ use super::DBError;
|
|||
// It may permit to win 1-4 bytes per entry, don't know if it's worth it
|
||||
// FIXME put signature in the header?
|
||||
type DatabaseHeader = BTreeMap<usize, (String, String)>;
|
||||
type ReadHeader = BTreeMap<usize, Arc<Filter>>;
|
||||
type WriteHeader = BTreeMap<Arc<Filter>, usize>;
|
||||
type ReadHeader = BTreeMap<usize, &'static Filter>;
|
||||
type WriteHeader = BTreeMap<&'static Filter, usize>;
|
||||
|
||||
const BUFFER_MAX_SIZE: usize = 10 * 1024 * 1024;
|
||||
const DB_SIGNATURE: &str = "reaction-db-v01";
|
||||
|
|
@ -37,7 +36,7 @@ pub struct ReadDB {
|
|||
}
|
||||
|
||||
impl ReadDB {
|
||||
pub fn open(path: &str, config: &Arc<Config>) -> Result<Option<Self>, DBError> {
|
||||
pub fn open(path: &str, config: &'static Config) -> Result<Option<Self>, DBError> {
|
||||
let file = match File::open(path) {
|
||||
Ok(file) => file,
|
||||
Err(err) => match err.kind() {
|
||||
|
|
@ -72,7 +71,7 @@ impl ReadDB {
|
|||
|
||||
ret.h = db_header
|
||||
.iter()
|
||||
.filter_map(|(key, name)| config.get_filter(name).map(|filter| (*key, filter.clone())))
|
||||
.filter_map(|(key, name)| config.get_filter(name).map(|filter| (*key, filter)))
|
||||
.collect();
|
||||
|
||||
Ok(Some(ret))
|
||||
|
|
@ -110,7 +109,7 @@ pub struct WriteDB {
|
|||
}
|
||||
|
||||
impl WriteDB {
|
||||
pub fn create(path: &str, config: &Arc<Config>) -> Self {
|
||||
pub fn create(path: &str, config: &'static Config) -> Self {
|
||||
let file = match File::create(path) {
|
||||
Ok(file) => file,
|
||||
Err(err) => {
|
||||
|
|
@ -198,7 +197,7 @@ impl ComputedLogEntry {
|
|||
match header.get(&self.f) {
|
||||
Some(f) => Ok(LogEntry {
|
||||
m: self.m,
|
||||
f: f.clone(),
|
||||
f,
|
||||
t: DateTime::from_timestamp(self.t, 0)
|
||||
.unwrap()
|
||||
.with_timezone(&Local),
|
||||
|
|
|
|||
|
|
@ -3,10 +3,7 @@ use std::{
|
|||
fmt::Debug,
|
||||
fs, io,
|
||||
process::exit,
|
||||
sync::{
|
||||
mpsc::{Receiver, Sender},
|
||||
Arc,
|
||||
},
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
thread,
|
||||
};
|
||||
|
||||
|
|
@ -71,11 +68,11 @@ macro_rules! flush_or_die {
|
|||
|
||||
/// First rotates the database, then spawns the database thread
|
||||
pub fn database_manager(
|
||||
config: Arc<Config>,
|
||||
config: &'static Config,
|
||||
log_rx: Receiver<DatabaseManagerInput>,
|
||||
matches_tx: Sender<MatchManagerInput>,
|
||||
) -> thread::JoinHandle<()> {
|
||||
let (mut log_db, mut flush_db) = match rotate_db(&config, Some(matches_tx)) {
|
||||
let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) {
|
||||
Ok(dbs) => dbs,
|
||||
Err(err) => {
|
||||
error!("while rotating databases on start: {}", err);
|
||||
|
|
@ -97,7 +94,7 @@ pub fn database_manager(
|
|||
flush_or_die!(flush_db);
|
||||
drop(log_db);
|
||||
drop(flush_db);
|
||||
(log_db, flush_db) = match rotate_db(&config, None) {
|
||||
(log_db, flush_db) = match rotate_db(config, None) {
|
||||
Ok(dbs) => dbs,
|
||||
Err(err) => {
|
||||
error!(
|
||||
|
|
@ -117,7 +114,7 @@ pub fn database_manager(
|
|||
}
|
||||
|
||||
fn rotate_db(
|
||||
config: &Arc<Config>,
|
||||
config: &'static Config,
|
||||
matches_tx: Option<Sender<MatchManagerInput>>,
|
||||
) -> Result<(WriteDB, WriteDB), DBError> {
|
||||
info!("Rotating database...");
|
||||
|
|
@ -133,7 +130,7 @@ fn rotate_db(
|
|||
}
|
||||
|
||||
fn _rotate_db(
|
||||
config: &Arc<Config>,
|
||||
config: &'static Config,
|
||||
matches_tx: &Option<Sender<MatchManagerInput>>,
|
||||
) -> Result<(WriteDB, WriteDB), DBError> {
|
||||
let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config)? {
|
||||
|
|
@ -200,7 +197,7 @@ fn __rotate_db(
|
|||
|
||||
// Read flushes
|
||||
#[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used
|
||||
let mut flushes: BTreeMap<Arc<Filter>, BTreeMap<Match, Time>> = BTreeMap::new();
|
||||
let mut flushes: BTreeMap<&'static Filter, BTreeMap<Match, Time>> = BTreeMap::new();
|
||||
for flush_entry in flush_read_db {
|
||||
match flush_entry {
|
||||
Ok(entry) => {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,5 @@
|
|||
#![cfg(test)]
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::Local;
|
||||
|
||||
use crate::database::ReadDB;
|
||||
|
|
@ -40,36 +38,36 @@ fn write_and_read_db() {
|
|||
",
|
||||
);
|
||||
|
||||
let config = Arc::new(Config::from_file(&config_file).unwrap());
|
||||
let config = Box::leak(Box::new(Config::from_file(&config_file).unwrap()));
|
||||
|
||||
let correct_filter_name = Arc::new(Filter::from_name("stream1", "filter1"));
|
||||
let correct_filter_name = Box::leak(Box::new(Filter::from_name("stream1", "filter1")));
|
||||
|
||||
let incorrect_filter_name = Arc::new(Filter::from_name("stream0", "filter1"));
|
||||
let incorrect_filter_name = Box::leak(Box::new(Filter::from_name("stream0", "filter1")));
|
||||
|
||||
let correct_log_entry = LogEntry {
|
||||
m: vec!["match1".into()],
|
||||
f: correct_filter_name.clone(),
|
||||
f: correct_filter_name,
|
||||
t: Local::now(),
|
||||
exec: false,
|
||||
};
|
||||
|
||||
let incorrect_log_entry = LogEntry {
|
||||
m: vec!["match1".into()],
|
||||
f: incorrect_filter_name.clone(),
|
||||
f: incorrect_filter_name,
|
||||
t: Local::now(),
|
||||
exec: false,
|
||||
};
|
||||
|
||||
let db_path = Fixture::empty("matches.db");
|
||||
|
||||
let mut write_db = WriteDB::create(db_path.to_str().unwrap(), &config);
|
||||
let mut write_db = WriteDB::create(db_path.to_str().unwrap(), config);
|
||||
|
||||
assert!(write_db.write(correct_log_entry.clone()).is_ok());
|
||||
assert!(write_db.write(incorrect_log_entry).is_err());
|
||||
|
||||
drop(write_db);
|
||||
|
||||
let read_db = ReadDB::open(db_path.to_str().unwrap(), &config);
|
||||
let read_db = ReadDB::open(db_path.to_str().unwrap(), config);
|
||||
|
||||
assert!(read_db.is_ok());
|
||||
let read_db = read_db.unwrap();
|
||||
|
|
|
|||
|
|
@ -1,10 +1,7 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
process::Stdio,
|
||||
sync::{
|
||||
mpsc::{Receiver, Sender},
|
||||
Arc,
|
||||
},
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
};
|
||||
|
||||
use chrono::{DateTime, Local};
|
||||
|
|
@ -27,7 +24,7 @@ pub enum ExecsManagerInput {
|
|||
Stop,
|
||||
}
|
||||
|
||||
type ExecsMap = BTreeMap<Arc<Action>, BTreeMap<Match, BTreeSet<DateTime<Local>>>>;
|
||||
type ExecsMap = BTreeMap<&'static Action, BTreeMap<Match, BTreeSet<DateTime<Local>>>>;
|
||||
|
||||
trait ExecsMapTrait {
|
||||
fn add(&mut self, mat: &MAT);
|
||||
|
|
@ -35,7 +32,7 @@ trait ExecsMapTrait {
|
|||
}
|
||||
impl ExecsMapTrait for ExecsMap {
|
||||
fn add(&mut self, mat: &MAT) {
|
||||
let inner_map = self.entry(mat.a.clone()).or_default();
|
||||
let inner_map = self.entry(mat.a).or_default();
|
||||
let inner_set = inner_map.entry(mat.m.clone()).or_default();
|
||||
inner_set.insert(mat.t);
|
||||
}
|
||||
|
|
@ -56,7 +53,7 @@ impl ExecsMapTrait for ExecsMap {
|
|||
}
|
||||
|
||||
pub fn execs_manager(
|
||||
config: Arc<Config>,
|
||||
config: &'static Config,
|
||||
exec_rx: Receiver<ExecsManagerInput>,
|
||||
exec_tx: Sender<ExecsManagerInput>,
|
||||
) {
|
||||
|
|
@ -127,7 +124,7 @@ pub fn execs_manager(
|
|||
for _ in inner_set {
|
||||
exec_now(MAT {
|
||||
m: match_.clone(),
|
||||
a: action.clone(),
|
||||
a: action,
|
||||
t: Local::now(),
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ use crate::{
|
|||
#[derive(Clone, Debug, Default, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct Filter {
|
||||
actions: BTreeMap<String, Arc<Action>>,
|
||||
actions: BTreeMap<String, Action>,
|
||||
#[serde(skip)]
|
||||
longuest_action_duration: TimeDelta,
|
||||
|
||||
|
|
@ -165,13 +165,9 @@ impl Filter {
|
|||
return Err("no actions configured".into());
|
||||
}
|
||||
|
||||
let mut new_actions = BTreeMap::new();
|
||||
for (key, action) in &self.actions {
|
||||
let mut new_action = action.as_ref().clone();
|
||||
new_action.setup(stream_name, name, key, self.patterns.clone())?;
|
||||
new_actions.insert(key.clone(), Arc::new(new_action));
|
||||
for (key, action) in &mut self.actions {
|
||||
action.setup(stream_name, name, key, self.patterns.clone())?;
|
||||
}
|
||||
self.actions = new_actions;
|
||||
|
||||
self.longuest_action_duration =
|
||||
self.actions.values().fold(TimeDelta::seconds(0), |acc, v| {
|
||||
|
|
@ -206,11 +202,11 @@ impl Filter {
|
|||
None
|
||||
}
|
||||
|
||||
pub fn send_actions(&self, m: &Match, t: Time, tx: &Sender<ExecsManagerInput>) {
|
||||
pub fn send_actions(&'static self, m: &Match, t: Time, tx: &Sender<ExecsManagerInput>) {
|
||||
for action in self.actions.values() {
|
||||
tx.send(ExecsManagerInput::Exec(MAT {
|
||||
m: m.clone(),
|
||||
a: action.clone(),
|
||||
a: action,
|
||||
t: t + action.after_duration().unwrap_or_default(),
|
||||
}))
|
||||
.unwrap();
|
||||
|
|
|
|||
|
|
@ -1,9 +1,6 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
sync::{
|
||||
mpsc::{Receiver, Sender, SyncSender},
|
||||
Arc,
|
||||
},
|
||||
sync::mpsc::{Receiver, Sender, SyncSender},
|
||||
};
|
||||
|
||||
use log::debug;
|
||||
|
|
@ -26,7 +23,7 @@ pub enum MatchManagerInput {
|
|||
Stop,
|
||||
}
|
||||
|
||||
type MatchesMap = BTreeMap<Arc<Filter>, BTreeMap<Match, BTreeSet<Time>>>;
|
||||
type MatchesMap = BTreeMap<&'static Filter, BTreeMap<Match, BTreeSet<Time>>>;
|
||||
|
||||
// This trait is needed to permit to implement methods on an external type
|
||||
trait MatchesMapTrait {
|
||||
|
|
@ -37,7 +34,7 @@ trait MatchesMapTrait {
|
|||
}
|
||||
impl MatchesMapTrait for MatchesMap {
|
||||
fn add(&mut self, mft: &MFT) {
|
||||
let inner_map = self.entry(mft.f.clone()).or_default();
|
||||
let inner_map = self.entry(mft.f).or_default();
|
||||
let inner_set = inner_map.entry(mft.m.clone()).or_default();
|
||||
inner_set.insert(mft.t);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use chrono::{DateTime, Local, TimeDelta};
|
||||
|
||||
use crate::{action::Action, filter::Filter};
|
||||
|
|
@ -10,21 +8,21 @@ pub type Match = Vec<String>;
|
|||
#[derive(Clone)]
|
||||
pub struct MFT {
|
||||
pub m: Match,
|
||||
pub f: Arc<Filter>,
|
||||
pub f: &'static Filter,
|
||||
pub t: Time,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MAT {
|
||||
pub m: Match,
|
||||
pub a: Arc<Action>,
|
||||
pub a: &'static Action,
|
||||
pub t: Time,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LogEntry {
|
||||
pub m: Match,
|
||||
pub f: Arc<Filter>,
|
||||
pub f: &'static Filter,
|
||||
pub t: Time,
|
||||
pub exec: bool,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,10 +2,7 @@ use std::{
|
|||
collections::BTreeMap,
|
||||
io::{BufRead, BufReader},
|
||||
process::{Child, Command, Stdio},
|
||||
sync::{
|
||||
mpsc::{Sender, SyncSender},
|
||||
Arc,
|
||||
},
|
||||
sync::mpsc::{Sender, SyncSender},
|
||||
};
|
||||
|
||||
use chrono::Local;
|
||||
|
|
@ -18,18 +15,18 @@ use crate::{config::Patterns, filter::Filter, matches::MatchManagerInput, messag
|
|||
#[serde(deny_unknown_fields)]
|
||||
pub struct Stream {
|
||||
cmd: Vec<String>,
|
||||
filters: BTreeMap<String, Arc<Filter>>,
|
||||
filters: BTreeMap<String, Filter>,
|
||||
|
||||
#[serde(skip)]
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn filters(&self) -> &BTreeMap<String, Arc<Filter>> {
|
||||
pub fn filters(&self) -> &BTreeMap<String, Filter> {
|
||||
&self.filters
|
||||
}
|
||||
|
||||
pub fn get_filter(&self, filter_name: &str) -> Option<&Arc<Filter>> {
|
||||
pub fn get_filter(&self, filter_name: &str) -> Option<&Filter> {
|
||||
self.filters.get(filter_name)
|
||||
}
|
||||
|
||||
|
|
@ -59,19 +56,15 @@ impl Stream {
|
|||
return Err("no filters configured".into());
|
||||
}
|
||||
|
||||
let mut new_filters = BTreeMap::new();
|
||||
for (key, filter) in &self.filters {
|
||||
let mut new_filter = filter.as_ref().clone();
|
||||
new_filter.setup(name, key, patterns)?;
|
||||
new_filters.insert(key.clone(), Arc::new(new_filter));
|
||||
for (key, filter) in &mut self.filters {
|
||||
filter.setup(name, key, patterns)?;
|
||||
}
|
||||
self.filters = new_filters;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn manager(
|
||||
&self,
|
||||
&'static self,
|
||||
child_tx: SyncSender<Option<Child>>,
|
||||
match_tx: Sender<MatchManagerInput>,
|
||||
) {
|
||||
|
|
@ -109,7 +102,7 @@ impl Stream {
|
|||
match_tx
|
||||
.send(MatchManagerInput::Match(MFT {
|
||||
m: match_,
|
||||
f: filter.clone(),
|
||||
f: filter,
|
||||
t: Local::now(),
|
||||
}))
|
||||
.unwrap();
|
||||
|
|
@ -138,7 +131,7 @@ pub mod tests {
|
|||
stream.cmd = vec!["command".into()];
|
||||
stream
|
||||
.filters
|
||||
.insert("name".into(), Arc::new(crate::filter::tests::ok_filter()));
|
||||
.insert("name".into(), crate::filter::tests::ok_filter());
|
||||
stream
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue