Restructure code and document it in ARCHITECTURE.md

ppom 2024-10-26 12:00:00 +02:00
commit 79677cf327
21 changed files with 262 additions and 175 deletions

ARCHITECTURE.md Normal file

@@ -0,0 +1,71 @@
# Architecture
Here is a high-level overview of the codebase.
*Don't hesitate to create an issue or a merge request if something is unclear, missing or outdated.*
## Build
- `build.rs`: generates shell completions and man pages at build time.
- `Cargo.toml`, `Cargo.lock`: manifest and dependencies.
- `config`: example / test configuration files. Look at the git history to discover more!
- `debian`: reaction.deb generation.
- `Makefile`: convenience build targets. I plan to remove this at some point.
- `release.py`: Build process for a release. I'd like to make it more modular, for example to allow building only specific parts.
## Main source code
- `helpers_c`: C helpers. I'd like reaction to gain special IP support so these can be removed.
- `tests`: Integration tests. For now they test basic reaction runtime behavior, persistence, and client-daemon communication.
- `src`: The source code, here we go!
### Top-level files
- `src/main.rs`: Main entrypoint
- `src/lib.rs`: Library entrypoint
- `src/cli.rs`: Command-line arguments
- `src/tests.rs`: Test utilities
### `src/concepts`
reaction is really about its configuration, which sits at the center of the code.
There is one file for each of its concepts: configuration, streams, filters, actions, patterns.
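As a rough illustration of how these concepts nest, here is a minimal sketch. Every field name below is invented for the example; the real definitions in `src/concepts` carry many more fields:

```rust
use std::collections::BTreeMap;

// Illustrative shapes only: the type names match the concepts above,
// but the fields are assumptions, not the real definitions.
struct Pattern { regex: String }
struct Action { cmd: Vec<String> }
struct Filter { regex: Vec<String>, actions: BTreeMap<String, Action> }
struct Stream { cmd: Vec<String>, filters: BTreeMap<String, Filter> }
struct Config {
    patterns: BTreeMap<String, Pattern>,
    streams: BTreeMap<String, Stream>,
}

fn example_config() -> Config {
    // One stream watching sshd, with one filter triggering one action.
    Config {
        patterns: BTreeMap::from([(
            "ip".to_string(),
            Pattern { regex: r"(\d{1,3}\.){3}\d{1,3}".to_string() },
        )]),
        streams: BTreeMap::from([(
            "ssh".to_string(),
            Stream {
                cmd: vec!["journalctl".into(), "-fu".into(), "sshd".into()],
                filters: BTreeMap::from([(
                    "failed".to_string(),
                    Filter {
                        regex: vec!["authentication failure".into()],
                        actions: BTreeMap::from([(
                            "ban".to_string(),
                            Action { cmd: vec!["echo".into(), "ban".into()] },
                        )]),
                    },
                )]),
            },
        )]),
    }
}

fn main() {
    let config = example_config();
    println!("{} stream(s) configured", config.streams.len());
}
```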
### `src/protocol`
Low-level serialization/deserialization and client-daemon protocol messages.
Shared by the client and by the daemon's socket task. Also used by the daemon's database.
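A stdlib-only sketch of the wire format, assuming the default 4-byte big-endian length prefix of `tokio-util`'s `LengthDelimitedCodec` (the real code pairs that codec with `bincode` for the payload):

```rust
// Each message on the socket is a length prefix followed by the payload.
// Assumption: LengthDelimitedCodec defaults (u32 big-endian length).
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = (payload.len() as u32).to_be_bytes().to_vec();
    out.extend_from_slice(payload);
    out
}

fn unframe(buf: &[u8]) -> Option<&[u8]> {
    // Read the 4-byte prefix, then slice out exactly that many bytes.
    let len_bytes: [u8; 4] = buf.get(..4)?.try_into().ok()?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    buf.get(4..4 + len)
}

fn main() {
    let framed = frame(b"show");
    assert_eq!(framed.len(), 4 + 4); // 4-byte prefix + "show"
    assert_eq!(unframe(&framed), Some(&b"show"[..]));
    println!("round-trip ok");
}
```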
### `src/client`
Client code: `reaction show`, `reaction flush`, `reaction test-regex`.
- `show_flush.rs`: `show` & `flush` commands.
- `test_regex.rs`: `test-regex` command.
### `src/daemon`
Daemon runtime structures and logic.
This code is mainly async, with the tokio runtime.
- `mod.rs`: daemon main function. Initializes all tasks, handles synchronization and quitting, etc.
- `stream.rs`: Stream managers: start the stream `cmd` and dispatch its stdout lines to its Filter managers.
- `filter.rs`: Filter managers: handle lines, store matches, send logs to database and decide when to trigger actions.
- `action.rs`: Action managers: handle action triggers (*execs*), store & manage pending actions.
- `socket.rs`: The socket task, responsible for communication with clients.
- `database`: The database thread. This is a sync thread, because it's somehow much faster. At startup it sends persisted matches to the Filter managers. Then it receives match/exec logs from the filters and persists them.
- `database/mod.rs`: Main logic.
- `database/lowlevel.rs`: Low-level implementation details (serialization / deserialization and size optimizations).
- `database/tests.rs`: Unit tests.
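The manager pipeline above can be sketched with std threads and channels standing in for the tokio tasks. This is a simplified model (matching reduced to a substring check), not the real manager code; only the sync database thread mirrors the actual design:

```rust
use std::sync::mpsc;
use std::thread;

// Stream manager -> Filter manager -> database thread, modeled as channels.
fn run_pipeline(lines: Vec<String>) -> Vec<String> {
    let (to_filter, filter_rx) = mpsc::channel::<String>();
    let (to_db, db_rx) = mpsc::channel::<String>();

    // Database thread: receives match logs and persists them (here: a Vec).
    let db = thread::spawn(move || db_rx.into_iter().collect::<Vec<_>>());

    // Filter manager: decides which lines are matches and logs them.
    let filter = thread::spawn(move || {
        for line in filter_rx {
            if line.contains("auth failure") {
                to_db.send(line).expect("database thread alive");
            }
        }
        // to_db is dropped here, closing the channel so the db thread ends.
    });

    // Stream manager: dispatches the stream's stdout lines to the filter.
    for line in lines {
        to_filter.send(line).unwrap();
    }
    drop(to_filter);

    filter.join().unwrap();
    db.join().unwrap()
}

fn main() {
    let persisted = run_pipeline(vec![
        "auth failure from 10.0.0.1".into(),
        "session opened".into(),
    ]);
    assert_eq!(persisted, vec!["auth failure from 10.0.0.1".to_string()]);
    println!("persisted {} match(es)", persisted.len());
}
```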
## Migration from Go to Rust
- `go.old/`: Go / v1 codebase.
These scripts are merged into a single-file executable by `release.py`:
- `export-go-db/`: Go script to export the reaction-v1 database as JSON.
- `import-rust-db/`: Rust script to import the JSON export as a reaction-v2 database.


@@ -227,5 +227,9 @@ make install_systemd
Contributions are welcome. For any substantial feature, please file an issue first, so we can agree on the feature and avoid unnecessary work.
I recommend reading [`ARCHITECTURE.md`](ARCHITECTURE.md) first. It's a tour of the codebase and should save potential contributors time.
## Funding
This is a free-time project, so I'm not working on a schedule.
However, if you're willing to fund the project, I can prioritize and plan paid work. This includes features, documentation and specific Jsonnet configurations.


@@ -8,7 +8,7 @@ use std::{
use clap_complete::shells;
// SubCommand defined here
include!("src/utils/cli.rs");
include!("src/cli.rs");
fn compile_helper(name: &str, out_dir: &Path) -> io::Result<()> {
process::Command::new("gcc")


@@ -1,155 +1,5 @@
use std::{
collections::BTreeSet,
error::Error,
io::{stdin, BufRead, BufReader},
path::{Path, PathBuf},
sync::Arc,
};
mod show_flush;
mod test_regex;
use bincode::Options;
use futures::{SinkExt, StreamExt};
use regex::Regex;
use tokio::net::UnixStream;
use tokio_util::{
bytes::Bytes,
codec::{Framed, LengthDelimitedCodec},
};
use crate::{
concepts::{ClientRequest, ClientStatus, Config, DaemonResponse, Order, Pattern},
utils::{bincode_options, cli::Format},
};
macro_rules! or_quit {
($msg:expr, $expression:expr) => {
$expression.map_err(|err| format!("failed to communicate to daemon: {}, {}", $msg, err))?
};
}
async fn send_retrieve(socket: &Path, req: &ClientRequest) -> Result<DaemonResponse, String> {
let bin = bincode_options();
let conn = or_quit!(
"opening connection to daemon",
UnixStream::connect(socket).await
);
// Encode
let mut transport = Framed::new(conn, LengthDelimitedCodec::new());
let encoded_request = or_quit!("failed to encode request", bin.serialize(req));
or_quit!(
"failed to send request",
transport.send(Bytes::from(encoded_request)).await
);
// Decode
let encoded_response = or_quit!(
"failed to read response",
transport.next().await.ok_or("empty response from server")
);
let encoded_response = or_quit!("failed to decode response", encoded_response);
Ok(or_quit!(
"failed to decode response",
bin.deserialize::<DaemonResponse>(&encoded_response)
))
}
async fn print_status(cs: ClientStatus, format: Format) -> Result<(), Box<dyn Error>> {
let encoded = match format {
Format::JSON => serde_json::to_string_pretty(&cs)?,
Format::YAML => serde_yaml::to_string(&cs)?,
};
println!("{}", encoded);
Ok(())
}
pub async fn request(
socket: PathBuf,
format: Format,
stream_filter: Option<String>,
patterns: Vec<(String, String)>,
order: Order,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let response = send_retrieve(
&socket,
&ClientRequest {
order,
stream_filter,
patterns,
},
)
.await;
match response? {
DaemonResponse::Order(cs) => print_status(cs, format)
.await
.map_err(|err| format!("while printing response: {err}")),
DaemonResponse::Err(err) => Err(format!(
"failed to communicate to daemon: error response: {err}"
)),
}?;
Ok(())
}
pub fn test_regex(
config_path: PathBuf,
mut regex: String,
line: Option<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: Config = Config::from_file(&config_path)?;
// Code close to Filter::setup()
let mut used_patterns: BTreeSet<Arc<Pattern>> = BTreeSet::new();
for pattern in config.patterns().values() {
if let Some(index) = regex.find(pattern.name_with_braces()) {
// we already `find` it, so we must be able to `rfind` it
#[allow(clippy::unwrap_used)]
if regex.rfind(pattern.name_with_braces()).unwrap() != index {
return Err(format!(
"pattern {} present multiple times in regex",
pattern.name_with_braces()
)
.into());
}
used_patterns.insert(pattern.clone());
}
regex = regex.replacen(pattern.name_with_braces(), &pattern.regex, 1);
}
let compiled = Regex::new(&regex).map_err(|err| format!("regex doesn't compile: {err}"))?;
let match_closure = |line: String| {
let mut ignored = false;
if let Some(matches) = compiled.captures(&line) {
let mut result = Vec::new();
if !used_patterns.is_empty() {
for pattern in used_patterns.iter() {
if let Some(match_) = matches.name(pattern.name()) {
result.push(match_.as_str().to_string());
if !pattern.not_an_ignore(match_.as_str()) {
ignored = true;
}
}
}
if !ignored {
println!("\x1b[32mmatching\x1b[0m {result:?}: {line}");
} else {
println!("\x1b[33mignore matching\x1b[0m {result:?}: {line}");
}
} else {
println!("\x1b[32mmatching\x1b[0m: {line}");
}
} else {
println!("\x1b[31mno match\x1b[0m: {line}");
}
};
if let Some(line) = line {
match_closure(line);
} else {
eprintln!("no second argument: reading from stdin");
for line in BufReader::new(stdin()).lines() {
match line {
Ok(line) => match_closure(line),
Err(_) => break,
};
}
}
Ok(())
}
pub use show_flush::request;
pub use test_regex::test_regex;

src/client/show_flush.rs Normal file

@@ -0,0 +1,80 @@
use std::{
error::Error,
path::{Path, PathBuf},
};
use bincode::Options;
use futures::{SinkExt, StreamExt};
use tokio::net::UnixStream;
use tokio_util::{
bytes::Bytes,
codec::{Framed, LengthDelimitedCodec},
};
use crate::{cli::Format, protocol::{bincode_options, ClientRequest, ClientStatus, DaemonResponse, Order}};
macro_rules! or_quit {
($msg:expr, $expression:expr) => {
$expression.map_err(|err| format!("failed to communicate to daemon: {}, {}", $msg, err))?
};
}
async fn send_retrieve(socket: &Path, req: &ClientRequest) -> Result<DaemonResponse, String> {
let bin = bincode_options();
let conn = or_quit!(
"opening connection to daemon",
UnixStream::connect(socket).await
);
// Encode
let mut transport = Framed::new(conn, LengthDelimitedCodec::new());
let encoded_request = or_quit!("failed to encode request", bin.serialize(req));
or_quit!(
"failed to send request",
transport.send(Bytes::from(encoded_request)).await
);
// Decode
let encoded_response = or_quit!(
"failed to read response",
transport.next().await.ok_or("empty response from server")
);
let encoded_response = or_quit!("failed to decode response", encoded_response);
Ok(or_quit!(
"failed to decode response",
bin.deserialize::<DaemonResponse>(&encoded_response)
))
}
fn print_status(cs: ClientStatus, format: Format) -> Result<(), Box<dyn Error>> {
let encoded = match format {
Format::JSON => serde_json::to_string_pretty(&cs)?,
Format::YAML => serde_yaml::to_string(&cs)?,
};
println!("{}", encoded);
Ok(())
}
pub async fn request(
socket: PathBuf,
format: Format,
stream_filter: Option<String>,
patterns: Vec<(String, String)>,
order: Order,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let response = send_retrieve(
&socket,
&ClientRequest {
order,
stream_filter,
patterns,
},
)
.await;
match response? {
DaemonResponse::Order(cs) => print_status(cs, format)
.map_err(|err| format!("while printing response: {err}")),
DaemonResponse::Err(err) => Err(format!(
"failed to communicate to daemon: error response: {err}"
)),
}?;
Ok(())
}

src/client/test_regex.rs Normal file

@@ -0,0 +1,78 @@
use std::{
collections::BTreeSet,
error::Error,
io::{stdin, BufRead, BufReader},
path::PathBuf,
sync::Arc,
};
use regex::Regex;
use crate::concepts::{Config, Pattern};
pub fn test_regex(
config_path: PathBuf,
mut regex: String,
line: Option<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: Config = Config::from_file(&config_path)?;
// Code close to Filter::setup()
let mut used_patterns: BTreeSet<Arc<Pattern>> = BTreeSet::new();
for pattern in config.patterns().values() {
if let Some(index) = regex.find(pattern.name_with_braces()) {
// we already `find` it, so we must be able to `rfind` it
#[allow(clippy::unwrap_used)]
if regex.rfind(pattern.name_with_braces()).unwrap() != index {
return Err(format!(
"pattern {} present multiple times in regex",
pattern.name_with_braces()
)
.into());
}
used_patterns.insert(pattern.clone());
}
regex = regex.replacen(pattern.name_with_braces(), &pattern.regex, 1);
}
let compiled = Regex::new(&regex).map_err(|err| format!("regex doesn't compile: {err}"))?;
let match_closure = |line: String| {
let mut ignored = false;
if let Some(matches) = compiled.captures(&line) {
let mut result = Vec::new();
if !used_patterns.is_empty() {
for pattern in used_patterns.iter() {
if let Some(match_) = matches.name(pattern.name()) {
result.push(match_.as_str().to_string());
if !pattern.not_an_ignore(match_.as_str()) {
ignored = true;
}
}
}
if !ignored {
println!("\x1b[32mmatching\x1b[0m {result:?}: {line}");
} else {
println!("\x1b[33mignore matching\x1b[0m {result:?}: {line}");
}
} else {
println!("\x1b[32mmatching\x1b[0m: {line}");
}
} else {
println!("\x1b[31mno match\x1b[0m: {line}");
}
};
if let Some(line) = line {
match_closure(line);
} else {
eprintln!("no second argument: reading from stdin");
for line in BufReader::new(stdin()).lines() {
match line {
Ok(line) => match_closure(line),
Err(_) => break,
};
}
}
Ok(())
}


@@ -6,7 +6,7 @@ use serde::Deserialize;
use tokio::process::Command;
use super::{Match, Pattern};
use crate::utils::parse_duration;
use super::parse_duration;
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]


@@ -16,7 +16,7 @@ use super::{
messages::{Match, Time, MAT},
Action, LogEntry, Pattern, Patterns,
};
use crate::utils::parse_duration;
use super::parse_duration;
// Only names are serialized
// Only computed fields are not deserialized


@@ -2,8 +2,8 @@ mod action
mod config;
mod filter;
mod messages;
mod parse_duration;
mod pattern;
mod socket_messages;
mod stream;
pub use action::Action;
@@ -11,5 +11,5 @@ pub use config::{Config, Patterns};
pub use filter::Filter;
pub use messages::*;
pub use pattern::Pattern;
pub use socket_messages::*;
pub use stream::Stream;
pub use parse_duration::parse_duration;


@@ -8,7 +8,7 @@ use chrono::{Local, TimeDelta};
use tokio::sync::Semaphore;
use tracing::{error, info};
use crate::concepts::{Action, Match, Order, Time};
use crate::{concepts::{Action, Match, Time}, protocol::Order};
struct State {
pending: BTreeMap<Match, BTreeSet<Time>>,


@@ -13,7 +13,7 @@ use tracing::{debug, error, warn};
use crate::{
concepts::{Config, Filter, LogEntry, Match},
utils::{bincode_options, BincodeOptions},
protocol::{bincode_options, BincodeOptions},
};
use super::DBError;


@@ -7,7 +7,10 @@ use chrono::Local;
use regex::Regex;
use tokio::sync::{mpsc, Semaphore};
use crate::concepts::{Filter, LogEntry, Match, Order, Pattern, PatternStatus, Time, MFT};
use crate::{
concepts::{Filter, LogEntry, Match, Pattern, Time, MFT},
protocol::{Order, PatternStatus},
};
use super::{action::ActionManager, database::DatabaseManagerInput};


@@ -11,8 +11,8 @@ use tokio_util::{
use tracing::{error, warn};
use crate::{
concepts::{ClientRequest, ClientStatus, Config, DaemonResponse, Pattern},
utils::bincode_options,
concepts::{Config, Pattern},
protocol::{bincode_options, ClientRequest, ClientStatus, DaemonResponse},
};
use super::SharedState;


@@ -7,8 +7,9 @@
)]
#![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)]
pub mod cli;
pub mod client;
pub mod concepts;
pub mod daemon;
pub mod protocol;
pub mod tests;
pub mod utils;


@@ -1,12 +1,11 @@
use std::{io::IsTerminal, process::exit};
use clap::Parser;
use reaction::{
cli::{Cli, SubCommand},
client::{request, test_regex},
concepts::Order,
daemon::daemon,
utils::cli::{Cli, SubCommand},
protocol::Order,
};
use tracing::{error, Level};


@@ -1,12 +1,12 @@
use std::collections::BTreeMap;
use super::Match;
use serde::{
// ser::{SerializeMap, SerializeStruct},
Deserialize, Serialize,
};
use crate::concepts::Match;
// We don't need protocol versioning here because
// client and daemon are the same binary

src/protocol/mod.rs Normal file

@@ -0,0 +1,5 @@
mod messages;
mod serialization;
pub use messages::*;
pub use serialization::*;


@@ -1,10 +1,6 @@
pub mod cli;
mod parse_duration;
use std::marker::PhantomData;
use bincode::Options;
pub use parse_duration::parse_duration;
use serde::de::DeserializeOwned;
use thiserror::Error;
use tokio_util::codec::{Decoder, LengthDelimitedCodec};


@@ -8,7 +8,7 @@ use std::{
use tempfile::TempDir;
use tracing::Level;
use reaction::{client::request, concepts::Order, daemon::daemon, utils::cli::Format};
use reaction::{cli::Format, client::request, daemon::daemon, protocol::Order};
use tokio::time::sleep;
fn file_with_contents(path: &str, contents: &str) {