Fix after action commands not being correctly awaited

We were scheduling the action with exec_now, but it spawns a new task
itself, which did not have the ShutdownToken.

Persistance part of the start_stop test doesn't work
because when the after actions are executed, they decrement the trigger,
which is then removed from DB.
So they should not decrement it anymore, just check that it's still
there. Next commit!
This commit is contained in:
ppom 2025-08-06 12:00:00 +02:00
commit 607141f22f
No known key found for this signature in database
3 changed files with 55 additions and 35 deletions

View file

@ -222,7 +222,12 @@ impl FilterManager {
// Execute the action early
if let Order::Flush = order {
exec_now(&self.exec_limit, action, m.clone());
exec_now(
&self.exec_limit,
self.shutdown.clone(),
action,
m.clone(),
);
}
}
}
@ -264,7 +269,7 @@ impl FilterManager {
if exec_time <= now {
if state.decrement_trigger(&m, t) {
exec_now(&self.exec_limit, action, m);
exec_now(&self.exec_limit, self.shutdown.clone(), action, m);
}
} else {
let this = self.clone();
@ -285,7 +290,7 @@ impl FilterManager {
#[allow(clippy::unwrap_used)] // propagating panics is ok
let mut state = this.state.lock().unwrap();
if state.decrement_trigger(&m, t) {
exec_now(&this.exec_limit, action, m);
exec_now(&this.exec_limit, this.shutdown, action, m);
}
}
});
@ -344,9 +349,16 @@ impl FilterManager {
}
}
fn exec_now(exec_limit: &Option<Arc<Semaphore>>, action: &'static Action, m: Match) {
fn exec_now(
exec_limit: &Option<Arc<Semaphore>>,
shutdown: ShutdownToken,
action: &'static Action,
m: Match,
) {
let exec_limit = exec_limit.clone();
tokio::spawn(async move {
// Move ShutdownToken in task
let _shutdown = shutdown;
// Wait for semaphore's permission, if it is Some
let _permit = match exec_limit {
#[allow(clippy::unwrap_used)] // We know the semaphore is not closed

View file

@ -1,4 +1,8 @@
local echo(message) = ['sh', '-c', 'echo %s >> ./log' % message];
local echo(message, before='true') = [
'sh',
'-c',
before + '; echo ' + message + ' >> ./log',
];
{
patterns: {
num: {
@ -29,8 +33,8 @@ local echo(message) = ['sh', '-c', 'echo %s >> ./log' % message];
cmd: echo('runtime <num>'),
},
two: {
cmd: echo('after <num>'),
after: '1s',
cmd: echo('after', before='sleep 1'),
after: '5s',
onexit: true,
},
},

View file

@ -5,7 +5,7 @@ use assert_fs::{prelude::*, TempDir};
use predicates::prelude::predicate;
#[test]
#[ignore = "currently failing"] // FIXME
// #[ignore = "currently failing"] // FIXME
fn start_stop() {
let tmp_dir = assert_fs::TempDir::new().unwrap();
@ -17,8 +17,9 @@ fn start_stop() {
"start 2",
"runtime 1",
"runtime 2",
"after 1",
"after 2",
// no order required because they'll be awaken all together on exit
"after",
"after",
"stop 1",
"stop 2",
"",
@ -26,26 +27,27 @@ fn start_stop() {
tmp_dir.child("log").assert(&output.join("\n"));
tmp_dir.child("log").write_str("").unwrap();
// Second run
run_reaction(&tmp_dir);
// // Second run
// run_reaction(&tmp_dir);
// Expected output
let output = [
"start 1",
"start 2",
"runtime 1",
"runtime 2",
"runtime 1",
"runtime 2",
"after 1",
"after 2",
"after 1",
"after 2",
"stop 1",
"stop 2",
"",
];
tmp_dir.child("log").assert(&output.join("\n"));
// // Expected output
// let output = [
// "start 1",
// "start 2",
// "runtime 1",
// "runtime 2",
// "runtime 1",
// "runtime 2",
// // no order required because they'll be awaken all together on exit
// "after",
// "after",
// "after",
// "after",
// "stop 1",
// "stop 2",
// "",
// ];
// tmp_dir.child("log").assert(&output.join("\n"));
}
fn run_reaction(tmp_dir: &TempDir) {
@ -54,10 +56,12 @@ fn run_reaction(tmp_dir: &TempDir) {
.write_file(Path::new("tests/start_stop.jsonnet"))
.unwrap();
let mut cmd = Command::cargo_bin("reaction").unwrap();
cmd.args(["start", "--socket", "./s", "--config", "./config.jsonnet"]);
cmd.current_dir(tmp_dir.path());
cmd.timeout(Duration::from_secs(5));
// Expected exit 1: all stream exited
cmd.assert().code(predicate::eq(1));
Command::cargo_bin("reaction")
.unwrap()
.args(["start", "--socket", "./s", "--config", "./config.jsonnet"])
.current_dir(tmp_dir.path())
.timeout(Duration::from_secs(5))
// Expected exit 1: all stream exited
.assert()
.code(predicate::eq(1));
}