From 79677cf327de5f23f171f0b95f02f21ab9c495f4 Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 26 Oct 2024 12:00:00 +0200 Subject: [PATCH] Restructure code and document it in ARCHITECTURE.md --- ARCHITECTURE.md | 71 ++++++++ README.md | 4 + build.rs | 2 +- src/{utils => }/cli.rs | 0 src/client/mod.rs | 158 +----------------- src/client/show_flush.rs | 80 +++++++++ src/client/test_regex.rs | 78 +++++++++ src/concepts/action.rs | 2 +- src/concepts/filter.rs | 2 +- src/concepts/mod.rs | 4 +- src/{utils => concepts}/parse_duration.rs | 0 src/daemon/action.rs | 2 +- src/daemon/database/lowlevel.rs | 2 +- src/daemon/filter.rs | 5 +- src/daemon/socket.rs | 4 +- src/lib.rs | 3 +- src/main.rs | 5 +- .../messages.rs} | 4 +- src/protocol/mod.rs | 5 + .../mod.rs => protocol/serialization.rs} | 4 - tests/simple.rs | 2 +- 21 files changed, 262 insertions(+), 175 deletions(-) create mode 100644 ARCHITECTURE.md rename src/{utils => }/cli.rs (100%) create mode 100644 src/client/show_flush.rs create mode 100644 src/client/test_regex.rs rename src/{utils => concepts}/parse_duration.rs (100%) rename src/{concepts/socket_messages.rs => protocol/messages.rs} (98%) create mode 100644 src/protocol/mod.rs rename src/{utils/mod.rs => protocol/serialization.rs} (95%) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md new file mode 100644 index 0000000..b010403 --- /dev/null +++ b/ARCHITECTURE.md @@ -0,0 +1,71 @@ +# Architecture + +Here is a high-level overview of the codebase. + +*Don't hesitate to create an issue or a merge request if something is unclear, missing or outdated.* + +## Build + +- `build.rs`: generates shell completions and man pages at build time. +- `Cargo.toml`, `Cargo.lock`: manifest and dependencies. +- `config`: example / test configuration files. Look at the git history to discover more! +- `debian`: reaction.deb generation. +- `Makefile`: Makefile. I plan to remove this at some point. +- `release.py`: Build process for a release. 
I'd like to make it more modular, for example to allow building only specific parts. + +## Main source code + +- `helpers_c`: C helpers. I wish to have special IP support in reaction and get rid of them. +- `tests`: Integration tests. For now they test basic reaction runtime behavior, persistence, and client-daemon communication. +- `src`: The source code, here we go! + +### Top-level files + +- `src/main.rs`: Main entrypoint +- `src/lib.rs`: Second main entrypoint +- `src/cli.rs`: Command-line arguments +- `src/tests.rs`: Test utilities + +### `src/concepts` + +reaction really is about its configuration, which is at the center of the code. + +There is one file for each of its concepts: configuration, streams, filters, actions, patterns. + +### `src/protocol` + +Low-level serialization/deserialization and client-daemon protocol messages. + +Shared by the client and the daemon's socket. Also used by the daemon's database. + +### `src/client` + +Client code: `reaction show`, `reaction flush`, `reaction test-regex`. + +- `show_flush.rs`: `show` & `flush` commands. +- `test_regex.rs`: `test-regex` command. + +### `src/daemon` + +Daemon runtime structures and logic. + +This code is mainly async, with the tokio runtime. + +- `mod.rs`: daemon main function. Initializes all tasks, handles synchronization and quitting, etc. +- `stream.rs`: Stream managers: start the stream `cmd` and dispatch its stdout lines to its Filter managers. +- `filter.rs`: Filter managers: handle lines, store matches, send logs to database and decide when to trigger actions. +- `action.rs`: Action managers: handle action triggers (*execs*), store & manage pending actions. +- `socket.rs`: The socket task, responsible for communication with clients. +- `database`: The database thread. This is a sync thread, because it's somehow much faster. At startup it sends persisted matches to the Filter managers. Then it receives match/exec logs from the filters and persists them. + - `database/mod.rs`: Main logic. 
+ - `database/lowlevel.rs`: Low-level implementation details (serialization / deserialization and size optimizations). + - `database/tests.rs`: Unit tests. + +## Migration from Go to Rust + +- `go.old/`: Go / v1 codebase. + +These scripts are merged into a single-file executable by `release.py`: + +- `export-go-db/`: Go script to export the reaction-v1 database as JSON. +- `import-rust-db/`: Rust script to import the JSON export as a reaction-v2 database. diff --git a/README.md b/README.md index fc61821..0028631 100644 --- a/README.md +++ b/README.md @@ -227,5 +227,9 @@ make install_systemd Contributions are welcome. For any substantial feature, please file an issue first, to be assured that we agree on the feature, and to avoid unnecessary work. +I recommend reading [`ARCHITECTURE.md`](ARCHITECTURE.md) first. This is a tour of the codebase, which should save potential contributors some time. + +## Funding + This is a free time project, so I'm not working on schedule. However, if you're willing to fund the project, I can priorise and plan paid work. This includes features, documentation and specific JSONnet configurations. 
diff --git a/build.rs b/build.rs index f8ce4db..3c9ebe9 100644 --- a/build.rs +++ b/build.rs @@ -8,7 +8,7 @@ use std::{ use clap_complete::shells; // SubCommand defined here -include!("src/utils/cli.rs"); +include!("src/cli.rs"); fn compile_helper(name: &str, out_dir: &Path) -> io::Result<()> { process::Command::new("gcc") diff --git a/src/utils/cli.rs b/src/cli.rs similarity index 100% rename from src/utils/cli.rs rename to src/cli.rs diff --git a/src/client/mod.rs b/src/client/mod.rs index 8a1b61a..8565a55 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,155 +1,5 @@ -use std::{ - collections::BTreeSet, - error::Error, - io::{stdin, BufRead, BufReader}, - path::{Path, PathBuf}, - sync::Arc, -}; +mod show_flush; +mod test_regex; -use bincode::Options; -use futures::{SinkExt, StreamExt}; -use regex::Regex; -use tokio::net::UnixStream; -use tokio_util::{ - bytes::Bytes, - codec::{Framed, LengthDelimitedCodec}, -}; - -use crate::{ - concepts::{ClientRequest, ClientStatus, Config, DaemonResponse, Order, Pattern}, - utils::{bincode_options, cli::Format}, -}; - -macro_rules! or_quit { - ($msg:expr, $expression:expr) => { - $expression.map_err(|err| format!("failed to communicate to daemon: {}, {}", $msg, err))? 
- }; -} - -async fn send_retrieve(socket: &Path, req: &ClientRequest) -> Result { - let bin = bincode_options(); - let conn = or_quit!( - "opening connection to daemon", - UnixStream::connect(socket).await - ); - // Encode - let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); - let encoded_request = or_quit!("failed to encode request", bin.serialize(req)); - or_quit!( - "failed to send request", - transport.send(Bytes::from(encoded_request)).await - ); - // Decode - let encoded_response = or_quit!( - "failed to read response", - transport.next().await.ok_or("empty response from server") - ); - let encoded_response = or_quit!("failed to decode response", encoded_response); - Ok(or_quit!( - "failed to decode response", - bin.deserialize::(&encoded_response) - )) -} - -async fn print_status(cs: ClientStatus, format: Format) -> Result<(), Box> { - let encoded = match format { - Format::JSON => serde_json::to_string_pretty(&cs)?, - Format::YAML => serde_yaml::to_string(&cs)?, - }; - println!("{}", encoded); - Ok(()) -} - -pub async fn request( - socket: PathBuf, - format: Format, - stream_filter: Option, - patterns: Vec<(String, String)>, - order: Order, -) -> Result<(), Box> { - let response = send_retrieve( - &socket, - &ClientRequest { - order, - stream_filter, - patterns, - }, - ) - .await; - match response? 
{ - DaemonResponse::Order(cs) => print_status(cs, format) - .await - .map_err(|err| format!("while printing response: {err}")), - DaemonResponse::Err(err) => Err(format!( - "failed to communicate to daemon: error response: {err}" - )), - }?; - Ok(()) -} - -pub fn test_regex( - config_path: PathBuf, - mut regex: String, - line: Option, -) -> Result<(), Box> { - let config: Config = Config::from_file(&config_path)?; - - // Code close to Filter::setup() - let mut used_patterns: BTreeSet> = BTreeSet::new(); - for pattern in config.patterns().values() { - if let Some(index) = regex.find(pattern.name_with_braces()) { - // we already `find` it, so we must be able to `rfind` it - #[allow(clippy::unwrap_used)] - if regex.rfind(pattern.name_with_braces()).unwrap() != index { - return Err(format!( - "pattern {} present multiple times in regex", - pattern.name_with_braces() - ) - .into()); - } - used_patterns.insert(pattern.clone()); - } - regex = regex.replacen(pattern.name_with_braces(), &pattern.regex, 1); - } - - let compiled = Regex::new(®ex).map_err(|err| format!("regex doesn't compile: {err}"))?; - - let match_closure = |line: String| { - let mut ignored = false; - if let Some(matches) = compiled.captures(&line) { - let mut result = Vec::new(); - if !used_patterns.is_empty() { - for pattern in used_patterns.iter() { - if let Some(match_) = matches.name(pattern.name()) { - result.push(match_.as_str().to_string()); - if !pattern.not_an_ignore(match_.as_str()) { - ignored = true; - } - } - } - if !ignored { - println!("\x1b[32mmatching\x1b[0m {result:?}: {line}"); - } else { - println!("\x1b[33mignore matching\x1b[0m {result:?}: {line}"); - } - } else { - println!("\x1b[32mmatching\x1b[0m: {line}"); - } - } else { - println!("\x1b[31mno match\x1b[0m: {line}"); - } - }; - - if let Some(line) = line { - match_closure(line); - } else { - eprintln!("no second argument: reading from stdin"); - for line in BufReader::new(stdin()).lines() { - match line { - Ok(line) => 
match_closure(line), - Err(_) => break, - }; - } - } - Ok(()) -} +pub use show_flush::request; +pub use test_regex::test_regex; diff --git a/src/client/show_flush.rs b/src/client/show_flush.rs new file mode 100644 index 0000000..73e0591 --- /dev/null +++ b/src/client/show_flush.rs @@ -0,0 +1,80 @@ +use std::{ + error::Error, + path::{Path, PathBuf}, +}; + +use bincode::Options; +use futures::{SinkExt, StreamExt}; +use tokio::net::UnixStream; +use tokio_util::{ + bytes::Bytes, + codec::{Framed, LengthDelimitedCodec}, +}; + +use crate::{cli::Format, protocol::{bincode_options, ClientRequest, ClientStatus, DaemonResponse, Order}}; + +macro_rules! or_quit { + ($msg:expr, $expression:expr) => { + $expression.map_err(|err| format!("failed to communicate to daemon: {}, {}", $msg, err))? + }; +} + +async fn send_retrieve(socket: &Path, req: &ClientRequest) -> Result { + let bin = bincode_options(); + let conn = or_quit!( + "opening connection to daemon", + UnixStream::connect(socket).await + ); + // Encode + let mut transport = Framed::new(conn, LengthDelimitedCodec::new()); + let encoded_request = or_quit!("failed to encode request", bin.serialize(req)); + or_quit!( + "failed to send request", + transport.send(Bytes::from(encoded_request)).await + ); + // Decode + let encoded_response = or_quit!( + "failed to read response", + transport.next().await.ok_or("empty response from server") + ); + let encoded_response = or_quit!("failed to decode response", encoded_response); + Ok(or_quit!( + "failed to decode response", + bin.deserialize::(&encoded_response) + )) +} + +fn print_status(cs: ClientStatus, format: Format) -> Result<(), Box> { + let encoded = match format { + Format::JSON => serde_json::to_string_pretty(&cs)?, + Format::YAML => serde_yaml::to_string(&cs)?, + }; + println!("{}", encoded); + Ok(()) +} + +pub async fn request( + socket: PathBuf, + format: Format, + stream_filter: Option, + patterns: Vec<(String, String)>, + order: Order, +) -> Result<(), Box> { + let 
response = send_retrieve( + &socket, + &ClientRequest { + order, + stream_filter, + patterns, + }, + ) + .await; + match response? { + DaemonResponse::Order(cs) => print_status(cs, format) + .map_err(|err| format!("while printing response: {err}")), + DaemonResponse::Err(err) => Err(format!( + "failed to communicate to daemon: error response: {err}" + )), + }?; + Ok(()) +} diff --git a/src/client/test_regex.rs b/src/client/test_regex.rs new file mode 100644 index 0000000..3197106 --- /dev/null +++ b/src/client/test_regex.rs @@ -0,0 +1,78 @@ +use std::{ + collections::BTreeSet, + error::Error, + io::{stdin, BufRead, BufReader}, + path::PathBuf, + sync::Arc, +}; + +use regex::Regex; + +use crate::concepts::{Config, Pattern}; + +pub fn test_regex( + config_path: PathBuf, + mut regex: String, + line: Option, +) -> Result<(), Box> { + let config: Config = Config::from_file(&config_path)?; + + // Code close to Filter::setup() + let mut used_patterns: BTreeSet> = BTreeSet::new(); + for pattern in config.patterns().values() { + if let Some(index) = regex.find(pattern.name_with_braces()) { + // we already `find` it, so we must be able to `rfind` it + #[allow(clippy::unwrap_used)] + if regex.rfind(pattern.name_with_braces()).unwrap() != index { + return Err(format!( + "pattern {} present multiple times in regex", + pattern.name_with_braces() + ) + .into()); + } + used_patterns.insert(pattern.clone()); + } + regex = regex.replacen(pattern.name_with_braces(), &pattern.regex, 1); + } + + let compiled = Regex::new(®ex).map_err(|err| format!("regex doesn't compile: {err}"))?; + + let match_closure = |line: String| { + let mut ignored = false; + if let Some(matches) = compiled.captures(&line) { + let mut result = Vec::new(); + if !used_patterns.is_empty() { + for pattern in used_patterns.iter() { + if let Some(match_) = matches.name(pattern.name()) { + result.push(match_.as_str().to_string()); + if !pattern.not_an_ignore(match_.as_str()) { + ignored = true; + } + } + } + if 
!ignored { + println!("\x1b[32mmatching\x1b[0m {result:?}: {line}"); + } else { + println!("\x1b[33mignore matching\x1b[0m {result:?}: {line}"); + } + } else { + println!("\x1b[32mmatching\x1b[0m: {line}"); + } + } else { + println!("\x1b[31mno match\x1b[0m: {line}"); + } + }; + + if let Some(line) = line { + match_closure(line); + } else { + eprintln!("no second argument: reading from stdin"); + for line in BufReader::new(stdin()).lines() { + match line { + Ok(line) => match_closure(line), + Err(_) => break, + }; + } + } + Ok(()) +} diff --git a/src/concepts/action.rs b/src/concepts/action.rs index f5378ff..2dc18eb 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -6,7 +6,7 @@ use serde::Deserialize; use tokio::process::Command; use super::{Match, Pattern}; -use crate::utils::parse_duration; +use super::parse_duration; #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields)] diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index d895b53..be8d911 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -16,7 +16,7 @@ use super::{ messages::{Match, Time, MAT}, Action, LogEntry, Pattern, Patterns, }; -use crate::utils::parse_duration; +use super::parse_duration; // Only names are serialized // Only computed fields are not deserialized diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index 3e25648..4c5850e 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -2,8 +2,8 @@ mod action; mod config; mod filter; mod messages; +mod parse_duration; mod pattern; -mod socket_messages; mod stream; pub use action::Action; @@ -11,5 +11,5 @@ pub use config::{Config, Patterns}; pub use filter::Filter; pub use messages::*; pub use pattern::Pattern; -pub use socket_messages::*; pub use stream::Stream; +pub use parse_duration::parse_duration; diff --git a/src/utils/parse_duration.rs b/src/concepts/parse_duration.rs similarity index 100% rename from src/utils/parse_duration.rs rename to src/concepts/parse_duration.rs diff 
--git a/src/daemon/action.rs b/src/daemon/action.rs index db36557..5b9a746 100644 --- a/src/daemon/action.rs +++ b/src/daemon/action.rs @@ -8,7 +8,7 @@ use chrono::{Local, TimeDelta}; use tokio::sync::Semaphore; use tracing::{error, info}; -use crate::concepts::{Action, Match, Order, Time}; +use crate::{concepts::{Action, Match, Time}, protocol::Order}; struct State { pending: BTreeMap>, diff --git a/src/daemon/database/lowlevel.rs b/src/daemon/database/lowlevel.rs index 7d0536c..7d6284c 100644 --- a/src/daemon/database/lowlevel.rs +++ b/src/daemon/database/lowlevel.rs @@ -13,7 +13,7 @@ use tracing::{debug, error, warn}; use crate::{ concepts::{Config, Filter, LogEntry, Match}, - utils::{bincode_options, BincodeOptions}, + protocol::{bincode_options, BincodeOptions}, }; use super::DBError; diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 310f448..027b45f 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -7,7 +7,10 @@ use chrono::Local; use regex::Regex; use tokio::sync::{mpsc, Semaphore}; -use crate::concepts::{Filter, LogEntry, Match, Order, Pattern, PatternStatus, Time, MFT}; +use crate::{ + concepts::{Filter, LogEntry, Match, Pattern, Time, MFT}, + protocol::{Order, PatternStatus}, +}; use super::{action::ActionManager, database::DatabaseManagerInput}; diff --git a/src/daemon/socket.rs b/src/daemon/socket.rs index 64935b4..d167cda 100644 --- a/src/daemon/socket.rs +++ b/src/daemon/socket.rs @@ -11,8 +11,8 @@ use tokio_util::{ use tracing::{error, warn}; use crate::{ - concepts::{ClientRequest, ClientStatus, Config, DaemonResponse, Pattern}, - utils::bincode_options, + concepts::{Config, Pattern}, + protocol::{bincode_options, ClientRequest, ClientStatus, DaemonResponse}, }; use super::SharedState; diff --git a/src/lib.rs b/src/lib.rs index 012b4d3..dff7680 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,8 +7,9 @@ )] #![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)] +pub mod cli; pub mod client; pub mod concepts; pub mod 
daemon; +pub mod protocol; pub mod tests; -pub mod utils; diff --git a/src/main.rs b/src/main.rs index 97d06fb..ab42624 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,11 @@ use std::{io::IsTerminal, process::exit}; use clap::Parser; - use reaction::{ + cli::{Cli, SubCommand}, client::{request, test_regex}, - concepts::Order, daemon::daemon, - utils::cli::{Cli, SubCommand}, + protocol::Order, }; use tracing::{error, Level}; diff --git a/src/concepts/socket_messages.rs b/src/protocol/messages.rs similarity index 98% rename from src/concepts/socket_messages.rs rename to src/protocol/messages.rs index 5f00d79..0fa94de 100644 --- a/src/concepts/socket_messages.rs +++ b/src/protocol/messages.rs @@ -1,12 +1,12 @@ use std::collections::BTreeMap; -use super::Match; - use serde::{ // ser::{SerializeMap, SerializeStruct}, Deserialize, Serialize, }; +use crate::concepts::Match; + // We don't need protocol versionning here because // client and daemon are the same binary diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs new file mode 100644 index 0000000..9dfbe69 --- /dev/null +++ b/src/protocol/mod.rs @@ -0,0 +1,5 @@ +mod messages; +mod serialization; + +pub use messages::*; +pub use serialization::*; diff --git a/src/utils/mod.rs b/src/protocol/serialization.rs similarity index 95% rename from src/utils/mod.rs rename to src/protocol/serialization.rs index 9505e7c..e82de5f 100644 --- a/src/utils/mod.rs +++ b/src/protocol/serialization.rs @@ -1,10 +1,6 @@ -pub mod cli; -mod parse_duration; - use std::marker::PhantomData; use bincode::Options; -pub use parse_duration::parse_duration; use serde::de::DeserializeOwned; use thiserror::Error; use tokio_util::codec::{Decoder, LengthDelimitedCodec}; diff --git a/tests/simple.rs b/tests/simple.rs index 056811c..b7fa1e0 100644 --- a/tests/simple.rs +++ b/tests/simple.rs @@ -8,7 +8,7 @@ use std::{ use tempfile::TempDir; use tracing::Level; -use reaction::{client::request, concepts::Order, daemon::daemon, utils::cli::Format}; 
+use reaction::{cli::Format, client::request, daemon::daemon, protocol::Order}; use tokio::time::sleep; fn file_with_contents(path: &str, contents: &str) {