diff --git a/rust/src/concepts/action.rs b/rust/src/concepts/action.rs index 1e69f42..ec8ea63 100644 --- a/rust/src/concepts/action.rs +++ b/rust/src/concepts/action.rs @@ -43,6 +43,14 @@ impl Action { self.on_exit } + pub fn full_name(&self) -> (String, String, String) { + ( + self.stream_name.clone(), + self.filter_name.clone(), + self.name.clone(), + ) + } + pub fn setup( &mut self, stream_name: &str, diff --git a/rust/src/concepts/filter.rs b/rust/src/concepts/filter.rs index a7e3257..60dc99c 100644 --- a/rust/src/concepts/filter.rs +++ b/rust/src/concepts/filter.rs @@ -71,7 +71,6 @@ impl Filter { self.longuest_action_duration } - #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used pub fn patterns(&self) -> &BTreeSet> { &self.patterns } @@ -122,7 +121,6 @@ impl Filter { return Err("no regex configured".into()); } - #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut new_patterns = BTreeSet::new(); let mut first = true; for regex in &self.regex { @@ -204,14 +202,25 @@ impl Filter { None } - pub fn send_actions(&'static self, m: &Match, t: Time, tx: &Sender) { + pub fn send_actions( + &'static self, + m: &Match, + t: Time, + tx: &Sender, + exec: bool, + ) { for action in self.actions.values() { - #[allow(clippy::unwrap_used)] // propagating panics is ok - tx.send(ExecsManagerInput::Exec(MAT { + let mat = MAT { m: m.clone(), a: action, t: t + action.after_duration().unwrap_or_default(), - })) + }; + #[allow(clippy::unwrap_used)] // propagating panics is ok + tx.send(if exec { + ExecsManagerInput::Exec(mat) + } else { + ExecsManagerInput::Flush(mat) + }) .unwrap(); } } diff --git a/rust/src/concepts/mod.rs b/rust/src/concepts/mod.rs index 4c237d3..e51511c 100644 --- a/rust/src/concepts/mod.rs +++ b/rust/src/concepts/mod.rs @@ -3,6 +3,7 @@ mod config; mod filter; mod messages; mod pattern; +mod socket_messages; mod stream; pub use action::*; @@ -10,4 +11,5 @@ pub use config::*; pub use filter::*; pub use messages::*; pub use pattern::*; +pub use socket_messages::*; pub use stream::*; diff --git a/rust/src/concepts/socket_messages.rs b/rust/src/concepts/socket_messages.rs new file mode 100644 index 0000000..c4be743 --- /dev/null +++ b/rust/src/concepts/socket_messages.rs @@ -0,0 +1,33 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use super::Match; + +use serde::{Deserialize, Serialize}; + +// We don't need protocol versionning here because +// client and daemon are the same binary + +#[derive(Clone, Serialize, Deserialize)] +pub enum ClientRequest { + Info, + Flush(FlushOpts), +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct FlushOpts { + pub m: Match, + pub f: (String, String), +} + +#[derive(Clone, Serialize, Deserialize)] +pub enum DaemonResponse { + Info(InfoRes), + Flush, + Err(String), +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct InfoRes { + pub matches: BTreeMap<(String, String), BTreeMap>>, + pub execs: BTreeMap<(String, String, String), BTreeMap>>, +} diff --git a/rust/src/daemon/database/lowlevel.rs b/rust/src/daemon/database/lowlevel.rs index 7ed900d..d057133 100644 --- a/rust/src/daemon/database/lowlevel.rs +++ b/rust/src/daemon/database/lowlevel.rs @@ -11,7 +11,10 @@ use chrono::{DateTime, Local}; use log::{debug, error, warn}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use crate::concepts::{Config, Filter, LogEntry, Match}; +use crate::{ + concepts::{Config, Filter, LogEntry, Match}, + utils::{bincode_options, BincodeOptions}, +}; use super::DBError; @@ -23,20 +26,12 @@ type DatabaseHeader = BTreeMap; type ReadHeader = BTreeMap; type WriteHeader = BTreeMap<&'static Filter, usize>; -type BinOptions = bincode::config::WithOtherIntEncoding< - bincode::config::DefaultOptions, - bincode::config::VarintEncoding, ->; -fn bin_options() -> BinOptions { - bincode::DefaultOptions::new().with_varint_encoding() -} - const DB_SIGNATURE: &str = "reaction-db-v01"; pub struct ReadDB { f: BufReader, h: ReadHeader, - bin: BinOptions, + bin: BincodeOptions, } impl ReadDB { @@ -60,12 +55,12 @@ impl ReadDB { let mut ret = ReadDB { f: BufReader::new(file), h: BTreeMap::default(), - bin: bin_options(), + bin: bincode_options(), }; match ret.read::() { Ok(signature) => { - if DB_SIGNATURE == &signature { + if DB_SIGNATURE == signature { Ok(()) } else { Err(DBError::Error("database is not a reaction database".into())) @@ -115,7 +110,7 @@ impl Iterator for ReadDB { pub struct WriteDB { f: BufWriter, h: WriteHeader, - bin: BinOptions, + bin: BincodeOptions, } impl WriteDB { @@ -131,7 +126,7 @@ impl WriteDB { let mut ret = WriteDB { f: BufWriter::new(file), h: BTreeMap::default(), - bin: bin_options(), + bin: bincode_options(), }; if let Err(err) = ret._write(DB_SIGNATURE) { @@ -187,7 +182,6 @@ struct ComputedLogEntry { } impl ComputedLogEntry { - #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used fn from(value: LogEntry, header: &WriteHeader) -> Result { match header.get(&value.f) { Some(f) => Ok(ComputedLogEntry { diff --git a/rust/src/daemon/database/mod.rs b/rust/src/daemon/database/mod.rs index 1049fa6..878750d 100644 --- a/rust/src/daemon/database/mod.rs +++ b/rust/src/daemon/database/mod.rs @@ -40,7 +40,6 @@ pub enum DBError { #[derive(Clone)] pub enum DatabaseManagerInput { Log(LogEntry), - #[allow(dead_code)] Flush(LogEntry), } @@ -192,7 +191,6 @@ fn __rotate_db( let mut millisecond_disambiguation_counter: u32 = 0; // Read flushes - #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut flushes: BTreeMap<&'static Filter, BTreeMap> = BTreeMap::new(); for flush_entry in flush_read_db { match flush_entry { diff --git a/rust/src/daemon/execs.rs b/rust/src/daemon/execs.rs index 46fb987..6e1afe7 100644 --- a/rust/src/daemon/execs.rs +++ b/rust/src/daemon/execs.rs @@ -1,15 +1,15 @@ use std::{ collections::{BTreeMap, BTreeSet}, process::Stdio, - sync::mpsc::{Receiver, Sender}, + sync::mpsc::{Receiver, Sender, SyncSender}, }; -use chrono::{DateTime, Local}; +use chrono::Local; use log::{error, info}; use timer::MessageTimer; use crate::{ - concepts::{Action, Config, Match, MAT}, + concepts::{Action, Config, Match, Time, MAT}, utils::ThreadPool, }; @@ -17,16 +17,17 @@ use crate::{ pub enum ExecsManagerInput { Exec(MAT), ExecPending(MAT), - #[allow(dead_code)] Flush(MAT), + Gimme(SyncSender), Stop, } -type ExecsMap = BTreeMap<&'static Action, BTreeMap>>>; +type ExecsMap = BTreeMap<&'static Action, BTreeMap>>; trait ExecsMapTrait { fn add(&mut self, mat: &MAT); - fn rm(&mut self, mat: &MAT); + fn rm(&mut self, mat: &MAT) -> bool; + fn rm_times(&mut self, mat: &MAT) -> Option>; } impl ExecsMapTrait for ExecsMap { fn add(&mut self, mat: &MAT) { @@ -35,10 +36,12 @@ impl ExecsMapTrait for ExecsMap { inner_set.insert(mat.t); } - fn rm(&mut self, mat: &MAT) { + fn rm(&mut self, mat: &MAT) -> bool { + let mut removed = false; if let Some(inner_map) = self.get_mut(&mat.a) { if let Some(inner_set) = inner_map.get_mut(&mat.m) { inner_set.remove(&mat.t); + removed = true; if inner_set.is_empty() { inner_map.remove(&mat.m); } @@ -47,6 +50,18 @@ impl ExecsMapTrait for ExecsMap { self.remove(&mat.a); } } + removed + } + + fn rm_times(&mut self, mat: &MAT) -> Option> { + let mut set = None; + if let Some(inner_map) = self.get_mut(&mat.a) { + set = inner_map.remove(&mat.m); + if inner_map.is_empty() { + self.remove(&mat.a); + } + } + set } } @@ -91,7 +106,6 @@ pub fn execs_manager( } }; - #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut execs: ExecsMap = BTreeMap::new(); let timer = MessageTimer::new(exec_tx); @@ -110,11 +124,19 @@ pub fn execs_manager( } } ExecsManagerInput::ExecPending(mat) => { - execs.rm(&mat); - exec_now(mat); + if execs.rm(&mat) { + exec_now(mat); + } } - #[allow(clippy::todo)] - ExecsManagerInput::Flush(_mat) => todo!(), + ExecsManagerInput::Flush(mat) => { + if let Some(set) = execs.rm_times(&mat) { + for _ in set { + exec_now(mat.clone()); + } + } + } + #[allow(clippy::unwrap_used)] // propagating panics is ok + ExecsManagerInput::Gimme(tx) => tx.send(execs.clone()).unwrap(), ExecsManagerInput::Stop => { for (action, inner_map) in execs { if action.on_exit() { diff --git a/rust/src/daemon/matches.rs b/rust/src/daemon/matches.rs index 406c79c..45cd129 100644 --- a/rust/src/daemon/matches.rs +++ b/rust/src/daemon/matches.rs @@ -16,8 +16,8 @@ use crate::{ pub enum MatchManagerInput { Match(MFT), Unmatch(MFT), - #[allow(dead_code)] Flush(MFT), + Gimme(SyncSender), EndOfStartup, Stop, } @@ -75,7 +75,6 @@ pub fn matches_manager( action_tx: Sender, log_tx: SyncSender, ) { - #[allow(clippy::mutable_key_type)] // Interior mutability of Arc is not used let mut matches: MatchesMap = BTreeMap::new(); let timer = MessageTimer::new(match_tx); @@ -114,7 +113,7 @@ pub fn matches_manager( if mft.f.retry().is_some() { matches.rm_times(&mft); } - mft.f.send_actions(&mft.m, mft.t, &action_tx); + mft.f.send_actions(&mft.m, mft.t, &action_tx, true); } if !startup { @@ -131,7 +130,22 @@ pub fn matches_manager( } MatchManagerInput::Unmatch(mft) => matches.rm(&mft), #[allow(clippy::todo)] - MatchManagerInput::Flush(_) => todo!(), // TODO handle flushes + MatchManagerInput::Flush(mft) => { + // remove from matches + matches.rm_times(&mft); + // send to DB + #[allow(clippy::unwrap_used)] // propagating panics is ok + log_tx + .send(DatabaseManagerInput::Flush(LogEntry { + exec: false, + m: mft.m, + f: mft.f, + t: mft.t, + })) + .unwrap(); + } + #[allow(clippy::unwrap_used)] // propagating panics is ok + MatchManagerInput::Gimme(tx) => tx.send(matches.clone()).unwrap(), MatchManagerInput::Stop => break, } } diff --git a/rust/src/daemon/mod.rs b/rust/src/daemon/mod.rs index e6dd19e..5a1d66e 100644 --- a/rust/src/daemon/mod.rs +++ b/rust/src/daemon/mod.rs @@ -1,4 +1,5 @@ use std::{ + fs, path::Path, process::exit, sync::{ @@ -10,6 +11,7 @@ use std::{ }; use log::{error, info, Level}; +use socket::socket_manager; use crate::{concepts::Config, utils::SimpleLogger}; use database::database_manager; @@ -22,6 +24,7 @@ pub use matches::MatchManagerInput; mod database; mod execs; mod matches; +mod socket; #[allow(unused_variables)] pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) { @@ -70,6 +73,13 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) { database_manager(config, log_rx, match_tx_database) }; + { + let match_tx_socket = match_tx.clone(); + let exec_tx_socket = exec_tx.clone(); + let socket = socket.to_owned(); + thread::spawn(move || socket_manager(config, socket, match_tx_socket, exec_tx_socket)); + } + for stream in config.streams().values() { let match_tx = match_tx.clone(); let (child_tx, child_rx) = sync_channel(0); @@ -116,7 +126,10 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) { let stop_ok = config.stop(); - // TODO remove socket + // not waiting for the socket_manager to finish, sorry + if let Err(err) = fs::remove_file(socket) { + error!("failed to remove socket: {}", err); + } exit(match !signal_received.load(Ordering::SeqCst) && stop_ok { true => 0, diff --git a/rust/src/daemon/socket.rs b/rust/src/daemon/socket.rs new file mode 100644 index 0000000..fa28632 --- /dev/null +++ b/rust/src/daemon/socket.rs @@ -0,0 +1,164 @@ +use std::{ + fs, io, + os::unix::net::{UnixListener, UnixStream}, + path::PathBuf, + process::exit, + sync::mpsc::{sync_channel, Sender}, +}; + +use bincode::Options; +use chrono::Local; +use log::{error, warn}; + +use crate::{ + concepts::{ClientRequest, Config, DaemonResponse, InfoRes, MFT}, + utils::bincode_options, +}; + +use super::{ExecsManagerInput, MatchManagerInput}; + +macro_rules! err_str { + ($expression:expr) => { + $expression.map_err(|err| err.to_string()) + }; +} + +fn open_socket(path: PathBuf) -> Result { + // First create all directories to the file + let dir = path + .parent() + .ok_or(format!("socket {path:?} has no parent directory"))?; + err_str!(fs::create_dir_all(dir))?; + // Test if file exists + match fs::metadata(&path) { + Ok(meta) => { + if meta.file_type().is_dir() { + Err(format!("socket {path:?} is already a directory")) + } else { + warn!("socket {path:?} already exists: is the daemon already running? deleting."); + err_str!(fs::remove_file(&path)) + } + } + Err(err) => err_str!(match err.kind() { + io::ErrorKind::NotFound => Ok(()), + _ => Err(err), + }), + }?; + // Open socket + err_str!(UnixListener::bind(path)) +} + +macro_rules! or_next { + ($msg:expr, $expression:expr) => { + match $expression { + Ok(x) => x, + Err(err) => { + error!("failed to answer client: {}, {}", $msg, err); + continue; + } + } + }; +} + +pub fn socket_manager( + config: &'static Config, + socket: PathBuf, + match_tx: Sender, + exec_tx: Sender, +) { + let listener = match open_socket(socket) { + Ok(l) => l, + Err(err) => { + error!("while creating communication socket: {err}"); + exit(1); + } + }; + + let bin = bincode_options(); + for try_conn in listener.incoming() { + match try_conn { + Ok(conn) => { + let conn2 = or_next!("failed to clone stream", conn.try_clone()); + // read request + let request = or_next!( + "invalid message received: ", + bin.deserialize_from::(conn) + ); + let response = match request { + ClientRequest::Info => { + // ask for matches clone + let (m_tx, m_rx) = sync_channel(0); + #[allow(clippy::unwrap_used)] // propagating panics is ok + match_tx.send(MatchManagerInput::Gimme(m_tx)).unwrap(); + #[allow(clippy::unwrap_used)] // propagating panics is ok + let matches = m_rx.recv().unwrap(); + + // ask for execs clone + let (e_tx, e_rx) = sync_channel(0); + #[allow(clippy::unwrap_used)] // propagating panics is ok + exec_tx.send(ExecsManagerInput::Gimme(e_tx)).unwrap(); + #[allow(clippy::unwrap_used)] // propagating panics is ok + let execs = e_rx.recv().unwrap(); + + // Transform structures + macro_rules! map_map { + ($map:expr) => { + $map.into_iter() + .map(|(object, inner_map)| { + ( + object.full_name(), + inner_map + .into_iter() + .map(|(key, set)| { + ( + key, + set.into_iter() + .map(|time| time.timestamp()) + .collect(), + ) + }) + .collect(), + ) + }) + .collect() + }; + } + + DaemonResponse::Info(InfoRes { + matches: map_map!(matches), + execs: map_map!(execs), + }) + } + ClientRequest::Flush(flush) => { + match config.get_filter(&flush.f) { + Some(filter) => { + let now = Local::now(); + // Flush actions + filter.send_actions(&flush.m, now, &exec_tx, false); + // Flush filters + #[allow(clippy::unwrap_used)] // propagating panics is ok + match_tx + .send(MatchManagerInput::Flush(MFT { + m: flush.m, + f: filter, + t: now, + })) + .unwrap(); + DaemonResponse::Flush + } + None => DaemonResponse::Err(format!( + "no filter with name {}.{}", + flush.f.0, flush.f.1 + )), + } + } + }; + or_next!( + "failed to send response:", + bin.serialize_into(conn2, &response) + ); + } + Err(err) => error!("failed to open connection from cli: {err}"), + } + } +} diff --git a/rust/src/main.rs b/rust/src/main.rs index a435132..01fc447 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -3,9 +3,9 @@ clippy::panic, clippy::todo, clippy::unimplemented, - clippy::unwrap_used, + clippy::unwrap_used )] -#![allow(clippy::upper_case_acronyms)] +#![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)] #![forbid(unsafe_code)] //! TODO document a bit diff --git a/rust/src/utils/mod.rs b/rust/src/utils/mod.rs index 46debbe..49c2207 100644 --- a/rust/src/utils/mod.rs +++ b/rust/src/utils/mod.rs @@ -3,6 +3,15 @@ pub mod logger; mod parse_duration; mod threadpool; +use bincode::Options; pub use logger::SimpleLogger; pub use parse_duration::parse_duration; pub use threadpool::ThreadPool; + +pub type BincodeOptions = bincode::config::WithOtherIntEncoding< + bincode::config::DefaultOptions, + bincode::config::VarintEncoding, +>; +pub fn bincode_options() -> BincodeOptions { + bincode::DefaultOptions::new().with_varint_encoding() +}