WIP reimplement socket part

I think I'm stumbling on this rust compiler bug: https://github.com/rust-lang/rust/issues/110338
This commit is contained in:
ppom 2024-10-20 12:00:00 +02:00
commit 2e00092c18
8 changed files with 204 additions and 159 deletions

View file

@ -46,6 +46,9 @@ impl ActionFilter for Action {
}
impl Action {
/// Returns this action's name as declared in the configuration.
pub fn name(&self) -> &str {
&self.name
}
/// Returns the optional delay after which this action fires, if one
/// was configured.
pub fn after_duration(&self) -> Option<TimeDelta> {
self.after_duration
}

View file

@ -66,6 +66,10 @@ impl Filter {
}
}
/// Returns this filter's name as declared in the configuration.
pub fn name(&self) -> &str {
&self.name
}
/// Returns the optional retry count for this filter, if one was
/// configured.
pub fn retry(&self) -> Option<u32> {
self.retry
}

View file

@ -28,8 +28,7 @@ impl State {
self.ordered_times.remove(&t).is_some()
}
fn clear_past_times(&mut self, after: Option<TimeDelta>) {
let now = Local::now();
fn clear_past_times(&mut self, now: Time, after: Option<TimeDelta>) {
let after = after.unwrap_or_default();
while self
.ordered_times
@ -50,6 +49,10 @@ pub struct ActionManager {
}
impl ActionManager {
/// Returns the `'static` [`Action`] this manager is responsible for.
pub fn action(&self) -> &'static Action {
self.action
}
pub fn new(
action: &'static Action,
pending: BTreeMap<Match, BTreeSet<Time>>,
@ -76,7 +79,7 @@ impl ActionManager {
} else {
{
let mut state = self.state.lock().unwrap();
state.clear_past_times(self.action.after_duration());
state.clear_past_times(t, self.action.after_duration());
state.add_match(&m, exec_t);
}
let this = self.clone();
@ -91,6 +94,14 @@ impl ActionManager {
}
}
/// Returns the pending trigger times for `match_` as human-readable
/// strings, or `None` when no times are pending for that match.
pub fn to_readable_vec(&self, match_: &Match) -> Option<Vec<String>> {
self.state.lock().unwrap().pending.get(match_).map(|set| {
set.iter()
// Keep the first 19 chars of RFC 3339 ("YYYY-MM-DDTHH:MM:SS"),
// dropping sub-second digits and the timezone offset.
.map(|time| time.to_rfc3339().chars().take(19).collect())
.collect()
})
}
fn exec_now(&self, m: Match) {
let semaphore = self.exec_limit.clone();
let action = self.action;

View file

@ -4,9 +4,10 @@ use std::{
};
use chrono::Local;
use regex::Regex;
use tokio::sync::{mpsc, Semaphore};
use crate::concepts::{Filter, LogEntry, Match, Time, MFT};
use crate::concepts::{ActionFilter, Filter, LogEntry, Match, Pattern, PatternStatus, Time, MFT};
use super::{action::ActionManager, database::DatabaseManagerInput};
@ -94,6 +95,43 @@ impl FilterManager {
.for_each(|manager| manager.quit());
}
/// Builds a map from joined match strings to their [`PatternStatus`],
/// keeping only matches whose captured values satisfy every client
/// regex in `patterns` (keyed by the pattern each regex constrains).
pub fn to_pattern_status_map(
&self,
patterns: &BTreeMap<Arc<Pattern>, Regex>,
) -> BTreeMap<String, PatternStatus> {
self.matches
.iter()
// TODO match filtering
.filter(|(match_, _)| {
match_
.iter()
// Pair each captured value with the pattern at the same
// position in this filter's pattern list.
.zip(self.filter.patterns())
// Keep only positions the client constrained with a regex;
// unconstrained patterns are not checked.
.filter_map(|(a_match, pattern)| {
patterns.get(pattern.as_ref()).map(|regex| (a_match, regex))
})
.all(|(a_match, regex)| regex.is_match(a_match))
})
.map(|(match_, times)| {
// Collect, per action name, the readable pending trigger
// times for this match (actions with none are omitted).
let actions =
self.action_managers
.iter()
.fold(BTreeMap::default(), |mut acc, manager| {
if let Some(times) = manager.to_readable_vec(match_) {
acc.insert(manager.action().name().to_owned(), times);
}
acc
});
(
match_.join(" "),
PatternStatus {
matches: times.len(),
actions,
},
)
})
.collect()
}
fn add_match(&mut self, m: &Match, t: Time) {
self.matches.entry(m.clone()).or_default().insert(t);
self.ordered_times.insert(t, m.clone());

View file

@ -1,28 +1,48 @@
use std::{collections::BTreeMap, error::Error, path::PathBuf, sync::Arc};
use std::{
collections::BTreeMap,
error::Error,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use socket::socket_manager;
use tokio::{
process::Child,
select,
signal::unix::{signal, SignalKind},
sync::{mpsc, oneshot, watch, Semaphore},
sync::{broadcast, mpsc, oneshot, Mutex, Semaphore},
};
use tracing::info;
use crate::concepts::Config;
use crate::concepts::{Config, Filter, Stream};
use database::database_manager;
use filter::FilterManager;
use stream::stream_manager;
mod database;
// mod socket;
mod socket;
mod action;
mod filter;
mod stream;
// type SharedState = BTreeMap<&'static Stream, Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>>;
/// Per-stream filter managers, shared between the stream tasks and the
/// socket manager. Each stream's manager map sits behind its own async
/// mutex so one stream's line handling does not block the others.
struct SharedState {
pub s: BTreeMap<&'static Stream, Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>>,
}
#[allow(unsafe_code)]
// SAFETY (author's claim): the `&'static` references inside `SharedState`
// point to leaked, never-freed configuration data, and the failure to infer
// `Send` is attributed to a compiler limitation:
// https://github.com/rust-lang/rust/issues/96865
// NOTE(review): this asserts Send for the entire struct, silencing the check
// for every field — confirm that `FilterManager` and everything it owns are
// actually safe to move across threads before trusting this impl.
unsafe impl Send for SharedState {}
pub async fn daemon(
config_path: PathBuf,
_socket: PathBuf,
socket: PathBuf,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: &'static Config =
Config::from_file(&config_path).map(|config| Box::leak(Box::new(config)))?;
@ -40,7 +60,7 @@ pub async fn daemon(
let (log_tx, log_rx) = mpsc::channel(234560);
// Shutdown channel
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
// Semaphore limiting action execution concurrency
let exec_limit = if config.concurrency() > 0 {
@ -76,28 +96,23 @@ pub async fn daemon(
database_manager(config, log_rx, log2filter_tx)
};
let mut stream_filter_managers = BTreeMap::new();
let mut stream_filter_managers = SharedState { s: BTreeMap::new() };
for (stream, filter_manager_handlers) in stream_filter_managers_handlers {
let mut filter_managers = BTreeMap::new();
for (filter, filter_manager_handler) in filter_manager_handlers {
filter_managers.insert(filter, filter_manager_handler.await.unwrap());
}
stream_filter_managers.insert(stream, filter_managers);
stream_filter_managers.s.insert(stream, Arc::new(Mutex::new(filter_managers)));
}
let stream_filter_managers: Arc<SharedState> = Arc::new(stream_filter_managers);
// let socket_manager_task_handle = {
// let socket = socket.to_owned();
// let shutdown_rx = shutdown_rx.clone();
// tokio::spawn(async move {
// socket_manager(config, socket, socket2match_tx, socket2exec_tx, shutdown_rx).await
// })
// };
for (stream, filter_managers) in stream_filter_managers {
for (stream, filter_managers) in stream_filter_managers.s.iter() {
let (child_tx, child_rx) = oneshot::channel();
let filter_managers = filter_managers.clone();
let stream = *stream;
stream_task_handles.push(tokio::spawn(async move {
stream_manager(stream, child_tx, filter_managers.into_values().collect()).await
stream_manager(stream, child_tx, filter_managers).await
}));
if let Ok(Some(child)) = child_rx.await {
@ -106,20 +121,33 @@ pub async fn daemon(
}
// Close streams when we receive a quit signal
handle_signals(stream_process_child_handles, shutdown_tx.clone())?;
let signal_received = Arc::new(AtomicBool::new(false));
handle_signals(
stream_process_child_handles,
shutdown_tx.clone(),
signal_received.clone(),
)?;
let socket_manager_task_handle = {
let socket = socket.to_owned();
let stream_filter_managers: Arc<SharedState> = stream_filter_managers.clone();
tokio::spawn(async move {
socket_manager(config, socket, stream_filter_managers, shutdown_rx).await
})
};
// Wait for all streams to quit
for task_handle in stream_task_handles {
let _ = task_handle.await;
}
let _ = shutdown_tx.send(true);
let _ = shutdown_tx.send(());
// let _ = socket_manager_task_handle.await;
let _ = socket_manager_task_handle.await;
let _ = database_manager_thread_handle.join();
let stop_ok = config.stop();
if !*shutdown_rx.borrow() {
if !signal_received.load(Ordering::SeqCst) {
Err("quitting because all streams finished".into())
} else if !stop_ok {
Err("while executing stop command".into())
@ -130,7 +158,8 @@ pub async fn daemon(
fn handle_signals(
stream_process_child_handles: Vec<Child>,
shutdown_tx: watch::Sender<bool>,
shutdown_tx: broadcast::Sender<()>,
signal_received: Arc<AtomicBool>,
) -> tokio::io::Result<()> {
let mut sighup = signal(SignalKind::hangup())?;
let mut sigint = signal(SignalKind::interrupt())?;
@ -141,7 +170,8 @@ fn handle_signals(
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
let _ = shutdown_tx.send(true);
let _ = shutdown_tx.send(());
signal_received.store(true, Ordering::SeqCst);
info!("received {signal}, closing streams...");
// Kill stream subprocesses
for mut child_handle in stream_process_child_handles.into_iter() {

View file

@ -3,11 +3,7 @@ use std::{collections::BTreeMap, fs, io, path::PathBuf, process::exit, sync::Arc
use bincode::Options;
use futures::{SinkExt, StreamExt};
use regex::Regex;
use tokio::{
join,
net::UnixListener,
sync::{mpsc, oneshot, watch},
};
use tokio::{net::UnixListener, sync::{broadcast, oneshot}};
use tokio_util::{
bytes::Bytes,
codec::{Framed, LengthDelimitedCodec},
@ -15,14 +11,18 @@ use tokio_util::{
use tracing::{error, warn};
use crate::{
concepts::{
ActionFilter, ClientRequest, ClientStatus, Config, DaemonResponse, Order, Pattern,
PatternStatus,
},
concepts::{ActionFilter, ClientRequest, ClientStatus, Config, DaemonResponse, Order, Pattern},
utils::bincode_options,
};
use super::{execs::ExecsMap, matches::MatchesMap, statemap::FilterOptions};
use super::SharedState;
/// Filtering options decoded from a client request: optional stream and
/// filter names, plus compiled regexes keyed by the pattern each one
/// constrains.
#[derive(Clone)]
pub struct FilterOptions {
pub stream_name: Option<String>,
pub filter_name: Option<String>,
pub patterns: BTreeMap<Arc<Pattern>, Regex>,
}
macro_rules! err_str {
($expression:expr) => {
@ -57,122 +57,75 @@ fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
async fn answer_order(
config: &'static Config,
match_tx: &mpsc::Sender<SocketOrder<MatchesMap>>,
exec_tx: &mpsc::Sender<SocketOrder<ExecsMap>>,
shared_state: &SharedState,
options: ClientRequest,
) -> Result<ClientStatus, String> {
// Compute options
let filtering_options = {
let (stream_name, filter_name) = match options.stream_filter {
Some(sf) => match sf.split_once(".") {
Some((s, f)) => (Some(s.to_string()), Some(f.to_string())),
None => (Some(sf), None),
},
None => (None, None),
};
// Compute the Vec<(pattern_name, String)> into a BTreeMap<Arc<Pattern>, Regex>
let patterns = options
.patterns
.iter()
.map(|(name, reg)| {
// lookup pattern in config.patterns
config
.patterns()
.iter()
// retrieve or Err
.find(|(pattern_name, _)| name == *pattern_name)
.ok_or_else(|| format!("pattern '{name}' doesn't exist"))
// compile Regex or Err
.and_then(|(_, pattern)| match Regex::new(reg) {
Ok(reg) => Ok((pattern.clone(), reg)),
Err(err) => Err(format!("pattern '{name}' regex doesn't compile: {err}")),
})
})
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()?;
FilterOptions {
stream_name,
filter_name,
patterns,
}
let (stream_name, filter_name) = match options.stream_filter {
Some(sf) => match sf.split_once(".") {
Some((s, f)) => (Some(s.to_string()), Some(f.to_string())),
None => (Some(sf), None),
},
None => (None, None),
};
// ask for matches clone
let filtering_options2 = filtering_options.clone();
let matches = async move {
let (m_tx, m_rx) = oneshot::channel();
// Compute the Vec<(pattern_name: String, regex: String)> into a BTreeMap<Arc<Pattern>, Regex>
let patterns = options
.patterns
.iter()
.map(|(name, reg)| {
// lookup pattern in config.patterns
config
.patterns()
.iter()
// retrieve or Err
.find(|(pattern_name, _)| name == *pattern_name)
.ok_or_else(|| format!("pattern '{name}' doesn't exist"))
// compile Regex or Err
.and_then(|(_, pattern)| match Regex::new(reg) {
Ok(reg) => Ok((pattern.clone(), reg)),
Err(err) => Err(format!("pattern '{name}' regex doesn't compile: {err}")),
})
})
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()?;
#[allow(clippy::unwrap_used)] // propagating panics is ok
match_tx
.send((options.order, filtering_options2, m_tx))
.await
.unwrap();
#[allow(clippy::unwrap_used)] // propagating panics is ok
m_rx.await.unwrap()
};
// ask for execs clone
let execs = async move {
let (e_tx, e_rx) = oneshot::channel();
#[allow(clippy::unwrap_used)] // propagating panics is ok
exec_tx
.send((options.order, filtering_options, e_tx))
.await
.unwrap();
#[allow(clippy::unwrap_used)] // propagating panics is ok
e_rx.await.unwrap()
};
let (matches, execs) = join!(matches, execs);
// Transform matches and execs into a ClientStatus
let cs: ClientStatus = matches
.into_iter()
.fold(BTreeMap::new(), |mut acc, (object, map)| {
let (stream, filter, _) = object.full_name();
acc.entry(stream.into())
.or_default()
.entry(filter.into())
.or_default()
.extend(map.into_iter().map(|(match_, times)| {
// TODO directly call flush function here?
let cs: ClientStatus = futures::stream::iter(shared_state.s.iter())
// stream filtering
.filter(|(stream, _)| async {
stream_name.is_none()
|| stream_name
.clone()
.is_some_and(|name| name == stream.name())
})
.fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async {
let filter_manager = filter_manager.lock().await;
let inner_map = filter_manager
.iter()
// filter filtering
.filter(|(filter, _)| {
filter_name.is_none()
|| filter_name
.clone()
.is_some_and(|name| name == filter.name())
})
// pattern filtering
.filter(|(filter, _)| {
patterns
.iter()
.all(|(pattern, _)| filter.patterns().get(pattern).is_some())
})
.map(|(filter, manager)| {
(
match_.join(" "),
PatternStatus {
matches: times.len(),
..Default::default()
},
filter.name().to_owned(),
manager.to_pattern_status_map(&patterns),
)
}));
})
.collect();
acc.insert(stream.name().to_owned(), inner_map);
acc
});
let cs = execs.into_iter().fold(cs, |mut acc, (object, map)| {
let (stream, filter, action) = object.full_name();
let inner_map = acc
.entry(stream.into())
.or_default()
.entry(filter.into())
.or_default();
map.into_iter().for_each(|(match_, times)| {
inner_map
.entry(match_.join(" "))
.or_default()
.actions
.insert(
action.to_string(),
times
.into_iter()
.map(|time| time.to_rfc3339().chars().take(19).collect())
.collect(),
);
});
acc
});
})
.await;
Ok(cs)
}
@ -194,9 +147,8 @@ pub type SocketOrder<T> = (Order, FilterOptions, oneshot::Sender<T>);
pub async fn socket_manager(
config: &'static Config,
socket: PathBuf,
match_tx: mpsc::Sender<SocketOrder<MatchesMap>>,
exec_tx: mpsc::Sender<SocketOrder<ExecsMap>>,
mut stop: watch::Receiver<bool>,
shared_state: Arc<SharedState>,
mut stop: broadcast::Receiver<()>,
) {
let listener = match open_socket(socket.clone()) {
Ok(l) => l,
@ -209,7 +161,7 @@ pub async fn socket_manager(
let bin = bincode_options();
loop {
tokio::select! {
_ = stop.changed() => break,
_ = stop.recv() => break,
try_conn = listener.accept() => {
match try_conn {
Ok((conn, _)) => {
@ -228,7 +180,7 @@ pub async fn socket_manager(
bin.deserialize(&encoded_request)
);
// Process
let response = match answer_order(config, &match_tx, &exec_tx, request).await {
let response = match answer_order(config, &shared_state, request).await {
Ok(res) => DaemonResponse::Order(res),
Err(err) => DaemonResponse::Err(err),
};

View file

@ -1,18 +1,21 @@
use std::process::Stdio;
use std::{collections::BTreeMap, process::Stdio, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
sync::oneshot,
sync::{oneshot, Mutex},
};
use tracing::{error, info};
use crate::{concepts::Stream, daemon::filter::FilterManager};
use crate::{
concepts::{Filter, Stream},
daemon::filter::FilterManager,
};
pub async fn stream_manager(
stream: &'static Stream,
child_tx: oneshot::Sender<Option<Child>>,
mut filter_managers: Vec<FilterManager>,
filter_managers: Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>,
) {
info!("{}: start {:?}", stream.name(), stream.cmd());
let mut child = match Command::new(&stream.cmd()[0])
@ -43,7 +46,9 @@ pub async fn stream_manager(
Ok(Some(line)) => {
futures::future::join_all(
filter_managers
.iter_mut()
.lock()
.await
.values_mut()
.map(|manager| manager.handle_line(&line)),
)
.await;
@ -64,6 +69,8 @@ pub async fn stream_manager(
}
filter_managers
.iter_mut()
.lock()
.await
.values_mut()
.for_each(|manager| manager.quit());
}

View file

@ -2,10 +2,10 @@
clippy::panic,
clippy::todo,
clippy::unimplemented,
clippy::unwrap_used
clippy::unwrap_used,
unsafe_code
)]
#![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)]
#![forbid(unsafe_code)]
pub mod client;
pub mod concepts;