From ea0452f62cd730d03565c4f8d753789c2d206aca Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 4 Aug 2025 12:00:00 +0200 Subject: [PATCH] Fix components starting order Now Database and Socket components are created before start commands are executed. So in case of error, start commands are not executed. Also socket syscalls are now async instead of blocking, for better integration with the async runtime. New start order: - DB - Socket - Start commands - Streams --- src/daemon/mod.rs | 78 ++++++++++++++++++------------------- src/daemon/socket.rs | 93 ++++++++++++++++++++++---------------------- 2 files changed, 84 insertions(+), 87 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index ec1ee47..78abb41 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -33,42 +33,51 @@ pub async fn daemon( ) -> Result<(), Box> { let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?)); - if !config.start() { - return Err("a start command failed, exiting.".into()); - } - // Cancellation Token let shutdown = ShutdownController::new(); - // Semaphore limiting action execution concurrency - let exec_limit = match config.concurrency() { - 0 => None, - n => Some(Arc::new(Semaphore::new(n))), - }; - // Open Database let mut db = Database::open(config).await?; - // Filter managers - let now = Local::now(); - let mut state = HashMap::new(); - let mut stream_managers = Vec::new(); - for stream in config.streams().values() { - let mut filter_managers = HashMap::new(); - for filter in stream.filters().values() { - let manager = - FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?; - filter_managers.insert(filter, manager); - } - state.insert(stream, filter_managers.clone()); + let (state, stream_managers) = { + // Semaphore limiting action execution concurrency + let exec_limit = match config.concurrency() { + 0 => None, + n => Some(Arc::new(Semaphore::new(n))), + }; - stream_managers.push(StreamManager::new( - stream, - filter_managers, - shutdown.token(), - )?); + // Filter managers + let now = Local::now(); + let mut state = HashMap::new(); + let mut stream_managers = Vec::new(); + for stream in config.streams().values() { + let mut filter_managers = HashMap::new(); + for filter in stream.filters().values() { + let manager = + FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?; + filter_managers.insert(filter, manager); + } + state.insert(stream, filter_managers.clone()); + + stream_managers.push(StreamManager::new( + stream, + filter_managers, + shutdown.token(), + )?); + } + (state, stream_managers) + }; + + // Run database task + let mut db_status_rx = db.manager(shutdown.token()); + + // Run socket task + socket_manager(config, socket, state, shutdown.token()).await?; + + // reaction won't abort on startup anymore, we can run start commands + if !config.start() { + return Err("a start command failed, exiting.".into()); } - drop(exec_limit); // Start Stream managers let mut stream_task_handles = Vec::new(); @@ -76,23 +85,10 @@ pub async fn daemon( stream_task_handles.push(tokio::spawn(async move { stream_manager.start().await })); } - // Run database task - let mut db_status_rx = { - let token = shutdown.token(); - db.manager(token) - }; - // Close streams when we receive a quit signal let signal_received = Arc::new(AtomicBool::new(false)); handle_signals(shutdown.delegate(), signal_received.clone())?; - // Run socket task - { - let socket = socket.to_owned(); - let token = shutdown.token(); - tokio::spawn(async move { socket_manager(config, socket, state, token).await }); - } - // Wait for all streams to quit for task_handle in stream_task_handles { let _ = task_handle.await; diff --git a/src/daemon/socket.rs b/src/daemon/socket.rs index 388d954..7c3d0c9 100644 --- a/src/daemon/socket.rs +++ b/src/daemon/socket.rs @@ -1,15 +1,13 @@ use std::{ collections::{BTreeMap, HashMap}, - fs, io, path::PathBuf, - process::exit, sync::Arc, }; use chrono::Local; use futures::{SinkExt, StreamExt}; use regex::Regex; -use tokio::net::UnixListener; +use tokio::{fs, net::UnixListener}; use tokio_util::{ bytes::Bytes, codec::{Framed, LengthDelimitedCodec}, @@ -29,24 +27,24 @@ macro_rules! err_str { }; } -fn open_socket(path: PathBuf) -> Result { +async fn open_socket(path: PathBuf) -> Result { // First create all directories to the file let dir = path .parent() .ok_or(format!("socket {path:?} has no parent directory"))?; - err_str!(fs::create_dir_all(dir))?; + err_str!(fs::create_dir_all(dir).await)?; // Test if file exists - match fs::metadata(&path) { + match fs::metadata(&path).await { Ok(meta) => { if meta.file_type().is_dir() { Err(format!("socket {path:?} is already a directory")) } else { warn!("socket {path:?} already exists: is the daemon already running? deleting."); - err_str!(fs::remove_file(&path)) + err_str!(fs::remove_file(&path).await) } } Err(err) => err_str!(match err.kind() { - io::ErrorKind::NotFound => Ok(()), + std::io::ErrorKind::NotFound => Ok(()), _ => Err(err), }), }?; @@ -234,52 +232,55 @@ pub async fn socket_manager( socket: PathBuf, shared_state: HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>, shutdown: ShutdownToken, -) { - let listener = match open_socket(socket.clone()) { +) -> Result<(), String> { + let listener = match open_socket(socket.clone()).await { Ok(l) => l, Err(err) => { - error!("while creating communication socket: {err}"); - exit(1); + return Err(format!("while creating communication socket: {err}")); } }; - loop { - tokio::select! { - _ = shutdown.wait() => break, - try_conn = listener.accept() => { - match try_conn { - Ok((conn, _)) => { - let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); - // Decode - let received = transport.next().await; - let encoded_request = match received { - Some(r) => or_next!("while reading request", r), - None => { - error!("failed to answer client: client sent no request"); - continue; - } - }; - let request = or_next!( - "failed to decode request", - serde_json::from_slice(&encoded_request) - ); - // Process - let response = answer_order(config, &shared_state, request); - // Encode - let encoded_response = - or_next!("failed to serialize response", serde_json::to_string::(&response)); - or_next!( - "failed to send response:", - transport.send(Bytes::from(encoded_response)).await - ); + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown.wait() => break, + try_conn = listener.accept() => { + match try_conn { + Ok((conn, _)) => { + let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); + // Decode + let received = transport.next().await; + let encoded_request = match received { + Some(r) => or_next!("while reading request", r), + None => { + error!("failed to answer client: client sent no request"); + continue; + } + }; + let request = or_next!( + "failed to decode request", + serde_json::from_slice(&encoded_request) + ); + // Process + let response = answer_order(config, &shared_state, request); + // Encode + let encoded_response = + or_next!("failed to serialize response", serde_json::to_string::(&response)); + or_next!( + "failed to send response:", + transport.send(Bytes::from(encoded_response)).await + ); + } + Err(err) => error!("failed to open connection from cli: {err}"), } - Err(err) => error!("failed to open connection from cli: {err}"), } } } - } - if let Err(err) = fs::remove_file(socket) { - error!("failed to remove socket: {}", err); - } + if let Err(err) = fs::remove_file(socket).await { + error!("failed to remove socket: {}", err); + } + }); + + Ok(()) }