diff --git a/TODO b/TODO index 5f13e77..663070f 100644 --- a/TODO +++ b/TODO @@ -5,5 +5,5 @@ stream: test regex ending with $ should an ipv6-mapped ipv4 match a pattern of type ipv6? should it be normalized as ipv4 then? -fix filter commands executing before start commands -fix order of db write subject to race condition (make writes async?) +fix order of db writes subject to race condition (make writes async?) +DB: add tests on stress testing (lines should always be in order) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 415394a..c0b848f 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -20,7 +20,7 @@ use crate::{concepts::Config, treedb::Database}; use filter::FilterManager; pub use filter::React; pub use shutdown::{ShutdownController, ShutdownDelegate, ShutdownToken}; -use socket::socket_manager; +use socket::Socket; use stream::StreamManager; #[cfg(test)] @@ -43,6 +43,14 @@ pub async fn daemon( // Open Database let mut db = Database::open(config).await?; + // Open Socket + let socket = Socket::open(socket).await?; + + // reaction won't abort on startup anymore, we can run start commands + if !config.start() { + return Err("a start command failed, exiting.".into()); + } + let (state, stream_managers) = { // Semaphore limiting action execution concurrency let exec_limit = match config.concurrency { @@ -76,12 +84,7 @@ pub async fn daemon( let mut db_status_rx = db.manager(shutdown.token()); // Run socket task - socket_manager(config, socket, state, shutdown.token()).await?; - - // reaction won't abort on startup anymore, we can run start commands - if !config.start() { - return Err("a start command failed, exiting.".into()); - } + socket.manager(config, state, shutdown.token()); // Start Stream managers let mut stream_task_handles = Vec::new(); diff --git a/src/daemon/socket.rs b/src/daemon/socket.rs index 6fd8a26..74d0a7c 100644 --- a/src/daemon/socket.rs +++ b/src/daemon/socket.rs @@ -21,13 +21,12 @@ use crate::{ use super::{filter::FilterManager, shutdown::ShutdownToken}; -macro_rules! err_str { - ($expression:expr) => { - $expression.map_err(|err| err.to_string()) - }; -} - async fn open_socket(path: PathBuf) -> Result { + macro_rules! err_str { + ($expression:expr) => { + $expression.map_err(|err| err.to_string()) + }; + } // First create all directories to the file let dir = path .parent() @@ -222,60 +221,67 @@ macro_rules! or_next { }; } -pub async fn socket_manager( - config: &'static Config, - socket: PathBuf, - shared_state: HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>, - shutdown: ShutdownToken, -) -> Result<(), String> { - let listener = match open_socket(socket.clone()).await { - Ok(l) => l, - Err(err) => { - return Err(format!("while creating communication socket: {err}")); - } - }; +pub struct Socket { + path: PathBuf, + socket: UnixListener, +} - tokio::spawn(async move { - loop { - tokio::select! { - _ = shutdown.wait() => break, - try_conn = listener.accept() => { - match try_conn { - Ok((conn, _)) => { - let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); - // Decode - let received = transport.next().await; - let encoded_request = match received { - Some(r) => or_next!("while reading request", r), - None => { - error!("failed to answer client: client sent no request"); - continue; - } - }; - let request = or_next!( - "failed to decode request", - serde_json::from_slice(&encoded_request) - ); - // Process - let response = answer_order(config, &shared_state, request); - // Encode - let encoded_response = - or_next!("failed to serialize response", serde_json::to_string::(&response)); - or_next!( - "failed to send response:", - transport.send(Bytes::from(encoded_response)).await - ); +impl Socket { + pub async fn open(socket: PathBuf) -> Result { + Ok(Socket { + socket: open_socket(socket.clone()) + .await + .map_err(|err| format!("while creating communication socket: {err}"))?, + path: socket, + }) + } + + pub fn manager( + self, + config: &'static Config, + shared_state: HashMap<&'static Stream, HashMap<&'static Filter, FilterManager>>, + shutdown: ShutdownToken, + ) { + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown.wait() => break, + try_conn = self.socket.accept() => { + match try_conn { + Ok((conn, _)) => { + let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); + // Decode + let received = transport.next().await; + let encoded_request = match received { + Some(r) => or_next!("while reading request", r), + None => { + error!("failed to answer client: client sent no request"); + continue; + } + }; + let request = or_next!( + "failed to decode request", + serde_json::from_slice(&encoded_request) + ); + // Process + let response = answer_order(config, &shared_state, request); + // Encode + let encoded_response = + or_next!("failed to serialize response", serde_json::to_string::(&response)); + or_next!( + "failed to send response:", + transport.send(Bytes::from(encoded_response)).await + ); + } + Err(err) => error!("failed to open connection from cli: {err}"), } - Err(err) => error!("failed to open connection from cli: {err}"), } } } - } - if let Err(err) = fs::remove_file(socket).await { - error!("failed to remove socket: {}", err); - } - }); - - Ok(()) + if let Err(err) = fs::remove_file(self.path).await { + error!("failed to remove socket: {}", err); + } + }); + } } diff --git a/tests/start_stop.rs b/tests/start_stop.rs index 64112f5..09a459e 100644 --- a/tests/start_stop.rs +++ b/tests/start_stop.rs @@ -35,23 +35,43 @@ fn start_stop() { run_reaction(&tmp_dir); // Expected output - let output = [ - "start 1", - "start 2", - "runtime 1", - "runtime 2", - "runtime 1", - "runtime 2", - // no order required because they'll be awaken all together on exit - "after", - "after", - "after", - "after", - "stop 1", - "stop 2", - "", + // (one of them) + let outputs = [ + [ + "start 1", + "start 2", + "runtime 1", + "runtime 2", + "runtime 1", + "runtime 2", + // no order required because they'll be awaken all together on exit + "after", + "after", + "after", + "after", + "stop 1", + "stop 2", + "", + ], + [ + "start 1", + "start 2", + "runtime 2", + "runtime 1", + "runtime 1", + "runtime 2", + // no order required because they'll be awaken all together on exit + "after", + "after", + "after", + "after", + "stop 1", + "stop 2", + "", + ], ]; - tmp_dir.child("log").assert(&output.join("\n")); + let contents = std::fs::read_to_string(tmp_dir.child("log")).unwrap(); + assert!(contents == outputs[0].join("\n") || contents == outputs[1].join("\n")); } fn run_reaction(tmp_dir: &TempDir) {