From f7184ff42b0d5fbb2eca6a4df7a089683399dfe0 Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 21 Feb 2025 12:00:00 +0100 Subject: [PATCH 001/308] release: Fix expected return code from Gitlab --- release.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release.py b/release.py index a7c27a1..9d0cfdf 100644 --- a/release.py +++ b/release.py @@ -269,7 +269,7 @@ curl -O https://static.ppom.me/reaction/releases/{tag}/{deb_name} \\ conn.request("POST", "/api/v4/projects/90566/releases", body=body, headers=headers) response = conn.getresponse() - if response.status != 200: + if response.status != 201: print( f"sending message failed: status: {response.status}, reason: {response.reason}" ) From f641f4521168900562dd17fbe6ea16312c704ef5 Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 22 Feb 2025 12:00:00 +0100 Subject: [PATCH 002/308] Update ARCHITECTURE.md --- ARCHITECTURE.md | 28 +++++++--------------------- Makefile | 3 +++ README.md | 2 +- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index b010403..3b95b20 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -8,14 +8,14 @@ Here is a high-level overview of the codebase. - `build.rs`: permits to create shell completions and man pages on build. - `Cargo.toml`, `Cargo.lock`: manifest and dependencies. -- `config`: example / test configuration files. Look at the git history to discover more! -- `debian`: reaction.deb generation. -- `Makefile`: Makefile. I plan to remove this at some point. -- `release.py`: Build process for a release. I'd like to make it more modular, to permit to build specific parts only for example. +- `config`: example / test configuration files. Look at its git history to discover more. +- `Makefile`: Makefile. Resumes useful commands. +- `packaging`: Files useful for .deb and .tar generation. +- `release.py`: Build process for a release. Handles cross-compilation, .tar and .deb generation. ## Main source code -- `helpers_c`: C helpers. I wish to have special IP support in reaction and get rid of them. +- `helpers_c`: C helpers. I wish to have special IP support in reaction and get rid of them. See #79 and #116. - `tests`: Integration tests. For now they test basic reaction runtime behavior, persistance, and client-daemon communication. - `src`: The source code, here we go! @@ -49,23 +49,9 @@ Client code: `reaction show`, `reaction flush`, `reaction test-regex`. Daemon runtime structures and logic. -This code is mainly async, with the tokio runtime. +This code has async code, to handle input streams and communication with clients, using the tokio runtime. - `mod.rs`: daemon main function. Initializes all tasks, handles synchronization and quitting, etc. - `stream.rs`: Stream managers: start the stream `cmd` and dispatch its stdout lines to its Filter managers. -- `filter.rs`: Filter managers: handle lines, store matches, send logs to database and decide when to trigger actions. -- `action.rs`: Action managers: handle action triggers (*execs*), store & manage pending actions. +- `filter.rs`: Filter managers: handle lines, persistance, store matches and trigger actions. This is the main piece of runtime logic. - `socket.rs`: The socket task, responsible for communication with clients. -- `database`: The database thread. This is a sync thread, because it's somehow muuch faster. At startup it sends persisted matches to the Filter managers. Then it receives match/exec logs from the filters and persist them. - - `database/mod.rs`: Main logic. - - `database/lowlevel.rs`: Low-level implementation details (serialization / deserialization and size optimizations). - - `database/tests.rs`: Unit tests. - -## Migration from Go to Rust - -- `go.old/`: Go / v1 codebase. - -Those scripts are merged in a single-file executable by `release.py`: - -- `export-go-db/`: Go script to export the reaction-v1 database as JSON. -- `import-rust-db/`: Rust script to import the JSON export as a reaction-v2 database. diff --git a/Makefile b/Makefile index 52a3af8..3efd558 100644 --- a/Makefile +++ b/Makefile @@ -20,3 +20,6 @@ install: reaction install_systemd: install install -m644 packaging/reaction.service $(SYSTEMDDIR)/system/reaction.service sed -i 's#/usr/local/bin#$(DESTDIR)$(BINDIR)#' $(SYSTEMDDIR)/system/reaction.service + +release: + nix-shell release.py diff --git a/README.md b/README.md index 725e0d8..da20a16 100644 --- a/README.md +++ b/README.md @@ -198,7 +198,7 @@ make install_systemd Contributions are welcome. For any substantial feature, please file an issue first, to be assured that we agree on the feature, and to avoid unnecessary work. -I recommend reading [`ARCHITECTURE.md`](ARCHITECTURE.md) first. This is a tour of the codebase, which should save time to potential contributors. +I recommend reading [`ARCHITECTURE.md`](ARCHITECTURE.md) first. This is a tour of the codebase, which should save time to new contributors. ## Funding From fb9c0f869903d25b1163fe775e1f61cc13ec9592 Mon Sep 17 00:00:00 2001 From: ppom Date: Sun, 23 Feb 2025 12:00:00 +0100 Subject: [PATCH 003/308] Remove dead code Process is a bit tedious, as pub objects are not treated as unused: 1. Remove all `pub`s: sed -i 's/\bpub //' (rg -le rs src/) 2. Fix the errors in your IDE by adding the necessary `pub`s 3. Remove what's marked as unused, when the project compiles again. --- src/concepts/action.rs | 8 -------- src/concepts/config.rs | 26 ++------------------------ src/concepts/filter.rs | 30 ------------------------------ src/concepts/mod.rs | 2 +- src/concepts/stream.rs | 6 +++--- src/daemon/filter.rs | 18 ++++-------------- src/daemon/shutdown.rs | 2 +- src/daemon/sledext.rs | 6 +++--- src/protocol/messages.rs | 8 -------- 9 files changed, 14 insertions(+), 92 deletions(-) diff --git a/src/concepts/action.rs b/src/concepts/action.rs index fc8ef1c..8a659ef 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -40,14 +40,6 @@ impl Action { &self.name } - pub fn stream_name(&self) -> &str { - &self.stream_name - } - - pub fn filter_name(&self) -> &str { - &self.filter_name - } - pub fn after_duration(&self) -> Option { self.after_duration } diff --git a/src/concepts/config.rs b/src/concepts/config.rs index 3e9c639..3067a17 100644 --- a/src/concepts/config.rs +++ b/src/concepts/config.rs @@ -11,7 +11,7 @@ use serde::Deserialize; use thiserror::Error; use tracing::{error, info}; -use super::{Filter, Pattern, Stream}; +use super::{Pattern, Stream}; pub type Patterns = BTreeMap>; @@ -60,20 +60,7 @@ impl Config { &self.state_directory } - pub fn filters(&self) -> Vec<&Filter> { - self.streams - .values() - .flat_map(|stream| stream.filters().values()) - .collect() - } - - pub fn get_filter(&self, name: &(String, String)) -> Option<&Filter> { - self.streams - .get(&name.0) - .and_then(|stream| stream.get_filter(&name.1)) - } - - pub fn setup(&mut self) -> Result<(), String> { + fn setup(&mut self) -> Result<(), String> { if self.concurrency == 0 { self.concurrency = num_cpus::get(); } @@ -138,15 +125,6 @@ impl Config { Ok(config) } - - #[cfg(test)] - pub fn from_streams(streams: BTreeMap, dir: &str) -> Self { - Self { - streams, - state_directory: dir.to_string(), - ..Default::default() - } - } } enum Format { diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index b411773..14a0791 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -44,28 +44,6 @@ pub struct Filter { } impl Filter { - #[cfg(test)] - pub fn from_name(stream_name: &str, filter_name: &str) -> Filter { - Filter { - stream_name: stream_name.into(), - name: filter_name.into(), - ..Filter::default() - } - } - - #[cfg(test)] - pub fn from_name_and_patterns( - stream_name: &str, - filter_name: &str, - patterns: Vec, - ) -> Filter { - Filter { - stream_name: stream_name.into(), - name: filter_name.into(), - patterns: Arc::new(patterns.into_iter().map(|p| Arc::new(p)).collect()), - ..Filter::default() - } - } pub fn name(&self) -> &str { &self.name @@ -87,14 +65,6 @@ impl Filter { self.longuest_action_duration } - pub fn max_time_before_outdated(&self) -> TimeDelta { - if let Some(retry_duration) = self.retry_duration { - self.longuest_action_duration + retry_duration - } else { - self.longuest_action_duration - } - } - pub fn actions(&self) -> &BTreeMap { &self.actions } diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index d64c414..d64f012 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -8,7 +8,7 @@ mod stream; pub use action::Action; pub use config::{Config, Patterns}; pub use filter::Filter; -pub use parse_duration::parse_duration; +use parse_duration::parse_duration; pub use pattern::Pattern; pub use stream::Stream; diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index d8a4e47..64afc91 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -66,7 +66,7 @@ impl Stream { } #[cfg(test)] - pub fn from_filters(filters: BTreeMap, name: &str) -> Self { + fn from_filters(filters: BTreeMap, name: &str) -> Self { Self { filters, name: name.to_string(), @@ -98,7 +98,7 @@ impl Hash for Stream { } #[cfg(test)] -pub mod tests { +mod tests { use super::*; use crate::concepts::filter::tests::ok_filter; @@ -111,7 +111,7 @@ pub mod tests { } } - pub fn ok_stream() -> Stream { + fn ok_stream() -> Stream { let mut stream = default_stream(); stream.cmd = vec!["command".into()]; stream.filters.insert("name".into(), ok_filter()); diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 4e7963d..21a84a5 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -239,7 +239,7 @@ impl FilterManager { if self.has_after { let mut exec_needed = false; self.triggers - .fetch_and_update(&m, |map| { + .fetch_and_update(m, |map| { map.map(|mut map| { if let Some(counter) = map.get(&t) { exec_needed = true; @@ -273,11 +273,7 @@ impl FilterManager { // FIXME do this in a transaction #[allow(clippy::unwrap_used)] // second unwrap: we just checked in the condition that first is_some - let (t, m) = self - .ordered_times - .pop_min() - .unwrap() - .unwrap(); + let (t, m) = self.ordered_times.pop_min().unwrap().unwrap(); self.matches .fetch_and_update(&m, |set| { let mut set = set.unwrap(); @@ -289,11 +285,7 @@ impl FilterManager { } fn get_times(&self, m: &Match) -> usize { - self.matches - .get(m) - .unwrap() - .map(|v| v.len()) - .unwrap_or(0) + self.matches.get(m).unwrap().map(|v| v.len()).unwrap_or(0) } fn clear_past_triggers_and_schedule_future_actions(&self, now: Time) { @@ -314,9 +306,7 @@ impl FilterManager { self.triggers.remove(&m); } else { // Insert back the upcoming times - let _ = self - .triggers - .insert(&m, &new_map); + let _ = self.triggers.insert(&m, &new_map); // Schedule the upcoming times for t in new_map.into_keys() { diff --git a/src/daemon/shutdown.rs b/src/daemon/shutdown.rs index da1fc6b..b601346 100644 --- a/src/daemon/shutdown.rs +++ b/src/daemon/shutdown.rs @@ -54,7 +54,7 @@ pub struct ShutdownToken { } impl ShutdownToken { - pub fn new(shutdown_notifyer: CancellationToken, _task_tracker: mpsc::Sender<()>) -> Self { + fn new(shutdown_notifyer: CancellationToken, _task_tracker: mpsc::Sender<()>) -> Self { Self { shutdown_notifyer, _task_tracker, diff --git a/src/daemon/sledext.rs b/src/daemon/sledext.rs index 15a5b5f..bbdb5f2 100644 --- a/src/daemon/sledext.rs +++ b/src/daemon/sledext.rs @@ -43,12 +43,12 @@ fn filter_triggers_tree_name(filter: &Filter) -> String { impl SledDbExt for sled::Db { fn open_filter_matches_tree(&self, filter: &Filter) -> Result>> { self.open_tree(filter_matches_tree_name(filter).as_bytes()) - .map(|tree| Tree::new(tree)) + .map(Tree::new) } fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result> { self.open_tree(filter_ordered_times_tree_name(filter).as_bytes()) - .map(|tree| Tree::new(tree)) + .map(Tree::new) } fn open_filter_triggers_tree( @@ -56,7 +56,7 @@ impl SledDbExt for sled::Db { filter: &Filter, ) -> Result>> { self.open_tree(filter_triggers_tree_name(filter).as_bytes()) - .map(|tree| Tree::new(tree)) + .map(Tree::new) } fn cleanup_unused_trees(&self, config: &Config) { diff --git a/src/protocol/messages.rs b/src/protocol/messages.rs index d072d7b..be6f926 100644 --- a/src/protocol/messages.rs +++ b/src/protocol/messages.rs @@ -6,8 +6,6 @@ use serde::{ Serialize, }; -use crate::concepts::Match; - // We don't need protocol versionning here because // client and daemon are the same binary @@ -24,12 +22,6 @@ pub struct ClientRequest { pub patterns: Vec<(String, String)>, } -#[derive(Clone, Serialize, Deserialize)] -pub struct FlushOptions { - pub m: Match, - pub f: (String, String), -} - #[derive(Serialize, Deserialize)] pub enum DaemonResponse { Order(ClientStatus), From fe1a93b8a23f4b6b756eb930209a6a4fa526a323 Mon Sep 17 00:00:00 2001 From: ppom Date: Sun, 23 Feb 2025 12:00:00 +0100 Subject: [PATCH 004/308] Remove more dead code Client/Daemon communication is now JSON so we don't need This quirks anymore --- src/concepts/filter.rs | 1 - src/{protocol/messages.rs => protocol.rs} | 0 src/protocol/mod.rs | 5 --- src/protocol/serialization.rs | 52 ----------------------- 4 files changed, 58 deletions(-) rename src/{protocol/messages.rs => protocol.rs} (100%) delete mode 100644 src/protocol/mod.rs delete mode 100644 src/protocol/serialization.rs diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index 14a0791..112212c 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -44,7 +44,6 @@ pub struct Filter { } impl Filter { - pub fn name(&self) -> &str { &self.name } diff --git a/src/protocol/messages.rs b/src/protocol.rs similarity index 100% rename from src/protocol/messages.rs rename to src/protocol.rs diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs deleted file mode 100644 index 9dfbe69..0000000 --- a/src/protocol/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod messages; -mod serialization; - -pub use messages::*; -pub use serialization::*; diff --git a/src/protocol/serialization.rs b/src/protocol/serialization.rs deleted file mode 100644 index e1e23d7..0000000 --- a/src/protocol/serialization.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::marker::PhantomData; - -use serde::de::DeserializeOwned; -use thiserror::Error; -use tokio_util::codec::{Decoder, LengthDelimitedCodec}; - -#[derive(Error, Debug)] -pub enum DecodeError { - #[error("{0}")] - Io(#[from] tokio::io::Error), - #[error("{0}")] - Json(#[from] serde_json::Error), -} - -pub struct LengthPrefixedBincode { - inner: LengthDelimitedCodec, - _marker: PhantomData, -} - -impl LengthPrefixedBincode { - pub fn new() -> Self { - LengthPrefixedBincode { - inner: LengthDelimitedCodec::new(), - _marker: PhantomData, - } - } -} - -impl Default for LengthPrefixedBincode { - fn default() -> Self { - Self::new() - } -} - -impl Decoder for LengthPrefixedBincode { - type Item = T; - type Error = DecodeError; - - fn decode( - &mut self, - src: &mut tokio_util::bytes::BytesMut, - ) -> Result, Self::Error> { - match self.inner.decode(src) { - Ok(Some(data)) => match serde_json::from_slice(&data) { - Ok(thing) => Ok(Some(thing)), - Err(err) => Err(err.into()), - }, - Ok(None) => Ok(None), - Err(err) => Err(err.into()), - } - } -} From b448089f5881169d3ee84b8abad0eacc765f81eb Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 25 Feb 2025 12:00:00 +0100 Subject: [PATCH 005/308] Added daemon tests; parse_duration now supports milliseconds TODO test persistance handling on FilterManager --- src/concepts/action.rs | 36 +++- src/concepts/filter.rs | 44 ++++ src/concepts/mod.rs | 3 + src/concepts/parse_duration.rs | 13 +- src/concepts/pattern.rs | 19 ++ src/concepts/stream.rs | 9 - src/daemon/filter.rs | 379 +++++++++++++++++++++++++++++++-- src/daemon/mod.rs | 5 +- src/daemon/sledext.rs | 84 +++++++- src/daemon/socket.rs | 4 +- src/daemon/stream.rs | 4 +- src/tests.rs | 21 ++ 12 files changed, 587 insertions(+), 34 deletions(-) diff --git a/src/concepts/action.rs b/src/concepts/action.rs index 8a659ef..dc6d62a 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -8,7 +8,7 @@ use tokio::process::Command; use super::parse_duration; use super::{Match, Pattern}; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Default, Deserialize)] #[serde(deny_unknown_fields)] pub struct Action { cmd: Vec, @@ -149,6 +149,40 @@ impl Display for Action { } } +#[cfg(test)] +impl Action { + /// Test-only constructor designed to be easy to call + pub fn new( + cmd: Vec<&str>, + after: Option<&str>, + on_exit: bool, + stream_name: &str, + filter_name: &str, + name: &str, + config_patterns: &super::Patterns, + ) -> Self { + let mut action = Self { + cmd: cmd.into_iter().map(|s| s.into()).collect(), + after: after.map(|s| s.into()), + on_exit, + ..Default::default() + }; + action + .setup( + stream_name, + filter_name, + name, + config_patterns + .clone() + .into_values() + .collect::>() + .into(), + ) + .unwrap(); + action + } +} + #[allow(clippy::unwrap_used)] #[cfg(test)] pub mod tests { diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index 112212c..01cdaf7 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -232,6 +232,50 @@ impl Hash for Filter { } } +#[cfg(test)] +impl Filter { + /// Test-only constructor designed to be easy to call + pub fn new( + actions: Vec, + regex: Vec<&str>, + retry: Option, + retry_period: Option<&str>, + stream_name: &str, + name: &str, + config_patterns: &Patterns, + ) -> Self { + let mut filter = Self { + actions: actions.into_iter().map(|a| (a.name().into(), a)).collect(), + regex: regex.into_iter().map(|s| s.into()).collect(), + retry, + retry_period: retry_period.map(|s| s.into()), + ..Default::default() + }; + filter.setup(&stream_name, &name, config_patterns).unwrap(); + filter + } + + pub fn new_static( + actions: Vec, + regex: Vec<&str>, + retry: Option, + retry_period: Option<&str>, + stream_name: &str, + name: &str, + config_patterns: &Patterns, + ) -> &'static Self { + Box::leak(Box::new(Self::new( + actions, + regex, + retry, + retry_period, + stream_name, + name, + config_patterns, + ))) + } +} + #[allow(clippy::unwrap_used)] #[cfg(test)] pub mod tests { diff --git a/src/concepts/mod.rs b/src/concepts/mod.rs index d64f012..d388b20 100644 --- a/src/concepts/mod.rs +++ b/src/concepts/mod.rs @@ -16,3 +16,6 @@ use chrono::{DateTime, Local}; pub type Time = DateTime; pub type Match = Vec; + +#[cfg(test)] +pub use filter::tests as filter_tests; diff --git a/src/concepts/parse_duration.rs b/src/concepts/parse_duration.rs index f05ee8b..b5b5760 100644 --- a/src/concepts/parse_duration.rs +++ b/src/concepts/parse_duration.rs @@ -12,15 +12,14 @@ pub fn parse_duration(d: &str) -> Result { if i == 0 { return Err(format!("duration '{}' doesn't start with digits", d)); } - let ok_secs = |mul: u32| -> Result { - Ok(TimeDelta::seconds(mul as i64 * value as i64)) - }; + let ok_as = |func: fn(i64) -> TimeDelta| -> Result<_, String> { Ok(func(value as i64)) }; match d_trimmed[i..].trim() { - "s" | "sec" | "secs" | "second" | "seconds" => ok_secs(1), - "m" | "min" | "mins" | "minute" | "minutes" => ok_secs(60), - "h" | "hour" | "hours" => ok_secs(60 * 60), - "d" | "day" | "days" => ok_secs(24 * 60 * 60), + "ms" | "millis" | "millisecond" | "milliseconds" => ok_as(TimeDelta::milliseconds), + "s" | "sec" | "secs" | "second" | "seconds" => ok_as(TimeDelta::seconds), + "m" | "min" | "mins" | "minute" | "minutes" => ok_as(TimeDelta::minutes), + "h" | "hour" | "hours" => ok_as(TimeDelta::hours), + "d" | "day" | "days" => ok_as(TimeDelta::days), unit => Err(format!( "unit {} not recognised. must be one of s/sec/seconds, m/min/minutes, h/hours, d/days", unit diff --git a/src/concepts/pattern.rs b/src/concepts/pattern.rs index 00e0230..c248258 100644 --- a/src/concepts/pattern.rs +++ b/src/concepts/pattern.rs @@ -116,6 +116,25 @@ impl PartialEq for Pattern { } } +#[cfg(test)] +impl Pattern { + /// Test-only constructor designed to be easy to call + pub fn new(name: &str, regex: &str) -> Result { + let mut pattern = Self { + regex: regex.into(), + ..Default::default() + }; + pattern.setup(name)?; + Ok(pattern) + } + + /// Test-only constructor designed to be easy to call. + /// Constructs a full super::Paterns collection with one given pattern + pub fn new_map(name: &str, regex: &str) -> Result { + Ok(std::iter::once((name.into(), Self::new(name, regex)?.into())).collect()) + } +} + #[allow(clippy::unwrap_used)] #[cfg(test)] pub mod tests { diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 64afc91..b57c28e 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -64,15 +64,6 @@ impl Stream { Ok(()) } - - #[cfg(test)] - fn from_filters(filters: BTreeMap, name: &str) -> Self { - Self { - filters, - name: name.to_string(), - ..Default::default() - } - } } impl PartialEq for Stream { diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 21a84a5..7f88c22 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -4,7 +4,6 @@ use std::{ sync::Arc, }; -use chrono::Local; use regex::Regex; use tokio::sync::Semaphore; use tracing::{error, info}; @@ -42,6 +41,7 @@ impl FilterManager { exec_limit: Option>, shutdown: ShutdownToken, db: &sled::Db, + now: Time, ) -> Result { let manager = Self { filter, @@ -52,20 +52,21 @@ impl FilterManager { ordered_times: db.open_filter_ordered_times_tree(filter)?, triggers: db.open_filter_triggers_tree(filter)?, }; - let now = Local::now(); manager.clear_past_matches(now); manager.clear_past_triggers_and_schedule_future_actions(now); Ok(manager) } - pub fn handle_line(&self, line: &str) { + pub fn handle_line(&self, line: &str, now: Time) -> bool { if let Some(match_) = self.filter.get_match(line) { - self.handle_match(match_); + self.handle_match(match_, now); + true + } else { + false } } - fn handle_match(&self, m: Match) { - let now = Local::now(); + fn handle_match(&self, m: Match, now: Time) { self.clear_past_matches(now); let exec = match self.filter.retry() { @@ -80,7 +81,7 @@ impl FilterManager { if exec { self.remove_match(&m); self.add_trigger(&m, now); - self.schedule_exec(m.clone(), now); + self.schedule_exec(m.clone(), now, now); } } @@ -88,6 +89,7 @@ impl FilterManager { &self, patterns: &BTreeMap, Regex>, order: Order, + now: Time, ) -> BTreeMap { let is_match = |match_: &Match| { match_ @@ -118,7 +120,6 @@ impl FilterManager { }) .collect(); - let now = Local::now(); for (match_, times) in self .triggers .iter() @@ -158,13 +159,12 @@ impl FilterManager { /// Schedule execution for a given Action and Match. /// We check first if the trigger is still here /// because pending actions can be flushed. - fn schedule_exec(&self, m: Match, t: Time) { - let now = Local::now(); + fn schedule_exec(&self, m: Match, t: Time, now: Time) { for action in self.filter.actions().values() { let exec_time = t + action.after_duration().unwrap_or_default(); let m = m.clone(); - if exec_time < now { + if exec_time <= now { if self.decrement_trigger(&m, t) { self.exec_now(action, m); } @@ -310,7 +310,7 @@ impl FilterManager { // Schedule the upcoming times for t in new_map.into_keys() { - self.schedule_exec(m.clone(), t); + self.schedule_exec(m.clone(), t, now); } } } @@ -342,3 +342,358 @@ impl FilterManager { }); } } + +#[allow(clippy::unwrap_used)] +#[cfg(test)] +mod tests { + use std::{ + collections::{BTreeMap, BTreeSet}, + fs::read_to_string, + sync::Arc, + time::Duration, + }; + + use chrono::{Local, TimeDelta}; + use tempfile::TempPath; + use tokio::sync::Semaphore; + + use super::{FilterManager, SledDbExt}; + use crate::{ + concepts::{Action, Filter, Match, Pattern, Patterns, Time}, + daemon::{shutdown::ShutdownController, Tree}, + }; + + struct TestBed { + pub _out_path: TempPath, + pub out_file: String, + pub az_patterns: Patterns, + } + + impl TestBed { + fn new() -> Self { + let _out_path = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + let out_file = _out_path.to_str().unwrap().to_string(); + + let az_patterns = Pattern::new_map("az", "[a-z]+").unwrap(); + Self { + _out_path, + out_file, + az_patterns, + } + } + + fn part2(self, filter: &'static Filter, now: Time) -> TestBed2 { + let db = crate::tests::TempDb::new(); + let controller = ShutdownController::new(); + let semaphore = Arc::new(Semaphore::new(1)); + TestBed2 { + _out_path: self._out_path, + out_file: self.out_file, + now, + matches: db.open_filter_matches_tree(filter).unwrap(), + ordered_times: db.open_filter_ordered_times_tree(filter).unwrap(), + triggers: db.open_filter_triggers_tree(filter).unwrap(), + manager: FilterManager::new( + &filter, + Some(semaphore.clone()), + controller.token(), + &db, + now, + ) + .unwrap(), + // db, + // controller, + semaphore, + } + } + } + + struct TestBed2 { + pub _out_path: TempPath, + pub out_file: String, + // pub db: TempDb, + // pub controller: ShutdownController, + pub semaphore: Arc, + pub now: Time, + pub matches: Tree>, + pub ordered_times: Tree, + pub triggers: Tree>, + pub manager: FilterManager, + } + + impl TestBed2 { + fn assert_empty_trees(&self) { + assert!( + self.matches.iter().next().is_none(), + "matches must be empty" + ); + assert!( + self.ordered_times.iter().next().is_none(), + "ordered_times must be empty" + ); + assert!( + self.triggers.iter().next().is_none(), + "triggers must be empty" + ); + } + } + + #[tokio::test] + async fn three_matches_then_action_then_delayed_action() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![ + Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + None, + false, + "test", + "test", + "a1", + &bed.az_patterns, + ), + Action::new( + vec!["sh", "-c", &format!("echo a2 >> {}", &bed.out_file)], + Some("100ms"), + false, + "test", + "test", + "a2", + &bed.az_patterns, + ), + ], + vec!["test "], + Some(3), + Some("2s"), + "test", + "test", + &bed.az_patterns, + ); + + let bed = bed.part2(filter, Local::now()); + + let now = bed.now; + let now1s = bed.now + TimeDelta::seconds(1); + let now2s = bed.now + TimeDelta::seconds(2); + + // No match + assert!(!bed.manager.handle_line("test 131", now)); + bed.assert_empty_trees(); + + // First match + assert!(bed.manager.handle_line("test one", now)); + let one = vec!["one".to_string()]; + assert_eq!( + bed.matches.as_map(), + BTreeMap::from([(one.clone(), BTreeSet::from([now]))]), + "the match has been added to matches" + ); + assert_eq!( + bed.ordered_times.as_map(), + BTreeMap::from([(now, one.clone())]), + "the match has been added to ordered_times" + ); + assert!( + bed.triggers.iter().next().is_none(), + "triggers is still empty" + ); + + // Second match + assert!(bed.manager.handle_line("test one", now1s)); + assert_eq!( + bed.matches.as_map(), + BTreeMap::from([(one.clone(), BTreeSet::from([now, now1s]))]), + "a second match is present in matches" + ); + assert_eq!( + bed.ordered_times.as_map(), + BTreeMap::from([(now, one.clone()), (now1s, one.clone())]), + "a second match is present in ordered_times" + ); + assert!( + bed.triggers.iter().next().is_none(), + "triggers is still empty" + ); + + // Third match, exec + let _block = bed.semaphore.acquire().await.unwrap(); + bed.manager.handle_line("test one", now2s); + assert!( + bed.matches.iter().next().is_none(), + "matches are emptied after trigger" + ); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times are emptied after trigger" + ); + assert_eq!( + bed.triggers.as_map(), + BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]), + "triggers now contain the triggered match with 1 action left" // 1 and not 2 because the decrement_trigger() doesn't wait for the semaphore + ); + drop(_block); + + // Now the first action executes + tokio::time::sleep(Duration::from_millis(40)).await; + // Check first action + assert_eq!( + bed.triggers.as_map(), + BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]), + "triggers still contain the triggered match with 1 action left" + ); + assert_eq!( + "a1 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the first action" + ); + + // Now the second action executes + tokio::time::sleep(Duration::from_millis(100)).await; + // Check second action + assert!( + bed.triggers.iter().next().is_none(), + "triggers are empty again" + ); + assert_eq!( + "a1 one\na2 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the 2 actions" + ); + + bed.assert_empty_trees(); + } + + #[tokio::test] + async fn one_match_one_action() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + None, + false, + "test", + "test", + "a1", + &bed.az_patterns, + )], + vec!["test "], + None, + None, + "test", + "test", + &bed.az_patterns, + ); + + let bed = bed.part2(filter, Local::now()); + let now = bed.now; + + // No match + assert!(!bed.manager.handle_line("test 131", now)); + assert!( + bed.matches.iter().next().is_none(), + "matches must be initially empty" + ); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times must be initially empty" + ); + assert!( + bed.triggers.iter().next().is_none(), + "triggers must be initially empty" + ); + + // match + assert!(bed.manager.handle_line("test one", now)); + assert!(bed.matches.iter().next().is_none(), "matches stay empty"); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times stay empty" + ); + assert!(bed.triggers.iter().next().is_none(), "triggers stay empty"); + + // the action executes + tokio::time::sleep(Duration::from_millis(40)).await; + assert_eq!( + "a1 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the first action" + ); + + bed.assert_empty_trees(); + } + + #[tokio::test] + async fn one_match_one_delayed_action() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + Some("100ms"), + false, + "test", + "test", + "a1", + &bed.az_patterns, + )], + vec!["test "], + None, + None, + "test", + "test", + &bed.az_patterns, + ); + + let bed = bed.part2(filter, Local::now()); + let now = bed.now; + + // No match + assert!(!bed.manager.handle_line("test 131", now)); + assert!( + bed.matches.iter().next().is_none(), + "matches must be initially empty" + ); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times must be initially empty" + ); + assert!( + bed.triggers.iter().next().is_none(), + "triggers must be initially empty" + ); + + // Match + let one = vec!["one".to_string()]; + assert!(bed.manager.handle_line("test one", now)); + assert!(bed.matches.iter().next().is_none(), "matches stay empty"); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times stay empty" + ); + assert_eq!( + bed.triggers.as_map(), + BTreeMap::from([(one.clone(), BTreeMap::from([(now, 1)]))]), + "triggers still contain the triggered match with 1 action left" + ); + assert_eq!( + "", + &read_to_string(&bed.out_file).unwrap(), + "the output file is empty" + ); + + // The action executes + tokio::time::sleep(Duration::from_millis(140)).await; + assert!( + bed.triggers.iter().next().is_none(), + "triggers are empty again" + ); + assert_eq!( + "a1 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the action" + ); + + bed.assert_empty_trees(); + } + + // TODO test persistance, ie. FilterManagers created with non-empty db +} diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 6c64f04..d8a5f20 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -8,6 +8,7 @@ use std::{ }, }; +use chrono::Local; use tokio::{ select, signal::unix::{signal, SignalKind}, @@ -61,11 +62,13 @@ pub async fn daemon( db.cleanup_unused_trees(config); // Filter managers + let now = Local::now(); let mut state = HashMap::new(); for stream in config.streams().values() { let mut filter_managers = HashMap::new(); for filter in stream.filters().values() { - let manager = FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &db)?; + let manager = + FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &db, now)?; filter_managers.insert(filter, manager); } state.insert(stream, filter_managers.clone()); diff --git a/src/daemon/sledext.rs b/src/daemon/sledext.rs index bbdb5f2..2180e84 100644 --- a/src/daemon/sledext.rs +++ b/src/daemon/sledext.rs @@ -94,14 +94,14 @@ impl SledDbExt for sled::Db { /// business logic. /// Key and value types must be [`serde::Serialize`] and [`serde::Deserialize`]. #[derive(Clone)] -pub struct Tree { +pub struct Tree { tree: sled::Tree, _k_marker: PhantomData, _v_marker: PhantomData, } #[allow(clippy::unwrap_used)] -impl Tree { +impl Tree { fn new(tree: sled::Tree) -> Self { Self { tree, @@ -182,4 +182,84 @@ impl Tree BTreeMap { + self.iter().collect() + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use chrono::{Local, TimeDelta}; + + use super::SledDbExt; + use crate::{concepts::filter_tests::ok_filter, tests::TempDb}; + + #[test] + fn tree_crud() { + let filter = ok_filter(); + let db = TempDb::new(); + let triggers = db.open_filter_triggers_tree(&filter).unwrap(); + assert_eq!(BTreeMap::default(), triggers.as_map()); + + let now = Local::now(); + let then = now + TimeDelta::seconds(2); + + let k1 = vec!["a".into()]; + let k2 = vec!["a".into(), "b".into()]; + + let v1 = BTreeMap::from([(now, 4)]); + let v2 = BTreeMap::from([(then, 2)]); + + let map_1 = BTreeMap::from([(k1.clone(), v1.clone())]); + let map_2 = BTreeMap::from([(k2.clone(), v2.clone())]); + let map_1_2 = BTreeMap::from([(k1.clone(), v1.clone()), (k2.clone(), v2.clone())]); + + triggers.insert(&k1, &v1).unwrap(); + assert_eq!(triggers.as_map(), map_1); + assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone())); + assert_eq!(triggers.get(&k2).unwrap(), None); + + triggers.insert(&k2, &v2).unwrap(); + assert_eq!(triggers.as_map(), map_1_2); + assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone())); + assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); + + assert_eq!(triggers.remove(&k1), Some(v1.clone())); + assert_eq!(triggers.as_map(), map_2); + assert_eq!(triggers.get(&k1).unwrap(), None); + assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); + + // Add back + triggers + .fetch_and_update(&k1, |map| { + let mut map = map.unwrap_or_default(); + map.insert(now, 4); + Some(map) + }) + .unwrap(); + assert_eq!(triggers.as_map(), map_1_2); + assert_eq!(triggers.get(&k1).unwrap(), Some(v1.clone())); + assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); + + // Remove + triggers + .fetch_and_update(&k1, |map| match map { + Some(_) => None, + None => Some(v1.clone()), + }) + .unwrap(); + assert_eq!(triggers.as_map(), map_2); + assert_eq!(triggers.get(&k1).unwrap(), None); + assert_eq!(triggers.get(&k2).unwrap(), Some(v2.clone())); + + // Remove + triggers.fetch_and_update(&k2, |_| None).unwrap(); + assert_eq!(triggers.as_map(), BTreeMap::default()); + assert_eq!(triggers.get(&k1).unwrap(), None); + assert_eq!(triggers.get(&k2).unwrap(), None); + } } diff --git a/src/daemon/socket.rs b/src/daemon/socket.rs index f925b56..178d7f6 100644 --- a/src/daemon/socket.rs +++ b/src/daemon/socket.rs @@ -6,6 +6,7 @@ use std::{ sync::Arc, }; +use chrono::Local; use futures::{SinkExt, StreamExt}; use regex::Regex; use tokio::net::UnixListener; @@ -87,6 +88,7 @@ fn answer_order( }) .collect::, Regex>, String>>()?; + let now = Local::now(); let cs: ClientStatus = shared_state .iter() // stream filtering @@ -115,7 +117,7 @@ fn answer_order( .map(|(filter, manager)| { ( filter.name().to_owned(), - manager.handle_order(&patterns, options.order), + manager.handle_order(&patterns, options.order, now), ) }) .collect(); diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index b9eeaa9..f78662c 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, process::Stdio, task::Poll, time::Duration}; +use chrono::Local; use futures::{FutureExt, StreamExt}; use tokio::{ io::{AsyncBufReadExt, BufReader, Lines}, @@ -125,8 +126,9 @@ async fn handle_io( loop { match lines.next().await { Some(Ok(line)) => { + let now = Local::now(); for manager in filter_managers.values() { - manager.handle_line(&line); + manager.handle_line(&line, now); } } Some(Err(err)) => { diff --git a/src/tests.rs b/src/tests.rs index 7b92a75..9b01996 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -55,3 +55,24 @@ impl Deref for Fixture { self.path.deref() } } + +pub struct TempDb { + db: sled::Db, + _tempdir: TempDir, +} + +impl TempDb { + pub fn new() -> Self { + let _tempdir = TempDir::new().unwrap(); + let db = sled::open(_tempdir.path()).unwrap(); + TempDb { _tempdir, db } + } +} + +impl Deref for TempDb { + type Target = sled::Db; + + fn deref(&self) -> &Self::Target { + &self.db + } +} From 859e35e5c3a562e8fe2543e70909cb13c84990ec Mon Sep 17 00:00:00 2001 From: ppom Date: Wed, 26 Feb 2025 12:00:00 +0100 Subject: [PATCH 006/308] Speed up tests now that we handle millisecond precision Previously database cropped to the second precision Now it keeps millisecond precision and handle millisecond units --- tests/simple.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/tests/simple.rs b/tests/simple.rs index 30ae52e..70595be 100644 --- a/tests/simple.rs +++ b/tests/simple.rs @@ -35,7 +35,7 @@ fn config_with_cmd(config_path: &str, cmd: &str) { filter1: { regex: ['here is '], retry: 2, - retryperiod: '20s', + retryperiod: '2s', actions: { // Don't mix code and data at home! // You may permit arbitrary execution from vilains, @@ -46,7 +46,7 @@ fn config_with_cmd(config_path: &str, cmd: &str) { }, action2: { cmd: ['sh', '-c', 'echo del >> ./out.txt'], - after: '3min', + after: '30s', onexit: false, }, } @@ -76,7 +76,7 @@ async fn simple() { config_with_cmd( config_path, - "for i in 12 24 36 24 36 12 12 24 56 67; do echo here is $i; sleep 0.1; done; sleep 2", + "for i in 12 24 36 24 36 12 12 24 56 67; do echo here is $i; sleep 0.01; done; sleep 0.15", ); file_with_contents(out_path, ""); @@ -95,12 +95,9 @@ async fn simple() { // Run the flushes - // We sleep for the time the echoes are finished + 1 second - // This ensures that the subsecond precision lost from de/serialization - // never causes the flush to be interpreted as anterior to the match - + // We sleep for the time the echoes are finished + a bit (100ms) let handle2 = tokio::spawn(async move { - sleep(Duration::from_millis(2500)).await; + sleep(Duration::from_millis(200)).await; request( socket_path.into(), Format::JSON, @@ -112,7 +109,7 @@ async fn simple() { }); let handle3 = tokio::spawn(async move { - sleep(Duration::from_millis(2500)).await; + sleep(Duration::from_millis(220)).await; request( socket_path.into(), Format::JSON, @@ -143,7 +140,7 @@ async fn simple() { config_with_cmd( config_path, - "for i in 12 24 36 56 67; do echo here is $i; sleep 0.1; done", + "for i in 12 24 36 56 67; do echo here is $i; sleep 0.01; done", ); file_with_contents(out_path, ""); @@ -180,7 +177,7 @@ async fn simple() { // echo numbers twice, once on stdout, once on stderr config_with_cmd( config_path, - "for i in 1 2 3 4 5 6 7 8 9; do echo here is $i; echo here is $i 1>&2; sleep 0.1; done; sleep 1", + "for i in 1 2 3 4 5 6 7 8 9; do echo here is $i; echo here is $i 1>&2; sleep 0.01; done", ); file_with_contents(out_path, ""); From b655ef10089943d615a8c6c8ce6ab367abcf8f19 Mon Sep 17 00:00:00 2001 From: ppom Date: Wed, 26 Feb 2025 12:00:00 +0100 Subject: [PATCH 007/308] Move tests, add tests, fix memory leak - Move FilterManager tests to own file - Add some persistance tests - Fix memory leak where entry like (Match, EmptySet) would be kept in DB (instead of discarding it) --- src/daemon/filter.rs | 359 +----------------------------- src/daemon/filter/tests.rs | 438 +++++++++++++++++++++++++++++++++++++ 2 files changed, 444 insertions(+), 353 deletions(-) create mode 100644 src/daemon/filter/tests.rs diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 7f88c22..2b0f766 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -278,7 +278,11 @@ impl FilterManager { .fetch_and_update(&m, |set| { let mut set = set.unwrap(); set.remove(&t); - Some(set) + if set.is_empty() { + None + } else { + Some(set) + } }) .unwrap(); } @@ -345,355 +349,4 @@ impl FilterManager { #[allow(clippy::unwrap_used)] #[cfg(test)] -mod tests { - use std::{ - collections::{BTreeMap, BTreeSet}, - fs::read_to_string, - sync::Arc, - time::Duration, - }; - - use chrono::{Local, TimeDelta}; - use tempfile::TempPath; - use tokio::sync::Semaphore; - - use super::{FilterManager, SledDbExt}; - use crate::{ - concepts::{Action, Filter, Match, Pattern, Patterns, Time}, - daemon::{shutdown::ShutdownController, Tree}, - }; - - struct TestBed { - pub _out_path: TempPath, - pub out_file: String, - pub az_patterns: Patterns, - } - - impl TestBed { - fn new() -> Self { - let _out_path = tempfile::NamedTempFile::new().unwrap().into_temp_path(); - let out_file = _out_path.to_str().unwrap().to_string(); - - let az_patterns = Pattern::new_map("az", "[a-z]+").unwrap(); - Self { - _out_path, - out_file, - az_patterns, - } - } - - fn part2(self, filter: &'static Filter, now: Time) -> TestBed2 { - let db = crate::tests::TempDb::new(); - let controller = ShutdownController::new(); - let semaphore = Arc::new(Semaphore::new(1)); - TestBed2 { - _out_path: self._out_path, - out_file: self.out_file, - now, - matches: db.open_filter_matches_tree(filter).unwrap(), - ordered_times: db.open_filter_ordered_times_tree(filter).unwrap(), - triggers: db.open_filter_triggers_tree(filter).unwrap(), - manager: FilterManager::new( - &filter, - Some(semaphore.clone()), - controller.token(), - &db, - now, - ) - .unwrap(), - // db, - // controller, - semaphore, - } - } - } - - struct TestBed2 { - pub _out_path: TempPath, - pub out_file: String, - // pub db: TempDb, - // pub controller: ShutdownController, - pub semaphore: Arc, - pub now: Time, - pub matches: Tree>, - pub ordered_times: Tree, - pub triggers: Tree>, - pub manager: FilterManager, - } - - impl TestBed2 { - fn assert_empty_trees(&self) { - assert!( - self.matches.iter().next().is_none(), - "matches must be empty" - ); - assert!( - self.ordered_times.iter().next().is_none(), - "ordered_times must be empty" - ); - assert!( - self.triggers.iter().next().is_none(), - "triggers must be empty" - ); - } - } - - #[tokio::test] - async fn three_matches_then_action_then_delayed_action() { - let bed = TestBed::new(); - let filter = Filter::new_static( - vec![ - Action::new( - vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], - None, - false, - "test", - "test", - "a1", - &bed.az_patterns, - ), - Action::new( - vec!["sh", "-c", &format!("echo a2 >> {}", &bed.out_file)], - Some("100ms"), - false, - "test", - "test", - "a2", - &bed.az_patterns, - ), - ], - vec!["test "], - Some(3), - Some("2s"), - "test", - "test", - &bed.az_patterns, - ); - - let bed = bed.part2(filter, Local::now()); - - let now = bed.now; - let now1s = bed.now + TimeDelta::seconds(1); - let now2s = bed.now + TimeDelta::seconds(2); - - // No match - assert!(!bed.manager.handle_line("test 131", now)); - bed.assert_empty_trees(); - - // First match - assert!(bed.manager.handle_line("test one", now)); - let one = vec!["one".to_string()]; - assert_eq!( - bed.matches.as_map(), - BTreeMap::from([(one.clone(), BTreeSet::from([now]))]), - "the match has been added to matches" - ); - assert_eq!( - bed.ordered_times.as_map(), - BTreeMap::from([(now, one.clone())]), - "the match has been added to ordered_times" - ); - assert!( - bed.triggers.iter().next().is_none(), - "triggers is still empty" - ); - - // Second match - assert!(bed.manager.handle_line("test one", now1s)); - assert_eq!( - bed.matches.as_map(), - BTreeMap::from([(one.clone(), BTreeSet::from([now, now1s]))]), - "a second match is present in matches" - ); - assert_eq!( - bed.ordered_times.as_map(), - BTreeMap::from([(now, one.clone()), (now1s, one.clone())]), - "a second match is present in ordered_times" - ); - assert!( - bed.triggers.iter().next().is_none(), - "triggers is still empty" - ); - - // Third match, exec - let _block = bed.semaphore.acquire().await.unwrap(); - bed.manager.handle_line("test one", now2s); - assert!( - bed.matches.iter().next().is_none(), - "matches are emptied after trigger" - ); - assert!( - bed.ordered_times.iter().next().is_none(), - "ordered_times are emptied after trigger" - ); - assert_eq!( - bed.triggers.as_map(), - BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]), - "triggers now contain the triggered match with 1 action left" // 1 and not 2 because the decrement_trigger() doesn't wait for the semaphore - ); - drop(_block); - - // Now the first action executes - tokio::time::sleep(Duration::from_millis(40)).await; - // Check first action - assert_eq!( - bed.triggers.as_map(), - BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]), - "triggers still contain the triggered match with 1 action left" - ); - assert_eq!( - "a1 one\n", - &read_to_string(&bed.out_file).unwrap(), - "the output file contains the result of the first action" - ); - - // Now the second action executes - tokio::time::sleep(Duration::from_millis(100)).await; - // Check second action - assert!( - bed.triggers.iter().next().is_none(), - "triggers are empty again" - ); - assert_eq!( - "a1 one\na2 one\n", - &read_to_string(&bed.out_file).unwrap(), - "the output file contains the result of the 2 actions" - ); - - bed.assert_empty_trees(); - } - - #[tokio::test] - async fn one_match_one_action() { - let bed = TestBed::new(); - let filter = Filter::new_static( - vec![Action::new( - vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], - None, - false, - "test", - "test", - "a1", - &bed.az_patterns, - )], - vec!["test "], - None, - None, - "test", - "test", - &bed.az_patterns, - ); - - let bed = bed.part2(filter, Local::now()); - let now = bed.now; - - // No match - assert!(!bed.manager.handle_line("test 131", now)); - assert!( - bed.matches.iter().next().is_none(), - "matches must be initially empty" - ); - assert!( - bed.ordered_times.iter().next().is_none(), - "ordered_times must be initially empty" - ); - assert!( - bed.triggers.iter().next().is_none(), - "triggers must be initially empty" - ); - - // match - assert!(bed.manager.handle_line("test one", now)); - assert!(bed.matches.iter().next().is_none(), "matches stay empty"); - assert!( - bed.ordered_times.iter().next().is_none(), - "ordered_times stay empty" - ); - assert!(bed.triggers.iter().next().is_none(), "triggers stay empty"); - - // the action executes - tokio::time::sleep(Duration::from_millis(40)).await; - assert_eq!( - "a1 one\n", - &read_to_string(&bed.out_file).unwrap(), - "the output file contains the result of the first action" - ); - - bed.assert_empty_trees(); - } - - #[tokio::test] - async fn one_match_one_delayed_action() { - let bed = TestBed::new(); - let filter = Filter::new_static( - vec![Action::new( - vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], - Some("100ms"), - false, - "test", - "test", - "a1", - &bed.az_patterns, - )], - vec!["test "], - None, - None, - "test", - "test", - &bed.az_patterns, - ); - - let bed = bed.part2(filter, Local::now()); - let now = bed.now; - - // No match - assert!(!bed.manager.handle_line("test 131", now)); - assert!( - bed.matches.iter().next().is_none(), - "matches must be initially empty" - ); - assert!( - bed.ordered_times.iter().next().is_none(), - "ordered_times must be initially empty" - ); - assert!( - bed.triggers.iter().next().is_none(), - "triggers must be initially empty" - ); - - // Match - let one = vec!["one".to_string()]; - assert!(bed.manager.handle_line("test one", now)); - assert!(bed.matches.iter().next().is_none(), "matches stay empty"); - assert!( - bed.ordered_times.iter().next().is_none(), - "ordered_times stay empty" - ); - assert_eq!( - bed.triggers.as_map(), - BTreeMap::from([(one.clone(), BTreeMap::from([(now, 1)]))]), - "triggers still contain the triggered match with 1 action left" - ); - assert_eq!( - "", - &read_to_string(&bed.out_file).unwrap(), - "the output file is empty" - ); - - // The action executes - tokio::time::sleep(Duration::from_millis(140)).await; - assert!( - bed.triggers.iter().next().is_none(), - "triggers are empty again" - ); - assert_eq!( - "a1 one\n", - &read_to_string(&bed.out_file).unwrap(), - "the output file contains the result of the action" - ); - - bed.assert_empty_trees(); - } - - // TODO test persistance, ie. FilterManagers created with non-empty db -} +mod tests; diff --git a/src/daemon/filter/tests.rs b/src/daemon/filter/tests.rs new file mode 100644 index 0000000..b3f7d29 --- /dev/null +++ b/src/daemon/filter/tests.rs @@ -0,0 +1,438 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + fs::read_to_string, + sync::Arc, + time::Duration, +}; + +use chrono::{Local, TimeDelta}; +use tempfile::TempPath; +use tokio::sync::Semaphore; + +use super::{FilterManager, SledDbExt}; +use crate::{ + concepts::{Action, Filter, Match, Pattern, Patterns, Time}, + daemon::{shutdown::ShutdownController, Tree}, + tests::TempDb, +}; + +fn open_trees( + db: &TempDb, + filter: &Filter, +) -> ( + Tree>, + Tree, + Tree>, +) { + ( + db.open_filter_matches_tree(filter).unwrap(), + db.open_filter_ordered_times_tree(filter).unwrap(), + db.open_filter_triggers_tree(filter).unwrap(), + ) +} + +struct TestBed { + pub _out_path: TempPath, + pub out_file: String, + pub az_patterns: Patterns, +} + +impl TestBed { + fn new() -> Self { + let _out_path = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + let out_file = _out_path.to_str().unwrap().to_string(); + + let az_patterns = Pattern::new_map("az", "[a-z]+").unwrap(); + Self { + _out_path, + out_file, + az_patterns, + } + } + + fn part2(self, filter: &'static Filter, now: Time, db: Option) -> TestBed2 { + let db = db.unwrap_or_else(|| TempDb::new()); + let controller = ShutdownController::new(); + let semaphore = Arc::new(Semaphore::new(1)); + TestBed2 { + _out_path: self._out_path, + out_file: self.out_file, + now, + matches: db.open_filter_matches_tree(filter).unwrap(), + ordered_times: db.open_filter_ordered_times_tree(filter).unwrap(), + triggers: db.open_filter_triggers_tree(filter).unwrap(), + manager: FilterManager::new( + &filter, + Some(semaphore.clone()), + controller.token(), + &db, + now, + ) + .unwrap(), + // db, + // controller, + semaphore, + } + } +} + +struct TestBed2 { + pub _out_path: TempPath, + pub out_file: String, + // pub db: TempDb, + // pub controller: ShutdownController, + pub semaphore: Arc, + pub now: Time, + pub matches: Tree>, + pub ordered_times: Tree, + pub triggers: Tree>, + pub manager: FilterManager, +} + +impl TestBed2 { + fn assert_empty_trees(&self) { + assert!( + self.matches.iter().next().is_none(), + "matches must be empty" + ); + assert!( + self.ordered_times.iter().next().is_none(), + "ordered_times must be empty" + ); + assert!( + self.triggers.iter().next().is_none(), + "triggers must be empty" + ); + } +} + +#[tokio::test] +async fn three_matches_then_action_then_delayed_action() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![ + Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + None, + false, + "test", + "test", + "a1", + &bed.az_patterns, + ), + Action::new( + vec!["sh", "-c", &format!("echo a2 >> {}", &bed.out_file)], + Some("100ms"), + false, + "test", + "test", + "a2", + &bed.az_patterns, + ), + ], + vec!["test "], + Some(3), + Some("2s"), + "test", + "test", + &bed.az_patterns, + ); + + let bed = bed.part2(filter, Local::now(), None); + + let now = bed.now; + let now1s = bed.now + TimeDelta::seconds(1); + let now2s = bed.now + TimeDelta::seconds(2); + + // No match + assert!(!bed.manager.handle_line("test 131", now)); + bed.assert_empty_trees(); + + // First match + assert!(bed.manager.handle_line("test one", now)); + let one = vec!["one".to_string()]; + assert_eq!( + bed.matches.as_map(), + BTreeMap::from([(one.clone(), BTreeSet::from([now]))]), + "the match has been added to matches" + ); + assert_eq!( + bed.ordered_times.as_map(), + BTreeMap::from([(now, one.clone())]), + "the match has been added to ordered_times" + ); + assert!( + bed.triggers.iter().next().is_none(), + "triggers is still empty" + ); + + // Second match + assert!(bed.manager.handle_line("test one", now1s)); + assert_eq!( + bed.matches.as_map(), + BTreeMap::from([(one.clone(), BTreeSet::from([now, now1s]))]), + "a second match is present in matches" + ); + assert_eq!( + bed.ordered_times.as_map(), + BTreeMap::from([(now, one.clone()), (now1s, one.clone())]), + "a second match is present in ordered_times" + ); + assert!( + bed.triggers.iter().next().is_none(), + "triggers is still empty" + ); + + // Third match, exec + let _block = bed.semaphore.acquire().await.unwrap(); + bed.manager.handle_line("test one", now2s); + assert!( + bed.matches.iter().next().is_none(), + "matches are emptied after trigger" + ); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times are emptied after trigger" + ); + assert_eq!( + bed.triggers.as_map(), + BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]), + "triggers now contain the triggered match with 1 action left" // 1 and not 2 because the decrement_trigger() doesn't wait for the semaphore + ); + drop(_block); + + // Now the first action executes + tokio::time::sleep(Duration::from_millis(40)).await; + // Check first action + assert_eq!( + bed.triggers.as_map(), + BTreeMap::from([(one.clone(), BTreeMap::from([(now2s, 1)]))]), + "triggers still contain the triggered match with 1 action left" + ); + assert_eq!( + "a1 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the first action" + ); + + // Now the second action executes + tokio::time::sleep(Duration::from_millis(100)).await; + // Check second action + assert!( + bed.triggers.iter().next().is_none(), + "triggers are empty again" + ); + assert_eq!( + "a1 one\na2 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the 2 actions" + ); + + bed.assert_empty_trees(); +} + +#[tokio::test] +async fn one_match_one_action() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + None, + false, + "test", + "test", + "a1", + &bed.az_patterns, + )], + vec!["test "], + None, + None, + "test", + "test", + &bed.az_patterns, + ); + + let bed = bed.part2(filter, Local::now(), None); + let now = bed.now; + + // No match + assert!(!bed.manager.handle_line("test 131", now)); + bed.assert_empty_trees(); + + // match + assert!(bed.manager.handle_line("test one", now)); + assert!(bed.matches.iter().next().is_none(), "matches stay empty"); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times stay empty" + ); + assert!(bed.triggers.iter().next().is_none(), "triggers stay empty"); + + // the action executes + tokio::time::sleep(Duration::from_millis(40)).await; + assert_eq!( + "a1 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the first action" + ); + + bed.assert_empty_trees(); +} + +#[tokio::test] +async fn one_match_one_delayed_action() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + Some("100ms"), + false, + "test", + "test", + "a1", + &bed.az_patterns, + )], + vec!["test "], + None, + None, + "test", + "test", + &bed.az_patterns, + ); + + let bed = bed.part2(filter, Local::now(), None); + let now = bed.now; + + // No match + assert!(!bed.manager.handle_line("test 131", now)); + bed.assert_empty_trees(); + + // Match + let one = vec!["one".to_string()]; + assert!(bed.manager.handle_line("test one", now)); + assert!(bed.matches.iter().next().is_none(), "matches stay empty"); + assert!( + bed.ordered_times.iter().next().is_none(), + "ordered_times stay empty" + ); + assert_eq!( + bed.triggers.as_map(), + BTreeMap::from([(one.clone(), BTreeMap::from([(now, 1)]))]), + "triggers still contain the triggered match with 1 action left" + ); + assert_eq!( + "", + &read_to_string(&bed.out_file).unwrap(), + "the output file is empty" + ); + + // The action executes + tokio::time::sleep(Duration::from_millis(140)).await; + assert!( + bed.triggers.iter().next().is_none(), + "triggers are empty again" + ); + assert_eq!( + "a1 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the action" + ); + + bed.assert_empty_trees(); +} + +#[tokio::test] +async fn one_db_match_one_runtime_match_one_action() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + None, + false, + "test", + "test", + "a1", + &bed.az_patterns, + )], + vec!["test "], + Some(2), + Some("2s"), + "test", + "test", + &bed.az_patterns, + ); + + let db = TempDb::new(); + let (matches, ordered_times, _) = open_trees(&db, filter); + + // Pre-add match + let now = Local::now(); + let one = vec!["one".to_string()]; + let now1s = now - TimeDelta::seconds(1); + + matches.insert(&one, &BTreeSet::from([now1s])).unwrap(); + ordered_times.insert(&now1s, &one).unwrap(); + + // Finish setup + let bed = bed.part2(filter, now, Some(db)); + + assert_eq!( + bed.matches.as_map(), + BTreeMap::from([(one.clone(), BTreeSet::from([now1s]))]), + "the match previously added to matches" + ); + assert_eq!( + bed.ordered_times.as_map(), + BTreeMap::from([(now1s, one.clone())]), + "the match previously added to matches" + ); + assert!(bed.triggers.iter().next().is_none(), "triggers stay empty"); + + // match + assert!(bed.manager.handle_line("test one", now)); + bed.assert_empty_trees(); + // the action executes + tokio::time::sleep(Duration::from_millis(40)).await; + assert_eq!( + "a1 one\n", + &read_to_string(&bed.out_file).unwrap(), + "the output file contains the result of the action" + ); +} + +#[tokio::test] +async fn one_outdated_db_match() { + let bed = TestBed::new(); + let filter = Filter::new_static( + vec![Action::new( + vec!["sh", "-c", &format!("echo a1 >> {}", &bed.out_file)], + None, + false, + "test", + "test", + "a1", + &bed.az_patterns, + )], + vec!["test "], + Some(2), + Some("1s"), + "test", + "test", + &bed.az_patterns, + ); + + let db = TempDb::new(); + let (matches, ordered_times, _) = open_trees(&db, filter); + + // Pre-add match + let now = Local::now(); + let one = vec!["one".to_string()]; + let now1s = now - TimeDelta::milliseconds(1001); + + matches.insert(&one, &BTreeSet::from([now1s])).unwrap(); + ordered_times.insert(&now1s, &one).unwrap(); + + // Finish setup + let bed = bed.part2(filter, now, Some(db)); + bed.assert_empty_trees(); +} From f4b8572e94e46dde49f18236da995fda2acebe4e Mon Sep 17 00:00:00 2001 From: ppom Date: Wed, 26 Feb 2025 12:00:00 +0100 Subject: [PATCH 008/308] run clippy on test target --- src/concepts/action.rs | 1 - src/concepts/filter.rs | 3 +-- src/concepts/pattern.rs | 1 - src/daemon/filter.rs | 13 ++++--------- src/daemon/filter/tests.rs | 9 +++++---- src/daemon/sledext.rs | 2 +- src/lib.rs | 3 +++ src/tests.rs | 5 ++--- 8 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/concepts/action.rs b/src/concepts/action.rs index dc6d62a..d11462c 100644 --- a/src/concepts/action.rs +++ b/src/concepts/action.rs @@ -183,7 +183,6 @@ impl Action { } } -#[allow(clippy::unwrap_used)] #[cfg(test)] pub mod tests { diff --git a/src/concepts/filter.rs b/src/concepts/filter.rs index 01cdaf7..57d4c67 100644 --- a/src/concepts/filter.rs +++ b/src/concepts/filter.rs @@ -251,7 +251,7 @@ impl Filter { retry_period: retry_period.map(|s| s.into()), ..Default::default() }; - filter.setup(&stream_name, &name, config_patterns).unwrap(); + filter.setup(stream_name, name, config_patterns).unwrap(); filter } @@ -276,7 +276,6 @@ impl Filter { } } -#[allow(clippy::unwrap_used)] #[cfg(test)] pub mod tests { use crate::concepts::action::tests::{ok_action, ok_action_with_after}; diff --git a/src/concepts/pattern.rs b/src/concepts/pattern.rs index c248258..38aae5b 100644 --- a/src/concepts/pattern.rs +++ b/src/concepts/pattern.rs @@ -135,7 +135,6 @@ impl Pattern { } } -#[allow(clippy::unwrap_used)] #[cfg(test)] pub mod tests { diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 2b0f766..3605309 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use std::{ collections::{BTreeMap, BTreeSet}, process::Stdio, @@ -278,11 +281,7 @@ impl FilterManager { .fetch_and_update(&m, |set| { let mut set = set.unwrap(); set.remove(&t); - if set.is_empty() { - None - } else { - Some(set) - } + (!set.is_empty()).then_some(set) }) .unwrap(); } @@ -346,7 +345,3 @@ impl FilterManager { }); } } - -#[allow(clippy::unwrap_used)] -#[cfg(test)] -mod tests; diff --git a/src/daemon/filter/tests.rs b/src/daemon/filter/tests.rs index b3f7d29..7ba1879 100644 --- a/src/daemon/filter/tests.rs +++ b/src/daemon/filter/tests.rs @@ -16,6 +16,7 @@ use crate::{ tests::TempDb, }; +#[allow(clippy::type_complexity)] fn open_trees( db: &TempDb, filter: &Filter, @@ -51,7 +52,7 @@ impl TestBed { } fn part2(self, filter: &'static Filter, now: Time, db: Option) -> TestBed2 { - let db = db.unwrap_or_else(|| TempDb::new()); + let db = db.unwrap_or_default(); let controller = ShutdownController::new(); let semaphore = Arc::new(Semaphore::new(1)); TestBed2 { @@ -62,7 +63,7 @@ impl TestBed { ordered_times: db.open_filter_ordered_times_tree(filter).unwrap(), triggers: db.open_filter_triggers_tree(filter).unwrap(), manager: FilterManager::new( - &filter, + filter, Some(semaphore.clone()), controller.token(), &db, @@ -362,7 +363,7 @@ async fn one_db_match_one_runtime_match_one_action() { &bed.az_patterns, ); - let db = TempDb::new(); + let db = TempDb::default(); let (matches, ordered_times, _) = open_trees(&db, filter); // Pre-add match @@ -421,7 +422,7 @@ async fn one_outdated_db_match() { &bed.az_patterns, ); - let db = TempDb::new(); + let db = TempDb::default(); let (matches, ordered_times, _) = open_trees(&db, filter); // Pre-add match diff --git a/src/daemon/sledext.rs b/src/daemon/sledext.rs index 2180e84..927d0da 100644 --- a/src/daemon/sledext.rs +++ b/src/daemon/sledext.rs @@ -201,7 +201,7 @@ mod tests { #[test] fn tree_crud() { let filter = ok_filter(); - let db = TempDb::new(); + let db = TempDb::default(); let triggers = db.open_filter_triggers_tree(&filter).unwrap(); assert_eq!(BTreeMap::default(), triggers.as_map()); diff --git a/src/lib.rs b/src/lib.rs index dff7680..53b86dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,9 @@ )] #![allow(clippy::upper_case_acronyms, clippy::mutable_key_type)] +// Allow unwrap in tests +#![cfg_attr(test, allow(clippy::unwrap_used))] + pub mod cli; pub mod client; pub mod concepts; diff --git a/src/tests.rs b/src/tests.rs index 9b01996..4039910 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,4 +1,3 @@ -#![allow(clippy::unwrap_used)] #![cfg(test)] use std::{ @@ -61,8 +60,8 @@ pub struct TempDb { _tempdir: TempDir, } -impl TempDb { - pub fn new() -> Self { +impl Default for TempDb { + fn default() -> Self { let _tempdir = TempDir::new().unwrap(); let db = sled::open(_tempdir.path()).unwrap(); TempDb { _tempdir, db } From deaf418afd35f2050c08d6bdf4001479e7aabda4 Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 4 Mar 2025 12:00:00 +0100 Subject: [PATCH 009/308] Add ip46tables and nft46 executables to the .deb --- Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index bcf990f..9a7e931 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,10 @@ build = "build.rs" maintainer-scripts = "packaging/" systemd-units = { enable = false } assets = [ - # Executable + # Executables [ "target/release/reaction", "/usr/bin/reaction", "755" ], + [ "target/release/ip46tables", "/usr/bin/ip46tables", "755" ], + [ "target/release/nft46", "/usr/bin/nft46", "755" ], # Man pages [ "target/release/reaction*.1", "/usr/share/man/man1/", "644" ], # Shell completions From 170c1fd01e42f446d291bc8764165f7030c438b9 Mon Sep 17 00:00:00 2001 From: ppom Date: Wed, 26 Mar 2025 12:00:00 +0100 Subject: [PATCH 010/308] release.py: add cargo-deb to the depencies list --- release.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release.py b/release.py index 9d0cfdf..9e5e2b6 100644 --- a/release.py +++ b/release.py @@ -1,5 +1,5 @@ #!/usr/bin/env nix-shell -#!nix-shell -i python3 -p "python3.withPackages (ps: with ps; [ requests ])" -p debian-devscripts git minisign cargo-cross rustup +#!nix-shell -i python3 -p "python3.withPackages (ps: with ps; [ requests ])" -p debian-devscripts git minisign cargo-cross rustup cargo-deb import http.client import json import os From da9287c16c65e3bd7afdb3c2cae1abe38e6ec450 Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 22 Apr 2025 12:00:00 +0200 Subject: [PATCH 011/308] ip46tables: fix return type of exec func --- helpers_c/ip46tables.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helpers_c/ip46tables.c b/helpers_c/ip46tables.c index 3af3c78..02ce85a 100644 --- a/helpers_c/ip46tables.c +++ b/helpers_c/ip46tables.c @@ -59,7 +59,7 @@ int guess_type(int len, char *tab[]) { return 0; } -int exec(char *str, char **argv) { +void exec(char *str, char **argv) { argv[0] = str; execvp(str, argv); // returns only if fails From 94502443f7709629f5fcde9627c13fb2a2c6c8ad Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 29 Apr 2025 12:00:00 +0200 Subject: [PATCH 012/308] =?UTF-8?q?Prettier=20pattern-matching=20?= =?UTF-8?q?=F0=9F=92=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index ab42624..923588a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,12 +28,7 @@ async fn main() { let cli = Cli::parse(); - let (is_daemon, level) = if let SubCommand::Start { - config: _, - loglevel, - socket: _, - } = cli.command - { + let (is_daemon, level) = if let SubCommand::Start { loglevel, .. } = cli.command { (true, loglevel) } else { (false, Level::DEBUG) @@ -57,8 +52,8 @@ async fn main() { let result = match cli.command { SubCommand::Start { config, - loglevel: _, socket, + .. } => daemon(config, socket).await, SubCommand::Show { socket, From 79b132d77540bb8064012b83b4fd5ab08262ff24 Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 3 May 2025 12:00:00 +0200 Subject: [PATCH 013/308] README: add a word on oppression --- README.md | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index da20a16..e1425ba 100644 --- a/README.md +++ b/README.md @@ -194,11 +194,22 @@ To install the systemd file as well make install_systemd ``` -## Development +## Contributing -Contributions are welcome. For any substantial feature, please file an issue first, to be assured that we agree on the feature, and to avoid unnecessary work. +> We, as participants in the open source ecosystem, are ethically responsible for the software +> and hardware we help create - as it can be used to perpetuate inequalities or help empower +> marginalized communities, and fight against patriarchy, capitalism, sexism, gender violence, +> racism, ableism, homophobia, colonialism, fascism, surveillance, and oppressive control. -I recommend reading [`ARCHITECTURE.md`](ARCHITECTURE.md) first. This is a tour of the codebase, which should save time to new contributors. +- [NGI's Diversity and Inclusion Guide](https://nlnet.nl/NGI0/bestpractices/DiversityAndInclusionGuide-v4.pdf) + +I'll do my best to maintain a safe contribution place, as free as possible from discrimination and elitism. +Your ideas are welcome in the issues. +Contributions are welcome. + +For any substantial feature, please file an issue first, to be assured that we agree on the feature, and to avoid unnecessary work. + +I recommend reading [`ARCHITECTURE.md`](ARCHITECTURE.md) first. This is a quick tour of the codebase, which should save time to new contributors. ## Funding From a297f26f3d52accdda75ee8fbd8ec51eb059b83d Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 3 May 2025 12:00:00 +0200 Subject: [PATCH 014/308] Prettier exec_limit handler --- src/daemon/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index d8a5f20..743074c 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -45,10 +45,9 @@ pub async fn daemon( let shutdown = ShutdownController::new(); // Semaphore limiting action execution concurrency - let exec_limit = if config.concurrency() > 0 { - Some(Arc::new(Semaphore::new(config.concurrency()))) - } else { - None + let exec_limit = match config.concurrency() { + 0 => None, + n => Some(Arc::new(Semaphore::new(n))), }; // Open Database From 2ab6eeceaa829a9ba1225f9de75751c4a3887363 Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 16 May 2025 12:00:00 +0200 Subject: [PATCH 015/308] Matrix rooms --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index e1425ba..497009f 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,14 @@ For any substantial feature, please file an issue first, to be assured that we a I recommend reading [`ARCHITECTURE.md`](ARCHITECTURE.md) first. This is a quick tour of the codebase, which should save time to new contributors. +You can also join this Matrix development room: [#reaction-dev-en:club1.fr](https://matrix.to/#/#reaction-dev-en:club1.fr). +French version: [#reaction-dev-fr:club1.fr](https://matrix.to/#/#reaction-dev-fr:club1.fr). + +## Help + +You can ask for help in the issues or in this Matrix room: [#reaction-users-en:club1.fr](https://matrix.to/#/#reaction-users-en:club1.fr). +French version: [#reaction-users-fr:club1.fr](https://matrix.to/#/#reaction-users-fr:club1.fr). + ## Funding This is a free time project, so I'm not working on schedule. From 660a7d5a58a8ba14e57a340ecf7d4a8670be7685 Mon Sep 17 00:00:00 2001 From: ppom Date: Tue, 29 Apr 2025 12:00:00 +0200 Subject: [PATCH 016/308] WIP --- Cargo.lock | 474 ++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 1 + src/daemon/heedext.rs | 112 ++++++++++ src/daemon/mod.rs | 16 +- 4 files changed, 584 insertions(+), 19 deletions(-) create mode 100644 src/daemon/heedext.rs diff --git a/Cargo.lock b/Cargo.lock index 291b397..833b165 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,9 +144,12 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +dependencies = [ + "serde", +] [[package]] name = "bumpalo" @@ -168,9 +171,9 @@ checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "cc" -version = "1.1.31" +version = "1.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" +checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362" dependencies = [ "shlex", ] @@ -291,12 +294,41 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", +] + +[[package]] +name = "doxygen-rs" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9" +dependencies = [ + "phf", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -319,6 +351,15 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fs2" version = "0.4.3" @@ -456,6 +497,44 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "heed" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a56c94661ddfb51aa9cdfbf102cfcc340aa69267f95ebccc4af08d7c530d393" +dependencies = [ + "bitflags 2.9.0", + "byteorder", + "heed-traits", + "heed-types", + "libc", + "lmdb-master-sys", + "once_cell", + "page_size", + "serde", + "synchronoise", + "url", +] + +[[package]] +name = "heed-traits" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff" + +[[package]] +name = "heed-types" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d" +dependencies = [ + "bincode", + "byteorder", + "heed-traits", + "serde", + "serde_json", +] + [[package]] name = "hermit-abi" version = "0.3.9" @@ -485,6 +564,145 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", +] + +[[package]] +name = "idna" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "indexmap" version = "2.6.0" @@ -556,7 +774,7 @@ dependencies = [ "proc-macro2", "quote", "syn 1.0.109", - "synstructure", + "synstructure 0.12.6", ] [[package]] @@ -616,9 +834,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.161" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "linux-raw-sys" @@ -626,6 +844,23 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "litemap" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" + +[[package]] +name = "lmdb-master-sys" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "864808e0b19fb6dd3b70ba94ee671b82fce17554cf80aeb0a155c65bb08027df" +dependencies = [ + "cc", + "doxygen-rs", + "libc", +] + [[package]] name = "lock_api" version = "0.4.12" @@ -681,7 +916,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "cfg-if", "cfg_aliases", "libc", @@ -727,9 +962,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.2" +version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "overload" @@ -737,6 +972,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parking_lot" version = "0.11.2" @@ -818,6 +1063,54 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c719dcf55f09a3a7e764c6649ab594c18a177e3599c467983cdf644bfc0a4088" +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_macros", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_macros" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216" +dependencies = [ + "phf_generator", + "phf_shared", + "proc-macro2", + "quote", + "syn 2.0.82", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -897,6 +1190,7 @@ dependencies = [ "clap_complete", "clap_mangen", "futures", + "heed", "jrsonnet-evaluator", "nix", "num_cpus", @@ -930,7 +1224,7 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", ] [[package]] @@ -986,7 +1280,7 @@ version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "errno", "libc", "linux-raw-sys", @@ -1007,18 +1301,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.210" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.210" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -1027,9 +1321,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ "itoa", "memchr", @@ -1074,6 +1368,12 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.9" @@ -1115,6 +1415,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.11.1" @@ -1143,6 +1449,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synchronoise" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2" +dependencies = [ + "crossbeam-queue", +] + [[package]] name = "synstructure" version = "0.12.6" @@ -1155,6 +1470,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", +] + [[package]] name = "tempfile" version = "3.13.0" @@ -1207,6 +1533,16 @@ dependencies = [ "chrono", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.40.0" @@ -1337,6 +1673,29 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "url" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1523,6 +1882,18 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "yansi-term" version = "0.1.2" @@ -1532,6 +1903,30 @@ dependencies = [ "winapi", ] +[[package]] +name = "yoke" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", + "synstructure 0.13.1", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -1552,3 +1947,46 @@ dependencies = [ "quote", "syn 2.0.82", ] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", + "synstructure 0.13.1", +] + +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.82", +] diff --git a/Cargo.toml b/Cargo.toml index 9a7e931..43e78cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ assets = [ bincode = "1.3.3" chrono = { version = "0.4.38", features = ["std", "clock", "serde"] } clap = { version = "4.5.4", features = ["derive"] } +heed = "0.22.0" jrsonnet-evaluator = "0.4.2" nix = { version = "0.29.0", features = ["signal"] } num_cpus = "1.16.0" diff --git a/src/daemon/heedext.rs b/src/daemon/heedext.rs new file mode 100644 index 0000000..5fdf526 --- /dev/null +++ b/src/daemon/heedext.rs @@ -0,0 +1,112 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use heed::{ + types::{DecodeIgnore, Str}, + Database, Result, +}; + +use crate::concepts::{Config, Filter, Match, Time}; + +/// This trait permits to manage in a single place what are the names of [`heed::Database`]s in +/// reaction. It streamlines [`heed::Database`]s opening so that we can reliably open the same +/// Databases in multiple places. +/// It also permits to manage the cleanup of unused databases. +pub trait EnvExt { + fn open_filter_matches_tree(&self, filter: &Filter) -> Result>>; + fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result>; + fn open_filter_triggers_tree( + &self, + filter: &Filter, + ) -> Result>>; + + fn cleanup_unused_trees(&self, config: &Config) -> Result<()>; +} + +fn filter_matches_tree_name(filter: &Filter) -> String { + format!("filter_matches_{}.{}", filter.stream_name(), filter.name()) +} + +fn filter_ordered_times_tree_name(filter: &Filter) -> String { + format!( + "filter_ordered_times_{}.{}", + filter.stream_name(), + filter.name() + ) +} + +fn filter_triggers_tree_name(filter: &Filter) -> String { + format!("filter_triggers_{}.{}", filter.stream_name(), filter.name()) +} + +impl EnvExt for heed::Env { + fn open_filter_matches_tree(&self, filter: &Filter) -> Result>> { + let mut wtxn = self.write_txn()?; + let res = self.create_database::>( + &mut wtxn, + Some(&filter_matches_tree_name(filter)), + ); + wtxn.commit()?; + res + } + + fn open_filter_ordered_times_tree(&self, filter: &Filter) -> Result> { + let mut wtxn = self.write_txn()?; + let res = self.create_database::( + &mut wtxn, + Some(&filter_ordered_times_tree_name(filter)), + ); + wtxn.commit()?; + res + } + + fn open_filter_triggers_tree( + &self, + filter: &Filter, + ) -> Result>> { + let mut wtxn = self.write_txn()?; + let res = self.create_database::>( + &mut wtxn, + Some(&filter_triggers_tree_name(filter)), + ); + wtxn.commit()?; + res + } + + fn cleanup_unused_trees(&self, config: &Config) -> Result<()> { + let valid_tree_names: BTreeSet<_> = config + .streams() + .values() + // for each filter + .flat_map(|stream| stream.filters().values()) + .flat_map(|filter| { + [ + filter_matches_tree_name(filter), + filter_ordered_times_tree_name(filter), + filter_triggers_tree_name(filter), + ] + }) + // plus sled's default map + .chain(std::iter::once("__sled__default".into())) + // convert as IVec which is sled's binary type + // .map(|string| sled::IVec::from(string.as_bytes())) + .collect(); + + let rtxn = self.read_txn()?; + let unnamed: Database = self + .open_database(&rtxn, None)? + .expect("the unnamed database always exists"); + + // Remove trees that are not in the list of valid trees + for outdated_tree in unnamed + .iter(&rtxn)? + .filter(|key| key.is_ok()) + .map(|key| key.unwrap().0) + .filter(|tree_name| !valid_tree_names.contains(*tree_name)) + { + self.remove_database(outdated_tree) + .expect("Fatal error while cleaning DB on startup"); + } + + Ok(()) + } +} diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 743074c..03dec41 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -19,13 +19,14 @@ use tracing::{debug, info}; use crate::concepts::Config; use filter::FilterManager; use shutdown::{ShutdownController, ShutdownDelegate}; -use sledext::*; +use heedext::*; use socket::socket_manager; use stream::stream_manager; mod filter; mod shutdown; mod sledext; +mod heedext; mod socket; mod stream; @@ -60,6 +61,19 @@ pub async fn daemon( })?; db.cleanup_unused_trees(config); + #[allow(unsafe_code)] + let db = unsafe { + heed::EnvOpenOptions::new() + .open(format!("{}/lmdb", config.state_directory())) + .map_err(|err| { + format!( + "while opening database at {}: {}", + config.state_directory(), + err + ) + }) + }?; + // Filter managers let now = Local::now(); let mut state = HashMap::new(); From 9e83ceed46f6efe0c0fcb8986955192153860c5a Mon Sep 17 00:00:00 2001 From: ppom Date: Fri, 2 May 2025 12:00:00 +0200 Subject: [PATCH 017/308] WIP --- src/daemon/filter.rs | 176 +++++++++++++++++++++--------------------- src/daemon/heedext.rs | 119 +++++++++++++++++----------- src/daemon/mod.rs | 39 ++++------ 3 files changed, 176 insertions(+), 158 deletions(-) diff --git a/src/daemon/filter.rs b/src/daemon/filter.rs index 3605309..7aeedc9 100644 --- a/src/daemon/filter.rs +++ b/src/daemon/filter.rs @@ -1,12 +1,9 @@ #[cfg(test)] mod tests; -use std::{ - collections::{BTreeMap, BTreeSet}, - process::Stdio, - sync::Arc, -}; +use std::{collections::BTreeMap, process::Stdio, sync::Arc}; +use heed::{types::SerdeBincode, Database}; use regex::Regex; use tokio::sync::Semaphore; use tracing::{error, info}; @@ -16,7 +13,7 @@ use crate::{ protocol::{Order, PatternStatus}, }; -use super::{shutdown::ShutdownToken, SledDbExt, Tree}; +use super::{shutdown::ShutdownToken, EnvExt as _, MatchTime}; #[derive(Clone)] pub struct FilterManager { @@ -28,13 +25,16 @@ pub struct FilterManager { exec_limit: Option>, /// Permits to run pending actions on shutdown shutdown: ShutdownToken, + /// LMDB environment, necessary to create transactions + env: heed::Env, /// Saves all the current Matches for this Filter - matches: Tree>, + /// Has duplicate values for a key + matches: Database, SerdeBincode