Socket communication!

This commit is contained in:
ppom 2024-10-01 12:00:00 +02:00
commit ce0c34de8e
18 changed files with 577 additions and 320 deletions

View file

@ -1,36 +1,80 @@
use std::path::PathBuf;
use std::{error::Error, io::stdout, os::unix::net::UnixStream, path::PathBuf, process::exit};
use log::debug;
use regex::Regex;
use bincode::Options;
use log::{debug, error, Level};
use crate::utils::cli::{Format, NamedRegex};
pub fn show(
socket: &PathBuf,
format: Format,
limit: &Option<String>,
pattern: &Option<Regex>,
patterns: &Vec<NamedRegex>,
) {
debug!(
"show {:?} {:?} {:?} {:?} {:?}",
socket, format, limit, pattern, patterns
);
use crate::{
concepts::{ClientRequest, ClientStatus, DaemonResponse, Order},
utils::{bincode_options, cli::Format, SimpleLogger},
};
macro_rules! or_quit {
($msg:expr, $expression:expr) => {
match $expression {
Ok(x) => x,
Err(err) => {
error!("failed to communicate to daemon: {}, {}", $msg, err);
exit(1);
}
}
};
}
pub fn flush(
socket: &PathBuf,
format: Format,
limit: &Option<String>,
pattern: &Option<Regex>,
patterns: &Vec<NamedRegex>,
) {
debug!(
"flush {:?} {:?} {:?} {:?} {:?}",
socket, format, limit, pattern, patterns
);
fn send_retrieve(socket: &PathBuf, req: &ClientRequest) -> DaemonResponse {
let bin = bincode_options();
let conn = or_quit!("opening connection to daemon", UnixStream::connect(socket));
let conn2 = or_quit!("failed to clone stream", conn.try_clone());
or_quit!("failed to send request", bin.serialize_into(conn, req));
or_quit!(
"failed to send request",
bin.deserialize_from::<UnixStream, DaemonResponse>(conn2)
)
}
pub fn test_regex(config_path: &PathBuf, regex: &String, line: &Option<String>) {
fn print_status(cs: ClientStatus, format: Format) -> Result<(), Box<dyn Error>> {
Ok(match format {
Format::JSON => serde_json::to_writer(stdout().lock(), &cs)?,
Format::YAML => serde_yaml::to_writer(stdout().lock(), &cs)?,
})
}
pub fn request(
socket: PathBuf,
format: Format,
stream_filter: Option<String>,
patterns: Vec<(String, String)>,
order: Order,
) {
if let Err(err) = SimpleLogger::init(Level::Debug) {
eprintln!("ERROR could not initialize logging: {err}");
exit(1);
}
let response = send_retrieve(
&socket,
&ClientRequest {
order,
stream_filter,
patterns,
},
);
match response {
DaemonResponse::Order(cs) => {
if let Err(err) = print_status(cs, format) {
error!("while printing response: {err}");
exit(1);
}
}
DaemonResponse::Err(err) => {
error!("failed to communicate to daemon: error response: {err}");
exit(1);
}
}
}
pub fn test_regex(config_path: PathBuf, regex: String, line: Option<String>) {
if let Err(err) = SimpleLogger::init(Level::Debug) {
eprintln!("ERROR could not initialize logging: {err}");
exit(1);
}
debug!("test-regex {:?} {:?} {:?} ", config_path, regex, line);
}

View file

@ -4,7 +4,7 @@ use chrono::TimeDelta;
use serde::Deserialize;
use super::{Match, Pattern};
use super::{ActionFilter, Match, Pattern};
use crate::utils::parse_duration;
#[derive(Clone, Debug, Deserialize)]
@ -34,6 +34,20 @@ fn set_false() -> bool {
false
}
impl ActionFilter for Action {
fn patterns(&self) -> &BTreeSet<Arc<Pattern>> {
&self.patterns
}
fn full_name<'a>(&'a self) -> (&'a str, &'a str, &'a str) {
(
&self.stream_name,
&self.filter_name,
&self.name,
)
}
}
impl Action {
pub fn after_duration(&self) -> Option<TimeDelta> {
self.after_duration
@ -43,14 +57,6 @@ impl Action {
self.on_exit
}
pub fn full_name(&self) -> (String, String, String) {
(
self.stream_name.clone(),
self.filter_name.clone(),
self.name.clone(),
)
}
pub fn setup(
&mut self,
stream_name: &str,

View file

@ -36,6 +36,10 @@ impl Config {
&self.streams
}
pub fn patterns(&self) -> &Patterns {
&self.patterns
}
pub fn concurrency(&self) -> usize {
self.concurrency
}

View file

@ -12,7 +12,7 @@ use serde::Deserialize;
use super::{
messages::{Match, Time, MAT},
Action, Pattern, Patterns,
Action, ActionFilter, Pattern, Patterns,
};
use crate::{daemon::ExecsManagerInput, utils::parse_duration};
@ -45,6 +45,16 @@ pub struct Filter {
stream_name: String,
}
impl ActionFilter for Filter {
fn full_name<'a>(&'a self) -> (&'a str, &'a str, &'a str) {
(self.stream_name.as_ref(), self.name.as_ref(), "")
}
fn patterns(&self) -> &BTreeSet<Arc<Pattern>> {
&self.patterns
}
}
impl Filter {
#[cfg(test)]
pub fn from_name(stream_name: &str, filter_name: &str) -> Filter {
@ -55,10 +65,6 @@ impl Filter {
}
}
pub fn full_name(&self) -> (String, String) {
(self.stream_name.clone(), self.name.clone())
}
pub fn retry(&self) -> Option<u32> {
self.retry
}
@ -71,10 +77,6 @@ impl Filter {
self.longuest_action_duration
}
pub fn patterns(&self) -> &BTreeSet<Arc<Pattern>> {
&self.patterns
}
pub fn setup(
&mut self,
stream_name: &str,
@ -202,26 +204,15 @@ impl Filter {
None
}
pub fn send_actions(
&'static self,
m: &Match,
t: Time,
tx: &Sender<ExecsManagerInput>,
exec: bool,
) {
pub fn send_actions(&'static self, m: &Match, t: Time, tx: &Sender<ExecsManagerInput>) {
for action in self.actions.values() {
let mat = MAT {
m: m.clone(),
a: action,
o: action,
t: t + action.after_duration().unwrap_or_default(),
};
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(if exec {
ExecsManagerInput::Exec(mat)
} else {
ExecsManagerInput::Flush(mat)
})
.unwrap();
tx.send(ExecsManagerInput::Exec(mat)).unwrap();
}
}
}

View file

@ -1,23 +1,33 @@
use chrono::{DateTime, Local, TimeDelta};
use super::{Action, Filter};
use super::{Action, ActionFilter, Filter};
pub type Time = DateTime<Local>;
pub type Match = Vec<String>;
#[derive(Clone)]
pub struct MFT {
pub struct MT<T: ActionFilter + 'static> {
pub m: Match,
pub f: &'static Filter,
pub o: &'static T,
pub t: Time,
}
#[derive(Clone)]
pub struct MAT {
pub m: Match,
pub a: &'static Action,
pub t: Time,
}
pub type MFT = MT<Filter>;
pub type MAT = MT<Action>;
// #[derive(Clone)]
// pub struct MFT {
// pub m: Match,
// pub f: &'static Filter,
// pub t: Time,
// }
// #[derive(Clone)]
// pub struct MAT {
// pub m: Match,
// pub a: &'static Action,
// pub t: Time,
// }
#[derive(Clone, Debug)]
pub struct LogEntry {
@ -42,7 +52,7 @@ impl From<LogEntry> for MFT {
fn from(value: LogEntry) -> Self {
MFT {
m: value.m,
f: value.f,
o: value.f,
t: value.t,
}
}

View file

@ -6,6 +6,8 @@ mod pattern;
mod socket_messages;
mod stream;
use std::{collections::BTreeSet, fmt::Display, sync::Arc};
pub use action::*;
pub use config::*;
pub use filter::*;
@ -13,3 +15,8 @@ pub use messages::*;
pub use pattern::*;
pub use socket_messages::*;
pub use stream::*;
pub trait ActionFilter: Clone + Display + PartialEq + Eq + PartialOrd + Ord {
fn patterns(&self) -> &BTreeSet<Arc<Pattern>>;
fn full_name<'a>(&'a self) -> (&'a str, &'a str, &'a str);
}

View file

@ -2,27 +2,33 @@ use std::collections::{BTreeMap, BTreeSet};
use super::Match;
use serde::{Deserialize, Serialize};
use serde::{ser::SerializeStruct, Deserialize, Serialize};
// We don't need protocol versionning here because
// client and daemon are the same binary
#[derive(Clone, Serialize, Deserialize)]
pub enum ClientRequest {
Info,
Flush(FlushOpts),
#[derive(Copy, Clone, Serialize, Deserialize)]
pub enum Order {
Show,
Flush,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct FlushOpts {
pub struct ClientRequest {
pub order: Order,
pub stream_filter: Option<String>,
pub patterns: Vec<(String, String)>,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct FlushOptions {
pub m: Match,
pub f: (String, String),
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
pub enum DaemonResponse {
Info(InfoRes),
Flush,
Order(ClientStatus),
Err(String),
}
@ -31,3 +37,40 @@ pub struct InfoRes {
pub matches: BTreeMap<(String, String), BTreeMap<Match, BTreeSet<i64>>>,
pub execs: BTreeMap<(String, String, String), BTreeMap<Match, BTreeSet<i64>>>,
}
pub type ClientStatus = BTreeMap<String, BTreeMap<String, BTreeMap<String, PatternStatus>>>;
#[derive(Debug, Default, Deserialize)]
pub struct PatternStatus {
pub matches: usize,
pub actions: BTreeMap<String, Vec<String>>,
}
impl Serialize for PatternStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// We only skip serializing emptiness if we're on a human-readable format
// This means we're printing for user, not exchanging it over a socket
let state = if serializer.is_human_readable() {
let ser_matches = self.matches != 0;
let ser_actions = self.actions.len() != 0;
let mut state = serializer
.serialize_struct("PatternStatus", ser_matches as usize + ser_actions as usize)?;
if ser_matches {
state.serialize_field("matches", &self.matches)?;
}
if ser_actions {
state.serialize_field("actions", &self.actions)?;
}
state
} else {
let mut state = serializer.serialize_struct("PatternStatus", 2)?;
state.serialize_field("matches", &self.matches)?;
state.serialize_field("actions", &self.actions)?;
state
};
state.end()
}
}

View file

@ -106,7 +106,7 @@ impl Stream {
match_tx
.send(MatchManagerInput::Match(MFT {
m: match_,
f: filter,
o: filter,
t: Local::now(),
}))
.unwrap();

View file

@ -12,7 +12,7 @@ use log::{debug, error, warn};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::{
concepts::{Config, Filter, LogEntry, Match},
concepts::{ActionFilter, Config, Filter, LogEntry, Match},
utils::{bincode_options, BincodeOptions},
};
@ -137,7 +137,10 @@ impl WriteDB {
let database_header: DatabaseHeader = config
.filters()
.into_iter()
.map(|f| f.full_name())
.map(|f| {
let names = f.full_name();
(names.0.to_owned(), names.1.to_owned())
})
.enumerate()
.collect();
@ -163,7 +166,7 @@ impl WriteDB {
fn _write<T: Serialize + std::fmt::Debug>(&mut self, data: T) -> Result<(), DBError> {
let encoded = self.bin.serialize(&data)?;
debug!("writing this: {:?}, {:?}", &data, &encoded);
// debug!("writing this: {:?}, {:?}", &data, &encoded);
self.f.write_all(&encoded)?;
Ok(())
}

View file

@ -12,7 +12,7 @@ use log::{error, info, warn};
use thiserror::Error;
use super::MatchManagerInput;
use crate::concepts::{Config, Filter, LogEntry, Match, Time};
use crate::concepts::{ActionFilter, Config, Filter, LogEntry, Match, Time};
mod lowlevel;
mod tests;

View file

@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet},
collections::BTreeMap,
process::Stdio,
sync::mpsc::{Receiver, Sender, SyncSender},
};
@ -9,66 +9,30 @@ use log::{error, info};
use timer::MessageTimer;
use crate::{
concepts::{Action, Config, Match, Time, MAT},
concepts::{Action, ActionFilter, Config, LogEntry, Order, MAT},
utils::ThreadPool,
};
use super::{
database::DatabaseManagerInput,
statemap::{FilterOptions, StateMap, StateMapTrait},
};
#[derive(Clone)]
pub enum ExecsManagerInput {
Exec(MAT),
ExecPending(MAT),
Flush(MAT),
Gimme(SyncSender<ExecsMap>),
Order(Order, FilterOptions, SyncSender<ExecsMap>),
Stop,
}
type ExecsMap = BTreeMap<&'static Action, BTreeMap<Match, BTreeSet<Time>>>;
trait ExecsMapTrait {
fn add(&mut self, mat: &MAT);
fn rm(&mut self, mat: &MAT) -> bool;
fn rm_times(&mut self, mat: &MAT) -> Option<BTreeSet<Time>>;
}
impl ExecsMapTrait for ExecsMap {
fn add(&mut self, mat: &MAT) {
let inner_map = self.entry(mat.a).or_default();
let inner_set = inner_map.entry(mat.m.clone()).or_default();
inner_set.insert(mat.t);
}
fn rm(&mut self, mat: &MAT) -> bool {
let mut removed = false;
if let Some(inner_map) = self.get_mut(&mat.a) {
if let Some(inner_set) = inner_map.get_mut(&mat.m) {
inner_set.remove(&mat.t);
removed = true;
if inner_set.is_empty() {
inner_map.remove(&mat.m);
}
}
if inner_map.is_empty() {
self.remove(&mat.a);
}
}
removed
}
fn rm_times(&mut self, mat: &MAT) -> Option<BTreeSet<Time>> {
let mut set = None;
if let Some(inner_map) = self.get_mut(&mat.a) {
set = inner_map.remove(&mat.m);
if inner_map.is_empty() {
self.remove(&mat.a);
}
}
set
}
}
type ExecsMap = StateMap<Action>;
pub fn execs_manager(
config: &'static Config,
exec_rx: Receiver<ExecsManagerInput>,
exec_tx: Sender<ExecsManagerInput>,
log_tx: SyncSender<DatabaseManagerInput>,
) {
// Initialize a ThreadPool only when concurrency hasn't been disabled
let thread_pool = if config.concurrency() > 1 {
@ -79,7 +43,7 @@ pub fn execs_manager(
let exec_now = |mat: MAT| {
let mut closure = {
let action = mat.a;
let action = mat.o;
// Construct command
let mut command = action.exec(&mat.m);
@ -128,15 +92,50 @@ pub fn execs_manager(
exec_now(mat);
}
}
ExecsManagerInput::Flush(mat) => {
if let Some(set) = execs.rm_times(&mat) {
for _ in set {
exec_now(mat.clone());
ExecsManagerInput::Order(order, options, tx) => {
let filtered = execs.filtered(options);
if let Order::Flush = order {
let now = Local::now();
// filter the state_map according to provided options
for (action, inner_map) in &filtered {
// get filter (required for LogEntry, FIXME optimize this)
let filter = {
let name = action.full_name();
#[allow(clippy::unwrap_used)]
// We're pretty confident our action has a filter
config
.get_filter(&(name.0.to_string(), name.1.to_string()))
.unwrap()
};
for (match_, _) in inner_map {
let mat = MAT {
m: match_.clone(),
o: action,
t: now,
};
// delete them from state and execute them
if let Some(set) = execs.rm_times(&mat) {
for _ in set {
exec_now(mat.clone());
}
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
log_tx
.send(DatabaseManagerInput::Flush(LogEntry {
exec: false,
m: mat.m,
f: filter,
t: mat.t,
}))
.unwrap();
}
}
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(filtered).unwrap();
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
ExecsManagerInput::Gimme(tx) => tx.send(execs.clone()).unwrap(),
ExecsManagerInput::Stop => {
for (action, inner_map) in execs {
if action.on_exit() {
@ -144,7 +143,7 @@ pub fn execs_manager(
for _ in inner_set {
exec_now(MAT {
m: match_.clone(),
a: action,
o: action,
t: Local::now(),
});
}

View file

@ -1,73 +1,29 @@
use std::{
collections::{BTreeMap, BTreeSet},
collections::BTreeMap,
sync::mpsc::{Receiver, Sender, SyncSender},
};
use chrono::Local;
use log::debug;
use timer::MessageTimer;
use super::{database::DatabaseManagerInput, ExecsManagerInput};
use crate::{
concepts::Filter,
concepts::{LogEntry, Match, Time, MFT},
use super::{
database::DatabaseManagerInput,
statemap::{FilterOptions, StateMap, StateMapTrait},
ExecsManagerInput,
};
use crate::concepts::{ActionFilter, Filter, LogEntry, Order, MFT};
#[derive(Clone)]
pub enum MatchManagerInput {
Match(MFT),
Unmatch(MFT),
Flush(MFT),
Gimme(SyncSender<MatchesMap>),
Order(Order, FilterOptions, SyncSender<MatchesMap>),
EndOfStartup,
Stop,
}
type MatchesMap = BTreeMap<&'static Filter, BTreeMap<Match, BTreeSet<Time>>>;
// This trait is needed to permit to implement methods on an external type
trait MatchesMapTrait {
fn add(&mut self, mft: &MFT);
fn rm(&mut self, mft: &MFT);
fn rm_times(&mut self, mft: &MFT);
fn get_times(&self, mft: &MFT) -> usize;
}
impl MatchesMapTrait for MatchesMap {
fn add(&mut self, mft: &MFT) {
let inner_map = self.entry(mft.f).or_default();
let inner_set = inner_map.entry(mft.m.clone()).or_default();
inner_set.insert(mft.t);
}
fn rm(&mut self, mft: &MFT) {
if let Some(inner_map) = self.get_mut(&mft.f) {
if let Some(inner_set) = inner_map.get_mut(&mft.m) {
inner_set.remove(&mft.t);
if inner_set.is_empty() {
inner_map.remove(&mft.m);
}
}
if inner_map.is_empty() {
self.remove(&mft.f);
}
}
}
fn rm_times(&mut self, mft: &MFT) {
if let Some(inner_map) = self.get_mut(&mft.f) {
inner_map.remove(&mft.m);
if inner_map.is_empty() {
self.remove(&mft.f);
}
}
}
fn get_times(&self, mft: &MFT) -> usize {
match self.get(&mft.f).and_then(|map| map.get(&mft.m)) {
Some(x) => x.len(),
None => 0,
}
}
}
pub type MatchesMap = StateMap<Filter>;
pub fn matches_manager(
match_rx: Receiver<MatchManagerInput>,
@ -82,6 +38,9 @@ pub fn matches_manager(
let mut startup = true;
for mft in match_rx.iter() {
for (filter, map) in matches.iter() {
debug!("MATCHES {:?} {:?}", filter.full_name(), map.keys());
}
match mft {
MatchManagerInput::EndOfStartup => {
debug!("end of startup!");
@ -89,7 +48,7 @@ pub fn matches_manager(
}
MatchManagerInput::Match(mft) => {
// Store matches
let exec = match mft.f.retry() {
let exec = match mft.o.retry() {
None => true,
Some(retry) => {
// Add new match
@ -98,7 +57,7 @@ pub fn matches_manager(
let guard = timer.schedule_with_delay(
// retry_duration is always Some() after filter's setup
#[allow(clippy::unwrap_used)]
mft.f.retry_duration().unwrap(),
mft.o.retry_duration().unwrap(),
MatchManagerInput::Unmatch(mft.clone()),
);
guard.ignore();
@ -110,10 +69,10 @@ pub fn matches_manager(
// Executing actions
if exec {
// Delete matches only if storing them
if mft.f.retry().is_some() {
if mft.o.retry().is_some() {
matches.rm_times(&mft);
}
mft.f.send_actions(&mft.m, mft.t, &action_tx, true);
mft.o.send_actions(&mft.m, mft.t, &action_tx);
}
if !startup {
@ -122,30 +81,48 @@ pub fn matches_manager(
.send(DatabaseManagerInput::Log(LogEntry {
exec,
m: mft.m,
f: mft.f,
f: mft.o,
t: mft.t,
}))
.unwrap();
}
}
MatchManagerInput::Unmatch(mft) => matches.rm(&mft),
#[allow(clippy::todo)]
MatchManagerInput::Flush(mft) => {
// remove from matches
matches.rm_times(&mft);
// send to DB
MatchManagerInput::Unmatch(mft) => {
matches.rm(&mft);
}
MatchManagerInput::Order(order, options, tx) => {
let filtered = matches.filtered(options);
if let Order::Flush = order {
let now = Local::now();
// filter the state_map according to provided options
for (filter, inner_map) in &filtered {
for (match_, _) in inner_map {
let mft = MFT {
m: match_.clone(),
o: filter,
t: now,
};
// delete them from state
matches.rm_times(&mft);
// send them to DB
#[allow(clippy::unwrap_used)] // propagating panics is ok
log_tx
.send(DatabaseManagerInput::Flush(LogEntry {
exec: false,
m: mft.m,
f: mft.o,
t: mft.t,
}))
.unwrap();
}
}
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
log_tx
.send(DatabaseManagerInput::Flush(LogEntry {
exec: false,
m: mft.m,
f: mft.f,
t: mft.t,
}))
.unwrap();
tx.send(filtered).unwrap();
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
MatchManagerInput::Gimme(tx) => tx.send(matches.clone()).unwrap(),
MatchManagerInput::Stop => break,
}
}

View file

@ -1,6 +1,6 @@
use std::{
fs,
path::Path,
path::PathBuf,
process::exit,
sync::{
atomic::{AtomicBool, Ordering},
@ -25,15 +25,16 @@ mod database;
mod execs;
mod matches;
mod socket;
mod statemap;
#[allow(unused_variables)]
pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
pub fn daemon(config_path: PathBuf, loglevel: Level, socket: PathBuf) {
if let Err(err) = SimpleLogger::init(loglevel) {
eprintln!("ERROR could not initialize logging: {err}");
exit(1);
}
let config: &'static Config = match Config::from_file(config_path) {
let config: &'static Config = match Config::from_file(&config_path) {
Ok(config) => Box::leak(Box::new(config)),
Err(err) => {
error!("{err}");
@ -59,12 +60,15 @@ pub fn daemon(config_path: &Path, loglevel: Level, socket: &Path) {
let matches_manager_thread_handle = {
let match_tx_matches = match_tx.clone();
let exec_tx_matches = exec_tx.clone();
thread::spawn(move || matches_manager(match_rx, match_tx_matches, exec_tx_matches, log_tx))
let log_tx_matches = log_tx.clone();
thread::spawn(move || {
matches_manager(match_rx, match_tx_matches, exec_tx_matches, log_tx_matches)
})
};
let execs_manager_thread_handle = {
let exec_tx_execs = exec_tx.clone();
thread::spawn(move || execs_manager(config, exec_rx, exec_tx_execs))
thread::spawn(move || execs_manager(config, exec_rx, exec_tx_execs, log_tx))
};
let database_manager_thread_handle = {

View file

@ -1,21 +1,25 @@
use std::{
collections::BTreeMap,
fs, io,
os::unix::net::{UnixListener, UnixStream},
path::PathBuf,
process::exit,
sync::mpsc::{sync_channel, Sender},
sync::{
mpsc::{sync_channel, Sender},
Arc,
},
};
use bincode::Options;
use chrono::Local;
use log::{error, warn};
use regex::Regex;
use crate::{
concepts::{ClientRequest, Config, DaemonResponse, InfoRes, MFT},
concepts::{ActionFilter, ClientRequest, ClientStatus, Config, DaemonResponse, Pattern, PatternStatus},
utils::bincode_options,
};
use super::{ExecsManagerInput, MatchManagerInput};
use super::{statemap::FilterOptions, ExecsManagerInput, MatchManagerInput};
macro_rules! err_str {
($expression:expr) => {
@ -48,6 +52,131 @@ fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
err_str!(UnixListener::bind(path))
}
fn answer_order(
config: &'static Config,
match_tx: &Sender<MatchManagerInput>,
exec_tx: &Sender<ExecsManagerInput>,
options: ClientRequest,
) -> Result<ClientStatus, String> {
// Compute options
let filtering_options = {
let (stream_name, filter_name) = match options.stream_filter {
Some(sf) => match sf.split_once(".") {
Some((s, f)) => (Some(s.to_string()), Some(f.to_string())),
None => (Some(sf), None),
},
None => (None, None),
};
// Compute the Vec<(pattern_name, String)> into a BTreeMap<Arc<Pattern>, Regex>
let patterns = options
.patterns
.iter()
.map(|(name, reg)| {
// lookup pattern in config.patterns
config
.patterns()
.iter()
// retrieve or Err
.find(|(pattern_name, _)| name == *pattern_name)
.ok_or_else(|| format!("pattern '{name}' doesn't exist"))
// compile Regex or Err
.and_then(|(_, pattern)| match Regex::new(reg) {
Ok(reg) => Ok((pattern.clone(), reg)),
Err(err) => Err(format!("pattern '{name}' regex doesn't compile: {err}")),
})
})
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()?;
FilterOptions {
stream_name,
filter_name,
patterns,
}
};
// ask for matches clone
let matches = {
let (m_tx, m_rx) = sync_channel(0);
#[allow(clippy::unwrap_used)] // propagating panics is ok
match_tx
.send(MatchManagerInput::Order(
options.order,
filtering_options.clone(),
m_tx,
))
.unwrap();
#[allow(clippy::unwrap_used)] // propagating panics is ok
m_rx.recv().unwrap()
};
// ask for execs clone
let execs = {
let (e_tx, e_rx) = sync_channel(0);
#[allow(clippy::unwrap_used)] // propagating panics is ok
exec_tx
.send(ExecsManagerInput::Order(
options.order,
filtering_options,
e_tx,
))
.unwrap();
#[allow(clippy::unwrap_used)] // propagating panics is ok
e_rx.recv().unwrap()
};
// Transform matches and execs into a ClientStatus
let cs: ClientStatus = matches
.into_iter()
.fold(BTreeMap::new(), |mut acc, (object, map)| {
let (stream, filter, _) = object.full_name();
acc.entry(stream.into())
.or_default()
.entry(filter.into())
.or_default()
.extend(map.into_iter().map(|(match_, times)| {
(
match_.join(" "),
PatternStatus {
matches: times.len(),
..Default::default()
},
)
}));
acc
});
let cs = execs.into_iter().fold(cs, |mut acc, (object, map)| {
let (stream, filter, action) = object.full_name();
let inner_map = acc
.entry(stream.into())
.or_default()
.entry(filter.into())
.or_default();
map.into_iter().for_each(|(match_, times)| {
inner_map
.entry(match_.join(" "))
.or_default()
.actions
.insert(
action.to_string(),
times
.into_iter()
.map(|time| time.to_rfc3339().chars().take(19).collect())
.collect(),
);
});
acc
});
Ok(cs)
}
macro_rules! or_next {
($msg:expr, $expression:expr) => {
match $expression {
@ -84,78 +213,13 @@ pub fn socket_manager(
"invalid message received: ",
bin.deserialize_from::<UnixStream, ClientRequest>(conn)
);
let response = match request {
ClientRequest::Info => {
// ask for matches clone
let (m_tx, m_rx) = sync_channel(0);
#[allow(clippy::unwrap_used)] // propagating panics is ok
match_tx.send(MatchManagerInput::Gimme(m_tx)).unwrap();
#[allow(clippy::unwrap_used)] // propagating panics is ok
let matches = m_rx.recv().unwrap();
// ask for execs clone
let (e_tx, e_rx) = sync_channel(0);
#[allow(clippy::unwrap_used)] // propagating panics is ok
exec_tx.send(ExecsManagerInput::Gimme(e_tx)).unwrap();
#[allow(clippy::unwrap_used)] // propagating panics is ok
let execs = e_rx.recv().unwrap();
// Transform structures
macro_rules! map_map {
($map:expr) => {
$map.into_iter()
.map(|(object, inner_map)| {
(
object.full_name(),
inner_map
.into_iter()
.map(|(key, set)| {
(
key,
set.into_iter()
.map(|time| time.timestamp())
.collect(),
)
})
.collect(),
)
})
.collect()
};
}
DaemonResponse::Info(InfoRes {
matches: map_map!(matches),
execs: map_map!(execs),
})
}
ClientRequest::Flush(flush) => {
match config.get_filter(&flush.f) {
Some(filter) => {
let now = Local::now();
// Flush actions
filter.send_actions(&flush.m, now, &exec_tx, false);
// Flush filters
#[allow(clippy::unwrap_used)] // propagating panics is ok
match_tx
.send(MatchManagerInput::Flush(MFT {
m: flush.m,
f: filter,
t: now,
}))
.unwrap();
DaemonResponse::Flush
}
None => DaemonResponse::Err(format!(
"no filter with name {}.{}",
flush.f.0, flush.f.1
)),
}
}
};
let response = answer_order(config, &match_tx, &exec_tx, request);
or_next!(
"failed to send response:",
bin.serialize_into(conn2, &response)
match response {
Ok(res) => bin.serialize_into(conn2, &DaemonResponse::Order(res)),
Err(err) => bin.serialize_into(conn2, &DaemonResponse::Err(err)),
}
);
}
Err(err) => error!("failed to open connection from cli: {err}"),

123
rust/src/daemon/statemap.rs Normal file
View file

@ -0,0 +1,123 @@
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use regex::Regex;
use crate::concepts::{ActionFilter, Match, Pattern, Time, MT};
#[derive(Clone)]
pub struct FilterOptions {
pub stream_name: Option<String>,
pub filter_name: Option<String>,
pub patterns: BTreeMap<Arc<Pattern>, Regex>,
}
pub type StateMap<T> = BTreeMap<&'static T, BTreeMap<Match, BTreeSet<Time>>>;
// This trait is needed to permit to implement methods on an external type
pub trait StateMapTrait<T: ActionFilter> {
fn add(&mut self, mt: &MT<T>);
fn rm(&mut self, mt: &MT<T>) -> bool;
fn rm_times(&mut self, mt: &MT<T>) -> Option<BTreeSet<Time>>;
fn get_times(&self, mt: &MT<T>) -> usize;
fn filtered(&self, filter_options: FilterOptions) -> Self;
}
impl<T: ActionFilter> StateMapTrait<T> for StateMap<T> {
fn add(&mut self, mt: &MT<T>) {
let inner_map = self.entry(mt.o).or_default();
let inner_set = inner_map.entry(mt.m.clone()).or_default();
inner_set.insert(mt.t);
}
fn rm(&mut self, mt: &MT<T>) -> bool {
let mut removed = false;
if let Some(inner_map) = self.get_mut(&mt.o) {
if let Some(inner_set) = inner_map.get_mut(&mt.m) {
inner_set.remove(&mt.t);
removed = true;
if inner_set.is_empty() {
inner_map.remove(&mt.m);
}
}
if inner_map.is_empty() {
self.remove(&mt.o);
}
}
removed
}
fn rm_times(&mut self, mt: &MT<T>) -> Option<BTreeSet<Time>> {
let mut set = None;
if let Some(inner_map) = self.get_mut(&mt.o) {
set = inner_map.remove(&mt.m);
if inner_map.is_empty() {
self.remove(&mt.o);
}
}
set
}
fn get_times(&self, mt: &MT<T>) -> usize {
match self.get(&mt.o).and_then(|map| map.get(&mt.m)) {
Some(x) => x.len(),
None => 0,
}
}
fn filtered(&self, filter_options: FilterOptions) -> Self {
let FilterOptions {
stream_name,
filter_name,
patterns,
} = filter_options;
self.iter()
// stream/filter filtering
.filter(|(object, _)| {
if let Some(stream_name) = &stream_name {
let full_name = object.full_name();
let (s, f) = (full_name.0, full_name.1);
if *stream_name != s {
return false;
}
if let Some(filter_name) = &filter_name {
if *filter_name != f {
return false;
}
}
}
return true;
})
// pattern filtering
.filter(|(object, _)| {
patterns
.iter()
.all(|(pattern, _)| object.patterns().get(pattern).is_some())
})
// match filtering
.filter_map(|(object, inner_map)| {
let map: BTreeMap<Match, BTreeSet<Time>> = inner_map
.iter()
.filter(|(match_, _)| {
match_
.into_iter()
.zip(object.patterns())
.filter_map(|(a_match, pattern)| match patterns.get(pattern.as_ref()) {
Some(regex) => Some((a_match, regex)),
None => None,
})
.all(|(a_match, regex)| regex.is_match(a_match))
})
.map(|(a, b)| (a.clone(), b.clone()))
.collect();
if map.len() > 0 {
Some((*object, map))
} else {
None
}
})
.collect()
}
}

View file

@ -18,7 +18,8 @@ mod daemon;
mod tests;
mod utils;
use client::{flush, show, test_regex};
use client::{request, test_regex};
use concepts::Order;
use daemon::daemon;
use utils::cli::{Cli, Command};
@ -44,26 +45,24 @@ fn main() {
loglevel,
socket,
} => {
daemon(&config, loglevel, &socket);
daemon(config, loglevel, socket);
}
Command::Show {
socket,
format,
limit,
pattern,
patterns,
} => show(&socket, format, &limit, &pattern, &patterns),
} => request(socket, format, limit, patterns, Order::Show),
Command::Flush {
socket,
format,
limit,
pattern,
patterns,
} => flush(&socket, format, &limit, &pattern, &patterns),
} => request(socket, format, limit, patterns, Order::Flush),
Command::TestRegex {
config,
regex,
line,
} => test_regex(&config, &regex, &line),
} => test_regex(config, regex, line),
}
}

View file

@ -55,13 +55,9 @@ pub enum Command {
#[clap(short = 'l', long, value_name = "STREAM[.FILTER]")]
limit: Option<String>,
/// only show items matching PATTERN regex
#[clap(short = 'p', long, value_name = "PATTERN")]
pattern: Option<Regex>,
/// only show items matching name=PATTERN regex
#[clap(value_parser = parse_named_regex, value_name = "NAME=PATTERN")]
patterns: Vec<NamedRegex>,
patterns: Vec<(String, String)>,
},
/// Remove a target from reaction (e.g. unban)
@ -82,13 +78,9 @@ Then prints the flushed matches and actions."
#[clap(short = 'l', long, value_name = "STREAM[.FILTER]")]
limit: Option<String>,
/// only show items matching PATTERN regex
#[clap(short = 'p', long, value_name = "PATTERN")]
pattern: Option<Regex>,
/// only show items matching name=PATTERN regex
#[clap(value_parser = parse_named_regex, value_name = "NAME=PATTERN")]
patterns: Vec<NamedRegex>,
patterns: Vec<(String, String)>,
},
/// Test a regex
@ -129,24 +121,15 @@ impl fmt::Display for Format {
}
}
// Structs
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct NamedRegex {
pub regex: Regex,
pub name: String,
}
fn parse_named_regex(s: &str) -> Result<NamedRegex, String> {
fn parse_named_regex(s: &str) -> Result<(String, String), String> {
let (name, v) = s
.split_once('=')
.ok_or("When given as a positional argument, a pattern must be prefixed with a name, ex: ip=192.168.0.1")?;
let regex = Regex::new(v).map_err(|err| format!("{}", err))?;
Ok(NamedRegex {
regex,
name: name.to_string(),
})
let _ = Regex::new(v).map_err(|err| format!("{}", err))?;
Ok((
name.to_string(),
v.to_string(),
))
}
fn parse_log_level(s: &str) -> Result<log::Level, String> {

View file

@ -22,21 +22,21 @@
streams: {
s1: {
cmd: ['sh', '-c', "seq 20 | tr ' ' '\n' | while read i; do echo found $((i % 5)); sleep 0.3; done"],
cmd: ['sh', '-c', "seq 20 | tr ' ' '\n' | while read i; do echo found $((i % 5)); sleep 3; done"],
filters: {
f1: {
regex: [
'^found <num>$',
],
retry: 2,
retryperiod: '5s',
retryperiod: '60s',
actions: {
damn: {
cmd: ['notify-send', 'first stream', 'ban <num>'],
},
undamn: {
cmd: ['notify-send', 'first stream', 'unban <num>'],
after: '6s',
after: '20s',
onexit: true,
},
},