Fix components starting order

Now Database and Socket components are created before start commands are
executed. So in case of error, start commands are not executed.

Also socket syscalls are now async instead of blocking, for better
integration with the async runtime.

New start order:
- DB
- Socket
- Start commands
- Streams
This commit is contained in:
ppom 2025-08-04 12:00:00 +02:00
commit ea0452f62c
No known key found for this signature in database
2 changed files with 83 additions and 86 deletions

View file

@ -33,42 +33,51 @@ pub async fn daemon(
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?));
if !config.start() {
return Err("a start command failed, exiting.".into());
}
// Cancellation Token
let shutdown = ShutdownController::new();
// Semaphore limiting action execution concurrency
let exec_limit = match config.concurrency() {
0 => None,
n => Some(Arc::new(Semaphore::new(n))),
};
// Open Database
let mut db = Database::open(config).await?;
// Filter managers
let now = Local::now();
let mut state = HashMap::new();
let mut stream_managers = Vec::new();
for stream in config.streams().values() {
let mut filter_managers = HashMap::new();
for filter in stream.filters().values() {
let manager =
FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?;
filter_managers.insert(filter, manager);
}
state.insert(stream, filter_managers.clone());
let (state, stream_managers) = {
// Semaphore limiting action execution concurrency
let exec_limit = match config.concurrency() {
0 => None,
n => Some(Arc::new(Semaphore::new(n))),
};
stream_managers.push(StreamManager::new(
stream,
filter_managers,
shutdown.token(),
)?);
// Filter managers
let now = Local::now();
let mut state = HashMap::new();
let mut stream_managers = Vec::new();
for stream in config.streams().values() {
let mut filter_managers = HashMap::new();
for filter in stream.filters().values() {
let manager =
FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?;
filter_managers.insert(filter, manager);
}
state.insert(stream, filter_managers.clone());
stream_managers.push(StreamManager::new(
stream,
filter_managers,
shutdown.token(),
)?);
}
(state, stream_managers)
};
// Run database task
let mut db_status_rx = db.manager(shutdown.token());
// Run socket task
socket_manager(config, socket, state, shutdown.token()).await?;
// reaction won't abort on startup anymore, we can run start commands
if !config.start() {
return Err("a start command failed, exiting.".into());
}
drop(exec_limit);
// Start Stream managers
let mut stream_task_handles = Vec::new();
@ -76,23 +85,10 @@ pub async fn daemon(
stream_task_handles.push(tokio::spawn(async move { stream_manager.start().await }));
}
// Run database task
let mut db_status_rx = {
let token = shutdown.token();
db.manager(token)
};
// Close streams when we receive a quit signal
let signal_received = Arc::new(AtomicBool::new(false));
handle_signals(shutdown.delegate(), signal_received.clone())?;
// Run socket task
{
let socket = socket.to_owned();
let token = shutdown.token();
tokio::spawn(async move { socket_manager(config, socket, state, token).await });
}
// Wait for all streams to quit
for task_handle in stream_task_handles {
let _ = task_handle.await;

View file

@ -1,15 +1,13 @@
use std::{
collections::{BTreeMap, HashMap},
fs, io,
path::PathBuf,
process::exit,
sync::Arc,
};
use chrono::Local;
use futures::{SinkExt, StreamExt};
use regex::Regex;
use tokio::net::UnixListener;
use tokio::{fs, net::UnixListener};
use tokio_util::{
bytes::Bytes,
codec::{Framed, LengthDelimitedCodec},
@ -29,24 +27,24 @@ macro_rules! err_str {
};
}
fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
async fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
// First create all directories to the file
let dir = path
.parent()
.ok_or(format!("socket {path:?} has no parent directory"))?;
err_str!(fs::create_dir_all(dir))?;
err_str!(fs::create_dir_all(dir).await)?;
// Test if file exists
match fs::metadata(&path) {
match fs::metadata(&path).await {
Ok(meta) => {
if meta.file_type().is_dir() {
Err(format!("socket {path:?} is already a directory"))
} else {
warn!("socket {path:?} already exists: is the daemon already running? deleting.");
err_str!(fs::remove_file(&path))
err_str!(fs::remove_file(&path).await)
}
}
Err(err) => err_str!(match err.kind() {
io::ErrorKind::NotFound => Ok(()),
std::io::ErrorKind::NotFound => Ok(()),
_ => Err(err),
}),
}?;
@ -234,52 +232,55 @@ pub async fn socket_manager(
socket: PathBuf,
shared_state: HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>,
shutdown: ShutdownToken,
) {
let listener = match open_socket(socket.clone()) {
) -> Result<(), String> {
let listener = match open_socket(socket.clone()).await {
Ok(l) => l,
Err(err) => {
error!("while creating communication socket: {err}");
exit(1);
return Err(format!("while creating communication socket: {err}"));
}
};
loop {
tokio::select! {
_ = shutdown.wait() => break,
try_conn = listener.accept() => {
match try_conn {
Ok((conn, _)) => {
let mut transport = Framed::new(conn, LengthDelimitedCodec::new());
// Decode
let received = transport.next().await;
let encoded_request = match received {
Some(r) => or_next!("while reading request", r),
None => {
error!("failed to answer client: client sent no request");
continue;
}
};
let request = or_next!(
"failed to decode request",
serde_json::from_slice(&encoded_request)
);
// Process
let response = answer_order(config, &shared_state, request);
// Encode
let encoded_response =
or_next!("failed to serialize response", serde_json::to_string::<DaemonResponse>(&response));
or_next!(
"failed to send response:",
transport.send(Bytes::from(encoded_response)).await
);
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown.wait() => break,
try_conn = listener.accept() => {
match try_conn {
Ok((conn, _)) => {
let mut transport = Framed::new(conn, LengthDelimitedCodec::new());
// Decode
let received = transport.next().await;
let encoded_request = match received {
Some(r) => or_next!("while reading request", r),
None => {
error!("failed to answer client: client sent no request");
continue;
}
};
let request = or_next!(
"failed to decode request",
serde_json::from_slice(&encoded_request)
);
// Process
let response = answer_order(config, &shared_state, request);
// Encode
let encoded_response =
or_next!("failed to serialize response", serde_json::to_string::<DaemonResponse>(&response));
or_next!(
"failed to send response:",
transport.send(Bytes::from(encoded_response)).await
);
}
Err(err) => error!("failed to open connection from cli: {err}"),
}
Err(err) => error!("failed to open connection from cli: {err}"),
}
}
}
}
if let Err(err) = fs::remove_file(socket) {
error!("failed to remove socket: {}", err);
}
if let Err(err) = fs::remove_file(socket).await {
error!("failed to remove socket: {}", err);
}
});
Ok(())
}