bug fixing

- correctly quitting all threads
  I had to add a Stop variant of manager's enums because they hold a
  Sender of their own Receiver. So streams leaving their Sender don't
  close the channel. The Stop is sent to matches_manager when streams
  have quit, and sent to execs_manager when matches_manager has quit.
- fix ThreadPool panic when channel is closed
- fix filter not adding after_duration to MAT messages
- better logging of execs_manager
This commit is contained in:
ppom 2024-07-30 12:00:00 +02:00
commit ca89d5e61c
6 changed files with 54 additions and 42 deletions

View file

@ -11,7 +11,12 @@ use std::{
use log::{debug, error, info, Level};
use crate::{config, execs::execs_manager, matches::matches_manager, utils::SimpleLogger};
use crate::{
config,
execs::{execs_manager, ExecsManagerInput},
matches::{matches_manager, MatchManagerInput},
utils::SimpleLogger,
};
pub fn daemon(config_path: &PathBuf, loglevel: Level, socket: &PathBuf) {
if let Err(err) = SimpleLogger::init(loglevel) {
@ -37,34 +42,30 @@ pub fn daemon(config_path: &PathBuf, loglevel: Level, socket: &PathBuf) {
let mut stream_process_child_handles = Vec::new();
let mut stream_thread_handles = Vec::new();
{
let (match_tx, match_rx) = channel();
let (exec_tx, exec_rx) = channel();
let (match_tx, match_rx) = channel();
let (exec_tx, exec_rx) = channel();
let config_matches = config.clone();
let match_tx_matches = match_tx.clone();
let exec_tx_matches = exec_tx.clone();
stream_thread_handles.push(thread::spawn(move || {
matches_manager(config_matches, match_rx, match_tx_matches, exec_tx_matches)
}));
let config_matches = config.clone();
let match_tx_matches = match_tx.clone();
let exec_tx_matches = exec_tx.clone();
let matches_manager_thread_handle = thread::spawn(move || {
matches_manager(config_matches, match_rx, match_tx_matches, exec_tx_matches)
});
let config_execs = config.clone();
stream_thread_handles.push(thread::spawn(move || {
execs_manager(config_execs, exec_rx, exec_tx)
}));
let config_execs = config.clone();
let exec_tx_execs = exec_tx.clone();
let execs_manager_thread_handle =
thread::spawn(move || execs_manager(config_execs, exec_rx, exec_tx_execs));
// TODO execs manager
for stream in config.streams().values() {
let stream = stream.clone();
let match_tx = match_tx.clone();
let (child_tx, child_rx) = sync_channel(0);
for stream in config.streams().values() {
let stream = stream.clone();
let match_tx = match_tx.clone();
let (child_tx, child_rx) = sync_channel(0);
stream_thread_handles.push(thread::spawn(move || stream.manager(child_tx, match_tx)));
stream_thread_handles.push(thread::spawn(move || stream.manager(child_tx, match_tx)));
if let Ok(Some(child)) = child_rx.recv() {
stream_process_child_handles.push(child);
}
if let Ok(Some(child)) = child_rx.recv() {
stream_process_child_handles.push(child);
}
}
@ -91,7 +92,11 @@ pub fn daemon(config_path: &PathBuf, loglevel: Level, socket: &PathBuf) {
let _ = thread_handle.join();
}
// TODO wait for actions to complete
match_tx.send(MatchManagerInput::Stop).unwrap();
let _ = matches_manager_thread_handle.join();
exec_tx.send(ExecsManagerInput::Stop).unwrap();
let _ = execs_manager_thread_handle.join();
let stop_ok = config.stop();

View file

@ -24,6 +24,7 @@ pub enum ExecsManagerInput {
ExecPending(MAT),
#[allow(dead_code)]
Flush(MAT),
Stop,
}
type ExecsMap = BTreeMap<ActionName, BTreeMap<Match, BTreeSet<DateTime<Local>>>>;
@ -56,8 +57,8 @@ impl ExecsMapTrait for ExecsMap {
pub fn execs_manager(
config: Arc<Config>,
action_rx: Receiver<ExecsManagerInput>,
action_tx: Sender<ExecsManagerInput>,
exec_rx: Receiver<ExecsManagerInput>,
exec_tx: Sender<ExecsManagerInput>,
) {
// Initialize a ThreadPool only when concurrency hasn't been disabled
let thread_pool = if config.concurrency() > 1 {
@ -76,14 +77,14 @@ pub fn execs_manager(
let mut command = action.exec(&mat.m, patterns);
let mut closure = move || {
info!("{}: run {:?}", &mat.a, command);
info!("{}: run [{:?}]", &mat.a, command);
if let Err(err) = command
.stdin(Stdio::null())
.stderr(Stdio::null())
.stdout(Stdio::piped())
.status()
{
error!("{}: run {:?}, code {}", &mat.a, command, err);
error!("{}: run [{:?}], code {}", &mat.a, command, err);
}
};
@ -98,9 +99,9 @@ pub fn execs_manager(
let mut execs: ExecsMap = BTreeMap::new();
let timer = MessageTimer::new(action_tx);
let timer = MessageTimer::new(exec_tx);
while let Ok(mat) = action_rx.recv() {
while let Ok(mat) = exec_rx.recv() {
match mat {
ExecsManagerInput::Exec(mat) => {
let now = Local::now();
@ -119,6 +120,7 @@ pub fn execs_manager(
}
#[allow(clippy::todo)]
ExecsManagerInput::Flush(_mat) => todo!(),
ExecsManagerInput::Stop => break,
}
}

View file

@ -189,7 +189,7 @@ impl Filter {
tx.send(ExecsManagerInput::Exec(MAT {
m: m.clone(),
a: action.name(),
t,
t: t + action.after_duration().unwrap_or_default(),
}))
.unwrap();
}

View file

@ -21,6 +21,7 @@ pub enum MatchManagerInput {
Unmatch(MFT),
#[allow(dead_code)]
Flush(MFT),
Stop,
}
type MatchesMap = BTreeMap<FilterName, BTreeMap<Match, BTreeSet<Time>>>;
@ -96,6 +97,7 @@ pub fn matches_manager(
MatchManagerInput::Unmatch(mft) => matches.rm(&mft),
#[allow(clippy::todo)]
MatchManagerInput::Flush(_) => todo!(), // TODO handle flushes
MatchManagerInput::Stop => break,
}
}
}

View file

@ -53,8 +53,11 @@ struct Worker {
impl Worker {
fn new(receiver: Arc<Mutex<Receiver<Job>>>) -> Worker {
let thread = spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
job();
let received = receiver.lock().unwrap().recv();
match received {
Ok(job) => job(),
Err(_) => break,
}
});
Worker { thread }

View file

@ -33,11 +33,11 @@
retryperiod: '30s',
actions: {
damn: {
cmd: ['echo', '<num>'],
cmd: ['notify-send', 'first stream', 'ban <num>'],
},
undamn: {
cmd: ['echo', 'undamn', '<num>'],
after: '28s',
cmd: ['notify-send', 'first stream', 'unban <num>'],
after: '6s',
onexit: true,
},
},
@ -45,7 +45,7 @@
},
},
tailDown2: {
cmd: ['sh', '-c', "echo 1_abc 2_abc 3_abc abc_1 abc_2 abc_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done; sleep 3"],
cmd: ['sh', '-c', "echo 1_abc 2_abc 3_abc abc_1 abc_2 abc_3 | tr ' ' '\n' | while read i; do sleep 3; echo found $i; done; sleep 3"],
filters: {
findIP: {
regex: [
@ -53,14 +53,14 @@
'^found <letter>_<num>$',
],
retry: 2,
retryperiod: '30s',
retryperiod: '2s',
actions: {
damn: {
cmd: ['echo', '<num>'],
cmd: ['notify-send', 'second stream', 'ban <num>'],
},
undamn: {
cmd: ['echo', 'undamn', '<num>'],
after: '28s',
cmd: ['notify-send', 'second stream', 'unban <num>'],
after: '4s',
onexit: true,
},
},