From ce0c34de8e6529d69aae437922defd29188cf861 Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 1 Oct 2024 12:00:00 +0200 Subject: [PATCH] Socket communication! --- rust/src/client/mod.rs | 100 +++++++++---- rust/src/concepts/action.rs | 24 +-- rust/src/concepts/config.rs | 4 + rust/src/concepts/filter.rs | 37 ++--- rust/src/concepts/messages.rs | 30 ++-- rust/src/concepts/mod.rs | 7 + rust/src/concepts/socket_messages.rs | 61 ++++++-- rust/src/concepts/stream.rs | 2 +- rust/src/daemon/database/lowlevel.rs | 9 +- rust/src/daemon/database/mod.rs | 2 +- rust/src/daemon/execs.rs | 107 +++++++------- rust/src/daemon/matches.rs | 123 +++++++--------- rust/src/daemon/mod.rs | 14 +- rust/src/daemon/socket.rs | 212 +++++++++++++++++---------- rust/src/daemon/statemap.rs | 123 ++++++++++++++++ rust/src/main.rs | 13 +- rust/src/utils/cli.rs | 33 +---- rust/test.jsonnet | 6 +- 18 files changed, 582 insertions(+), 325 deletions(-) create mode 100644 rust/src/daemon/statemap.rs diff --git a/rust/src/client/mod.rs b/rust/src/client/mod.rs index ffc1b65..91aa1a1 100644 --- a/rust/src/client/mod.rs +++ b/rust/src/client/mod.rs @@ -1,36 +1,80 @@ -use std::path::PathBuf; +use std::{error::Error, io::stdout, os::unix::net::UnixStream, path::PathBuf, process::exit}; -use log::debug; -use regex::Regex; +use bincode::Options; +use log::{debug, error, Level}; -use crate::utils::cli::{Format, NamedRegex}; - -pub fn show( - socket: &PathBuf, - format: Format, - limit: &Option, - pattern: &Option, - patterns: &Vec, -) { - debug!( - "show {:?} {:?} {:?} {:?} {:?}", - socket, format, limit, pattern, patterns - ); +use crate::{ + concepts::{ClientRequest, ClientStatus, DaemonResponse, Order}, + utils::{bincode_options, cli::Format, SimpleLogger}, +}; +macro_rules! or_quit { + ($msg:expr, $expression:expr) => { + match $expression { + Ok(x) => x, + Err(err) => { + error!("failed to communicate to daemon: {}, {}", $msg, err); + exit(1); + } + } + }; } -pub fn flush( - socket: &PathBuf, - format: Format, - limit: &Option, - pattern: &Option, - patterns: &Vec, -) { - debug!( - "flush {:?} {:?} {:?} {:?} {:?}", - socket, format, limit, pattern, patterns - ); +fn send_retrieve(socket: &PathBuf, req: &ClientRequest) -> DaemonResponse { + let bin = bincode_options(); + let conn = or_quit!("opening connection to daemon", UnixStream::connect(socket)); + let conn2 = or_quit!("failed to clone stream", conn.try_clone()); + or_quit!("failed to send request", bin.serialize_into(conn, req)); + or_quit!( + "failed to send request", + bin.deserialize_from::(conn2) + ) } -pub fn test_regex(config_path: &PathBuf, regex: &String, line: &Option) { +fn print_status(cs: ClientStatus, format: Format) -> Result<(), Box> { + Ok(match format { + Format::JSON => serde_json::to_writer(stdout().lock(), &cs)?, + Format::YAML => serde_yaml::to_writer(stdout().lock(), &cs)?, + }) +} + +pub fn request( + socket: PathBuf, + format: Format, + stream_filter: Option, + patterns: Vec<(String, String)>, + order: Order, +) { + if let Err(err) = SimpleLogger::init(Level::Debug) { + eprintln!("ERROR could not initialize logging: {err}"); + exit(1); + } + + let response = send_retrieve( + &socket, + &ClientRequest { + order, + stream_filter, + patterns, + }, + ); + match response { + DaemonResponse::Order(cs) => { + if let Err(err) = print_status(cs, format) { + error!("while printing response: {err}"); + exit(1); + } + } + DaemonResponse::Err(err) => { + error!("failed to communicate to daemon: error response: {err}"); + exit(1); + } + } +} + +pub fn test_regex(config_path: PathBuf, regex: String, line: Option) { + if let Err(err) = SimpleLogger::init(Level::Debug) { + eprintln!("ERROR could not initialize logging: {err}"); + exit(1); + } debug!("test-regex {:?} {:?} {:?} ", config_path, regex, line); } diff --git a/rust/src/concepts/action.rs b/rust/src/concepts/action.rs index ec8ea63..6d1561d 100644 --- a/rust/src/concepts/action.rs +++ b/rust/src/concepts/action.rs @@ -4,7 +4,7 @@ use chrono::TimeDelta; use serde::Deserialize; -use super::{Match, Pattern}; +use super::{ActionFilter, Match, Pattern}; use crate::utils::parse_duration; #[derive(Clone, Debug, Deserialize)] @@ -34,6 +34,20 @@ fn set_false() -> bool { false } +impl ActionFilter for Action { + fn patterns(&self) -> &BTreeSet> { + &self.patterns + } + + fn full_name<'a>(&'a self) -> (&'a str, &'a str, &'a str) { + ( + &self.stream_name, + &self.filter_name, + &self.name, + ) + } +} + impl Action { pub fn after_duration(&self) -> Option { self.after_duration @@ -43,14 +57,6 @@ impl Action { self.on_exit } - pub fn full_name(&self) -> (String, String, String) { - ( - self.stream_name.clone(), - self.filter_name.clone(), - self.name.clone(), - ) - } - pub fn setup( &mut self, stream_name: &str, diff --git a/rust/src/concepts/config.rs b/rust/src/concepts/config.rs index 61cf401..08c86b7 100644 --- a/rust/src/concepts/config.rs +++ b/rust/src/concepts/config.rs @@ -36,6 +36,10 @@ impl Config { &self.streams } + pub fn patterns(&self) -> &Patterns { + &self.patterns + } + pub fn concurrency(&self) -> usize { self.concurrency } diff --git a/rust/src/concepts/filter.rs b/rust/src/concepts/filter.rs index 60dc99c..61aae3e 100644 --- a/rust/src/concepts/filter.rs +++ b/rust/src/concepts/filter.rs @@ -12,7 +12,7 @@ use serde::Deserialize; use super::{ messages::{Match, Time, MAT}, - Action, Pattern, Patterns, + Action, ActionFilter, Pattern, Patterns, }; use crate::{daemon::ExecsManagerInput, utils::parse_duration}; @@ -45,6 +45,16 @@ pub struct Filter { stream_name: String, } +impl ActionFilter for Filter { + fn full_name<'a>(&'a self) -> (&'a str, &'a str, &'a str) { + (self.stream_name.as_ref(), self.name.as_ref(), "") + } + + fn patterns(&self) -> &BTreeSet> { + &self.patterns + } +} + impl Filter { #[cfg(test)] pub fn from_name(stream_name: &str, filter_name: &str) -> Filter { @@ -55,10 +65,6 @@ impl Filter { } } - pub fn full_name(&self) -> (String, String) { - (self.stream_name.clone(), self.name.clone()) - } - pub fn retry(&self) -> Option { self.retry } @@ -71,10 +77,6 @@ impl Filter { self.longuest_action_duration } - pub fn patterns(&self) -> &BTreeSet> { - &self.patterns - } - pub fn setup( &mut self, stream_name: &str, @@ -202,26 +204,15 @@ impl Filter { None } - pub fn send_actions( - &'static self, - m: &Match, - t: Time, - tx: &Sender, - exec: bool, - ) { + pub fn send_actions(&'static self, m: &Match, t: Time, tx: &Sender) { for action in self.actions.values() { let mat = MAT { m: m.clone(), - a: action, + o: action, t: t + action.after_duration().unwrap_or_default(), }; #[allow(clippy::unwrap_used)] // propagating panics is ok - tx.send(if exec { - ExecsManagerInput::Exec(mat) - } else { - ExecsManagerInput::Flush(mat) - }) - .unwrap(); + tx.send(ExecsManagerInput::Exec(mat)).unwrap(); } } } diff --git a/rust/src/concepts/messages.rs b/rust/src/concepts/messages.rs index f4541bf..5a9b1c7 100644 --- a/rust/src/concepts/messages.rs +++ b/rust/src/concepts/messages.rs @@ -1,23 +1,33 @@ use chrono::{DateTime, Local, TimeDelta}; -use super::{Action, Filter}; +use super::{Action, ActionFilter, Filter}; pub type Time = DateTime; pub type Match = Vec; #[derive(Clone)] -pub struct MFT { +pub struct MT { pub m: Match, - pub f: &'static Filter, + pub o: &'static T, pub t: Time, } -#[derive(Clone)] -pub struct MAT { - pub m: Match, - pub a: &'static Action, - pub t: Time, -} +pub type MFT = MT; +pub type MAT = MT; + +// #[derive(Clone)] +// pub struct MFT { +// pub m: Match, +// pub f: &'static Filter, +// pub t: Time, +// } + +// #[derive(Clone)] +// pub struct MAT { +// pub m: Match, +// pub a: &'static Action, +// pub t: Time, +// } #[derive(Clone, Debug)] pub struct LogEntry { @@ -42,7 +52,7 @@ impl From for MFT { fn from(value: LogEntry) -> Self { MFT { m: value.m, - f: value.f, + o: value.f, t: value.t, } } diff --git a/rust/src/concepts/mod.rs b/rust/src/concepts/mod.rs index e51511c..c0a87b1 100644 --- a/rust/src/concepts/mod.rs +++ b/rust/src/concepts/mod.rs @@ -6,6 +6,8 @@ mod pattern; mod socket_messages; mod stream; +use std::{collections::BTreeSet, fmt::Display, sync::Arc}; + pub use action::*; pub use config::*; pub use filter::*; @@ -13,3 +15,8 @@ pub use messages::*; pub use pattern::*; pub use socket_messages::*; pub use stream::*; + +pub trait ActionFilter: Clone + Display + PartialEq + Eq + PartialOrd + Ord { + fn patterns(&self) -> &BTreeSet>; + fn full_name<'a>(&'a self) -> (&'a str, &'a str, &'a str); +} diff --git a/rust/src/concepts/socket_messages.rs b/rust/src/concepts/socket_messages.rs index c4be743..b0d1088 100644 --- a/rust/src/concepts/socket_messages.rs +++ b/rust/src/concepts/socket_messages.rs @@ -2,27 +2,33 @@ use std::collections::{BTreeMap, BTreeSet}; use super::Match; -use serde::{Deserialize, Serialize}; +use serde::{ser::SerializeStruct, Deserialize, Serialize}; // We don't need protocol versionning here because // client and daemon are the same binary -#[derive(Clone, Serialize, Deserialize)] -pub enum ClientRequest { - Info, - Flush(FlushOpts), +#[derive(Copy, Clone, Serialize, Deserialize)] +pub enum Order { + Show, + Flush, } #[derive(Clone, Serialize, Deserialize)] -pub struct FlushOpts { +pub struct ClientRequest { + pub order: Order, + pub stream_filter: Option, + pub patterns: Vec<(String, String)>, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct FlushOptions { pub m: Match, pub f: (String, String), } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub enum DaemonResponse { - Info(InfoRes), - Flush, + Order(ClientStatus), Err(String), } @@ -31,3 +37,40 @@ pub struct InfoRes { pub matches: BTreeMap<(String, String), BTreeMap>>, pub execs: BTreeMap<(String, String, String), BTreeMap>>, } + +pub type ClientStatus = BTreeMap>>; + +#[derive(Debug, Default, Deserialize)] +pub struct PatternStatus { + pub matches: usize, + pub actions: BTreeMap>, +} + +impl Serialize for PatternStatus { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + // We only skip serializing emptiness if we're on a human-readable format + // This means we're printing for user, not exchanging it over a socket + let state = if serializer.is_human_readable() { + let ser_matches = self.matches != 0; + let ser_actions = self.actions.len() != 0; + let mut state = serializer + .serialize_struct("PatternStatus", ser_matches as usize + ser_actions as usize)?; + if ser_matches { + state.serialize_field("matches", &self.matches)?; + } + if ser_actions { + state.serialize_field("actions", &self.actions)?; + } + state + } else { + let mut state = serializer.serialize_struct("PatternStatus", 2)?; + state.serialize_field("matches", &self.matches)?; + state.serialize_field("actions", &self.actions)?; + state + }; + state.end() + } +} diff --git a/rust/src/concepts/stream.rs b/rust/src/concepts/stream.rs index 2971a2c..d1fa6bd 100644 --- a/rust/src/concepts/stream.rs +++ b/rust/src/concepts/stream.rs @@ -106,7 +106,7 @@ impl Stream { match_tx .send(MatchManagerInput::Match(MFT { m: match_, - f: filter, + o: filter, t: Local::now(), })) .unwrap(); diff --git a/rust/src/daemon/database/lowlevel.rs b/rust/src/daemon/database/lowlevel.rs index d057133..7172156 100644 --- a/rust/src/daemon/database/lowlevel.rs +++ b/rust/src/daemon/database/lowlevel.rs @@ -12,7 +12,7 @@ use log::{debug, error, warn}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::{ - concepts::{Config, Filter, LogEntry, Match}, + concepts::{ActionFilter, Config, Filter, LogEntry, Match}, utils::{bincode_options, BincodeOptions}, }; @@ -137,7 +137,10 @@ impl WriteDB { let database_header: DatabaseHeader = config .filters() .into_iter() - .map(|f| f.full_name()) + .map(|f| { + let names = f.full_name(); + (names.0.to_owned(), names.1.to_owned()) + }) .enumerate() .collect(); @@ -163,7 +166,7 @@ impl WriteDB { fn _write(&mut self, data: T) -> Result<(), DBError> { let encoded = self.bin.serialize(&data)?; - debug!("writing this: {:?}, {:?}", &data, &encoded); + // debug!("writing this: {:?}, {:?}", &data, &encoded); self.f.write_all(&encoded)?; Ok(()) } diff --git a/rust/src/daemon/database/mod.rs b/rust/src/daemon/database/mod.rs index 878750d..d0bed01 100644 --- a/rust/src/daemon/database/mod.rs +++ b/rust/src/daemon/database/mod.rs @@ -12,7 +12,7 @@ use log::{error, info, warn}; use thiserror::Error; use super::MatchManagerInput; -use crate::concepts::{Config, Filter, LogEntry, Match, Time}; +use crate::concepts::{ActionFilter, Config, Filter, LogEntry, Match, Time}; mod lowlevel; mod tests; diff --git a/rust/src/daemon/execs.rs b/rust/src/daemon/execs.rs index 6e1afe7..91793e4 100644 --- a/rust/src/daemon/execs.rs +++ b/rust/src/daemon/execs.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet}, + collections::BTreeMap, process::Stdio, sync::mpsc::{Receiver, Sender, SyncSender}, }; @@ -9,66 +9,30 @@ use log::{error, info}; use timer::MessageTimer; use crate::{ - concepts::{Action, Config, Match, Time, MAT}, + concepts::{Action, ActionFilter, Config, LogEntry, Order, MAT}, utils::ThreadPool, }; +use super::{ + database::DatabaseManagerInput, + statemap::{FilterOptions, StateMap, StateMapTrait}, +}; + #[derive(Clone)] pub enum ExecsManagerInput { Exec(MAT), ExecPending(MAT), - Flush(MAT), - Gimme(SyncSender), + Order(Order, FilterOptions, SyncSender), Stop, } -type ExecsMap = BTreeMap<&'static Action, BTreeMap>>; - -trait ExecsMapTrait { - fn add(&mut self, mat: &MAT); - fn rm(&mut self, mat: &MAT) -> bool; - fn rm_times(&mut self, mat: &MAT) -> Option>; -} -impl ExecsMapTrait for ExecsMap { - fn add(&mut self, mat: &MAT) { - let inner_map = self.entry(mat.a).or_default(); - let inner_set = inner_map.entry(mat.m.clone()).or_default(); - inner_set.insert(mat.t); - } - - fn rm(&mut self, mat: &MAT) -> bool { - let mut removed = false; - if let Some(inner_map) = self.get_mut(&mat.a) { - if let Some(inner_set) = inner_map.get_mut(&mat.m) { - inner_set.remove(&mat.t); - removed = true; - if inner_set.is_empty() { - inner_map.remove(&mat.m); - } - } - if inner_map.is_empty() { - self.remove(&mat.a); - } - } - removed - } - - fn rm_times(&mut self, mat: &MAT) -> Option> { - let mut set = None; - if let Some(inner_map) = self.get_mut(&mat.a) { - set = inner_map.remove(&mat.m); - if inner_map.is_empty() { - self.remove(&mat.a); - } - } - set - } -} +type ExecsMap = StateMap; pub fn execs_manager( config: &'static Config, exec_rx: Receiver, exec_tx: Sender, + log_tx: SyncSender, ) { // Initialize a ThreadPool only when concurrency hasn't been disabled let thread_pool = if config.concurrency() > 1 { @@ -79,7 +43,7 @@ pub fn execs_manager( let exec_now = |mat: MAT| { let mut closure = { - let action = mat.a; + let action = mat.o; // Construct command let mut command = action.exec(&mat.m); @@ -128,15 +92,50 @@ pub fn execs_manager( exec_now(mat); } } - ExecsManagerInput::Flush(mat) => { - if let Some(set) = execs.rm_times(&mat) { - for _ in set { - exec_now(mat.clone()); + ExecsManagerInput::Order(order, options, tx) => { + let filtered = execs.filtered(options); + + if let Order::Flush = order { + let now = Local::now(); + // filter the state_map according to provided options + for (action, inner_map) in &filtered { + // get filter (required for LogEntry, FIXME optimize this) + let filter = { + let name = action.full_name(); + #[allow(clippy::unwrap_used)] + // We're pretty confident our action has a filter + config + .get_filter(&(name.0.to_string(), name.1.to_string())) + .unwrap() + }; + + for (match_, _) in inner_map { + let mat = MAT { + m: match_.clone(), + o: action, + t: now, + }; + // delete them from state and execute them + if let Some(set) = execs.rm_times(&mat) { + for _ in set { + exec_now(mat.clone()); + } + } + #[allow(clippy::unwrap_used)] // propagating panics is ok + log_tx + .send(DatabaseManagerInput::Flush(LogEntry { + exec: false, + m: mat.m, + f: filter, + t: mat.t, + })) + .unwrap(); + } } } + #[allow(clippy::unwrap_used)] // propagating panics is ok + tx.send(filtered).unwrap(); } - #[allow(clippy::unwrap_used)] // propagating panics is ok - ExecsManagerInput::Gimme(tx) => tx.send(execs.clone()).unwrap(), ExecsManagerInput::Stop => { for (action, inner_map) in execs { if action.on_exit() { @@ -144,7 +143,7 @@ pub fn execs_manager( for _ in inner_set { exec_now(MAT { m: match_.clone(), - a: action, + o: action, t: Local::now(), }); } diff --git a/rust/src/daemon/matches.rs b/rust/src/daemon/matches.rs index 45cd129..6b4bad5 100644 --- a/rust/src/daemon/matches.rs +++ b/rust/src/daemon/matches.rs @@ -1,73 +1,29 @@ use std::{ - collections::{BTreeMap, BTreeSet}, + collections::BTreeMap, sync::mpsc::{Receiver, Sender, SyncSender}, }; +use chrono::Local; use log::debug; use timer::MessageTimer; -use super::{database::DatabaseManagerInput, ExecsManagerInput}; -use crate::{ - concepts::Filter, - concepts::{LogEntry, Match, Time, MFT}, +use super::{ + database::DatabaseManagerInput, + statemap::{FilterOptions, StateMap, StateMapTrait}, + ExecsManagerInput, }; +use crate::concepts::{ActionFilter, Filter, LogEntry, Order, MFT}; #[derive(Clone)] pub enum MatchManagerInput { Match(MFT), Unmatch(MFT), - Flush(MFT), - Gimme(SyncSender), + Order(Order, FilterOptions, SyncSender), EndOfStartup, Stop, } -type MatchesMap = BTreeMap<&'static Filter, BTreeMap>>; - -// This trait is needed to permit to implement methods on an external type -trait MatchesMapTrait { - fn add(&mut self, mft: &MFT); - fn rm(&mut self, mft: &MFT); - fn rm_times(&mut self, mft: &MFT); - fn get_times(&self, mft: &MFT) -> usize; -} -impl MatchesMapTrait for MatchesMap { - fn add(&mut self, mft: &MFT) { - let inner_map = self.entry(mft.f).or_default(); - let inner_set = inner_map.entry(mft.m.clone()).or_default(); - inner_set.insert(mft.t); - } - - fn rm(&mut self, mft: &MFT) { - if let Some(inner_map) = self.get_mut(&mft.f) { - if let Some(inner_set) = inner_map.get_mut(&mft.m) { - inner_set.remove(&mft.t); - if inner_set.is_empty() { - inner_map.remove(&mft.m); - } - } - if inner_map.is_empty() { - self.remove(&mft.f); - } - } - } - - fn rm_times(&mut self, mft: &MFT) { - if let Some(inner_map) = self.get_mut(&mft.f) { - inner_map.remove(&mft.m); - if inner_map.is_empty() { - self.remove(&mft.f); - } - } - } - - fn get_times(&self, mft: &MFT) -> usize { - match self.get(&mft.f).and_then(|map| map.get(&mft.m)) { - Some(x) => x.len(), - None => 0, - } - } -} +pub type MatchesMap = StateMap; pub fn matches_manager( match_rx: Receiver, @@ -82,6 +38,9 @@ pub fn matches_manager( let mut startup = true; for mft in match_rx.iter() { + for (filter, map) in matches.iter() { + debug!("MATCHES {:?} {:?}", filter.full_name(), map.keys()); + } match mft { MatchManagerInput::EndOfStartup => { debug!("end of startup!"); @@ -89,7 +48,7 @@ pub fn matches_manager( } MatchManagerInput::Match(mft) => { // Store matches - let exec = match mft.f.retry() { + let exec = match mft.o.retry() { None => true, Some(retry) => { // Add new match @@ -98,7 +57,7 @@ pub fn matches_manager( let guard = timer.schedule_with_delay( // retry_duration is always Some() after filter's setup #[allow(clippy::unwrap_used)] - mft.f.retry_duration().unwrap(), + mft.o.retry_duration().unwrap(), MatchManagerInput::Unmatch(mft.clone()), ); guard.ignore(); @@ -110,10 +69,10 @@ pub fn matches_manager( // Executing actions if exec { // Delete matches only if storing them - if mft.f.retry().is_some() { + if mft.o.retry().is_some() { matches.rm_times(&mft); } - mft.f.send_actions(&mft.m, mft.t, &action_tx, true); + mft.o.send_actions(&mft.m, mft.t, &action_tx); } if !startup { @@ -122,30 +81,48 @@ pub fn matches_manager( .send(DatabaseManagerInput::Log(LogEntry { exec, m: mft.m, - f: mft.f, + f: mft.o, t: mft.t, })) .unwrap(); } } - MatchManagerInput::Unmatch(mft) => matches.rm(&mft), - #[allow(clippy::todo)] - MatchManagerInput::Flush(mft) => { - // remove from matches - matches.rm_times(&mft); - // send to DB + MatchManagerInput::Unmatch(mft) => { + matches.rm(&mft); + } + MatchManagerInput::Order(order, options, tx) => { + let filtered = matches.filtered(options); + + if let Order::Flush = order { + let now = Local::now(); + // filter the state_map according to provided options + for (filter, inner_map) in &filtered { + for (match_, _) in inner_map { + let mft = MFT { + m: match_.clone(), + o: filter, + t: now, + }; + // delete them from state + matches.rm_times(&mft); + // send them to DB + #[allow(clippy::unwrap_used)] // propagating panics is ok + log_tx + .send(DatabaseManagerInput::Flush(LogEntry { + exec: false, + m: mft.m, + f: mft.o, + t: mft.t, + })) + .unwrap(); + } + } + } + #[allow(clippy::unwrap_used)] // propagating panics is ok - log_tx - .send(DatabaseManagerInput::Flush(LogEntry { - exec: false, - m: mft.m, - f: mft.f, - t: mft.t, - })) - .unwrap(); + tx.send(filtered).unwrap(); } #[allow(clippy::unwrap_used)] // propagating panics is ok - MatchManagerInput::Gimme(tx) => tx.send(matches.clone()).unwrap(), MatchManagerInput::Stop => break, } } diff --git a/rust/src/daemon/mod.rs b/rust/src/daemon/mod.rs index 5a1d66e..f60a08f 100644 --- a/rust/src/daemon/mod.rs +++ b/rust/src/daemon/mod.rs @@ -1,6 +1,6 @@ use std::{ fs, - path::Path, + path::PathBuf, process::exit, sync::{ atomic::{AtomicBool, Ordering}, @@ -25,15 +25,16 @@ mod database; mod execs; mod matches; mod socket; +mod statemap; #[allow(unused_variables)] -pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) { +pub fn daemon(config_path: PathBuf, loglevel: Level, socket: PathBuf) { if let Err(err) = SimpleLogger::init(loglevel) { eprintln!("ERROR could not initialize logging: {err}"); exit(1); } - let config: &'static Config = match Config::from_file(config_path) { + let config: &'static Config = match Config::from_file(&config_path) { Ok(config) => Box::leak(Box::new(config)), Err(err) => { error!("{err}"); @@ -59,12 +60,15 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) { let matches_manager_thread_handle = { let match_tx_matches = match_tx.clone(); let exec_tx_matches = exec_tx.clone(); - thread::spawn(move || matches_manager(match_rx, match_tx_matches, exec_tx_matches, log_tx)) + let log_tx_matches = log_tx.clone(); + thread::spawn(move || { + matches_manager(match_rx, match_tx_matches, exec_tx_matches, log_tx_matches) + }) }; let execs_manager_thread_handle = { let exec_tx_execs = exec_tx.clone(); - thread::spawn(move || execs_manager(config, exec_rx, exec_tx_execs)) + thread::spawn(move || execs_manager(config, exec_rx, exec_tx_execs, log_tx)) }; let database_manager_thread_handle = { diff --git a/rust/src/daemon/socket.rs b/rust/src/daemon/socket.rs index fa28632..2c167a5 100644 --- a/rust/src/daemon/socket.rs +++ b/rust/src/daemon/socket.rs @@ -1,21 +1,25 @@ use std::{ + collections::BTreeMap, fs, io, os::unix::net::{UnixListener, UnixStream}, path::PathBuf, process::exit, - sync::mpsc::{sync_channel, Sender}, + sync::{ + mpsc::{sync_channel, Sender}, + Arc, + }, }; use bincode::Options; -use chrono::Local; use log::{error, warn}; +use regex::Regex; use crate::{ - concepts::{ClientRequest, Config, DaemonResponse, InfoRes, MFT}, + concepts::{ActionFilter, ClientRequest, ClientStatus, Config, DaemonResponse, Pattern, PatternStatus}, utils::bincode_options, }; -use super::{ExecsManagerInput, MatchManagerInput}; +use super::{statemap::FilterOptions, ExecsManagerInput, MatchManagerInput}; macro_rules! err_str { ($expression:expr) => { @@ -48,6 +52,131 @@ fn open_socket(path: PathBuf) -> Result { err_str!(UnixListener::bind(path)) } +fn answer_order( + config: &'static Config, + match_tx: &Sender, + exec_tx: &Sender, + options: ClientRequest, +) -> Result { + // Compute options + let filtering_options = { + let (stream_name, filter_name) = match options.stream_filter { + Some(sf) => match sf.split_once(".") { + Some((s, f)) => (Some(s.to_string()), Some(f.to_string())), + None => (Some(sf), None), + }, + None => (None, None), + }; + + // Compute the Vec<(pattern_name, String)> into a BTreeMap, Regex> + let patterns = options + .patterns + .iter() + .map(|(name, reg)| { + // lookup pattern in config.patterns + config + .patterns() + .iter() + // retrieve or Err + .find(|(pattern_name, _)| name == *pattern_name) + .ok_or_else(|| format!("pattern '{name}' doesn't exist")) + // compile Regex or Err + .and_then(|(_, pattern)| match Regex::new(reg) { + Ok(reg) => Ok((pattern.clone(), reg)), + Err(err) => Err(format!("pattern '{name}' regex doesn't compile: {err}")), + }) + }) + .collect::, Regex>, String>>()?; + + FilterOptions { + stream_name, + filter_name, + patterns, + } + }; + + // ask for matches clone + let matches = { + let (m_tx, m_rx) = sync_channel(0); + + #[allow(clippy::unwrap_used)] // propagating panics is ok + match_tx + .send(MatchManagerInput::Order( + options.order, + filtering_options.clone(), + m_tx, + )) + .unwrap(); + + #[allow(clippy::unwrap_used)] // propagating panics is ok + m_rx.recv().unwrap() + }; + + // ask for execs clone + let execs = { + let (e_tx, e_rx) = sync_channel(0); + + #[allow(clippy::unwrap_used)] // propagating panics is ok + exec_tx + .send(ExecsManagerInput::Order( + options.order, + filtering_options, + e_tx, + )) + .unwrap(); + + #[allow(clippy::unwrap_used)] // propagating panics is ok + e_rx.recv().unwrap() + }; + + // Transform matches and execs into a ClientStatus + let cs: ClientStatus = matches + .into_iter() + .fold(BTreeMap::new(), |mut acc, (object, map)| { + let (stream, filter, _) = object.full_name(); + acc.entry(stream.into()) + .or_default() + .entry(filter.into()) + .or_default() + .extend(map.into_iter().map(|(match_, times)| { + ( + match_.join(" "), + PatternStatus { + matches: times.len(), + ..Default::default() + }, + ) + })); + acc + }); + + let cs = execs.into_iter().fold(cs, |mut acc, (object, map)| { + let (stream, filter, action) = object.full_name(); + let inner_map = acc + .entry(stream.into()) + .or_default() + .entry(filter.into()) + .or_default(); + + map.into_iter().for_each(|(match_, times)| { + inner_map + .entry(match_.join(" ")) + .or_default() + .actions + .insert( + action.to_string(), + times + .into_iter() + .map(|time| time.to_rfc3339().chars().take(19).collect()) + .collect(), + ); + }); + acc + }); + + Ok(cs) +} + macro_rules! or_next { ($msg:expr, $expression:expr) => { match $expression { @@ -84,78 +213,13 @@ pub fn socket_manager( "invalid message received: ", bin.deserialize_from::(conn) ); - let response = match request { - ClientRequest::Info => { - // ask for matches clone - let (m_tx, m_rx) = sync_channel(0); - #[allow(clippy::unwrap_used)] // propagating panics is ok - match_tx.send(MatchManagerInput::Gimme(m_tx)).unwrap(); - #[allow(clippy::unwrap_used)] // propagating panics is ok - let matches = m_rx.recv().unwrap(); - - // ask for execs clone - let (e_tx, e_rx) = sync_channel(0); - #[allow(clippy::unwrap_used)] // propagating panics is ok - exec_tx.send(ExecsManagerInput::Gimme(e_tx)).unwrap(); - #[allow(clippy::unwrap_used)] // propagating panics is ok - let execs = e_rx.recv().unwrap(); - - // Transform structures - macro_rules! map_map { - ($map:expr) => { - $map.into_iter() - .map(|(object, inner_map)| { - ( - object.full_name(), - inner_map - .into_iter() - .map(|(key, set)| { - ( - key, - set.into_iter() - .map(|time| time.timestamp()) - .collect(), - ) - }) - .collect(), - ) - }) - .collect() - }; - } - - DaemonResponse::Info(InfoRes { - matches: map_map!(matches), - execs: map_map!(execs), - }) - } - ClientRequest::Flush(flush) => { - match config.get_filter(&flush.f) { - Some(filter) => { - let now = Local::now(); - // Flush actions - filter.send_actions(&flush.m, now, &exec_tx, false); - // Flush filters - #[allow(clippy::unwrap_used)] // propagating panics is ok - match_tx - .send(MatchManagerInput::Flush(MFT { - m: flush.m, - f: filter, - t: now, - })) - .unwrap(); - DaemonResponse::Flush - } - None => DaemonResponse::Err(format!( - "no filter with name {}.{}", - flush.f.0, flush.f.1 - )), - } - } - }; + let response = answer_order(config, &match_tx, &exec_tx, request); or_next!( "failed to send response:", - bin.serialize_into(conn2, &response) + match response { + Ok(res) => bin.serialize_into(conn2, &DaemonResponse::Order(res)), + Err(err) => bin.serialize_into(conn2, &DaemonResponse::Err(err)), + } ); } Err(err) => error!("failed to open connection from cli: {err}"), diff --git a/rust/src/daemon/statemap.rs b/rust/src/daemon/statemap.rs new file mode 100644 index 0000000..f386c00 --- /dev/null +++ b/rust/src/daemon/statemap.rs @@ -0,0 +1,123 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; + +use regex::Regex; + +use crate::concepts::{ActionFilter, Match, Pattern, Time, MT}; + +#[derive(Clone)] +pub struct FilterOptions { + pub stream_name: Option, + pub filter_name: Option, + pub patterns: BTreeMap, Regex>, +} + +pub type StateMap = BTreeMap<&'static T, BTreeMap>>; + +// This trait is needed to permit to implement methods on an external type +pub trait StateMapTrait { + fn add(&mut self, mt: &MT); + fn rm(&mut self, mt: &MT) -> bool; + fn rm_times(&mut self, mt: &MT) -> Option>; + fn get_times(&self, mt: &MT) -> usize; + fn filtered(&self, filter_options: FilterOptions) -> Self; +} + +impl StateMapTrait for StateMap { + fn add(&mut self, mt: &MT) { + let inner_map = self.entry(mt.o).or_default(); + let inner_set = inner_map.entry(mt.m.clone()).or_default(); + inner_set.insert(mt.t); + } + + fn rm(&mut self, mt: &MT) -> bool { + let mut removed = false; + if let Some(inner_map) = self.get_mut(&mt.o) { + if let Some(inner_set) = inner_map.get_mut(&mt.m) { + inner_set.remove(&mt.t); + removed = true; + if inner_set.is_empty() { + inner_map.remove(&mt.m); + } + } + if inner_map.is_empty() { + self.remove(&mt.o); + } + } + removed + } + + fn rm_times(&mut self, mt: &MT) -> Option> { + let mut set = None; + if let Some(inner_map) = self.get_mut(&mt.o) { + set = inner_map.remove(&mt.m); + if inner_map.is_empty() { + self.remove(&mt.o); + } + } + set + } + + fn get_times(&self, mt: &MT) -> usize { + match self.get(&mt.o).and_then(|map| map.get(&mt.m)) { + Some(x) => x.len(), + None => 0, + } + } + + fn filtered(&self, filter_options: FilterOptions) -> Self { + let FilterOptions { + stream_name, + filter_name, + patterns, + } = filter_options; + self.iter() + // stream/filter filtering + .filter(|(object, _)| { + if let Some(stream_name) = &stream_name { + let full_name = object.full_name(); + let (s, f) = (full_name.0, full_name.1); + if *stream_name != s { + return false; + } + if let Some(filter_name) = &filter_name { + if *filter_name != f { + return false; + } + } + } + return true; + }) + // pattern filtering + .filter(|(object, _)| { + patterns + .iter() + .all(|(pattern, _)| object.patterns().get(pattern).is_some()) + }) + // match filtering + .filter_map(|(object, inner_map)| { + let map: BTreeMap> = inner_map + .iter() + .filter(|(match_, _)| { + match_ + .into_iter() + .zip(object.patterns()) + .filter_map(|(a_match, pattern)| match patterns.get(pattern.as_ref()) { + Some(regex) => Some((a_match, regex)), + None => None, + }) + .all(|(a_match, regex)| regex.is_match(a_match)) + }) + .map(|(a, b)| (a.clone(), b.clone())) + .collect(); + if map.len() > 0 { + Some((*object, map)) + } else { + None + } + }) + .collect() + } +} diff --git a/rust/src/main.rs b/rust/src/main.rs index 01fc447..ca53659 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -18,7 +18,8 @@ mod daemon; mod tests; mod utils; -use client::{flush, show, test_regex}; +use client::{request, test_regex}; +use concepts::Order; use daemon::daemon; use utils::cli::{Cli, Command}; @@ -44,26 +45,24 @@ fn main() { loglevel, socket, } => { - daemon(&config, loglevel, &socket); + daemon(config, loglevel, socket); } Command::Show { socket, format, limit, - pattern, patterns, - } => show(&socket, format, &limit, &pattern, &patterns), + } => request(socket, format, limit, patterns, Order::Show), Command::Flush { socket, format, limit, - pattern, patterns, - } => flush(&socket, format, &limit, &pattern, &patterns), + } => request(socket, format, limit, patterns, Order::Flush), Command::TestRegex { config, regex, line, - } => test_regex(&config, ®ex, &line), + } => test_regex(config, regex, line), } } diff --git a/rust/src/utils/cli.rs b/rust/src/utils/cli.rs index 663ac5e..e3d28bc 100644 --- a/rust/src/utils/cli.rs +++ b/rust/src/utils/cli.rs @@ -55,13 +55,9 @@ pub enum Command { #[clap(short = 'l', long, value_name = "STREAM[.FILTER]")] limit: Option, - /// only show items matching PATTERN regex - #[clap(short = 'p', long, value_name = "PATTERN")] - pattern: Option, - /// only show items matching name=PATTERN regex #[clap(value_parser = parse_named_regex, value_name = "NAME=PATTERN")] - patterns: Vec, + patterns: Vec<(String, String)>, }, /// Remove a target from reaction (e.g. unban) @@ -82,13 +78,9 @@ Then prints the flushed matches and actions." #[clap(short = 'l', long, value_name = "STREAM[.FILTER]")] limit: Option, - /// only show items matching PATTERN regex - #[clap(short = 'p', long, value_name = "PATTERN")] - pattern: Option, - /// only show items matching name=PATTERN regex #[clap(value_parser = parse_named_regex, value_name = "NAME=PATTERN")] - patterns: Vec, + patterns: Vec<(String, String)>, }, /// Test a regex @@ -129,24 +121,15 @@ impl fmt::Display for Format { } } -// Structs - -#[allow(dead_code)] -#[derive(Clone, Debug)] -pub struct NamedRegex { - pub regex: Regex, - pub name: String, -} - -fn parse_named_regex(s: &str) -> Result { +fn parse_named_regex(s: &str) -> Result<(String, String), String> { let (name, v) = s .split_once('=') .ok_or("When given as a positional argument, a pattern must be prefixed with a name, ex: ip=192.168.0.1")?; - let regex = Regex::new(v).map_err(|err| format!("{}", err))?; - Ok(NamedRegex { - regex, - name: name.to_string(), - }) + let _ = Regex::new(v).map_err(|err| format!("{}", err))?; + Ok(( + name.to_string(), + v.to_string(), + )) } fn parse_log_level(s: &str) -> Result { diff --git a/rust/test.jsonnet b/rust/test.jsonnet index 9106979..41335f7 100644 --- a/rust/test.jsonnet +++ b/rust/test.jsonnet @@ -22,21 +22,21 @@ streams: { s1: { - cmd: ['sh', '-c', "seq 20 | tr ' ' '\n' | while read i; do echo found $((i % 5)); sleep 0.3; done"], + cmd: ['sh', '-c', "seq 20 | tr ' ' '\n' | while read i; do echo found $((i % 5)); sleep 3; done"], filters: { f1: { regex: [ '^found $', ], retry: 2, - retryperiod: '5s', + retryperiod: '60s', actions: { damn: { cmd: ['notify-send', 'first stream', 'ban '], }, undamn: { cmd: ['notify-send', 'first stream', 'unban '], - after: '6s', + after: '20s', onexit: true, }, },