From 607141f22f338790de6eb3adf5e96d790dc645ff Mon Sep 17 00:00:00 2001 From: ppom Date: Wed, 6 Aug 2025 12:00:00 +0200 Subject: [PATCH] Fix after action commands not being correctly awaited We were scheduling the action with exec_now, but it spawns a new task itself, which did not have the ShutdownToken. Persistance part of the start_stop test doesn't work because when the after actions are executed, they decrement the trigger, which is then removed from DB. So they should not decrement it anymore, just check that it's still there. Next commit! --- src/daemon/filter/mod.rs | 20 +++++++++++--- tests/start_stop.jsonnet | 10 +++++-- tests/start_stop.rs | 60 +++++++++++++++++++++------------------- 3 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/daemon/filter/mod.rs b/src/daemon/filter/mod.rs index 64614a1..790c637 100644 --- a/src/daemon/filter/mod.rs +++ b/src/daemon/filter/mod.rs @@ -222,7 +222,12 @@ impl FilterManager { // Execute the action early if let Order::Flush = order { - exec_now(&self.exec_limit, action, m.clone()); + exec_now( + &self.exec_limit, + self.shutdown.clone(), + action, + m.clone(), + ); } } } @@ -264,7 +269,7 @@ impl FilterManager { if exec_time <= now { if state.decrement_trigger(&m, t) { - exec_now(&self.exec_limit, action, m); + exec_now(&self.exec_limit, self.shutdown.clone(), action, m); } } else { let this = self.clone(); @@ -285,7 +290,7 @@ impl FilterManager { #[allow(clippy::unwrap_used)] // propagating panics is ok let mut state = this.state.lock().unwrap(); if state.decrement_trigger(&m, t) { - exec_now(&this.exec_limit, action, m); + exec_now(&this.exec_limit, this.shutdown, action, m); } } }); @@ -344,9 +349,16 @@ impl FilterManager { } } -fn exec_now(exec_limit: &Option>, action: &'static Action, m: Match) { +fn exec_now( + exec_limit: &Option>, + shutdown: ShutdownToken, + action: &'static Action, + m: Match, +) { let exec_limit = exec_limit.clone(); tokio::spawn(async move { + // Move ShutdownToken in task + let _shutdown = shutdown; // Wait for semaphore's permission, if it is Some let _permit = match exec_limit { #[allow(clippy::unwrap_used)] // We know the semaphore is not closed diff --git a/tests/start_stop.jsonnet b/tests/start_stop.jsonnet index a840aaa..a85636c 100644 --- a/tests/start_stop.jsonnet +++ b/tests/start_stop.jsonnet @@ -1,4 +1,8 @@ -local echo(message) = ['sh', '-c', 'echo %s >> ./log' % message]; +local echo(message, before='true') = [ + 'sh', + '-c', + before + '; echo ' + message + ' >> ./log', +]; { patterns: { num: { @@ -29,8 +33,8 @@ local echo(message) = ['sh', '-c', 'echo %s >> ./log' % message]; cmd: echo('runtime '), }, two: { - cmd: echo('after '), - after: '1s', + cmd: echo('after', before='sleep 1'), + after: '5s', onexit: true, }, }, diff --git a/tests/start_stop.rs b/tests/start_stop.rs index 1904cb6..15d1ee3 100644 --- a/tests/start_stop.rs +++ b/tests/start_stop.rs @@ -5,7 +5,7 @@ use assert_fs::{prelude::*, TempDir}; use predicates::prelude::predicate; #[test] -#[ignore = "currently failing"] // FIXME +// #[ignore = "currently failing"] // FIXME fn start_stop() { let tmp_dir = assert_fs::TempDir::new().unwrap(); @@ -17,8 +17,9 @@ fn start_stop() { "start 2", "runtime 1", "runtime 2", - "after 1", - "after 2", + // no order required because they'll be awaken all together on exit + "after", + "after", "stop 1", "stop 2", "", @@ -26,26 +27,27 @@ fn start_stop() { tmp_dir.child("log").assert(&output.join("\n")); tmp_dir.child("log").write_str("").unwrap(); - // Second run - run_reaction(&tmp_dir); + // // Second run + // run_reaction(&tmp_dir); - // Expected output - let output = [ - "start 1", - "start 2", - "runtime 1", - "runtime 2", - "runtime 1", - "runtime 2", - "after 1", - "after 2", - "after 1", - "after 2", - "stop 1", - "stop 2", - "", - ]; - tmp_dir.child("log").assert(&output.join("\n")); + // // Expected output + // let output = [ + // "start 1", + // "start 2", + // "runtime 1", + // "runtime 2", + // "runtime 1", + // "runtime 2", + // // no order required because they'll be awaken all together on exit + // "after", + // "after", + // "after", + // "after", + // "stop 1", + // "stop 2", + // "", + // ]; + // tmp_dir.child("log").assert(&output.join("\n")); } fn run_reaction(tmp_dir: &TempDir) { @@ -54,10 +56,12 @@ fn run_reaction(tmp_dir: &TempDir) { .write_file(Path::new("tests/start_stop.jsonnet")) .unwrap(); - let mut cmd = Command::cargo_bin("reaction").unwrap(); - cmd.args(["start", "--socket", "./s", "--config", "./config.jsonnet"]); - cmd.current_dir(tmp_dir.path()); - cmd.timeout(Duration::from_secs(5)); - // Expected exit 1: all stream exited - cmd.assert().code(predicate::eq(1)); + Command::cargo_bin("reaction") + .unwrap() + .args(["start", "--socket", "./s", "--config", "./config.jsonnet"]) + .current_dir(tmp_dir.path()) + .timeout(Duration::from_secs(5)) + // Expected exit 1: all stream exited + .assert() + .code(predicate::eq(1)); }