Fix filter commands executing before start commands

Now creating the socket file before starting its manager.
So I can launch start commands after its creation, and before creating
the filter managers.
This commit is contained in:
ppom 2025-08-08 12:00:00 +02:00
commit ca89c7f72a
No known key found for this signature in database
4 changed files with 110 additions and 81 deletions

4
TODO
View file

@ -5,5 +5,5 @@ stream: test regex ending with $
should an ipv6-mapped ipv4 match a pattern of type ipv6?
should it be normalized as ipv4 then?
fix filter commands executing before start commands
fix order of db write subject to race condition (make writes async?)
fix order of db writes subject to race condition (make writes async?)
DB: add tests on stress testing (lines should always be in order)

View file

@ -20,7 +20,7 @@ use crate::{concepts::Config, treedb::Database};
use filter::FilterManager;
pub use filter::React;
pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken};
use socket::socket_manager;
use socket::Socket;
use stream::StreamManager;
#[cfg(test)]
@ -43,6 +43,14 @@ pub async fn daemon(
// Open Database
let mut db = Database::open(config).await?;
// Open Socket
let socket = Socket::open(socket).await?;
// reaction won't abort on startup anymore, we can run start commands
if !config.start() {
return Err("a start command failed, exiting.".into());
}
let (state, stream_managers) = {
// Semaphore limiting action execution concurrency
let exec_limit = match config.concurrency {
@ -76,12 +84,7 @@ pub async fn daemon(
let mut db_status_rx = db.manager(shutdown.token());
// Run socket task
socket_manager(config, socket, state, shutdown.token()).await?;
// reaction won't abort on startup anymore, we can run start commands
if !config.start() {
return Err("a start command failed, exiting.".into());
}
socket.manager(config, state, shutdown.token());
// Start Stream managers
let mut stream_task_handles = Vec::new();

View file

@ -21,13 +21,12 @@ use crate::{
use super::{filter::FilterManager, shutdown::ShutdownToken};
macro_rules! err_str {
($expression:expr) => {
$expression.map_err(|err| err.to_string())
};
}
async fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
macro_rules! err_str {
($expression:expr) => {
$expression.map_err(|err| err.to_string())
};
}
// First create all directories to the file
let dir = path
.parent()
@ -222,60 +221,67 @@ macro_rules! or_next {
};
}
pub async fn socket_manager(
config: &'static Config,
socket: PathBuf,
shared_state: HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>,
shutdown: ShutdownToken,
) -> Result<(), String> {
let listener = match open_socket(socket.clone()).await {
Ok(l) => l,
Err(err) => {
return Err(format!("while creating communication socket: {err}"));
}
};
pub struct Socket {
path: PathBuf,
socket: UnixListener,
}
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown.wait() => break,
try_conn = listener.accept() => {
match try_conn {
Ok((conn, _)) => {
let mut transport = Framed::new(conn, LengthDelimitedCodec::new());
// Decode
let received = transport.next().await;
let encoded_request = match received {
Some(r) => or_next!("while reading request", r),
None => {
error!("failed to answer client: client sent no request");
continue;
}
};
let request = or_next!(
"failed to decode request",
serde_json::from_slice(&encoded_request)
);
// Process
let response = answer_order(config, &shared_state, request);
// Encode
let encoded_response =
or_next!("failed to serialize response", serde_json::to_string::<DaemonResponse>(&response));
or_next!(
"failed to send response:",
transport.send(Bytes::from(encoded_response)).await
);
impl Socket {
pub async fn open(socket: PathBuf) -> Result<Self, String> {
Ok(Socket {
socket: open_socket(socket.clone())
.await
.map_err(|err| format!("while creating communication socket: {err}"))?,
path: socket,
})
}
pub fn manager(
self,
config: &'static Config,
shared_state: HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>,
shutdown: ShutdownToken,
) {
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown.wait() => break,
try_conn = self.socket.accept() => {
match try_conn {
Ok((conn, _)) => {
let mut transport = Framed::new(conn, LengthDelimitedCodec::new());
// Decode
let received = transport.next().await;
let encoded_request = match received {
Some(r) => or_next!("while reading request", r),
None => {
error!("failed to answer client: client sent no request");
continue;
}
};
let request = or_next!(
"failed to decode request",
serde_json::from_slice(&encoded_request)
);
// Process
let response = answer_order(config, &shared_state, request);
// Encode
let encoded_response =
or_next!("failed to serialize response", serde_json::to_string::<DaemonResponse>(&response));
or_next!(
"failed to send response:",
transport.send(Bytes::from(encoded_response)).await
);
}
Err(err) => error!("failed to open connection from cli: {err}"),
}
Err(err) => error!("failed to open connection from cli: {err}"),
}
}
}
}
if let Err(err) = fs::remove_file(socket).await {
error!("failed to remove socket: {}", err);
}
});
Ok(())
if let Err(err) = fs::remove_file(self.path).await {
error!("failed to remove socket: {}", err);
}
});
}
}

View file

@ -35,23 +35,43 @@ fn start_stop() {
run_reaction(&tmp_dir);
// Expected output
let output = [
"start 1",
"start 2",
"runtime 1",
"runtime 2",
"runtime 1",
"runtime 2",
// no order required because they'll be awaken all together on exit
"after",
"after",
"after",
"after",
"stop 1",
"stop 2",
"",
// (one of them)
let outputs = [
[
"start 1",
"start 2",
"runtime 1",
"runtime 2",
"runtime 1",
"runtime 2",
// no order required because they'll be awaken all together on exit
"after",
"after",
"after",
"after",
"stop 1",
"stop 2",
"",
],
[
"start 1",
"start 2",
"runtime 2",
"runtime 1",
"runtime 1",
"runtime 2",
// no order required because they'll be awaken all together on exit
"after",
"after",
"after",
"after",
"stop 1",
"stop 2",
"",
],
];
tmp_dir.child("log").assert(&output.join("\n"));
let contents = std::fs::read_to_string(tmp_dir.child("log")).unwrap();
assert!(contents == outputs[0].join("\n") || contents == outputs[1].join("\n"));
}
fn run_reaction(tmp_dir: &TempDir) {