From 72887f3af0e7eb8903b8523dc1aed4501deb3bd8 Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 14 Oct 2024 12:00:00 +0200 Subject: [PATCH] big refacto avoid channels remove matches_manager and execs_manager one stream_manager task handles filter_managers and action_managers --- rust/heavy-load.yml | 2 +- rust/src/concepts/filter.rs | 4 + rust/src/concepts/stream.rs | 93 +++++------------- rust/src/daemon/action.rs | 133 ++++++++++++++++++++++++++ rust/src/daemon/database/mod.rs | 32 +++---- rust/src/daemon/execs.rs | 163 -------------------------------- rust/src/daemon/filter.rs | 126 ++++++++++++++++++++++++ rust/src/daemon/matches.rs | 142 ---------------------------- rust/src/daemon/mod.rs | 117 +++++++++++------------ rust/src/daemon/socket.rs | 6 +- rust/src/daemon/statemap.rs | 122 ------------------------ rust/src/daemon/stream.rs | 69 ++++++++++++++ rust/src/main.rs | 50 +++++----- 13 files changed, 460 insertions(+), 599 deletions(-) create mode 100644 rust/src/daemon/action.rs delete mode 100644 rust/src/daemon/execs.rs create mode 100644 rust/src/daemon/filter.rs delete mode 100644 rust/src/daemon/matches.rs delete mode 100644 rust/src/daemon/statemap.rs create mode 100644 rust/src/daemon/stream.rs diff --git a/rust/heavy-load.yml b/rust/heavy-load.yml index d3cde2a..6af6dfa 100644 --- a/rust/heavy-load.yml +++ b/rust/heavy-load.yml @@ -56,7 +56,7 @@ streams: after: 1m onexit: false tailDown4: - cmd: [ 'sh', '-c', 'sleep 2; seq 1000100 | while read i; do echo found $i; done; sleep infinity' ] + cmd: [ 'sh', '-c', 'sleep 2; seq 1000100 | while read i; do echo found $i; done' ] filters: find: regex: diff --git a/rust/src/concepts/filter.rs b/rust/src/concepts/filter.rs index 9f784ca..ec6c2ac 100644 --- a/rust/src/concepts/filter.rs +++ b/rust/src/concepts/filter.rs @@ -74,6 +74,10 @@ impl Filter { self.retry_duration } + pub fn actions(&self) -> &BTreeMap { + &self.actions + } + pub fn setup( &mut self, stream_name: &str, diff --git a/rust/src/concepts/stream.rs b/rust/src/concepts/stream.rs index 491f1fd..ddef48d 100644 --- a/rust/src/concepts/stream.rs +++ b/rust/src/concepts/stream.rs @@ -1,15 +1,8 @@ -use std::{collections::BTreeMap, process::Stdio}; +use std::{cmp::Ordering, collections::BTreeMap}; -use chrono::Local; use serde::Deserialize; -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - process::{Child, Command}, - sync::{mpsc, oneshot}, -}; -use tracing::{error, info}; -use super::{Filter, Patterns, MFT}; +use super::{Filter, Patterns}; #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields)] @@ -30,6 +23,14 @@ impl Stream { self.filters.get(filter_name) } + pub fn name(&self) -> &str { + &self.name + } + + pub fn cmd(&self) -> &Vec { + &self.cmd + } + pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> { self._setup(name, patterns) .map_err(|msg| format!("stream {}: {}", name, msg)) @@ -62,66 +63,22 @@ impl Stream { Ok(()) } +} - pub async fn manager( - &'static self, - child_tx: oneshot::Sender>, - match_tx: mpsc::Sender, - ) { - info!("{}: start {:?}", self.name, self.cmd); - let mut child = match Command::new(&self.cmd[0]) - .args(&self.cmd[1..]) - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .stdout(Stdio::piped()) - .spawn() - { - Ok(child) => child, - Err(err) => { - error!("could not execute stream {} cmd: {}", self.name, err); - let _ = child_tx.send(None); - return; - } - }; - - // keep stdout before sending/moving child to the main thread - #[allow(clippy::unwrap_used)] - // we know there is an stdout because we asked for Stdio::piped() - let mut lines = BufReader::new(child.stdout.take().unwrap()).lines(); - - // let main handle the child process - let _ = child_tx.send(Some(child)); - - loop { - match lines.next_line().await { - Ok(Some(line)) => { - for filter in self.filters.values() { - if let Some(match_) = filter.get_match(&line) { - #[allow(clippy::unwrap_used)] // propagating panics is ok - match_tx - .send(MFT { - m: match_, - o: filter, - t: Local::now(), - }) - .await - .unwrap(); - } - } - } - Ok(None) => { - error!("stream {} exited: its command returned.", self.name); - break; - } - Err(err) => { - error!( - "impossible to read output from stream {}: {}", - self.name, err - ); - break; - } - } - } +impl PartialEq for Stream { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + } +} +impl Eq for Stream {} +impl Ord for Stream { + fn cmp(&self, other: &Self) -> Ordering { + self.name.cmp(&other.name) + } +} +impl PartialOrd for Stream { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } diff --git a/rust/src/daemon/action.rs b/rust/src/daemon/action.rs new file mode 100644 index 0000000..b812a2f --- /dev/null +++ b/rust/src/daemon/action.rs @@ -0,0 +1,133 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + process::Stdio, + sync::{Arc, Mutex}, +}; + +use chrono::{Local, TimeDelta}; +use tokio::sync::Semaphore; +use tracing::{error, info}; + +use crate::concepts::{Action, Match, Time}; + +struct State { + pending: BTreeMap>, + ordered_times: BTreeMap, +} + +impl State { + fn add_match(&mut self, m: &Match, t: Time) { + self.pending.entry(m.clone()).or_default().insert(t); + self.ordered_times.insert(t, m.clone()); + } + + fn remove(&mut self, m: Match, t: Time) -> bool { + self.pending.entry(m).and_modify(|times| { + times.remove(&t); + }); + self.ordered_times.remove(&t).is_some() + } + + fn clear_past_times(&mut self, after: Option) { + let now = Local::now(); + let after = after.unwrap_or_default(); + while self + .ordered_times + .first_key_value() + .is_some_and(|(k, _)| *k + after < now) + { + let (_, m) = self.ordered_times.pop_first().unwrap(); + self.pending.remove(&m); + } + } +} + +#[derive(Clone)] +pub struct ActionManager { + action: &'static Action, + exec_limit: Option>, + state: Arc>, +} + +impl ActionManager { + pub fn new( + action: &'static Action, + pending: BTreeMap>, + exec_limit: Option>, + ) -> Self { + Self { + action, + exec_limit, + state: Arc::new(Mutex::new(State { + pending: pending.clone(), + ordered_times: pending + .into_iter() + .flat_map(|(m, times)| times.into_iter().map(move |time| (time, m.clone()))) + .collect(), + })), + } + } + + pub fn handle_exec(&mut self, m: Match, t: Time) { + let now = Local::now(); + let exec_t = t + self.action.after_duration().unwrap_or_default(); + if exec_t < now { + self.exec_now(m); + } else { + { + let mut state = self.state.lock().unwrap(); + state.clear_past_times(self.action.after_duration()); + state.add_match(&m, exec_t); + } + let this = self.clone(); + tokio::spawn(async move { + let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?"); + tokio::time::sleep(dur).await; + let mut state = this.state.lock().unwrap(); + if state.remove(m.clone(), t) { + this.exec_now(m); + } + }); + } + } + + fn exec_now(&self, m: Match) { + let semaphore = self.exec_limit.clone(); + let action = self.action; + tokio::spawn(async move { + // Wait for semaphore's permission, if it is Some + let _permit = match semaphore { + #[allow(clippy::unwrap_used)] // We know the semaphore is not closed + Some(semaphore) => Some(semaphore.acquire_owned().await.unwrap()), + None => None, + }; + + // Construct command + let mut command = action.exec(&m); + + info!("{}: run [{:?}]", &action, command.as_std()); + if let Err(err) = command + .stdin(Stdio::null()) + .stderr(Stdio::null()) + .stdout(Stdio::piped()) + .status() + .await + { + error!("{}: run [{:?}], code {}", &action, command.as_std(), err); + } + }); + } + + pub fn quit(&mut self) { + if self.action.on_exit() { + let mut state = self.state.lock().unwrap(); + for (m, times) in &state.pending { + for _ in times { + self.exec_now(m.clone()); + } + } + state.pending.clear(); + state.ordered_times.clear(); + } + } +} diff --git a/rust/src/daemon/database/mod.rs b/rust/src/daemon/database/mod.rs index 0f0dfd0..2bdb1f2 100644 --- a/rust/src/daemon/database/mod.rs +++ b/rust/src/daemon/database/mod.rs @@ -63,7 +63,7 @@ macro_rules! flush_or_die { pub async fn database_manager( config: &'static Config, mut log_rx: mpsc::Receiver, - matches_tx: mpsc::Sender, + matches_tx: BTreeMap<&Filter, mpsc::Sender>, ) -> task::JoinHandle<()> { let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)).await { Ok(dbs) => dbs, @@ -82,6 +82,7 @@ pub async fn database_manager( write_or_die!(log_db, entry); cpt += 1; if cpt == MAX_WRITES { + info!("Rotating database..."); cpt = 0; flush_or_die!(log_db); flush_or_die!(flush_db); @@ -97,6 +98,7 @@ pub async fn database_manager( exit(1); } }; + info!("Rotated database"); } } }; @@ -108,17 +110,7 @@ pub async fn database_manager( async fn rotate_db( config: &'static Config, - matches_tx: Option>, -) -> Result<(WriteDB, WriteDB), DBError> { - info!("Rotating database..."); - let res = _rotate_db(config, &matches_tx).await; - info!("Rotated database"); - res -} - -async fn _rotate_db( - config: &'static Config, - matches_tx: &Option>, + matches_tx: Option>>, ) -> Result<(WriteDB, WriteDB), DBError> { // TODO asyncify this let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config).await? { @@ -148,7 +140,7 @@ async fn _rotate_db( let mut log_write_db = WriteDB::create(LOG_DB_NEW_NAME, config).await; - __rotate_db( + _rotate_db( matches_tx, &mut log_read_db, &mut flush_read_db, @@ -174,8 +166,8 @@ async fn _rotate_db( Ok((log_write_db, flush_write_db)) } -async fn __rotate_db( - matches_tx: &Option>, +async fn _rotate_db( + matches_tx: Option>>, log_read_db: &mut ReadDB, flush_read_db: &mut ReadDB, log_write_db: &mut WriteDB, @@ -238,10 +230,12 @@ async fn __rotate_db( millisecond_disambiguation_counter += 1; } - if let Some(tx) = matches_tx { - debug!("DB sending match from DB: {:?}", entry.m); - #[allow(clippy::unwrap_used)] // propagating panics is ok - tx.send(entry.clone().into()).await.unwrap(); + if let Some(matches_tx) = &matches_tx { + if let Some(tx) = matches_tx.get(entry.f) { + debug!("DB sending match from DB: {:?}", entry.m); + #[allow(clippy::unwrap_used)] // propagating panics is ok + tx.send(entry.clone().into()).await.unwrap(); + } } write_or_die!(log_write_db, entry); diff --git a/rust/src/daemon/execs.rs b/rust/src/daemon/execs.rs deleted file mode 100644 index 3c4c489..0000000 --- a/rust/src/daemon/execs.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::{collections::BTreeMap, process::Stdio, sync::Arc}; - -use chrono::Local; -use tokio::{ - sync::{mpsc, watch, Semaphore}, - time, -}; -use tracing::{error, info}; - -use crate::concepts::{Action, ActionFilter, Config, LogEntry, Order, MAT}; - -use super::{ - database::DatabaseManagerInput, - socket::SocketOrder, - statemap::{StateMap, StateMapTrait}, -}; - -pub type ExecsMap = StateMap; - -pub async fn execs_manager( - config: &'static Config, - mut exec_rx: mpsc::Receiver, - mut socket_order_rx: mpsc::Receiver>, - log_tx: mpsc::Sender, - mut stop: watch::Receiver, -) { - // FIXME replace with TryStreamExt::try_for_each_concurrent? - let semaphore = if config.concurrency() > 0 { - Some(Arc::new(Semaphore::new(config.concurrency()))) - } else { - None - }; - - let exec_now = |mat: MAT| { - let semaphore = semaphore.clone(); - tokio::spawn(async move { - let action = mat.o; - - // Wait for semaphore's permission, if it is Some - let _permit = match semaphore { - #[allow(clippy::unwrap_used)] // We know the semaphore is not closed - Some(semaphore) => Some(semaphore.acquire_owned().await.unwrap()), - None => None, - }; - - // Construct command - let mut command = action.exec(&mat.m); - - info!("{}: run [{:?}]", &action, command.as_std()); - if let Err(err) = command - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .stdout(Stdio::piped()) - .status() - .await - { - error!("{}: run [{:?}], code {}", &action, command.as_std(), err); - } - }); - }; - - let mut execs: ExecsMap = BTreeMap::new(); - - let (pendings_tx, mut pendings_rx) = mpsc::channel(1); - - loop { - tokio::select! { - _ = stop.changed() => break, - Some(mat) = exec_rx.recv() => { - let now = Local::now(); - if mat.t < now { - exec_now(mat); - } else { - execs.add(&mat); - { - let mat = mat.clone(); - let pendings_tx = pendings_tx.clone(); - let mut stop = stop.clone(); - tokio::spawn(async move { - let dur = (mat.t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?"); - tokio::select! { - biased; - _ = stop.changed() => {} - _ = time::sleep(dur) => { - #[allow(clippy::unwrap_used)] // propagating panics is ok - pendings_tx - .send(mat.clone()) - .await - .unwrap(); - } - } - }); - } - } - } - Some(mat) = pendings_rx.recv() => { - if execs.rm(&mat) { - exec_now(mat); - } - } - Some((order, options, tx)) = socket_order_rx.recv() => { - let filtered = execs.filtered(options); - - if let Order::Flush = order { - let now = Local::now(); - // filter the state_map according to provided options - for (action, inner_map) in &filtered { - // get filter (required for LogEntry, FIXME optimize this) - let filter = { - let name = action.full_name(); - #[allow(clippy::unwrap_used)] - // We're pretty confident our action has a filter - config - .get_filter(&(name.0.to_string(), name.1.to_string())) - .unwrap() - }; - - for match_ in inner_map.keys() { - let mat = MAT { - m: match_.clone(), - o: action, - t: now, - }; - // delete them from state and execute them - if let Some(set) = execs.rm_times(&mat) { - for _ in set { - exec_now(mat.clone()); - } - } - #[allow(clippy::unwrap_used)] // propagating panics is ok - log_tx - .send(DatabaseManagerInput::Flush(LogEntry { - exec: false, - m: mat.m, - f: filter, - t: mat.t, - })) - .await - .unwrap(); - } - } - } - #[allow(clippy::unwrap_used)] // propagating panics is ok - tx.send(filtered).unwrap(); - } - else => break - } - } - - for (action, inner_map) in &mut execs { - if action.on_exit() { - for (match_, inner_set) in inner_map { - for _ in inner_set.iter() { - exec_now(MAT { - m: match_.clone(), - o: action, - t: Local::now(), - }); - } - } - } - } -} diff --git a/rust/src/daemon/filter.rs b/rust/src/daemon/filter.rs new file mode 100644 index 0000000..4f76e34 --- /dev/null +++ b/rust/src/daemon/filter.rs @@ -0,0 +1,126 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; + +use chrono::Local; +use tokio::sync::{mpsc, Semaphore}; + +use crate::concepts::{Filter, LogEntry, Match, Time, MFT}; + +use super::{action::ActionManager, database::DatabaseManagerInput}; + +pub struct FilterManager { + filter: &'static Filter, + log_tx: mpsc::Sender, + action_managers: Vec, + matches: BTreeMap>, + ordered_times: BTreeMap, +} + +impl FilterManager { + pub fn new( + filter: &'static Filter, + matches: BTreeMap>, + exec_limit: Option>, + log_tx: mpsc::Sender, + ) -> Self { + Self { + filter, + log_tx, + action_managers: filter + .actions() + .values() + .map(|action| ActionManager::new(action, BTreeMap::default(), exec_limit.clone())) + .collect(), + matches: matches.clone(), + ordered_times: matches + .into_iter() + .flat_map(|(m, times)| times.into_iter().map(move |time| (time, m.clone()))) + .collect(), + } + } + + pub async fn handle_db_entries(mut self, mut match_rx: mpsc::Receiver) -> Self { + while let Some(mft) = match_rx.recv().await { + self.handle_match(mft.m, mft.t, false).await; + } + self + } + + pub async fn handle_line(&mut self, line: &str) { + if let Some(match_) = self.filter.get_match(line) { + let now = Local::now(); + self.handle_match(match_, now, true).await; + } + } + + pub async fn handle_match(&mut self, m: Match, t: Time, send_log: bool) { + self.clear_past_times(); + + let exec = match self.filter.retry() { + None => true, + Some(retry) => { + self.add_match(&m, t); + // Number of stored times for this match >= configured retry for this filter + self.get_times(&m) >= retry as usize + } + }; + + if exec { + self.remove_match(&m); + for manager in &mut self.action_managers { + manager.handle_exec(m.clone(), t); + } + } + + if send_log { + #[allow(clippy::unwrap_used)] // propagating panics is ok + self.log_tx + .send(DatabaseManagerInput::Log(LogEntry { + exec, + m, + f: self.filter, + t, + })) + .await + .unwrap(); + } + } + + pub fn quit(&mut self) { + self.action_managers + .iter_mut() + .for_each(|manager| manager.quit()); + } + + fn add_match(&mut self, m: &Match, t: Time) { + self.matches.entry(m.clone()).or_default().insert(t); + self.ordered_times.insert(t, m.clone()); + } + + fn remove_match(&mut self, m: &Match) { + if let Some(times) = self.matches.remove(m) { + for t in times { + self.ordered_times.remove(&t); + } + } + } + + fn clear_past_times(&mut self) { + let now = Local::now(); + let retry_duration = self.filter.retry_duration().unwrap_or_default(); + while self + .ordered_times + .first_key_value() + .is_some_and(|(k, _)| *k + retry_duration < now) + { + let (_, m) = self.ordered_times.pop_first().unwrap(); + self.matches.remove(&m); + } + } + + fn get_times(&self, m: &Match) -> usize { + self.matches.get(m).map(|v| v.len()).unwrap_or(0) + } +} diff --git a/rust/src/daemon/matches.rs b/rust/src/daemon/matches.rs deleted file mode 100644 index 85a5a55..0000000 --- a/rust/src/daemon/matches.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::collections::BTreeMap; - -use chrono::Local; -use tokio::{ - sync::{mpsc, watch}, - time, -}; - -use super::{ - database::DatabaseManagerInput, - socket::SocketOrder, - statemap::{StateMap, StateMapTrait}, -}; -use crate::concepts::{Filter, LogEntry, Order, MAT, MFT}; - -pub type MatchesMap = StateMap; - -pub async fn matches_manager( - mut match_rx: mpsc::Receiver, - mut startup_match_rx: mpsc::Receiver, - mut socket_order_rx: mpsc::Receiver>, - action_tx: mpsc::Sender, - log_tx: mpsc::Sender, - mut stop: watch::Receiver, -) { - let mut matches: MatchesMap = BTreeMap::new(); - - let (unmatches_tx, mut unmatches_rx) = mpsc::channel(1); - - while let Some(mft) = startup_match_rx.recv().await { - let _ = handle_match(&mut matches, mft.clone(), &unmatches_tx, &action_tx, &stop).await; - } - - loop { - tokio::select! { - _ = stop.changed() => break, - Some(mft) = match_rx.recv() => { - let exec = handle_match(&mut matches, mft.clone(), &unmatches_tx, &action_tx, &stop).await; - - #[allow(clippy::unwrap_used)] // propagating panics is ok - log_tx - .send(DatabaseManagerInput::Log(LogEntry { - exec, - m: mft.m, - f: mft.o, - t: mft.t, - })) - .await - .unwrap(); - } - Some(mft) = unmatches_rx.recv() => { - matches.rm(&mft); - } - Some((order, options, tx)) = socket_order_rx.recv() => { - // FIXME do not clone - let filtered = matches.clone().filtered(options); - - if let Order::Flush = order { - let now = Local::now(); - // filter the state_map according to provided options - for (filter, inner_map) in &filtered { - for match_ in inner_map.keys() { - let mft = MFT { - m: match_.clone(), - o: filter, - t: now, - }; - // delete them from state - matches.rm_times(&mft); - // send them to DB - #[allow(clippy::unwrap_used)] // propagating panics is ok - log_tx - .send(DatabaseManagerInput::Flush(LogEntry { - exec: false, - m: mft.m, - f: mft.o, - t: mft.t, - })) - .await - .unwrap(); - } - } - } - - #[allow(clippy::unwrap_used)] // propagating panics is ok - tx.send(filtered).unwrap(); - } - else => break - } - } -} - -async fn handle_match( - matches: &mut MatchesMap, - mft: MFT, - unmatches_tx: &mpsc::Sender, - action_tx: &mpsc::Sender, - stop: &watch::Receiver, -) -> bool { - // Store matches - let exec = match mft.o.retry() { - None => true, - Some(retry) => { - // Add new match - matches.add(&mft); - // Remove match when expired - { - let mft = mft.clone(); - let unmatches_tx = unmatches_tx.clone(); - let mut stop = stop.clone(); - tokio::spawn(async move { - #[allow(clippy::unwrap_used)] - // retry_duration is always Some() after filter's setup - let dur = (mft.t - Local::now() + mft.o.retry_duration().unwrap()) - .to_std() - .expect("Duration is bigger than what's supported. Did you put an enormous retry_duration?"); - tokio::select! { - biased; - _ = stop.changed() => {} - _ = time::sleep(dur) => { - #[allow(clippy::unwrap_used)] // propagating panics is ok - unmatches_tx.send(mft).await.unwrap(); - } - } - }); - } - - matches.get_times(&mft) >= retry as usize - } - }; - - // Executing actions - if exec { - // Delete matches only if storing them - if mft.o.retry().is_some() { - matches.rm_times(&mft); - } - mft.o.send_actions(&mft.m, mft.t, action_tx).await; - } - - exec -} diff --git a/rust/src/daemon/mod.rs b/rust/src/daemon/mod.rs index 6cb7174..db9779f 100644 --- a/rust/src/daemon/mod.rs +++ b/rust/src/daemon/mod.rs @@ -1,28 +1,28 @@ -use std::{error::Error, fs, path::PathBuf}; +use std::{collections::BTreeMap, error::Error, path::PathBuf, sync::Arc}; -use socket::socket_manager; use tokio::{ process::Child, select, signal::unix::{signal, SignalKind}, - sync::{mpsc, oneshot, watch}, + sync::{mpsc, oneshot, watch, Semaphore}, }; -use tracing::{error, info}; +use tracing::info; use crate::concepts::Config; use database::database_manager; -use execs::execs_manager; -use matches::matches_manager; +use filter::FilterManager; +use stream::stream_manager; mod database; -mod execs; -mod matches; -mod socket; -mod statemap; +// mod socket; + +mod action; +mod filter; +mod stream; pub async fn daemon( config_path: PathBuf, - socket: PathBuf, + _socket: PathBuf, ) -> Result<(), Box> { let config: &'static Config = Config::from_file(&config_path).map(|config| Box::leak(Box::new(config)))?; @@ -34,67 +34,76 @@ pub async fn daemon( let mut stream_process_child_handles = Vec::new(); let mut stream_task_handles = Vec::new(); - let (stream2match_tx, stream2match_rx) = mpsc::channel(123456); - let (database2match_tx, database2match_rx) = mpsc::channel(234560); + // let (socket2match_tx, socket2match_rx) = mpsc::channel(1); + // let (socket2exec_tx, socket2exec_rx) = mpsc::channel(1); - let (socket2match_tx, socket2match_rx) = mpsc::channel(1); - let (socket2exec_tx, socket2exec_rx) = mpsc::channel(1); - - let (matches2exec_tx, matches2exec_rx) = mpsc::channel(234560); let (log_tx, log_rx) = mpsc::channel(234560); // Shutdown channel let (shutdown_tx, shutdown_rx) = watch::channel(false); - let matches_manager_task_handle = { - let log_tx = log_tx.clone(); - let shutdown_rx = shutdown_rx.clone(); - tokio::spawn(async move { - matches_manager( - stream2match_rx, - database2match_rx, - socket2match_rx, - matches2exec_tx, - log_tx, - shutdown_rx, - ) - .await - }) + // Semaphore limiting action execution concurrency + let exec_limit = if config.concurrency() > 0 { + Some(Arc::new(Semaphore::new(config.concurrency()))) + } else { + None }; - let execs_manager_task_handle = { - let shutdown_rx = shutdown_rx.clone(); - tokio::spawn(async move { - execs_manager(config, matches2exec_rx, socket2exec_rx, log_tx, shutdown_rx).await - }) - }; + // Filter managers + let mut stream_filter_managers_handlers = BTreeMap::new(); + let mut log2filter_tx = BTreeMap::new(); + for stream in config.streams().values() { + let mut filter_managers_handlers = BTreeMap::new(); + for filter in stream.filters().values() { + let manager = FilterManager::new( + filter, + BTreeMap::default(), + exec_limit.clone(), + log_tx.clone(), + ); + let (tx, rx) = mpsc::channel(1); + let handle = tokio::spawn(async move { manager.handle_db_entries(rx).await }); + filter_managers_handlers.insert(filter, handle); + log2filter_tx.insert(filter, tx); + } + stream_filter_managers_handlers.insert(stream, filter_managers_handlers); + } + drop(log_tx); + drop(exec_limit); let database_manager_task_handle = { // The `task::spawn` is done in the function, after database rotation is finished - database_manager(config, log_rx, database2match_tx).await + database_manager(config, log_rx, log2filter_tx).await }; - let socket_manager_task_handle = { - let socket = socket.to_owned(); - let shutdown_rx = shutdown_rx.clone(); - tokio::spawn(async move { - socket_manager(config, socket, socket2match_tx, socket2exec_tx, shutdown_rx).await - }) - }; + let mut stream_filter_managers = BTreeMap::new(); + for (stream, filter_manager_handlers) in stream_filter_managers_handlers { + let mut filter_managers = BTreeMap::new(); + for (filter, filter_manager_handler) in filter_manager_handlers { + filter_managers.insert(filter, filter_manager_handler.await.unwrap()); + } + stream_filter_managers.insert(stream, filter_managers); + } - for stream in config.streams().values() { - let stream2match_tx = stream2match_tx.clone(); + // let socket_manager_task_handle = { + // let socket = socket.to_owned(); + // let shutdown_rx = shutdown_rx.clone(); + // tokio::spawn(async move { + // socket_manager(config, socket, socket2match_tx, socket2exec_tx, shutdown_rx).await + // }) + // }; + + for (stream, filter_managers) in stream_filter_managers { let (child_tx, child_rx) = oneshot::channel(); stream_task_handles.push(tokio::spawn(async move { - stream.manager(child_tx, stream2match_tx).await + stream_manager(stream, child_tx, filter_managers.into_values().collect()).await })); if let Ok(Some(child)) = child_rx.await { stream_process_child_handles.push(child); } } - drop(stream2match_tx); // Close streams when we receive a quit signal handle_signals(stream_process_child_handles, shutdown_tx.clone())?; @@ -105,19 +114,11 @@ pub async fn daemon( } let _ = shutdown_tx.send(true); - let _ = socket_manager_task_handle.await; - let _ = matches_manager_task_handle.await; - let _ = execs_manager_task_handle.await; + // let _ = socket_manager_task_handle.await; let _ = database_manager_task_handle.await; let stop_ok = config.stop(); - // not waiting for the socket_manager to finish, sorry - // TODO make it listen on shutdown_rx - if let Err(err) = fs::remove_file(socket) { - error!("failed to remove socket: {}", err); - } - if !*shutdown_rx.borrow() { Err("quitting because all streams finished".into()) } else if !stop_ok { diff --git a/rust/src/daemon/socket.rs b/rust/src/daemon/socket.rs index 3bc2fec..4a500d0 100644 --- a/rust/src/daemon/socket.rs +++ b/rust/src/daemon/socket.rs @@ -198,7 +198,7 @@ pub async fn socket_manager( exec_tx: mpsc::Sender>, mut stop: watch::Receiver, ) { - let listener = match open_socket(socket) { + let listener = match open_socket(socket.clone()) { Ok(l) => l, Err(err) => { error!("while creating communication socket: {err}"); @@ -245,4 +245,8 @@ pub async fn socket_manager( } } } + + if let Err(err) = fs::remove_file(socket) { + error!("failed to remove socket: {}", err); + } } diff --git a/rust/src/daemon/statemap.rs b/rust/src/daemon/statemap.rs deleted file mode 100644 index 0af8142..0000000 --- a/rust/src/daemon/statemap.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - sync::Arc, -}; - -use regex::Regex; - -use crate::concepts::{ActionFilter, Match, Pattern, Time, MT}; - -#[derive(Clone, Debug)] -pub struct FilterOptions { - pub stream_name: Option, - pub filter_name: Option, - pub patterns: BTreeMap, Regex>, -} - -pub type StateMap = BTreeMap<&'static T, BTreeMap>>; - -// This trait is needed to permit to implement methods on an external type -pub trait StateMapTrait { - fn add(&mut self, mt: &MT); - fn rm(&mut self, mt: &MT) -> bool; - fn rm_times(&mut self, mt: &MT) -> Option>; - fn get_times(&self, mt: &MT) -> usize; - fn filtered(&self, filter_options: FilterOptions) -> Self; -} - -impl StateMapTrait for StateMap { - fn add(&mut self, mt: &MT) { - let inner_map = self.entry(mt.o).or_default(); - let inner_set = inner_map.entry(mt.m.clone()).or_default(); - inner_set.insert(mt.t); - } - - fn rm(&mut self, mt: &MT) -> bool { - let mut removed = false; - if let Some(inner_map) = self.get_mut(&mt.o) { - if let Some(inner_set) = inner_map.get_mut(&mt.m) { - inner_set.remove(&mt.t); - removed = true; - if inner_set.is_empty() { - inner_map.remove(&mt.m); - } - } - if inner_map.is_empty() { - self.remove(&mt.o); - } - } - removed - } - - fn rm_times(&mut self, mt: &MT) -> Option> { - let mut set = None; - if let Some(inner_map) = self.get_mut(&mt.o) { - set = inner_map.remove(&mt.m); - if inner_map.is_empty() { - self.remove(&mt.o); - } - } - set - } - - fn get_times(&self, mt: &MT) -> usize { - match self.get(&mt.o).and_then(|map| map.get(&mt.m)) { - Some(x) => x.len(), - None => 0, - } - } - - fn filtered(&self, filter_options: FilterOptions) -> Self { - let FilterOptions { - stream_name, - filter_name, - patterns, - } = filter_options; - self.iter() - // stream/filter filtering - .filter(|(object, _)| { - if let Some(stream_name) = &stream_name { - let full_name = object.full_name(); - let (s, f) = (full_name.0, full_name.1); - if *stream_name != s { - return false; - } - if let Some(filter_name) = &filter_name { - if *filter_name != f { - return false; - } - } - } - true - }) - // pattern filtering - .filter(|(object, _)| { - patterns - .iter() - .all(|(pattern, _)| object.patterns().get(pattern).is_some()) - }) - // match filtering - .filter_map(|(object, inner_map)| { - let map: BTreeMap> = inner_map - .iter() - .filter(|(match_, _)| { - match_ - .iter() - .zip(object.patterns()) - .filter_map(|(a_match, pattern)| { - patterns.get(pattern.as_ref()).map(|regex| (a_match, regex)) - }) - .all(|(a_match, regex)| regex.is_match(a_match)) - }) - .map(|(a, b)| (a.clone(), b.clone())) - .collect(); - if !map.is_empty() { - Some((*object, map)) - } else { - None - } - }) - .collect() - } -} diff --git a/rust/src/daemon/stream.rs b/rust/src/daemon/stream.rs new file mode 100644 index 0000000..eba3ad7 --- /dev/null +++ b/rust/src/daemon/stream.rs @@ -0,0 +1,69 @@ +use std::process::Stdio; + +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::{Child, Command}, + sync::oneshot, +}; +use tracing::{error, info}; + +use crate::{concepts::Stream, daemon::filter::FilterManager}; + +pub async fn stream_manager( + stream: &'static Stream, + child_tx: oneshot::Sender>, + mut filter_managers: Vec, +) { + info!("{}: start {:?}", stream.name(), stream.cmd()); + let mut child = match Command::new(&stream.cmd()[0]) + .args(&stream.cmd()[1..]) + .stdin(Stdio::null()) + .stderr(Stdio::null()) + .stdout(Stdio::piped()) + .spawn() + { + Ok(child) => child, + Err(err) => { + error!("could not execute stream {} cmd: {}", stream.name(), err); + let _ = child_tx.send(None); + return; + } + }; + + // keep stdout before sending/moving child to the main thread + #[allow(clippy::unwrap_used)] + // we know there is an stdout because we asked for Stdio::piped() + let mut lines = BufReader::new(child.stdout.take().unwrap()).lines(); + + // let main handle the child process + let _ = child_tx.send(Some(child)); + + loop { + match lines.next_line().await { + Ok(Some(line)) => { + futures::future::join_all( + filter_managers + .iter_mut() + .map(|manager| manager.handle_line(&line)), + ) + .await; + } + Ok(None) => { + error!("stream {} exited: its command returned.", stream.name()); + break; + } + Err(err) => { + error!( + "impossible to read output from stream {}: {}", + stream.name(), + err + ); + break; + } + } + } + + filter_managers + .iter_mut() + .for_each(|manager| manager.quit()); +} diff --git a/rust/src/main.rs b/rust/src/main.rs index 33c292c..4e80d8f 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -25,34 +25,34 @@ async fn main() { eprintln!("this error occurred."); })); - console_subscriber::init(); + // console_subscriber::init(); let cli = Cli::parse(); - // { - // // Set log level - // let level = if let Command::Start { - // loglevel, - // config: _, - // socket: _, - // } = cli.command - // { - // loglevel - // } else { - // Level::DEBUG - // }; - // if let Err(err) = tracing_subscriber::fmt::fmt() - // .without_time() - // .with_target(false) - // .with_ansi(std::io::stdout().is_terminal()) - // // .with_max_level(level) - // .with_max_level(Level::TRACE) - // .try_init() - // { - // eprintln!("ERROR could not initialize logging: {err}"); - // exit(1); - // } - // } + { + // Set log level + let level = if let Command::Start { + loglevel, + config: _, + socket: _, + } = cli.command + { + loglevel + } else { + Level::DEBUG + }; + if let Err(err) = tracing_subscriber::fmt::fmt() + .without_time() + .with_target(false) + .with_ansi(std::io::stdout().is_terminal()) + .with_max_level(level) + // .with_max_level(Level::TRACE) + .try_init() + { + eprintln!("ERROR could not initialize logging: {err}"); + exit(1); + } + } let result = match cli.command { Command::Start {