Add oneshot option for actions

Fixes #92
This commit is contained in:
ppom 2025-06-10 12:00:00 +02:00
commit 5bccdb5ba7
No known key found for this signature in database
5 changed files with 70 additions and 11 deletions

View file

@ -114,6 +114,12 @@ local banFor(time) = {
// here it is not useful because we will flush and delete the chain containing the bans anyway
// (with the stop commands)
},
mail: {
cmd: ['sendmail', '...', '<ip>'],
// some commands, such as alerting commands, are "oneshot".
// this means they'll be run only once, and won't be executed again when reaction is restarted
oneshot: true,
},
},
// or use the banFor function defined at the beginning!
// actions: banFor('48h'),

View file

@ -96,6 +96,11 @@ streams:
# (defaults to false)
# here it is not useful because we will flush and delete the chain containing the bans anyway
# (with the stop commands)
mail:
cmd: ['sendmail', '...', '<ip>']
# some commands, such as alerting commands, are "oneshot".
# this means they'll be run only once, and won't be executed again when reaction is restarted
oneshot: true
# persistence
# tldr; when an `after` action is set in a filter, such filter acts as a 'jail',

View file

@ -8,7 +8,7 @@ use tokio::process::Command;
use super::parse_duration::*;
use super::{Match, Pattern};
#[derive(Clone, Debug, Default, Deserialize , Serialize)]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Action {
cmd: Vec<String>,
@ -25,6 +25,8 @@ pub struct Action {
skip_serializing_if = "is_false"
)]
on_exit: bool,
#[serde(default = "set_false", skip_serializing_if = "is_false")]
oneshot: bool,
#[serde(skip)]
patterns: Arc<BTreeSet<Arc<Pattern>>>,
@ -57,6 +59,10 @@ impl Action {
self.on_exit
}
pub fn oneshot(&self) -> bool {
self.oneshot
}
pub fn setup(
&mut self,
stream_name: &str,
@ -206,6 +212,7 @@ pub mod tests {
after: None,
after_duration: None,
on_exit: false,
oneshot: false,
patterns: Arc::new(BTreeSet::default()),
}
}

View file

@ -100,7 +100,7 @@ impl FilterManager {
if exec {
state.remove_match(&m);
state.add_trigger(m.clone(), now);
self.schedule_exec(m, now, now, &mut state);
self.schedule_exec(m, now, now, &mut state, false);
}
exec
@ -199,11 +199,24 @@ impl FilterManager {
cs.into_iter().map(|(k, v)| (k.join(" "), v)).collect()
}
/// Schedule execution for a given Action and Match.
/// Schedule execution for a given Match.
/// We check first if the trigger is still here
/// because pending actions can be flushed.
fn schedule_exec(&self, m: Match, t: Time, now: Time, state: &mut MutexGuard<State>) {
for action in self.filter.actions().values() {
fn schedule_exec(
&self,
m: Match,
t: Time,
now: Time,
state: &mut MutexGuard<State>,
startup: bool,
) {
for action in self
.filter
.actions()
.values()
// On startup, skip oneshot actions
.filter(|action| !startup || !action.oneshot())
{
let exec_time = t + action.after_duration().unwrap_or_default();
let m = m.clone();
@ -240,7 +253,13 @@ impl FilterManager {
fn clear_past_triggers_and_schedule_future_actions(&self, now: Time) {
let longuest_action_duration = self.filter.longuest_action_duration();
let number_of_actions = self.filter.actions().len();
let number_of_actions = self
.filter
.actions()
.values()
// On startup, skip oneshot actions
.filter(|action| !action.oneshot())
.count();
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = self.state.lock().unwrap();
@ -256,7 +275,7 @@ impl FilterManager {
// Insert back the upcoming times
state.triggers.insert(mt.clone(), number_of_actions as u64);
// Schedule the upcoming times
self.schedule_exec(mt.m, mt.t, now, &mut state);
self.schedule_exec(mt.m, mt.t, now, &mut state, true);
} else {
state.triggers.remove(&mt);
}

View file

@ -49,6 +49,10 @@ fn config_with_cmd(config_path: &str, cmd: &str) {
after: '30s',
onexit: false,
},
action_oneshot: {
cmd: ['sh', '-c', 'echo oneshot <num> >> ./oneshot.txt'],
oneshot: true,
},
}
}
}
@ -72,6 +76,7 @@ async fn simple() {
let config_path = "config.jsonnet";
let out_path = "./out.txt";
let oneshot_path = "./oneshot.txt";
let socket_path = "./reaction.sock";
config_with_cmd(
@ -80,6 +85,7 @@ async fn simple() {
);
file_with_contents(out_path, "");
file_with_contents(oneshot_path, "");
// Set the logger before running any code from the crate
tracing_subscriber::fmt::fmt()
@ -134,6 +140,12 @@ async fn simple() {
"24\n36\n12\ndel 24".to_owned().trim()
);
// oneshot actions are also executed
assert_eq!(
get_file_content(oneshot_path).trim(),
"oneshot 24\noneshot 36\noneshot 12".to_owned().trim()
);
// Second part of the test
// We test that persistence worked as intended
// Both for matches and for flushes
@ -144,6 +156,7 @@ async fn simple() {
);
file_with_contents(out_path, "");
file_with_contents(oneshot_path, "");
let daemon_exit = daemon(config_path.into(), socket_path.into()).await;
assert!(daemon_exit.is_err());
@ -152,10 +165,10 @@ async fn simple() {
"quitting because all streams finished"
);
// 36 from DB
// 12 from DB
// 12 from DB + new match
// 67 from DB + new match
// 36 trigger from DB
// 12 trigger from DB
// 12 match from DB + new match
// 67 match from DB + new match
let content = get_file_content(out_path).trim().to_owned();
let scenario1 = "36\n12\n12\n67".trim().to_owned();
let scenario2 = "12\n36\n12\n67".trim().to_owned();
@ -167,6 +180,15 @@ async fn simple() {
scenario2
);
// triggers from the DB aren't executed for oneshot actions
// only for new triggers
// 12 match from DB + new match
// 67 match from DB + new match
assert_eq!(
get_file_content(oneshot_path).trim(),
"oneshot 12\noneshot 67".to_owned().trim()
);
// Third part of the test
// Check we can capture both stdout and stderr from spawned processes