From aca19fea8f2099a8c8e8a0105970554c6542843f Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 21 Oct 2024 12:00:00 +0200 Subject: [PATCH] fix all async & clippy issues - use hasmap instead of btreemap https://github.com/rust-lang/rust/issues/64552 - use async iterators as few as possible - move first rotate_db into its own thread to be out of the runtime, so that we can use blocking_send - send flush to database --- rust/src/concepts/filter.rs | 7 +++++ rust/src/concepts/stream.rs | 7 ++++- rust/src/daemon/action.rs | 9 ++++-- rust/src/daemon/database/mod.rs | 22 +++++++------- rust/src/daemon/filter.rs | 34 ++++++++++++++++------ rust/src/daemon/mod.rs | 16 +++++------ rust/src/daemon/socket.rs | 51 +++++++++++++++++++-------------- rust/src/daemon/stream.rs | 4 +-- rust/tests/simple.rs | 4 ++- 9 files changed, 98 insertions(+), 56 deletions(-) diff --git a/rust/src/concepts/filter.rs b/rust/src/concepts/filter.rs index ece5734..d895b53 100644 --- a/rust/src/concepts/filter.rs +++ b/rust/src/concepts/filter.rs @@ -2,6 +2,7 @@ use std::{ cmp::Ordering, collections::{BTreeMap, BTreeSet}, fmt::Display, + hash::Hash, sync::Arc, }; @@ -253,6 +254,12 @@ impl PartialOrd for Filter { Some(self.cmp(other)) } } +impl Hash for Filter { + fn hash(&self, state: &mut H) { + self.stream_name.hash(state); + self.name.hash(state); + } +} #[allow(clippy::unwrap_used)] #[cfg(test)] diff --git a/rust/src/concepts/stream.rs b/rust/src/concepts/stream.rs index ddef48d..377968d 100644 --- a/rust/src/concepts/stream.rs +++ b/rust/src/concepts/stream.rs @@ -1,4 +1,4 @@ -use std::{cmp::Ordering, collections::BTreeMap}; +use std::{cmp::Ordering, collections::BTreeMap, hash::Hash}; use serde::Deserialize; @@ -81,6 +81,11 @@ impl PartialOrd for Stream { Some(self.cmp(other)) } } +impl Hash for Stream { + fn hash(&self, state: &mut H) { + self.name.hash(state); + } +} #[cfg(test)] pub mod tests { diff --git a/rust/src/daemon/action.rs b/rust/src/daemon/action.rs index bd0c73f..44742e8 100644 --- a/rust/src/daemon/action.rs +++ b/rust/src/daemon/action.rs @@ -35,6 +35,7 @@ impl State { .first_key_value() .is_some_and(|(k, _)| *k + after < now) { + #[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some let (_, m) = self.ordered_times.pop_first().unwrap(); self.pending.remove(&m); } @@ -78,6 +79,7 @@ impl ActionManager { self.exec_now(m); } else { { + #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = self.state.lock().unwrap(); state.clear_past_times(t, self.action.after_duration()); state.add_match(&m, exec_t); @@ -86,6 +88,7 @@ impl ActionManager { tokio::spawn(async move { let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?"); tokio::time::sleep(dur).await; + #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = this.state.lock().unwrap(); if state.remove(m.clone(), t) { this.exec_now(m); @@ -98,7 +101,8 @@ impl ActionManager { &self, order: Order, is_match: F, - ) -> BTreeMap> { + ) -> BTreeMap, Vec> { + #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = self.state.lock().unwrap(); state .pending @@ -118,7 +122,7 @@ impl ActionManager { time.to_rfc3339().chars().take(19).collect() }) .collect(); - acc.insert(match_.join(" "), times); + acc.insert(match_, times); acc }) } @@ -152,6 +156,7 @@ impl ActionManager { pub fn quit(&mut self) { if self.action.on_exit() { + #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = self.state.lock().unwrap(); for (m, times) in &state.pending { for _ in times { diff --git a/rust/src/daemon/database/mod.rs b/rust/src/daemon/database/mod.rs index dcb8af4..9d2919c 100644 --- a/rust/src/daemon/database/mod.rs +++ b/rust/src/daemon/database/mod.rs @@ -64,17 +64,17 @@ macro_rules! flush_or_die { pub fn database_manager( config: &'static Config, mut log_rx: mpsc::Receiver, - matches_tx: BTreeMap<&Filter, mpsc::Sender>, + matches_tx: BTreeMap<&'static Filter, mpsc::Sender>, ) -> thread::JoinHandle<()> { - let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) { - Ok(dbs) => dbs, - Err(err) => { - error!("while rotating databases on start: {}", err); - exit(1); - } - }; - thread::spawn(move || { + let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) { + Ok(dbs) => dbs, + Err(err) => { + error!("while rotating databases on start: {}", err); + exit(1); + } + }; + let mut cpt = 0; while let Some(order) = log_rx.blocking_recv() { match order { @@ -177,7 +177,7 @@ fn _rotate_db( // Read flushes let mut flushes: BTreeMap<&'static Filter, BTreeMap> = BTreeMap::new(); - while let Some(flush_entry) = flush_read_db.next() { + for flush_entry in flush_read_db { match flush_entry { Ok(entry) => { let matches_map = flushes.entry(entry.f).or_default(); @@ -193,7 +193,7 @@ fn _rotate_db( let now = Local::now(); // Read matches - while let Some(log_entry) = log_read_db.next() { + for log_entry in log_read_db { match log_entry { Ok(mut entry) => { // Check if number of patterns is in sync diff --git a/rust/src/daemon/filter.rs b/rust/src/daemon/filter.rs index 41f4a7a..136f444 100644 --- a/rust/src/daemon/filter.rs +++ b/rust/src/daemon/filter.rs @@ -95,7 +95,7 @@ impl FilterManager { .for_each(|manager| manager.quit()); } - pub fn handle_order( + pub async fn handle_order( &mut self, patterns: &BTreeMap, Regex>, order: Order, @@ -110,18 +110,17 @@ impl FilterManager { .all(|(a_match, regex)| regex.is_match(a_match)) }; - let cs = self - .matches - .clone() - .iter() + let matches = self.matches.clone(); + let cs: BTreeMap<_, _> = matches + .into_iter() // match filtering .filter(|(match_, _)| is_match(match_)) .map(|(match_, times)| { if let Order::Flush = order { - self.remove_match(match_); + self.remove_match(&match_); } ( - match_.join(" "), + match_, PatternStatus { matches: times.len(), ..Default::default() @@ -130,7 +129,7 @@ impl FilterManager { }) .collect(); - self.action_managers.iter().fold(cs, |mut acc, manager| { + let cs = self.action_managers.iter().fold(cs, |mut acc, manager| { for (match_, times) in manager.handle_order(order, is_match) { let pattern_status = acc.entry(match_).or_default(); pattern_status @@ -138,7 +137,23 @@ impl FilterManager { .insert(manager.action().to_string(), times); } acc - }) + }); + + let now = Local::now(); + for match_ in cs.keys() { + #[allow(clippy::unwrap_used)] // propagating panics is ok + self.log_tx + .send(DatabaseManagerInput::Flush(LogEntry { + exec: false, + m: match_.to_vec(), + f: self.filter, + t: now, + })) + .await + .unwrap() + } + + cs.into_iter().map(|(k, v)| (k.join(" "), v)).collect() } fn add_match(&mut self, m: &Match, t: Time) { @@ -162,6 +177,7 @@ impl FilterManager { .first_key_value() .is_some_and(|(k, _)| *k + retry_duration < now) { + #[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some let (_, m) = self.ordered_times.pop_first().unwrap(); self.matches.remove(&m); } diff --git a/rust/src/daemon/mod.rs b/rust/src/daemon/mod.rs index 2e06164..7d6adbc 100644 --- a/rust/src/daemon/mod.rs +++ b/rust/src/daemon/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, error::Error, path::PathBuf, sync::{ @@ -8,7 +8,6 @@ use std::{ }, }; -use socket::socket_manager; use tokio::{ process::Child, select, @@ -20,6 +19,7 @@ use tracing::info; use crate::concepts::{Config, Filter, Stream}; use database::database_manager; use filter::FilterManager; +use socket::socket_manager; use stream::stream_manager; mod database; @@ -31,7 +31,7 @@ mod stream; // type SharedState = BTreeMap<&'static Stream, Arc>>>; struct SharedState { - pub s: BTreeMap<&'static Stream, Arc>>>, + pub s: HashMap<&'static Stream, Arc>>>, } // #[allow(unsafe_code)] @@ -66,10 +66,10 @@ pub async fn daemon( }; // Filter managers - let mut stream_filter_managers_handlers = BTreeMap::new(); + let mut stream_filter_managers_handlers = HashMap::new(); let mut log2filter_tx = BTreeMap::new(); for stream in config.streams().values() { - let mut filter_managers_handlers = BTreeMap::new(); + let mut filter_managers_handlers = HashMap::new(); for filter in stream.filters().values() { let manager = FilterManager::new( filter, @@ -92,10 +92,11 @@ pub async fn daemon( database_manager(config, log_rx, log2filter_tx) }; - let mut stream_filter_managers = SharedState { s: BTreeMap::new() }; + let mut stream_filter_managers = SharedState { s: HashMap::new() }; for (stream, filter_manager_handlers) in stream_filter_managers_handlers { - let mut filter_managers = BTreeMap::new(); + let mut filter_managers = HashMap::new(); for (filter, filter_manager_handler) in filter_manager_handlers { + #[allow(clippy::unwrap_used)] // propagating panics is ok filter_managers.insert(filter, filter_manager_handler.await.unwrap()); } stream_filter_managers @@ -128,7 +129,6 @@ pub async fn daemon( let socket_manager_task_handle = { let socket = socket.to_owned(); - let stream_filter_managers = stream_filter_managers.clone(); tokio::spawn(async move { socket_manager(config, socket, stream_filter_managers, shutdown_rx).await }) diff --git a/rust/src/daemon/socket.rs b/rust/src/daemon/socket.rs index f0973ef..269b38e 100644 --- a/rust/src/daemon/socket.rs +++ b/rust/src/daemon/socket.rs @@ -50,7 +50,7 @@ fn open_socket(path: PathBuf) -> Result { async fn answer_order( config: &'static Config, - shared_state: &SharedState, + shared_state: &Arc, options: ClientRequest, ) -> Result { // Compute options @@ -82,17 +82,22 @@ async fn answer_order( }) .collect::, Regex>, String>>()?; - let cs: ClientStatus = futures::stream::iter(shared_state.s.iter()) - // stream filtering - .filter(|(stream, _)| async { - stream_name.is_none() - || stream_name - .clone() - .is_some_and(|name| name == stream.name()) - }) - .fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async { - let mut filter_manager = filter_manager.lock().await; - let inner_map = filter_manager + let cs: ClientStatus = futures::stream::iter( + shared_state + .s + .iter() + // stream filtering + .filter(|(stream, _)| { + stream_name.is_none() + || stream_name + .clone() + .is_some_and(|name| name == stream.name()) + }), + ) + .fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async { + let mut filter_manager = filter_manager.lock().await; + let inner_map = futures::stream::iter( + filter_manager .iter_mut() // filter filtering .filter(|(filter, _)| { @@ -106,18 +111,20 @@ async fn answer_order( patterns .iter() .all(|(pattern, _)| filter.patterns().get(pattern).is_some()) - }) - .map(|(filter, manager)| { - ( - filter.name().to_owned(), - manager.handle_order(&patterns, options.order), - ) - }) - .collect(); - acc.insert(stream.name().to_owned(), inner_map); - acc + }), + ) + .then(|(filter, manager)| async { + ( + filter.name().to_owned(), + manager.handle_order(&patterns, options.order).await, + ) }) + .collect() .await; + acc.insert(stream.name().to_owned(), inner_map); + acc + }) + .await; Ok(cs) } diff --git a/rust/src/daemon/stream.rs b/rust/src/daemon/stream.rs index b0f31e9..e86ffaa 100644 --- a/rust/src/daemon/stream.rs +++ b/rust/src/daemon/stream.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, process::Stdio, sync::Arc}; +use std::{collections::HashMap, process::Stdio, sync::Arc}; use tokio::{ io::{AsyncBufReadExt, BufReader}, @@ -15,7 +15,7 @@ use crate::{ pub async fn stream_manager( stream: &'static Stream, child_tx: oneshot::Sender>, - filter_managers: Arc>>, + filter_managers: Arc>>, ) { info!("{}: start {:?}", stream.name(), stream.cmd()); let mut child = match Command::new(&stream.cmd()[0]) diff --git a/rust/tests/simple.rs b/rust/tests/simple.rs index 4834b62..7fb2370 100644 --- a/rust/tests/simple.rs +++ b/rust/tests/simple.rs @@ -146,7 +146,9 @@ async fn simple() { file_with_contents(out_path, ""); - assert!(daemon(config_path.into(), socket_path.into()).await.is_ok()); + assert!(daemon(config_path.into(), socket_path.into()) + .await + .is_err()); // 36 from DB // 12 from DB