fix all async & clippy issues

- use HashMap instead of BTreeMap https://github.com/rust-lang/rust/issues/64552
- use async iterators as little as possible
- move first rotate_db into its own thread to be out of the runtime, so
  that we can use blocking_send
- send flush to database
This commit is contained in:
ppom 2024-10-21 12:00:00 +02:00
commit aca19fea8f
9 changed files with 98 additions and 56 deletions

View file

@ -2,6 +2,7 @@ use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
fmt::Display,
hash::Hash,
sync::Arc,
};
@ -253,6 +254,12 @@ impl PartialOrd for Filter {
Some(self.cmp(other))
}
}
/// Manual `Hash` over the identity-defining fields only.
///
/// NOTE(review): `Hash` must agree with `Eq` (k1 == k2 implies equal
/// hashes) — presumably `PartialEq`/`Ord` for `Filter` also compare
/// only `stream_name` and `name`; confirm against those impls.
impl Hash for Filter {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Tuple hashing feeds each element to the hasher in order,
        // identical to hashing the two fields one after the other.
        (&self.stream_name, &self.name).hash(state);
    }
}
#[allow(clippy::unwrap_used)]
#[cfg(test)]

View file

@ -1,4 +1,4 @@
use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, hash::Hash};
use serde::Deserialize;
@ -81,6 +81,11 @@ impl PartialOrd for Stream {
Some(self.cmp(other))
}
}
/// Manual `Hash`: a stream's identity is its name alone.
///
/// NOTE(review): must stay consistent with `Eq` — presumably
/// equality for `Stream` also compares only `name`; verify.
impl Hash for Stream {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Fully-qualified form; equivalent to `self.name.hash(state)`.
        Hash::hash(&self.name, state);
    }
}
#[cfg(test)]
pub mod tests {

View file

@ -35,6 +35,7 @@ impl State {
.first_key_value()
.is_some_and(|(k, _)| *k + after < now)
{
#[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some
let (_, m) = self.ordered_times.pop_first().unwrap();
self.pending.remove(&m);
}
@ -78,6 +79,7 @@ impl ActionManager {
self.exec_now(m);
} else {
{
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
state.clear_past_times(t, self.action.after_duration());
state.add_match(&m, exec_t);
@ -86,6 +88,7 @@ impl ActionManager {
tokio::spawn(async move {
let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?");
tokio::time::sleep(dur).await;
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = this.state.lock().unwrap();
if state.remove(m.clone(), t) {
this.exec_now(m);
@ -98,7 +101,8 @@ impl ActionManager {
&self,
order: Order,
is_match: F,
) -> BTreeMap<String, Vec<String>> {
) -> BTreeMap<Vec<String>, Vec<String>> {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
state
.pending
@ -118,7 +122,7 @@ impl ActionManager {
time.to_rfc3339().chars().take(19).collect()
})
.collect();
acc.insert(match_.join(" "), times);
acc.insert(match_, times);
acc
})
}
@ -152,6 +156,7 @@ impl ActionManager {
pub fn quit(&mut self) {
if self.action.on_exit() {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
for (m, times) in &state.pending {
for _ in times {

View file

@ -64,17 +64,17 @@ macro_rules! flush_or_die {
pub fn database_manager(
config: &'static Config,
mut log_rx: mpsc::Receiver<DatabaseManagerInput>,
matches_tx: BTreeMap<&Filter, mpsc::Sender<MFT>>,
matches_tx: BTreeMap<&'static Filter, mpsc::Sender<MFT>>,
) -> thread::JoinHandle<()> {
let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) {
Ok(dbs) => dbs,
Err(err) => {
error!("while rotating databases on start: {}", err);
exit(1);
}
};
thread::spawn(move || {
let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) {
Ok(dbs) => dbs,
Err(err) => {
error!("while rotating databases on start: {}", err);
exit(1);
}
};
let mut cpt = 0;
while let Some(order) = log_rx.blocking_recv() {
match order {
@ -177,7 +177,7 @@ fn _rotate_db(
// Read flushes
let mut flushes: BTreeMap<&'static Filter, BTreeMap<Match, Time>> = BTreeMap::new();
while let Some(flush_entry) = flush_read_db.next() {
for flush_entry in flush_read_db {
match flush_entry {
Ok(entry) => {
let matches_map = flushes.entry(entry.f).or_default();
@ -193,7 +193,7 @@ fn _rotate_db(
let now = Local::now();
// Read matches
while let Some(log_entry) = log_read_db.next() {
for log_entry in log_read_db {
match log_entry {
Ok(mut entry) => {
// Check if number of patterns is in sync

View file

@ -95,7 +95,7 @@ impl FilterManager {
.for_each(|manager| manager.quit());
}
pub fn handle_order(
pub async fn handle_order(
&mut self,
patterns: &BTreeMap<Arc<Pattern>, Regex>,
order: Order,
@ -110,18 +110,17 @@ impl FilterManager {
.all(|(a_match, regex)| regex.is_match(a_match))
};
let cs = self
.matches
.clone()
.iter()
let matches = self.matches.clone();
let cs: BTreeMap<_, _> = matches
.into_iter()
// match filtering
.filter(|(match_, _)| is_match(match_))
.map(|(match_, times)| {
if let Order::Flush = order {
self.remove_match(match_);
self.remove_match(&match_);
}
(
match_.join(" "),
match_,
PatternStatus {
matches: times.len(),
..Default::default()
@ -130,7 +129,7 @@ impl FilterManager {
})
.collect();
self.action_managers.iter().fold(cs, |mut acc, manager| {
let cs = self.action_managers.iter().fold(cs, |mut acc, manager| {
for (match_, times) in manager.handle_order(order, is_match) {
let pattern_status = acc.entry(match_).or_default();
pattern_status
@ -138,7 +137,23 @@ impl FilterManager {
.insert(manager.action().to_string(), times);
}
acc
})
});
let now = Local::now();
for match_ in cs.keys() {
#[allow(clippy::unwrap_used)] // propagating panics is ok
self.log_tx
.send(DatabaseManagerInput::Flush(LogEntry {
exec: false,
m: match_.to_vec(),
f: self.filter,
t: now,
}))
.await
.unwrap()
}
cs.into_iter().map(|(k, v)| (k.join(" "), v)).collect()
}
fn add_match(&mut self, m: &Match, t: Time) {
@ -162,6 +177,7 @@ impl FilterManager {
.first_key_value()
.is_some_and(|(k, _)| *k + retry_duration < now)
{
#[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some
let (_, m) = self.ordered_times.pop_first().unwrap();
self.matches.remove(&m);
}

View file

@ -1,5 +1,5 @@
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
error::Error,
path::PathBuf,
sync::{
@ -8,7 +8,6 @@ use std::{
},
};
use socket::socket_manager;
use tokio::{
process::Child,
select,
@ -20,6 +19,7 @@ use tracing::info;
use crate::concepts::{Config, Filter, Stream};
use database::database_manager;
use filter::FilterManager;
use socket::socket_manager;
use stream::stream_manager;
mod database;
@ -31,7 +31,7 @@ mod stream;
// type SharedState = BTreeMap<&'static Stream, Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>>;
struct SharedState {
pub s: BTreeMap<&'static Stream, Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>>,
pub s: HashMap<&'static Stream, Arc<Mutex<HashMap<&'static Filter, FilterManager>>>>,
}
// #[allow(unsafe_code)]
@ -66,10 +66,10 @@ pub async fn daemon(
};
// Filter managers
let mut stream_filter_managers_handlers = BTreeMap::new();
let mut stream_filter_managers_handlers = HashMap::new();
let mut log2filter_tx = BTreeMap::new();
for stream in config.streams().values() {
let mut filter_managers_handlers = BTreeMap::new();
let mut filter_managers_handlers = HashMap::new();
for filter in stream.filters().values() {
let manager = FilterManager::new(
filter,
@ -92,10 +92,11 @@ pub async fn daemon(
database_manager(config, log_rx, log2filter_tx)
};
let mut stream_filter_managers = SharedState { s: BTreeMap::new() };
let mut stream_filter_managers = SharedState { s: HashMap::new() };
for (stream, filter_manager_handlers) in stream_filter_managers_handlers {
let mut filter_managers = BTreeMap::new();
let mut filter_managers = HashMap::new();
for (filter, filter_manager_handler) in filter_manager_handlers {
#[allow(clippy::unwrap_used)] // propagating panics is ok
filter_managers.insert(filter, filter_manager_handler.await.unwrap());
}
stream_filter_managers
@ -128,7 +129,6 @@ pub async fn daemon(
let socket_manager_task_handle = {
let socket = socket.to_owned();
let stream_filter_managers = stream_filter_managers.clone();
tokio::spawn(async move {
socket_manager(config, socket, stream_filter_managers, shutdown_rx).await
})

View file

@ -50,7 +50,7 @@ fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
async fn answer_order(
config: &'static Config,
shared_state: &SharedState,
shared_state: &Arc<SharedState>,
options: ClientRequest,
) -> Result<ClientStatus, String> {
// Compute options
@ -82,17 +82,22 @@ async fn answer_order(
})
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()?;
let cs: ClientStatus = futures::stream::iter(shared_state.s.iter())
// stream filtering
.filter(|(stream, _)| async {
stream_name.is_none()
|| stream_name
.clone()
.is_some_and(|name| name == stream.name())
})
.fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async {
let mut filter_manager = filter_manager.lock().await;
let inner_map = filter_manager
let cs: ClientStatus = futures::stream::iter(
shared_state
.s
.iter()
// stream filtering
.filter(|(stream, _)| {
stream_name.is_none()
|| stream_name
.clone()
.is_some_and(|name| name == stream.name())
}),
)
.fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async {
let mut filter_manager = filter_manager.lock().await;
let inner_map = futures::stream::iter(
filter_manager
.iter_mut()
// filter filtering
.filter(|(filter, _)| {
@ -106,18 +111,20 @@ async fn answer_order(
patterns
.iter()
.all(|(pattern, _)| filter.patterns().get(pattern).is_some())
})
.map(|(filter, manager)| {
(
filter.name().to_owned(),
manager.handle_order(&patterns, options.order),
)
})
.collect();
acc.insert(stream.name().to_owned(), inner_map);
acc
}),
)
.then(|(filter, manager)| async {
(
filter.name().to_owned(),
manager.handle_order(&patterns, options.order).await,
)
})
.collect()
.await;
acc.insert(stream.name().to_owned(), inner_map);
acc
})
.await;
Ok(cs)
}

View file

@ -1,4 +1,4 @@
use std::{collections::BTreeMap, process::Stdio, sync::Arc};
use std::{collections::HashMap, process::Stdio, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
@ -15,7 +15,7 @@ use crate::{
pub async fn stream_manager(
stream: &'static Stream,
child_tx: oneshot::Sender<Option<Child>>,
filter_managers: Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>,
filter_managers: Arc<Mutex<HashMap<&'static Filter, FilterManager>>>,
) {
info!("{}: start {:?}", stream.name(), stream.cmd());
let mut child = match Command::new(&stream.cmd()[0])

View file

@ -146,7 +146,9 @@ async fn simple() {
file_with_contents(out_path, "");
assert!(daemon(config_path.into(), socket_path.into()).await.is_ok());
assert!(daemon(config_path.into(), socket_path.into())
.await
.is_err());
// 36 from DB
// 12 from DB