diff --git a/.gitignore b/.gitignore index 07180cd..55ecad6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,15 +1,22 @@ /reaction -reaction.db -reaction.db.old +reaction*.db +reaction*.db.old /data +/lmdb +reaction*.export.json /reaction*.sock /result /wiki +/deb *.deb *.minisig *.qcow2 +debian-packaging/* *.swp +export-go-db/export-go-db +import-rust-db/target /target +reaction-plugin/target /local .ccls-cache .direnv diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..78d7601 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,15 @@ +--- +image: golang:1.20-bookworm +stages: + - build + +variables: + DEBIAN_FRONTEND: noninteractive + +test_building: + stage: build + before_script: + - apt-get -qq -y update + - apt-get -qq -y install build-essential devscripts debhelper quilt wget + script: + - make reaction ip46tables nft46 diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 4289e65..f43d2ad 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -6,7 +6,6 @@ Here is a high-level overview of the codebase. ## Build -- `bench/`: Configuration that spawns a very high load on reaction. Useful to test performance improvements and regressions. - `build.rs`: permits to create shell completions and man pages on build. - `Cargo.toml`, `Cargo.lock`: manifest and dependencies. - `config/`: example / test configuration files. Look at its git history to discover more. @@ -16,7 +15,8 @@ Here is a high-level overview of the codebase. ## Main source code -- `tests/`: Integration tests. They test reaction runtime behavior, persistance, client-daemon communication, plugin integrations. +- `helpers_c/`: C helpers. I wish to have special IP support in reaction and get rid of them. See #79 and #116. +- `tests/`: Integration tests. For now they test basic reaction runtime behavior, persistance, and client-daemon communication. - `src/`: The source code, here we go! ### Top-level files @@ -25,13 +25,18 @@ Here is a high-level overview of the codebase. - `src/lib.rs`: Second main entrypoint - `src/cli.rs`: Command-line arguments - `src/tests.rs`: Test utilities -- `src/protocol.rs`: de/serialization and client/daemon protocol messages. ### `src/concepts/` reaction really is about its configuration, which is at the center of the code. -There is one file for each of its concepts: configuration, streams, filters, actions, patterns, plugins. +There is one file for each of its concepts: configuration, streams, filters, actions, patterns. + +### `src/protocol/` + +Low-level serialization/deserialization and client-daemon protocol messages. + +Shared by the client and daemon's socket. Also used by daemon's database. ### `src/client/` @@ -53,9 +58,9 @@ This code has async code, to handle input streams and communication with clients - `mod.rs`: High-level logic - `state.rs`: Inner state operations - `socket.rs`: The socket task, responsible for communication with clients. -- `plugin.rs`: Plugin startup, configuration loading and cleanup. +- `shutdown.rs`: Logic for passing shutdown signal across all tasks -### `crates/treedb` +### `src/tree` Persistence layer. @@ -63,19 +68,5 @@ This is a database highly adapted to reaction workload, making reaction faster t (heed, sled and fjall crates have been tested). Its design is explained in the comments of its files: -- `lib.rs`: main database code, with its two API structs: Tree and Database. -- `raw.rs`: low-level part, directly interacting with de/serializisation and files. -- `time.rs`: time definitions shared with reaction. -- `helpers.rs`: utilities to ease db deserialization from disk. - -### `plugins/reaction-plugin` - -Shared plugin interface between reaction daemon and its plugins. - -Also defines some shared logic between them: -- `shutdown.rs`: Logic for passing shutdown signal across all tasks -- `parse_duration.rs` Duration parsing - -### `plugins/reaction-plugin-*` - -All core plugins. +- `mod.rs`: main database code, with its two API structs: Tree and Database. +- `raw.rs` low-level part, directly interacting with de/serializisation and files. diff --git a/Cargo.lock b/Cargo.lock index c9a7b56..f5b71b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1980,15 +1980,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "libnftables1-sys" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b290d0d41f0ad578660aeed371bcae4cf85f129a6fe31350dbd2e097518cd7f" -dependencies = [ - "bindgen", -] - [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -2257,22 +2248,6 @@ dependencies = [ "wmi", ] -[[package]] -name = "nftables" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c57e7343eed9e9330e084eef12651b15be3c8ed7825915a0ffa33736b852bed" -dependencies = [ - "schemars", - "serde", - "serde_json", - "serde_path_to_error", - "strum", - "strum_macros", - "thiserror 2.0.18", - "tokio", -] - [[package]] name = "nix" version = "0.29.0" @@ -2886,6 +2861,7 @@ dependencies = [ name = "reaction-plugin" version = "1.0.0" dependencies = [ + "chrono", "remoc", "serde", "serde_json", @@ -2923,19 +2899,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "reaction-plugin-nftables" -version = "0.1.0" -dependencies = [ - "libnftables1-sys", - "nftables", - "reaction-plugin", - "remoc", - "serde", - "serde_json", - "tokio", -] - [[package]] name = "reaction-plugin-virtual" version = "1.0.0" @@ -2956,26 +2919,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "ref-cast" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" -dependencies = [ - "ref-cast-impl", -] - -[[package]] -name = "ref-cast-impl" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.114", -] - [[package]] name = "regex" version = "1.12.2" @@ -3251,31 +3194,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "schemars" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" -dependencies = [ - "dyn-clone", - "ref-cast", - "schemars_derive", - "serde", - "serde_json", -] - -[[package]] -name = "schemars_derive" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f" -dependencies = [ - "proc-macro2", - "quote", - "serde_derive_internals", - "syn 2.0.114", -] - [[package]] name = "scoped-tls" version = "1.0.1" @@ -3369,17 +3287,6 @@ dependencies = [ "syn 2.0.114", ] -[[package]] -name = "serde_derive_internals" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.114", -] - [[package]] name = "serde_json" version = "1.0.149" @@ -3393,17 +3300,6 @@ dependencies = [ "zmij", ] -[[package]] -name = "serde_path_to_error" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" -dependencies = [ - "itoa", - "serde", - "serde_core", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" diff --git a/Cargo.toml b/Cargo.toml index 66e95e3..dd537aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,8 +29,6 @@ assets = [ [ "target/release/reaction.bash", "/usr/share/bash-completion/completions/reaction", "644" ], [ "target/release/reaction.fish", "/usr/share/fish/completions/", "644" ], [ "target/release/_reaction", "/usr/share/zsh/vendor-completions/", "644" ], - # Slice - [ "packaging/system-reaction.slice", "/usr/lib/systemd/system/", "644" ], ] [dependencies] @@ -85,7 +83,6 @@ members = [ "plugins/reaction-plugin", "plugins/reaction-plugin-cluster", "plugins/reaction-plugin-ipset", - "plugins/reaction-plugin-nftables", "plugins/reaction-plugin-virtual" ] diff --git a/packaging/reaction.service b/packaging/reaction.service index 343015d..5bd1478 100644 --- a/packaging/reaction.service +++ b/packaging/reaction.service @@ -1,6 +1,6 @@ # vim: ft=systemd [Unit] -Description=reaction daemon +Description=A daemon that scans program outputs for repeated patterns, and takes action. Documentation=https://reaction.ppom.me # Ensure reaction will insert its chain after docker has inserted theirs. Only useful when iptables & docker are used # After=docker.service @@ -17,8 +17,6 @@ RuntimeDirectory=reaction WorkingDirectory=/var/lib/reaction # Let reaction kill its child processes first KillMode=mixed -# Put reaction in its own slice so that plugins can be grouped within. -Slice=system-reaction.slice [Install] WantedBy=multi-user.target diff --git a/packaging/system-reaction.slice b/packaging/system-reaction.slice deleted file mode 100644 index 732f276..0000000 --- a/packaging/system-reaction.slice +++ /dev/null @@ -1 +0,0 @@ -[Slice] diff --git a/plugins/reaction-plugin-ipset/src/action.rs b/plugins/reaction-plugin-ipset/src/action.rs index c820e28..8522717 100644 --- a/plugins/reaction-plugin-ipset/src/action.rs +++ b/plugins/reaction-plugin-ipset/src/action.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, u32, usize}; +use std::{fmt::Debug, u32}; use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration}; use remoc::rch::mpsc as remocMpsc; @@ -39,6 +39,9 @@ pub enum AddDel { Del, } +// FIXME block configs that have different set options for the same name +// treat default values as none? + /// User-facing action options #[derive(Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -173,7 +176,7 @@ impl Set { } } - pub async fn init(&self, ipset: &mut IpSet) -> Result<(), (usize, String)> { + pub async fn init(&self, ipset: &mut IpSet) -> Result<(), String> { for (set, version) in [ (&self.sets.ipv4, Version::IPv4), (&self.sets.ipv6, Version::IPv6), @@ -186,42 +189,44 @@ impl Set { version, timeout: self.timeout, })) - .await - .map_err(|err| (0, err.to_string()))?; + .await?; // insert set in chains - for (i, chain) in self.chains.iter().enumerate() { + for chain in &self.chains { ipset .order(Order::InsertSet(SetChain { set: set.clone(), chain: chain.clone(), target: self.target.clone(), })) - .await - .map_err(|err| (i + 1, err.to_string()))?; + .await?; } } } Ok(()) } - pub async fn destroy(&self, ipset: &mut IpSet, until: Option) { - for set in [&self.sets.ipv4, &self.sets.ipv6] { + pub async fn destroy(&self, ipset: &mut IpSet) { + for (set, version) in [ + (&self.sets.ipv4, Version::IPv4), + (&self.sets.ipv6, Version::IPv6), + ] { if let Some(set) = set { - for chain in self - .chains - .iter() - .take(until.map(|until| until - 1).unwrap_or(usize::MAX)) - { - let _ = ipset + for chain in &self.chains { + if let Err(err) = ipset .order(Order::RemoveSet(SetChain { set: set.clone(), chain: chain.clone(), target: self.target.clone(), })) - .await; + .await + { + eprintln!( + "ERROR while removing {version} set {set} from chain {chain}: {err}" + ); + } } - if until.is_none_or(|until| until != 0) { - let _ = ipset.order(Order::DestroySet(set.clone())).await; + if let Err(err) = ipset.order(Order::DestroySet(set.clone())).await { + eprintln!("ERROR while destroying {version} set {set}: {err}"); } } } diff --git a/plugins/reaction-plugin-ipset/src/ipset.rs b/plugins/reaction-plugin-ipset/src/ipset.rs index 81b1061..b2fcb78 100644 --- a/plugins/reaction-plugin-ipset/src/ipset.rs +++ b/plugins/reaction-plugin-ipset/src/ipset.rs @@ -72,7 +72,7 @@ impl IpSet { pub enum IpSetError { Thread(String), - IpSet(()), + IpSet(String), } impl Display for IpSetError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -81,7 +81,7 @@ impl Display for IpSetError { "{}", match self { IpSetError::Thread(err) => err, - IpSetError::IpSet(()) => "ipset error", + IpSetError::IpSet(err) => err, } ) } @@ -90,12 +90,12 @@ impl From for String { fn from(value: IpSetError) -> Self { match value { IpSetError::Thread(err) => err, - IpSetError::IpSet(()) => "ipset error".to_string(), + IpSetError::IpSet(err) => err, } } } -pub type OrderType = (Order, oneshot::Sender>); +pub type OrderType = (Order, oneshot::Sender>); struct Set { session: Session, @@ -121,7 +121,7 @@ impl IPsetManager { } } - fn handle_order(&mut self, order: Order) -> Result<(), ()> { + fn handle_order(&mut self, order: Order) -> Result<(), String> { match order { Order::CreateSet(CreateSet { name, @@ -139,7 +139,7 @@ impl IPsetManager { }; builder.with_ipv6(version == Version::IPv6)?.build() }) - .map_err(|err| eprintln!("ERROR Could not create set {name}: {err}"))?; + .map_err(|err| format!("Could not create set {name}: {err}"))?; self.sessions.insert(name, Set { session, version }); } @@ -149,7 +149,7 @@ impl IPsetManager { session .session .destroy() - .map_err(|err| eprintln!("ERROR Could not destroy set {set}: {err}"))?; + .map_err(|err| format!("Could not destroy set {set}: {err}"))?; } } @@ -162,13 +162,9 @@ impl IPsetManager { Ok(()) } - fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), ()> { - self._insert_remove_ip(set, ip, insert) - .map_err(|err| eprintln!("ERROR {err}")) - } - fn _insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> { + fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> { let session = self.sessions.get_mut(&set).ok_or(format!( - "No set handled by this plugin with this name: {set}. This likely is a bug." + "No set handled by us with this name: {set}. This likely is a bug." ))?; let mut net_data = NetDataType::new(Ipv4Addr::LOCALHOST, 0); @@ -186,28 +182,24 @@ impl IPsetManager { Ok(()) } - fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), ()> { - self._insert_remove_set(options, insert) - .map_err(|err| eprintln!("ERROR {err}")) - } - fn _insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> { - let SetChain { set, chain, target } = options; + fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> { + let SetChain { + set, + chain, + target: action, + } = options; let version = self .sessions .get(&set) - .ok_or(format!( - "No set managed by this plugin with this name: {set}" - ))? + .ok_or(format!("No set managed by us with this name: {set}"))? .version; - let (verb, verbing, from) = if insert { - ("insert", "inserting", "in") + if insert { + eprintln!("INFO inserting {version} set {set} in chain {chain}"); } else { - ("remove", "removing", "from") - }; - - eprintln!("INFO {verbing} {version} set {set} {from} chain {chain}"); + eprintln!("INFO removing {version} set {set} from chain {chain}"); + } let command = match version { Version::IPv4 => "iptables", @@ -225,20 +217,20 @@ impl IPsetManager { &set, "src", "-j", - &target, + &action, ]) .spawn() - .map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: Could not execute {command}: {err}"))?; + .map_err(|err| format!("Could not insert ipset {set} in chain {chain}: {err}"))?; let exit = child .wait() - .map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: {err}"))?; + .map_err(|err| format!("Could not insert ipset: {err}"))?; if exit.success() { Ok(()) } else { Err(format!( - "Could not {verb} ipset: exit code {}", + "Could not insert ipset: exit code {}", exit.code() .map(|c| c.to_string()) .unwrap_or_else(|| "".to_string()) diff --git a/plugins/reaction-plugin-ipset/src/main.rs b/plugins/reaction-plugin-ipset/src/main.rs index 828c3a8..1117529 100644 --- a/plugins/reaction-plugin-ipset/src/main.rs +++ b/plugins/reaction-plugin-ipset/src/main.rs @@ -109,21 +109,15 @@ impl PluginInfo for Plugin { let mut first_error = None; for (i, set) in self.sets.iter().enumerate() { // Retain if error - if let Err((failed_step, err)) = set.init(&mut self.ipset).await { - first_error = Some((i, failed_step, RemoteError::Plugin(err))); + if let Err(err) = set.init(&mut self.ipset).await { + first_error = Some((i, RemoteError::Plugin(err))); break; } } // Destroy initialized sets if error - if let Some((last_set, failed_step, err)) = first_error { - eprintln!("DEBUG last_set: {last_set} failed_step: {failed_step} err: {err}"); - for (curr_set, set) in self.sets.iter().enumerate().take(last_set + 1) { - let until = if last_set == curr_set { - Some(failed_step) - } else { - None - }; - let _ = set.destroy(&mut self.ipset, until).await; + if let Some((i, err)) = first_error { + for set in self.sets.iter().take(i + 1) { + let _ = set.destroy(&mut self.ipset).await; } return Err(err); } @@ -154,6 +148,6 @@ impl PluginInfo for Plugin { async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec, shutdown: ShutdownToken) { shutdown.wait().await; for set in sets { - set.destroy(&mut ipset, None).await; + set.destroy(&mut ipset).await; } } diff --git a/plugins/reaction-plugin-nftables/Cargo.toml b/plugins/reaction-plugin-nftables/Cargo.toml deleted file mode 100644 index 1de8e6b..0000000 --- a/plugins/reaction-plugin-nftables/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "reaction-plugin-nftables" -version = "0.1.0" -edition = "2024" - -[dependencies] -tokio = { workspace = true, features = ["rt-multi-thread"] } -remoc.workspace = true -reaction-plugin.path = "../reaction-plugin" -serde.workspace = true -serde_json.workspace = true -nftables = { version = "0.6.3", features = ["tokio"] } -libnftables1-sys = { version = "0.1.1" } diff --git a/plugins/reaction-plugin-nftables/src/action.rs b/plugins/reaction-plugin-nftables/src/action.rs deleted file mode 100644 index 3898ab5..0000000 --- a/plugins/reaction-plugin-nftables/src/action.rs +++ /dev/null @@ -1,493 +0,0 @@ -use std::{ - borrow::Cow, - collections::HashSet, - fmt::{Debug, Display}, - u32, -}; - -use nftables::{ - batch::Batch, - expr::Expression, - schema::{Element, NfListObject, Rule, SetFlag, SetType, SetTypeValue}, - stmt::Statement, - types::{NfFamily, NfHook}, -}; -use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration}; -use remoc::rch::mpsc as remocMpsc; -use serde::{Deserialize, Serialize}; - -use crate::{helpers::Version, nft::NftClient}; - -#[derive(Default, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)] -pub enum IpVersion { - #[default] - #[serde(rename = "ip")] - Ip, - #[serde(rename = "ipv4")] - Ipv4, - #[serde(rename = "ipv6")] - Ipv6, -} -impl Debug for IpVersion { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - IpVersion::Ipv4 => "ipv4", - IpVersion::Ipv6 => "ipv6", - IpVersion::Ip => "ip", - } - ) - } -} - -#[derive(Default, Debug, Serialize, Deserialize)] -pub enum AddDel { - #[default] - #[serde(alias = "add")] - Add, - #[serde(alias = "delete")] - Delete, -} - -/// User-facing action options -#[derive(Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -pub struct ActionOptions { - /// The set that should be used by this action - pub set: String, - /// The pattern name of the IP. - /// Defaults to "ip" - #[serde(default = "serde_ip")] - pub pattern: String, - #[serde(skip)] - ip_index: usize, - // Whether the action is to "add" or "del" the ip from the set - #[serde(default)] - action: AddDel, - - #[serde(flatten)] - pub set_options: SetOptions, -} - -fn serde_ip() -> String { - "ip".into() -} - -impl ActionOptions { - pub fn set_ip_index(&mut self, patterns: Vec) -> Result<(), ()> { - self.ip_index = patterns - .into_iter() - .enumerate() - .filter(|(_, name)| name == &self.pattern) - .next() - .ok_or(())? - .0; - Ok(()) - } -} - -/// Merged set options -#[derive(Default, Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] -pub struct SetOptions { - /// The IP type. - /// Defaults to `46`. - /// If `ipv4`: creates an IPv4 set with this name - /// If `ipv6`: creates an IPv6 set with this name - /// If `ip`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6' - /// *Merged set-wise*. - #[serde(default)] - version: Option, - /// Chains where the IP set should be inserted. - /// Defaults to `["input", "forward"]` - /// *Merged set-wise*. - #[serde(default)] - hooks: Option>, - // Optional timeout, letting linux/netfilter handle set removal instead of reaction - // Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action - // Same syntax as after and retryperiod in reaction. - /// *Merged set-wise*. - #[serde(skip_serializing_if = "Option::is_none")] - timeout: Option, - #[serde(skip)] - timeout_u32: Option, - // Target that iptables should use when the IP is encountered. - // Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain - /// *Merged set-wise*. - #[serde(default)] - target: Option, -} - -impl SetOptions { - pub fn merge(&mut self, options: &SetOptions) -> Result<(), String> { - // merge two Option and fail if there is conflict - fn inner_merge( - a: &mut Option, - b: &Option, - name: &str, - ) -> Result<(), String> { - match (&a, &b) { - (Some(aa), Some(bb)) => { - if aa != bb { - return Err(format!( - "Conflicting options for {name}: `{aa:?}` and `{bb:?}`" - )); - } - } - (None, Some(_)) => { - *a = b.clone(); - } - _ => (), - }; - Ok(()) - } - - inner_merge(&mut self.version, &options.version, "version")?; - inner_merge(&mut self.timeout, &options.timeout, "timeout")?; - inner_merge(&mut self.hooks, &options.hooks, "chains")?; - inner_merge(&mut self.target, &options.target, "target")?; - - if let Some(timeout) = &self.timeout { - let duration = parse_duration(timeout) - .map_err(|err| format!("failed to parse timeout: {}", err))? - .as_secs(); - if duration > u32::MAX as u64 { - return Err(format!( - "timeout is limited to {} seconds (approx {} days)", - u32::MAX, - 49_000 - )); - } - self.timeout_u32 = Some(duration as u32); - } - - Ok(()) - } -} - -#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum RHook { - Ingress, - Prerouting, - Forward, - Input, - Output, - Postrouting, - Egress, -} - -impl RHook { - pub fn as_str(&self) -> &'static str { - match self { - RHook::Ingress => "ingress", - RHook::Prerouting => "prerouting", - RHook::Forward => "forward", - RHook::Input => "input", - RHook::Output => "output", - RHook::Postrouting => "postrouting", - RHook::Egress => "egress", - } - } -} - -impl Display for RHook { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.as_str()) - } -} - -impl From<&RHook> for NfHook { - fn from(value: &RHook) -> Self { - match value { - RHook::Ingress => Self::Ingress, - RHook::Prerouting => Self::Prerouting, - RHook::Forward => Self::Forward, - RHook::Input => Self::Input, - RHook::Output => Self::Output, - RHook::Postrouting => Self::Postrouting, - RHook::Egress => Self::Egress, - } - } -} - -#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum RStatement { - Accept, - Drop, - Continue, - Return, -} - -pub struct Set { - pub sets: SetNames, - pub hooks: Vec, - pub timeout: Option, - pub target: RStatement, -} - -impl Set { - pub fn from(name: String, options: SetOptions) -> Self { - Self { - sets: SetNames::new(name, options.version), - timeout: options.timeout_u32, - target: options.target.unwrap_or(RStatement::Drop), - hooks: options.hooks.unwrap_or(vec![RHook::Input, RHook::Forward]), - } - } - - pub fn init<'a>(&self, batch: &mut Batch<'a>) -> Result<(), String> { - for (set, version) in [ - (&self.sets.ipv4, Version::IPv4), - (&self.sets.ipv6, Version::IPv6), - ] { - if let Some(set) = set { - let family = NfFamily::INet; - let table = Cow::from("reaction"); - - // create set - batch.add(NfListObject::<'a>::Set(Box::new(nftables::schema::Set::< - 'a, - > { - family, - table: table.to_owned(), - name: Cow::Owned(set.to_owned()), - // TODO Try a set which is both ipv4 and ipv6? - set_type: SetTypeValue::Single(match version { - Version::IPv4 => SetType::Ipv4Addr, - Version::IPv6 => SetType::Ipv6Addr, - }), - flags: Some({ - let mut flags = HashSet::from([SetFlag::Interval]); - if self.timeout.is_some() { - flags.insert(SetFlag::Timeout); - } - flags - }), - timeout: self.timeout.clone(), - ..Default::default() - }))); - // insert set in chains - let expr = vec![match self.target { - RStatement::Accept => Statement::Accept(None), - RStatement::Drop => Statement::Drop(None), - RStatement::Continue => Statement::Continue(None), - RStatement::Return => Statement::Return(None), - }]; - for hook in &self.hooks { - batch.add(NfListObject::Rule(Rule { - family, - table: table.to_owned(), - chain: Cow::from(hook.to_string()), - expr: Cow::Owned(expr.clone()), - ..Default::default() - })); - } - } - } - Ok(()) - } -} - -pub struct SetNames { - pub ipv4: Option, - pub ipv6: Option, -} - -impl SetNames { - pub fn new(name: String, version: Option) -> Self { - Self { - ipv4: match version { - Some(IpVersion::Ipv4) => Some(name.clone()), - Some(IpVersion::Ipv6) => None, - None | Some(IpVersion::Ip) => Some(format!("{}v4", name)), - }, - ipv6: match version { - Some(IpVersion::Ipv4) => None, - Some(IpVersion::Ipv6) => Some(name), - None | Some(IpVersion::Ip) => Some(format!("{}v6", name)), - }, - } - } -} - -pub struct Action { - nft: NftClient, - rx: remocMpsc::Receiver, - shutdown: ShutdownToken, - sets: SetNames, - // index of pattern ip in match vec - ip_index: usize, - action: AddDel, -} - -impl Action { - pub fn new( - nft: NftClient, - shutdown: ShutdownToken, - rx: remocMpsc::Receiver, - options: ActionOptions, - ) -> Result { - Ok(Action { - nft, - rx, - shutdown, - sets: SetNames::new(options.set, options.set_options.version), - ip_index: options.ip_index, - action: options.action, - }) - } - - pub async fn serve(mut self) { - loop { - let event = tokio::select! { - exec = self.rx.recv() => Some(exec), - _ = self.shutdown.wait() => None, - }; - match event { - // shutdown asked - None => break, - // channel closed - Some(Ok(None)) => break, - // error from channel - Some(Err(err)) => { - eprintln!("ERROR {err}"); - break; - } - // ok - Some(Ok(Some(exec))) => { - if let Err(err) = self.handle_exec(exec).await { - eprintln!("ERROR {err}"); - break; - } - } - } - } - // eprintln!("DEBUG Asking for shutdown"); - // self.shutdown.ask_shutdown(); - } - - async fn handle_exec(&mut self, mut exec: Exec) -> Result<(), String> { - // safeguard against Vec::remove's panic - if exec.match_.len() <= self.ip_index { - return Err(format!( - "match received from reaction is smaller than expected. looking for index {} but size is {}. this is a bug!", - self.ip_index, - exec.match_.len() - )); - } - let ip = exec.match_.remove(self.ip_index); - // select set - let set = match (&self.sets.ipv4, &self.sets.ipv6) { - (None, None) => return Err(format!("action is neither IPv4 nor IPv6, this is a bug!")), - (None, Some(set)) => set, - (Some(set), None) => set, - (Some(set4), Some(set6)) => { - if ip.contains(':') { - set6 - } else { - set4 - } - } - }; - // add/remove ip to set - let element = NfListObject::Element(Element { - family: NfFamily::INet, - table: Cow::from("reaction"), - name: Cow::from(set), - elem: Cow::from(vec![Expression::String(Cow::from(ip.clone()))]), - }); - let mut batch = Batch::new(); - match self.action { - AddDel::Add => batch.add(element), - AddDel::Delete => batch.delete(element), - }; - match self.nft.send(batch).await { - Ok(ok) => { - eprintln!("DEBUG action ok {:?} {ip}: {ok}", self.action); - Ok(()) - } - Err(err) => Err(format!("action ko {:?} {ip}: {err}", self.action)), - } - } -} - -#[cfg(test)] -mod tests { - use crate::action::{IpVersion, RHook, RStatement, SetOptions}; - - #[tokio::test] - async fn set_options_merge() { - let s1 = SetOptions { - version: None, - hooks: None, - timeout: None, - timeout_u32: None, - target: None, - }; - let s2 = SetOptions { - version: Some(IpVersion::Ipv4), - hooks: Some(vec![RHook::Input]), - timeout: Some("3h".into()), - timeout_u32: Some(3 * 3600), - target: Some(RStatement::Drop), - }; - assert_ne!(s1, s2); - assert_eq!(s1, SetOptions::default()); - - { - // s2 can be merged in s1 - let mut s1 = s1.clone(); - assert!(s1.merge(&s2).is_ok()); - assert_eq!(s1, s2); - } - - { - // s1 can be merged in s2 - let mut s2 = s2.clone(); - assert!(s2.merge(&s1).is_ok()); - } - - { - // s1 can be merged in itself - let mut s3 = s1.clone(); - assert!(s3.merge(&s1).is_ok()); - assert_eq!(s1, s3); - } - - { - // s2 can be merged in itself - let mut s3 = s2.clone(); - assert!(s3.merge(&s2).is_ok()); - assert_eq!(s2, s3); - } - - for s3 in [ - SetOptions { - version: Some(IpVersion::Ipv6), - ..Default::default() - }, - SetOptions { - hooks: Some(vec![RHook::Output]), - ..Default::default() - }, - SetOptions { - timeout: Some("30min".into()), - ..Default::default() - }, - SetOptions { - target: Some(RStatement::Continue), - ..Default::default() - }, - ] { - // none with some is ok - assert!(s3.clone().merge(&s1).is_ok(), "s3: {s3:?}"); - assert!(s1.clone().merge(&s3).is_ok(), "s3: {s3:?}"); - // different some is ko - assert!(s3.clone().merge(&s2).is_err(), "s3: {s3:?}"); - assert!(s2.clone().merge(&s3).is_err(), "s3: {s3:?}"); - } - } -} diff --git a/plugins/reaction-plugin-nftables/src/helpers.rs b/plugins/reaction-plugin-nftables/src/helpers.rs deleted file mode 100644 index b8b97b2..0000000 --- a/plugins/reaction-plugin-nftables/src/helpers.rs +++ /dev/null @@ -1,15 +0,0 @@ -use std::fmt::Display; - -#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] -pub enum Version { - IPv4, - IPv6, -} -impl Display for Version { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(match self { - Version::IPv4 => "IPv4", - Version::IPv6 => "IPv6", - }) - } -} diff --git a/plugins/reaction-plugin-nftables/src/main.rs b/plugins/reaction-plugin-nftables/src/main.rs deleted file mode 100644 index 6a7f067..0000000 --- a/plugins/reaction-plugin-nftables/src/main.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::{ - borrow::Cow, - collections::{BTreeMap, BTreeSet}, -}; - -use nftables::{ - batch::Batch, - schema::{Chain, NfListObject, Table}, - types::{NfChainType, NfFamily}, -}; -use reaction_plugin::{ - ActionConfig, ActionImpl, Hello, Manifest, PluginInfo, RemoteResult, StreamConfig, StreamImpl, - shutdown::ShutdownController, -}; -use remoc::rtc; - -use crate::{ - action::{Action, ActionOptions, Set, SetOptions}, - nft::NftClient, -}; - -#[cfg(test)] -mod tests; - -mod action; -pub mod helpers; -mod nft; - -#[tokio::main] -async fn main() { - let plugin = Plugin::default(); - reaction_plugin::main_loop(plugin).await; -} - -#[derive(Default)] -struct Plugin { - nft: NftClient, - sets: Vec, - actions: Vec, - shutdown: ShutdownController, -} - -impl PluginInfo for Plugin { - async fn manifest(&mut self) -> Result { - Ok(Manifest { - hello: Hello::new(), - streams: BTreeSet::default(), - actions: BTreeSet::from(["nftables".into()]), - }) - } - - async fn load_config( - &mut self, - streams: Vec, - actions: Vec, - ) -> RemoteResult<(Vec, Vec)> { - if !streams.is_empty() { - return Err("This plugin can't handle any stream type".into()); - } - - let mut ret_actions = Vec::with_capacity(actions.len()); - let mut set_options: BTreeMap = BTreeMap::new(); - - for ActionConfig { - stream_name, - filter_name, - action_name, - action_type, - config, - patterns, - } in actions - { - if &action_type != "nftables" { - return Err("This plugin can't handle other action types than nftables".into()); - } - - let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { - format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}") - })?; - - options.set_ip_index(patterns).map_err(|_| - format!( - "No pattern with name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'", - &options.pattern - ) - )?; - - // Merge option - set_options - .entry(options.set.clone()) - .or_default() - .merge(&options.set_options) - .map_err(|err| format!("set {}: {err}", options.set))?; - - let (tx, rx) = remoc::rch::mpsc::channel(1); - self.actions.push(Action::new( - self.nft.clone(), - self.shutdown.token(), - rx, - options, - )?); - - ret_actions.push(ActionImpl { tx }); - } - - // Init all sets - while let Some((name, options)) = set_options.pop_first() { - self.sets.push(Set::from(name, options)); - } - - Ok((vec![], ret_actions)) - } - - async fn start(&mut self) -> RemoteResult<()> { - self.shutdown.delegate().handle_quit_signals()?; - - let mut batch = Batch::new(); - batch.add(reaction_table()); - - // Create a chain for each registered netfilter hook - for hook in self - .sets - .iter() - .flat_map(|set| &set.hooks) - .collect::>() - { - batch.add(NfListObject::Chain(Chain { - family: NfFamily::INet, - table: Cow::Borrowed("reaction"), - name: Cow::from(hook.as_str()), - _type: Some(NfChainType::Filter), - hook: Some(hook.into()), - prio: Some(0), - ..Default::default() - })); - } - - for set in &self.sets { - set.init(&mut batch)?; - } - - // TODO apply batch - self.nft.send(batch).await?; - - // Launch a task that will destroy the table on shutdown - { - let token = self.shutdown.token(); - tokio::spawn(async move { - token.wait().await; - Batch::new().delete(reaction_table()); - }); - } - - // Launch all actions - while let Some(action) = self.actions.pop() { - tokio::spawn(async move { action.serve().await }); - } - self.actions = Default::default(); - - Ok(()) - } - - async fn close(self) -> RemoteResult<()> { - self.shutdown.ask_shutdown(); - self.shutdown.wait_all_task_shutdown().await; - Ok(()) - } -} - -fn reaction_table() -> NfListObject<'static> { - NfListObject::Table(Table { - family: NfFamily::INet, - name: Cow::Borrowed("reaction"), - handle: None, - }) -} diff --git a/plugins/reaction-plugin-nftables/src/nft.rs b/plugins/reaction-plugin-nftables/src/nft.rs deleted file mode 100644 index a1641d2..0000000 --- a/plugins/reaction-plugin-nftables/src/nft.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::{ - ffi::{CStr, CString}, - thread, -}; - -use libnftables1_sys::Nftables; -use nftables::batch::Batch; -use tokio::sync::{mpsc, oneshot}; - -/// A client with a dedicated server thread to libnftables. -/// Calling [`Default::default()`] spawns a new server thread. -/// Cloning just creates a new client to the same server thread. -#[derive(Clone)] -pub struct NftClient { - tx: mpsc::Sender, -} - -impl Default for NftClient { - fn default() -> Self { - let (tx, mut rx) = mpsc::channel(10); - - thread::spawn(move || { - let mut conn = Nftables::new(); - - while let Some(NftCommand { json, ret }) = rx.blocking_recv() { - let (rc, output, error) = conn.run_cmd(json.as_ptr()); - let res = match rc { - 0 => to_rust_string(output) - .ok_or_else(|| "unknown ok (rc = 0 but no output buffer)".into()), - _ => to_rust_string(error) - .map(|err| format!("error (rc = {rc}: {err})")) - .ok_or_else(|| format!("unknown error (rc = {rc} but no error buffer)")), - }; - let _ = ret.send(res); - } - }); - - NftClient { tx } - } -} - -impl NftClient { - /// Send a batch to nftables. - pub async fn send(&self, batch: Batch<'_>) -> Result { - // convert JSON to CString - let mut json = serde_json::to_vec(&batch.to_nftables()) - .map_err(|err| format!("couldn't build json to send to nftables: {err}"))?; - json.push('\0' as u8); - let json = CString::from_vec_with_nul(json) - .map_err(|err| format!("invalid json with null char: {err}"))?; - - // Send command - let (tx, rx) = oneshot::channel(); - let command = NftCommand { json, ret: tx }; - self.tx - .send(command) - .await - .map_err(|err| format!("nftables thread has quit, can't send command: {err}"))?; - - // Wait for result - rx.await - .map_err(|_| format!("nftables thread has quit, no response for command"))? - } -} - -struct NftCommand { - json: CString, - ret: oneshot::Sender>, -} - -fn to_rust_string(c_ptr: *const i8) -> Option { - if c_ptr.is_null() { - None - } else { - Some( - unsafe { CStr::from_ptr(c_ptr) } - .to_string_lossy() - .into_owned(), - ) - } -} diff --git a/plugins/reaction-plugin-nftables/src/tests.rs b/plugins/reaction-plugin-nftables/src/tests.rs deleted file mode 100644 index 7c2de26..0000000 --- a/plugins/reaction-plugin-nftables/src/tests.rs +++ /dev/null @@ -1,247 +0,0 @@ -use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig, Value}; -use serde_json::json; - -use crate::Plugin; - -#[tokio::test] -async fn conf_stream() { - // No stream is supported by nftables - assert!( - Plugin::default() - .load_config( - vec![StreamConfig { - stream_name: "stream".into(), - stream_type: "nftables".into(), - config: Value::Null - }], - vec![] - ) - .await - .is_err() - ); - - // Empty config is ok - assert!(Plugin::default().load_config(vec![], vec![]).await.is_ok()); -} - -#[tokio::test] -async fn conf_action_standalone() { - let p = vec!["name".into(), "ip".into(), "ip2".into()]; - let p_noip = vec!["name".into(), "ip2".into()]; - - for (is_ok, conf, patterns) in [ - // minimal set - (true, json!({ "set": "test" }), &p), - // missing set key - (false, json!({}), &p), - (false, json!({ "version": "ipv4" }), &p), - // unknown key - (false, json!({ "set": "test", "unknown": "yes" }), &p), - (false, json!({ "set": "test", "ip_index": 1 }), &p), - (false, json!({ "set": "test", "timeout_u32": 1 }), &p), - // pattern // - (true, json!({ "set": "test" }), &p), - (true, json!({ "set": "test", "pattern": "ip" }), &p), - (true, json!({ "set": "test", "pattern": "ip2" }), &p), - (true, json!({ "set": "test", "pattern": "ip2" }), &p_noip), - // unknown pattern "ip" - (false, json!({ "set": "test" }), &p_noip), - (false, json!({ "set": "test", "pattern": "ip" }), &p_noip), - // unknown pattern - (false, json!({ "set": "test", "pattern": "unknown" }), &p), - (false, json!({ "set": "test", "pattern": "uwu" }), &p_noip), - // bad type - (false, json!({ "set": "test", "pattern": 0 }), &p_noip), - (false, json!({ "set": "test", "pattern": true }), &p_noip), - // action // - (true, json!({ "set": "test", "action": "add" }), &p), - (true, json!({ "set": "test", "action": "delete" }), &p), - // unknown action - (false, json!({ "set": "test", "action": "create" }), &p), - (false, json!({ "set": "test", "action": "insert" }), &p), - (false, json!({ "set": "test", "action": "del" }), &p), - (false, json!({ "set": "test", "action": "destroy" }), &p), - // bad type - (false, json!({ "set": "test", "action": true }), &p), - (false, json!({ "set": "test", "action": 1 }), &p), - // ip version // - // ok - (true, json!({ "set": "test", "version": "ipv4" }), &p), - (true, json!({ "set": "test", "version": "ipv6" }), &p), - (true, json!({ "set": "test", "version": "ip" }), &p), - // unknown version - (false, json!({ "set": "test", "version": 4 }), &p), - (false, json!({ "set": "test", "version": 6 }), &p), - (false, json!({ "set": "test", "version": 46 }), &p), - (false, json!({ "set": "test", "version": "5" }), &p), - (false, json!({ "set": "test", "version": "ipv5" }), &p), - (false, json!({ "set": "test", "version": "4" }), &p), - (false, json!({ "set": "test", "version": "6" }), &p), - (false, json!({ "set": "test", "version": "46" }), &p), - // bad type - (false, json!({ "set": "test", "version": true }), &p), - // hooks // - // everything is fine really - (true, json!({ "set": "test", "hooks": [] }), &p), - ( - true, - json!({ "set": "test", "hooks": ["input", "forward", "ingress", "prerouting", "output", "postrouting", "egress"] }), - &p, - ), - (false, json!({ "set": "test", "hooks": ["INPUT"] }), &p), - (false, json!({ "set": "test", "hooks": ["FORWARD"] }), &p), - ( - false, - json!({ "set": "test", "hooks": ["unknown_hook"] }), - &p, - ), - // timeout // - (true, json!({ "set": "test", "timeout": "1m" }), &p), - (true, json!({ "set": "test", "timeout": "3 days" }), &p), - // bad - (false, json!({ "set": "test", "timeout": "3 dayz"}), &p), - (false, json!({ "set": "test", "timeout": 12 }), &p), - // target // - // anything is fine too - (true, json!({ "set": "test", "target": "drop" }), &p), - (true, json!({ "set": "test", "target": "accept" }), &p), - (true, json!({ "set": "test", "target": "return" }), &p), - (true, json!({ "set": "test", "target": "continue" }), &p), - // bad - (false, json!({ "set": "test", "target": "custom" }), &p), - (false, json!({ "set": "test", "target": "DROP" }), &p), - (false, json!({ "set": "test", "target": 11 }), &p), - (false, json!({ "set": "test", "target": ["DROP"] }), &p), - ] { - let res = Plugin::default() - .load_config( - vec![], - vec![ActionConfig { - stream_name: "stream".into(), - filter_name: "filter".into(), - action_name: "action".into(), - action_type: "nftables".into(), - config: conf.clone().into(), - patterns: patterns.clone(), - }], - ) - .await; - - assert!( - res.is_ok() == is_ok, - "conf: {:?}, must be ok: {is_ok}, result: {:?}", - conf, - // empty Result::Ok because ActionImpl is not Debug - res.map(|_| ()) - ); - } -} - -// TODO -#[tokio::test] -async fn conf_action_merge() { - let mut plugin = Plugin::default(); - - let set1 = ActionConfig { - stream_name: "stream".into(), - filter_name: "filter".into(), - action_name: "action1".into(), - action_type: "nftables".into(), - config: json!({ - "set": "test", - "target": "drop", - "hooks": ["input"], - "action": "add", - }) - .into(), - patterns: vec!["ip".into()], - }; - - let set2 = ActionConfig { - stream_name: "stream".into(), - filter_name: "filter".into(), - action_name: "action2".into(), - action_type: "nftables".into(), - config: json!({ - "set": "test", - "target": "drop", - "version": "ip", - "action": "add", - }) - .into(), - patterns: vec!["ip".into()], - }; - - let set3 = ActionConfig { - stream_name: "stream".into(), - filter_name: "filter".into(), - action_name: "action2".into(), - action_type: "nftables".into(), - config: json!({ - "set": "test", - "action": "delete", - }) - .into(), - patterns: vec!["ip".into()], - }; - - let res = plugin - .load_config( - vec![], - vec![ - // First set - set1.clone(), - // Same set, adding options, no conflict - set2.clone(), - // Same set, no new options, no conflict - set3.clone(), - // Unrelated set, so no conflict - ActionConfig { - stream_name: "stream".into(), - filter_name: "filter".into(), - action_name: "action3".into(), - action_type: "nftables".into(), - config: json!({ - "set": "test2", - "target": "return", - "version": "ipv6", - }) - .into(), - patterns: vec!["ip".into()], - }, - ], - ) - .await; - - assert!(res.is_ok(), "res: {:?}", res.map(|_| ())); - - // Another set with conflict is not ok - let res = plugin - .load_config( - vec![], - vec![ - // First set - set1, - // Same set, adding options, no conflict - set2, - // Same set, no new options, no conflict - set3, - // Another set with conflict - ActionConfig { - stream_name: "stream".into(), - filter_name: "filter".into(), - action_name: "action3".into(), - action_type: "nftables".into(), - config: json!({ - "set": "test", - "target": "target2", - "action": "del", - }) - .into(), - patterns: vec!["ip".into()], - }, - ], - ) - .await; - assert!(res.is_err(), "res: {:?}", res.map(|_| ())); -} diff --git a/plugins/reaction-plugin/Cargo.toml b/plugins/reaction-plugin/Cargo.toml index 0fa3a38..08d1eb6 100644 --- a/plugins/reaction-plugin/Cargo.toml +++ b/plugins/reaction-plugin/Cargo.toml @@ -11,6 +11,7 @@ categories = ["security"] description = "Plugin interface for reaction, a daemon that scans logs and takes action (alternative to fail2ban)" [dependencies] +chrono.workspace = true remoc.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/release.py b/release.py index 8359c7a..5fa870c 100644 --- a/release.py +++ b/release.py @@ -102,8 +102,9 @@ def main(): You'll need to install minisign to check the authenticity of the package. -After installing reaction, create your configuration file(s) in JSON, YAML or JSONnet in the -`/etc/reaction/` directory. +After installing reaction, create your configuration file at +`/etc/reaction.json`, `/etc/reaction.jsonnet` or `/etc/reaction.yml`. +You can also provide a directory containing multiple configuration files in the previous formats. See for documentation. Reload systemd: @@ -113,8 +114,8 @@ $ sudo systemctl daemon-reload Then enable and start reaction with this command ```bash -# write first your configuration file(s) in /etc/reaction/ -$ sudo systemctl enable --now reaction.service +# replace `reaction.jsonnet` with the name of your configuration file in /etc/ +$ sudo systemctl enable --now reaction@reaction.jsonnet.service ``` """.strip(), ] diff --git a/shell.nix b/shell.nix index ecb4318..27dac77 100644 --- a/shell.nix +++ b/shell.nix @@ -4,7 +4,6 @@ pkgs.mkShell { name = "libipset"; buildInputs = [ ipset - nftables clang ]; src = null; diff --git a/src/concepts/plugin.rs b/src/concepts/plugin.rs index 5e22287..c5bc330 100644 --- a/src/concepts/plugin.rs +++ b/src/concepts/plugin.rs @@ -1,7 +1,5 @@ use std::{collections::BTreeMap, io::Error, path, process::Stdio}; -#[cfg(target_os = "macos")] -use std::os::darwin::fs::MetadataExt; #[cfg(target_os = "freebsd")] use std::os::freebsd::fs::MetadataExt; #[cfg(target_os = "illumos")] @@ -172,8 +170,6 @@ impl Plugin { let mut command = Command::new("run0"); // --pipe gives direct, non-emulated stdio access, for better performance. command.arg("--pipe"); - // run the command inside the same slice as reaction - command.arg("--slice-inherit"); // Make path absolute for systemd let full_workdir = path::absolute(&plugin_working_directory)?; diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 97e6ece..3b3fdf5 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -1,7 +1,6 @@ use std::{cmp::Ordering, collections::BTreeMap, hash::Hash}; use reaction_plugin::StreamConfig; -use regex::RegexSet; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -20,11 +19,6 @@ pub struct Stream { #[serde(skip)] pub name: String, - #[serde(skip)] - pub compiled_regex_set: RegexSet, - #[serde(skip)] - pub regex_index_to_filter_name: Vec, - // Plugin-specific #[serde(default, rename = "type", skip_serializing_if = "Option::is_none")] pub stream_type: Option, @@ -96,21 +90,6 @@ impl Stream { filter.setup(name, key, patterns)?; } - let all_regexes: BTreeMap<_, _> = self - .filters - .values() - .flat_map(|filter| { - filter - .regex - .iter() - .map(|regex| (regex, filter.name.clone())) - }) - .collect(); - - self.compiled_regex_set = RegexSet::new(all_regexes.keys()) - .map_err(|err| format!("too much regexes on the filters of this stream: {err}"))?; - self.regex_index_to_filter_name = all_regexes.into_values().collect(); - Ok(()) } diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs index c32e283..52ad0e8 100644 --- a/src/daemon/plugin/mod.rs +++ b/src/daemon/plugin/mod.rs @@ -1,6 +1,5 @@ use std::{ collections::{BTreeMap, BTreeSet}, - fmt::Display, io, ops::{Deref, DerefMut}, process::ExitStatus, @@ -53,7 +52,7 @@ impl PluginManager { let mut child = plugin .launch(state_directory) .await - .map_err(|err| systemd_error(plugin, "could not launch plugin", err))?; + .map_err(|err| format!("could not launch plugin: {err}"))?; { let stderr = child.stderr.take().unwrap(); @@ -71,7 +70,10 @@ impl PluginManager { ) = Connect::io(remoc::Cfg::default(), stdout, stdin) .await .map_err(|err| { - systemd_error(plugin, "could not init communication with plugin", err) + format!( + "could not init communication with plugin {}: {err}", + plugin.name + ) })?; tokio::spawn(conn); @@ -164,20 +166,6 @@ impl PluginManager { } } -fn systemd_error(plugin: &Plugin, message: &str, err: impl Display) -> String { - if plugin.systemd { - format!( - "{message}: {err}. \ -`plugins.{0}.systemd` is set to true, so this may be an issue with systemd's run0. \ -please make sure `sudo run0 ls /` returns the same thing as `sudo ls /` as a test. \ -if run0 can't be found or doesn't output anything, set `plugins.{0}.systemd` to false.", - plugin.name, - ) - } else { - format!("{message}: {err}") - } -} - async fn handle_stderr(stderr: ChildStderr, plugin_name: String) { // read lines until shutdown let lines = reader_to_stream(stderr); diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index 3ed0d56..7d50b54 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -1,10 +1,11 @@ use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, process::Stdio, }; use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all}; use reaction_plugin::{StreamImpl, shutdown::ShutdownToken}; +use regex::RegexSet; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::{Child, ChildStderr, ChildStdout, Command}, @@ -44,6 +45,7 @@ pub fn reader_to_stream( } pub struct StreamManager { + compiled_regex_set: RegexSet, regex_index_to_filter_manager: Vec, stream: &'static Stream, stream_plugin: Option, @@ -57,6 +59,16 @@ impl StreamManager { shutdown: ShutdownToken, plugins: &mut Plugins, ) -> Result { + let all_regexes: BTreeMap<_, _> = filter_managers + .iter() + .flat_map(|(filter, filter_manager)| { + filter + .regex + .iter() + .map(|regex| (regex, filter_manager.clone())) + }) + .collect(); + let stream_plugin = if stream.is_plugin() { Some( plugins @@ -72,23 +84,11 @@ impl StreamManager { None }; - let regex_index_to_filter_manager = stream - .regex_index_to_filter_name - .iter() - .map(|filter_name| { - filter_managers - .iter() - .find(|(filter, _)| filter_name == &filter.name) - .unwrap() - .1 - .clone() - }) - .collect(); - debug!("successfully initialized stream {}", stream.name); Ok(StreamManager { - regex_index_to_filter_manager, + compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?, + regex_index_to_filter_manager: all_regexes.into_values().collect(), stream, stream_plugin, shutdown, @@ -230,7 +230,7 @@ impl StreamManager { } fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> { - let matches = self.stream.compiled_regex_set.matches(line); + let matches = self.compiled_regex_set.matches(line); matches .into_iter() .map(|match_| &self.regex_index_to_filter_manager[match_]) diff --git a/tests/notif.jsonnet b/tests/notif.jsonnet index 14d76c0..2e46471 100644 --- a/tests/notif.jsonnet +++ b/tests/notif.jsonnet @@ -41,23 +41,6 @@ }, }, }, - f2: { - regex: [ - "^can't found $", - ], - retry: 2, - retryperiod: '60s', - actions: { - damn: { - cmd: ['notify-send', 'you should not see that', 'ban '], - }, - undamn: { - cmd: ['notify-send', 'you should not see that', 'unban '], - after: '3s', - onexit: true, - }, - }, - }, }, }, },