diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index ec1ee47..78abb41 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -33,42 +33,51 @@ pub async fn daemon( ) -> Result<(), Box> { let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?)); - if !config.start() { - return Err("a start command failed, exiting.".into()); - } - // Cancellation Token let shutdown = ShutdownController::new(); - // Semaphore limiting action execution concurrency - let exec_limit = match config.concurrency() { - 0 => None, - n => Some(Arc::new(Semaphore::new(n))), - }; - // Open Database let mut db = Database::open(config).await?; - // Filter managers - let now = Local::now(); - let mut state = HashMap::new(); - let mut stream_managers = Vec::new(); - for stream in config.streams().values() { - let mut filter_managers = HashMap::new(); - for filter in stream.filters().values() { - let manager = - FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?; - filter_managers.insert(filter, manager); - } - state.insert(stream, filter_managers.clone()); + let (state, stream_managers) = { + // Semaphore limiting action execution concurrency + let exec_limit = match config.concurrency() { + 0 => None, + n => Some(Arc::new(Semaphore::new(n))), + }; - stream_managers.push(StreamManager::new( - stream, - filter_managers, - shutdown.token(), - )?); + // Filter managers + let now = Local::now(); + let mut state = HashMap::new(); + let mut stream_managers = Vec::new(); + for stream in config.streams().values() { + let mut filter_managers = HashMap::new(); + for filter in stream.filters().values() { + let manager = + FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?; + filter_managers.insert(filter, manager); + } + state.insert(stream, filter_managers.clone()); + + stream_managers.push(StreamManager::new( + stream, + filter_managers, + shutdown.token(), + )?); + } + (state, stream_managers) + }; + + // Run database task + let mut db_status_rx = db.manager(shutdown.token()); + + // Run socket task + socket_manager(config, socket, state, shutdown.token()).await?; + + // reaction won't abort on startup anymore, we can run start commands + if !config.start() { + return Err("a start command failed, exiting.".into()); } - drop(exec_limit); // Start Stream managers let mut stream_task_handles = Vec::new(); @@ -76,23 +85,10 @@ pub async fn daemon( stream_task_handles.push(tokio::spawn(async move { stream_manager.start().await })); } - // Run database task - let mut db_status_rx = { - let token = shutdown.token(); - db.manager(token) - }; - // Close streams when we receive a quit signal let signal_received = Arc::new(AtomicBool::new(false)); handle_signals(shutdown.delegate(), signal_received.clone())?; - // Run socket task - { - let socket = socket.to_owned(); - let token = shutdown.token(); - tokio::spawn(async move { socket_manager(config, socket, state, token).await }); - } - // Wait for all streams to quit for task_handle in stream_task_handles { let _ = task_handle.await; diff --git a/src/daemon/socket.rs b/src/daemon/socket.rs index 388d954..7c3d0c9 100644 --- a/src/daemon/socket.rs +++ b/src/daemon/socket.rs @@ -1,15 +1,13 @@ use std::{ collections::{BTreeMap, HashMap}, - fs, io, path::PathBuf, - process::exit, sync::Arc, }; use chrono::Local; use futures::{SinkExt, StreamExt}; use regex::Regex; -use tokio::net::UnixListener; +use tokio::{fs, net::UnixListener}; use tokio_util::{ bytes::Bytes, codec::{Framed, LengthDelimitedCodec}, @@ -29,24 +27,24 @@ macro_rules! err_str { }; } -fn open_socket(path: PathBuf) -> Result { +async fn open_socket(path: PathBuf) -> Result { // First create all directories to the file let dir = path .parent() .ok_or(format!("socket {path:?} has no parent directory"))?; - err_str!(fs::create_dir_all(dir))?; + err_str!(fs::create_dir_all(dir).await)?; // Test if file exists - match fs::metadata(&path) { + match fs::metadata(&path).await { Ok(meta) => { if meta.file_type().is_dir() { Err(format!("socket {path:?} is already a directory")) } else { warn!("socket {path:?} already exists: is the daemon already running? deleting."); - err_str!(fs::remove_file(&path)) + err_str!(fs::remove_file(&path).await) } } Err(err) => err_str!(match err.kind() { - io::ErrorKind::NotFound => Ok(()), + std::io::ErrorKind::NotFound => Ok(()), _ => Err(err), }), }?; @@ -234,52 +232,55 @@ pub async fn socket_manager( socket: PathBuf, shared_state: HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>, shutdown: ShutdownToken, -) { - let listener = match open_socket(socket.clone()) { +) -> Result<(), String> { + let listener = match open_socket(socket.clone()).await { Ok(l) => l, Err(err) => { - error!("while creating communication socket: {err}"); - exit(1); + return Err(format!("while creating communication socket: {err}")); } }; - loop { - tokio::select! { - _ = shutdown.wait() => break, - try_conn = listener.accept() => { - match try_conn { - Ok((conn, _)) => { - let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); - // Decode - let received = transport.next().await; - let encoded_request = match received { - Some(r) => or_next!("while reading request", r), - None => { - error!("failed to answer client: client sent no request"); - continue; - } - }; - let request = or_next!( - "failed to decode request", - serde_json::from_slice(&encoded_request) - ); - // Process - let response = answer_order(config, &shared_state, request); - // Encode - let encoded_response = - or_next!("failed to serialize response", serde_json::to_string::(&response)); - or_next!( - "failed to send response:", - transport.send(Bytes::from(encoded_response)).await - ); + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown.wait() => break, + try_conn = listener.accept() => { + match try_conn { + Ok((conn, _)) => { + let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); + // Decode + let received = transport.next().await; + let encoded_request = match received { + Some(r) => or_next!("while reading request", r), + None => { + error!("failed to answer client: client sent no request"); + continue; + } + }; + let request = or_next!( + "failed to decode request", + serde_json::from_slice(&encoded_request) + ); + // Process + let response = answer_order(config, &shared_state, request); + // Encode + let encoded_response = + or_next!("failed to serialize response", serde_json::to_string::(&response)); + or_next!( + "failed to send response:", + transport.send(Bytes::from(encoded_response)).await + ); + } + Err(err) => error!("failed to open connection from cli: {err}"), } - Err(err) => error!("failed to open connection from cli: {err}"), } } } - } - if let Err(err) = fs::remove_file(socket) { - error!("failed to remove socket: {}", err); - } + if let Err(err) = fs::remove_file(socket).await { + error!("failed to remove socket: {}", err); + } + }); + + Ok(()) }