From 2e00092c18a08060a4eddd3dd60fb68967720fa4 Mon Sep 17 00:00:00 2001 From: ppom Date: Sun, 20 Oct 2024 12:00:00 +0200 Subject: [PATCH] WIP reimplement socket part I think I'm stumbling on this rust compiler bug: https://github.com/rust-lang/rust/issues/110338 --- rust/src/concepts/action.rs | 3 + rust/src/concepts/filter.rs | 4 + rust/src/daemon/action.rs | 17 +++- rust/src/daemon/filter.rs | 40 +++++++- rust/src/daemon/mod.rs | 78 +++++++++----- rust/src/daemon/socket.rs | 196 ++++++++++++++---------------------- rust/src/daemon/stream.rs | 19 ++-- rust/src/lib.rs | 4 +- 8 files changed, 203 insertions(+), 158 deletions(-) diff --git a/rust/src/concepts/action.rs b/rust/src/concepts/action.rs index 0d5c9b4..d4a707f 100644 --- a/rust/src/concepts/action.rs +++ b/rust/src/concepts/action.rs @@ -46,6 +46,9 @@ impl ActionFilter for Action { } impl Action { + pub fn name(&self) -> &str { + &self.name + } pub fn after_duration(&self) -> Option { self.after_duration } diff --git a/rust/src/concepts/filter.rs b/rust/src/concepts/filter.rs index ec6c2ac..5449de3 100644 --- a/rust/src/concepts/filter.rs +++ b/rust/src/concepts/filter.rs @@ -66,6 +66,10 @@ impl Filter { } } + pub fn name(&self) -> &str { + &self.name + } + pub fn retry(&self) -> Option { self.retry } diff --git a/rust/src/daemon/action.rs b/rust/src/daemon/action.rs index b812a2f..7a94982 100644 --- a/rust/src/daemon/action.rs +++ b/rust/src/daemon/action.rs @@ -28,8 +28,7 @@ impl State { self.ordered_times.remove(&t).is_some() } - fn clear_past_times(&mut self, after: Option) { - let now = Local::now(); + fn clear_past_times(&mut self, now: Time, after: Option) { let after = after.unwrap_or_default(); while self .ordered_times @@ -50,6 +49,10 @@ pub struct ActionManager { } impl ActionManager { + pub fn action(&self) -> &'static Action { + self.action + } + pub fn new( action: &'static Action, pending: BTreeMap>, @@ -76,7 +79,7 @@ impl ActionManager { } else { { let mut state = self.state.lock().unwrap(); - state.clear_past_times(self.action.after_duration()); + state.clear_past_times(t, self.action.after_duration()); state.add_match(&m, exec_t); } let this = self.clone(); @@ -91,6 +94,14 @@ impl ActionManager { } } + pub fn to_readable_vec(&self, match_: &Match) -> Option> { + self.state.lock().unwrap().pending.get(match_).map(|set| { + set.iter() + .map(|time| time.to_rfc3339().chars().take(19).collect()) + .collect() + }) + } + fn exec_now(&self, m: Match) { let semaphore = self.exec_limit.clone(); let action = self.action; diff --git a/rust/src/daemon/filter.rs b/rust/src/daemon/filter.rs index 4f76e34..da913db 100644 --- a/rust/src/daemon/filter.rs +++ b/rust/src/daemon/filter.rs @@ -4,9 +4,10 @@ use std::{ }; use chrono::Local; +use regex::Regex; use tokio::sync::{mpsc, Semaphore}; -use crate::concepts::{Filter, LogEntry, Match, Time, MFT}; +use crate::concepts::{ActionFilter, Filter, LogEntry, Match, Pattern, PatternStatus, Time, MFT}; use super::{action::ActionManager, database::DatabaseManagerInput}; @@ -94,6 +95,43 @@ impl FilterManager { .for_each(|manager| manager.quit()); } + pub fn to_pattern_status_map( + &self, + patterns: &BTreeMap, Regex>, + ) -> BTreeMap { + self.matches + .iter() + // TODO match filtering + .filter(|(match_, _)| { + match_ + .iter() + .zip(self.filter.patterns()) + .filter_map(|(a_match, pattern)| { + patterns.get(pattern.as_ref()).map(|regex| (a_match, regex)) + }) + .all(|(a_match, regex)| regex.is_match(a_match)) + }) + .map(|(match_, times)| { + let actions = + self.action_managers + .iter() + .fold(BTreeMap::default(), |mut acc, manager| { + if let Some(times) = manager.to_readable_vec(match_) { + acc.insert(manager.action().name().to_owned(), times); + } + acc + }); + ( + match_.join(" "), + PatternStatus { + matches: times.len(), + actions, + }, + ) + }) + .collect() + } + fn add_match(&mut self, m: &Match, t: Time) { self.matches.entry(m.clone()).or_default().insert(t); self.ordered_times.insert(t, m.clone()); diff --git a/rust/src/daemon/mod.rs b/rust/src/daemon/mod.rs index bb35dbf..9ab48bd 100644 --- a/rust/src/daemon/mod.rs +++ b/rust/src/daemon/mod.rs @@ -1,28 +1,48 @@ -use std::{collections::BTreeMap, error::Error, path::PathBuf, sync::Arc}; +use std::{ + collections::BTreeMap, + error::Error, + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; +use socket::socket_manager; use tokio::{ process::Child, select, signal::unix::{signal, SignalKind}, - sync::{mpsc, oneshot, watch, Semaphore}, + sync::{broadcast, mpsc, oneshot, Mutex, Semaphore}, }; use tracing::info; -use crate::concepts::Config; +use crate::concepts::{Config, Filter, Stream}; use database::database_manager; use filter::FilterManager; use stream::stream_manager; mod database; -// mod socket; +mod socket; mod action; mod filter; mod stream; +// type SharedState = BTreeMap<&'static Stream, Arc>>>; +struct SharedState { + pub s: BTreeMap<&'static Stream, Arc>>>, +} + +#[allow(unsafe_code)] +// It's actually safe. It's a bug in rust that shadows the 'static reference, which ensures the +// reference will always link to something +// https://github.com/rust-lang/rust/issues/96865 +unsafe impl Send for SharedState {} + pub async fn daemon( config_path: PathBuf, - _socket: PathBuf, + socket: PathBuf, ) -> Result<(), Box> { let config: &'static Config = Config::from_file(&config_path).map(|config| Box::leak(Box::new(config)))?; @@ -40,7 +60,7 @@ pub async fn daemon( let (log_tx, log_rx) = mpsc::channel(234560); // Shutdown channel - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let (shutdown_tx, shutdown_rx) = broadcast::channel(1); // Semaphore limiting action execution concurrency let exec_limit = if config.concurrency() > 0 { @@ -76,28 +96,23 @@ pub async fn daemon( database_manager(config, log_rx, log2filter_tx) }; - let mut stream_filter_managers = BTreeMap::new(); + let mut stream_filter_managers = SharedState { s: BTreeMap::new() }; for (stream, filter_manager_handlers) in stream_filter_managers_handlers { let mut filter_managers = BTreeMap::new(); for (filter, filter_manager_handler) in filter_manager_handlers { filter_managers.insert(filter, filter_manager_handler.await.unwrap()); } - stream_filter_managers.insert(stream, filter_managers); + stream_filter_managers.s.insert(stream, Arc::new(Mutex::new(filter_managers))); } + let stream_filter_managers: Arc = Arc::new(stream_filter_managers); - // let socket_manager_task_handle = { - // let socket = socket.to_owned(); - // let shutdown_rx = shutdown_rx.clone(); - // tokio::spawn(async move { - // socket_manager(config, socket, socket2match_tx, socket2exec_tx, shutdown_rx).await - // }) - // }; - - for (stream, filter_managers) in stream_filter_managers { + for (stream, filter_managers) in stream_filter_managers.s.iter() { let (child_tx, child_rx) = oneshot::channel(); + let filter_managers = filter_managers.clone(); + let stream = *stream; stream_task_handles.push(tokio::spawn(async move { - stream_manager(stream, child_tx, filter_managers.into_values().collect()).await + stream_manager(stream, child_tx, filter_managers).await })); if let Ok(Some(child)) = child_rx.await { @@ -106,20 +121,33 @@ pub async fn daemon( } // Close streams when we receive a quit signal - handle_signals(stream_process_child_handles, shutdown_tx.clone())?; + let signal_received = Arc::new(AtomicBool::new(false)); + handle_signals( + stream_process_child_handles, + shutdown_tx.clone(), + signal_received.clone(), + )?; + + let socket_manager_task_handle = { + let socket = socket.to_owned(); + let stream_filter_managers: Arc = stream_filter_managers.clone(); + tokio::spawn(async move { + socket_manager(config, socket, stream_filter_managers, shutdown_rx).await + }) + }; // Wait for all streams to quit for task_handle in stream_task_handles { let _ = task_handle.await; } - let _ = shutdown_tx.send(true); + let _ = shutdown_tx.send(()); - // let _ = socket_manager_task_handle.await; + let _ = socket_manager_task_handle.await; let _ = database_manager_thread_handle.join(); let stop_ok = config.stop(); - if !*shutdown_rx.borrow() { + if !signal_received.load(Ordering::SeqCst) { Err("quitting because all streams finished".into()) } else if !stop_ok { Err("while executing stop command".into()) @@ -130,7 +158,8 @@ pub async fn daemon( fn handle_signals( stream_process_child_handles: Vec, - shutdown_tx: watch::Sender, + shutdown_tx: broadcast::Sender<()>, + signal_received: Arc, ) -> tokio::io::Result<()> { let mut sighup = signal(SignalKind::hangup())?; let mut sigint = signal(SignalKind::interrupt())?; @@ -141,7 +170,8 @@ fn handle_signals( _ = sigint.recv() => "SIGINT", _ = sigterm.recv() => "SIGTERM", }; - let _ = shutdown_tx.send(true); + let _ = shutdown_tx.send(()); + signal_received.store(true, Ordering::SeqCst); info!("received {signal}, closing streams..."); // Kill stream subprocesses for mut child_handle in stream_process_child_handles.into_iter() { diff --git a/rust/src/daemon/socket.rs b/rust/src/daemon/socket.rs index 4a500d0..82931e5 100644 --- a/rust/src/daemon/socket.rs +++ b/rust/src/daemon/socket.rs @@ -3,11 +3,7 @@ use std::{collections::BTreeMap, fs, io, path::PathBuf, process::exit, sync::Arc use bincode::Options; use futures::{SinkExt, StreamExt}; use regex::Regex; -use tokio::{ - join, - net::UnixListener, - sync::{mpsc, oneshot, watch}, -}; +use tokio::{net::UnixListener, sync::{broadcast, oneshot}}; use tokio_util::{ bytes::Bytes, codec::{Framed, LengthDelimitedCodec}, @@ -15,14 +11,18 @@ use tokio_util::{ use tracing::{error, warn}; use crate::{ - concepts::{ - ActionFilter, ClientRequest, ClientStatus, Config, DaemonResponse, Order, Pattern, - PatternStatus, - }, + concepts::{ActionFilter, ClientRequest, ClientStatus, Config, DaemonResponse, Order, Pattern}, utils::bincode_options, }; -use super::{execs::ExecsMap, matches::MatchesMap, statemap::FilterOptions}; +use super::SharedState; + +#[derive(Clone)] +pub struct FilterOptions { + pub stream_name: Option, + pub filter_name: Option, + pub patterns: BTreeMap, Regex>, +} macro_rules! err_str { ($expression:expr) => { @@ -57,122 +57,75 @@ fn open_socket(path: PathBuf) -> Result { async fn answer_order( config: &'static Config, - match_tx: &mpsc::Sender>, - exec_tx: &mpsc::Sender>, + shared_state: &SharedState, options: ClientRequest, ) -> Result { // Compute options - let filtering_options = { - let (stream_name, filter_name) = match options.stream_filter { - Some(sf) => match sf.split_once(".") { - Some((s, f)) => (Some(s.to_string()), Some(f.to_string())), - None => (Some(sf), None), - }, - None => (None, None), - }; - - // Compute the Vec<(pattern_name, String)> into a BTreeMap, Regex> - let patterns = options - .patterns - .iter() - .map(|(name, reg)| { - // lookup pattern in config.patterns - config - .patterns() - .iter() - // retrieve or Err - .find(|(pattern_name, _)| name == *pattern_name) - .ok_or_else(|| format!("pattern '{name}' doesn't exist")) - // compile Regex or Err - .and_then(|(_, pattern)| match Regex::new(reg) { - Ok(reg) => Ok((pattern.clone(), reg)), - Err(err) => Err(format!("pattern '{name}' regex doesn't compile: {err}")), - }) - }) - .collect::, Regex>, String>>()?; - - FilterOptions { - stream_name, - filter_name, - patterns, - } + let (stream_name, filter_name) = match options.stream_filter { + Some(sf) => match sf.split_once(".") { + Some((s, f)) => (Some(s.to_string()), Some(f.to_string())), + None => (Some(sf), None), + }, + None => (None, None), }; - // ask for matches clone - let filtering_options2 = filtering_options.clone(); - let matches = async move { - let (m_tx, m_rx) = oneshot::channel(); + // Compute the Vec<(pattern_name: String, regex: String)> into a BTreeMap, Regex> + let patterns = options + .patterns + .iter() + .map(|(name, reg)| { + // lookup pattern in config.patterns + config + .patterns() + .iter() + // retrieve or Err + .find(|(pattern_name, _)| name == *pattern_name) + .ok_or_else(|| format!("pattern '{name}' doesn't exist")) + // compile Regex or Err + .and_then(|(_, pattern)| match Regex::new(reg) { + Ok(reg) => Ok((pattern.clone(), reg)), + Err(err) => Err(format!("pattern '{name}' regex doesn't compile: {err}")), + }) + }) + .collect::, Regex>, String>>()?; - #[allow(clippy::unwrap_used)] // propagating panics is ok - match_tx - .send((options.order, filtering_options2, m_tx)) - .await - .unwrap(); - - #[allow(clippy::unwrap_used)] // propagating panics is ok - m_rx.await.unwrap() - }; - - // ask for execs clone - let execs = async move { - let (e_tx, e_rx) = oneshot::channel(); - - #[allow(clippy::unwrap_used)] // propagating panics is ok - exec_tx - .send((options.order, filtering_options, e_tx)) - .await - .unwrap(); - - #[allow(clippy::unwrap_used)] // propagating panics is ok - e_rx.await.unwrap() - }; - - let (matches, execs) = join!(matches, execs); - - // Transform matches and execs into a ClientStatus - let cs: ClientStatus = matches - .into_iter() - .fold(BTreeMap::new(), |mut acc, (object, map)| { - let (stream, filter, _) = object.full_name(); - acc.entry(stream.into()) - .or_default() - .entry(filter.into()) - .or_default() - .extend(map.into_iter().map(|(match_, times)| { + // TODO directly call flush function here? + let cs: ClientStatus = futures::stream::iter(shared_state.s.iter()) + // stream filtering + .filter(|(stream, _)| async { + stream_name.is_none() + || stream_name + .clone() + .is_some_and(|name| name == stream.name()) + }) + .fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async { + let filter_manager = filter_manager.lock().await; + let inner_map = filter_manager + .iter() + // filter filtering + .filter(|(filter, _)| { + filter_name.is_none() + || filter_name + .clone() + .is_some_and(|name| name == filter.name()) + }) + // pattern filtering + .filter(|(filter, _)| { + patterns + .iter() + .all(|(pattern, _)| filter.patterns().get(pattern).is_some()) + }) + .map(|(filter, manager)| { ( - match_.join(" "), - PatternStatus { - matches: times.len(), - ..Default::default() - }, + filter.name().to_owned(), + manager.to_pattern_status_map(&patterns), ) - })); + }) + .collect(); + acc.insert(stream.name().to_owned(), inner_map); acc - }); - - let cs = execs.into_iter().fold(cs, |mut acc, (object, map)| { - let (stream, filter, action) = object.full_name(); - let inner_map = acc - .entry(stream.into()) - .or_default() - .entry(filter.into()) - .or_default(); - - map.into_iter().for_each(|(match_, times)| { - inner_map - .entry(match_.join(" ")) - .or_default() - .actions - .insert( - action.to_string(), - times - .into_iter() - .map(|time| time.to_rfc3339().chars().take(19).collect()) - .collect(), - ); - }); - acc - }); + }) + .await; Ok(cs) } @@ -194,9 +147,8 @@ pub type SocketOrder = (Order, FilterOptions, oneshot::Sender); pub async fn socket_manager( config: &'static Config, socket: PathBuf, - match_tx: mpsc::Sender>, - exec_tx: mpsc::Sender>, - mut stop: watch::Receiver, + shared_state: Arc, + mut stop: broadcast::Receiver<()>, ) { let listener = match open_socket(socket.clone()) { Ok(l) => l, @@ -209,7 +161,7 @@ pub async fn socket_manager( let bin = bincode_options(); loop { tokio::select! { - _ = stop.changed() => break, + _ = stop.recv() => break, try_conn = listener.accept() => { match try_conn { Ok((conn, _)) => { @@ -228,7 +180,7 @@ pub async fn socket_manager( bin.deserialize(&encoded_request) ); // Process - let response = match answer_order(config, &match_tx, &exec_tx, request).await { + let response = match answer_order(config, &shared_state, request).await { Ok(res) => DaemonResponse::Order(res), Err(err) => DaemonResponse::Err(err), }; diff --git a/rust/src/daemon/stream.rs b/rust/src/daemon/stream.rs index eba3ad7..b0f31e9 100644 --- a/rust/src/daemon/stream.rs +++ b/rust/src/daemon/stream.rs @@ -1,18 +1,21 @@ -use std::process::Stdio; +use std::{collections::BTreeMap, process::Stdio, sync::Arc}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::{Child, Command}, - sync::oneshot, + sync::{oneshot, Mutex}, }; use tracing::{error, info}; -use crate::{concepts::Stream, daemon::filter::FilterManager}; +use crate::{ + concepts::{Filter, Stream}, + daemon::filter::FilterManager, +}; pub async fn stream_manager( stream: &'static Stream, child_tx: oneshot::Sender>, - mut filter_managers: Vec, + filter_managers: Arc>>, ) { info!("{}: start {:?}", stream.name(), stream.cmd()); let mut child = match Command::new(&stream.cmd()[0]) @@ -43,7 +46,9 @@ pub async fn stream_manager( Ok(Some(line)) => { futures::future::join_all( filter_managers - .iter_mut() + .lock() + .await + .values_mut() .map(|manager| manager.handle_line(&line)), ) .await; @@ -64,6 +69,8 @@ pub async fn stream_manager( } filter_managers - .iter_mut() + .lock() + .await + .values_mut() .for_each(|manager| manager.quit()); } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 961a40e..012b4d3 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -2,10 +2,10 @@ clippy::panic, clippy::todo, clippy::unimplemented, - clippy::unwrap_used + clippy::unwrap_used, + unsafe_code )] #![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)] -#![forbid(unsafe_code)] pub mod client; pub mod concepts;