big refactor

avoid channels
remove matches_manager and execs_manager
one stream_manager task handles filter_managers and action_managers
This commit is contained in:
ppom 2024-10-14 12:00:00 +02:00
commit 72887f3af0
13 changed files with 460 additions and 599 deletions

View file

@ -56,7 +56,7 @@ streams:
after: 1m
onexit: false
tailDown4:
cmd: [ 'sh', '-c', 'sleep 2; seq 1000100 | while read i; do echo found $i; done; sleep infinity' ]
cmd: [ 'sh', '-c', 'sleep 2; seq 1000100 | while read i; do echo found $i; done' ]
filters:
find:
regex:

View file

@ -74,6 +74,10 @@ impl Filter {
self.retry_duration
}
pub fn actions(&self) -> &BTreeMap<String, Action> {
&self.actions
}
pub fn setup(
&mut self,
stream_name: &str,

View file

@ -1,15 +1,8 @@
use std::{collections::BTreeMap, process::Stdio};
use std::{cmp::Ordering, collections::BTreeMap};
use chrono::Local;
use serde::Deserialize;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
sync::{mpsc, oneshot},
};
use tracing::{error, info};
use super::{Filter, Patterns, MFT};
use super::{Filter, Patterns};
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
@ -30,6 +23,14 @@ impl Stream {
self.filters.get(filter_name)
}
pub fn name(&self) -> &str {
&self.name
}
pub fn cmd(&self) -> &Vec<String> {
&self.cmd
}
pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> {
self._setup(name, patterns)
.map_err(|msg| format!("stream {}: {}", name, msg))
@ -62,66 +63,22 @@ impl Stream {
Ok(())
}
}
pub async fn manager(
&'static self,
child_tx: oneshot::Sender<Option<Child>>,
match_tx: mpsc::Sender<MFT>,
) {
info!("{}: start {:?}", self.name, self.cmd);
let mut child = match Command::new(&self.cmd[0])
.args(&self.cmd[1..])
.stdin(Stdio::null())
.stderr(Stdio::null())
.stdout(Stdio::piped())
.spawn()
{
Ok(child) => child,
Err(err) => {
error!("could not execute stream {} cmd: {}", self.name, err);
let _ = child_tx.send(None);
return;
}
};
// keep stdout before sending/moving child to the main thread
#[allow(clippy::unwrap_used)]
// we know there is an stdout because we asked for Stdio::piped()
let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();
// let main handle the child process
let _ = child_tx.send(Some(child));
loop {
match lines.next_line().await {
Ok(Some(line)) => {
for filter in self.filters.values() {
if let Some(match_) = filter.get_match(&line) {
#[allow(clippy::unwrap_used)] // propagating panics is ok
match_tx
.send(MFT {
m: match_,
o: filter,
t: Local::now(),
})
.await
.unwrap();
}
}
}
Ok(None) => {
error!("stream {} exited: its command returned.", self.name);
break;
}
Err(err) => {
error!(
"impossible to read output from stream {}: {}",
self.name, err
);
break;
}
}
}
// Streams are identified solely by their `name`; equality and ordering
// ignore `cmd` and `filters`, so a Stream can be used as an ordered map key.
impl PartialEq for Stream {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}
impl Eq for Stream {}
impl Ord for Stream {
    // Total order: lexicographic by stream name.
    fn cmp(&self, other: &Self) -> Ordering {
        self.name.cmp(&other.name)
    }
}
impl PartialOrd for Stream {
    // Delegate to `Ord` so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

133
rust/src/daemon/action.rs Normal file
View file

@ -0,0 +1,133 @@
use std::{
collections::{BTreeMap, BTreeSet},
process::Stdio,
sync::{Arc, Mutex},
};
use chrono::{Local, TimeDelta};
use tokio::sync::Semaphore;
use tracing::{error, info};
use crate::concepts::{Action, Match, Time};
/// Book-keeping for delayed action executions, shared between the
/// `ActionManager` and its spawned timer tasks.
struct State {
    // match -> set of scheduled execution times for that match
    pending: BTreeMap<Match, BTreeSet<Time>>,
    // reverse index: execution time -> match, used to expire entries in
    // chronological order.
    // NOTE(review): two matches scheduled at the exact same Time would
    // overwrite each other here — confirm Time granularity makes
    // collisions impossible.
    ordered_times: BTreeMap<Time, Match>,
}
impl State {
    /// Record a scheduled execution of `m` at time `t` in both indexes.
    fn add_match(&mut self, m: &Match, t: Time) {
        self.pending.entry(m.clone()).or_default().insert(t);
        self.ordered_times.insert(t, m.clone());
    }
    /// Drop the scheduled execution (`m`, `t`).
    /// Returns true when `t` was still scheduled (i.e. it was not cleared in
    /// the meantime), telling the caller whether the action should run.
    // NOTE(review): when the time set for `m` becomes empty, the (now empty)
    // entry is left behind in `pending` — consider pruning it to avoid
    // unbounded map growth.
    fn remove(&mut self, m: Match, t: Time) -> bool {
        self.pending.entry(m).and_modify(|times| {
            times.remove(&t);
        });
        self.ordered_times.remove(&t).is_some()
    }
    /// Expire entries whose time, shifted by the optional `after` delay, is
    /// already in the past.
    // NOTE(review): expiring the oldest time removes the *whole* `pending`
    // entry for that match (all its times), and the stored keys already
    // include the `after` offset when inserted by ActionManager, so `after`
    // may effectively be applied twice here — confirm intended.
    fn clear_past_times(&mut self, after: Option<TimeDelta>) {
        let now = Local::now();
        let after = after.unwrap_or_default();
        while self
            .ordered_times
            .first_key_value()
            .is_some_and(|(k, _)| *k + after < now)
        {
            // unwrap is safe: guarded by first_key_value() above
            let (_, m) = self.ordered_times.pop_first().unwrap();
            self.pending.remove(&m);
        }
    }
}
/// Executes one configured `Action`, immediately or after its `after` delay.
/// Cloneable: clones share the same `State` behind the `Arc<Mutex<..>>`, so a
/// clone can be moved into a timer task.
#[derive(Clone)]
pub struct ActionManager {
    // the action definition ('static: lives for the daemon's lifetime)
    action: &'static Action,
    // optional global cap on concurrently running action commands
    exec_limit: Option<Arc<Semaphore>>,
    // pending delayed executions, shared with spawned timer tasks
    state: Arc<Mutex<State>>,
}
impl ActionManager {
    /// Build a manager for `action`, pre-seeded with `pending` executions
    /// (e.g. restored from the database) and an optional concurrency
    /// semaphore shared across all actions.
    pub fn new(
        action: &'static Action,
        pending: BTreeMap<Match, BTreeSet<Time>>,
        exec_limit: Option<Arc<Semaphore>>,
    ) -> Self {
        Self {
            action,
            exec_limit,
            state: Arc::new(Mutex::new(State {
                pending: pending.clone(),
                // build the reverse (time -> match) index from the same data
                ordered_times: pending
                    .into_iter()
                    .flat_map(|(m, times)| times.into_iter().map(move |time| (time, m.clone())))
                    .collect(),
            })),
        }
    }
    /// Handle a match `m` seen at time `t`: run the action immediately when
    /// its `after` delay has already elapsed, otherwise record it as pending
    /// and spawn a timer task that fires at `t + after`.
    pub fn handle_exec(&mut self, m: Match, t: Time) {
        let now = Local::now();
        let exec_t = t + self.action.after_duration().unwrap_or_default();
        if exec_t < now {
            self.exec_now(m);
        } else {
            {
                let mut state = self.state.lock().unwrap();
                state.clear_past_times(self.action.after_duration());
                // the state is keyed by the *execution* time, not the match time
                state.add_match(&m, exec_t);
            }
            let this = self.clone();
            tokio::spawn(async move {
                let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?");
                tokio::time::sleep(dur).await;
                let mut state = this.state.lock().unwrap();
                // BUGFIX: the entry was stored under `exec_t` (see add_match
                // above), so it must be removed under `exec_t` as well.
                // Removing under `t` always failed, so actions with a nonzero
                // `after` delay were never executed.
                if state.remove(m.clone(), exec_t) {
                    this.exec_now(m);
                }
            });
        }
    }
    /// Spawn a task that runs the action command for match `m`, honoring the
    /// optional concurrency semaphore.
    fn exec_now(&self, m: Match) {
        let semaphore = self.exec_limit.clone();
        let action = self.action;
        tokio::spawn(async move {
            // Wait for semaphore's permission, if it is Some
            let _permit = match semaphore {
                #[allow(clippy::unwrap_used)] // We know the semaphore is not closed
                Some(semaphore) => Some(semaphore.acquire_owned().await.unwrap()),
                None => None,
            };
            // Construct command
            let mut command = action.exec(&m);
            info!("{}: run [{:?}]", &action, command.as_std());
            if let Err(err) = command
                .stdin(Stdio::null())
                .stderr(Stdio::null())
                .stdout(Stdio::piped())
                .status()
                .await
            {
                error!("{}: run [{:?}], code {}", &action, command.as_std(), err);
            }
        });
    }
    /// On shutdown: if the action is marked `onexit`, fire every pending
    /// execution immediately, then clear the state.
    pub fn quit(&mut self) {
        if self.action.on_exit() {
            let mut state = self.state.lock().unwrap();
            for (m, times) in &state.pending {
                for _ in times {
                    self.exec_now(m.clone());
                }
            }
            state.pending.clear();
            state.ordered_times.clear();
        }
    }
}

View file

@ -63,7 +63,7 @@ macro_rules! flush_or_die {
pub async fn database_manager(
config: &'static Config,
mut log_rx: mpsc::Receiver<DatabaseManagerInput>,
matches_tx: mpsc::Sender<MFT>,
matches_tx: BTreeMap<&Filter, mpsc::Sender<MFT>>,
) -> task::JoinHandle<()> {
let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)).await {
Ok(dbs) => dbs,
@ -82,6 +82,7 @@ pub async fn database_manager(
write_or_die!(log_db, entry);
cpt += 1;
if cpt == MAX_WRITES {
info!("Rotating database...");
cpt = 0;
flush_or_die!(log_db);
flush_or_die!(flush_db);
@ -97,6 +98,7 @@ pub async fn database_manager(
exit(1);
}
};
info!("Rotated database");
}
}
};
@ -108,17 +110,7 @@ pub async fn database_manager(
async fn rotate_db(
config: &'static Config,
matches_tx: Option<mpsc::Sender<MFT>>,
) -> Result<(WriteDB, WriteDB), DBError> {
info!("Rotating database...");
let res = _rotate_db(config, &matches_tx).await;
info!("Rotated database");
res
}
async fn _rotate_db(
config: &'static Config,
matches_tx: &Option<mpsc::Sender<MFT>>,
matches_tx: Option<BTreeMap<&Filter, mpsc::Sender<MFT>>>,
) -> Result<(WriteDB, WriteDB), DBError> {
// TODO asyncify this
let mut log_read_db = match ReadDB::open(LOG_DB_NAME, config).await? {
@ -148,7 +140,7 @@ async fn _rotate_db(
let mut log_write_db = WriteDB::create(LOG_DB_NEW_NAME, config).await;
__rotate_db(
_rotate_db(
matches_tx,
&mut log_read_db,
&mut flush_read_db,
@ -174,8 +166,8 @@ async fn _rotate_db(
Ok((log_write_db, flush_write_db))
}
async fn __rotate_db(
matches_tx: &Option<mpsc::Sender<MFT>>,
async fn _rotate_db(
matches_tx: Option<BTreeMap<&Filter, mpsc::Sender<MFT>>>,
log_read_db: &mut ReadDB,
flush_read_db: &mut ReadDB,
log_write_db: &mut WriteDB,
@ -238,10 +230,12 @@ async fn __rotate_db(
millisecond_disambiguation_counter += 1;
}
if let Some(tx) = matches_tx {
debug!("DB sending match from DB: {:?}", entry.m);
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(entry.clone().into()).await.unwrap();
if let Some(matches_tx) = &matches_tx {
if let Some(tx) = matches_tx.get(entry.f) {
debug!("DB sending match from DB: {:?}", entry.m);
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(entry.clone().into()).await.unwrap();
}
}
write_or_die!(log_write_db, entry);

View file

@ -1,163 +0,0 @@
use std::{collections::BTreeMap, process::Stdio, sync::Arc};
use chrono::Local;
use tokio::{
sync::{mpsc, watch, Semaphore},
time,
};
use tracing::{error, info};
use crate::concepts::{Action, ActionFilter, Config, LogEntry, Order, MAT};
use super::{
database::DatabaseManagerInput,
socket::SocketOrder,
statemap::{StateMap, StateMapTrait},
};
pub type ExecsMap = StateMap<Action>;
pub async fn execs_manager(
config: &'static Config,
mut exec_rx: mpsc::Receiver<MAT>,
mut socket_order_rx: mpsc::Receiver<SocketOrder<ExecsMap>>,
log_tx: mpsc::Sender<DatabaseManagerInput>,
mut stop: watch::Receiver<bool>,
) {
// FIXME replace with TryStreamExt::try_for_each_concurrent?
let semaphore = if config.concurrency() > 0 {
Some(Arc::new(Semaphore::new(config.concurrency())))
} else {
None
};
let exec_now = |mat: MAT| {
let semaphore = semaphore.clone();
tokio::spawn(async move {
let action = mat.o;
// Wait for semaphore's permission, if it is Some
let _permit = match semaphore {
#[allow(clippy::unwrap_used)] // We know the semaphore is not closed
Some(semaphore) => Some(semaphore.acquire_owned().await.unwrap()),
None => None,
};
// Construct command
let mut command = action.exec(&mat.m);
info!("{}: run [{:?}]", &action, command.as_std());
if let Err(err) = command
.stdin(Stdio::null())
.stderr(Stdio::null())
.stdout(Stdio::piped())
.status()
.await
{
error!("{}: run [{:?}], code {}", &action, command.as_std(), err);
}
});
};
let mut execs: ExecsMap = BTreeMap::new();
let (pendings_tx, mut pendings_rx) = mpsc::channel(1);
loop {
tokio::select! {
_ = stop.changed() => break,
Some(mat) = exec_rx.recv() => {
let now = Local::now();
if mat.t < now {
exec_now(mat);
} else {
execs.add(&mat);
{
let mat = mat.clone();
let pendings_tx = pendings_tx.clone();
let mut stop = stop.clone();
tokio::spawn(async move {
let dur = (mat.t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?");
tokio::select! {
biased;
_ = stop.changed() => {}
_ = time::sleep(dur) => {
#[allow(clippy::unwrap_used)] // propagating panics is ok
pendings_tx
.send(mat.clone())
.await
.unwrap();
}
}
});
}
}
}
Some(mat) = pendings_rx.recv() => {
if execs.rm(&mat) {
exec_now(mat);
}
}
Some((order, options, tx)) = socket_order_rx.recv() => {
let filtered = execs.filtered(options);
if let Order::Flush = order {
let now = Local::now();
// filter the state_map according to provided options
for (action, inner_map) in &filtered {
// get filter (required for LogEntry, FIXME optimize this)
let filter = {
let name = action.full_name();
#[allow(clippy::unwrap_used)]
// We're pretty confident our action has a filter
config
.get_filter(&(name.0.to_string(), name.1.to_string()))
.unwrap()
};
for match_ in inner_map.keys() {
let mat = MAT {
m: match_.clone(),
o: action,
t: now,
};
// delete them from state and execute them
if let Some(set) = execs.rm_times(&mat) {
for _ in set {
exec_now(mat.clone());
}
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
log_tx
.send(DatabaseManagerInput::Flush(LogEntry {
exec: false,
m: mat.m,
f: filter,
t: mat.t,
}))
.await
.unwrap();
}
}
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(filtered).unwrap();
}
else => break
}
}
for (action, inner_map) in &mut execs {
if action.on_exit() {
for (match_, inner_set) in inner_map {
for _ in inner_set.iter() {
exec_now(MAT {
m: match_.clone(),
o: action,
t: Local::now(),
});
}
}
}
}
}

126
rust/src/daemon/filter.rs Normal file
View file

@ -0,0 +1,126 @@
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use chrono::Local;
use tokio::sync::{mpsc, Semaphore};
use crate::concepts::{Filter, LogEntry, Match, Time, MFT};
use super::{action::ActionManager, database::DatabaseManagerInput};
/// Drives one configured `Filter`: matches incoming stream lines, applies the
/// retry window, and forwards triggered executions to this filter's
/// `ActionManager`s.
pub struct FilterManager {
    // the filter definition ('static: lives for the daemon's lifetime)
    filter: &'static Filter,
    // channel to the database manager for log/flush entries
    log_tx: mpsc::Sender<DatabaseManagerInput>,
    // one manager per action configured on this filter
    action_managers: Vec<ActionManager>,
    // match -> occurrence times seen within the retry window
    matches: BTreeMap<Match, BTreeSet<Time>>,
    // reverse index (time -> match) used to expire old matches in order.
    // NOTE(review): two matches at the exact same Time overwrite each other
    // here — confirm Time granularity rules that out.
    ordered_times: BTreeMap<Time, Match>,
}
impl FilterManager {
    /// Build a manager for `filter`, pre-seeded with `matches` (e.g. restored
    /// from the database). One `ActionManager` is created per configured
    /// action, all sharing the optional `exec_limit` semaphore.
    pub fn new(
        filter: &'static Filter,
        matches: BTreeMap<Match, BTreeSet<Time>>,
        exec_limit: Option<Arc<Semaphore>>,
        log_tx: mpsc::Sender<DatabaseManagerInput>,
    ) -> Self {
        Self {
            filter,
            log_tx,
            action_managers: filter
                .actions()
                .values()
                .map(|action| ActionManager::new(action, BTreeMap::default(), exec_limit.clone()))
                .collect(),
            matches: matches.clone(),
            // build the reverse (time -> match) index from the same data
            ordered_times: matches
                .into_iter()
                .flat_map(|(m, times)| times.into_iter().map(move |time| (time, m.clone())))
                .collect(),
        }
    }
    /// Replay matches restored from the database (no log entries emitted),
    /// then hand `self` back so the caller can keep using the manager.
    pub async fn handle_db_entries(mut self, mut match_rx: mpsc::Receiver<MFT>) -> Self {
        while let Some(mft) = match_rx.recv().await {
            self.handle_match(mft.m, mft.t, false).await;
        }
        self
    }
    /// Run the filter over one stream line; on a hit, process the match at
    /// the current time and log it.
    pub async fn handle_line(&mut self, line: &str) {
        if let Some(match_) = self.filter.get_match(line) {
            let now = Local::now();
            self.handle_match(match_, now, true).await;
        }
    }
    /// Core retry logic: decide whether match `m` at time `t` triggers the
    /// filter's actions, and optionally send a log entry to the database
    /// manager.
    pub async fn handle_match(&mut self, m: Match, t: Time, send_log: bool) {
        self.clear_past_times();
        let exec = match self.filter.retry() {
            // no retry configured: every match triggers immediately
            None => true,
            Some(retry) => {
                self.add_match(&m, t);
                // Number of stored times for this match >= configured retry for this filter
                self.get_times(&m) >= retry as usize
            }
        };
        if exec {
            // reset the occurrence counter for this match before dispatching
            self.remove_match(&m);
            for manager in &mut self.action_managers {
                manager.handle_exec(m.clone(), t);
            }
        }
        if send_log {
            #[allow(clippy::unwrap_used)] // propagating panics is ok
            self.log_tx
                .send(DatabaseManagerInput::Log(LogEntry {
                    exec,
                    m,
                    f: self.filter,
                    t,
                }))
                .await
                .unwrap();
        }
    }
    /// Shut down: let every action manager fire its `onexit` actions.
    pub fn quit(&mut self) {
        self.action_managers
            .iter_mut()
            .for_each(|manager| manager.quit());
    }
    // Record one occurrence of `m` at `t` in both indexes.
    fn add_match(&mut self, m: &Match, t: Time) {
        self.matches.entry(m.clone()).or_default().insert(t);
        self.ordered_times.insert(t, m.clone());
    }
    // Forget every stored occurrence of `m`.
    fn remove_match(&mut self, m: &Match) {
        if let Some(times) = self.matches.remove(m) {
            for t in times {
                self.ordered_times.remove(&t);
            }
        }
    }
    // Expire matches older than the retry window.
    // NOTE(review): when the *oldest* time of a match expires, the whole
    // match entry (including its newer times) is dropped from `matches`,
    // while those newer times linger in `ordered_times` until they expire
    // themselves — confirm this is the intended retry-window semantics.
    fn clear_past_times(&mut self) {
        let now = Local::now();
        let retry_duration = self.filter.retry_duration().unwrap_or_default();
        while self
            .ordered_times
            .first_key_value()
            .is_some_and(|(k, _)| *k + retry_duration < now)
        {
            // unwrap is safe: guarded by first_key_value() above
            let (_, m) = self.ordered_times.pop_first().unwrap();
            self.matches.remove(&m);
        }
    }
    // Number of stored occurrence times for `m` (0 when unknown).
    fn get_times(&self, m: &Match) -> usize {
        self.matches.get(m).map(|v| v.len()).unwrap_or(0)
    }
}

View file

@ -1,142 +0,0 @@
use std::collections::BTreeMap;
use chrono::Local;
use tokio::{
sync::{mpsc, watch},
time,
};
use super::{
database::DatabaseManagerInput,
socket::SocketOrder,
statemap::{StateMap, StateMapTrait},
};
use crate::concepts::{Filter, LogEntry, Order, MAT, MFT};
pub type MatchesMap = StateMap<Filter>;
pub async fn matches_manager(
mut match_rx: mpsc::Receiver<MFT>,
mut startup_match_rx: mpsc::Receiver<MFT>,
mut socket_order_rx: mpsc::Receiver<SocketOrder<MatchesMap>>,
action_tx: mpsc::Sender<MAT>,
log_tx: mpsc::Sender<DatabaseManagerInput>,
mut stop: watch::Receiver<bool>,
) {
let mut matches: MatchesMap = BTreeMap::new();
let (unmatches_tx, mut unmatches_rx) = mpsc::channel(1);
while let Some(mft) = startup_match_rx.recv().await {
let _ = handle_match(&mut matches, mft.clone(), &unmatches_tx, &action_tx, &stop).await;
}
loop {
tokio::select! {
_ = stop.changed() => break,
Some(mft) = match_rx.recv() => {
let exec = handle_match(&mut matches, mft.clone(), &unmatches_tx, &action_tx, &stop).await;
#[allow(clippy::unwrap_used)] // propagating panics is ok
log_tx
.send(DatabaseManagerInput::Log(LogEntry {
exec,
m: mft.m,
f: mft.o,
t: mft.t,
}))
.await
.unwrap();
}
Some(mft) = unmatches_rx.recv() => {
matches.rm(&mft);
}
Some((order, options, tx)) = socket_order_rx.recv() => {
// FIXME do not clone
let filtered = matches.clone().filtered(options);
if let Order::Flush = order {
let now = Local::now();
// filter the state_map according to provided options
for (filter, inner_map) in &filtered {
for match_ in inner_map.keys() {
let mft = MFT {
m: match_.clone(),
o: filter,
t: now,
};
// delete them from state
matches.rm_times(&mft);
// send them to DB
#[allow(clippy::unwrap_used)] // propagating panics is ok
log_tx
.send(DatabaseManagerInput::Flush(LogEntry {
exec: false,
m: mft.m,
f: mft.o,
t: mft.t,
}))
.await
.unwrap();
}
}
}
#[allow(clippy::unwrap_used)] // propagating panics is ok
tx.send(filtered).unwrap();
}
else => break
}
}
}
async fn handle_match(
matches: &mut MatchesMap,
mft: MFT,
unmatches_tx: &mpsc::Sender<MFT>,
action_tx: &mpsc::Sender<MAT>,
stop: &watch::Receiver<bool>,
) -> bool {
// Store matches
let exec = match mft.o.retry() {
None => true,
Some(retry) => {
// Add new match
matches.add(&mft);
// Remove match when expired
{
let mft = mft.clone();
let unmatches_tx = unmatches_tx.clone();
let mut stop = stop.clone();
tokio::spawn(async move {
#[allow(clippy::unwrap_used)]
// retry_duration is always Some() after filter's setup
let dur = (mft.t - Local::now() + mft.o.retry_duration().unwrap())
.to_std()
.expect("Duration is bigger than what's supported. Did you put an enormous retry_duration?");
tokio::select! {
biased;
_ = stop.changed() => {}
_ = time::sleep(dur) => {
#[allow(clippy::unwrap_used)] // propagating panics is ok
unmatches_tx.send(mft).await.unwrap();
}
}
});
}
matches.get_times(&mft) >= retry as usize
}
};
// Executing actions
if exec {
// Delete matches only if storing them
if mft.o.retry().is_some() {
matches.rm_times(&mft);
}
mft.o.send_actions(&mft.m, mft.t, action_tx).await;
}
exec
}

View file

@ -1,28 +1,28 @@
use std::{error::Error, fs, path::PathBuf};
use std::{collections::BTreeMap, error::Error, path::PathBuf, sync::Arc};
use socket::socket_manager;
use tokio::{
process::Child,
select,
signal::unix::{signal, SignalKind},
sync::{mpsc, oneshot, watch},
sync::{mpsc, oneshot, watch, Semaphore},
};
use tracing::{error, info};
use tracing::info;
use crate::concepts::Config;
use database::database_manager;
use execs::execs_manager;
use matches::matches_manager;
use filter::FilterManager;
use stream::stream_manager;
mod database;
mod execs;
mod matches;
mod socket;
mod statemap;
// mod socket;
mod action;
mod filter;
mod stream;
pub async fn daemon(
config_path: PathBuf,
socket: PathBuf,
_socket: PathBuf,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: &'static Config =
Config::from_file(&config_path).map(|config| Box::leak(Box::new(config)))?;
@ -34,67 +34,76 @@ pub async fn daemon(
let mut stream_process_child_handles = Vec::new();
let mut stream_task_handles = Vec::new();
let (stream2match_tx, stream2match_rx) = mpsc::channel(123456);
let (database2match_tx, database2match_rx) = mpsc::channel(234560);
// let (socket2match_tx, socket2match_rx) = mpsc::channel(1);
// let (socket2exec_tx, socket2exec_rx) = mpsc::channel(1);
let (socket2match_tx, socket2match_rx) = mpsc::channel(1);
let (socket2exec_tx, socket2exec_rx) = mpsc::channel(1);
let (matches2exec_tx, matches2exec_rx) = mpsc::channel(234560);
let (log_tx, log_rx) = mpsc::channel(234560);
// Shutdown channel
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let matches_manager_task_handle = {
let log_tx = log_tx.clone();
let shutdown_rx = shutdown_rx.clone();
tokio::spawn(async move {
matches_manager(
stream2match_rx,
database2match_rx,
socket2match_rx,
matches2exec_tx,
log_tx,
shutdown_rx,
)
.await
})
// Semaphore limiting action execution concurrency
let exec_limit = if config.concurrency() > 0 {
Some(Arc::new(Semaphore::new(config.concurrency())))
} else {
None
};
let execs_manager_task_handle = {
let shutdown_rx = shutdown_rx.clone();
tokio::spawn(async move {
execs_manager(config, matches2exec_rx, socket2exec_rx, log_tx, shutdown_rx).await
})
};
// Filter managers
let mut stream_filter_managers_handlers = BTreeMap::new();
let mut log2filter_tx = BTreeMap::new();
for stream in config.streams().values() {
let mut filter_managers_handlers = BTreeMap::new();
for filter in stream.filters().values() {
let manager = FilterManager::new(
filter,
BTreeMap::default(),
exec_limit.clone(),
log_tx.clone(),
);
let (tx, rx) = mpsc::channel(1);
let handle = tokio::spawn(async move { manager.handle_db_entries(rx).await });
filter_managers_handlers.insert(filter, handle);
log2filter_tx.insert(filter, tx);
}
stream_filter_managers_handlers.insert(stream, filter_managers_handlers);
}
drop(log_tx);
drop(exec_limit);
let database_manager_task_handle = {
// The `task::spawn` is done in the function, after database rotation is finished
database_manager(config, log_rx, database2match_tx).await
database_manager(config, log_rx, log2filter_tx).await
};
let socket_manager_task_handle = {
let socket = socket.to_owned();
let shutdown_rx = shutdown_rx.clone();
tokio::spawn(async move {
socket_manager(config, socket, socket2match_tx, socket2exec_tx, shutdown_rx).await
})
};
let mut stream_filter_managers = BTreeMap::new();
for (stream, filter_manager_handlers) in stream_filter_managers_handlers {
let mut filter_managers = BTreeMap::new();
for (filter, filter_manager_handler) in filter_manager_handlers {
filter_managers.insert(filter, filter_manager_handler.await.unwrap());
}
stream_filter_managers.insert(stream, filter_managers);
}
for stream in config.streams().values() {
let stream2match_tx = stream2match_tx.clone();
// let socket_manager_task_handle = {
// let socket = socket.to_owned();
// let shutdown_rx = shutdown_rx.clone();
// tokio::spawn(async move {
// socket_manager(config, socket, socket2match_tx, socket2exec_tx, shutdown_rx).await
// })
// };
for (stream, filter_managers) in stream_filter_managers {
let (child_tx, child_rx) = oneshot::channel();
stream_task_handles.push(tokio::spawn(async move {
stream.manager(child_tx, stream2match_tx).await
stream_manager(stream, child_tx, filter_managers.into_values().collect()).await
}));
if let Ok(Some(child)) = child_rx.await {
stream_process_child_handles.push(child);
}
}
drop(stream2match_tx);
// Close streams when we receive a quit signal
handle_signals(stream_process_child_handles, shutdown_tx.clone())?;
@ -105,19 +114,11 @@ pub async fn daemon(
}
let _ = shutdown_tx.send(true);
let _ = socket_manager_task_handle.await;
let _ = matches_manager_task_handle.await;
let _ = execs_manager_task_handle.await;
// let _ = socket_manager_task_handle.await;
let _ = database_manager_task_handle.await;
let stop_ok = config.stop();
// not waiting for the socket_manager to finish, sorry
// TODO make it listen on shutdown_rx
if let Err(err) = fs::remove_file(socket) {
error!("failed to remove socket: {}", err);
}
if !*shutdown_rx.borrow() {
Err("quitting because all streams finished".into())
} else if !stop_ok {

View file

@ -198,7 +198,7 @@ pub async fn socket_manager(
exec_tx: mpsc::Sender<SocketOrder<ExecsMap>>,
mut stop: watch::Receiver<bool>,
) {
let listener = match open_socket(socket) {
let listener = match open_socket(socket.clone()) {
Ok(l) => l,
Err(err) => {
error!("while creating communication socket: {err}");
@ -245,4 +245,8 @@ pub async fn socket_manager(
}
}
}
if let Err(err) = fs::remove_file(socket) {
error!("failed to remove socket: {}", err);
}
}

View file

@ -1,122 +0,0 @@
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use regex::Regex;
use crate::concepts::{ActionFilter, Match, Pattern, Time, MT};
#[derive(Clone, Debug)]
pub struct FilterOptions {
pub stream_name: Option<String>,
pub filter_name: Option<String>,
pub patterns: BTreeMap<Arc<Pattern>, Regex>,
}
pub type StateMap<T> = BTreeMap<&'static T, BTreeMap<Match, BTreeSet<Time>>>;
// This trait is needed to permit to implement methods on an external type
pub trait StateMapTrait<T: ActionFilter> {
fn add(&mut self, mt: &MT<T>);
fn rm(&mut self, mt: &MT<T>) -> bool;
fn rm_times(&mut self, mt: &MT<T>) -> Option<BTreeSet<Time>>;
fn get_times(&self, mt: &MT<T>) -> usize;
fn filtered(&self, filter_options: FilterOptions) -> Self;
}
impl<T: ActionFilter> StateMapTrait<T> for StateMap<T> {
fn add(&mut self, mt: &MT<T>) {
let inner_map = self.entry(mt.o).or_default();
let inner_set = inner_map.entry(mt.m.clone()).or_default();
inner_set.insert(mt.t);
}
fn rm(&mut self, mt: &MT<T>) -> bool {
let mut removed = false;
if let Some(inner_map) = self.get_mut(&mt.o) {
if let Some(inner_set) = inner_map.get_mut(&mt.m) {
inner_set.remove(&mt.t);
removed = true;
if inner_set.is_empty() {
inner_map.remove(&mt.m);
}
}
if inner_map.is_empty() {
self.remove(&mt.o);
}
}
removed
}
fn rm_times(&mut self, mt: &MT<T>) -> Option<BTreeSet<Time>> {
let mut set = None;
if let Some(inner_map) = self.get_mut(&mt.o) {
set = inner_map.remove(&mt.m);
if inner_map.is_empty() {
self.remove(&mt.o);
}
}
set
}
fn get_times(&self, mt: &MT<T>) -> usize {
match self.get(&mt.o).and_then(|map| map.get(&mt.m)) {
Some(x) => x.len(),
None => 0,
}
}
fn filtered(&self, filter_options: FilterOptions) -> Self {
let FilterOptions {
stream_name,
filter_name,
patterns,
} = filter_options;
self.iter()
// stream/filter filtering
.filter(|(object, _)| {
if let Some(stream_name) = &stream_name {
let full_name = object.full_name();
let (s, f) = (full_name.0, full_name.1);
if *stream_name != s {
return false;
}
if let Some(filter_name) = &filter_name {
if *filter_name != f {
return false;
}
}
}
true
})
// pattern filtering
.filter(|(object, _)| {
patterns
.iter()
.all(|(pattern, _)| object.patterns().get(pattern).is_some())
})
// match filtering
.filter_map(|(object, inner_map)| {
let map: BTreeMap<Match, BTreeSet<Time>> = inner_map
.iter()
.filter(|(match_, _)| {
match_
.iter()
.zip(object.patterns())
.filter_map(|(a_match, pattern)| {
patterns.get(pattern.as_ref()).map(|regex| (a_match, regex))
})
.all(|(a_match, regex)| regex.is_match(a_match))
})
.map(|(a, b)| (a.clone(), b.clone()))
.collect();
if !map.is_empty() {
Some((*object, map))
} else {
None
}
})
.collect()
}
}

69
rust/src/daemon/stream.rs Normal file
View file

@ -0,0 +1,69 @@
use std::process::Stdio;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
sync::oneshot,
};
use tracing::{error, info};
use crate::{concepts::Stream, daemon::filter::FilterManager};
/// Spawn the stream's command, hand the child process back to main through
/// `child_tx`, then feed every stdout line to all of this stream's filter
/// managers. When the command ends (or reading fails), tell each manager to
/// quit so `onexit` actions can fire.
pub async fn stream_manager(
    stream: &'static Stream,
    child_tx: oneshot::Sender<Option<Child>>,
    mut filter_managers: Vec<FilterManager>,
) {
    let cmd = stream.cmd();
    info!("{}: start {:?}", stream.name(), cmd);
    // Launch the stream command with only stdout captured.
    let spawned = Command::new(&cmd[0])
        .args(&cmd[1..])
        .stdin(Stdio::null())
        .stderr(Stdio::null())
        .stdout(Stdio::piped())
        .spawn();
    let mut child = match spawned {
        Ok(c) => c,
        Err(err) => {
            error!("could not execute stream {} cmd: {}", stream.name(), err);
            let _ = child_tx.send(None);
            return;
        }
    };
    // Take stdout before moving the child out of this task.
    #[allow(clippy::unwrap_used)]
    // stdout is guaranteed present: we requested Stdio::piped() above
    let stdout = child.stdout.take().unwrap();
    let mut lines = BufReader::new(stdout).lines();
    // Let main own and supervise the child process.
    let _ = child_tx.send(Some(child));
    loop {
        let next = lines.next_line().await;
        match next {
            Ok(Some(line)) => {
                // Run every filter over the line concurrently.
                let work = filter_managers
                    .iter_mut()
                    .map(|manager| manager.handle_line(&line));
                futures::future::join_all(work).await;
            }
            Ok(None) => {
                error!("stream {} exited: its command returned.", stream.name());
                break;
            }
            Err(err) => {
                error!(
                    "impossible to read output from stream {}: {}",
                    stream.name(),
                    err
                );
                break;
            }
        }
    }
    // Stream is done: let every filter manager fire its onexit actions.
    for manager in filter_managers.iter_mut() {
        manager.quit();
    }
}

View file

@ -25,34 +25,34 @@ async fn main() {
eprintln!("this error occurred.");
}));
console_subscriber::init();
// console_subscriber::init();
let cli = Cli::parse();
// {
// // Set log level
// let level = if let Command::Start {
// loglevel,
// config: _,
// socket: _,
// } = cli.command
// {
// loglevel
// } else {
// Level::DEBUG
// };
// if let Err(err) = tracing_subscriber::fmt::fmt()
// .without_time()
// .with_target(false)
// .with_ansi(std::io::stdout().is_terminal())
// // .with_max_level(level)
// .with_max_level(Level::TRACE)
// .try_init()
// {
// eprintln!("ERROR could not initialize logging: {err}");
// exit(1);
// }
// }
{
// Set log level
let level = if let Command::Start {
loglevel,
config: _,
socket: _,
} = cli.command
{
loglevel
} else {
Level::DEBUG
};
if let Err(err) = tracing_subscriber::fmt::fmt()
.without_time()
.with_target(false)
.with_ansi(std::io::stdout().is_terminal())
.with_max_level(level)
// .with_max_level(Level::TRACE)
.try_init()
{
eprintln!("ERROR could not initialize logging: {err}");
exit(1);
}
}
let result = match cli.command {
Command::Start {