diff --git a/.gitignore b/.gitignore index 55ecad6..07180cd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,22 +1,15 @@ /reaction -reaction*.db -reaction*.db.old +reaction.db +reaction.db.old /data -/lmdb -reaction*.export.json /reaction*.sock /result /wiki -/deb *.deb *.minisig *.qcow2 -debian-packaging/* *.swp -export-go-db/export-go-db -import-rust-db/target /target -reaction-plugin/target /local .ccls-cache .direnv diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index 78d7601..0000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,15 +0,0 @@ ---- -image: golang:1.20-bookworm -stages: - - build - -variables: - DEBIAN_FRONTEND: noninteractive - -test_building: - stage: build - before_script: - - apt-get -qq -y update - - apt-get -qq -y install build-essential devscripts debhelper quilt wget - script: - - make reaction ip46tables nft46 diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index f43d2ad..4289e65 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -6,6 +6,7 @@ Here is a high-level overview of the codebase. ## Build +- `bench/`: Configuration that spawns a very high load on reaction. Useful to test performance improvements and regressions. - `build.rs`: permits to create shell completions and man pages on build. - `Cargo.toml`, `Cargo.lock`: manifest and dependencies. - `config/`: example / test configuration files. Look at its git history to discover more. @@ -15,8 +16,7 @@ Here is a high-level overview of the codebase. ## Main source code -- `helpers_c/`: C helpers. I wish to have special IP support in reaction and get rid of them. See #79 and #116. -- `tests/`: Integration tests. For now they test basic reaction runtime behavior, persistance, and client-daemon communication. +- `tests/`: Integration tests. They test reaction runtime behavior, persistence, client-daemon communication, plugin integrations. - `src/`: The source code, here we go! ### Top-level files @@ -25,18 +25,13 @@ Here is a high-level overview of the codebase. 
- `src/lib.rs`: Second main entrypoint - `src/cli.rs`: Command-line arguments - `src/tests.rs`: Test utilities +- `src/protocol.rs`: de/serialization and client/daemon protocol messages. ### `src/concepts/` reaction really is about its configuration, which is at the center of the code. -There is one file for each of its concepts: configuration, streams, filters, actions, patterns. - -### `src/protocol/` - -Low-level serialization/deserialization and client-daemon protocol messages. - -Shared by the client and daemon's socket. Also used by daemon's database. +There is one file for each of its concepts: configuration, streams, filters, actions, patterns, plugins. ### `src/client/` @@ -58,9 +53,9 @@ This code has async code, to handle input streams and communication with clients - `mod.rs`: High-level logic - `state.rs`: Inner state operations - `socket.rs`: The socket task, responsible for communication with clients. -- `shutdown.rs`: Logic for passing shutdown signal across all tasks +- `plugin.rs`: Plugin startup, configuration loading and cleanup. -### `src/tree` +### `crates/treedb` Persistence layer. @@ -68,5 +63,19 @@ This is a database highly adapted to reaction workload, making reaction faster t (heed, sled and fjall crates have been tested). Its design is explained in the comments of its files: -- `mod.rs`: main database code, with its two API structs: Tree and Database. -- `raw.rs` low-level part, directly interacting with de/serializisation and files. +- `lib.rs`: main database code, with its two API structs: Tree and Database. +- `raw.rs`: low-level part, directly interacting with de/serialization and files. +- `time.rs`: time definitions shared with reaction. +- `helpers.rs`: utilities to ease db deserialization from disk. + +### `plugins/reaction-plugin` + +Shared plugin interface between reaction daemon and its plugins. 
+ +Also defines some shared logic between them: +- `shutdown.rs`: Logic for passing shutdown signal across all tasks +- `parse_duration.rs` Duration parsing + +### `plugins/reaction-plugin-*` + +All core plugins. diff --git a/Cargo.lock b/Cargo.lock index f5b71b2..c9a7b56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1980,6 +1980,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "libnftables1-sys" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b290d0d41f0ad578660aeed371bcae4cf85f129a6fe31350dbd2e097518cd7f" +dependencies = [ + "bindgen", +] + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -2248,6 +2257,22 @@ dependencies = [ "wmi", ] +[[package]] +name = "nftables" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c57e7343eed9e9330e084eef12651b15be3c8ed7825915a0ffa33736b852bed" +dependencies = [ + "schemars", + "serde", + "serde_json", + "serde_path_to_error", + "strum", + "strum_macros", + "thiserror 2.0.18", + "tokio", +] + [[package]] name = "nix" version = "0.29.0" @@ -2861,7 +2886,6 @@ dependencies = [ name = "reaction-plugin" version = "1.0.0" dependencies = [ - "chrono", "remoc", "serde", "serde_json", @@ -2899,6 +2923,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "reaction-plugin-nftables" +version = "0.1.0" +dependencies = [ + "libnftables1-sys", + "nftables", + "reaction-plugin", + "remoc", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "reaction-plugin-virtual" version = "1.0.0" @@ -2919,6 +2956,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "regex" version = "1.12.2" @@ -3194,6 +3251,31 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "schemars" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" +dependencies = [ + "dyn-clone", + "ref-cast", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.114", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -3287,6 +3369,17 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "serde_json" version = "1.0.149" @@ -3300,6 +3393,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" diff --git a/Cargo.toml b/Cargo.toml index dd537aa..66e95e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ assets = [ [ "target/release/reaction.bash", "/usr/share/bash-completion/completions/reaction", "644" ], [ "target/release/reaction.fish", "/usr/share/fish/completions/", "644" ], [ "target/release/_reaction", 
"/usr/share/zsh/vendor-completions/", "644" ], + # Slice + [ "packaging/system-reaction.slice", "/usr/lib/systemd/system/", "644" ], ] [dependencies] @@ -83,6 +85,7 @@ members = [ "plugins/reaction-plugin", "plugins/reaction-plugin-cluster", "plugins/reaction-plugin-ipset", + "plugins/reaction-plugin-nftables", "plugins/reaction-plugin-virtual" ] diff --git a/packaging/reaction.service b/packaging/reaction.service index 5bd1478..343015d 100644 --- a/packaging/reaction.service +++ b/packaging/reaction.service @@ -1,6 +1,6 @@ # vim: ft=systemd [Unit] -Description=A daemon that scans program outputs for repeated patterns, and takes action. +Description=reaction daemon Documentation=https://reaction.ppom.me # Ensure reaction will insert its chain after docker has inserted theirs. Only useful when iptables & docker are used # After=docker.service @@ -17,6 +17,8 @@ RuntimeDirectory=reaction WorkingDirectory=/var/lib/reaction # Let reaction kill its child processes first KillMode=mixed +# Put reaction in its own slice so that plugins can be grouped within. +Slice=system-reaction.slice [Install] WantedBy=multi-user.target diff --git a/packaging/system-reaction.slice b/packaging/system-reaction.slice new file mode 100644 index 0000000..732f276 --- /dev/null +++ b/packaging/system-reaction.slice @@ -0,0 +1 @@ +[Slice] diff --git a/plugins/reaction-plugin-ipset/src/action.rs b/plugins/reaction-plugin-ipset/src/action.rs index 8522717..c820e28 100644 --- a/plugins/reaction-plugin-ipset/src/action.rs +++ b/plugins/reaction-plugin-ipset/src/action.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, u32}; +use std::{fmt::Debug, u32, usize}; use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration}; use remoc::rch::mpsc as remocMpsc; @@ -39,9 +39,6 @@ pub enum AddDel { Del, } -// FIXME block configs that have different set options for the same name -// treat default values as none? 
- /// User-facing action options #[derive(Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -176,7 +173,7 @@ impl Set { } } - pub async fn init(&self, ipset: &mut IpSet) -> Result<(), String> { + pub async fn init(&self, ipset: &mut IpSet) -> Result<(), (usize, String)> { for (set, version) in [ (&self.sets.ipv4, Version::IPv4), (&self.sets.ipv6, Version::IPv6), @@ -189,44 +186,42 @@ impl Set { version, timeout: self.timeout, })) - .await?; + .await + .map_err(|err| (0, err.to_string()))?; // insert set in chains - for chain in &self.chains { + for (i, chain) in self.chains.iter().enumerate() { ipset .order(Order::InsertSet(SetChain { set: set.clone(), chain: chain.clone(), target: self.target.clone(), })) - .await?; + .await + .map_err(|err| (i + 1, err.to_string()))?; } } } Ok(()) } - pub async fn destroy(&self, ipset: &mut IpSet) { - for (set, version) in [ - (&self.sets.ipv4, Version::IPv4), - (&self.sets.ipv6, Version::IPv6), - ] { + pub async fn destroy(&self, ipset: &mut IpSet, until: Option) { + for set in [&self.sets.ipv4, &self.sets.ipv6] { if let Some(set) = set { - for chain in &self.chains { - if let Err(err) = ipset + for chain in self + .chains + .iter() + .take(until.map(|until| until - 1).unwrap_or(usize::MAX)) + { + let _ = ipset .order(Order::RemoveSet(SetChain { set: set.clone(), chain: chain.clone(), target: self.target.clone(), })) - .await - { - eprintln!( - "ERROR while removing {version} set {set} from chain {chain}: {err}" - ); - } + .await; } - if let Err(err) = ipset.order(Order::DestroySet(set.clone())).await { - eprintln!("ERROR while destroying {version} set {set}: {err}"); + if until.is_none_or(|until| until != 0) { + let _ = ipset.order(Order::DestroySet(set.clone())).await; } } } diff --git a/plugins/reaction-plugin-ipset/src/ipset.rs b/plugins/reaction-plugin-ipset/src/ipset.rs index b2fcb78..81b1061 100644 --- a/plugins/reaction-plugin-ipset/src/ipset.rs +++ b/plugins/reaction-plugin-ipset/src/ipset.rs @@ -72,7 +72,7 @@ 
impl IpSet { pub enum IpSetError { Thread(String), - IpSet(String), + IpSet(()), } impl Display for IpSetError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -81,7 +81,7 @@ impl Display for IpSetError { "{}", match self { IpSetError::Thread(err) => err, - IpSetError::IpSet(err) => err, + IpSetError::IpSet(()) => "ipset error", } ) } @@ -90,12 +90,12 @@ impl From for String { fn from(value: IpSetError) -> Self { match value { IpSetError::Thread(err) => err, - IpSetError::IpSet(err) => err, + IpSetError::IpSet(()) => "ipset error".to_string(), } } } -pub type OrderType = (Order, oneshot::Sender>); +pub type OrderType = (Order, oneshot::Sender>); struct Set { session: Session, @@ -121,7 +121,7 @@ impl IPsetManager { } } - fn handle_order(&mut self, order: Order) -> Result<(), String> { + fn handle_order(&mut self, order: Order) -> Result<(), ()> { match order { Order::CreateSet(CreateSet { name, @@ -139,7 +139,7 @@ impl IPsetManager { }; builder.with_ipv6(version == Version::IPv6)?.build() }) - .map_err(|err| format!("Could not create set {name}: {err}"))?; + .map_err(|err| eprintln!("ERROR Could not create set {name}: {err}"))?; self.sessions.insert(name, Set { session, version }); } @@ -149,7 +149,7 @@ impl IPsetManager { session .session .destroy() - .map_err(|err| format!("Could not destroy set {set}: {err}"))?; + .map_err(|err| eprintln!("ERROR Could not destroy set {set}: {err}"))?; } } @@ -162,9 +162,13 @@ impl IPsetManager { Ok(()) } - fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> { + fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), ()> { + self._insert_remove_ip(set, ip, insert) + .map_err(|err| eprintln!("ERROR {err}")) + } + fn _insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> { let session = self.sessions.get_mut(&set).ok_or(format!( - "No set handled by us with this name: {set}. This likely is a bug." 
+ "No set handled by this plugin with this name: {set}. This likely is a bug." ))?; let mut net_data = NetDataType::new(Ipv4Addr::LOCALHOST, 0); @@ -182,24 +186,28 @@ impl IPsetManager { Ok(()) } - fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> { - let SetChain { - set, - chain, - target: action, - } = options; + fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), ()> { + self._insert_remove_set(options, insert) + .map_err(|err| eprintln!("ERROR {err}")) + } + fn _insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> { + let SetChain { set, chain, target } = options; let version = self .sessions .get(&set) - .ok_or(format!("No set managed by us with this name: {set}"))? + .ok_or(format!( + "No set managed by this plugin with this name: {set}" + ))? .version; - if insert { - eprintln!("INFO inserting {version} set {set} in chain {chain}"); + let (verb, verbing, from) = if insert { + ("insert", "inserting", "in") } else { - eprintln!("INFO removing {version} set {set} from chain {chain}"); - } + ("remove", "removing", "from") + }; + + eprintln!("INFO {verbing} {version} set {set} {from} chain {chain}"); let command = match version { Version::IPv4 => "iptables", @@ -217,20 +225,20 @@ impl IPsetManager { &set, "src", "-j", - &action, + &target, ]) .spawn() - .map_err(|err| format!("Could not insert ipset {set} in chain {chain}: {err}"))?; + .map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: Could not execute {command}: {err}"))?; let exit = child .wait() - .map_err(|err| format!("Could not insert ipset: {err}"))?; + .map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: {err}"))?; if exit.success() { Ok(()) } else { Err(format!( - "Could not insert ipset: exit code {}", + "Could not {verb} ipset: exit code {}", exit.code() .map(|c| c.to_string()) .unwrap_or_else(|| "".to_string()) diff --git a/plugins/reaction-plugin-ipset/src/main.rs 
b/plugins/reaction-plugin-ipset/src/main.rs index 1117529..828c3a8 100644 --- a/plugins/reaction-plugin-ipset/src/main.rs +++ b/plugins/reaction-plugin-ipset/src/main.rs @@ -109,15 +109,21 @@ impl PluginInfo for Plugin { let mut first_error = None; for (i, set) in self.sets.iter().enumerate() { // Retain if error - if let Err(err) = set.init(&mut self.ipset).await { - first_error = Some((i, RemoteError::Plugin(err))); + if let Err((failed_step, err)) = set.init(&mut self.ipset).await { + first_error = Some((i, failed_step, RemoteError::Plugin(err))); break; } } // Destroy initialized sets if error - if let Some((i, err)) = first_error { - for set in self.sets.iter().take(i + 1) { - let _ = set.destroy(&mut self.ipset).await; + if let Some((last_set, failed_step, err)) = first_error { + eprintln!("DEBUG last_set: {last_set} failed_step: {failed_step} err: {err}"); + for (curr_set, set) in self.sets.iter().enumerate().take(last_set + 1) { + let until = if last_set == curr_set { + Some(failed_step) + } else { + None + }; + let _ = set.destroy(&mut self.ipset, until).await; } return Err(err); } @@ -148,6 +154,6 @@ impl PluginInfo for Plugin { async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec, shutdown: ShutdownToken) { shutdown.wait().await; for set in sets { - set.destroy(&mut ipset).await; + set.destroy(&mut ipset, None).await; } } diff --git a/plugins/reaction-plugin-nftables/Cargo.toml b/plugins/reaction-plugin-nftables/Cargo.toml new file mode 100644 index 0000000..1de8e6b --- /dev/null +++ b/plugins/reaction-plugin-nftables/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "reaction-plugin-nftables" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { workspace = true, features = ["rt-multi-thread"] } +remoc.workspace = true +reaction-plugin.path = "../reaction-plugin" +serde.workspace = true +serde_json.workspace = true +nftables = { version = "0.6.3", features = ["tokio"] } +libnftables1-sys = { version = "0.1.1" } diff --git 
a/plugins/reaction-plugin-nftables/src/action.rs b/plugins/reaction-plugin-nftables/src/action.rs new file mode 100644 index 0000000..3898ab5 --- /dev/null +++ b/plugins/reaction-plugin-nftables/src/action.rs @@ -0,0 +1,493 @@ +use std::{ + borrow::Cow, + collections::HashSet, + fmt::{Debug, Display}, + u32, +}; + +use nftables::{ + batch::Batch, + expr::Expression, + schema::{Element, NfListObject, Rule, SetFlag, SetType, SetTypeValue}, + stmt::Statement, + types::{NfFamily, NfHook}, +}; +use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration}; +use remoc::rch::mpsc as remocMpsc; +use serde::{Deserialize, Serialize}; + +use crate::{helpers::Version, nft::NftClient}; + +#[derive(Default, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)] +pub enum IpVersion { + #[default] + #[serde(rename = "ip")] + Ip, + #[serde(rename = "ipv4")] + Ipv4, + #[serde(rename = "ipv6")] + Ipv6, +} +impl Debug for IpVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + IpVersion::Ipv4 => "ipv4", + IpVersion::Ipv6 => "ipv6", + IpVersion::Ip => "ip", + } + ) + } +} + +#[derive(Default, Debug, Serialize, Deserialize)] +pub enum AddDel { + #[default] + #[serde(alias = "add")] + Add, + #[serde(alias = "delete")] + Delete, +} + +/// User-facing action options +#[derive(Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ActionOptions { + /// The set that should be used by this action + pub set: String, + /// The pattern name of the IP. 
+ /// Defaults to "ip" + #[serde(default = "serde_ip")] + pub pattern: String, + #[serde(skip)] + ip_index: usize, + // Whether the action is to "add" or "del" the ip from the set + #[serde(default)] + action: AddDel, + + #[serde(flatten)] + pub set_options: SetOptions, +} + +fn serde_ip() -> String { + "ip".into() +} + +impl ActionOptions { + pub fn set_ip_index(&mut self, patterns: Vec) -> Result<(), ()> { + self.ip_index = patterns + .into_iter() + .enumerate() + .filter(|(_, name)| name == &self.pattern) + .next() + .ok_or(())? + .0; + Ok(()) + } +} + +/// Merged set options +#[derive(Default, Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct SetOptions { + /// The IP type. + /// Defaults to `46`. + /// If `ipv4`: creates an IPv4 set with this name + /// If `ipv6`: creates an IPv6 set with this name + /// If `ip`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6' + /// *Merged set-wise*. + #[serde(default)] + version: Option, + /// Chains where the IP set should be inserted. + /// Defaults to `["input", "forward"]` + /// *Merged set-wise*. + #[serde(default)] + hooks: Option>, + // Optional timeout, letting linux/netfilter handle set removal instead of reaction + // Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action + // Same syntax as after and retryperiod in reaction. + /// *Merged set-wise*. + #[serde(skip_serializing_if = "Option::is_none")] + timeout: Option, + #[serde(skip)] + timeout_u32: Option, + // Target that iptables should use when the IP is encountered. + // Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain + /// *Merged set-wise*. 
+ #[serde(default)] + target: Option, +} + +impl SetOptions { + pub fn merge(&mut self, options: &SetOptions) -> Result<(), String> { + // merge two Option and fail if there is conflict + fn inner_merge( + a: &mut Option, + b: &Option, + name: &str, + ) -> Result<(), String> { + match (&a, &b) { + (Some(aa), Some(bb)) => { + if aa != bb { + return Err(format!( + "Conflicting options for {name}: `{aa:?}` and `{bb:?}`" + )); + } + } + (None, Some(_)) => { + *a = b.clone(); + } + _ => (), + }; + Ok(()) + } + + inner_merge(&mut self.version, &options.version, "version")?; + inner_merge(&mut self.timeout, &options.timeout, "timeout")?; + inner_merge(&mut self.hooks, &options.hooks, "chains")?; + inner_merge(&mut self.target, &options.target, "target")?; + + if let Some(timeout) = &self.timeout { + let duration = parse_duration(timeout) + .map_err(|err| format!("failed to parse timeout: {}", err))? + .as_secs(); + if duration > u32::MAX as u64 { + return Err(format!( + "timeout is limited to {} seconds (approx {} days)", + u32::MAX, + 49_000 + )); + } + self.timeout_u32 = Some(duration as u32); + } + + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum RHook { + Ingress, + Prerouting, + Forward, + Input, + Output, + Postrouting, + Egress, +} + +impl RHook { + pub fn as_str(&self) -> &'static str { + match self { + RHook::Ingress => "ingress", + RHook::Prerouting => "prerouting", + RHook::Forward => "forward", + RHook::Input => "input", + RHook::Output => "output", + RHook::Postrouting => "postrouting", + RHook::Egress => "egress", + } + } +} + +impl Display for RHook { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl From<&RHook> for NfHook { + fn from(value: &RHook) -> Self { + match value { + RHook::Ingress => Self::Ingress, + RHook::Prerouting => Self::Prerouting, + RHook::Forward => Self::Forward, + 
RHook::Input => Self::Input, + RHook::Output => Self::Output, + RHook::Postrouting => Self::Postrouting, + RHook::Egress => Self::Egress, + } + } +} + +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum RStatement { + Accept, + Drop, + Continue, + Return, +} + +pub struct Set { + pub sets: SetNames, + pub hooks: Vec, + pub timeout: Option, + pub target: RStatement, +} + +impl Set { + pub fn from(name: String, options: SetOptions) -> Self { + Self { + sets: SetNames::new(name, options.version), + timeout: options.timeout_u32, + target: options.target.unwrap_or(RStatement::Drop), + hooks: options.hooks.unwrap_or(vec![RHook::Input, RHook::Forward]), + } + } + + pub fn init<'a>(&self, batch: &mut Batch<'a>) -> Result<(), String> { + for (set, version) in [ + (&self.sets.ipv4, Version::IPv4), + (&self.sets.ipv6, Version::IPv6), + ] { + if let Some(set) = set { + let family = NfFamily::INet; + let table = Cow::from("reaction"); + + // create set + batch.add(NfListObject::<'a>::Set(Box::new(nftables::schema::Set::< + 'a, + > { + family, + table: table.to_owned(), + name: Cow::Owned(set.to_owned()), + // TODO Try a set which is both ipv4 and ipv6? 
+ set_type: SetTypeValue::Single(match version { + Version::IPv4 => SetType::Ipv4Addr, + Version::IPv6 => SetType::Ipv6Addr, + }), + flags: Some({ + let mut flags = HashSet::from([SetFlag::Interval]); + if self.timeout.is_some() { + flags.insert(SetFlag::Timeout); + } + flags + }), + timeout: self.timeout.clone(), + ..Default::default() + }))); + // insert set in chains + let expr = vec![match self.target { + RStatement::Accept => Statement::Accept(None), + RStatement::Drop => Statement::Drop(None), + RStatement::Continue => Statement::Continue(None), + RStatement::Return => Statement::Return(None), + }]; + for hook in &self.hooks { + batch.add(NfListObject::Rule(Rule { + family, + table: table.to_owned(), + chain: Cow::from(hook.to_string()), + expr: Cow::Owned(expr.clone()), + ..Default::default() + })); + } + } + } + Ok(()) + } +} + +pub struct SetNames { + pub ipv4: Option, + pub ipv6: Option, +} + +impl SetNames { + pub fn new(name: String, version: Option) -> Self { + Self { + ipv4: match version { + Some(IpVersion::Ipv4) => Some(name.clone()), + Some(IpVersion::Ipv6) => None, + None | Some(IpVersion::Ip) => Some(format!("{}v4", name)), + }, + ipv6: match version { + Some(IpVersion::Ipv4) => None, + Some(IpVersion::Ipv6) => Some(name), + None | Some(IpVersion::Ip) => Some(format!("{}v6", name)), + }, + } + } +} + +pub struct Action { + nft: NftClient, + rx: remocMpsc::Receiver, + shutdown: ShutdownToken, + sets: SetNames, + // index of pattern ip in match vec + ip_index: usize, + action: AddDel, +} + +impl Action { + pub fn new( + nft: NftClient, + shutdown: ShutdownToken, + rx: remocMpsc::Receiver, + options: ActionOptions, + ) -> Result { + Ok(Action { + nft, + rx, + shutdown, + sets: SetNames::new(options.set, options.set_options.version), + ip_index: options.ip_index, + action: options.action, + }) + } + + pub async fn serve(mut self) { + loop { + let event = tokio::select! 
{ + exec = self.rx.recv() => Some(exec), + _ = self.shutdown.wait() => None, + }; + match event { + // shutdown asked + None => break, + // channel closed + Some(Ok(None)) => break, + // error from channel + Some(Err(err)) => { + eprintln!("ERROR {err}"); + break; + } + // ok + Some(Ok(Some(exec))) => { + if let Err(err) = self.handle_exec(exec).await { + eprintln!("ERROR {err}"); + break; + } + } + } + } + // eprintln!("DEBUG Asking for shutdown"); + // self.shutdown.ask_shutdown(); + } + + async fn handle_exec(&mut self, mut exec: Exec) -> Result<(), String> { + // safeguard against Vec::remove's panic + if exec.match_.len() <= self.ip_index { + return Err(format!( + "match received from reaction is smaller than expected. looking for index {} but size is {}. this is a bug!", + self.ip_index, + exec.match_.len() + )); + } + let ip = exec.match_.remove(self.ip_index); + // select set + let set = match (&self.sets.ipv4, &self.sets.ipv6) { + (None, None) => return Err(format!("action is neither IPv4 nor IPv6, this is a bug!")), + (None, Some(set)) => set, + (Some(set), None) => set, + (Some(set4), Some(set6)) => { + if ip.contains(':') { + set6 + } else { + set4 + } + } + }; + // add/remove ip to set + let element = NfListObject::Element(Element { + family: NfFamily::INet, + table: Cow::from("reaction"), + name: Cow::from(set), + elem: Cow::from(vec![Expression::String(Cow::from(ip.clone()))]), + }); + let mut batch = Batch::new(); + match self.action { + AddDel::Add => batch.add(element), + AddDel::Delete => batch.delete(element), + }; + match self.nft.send(batch).await { + Ok(ok) => { + eprintln!("DEBUG action ok {:?} {ip}: {ok}", self.action); + Ok(()) + } + Err(err) => Err(format!("action ko {:?} {ip}: {err}", self.action)), + } + } +} + +#[cfg(test)] +mod tests { + use crate::action::{IpVersion, RHook, RStatement, SetOptions}; + + #[tokio::test] + async fn set_options_merge() { + let s1 = SetOptions { + version: None, + hooks: None, + timeout: None, + 
timeout_u32: None, + target: None, + }; + let s2 = SetOptions { + version: Some(IpVersion::Ipv4), + hooks: Some(vec![RHook::Input]), + timeout: Some("3h".into()), + timeout_u32: Some(3 * 3600), + target: Some(RStatement::Drop), + }; + assert_ne!(s1, s2); + assert_eq!(s1, SetOptions::default()); + + { + // s2 can be merged in s1 + let mut s1 = s1.clone(); + assert!(s1.merge(&s2).is_ok()); + assert_eq!(s1, s2); + } + + { + // s1 can be merged in s2 + let mut s2 = s2.clone(); + assert!(s2.merge(&s1).is_ok()); + } + + { + // s1 can be merged in itself + let mut s3 = s1.clone(); + assert!(s3.merge(&s1).is_ok()); + assert_eq!(s1, s3); + } + + { + // s2 can be merged in itself + let mut s3 = s2.clone(); + assert!(s3.merge(&s2).is_ok()); + assert_eq!(s2, s3); + } + + for s3 in [ + SetOptions { + version: Some(IpVersion::Ipv6), + ..Default::default() + }, + SetOptions { + hooks: Some(vec![RHook::Output]), + ..Default::default() + }, + SetOptions { + timeout: Some("30min".into()), + ..Default::default() + }, + SetOptions { + target: Some(RStatement::Continue), + ..Default::default() + }, + ] { + // none with some is ok + assert!(s3.clone().merge(&s1).is_ok(), "s3: {s3:?}"); + assert!(s1.clone().merge(&s3).is_ok(), "s3: {s3:?}"); + // different some is ko + assert!(s3.clone().merge(&s2).is_err(), "s3: {s3:?}"); + assert!(s2.clone().merge(&s3).is_err(), "s3: {s3:?}"); + } + } +} diff --git a/plugins/reaction-plugin-nftables/src/helpers.rs b/plugins/reaction-plugin-nftables/src/helpers.rs new file mode 100644 index 0000000..b8b97b2 --- /dev/null +++ b/plugins/reaction-plugin-nftables/src/helpers.rs @@ -0,0 +1,15 @@ +use std::fmt::Display; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] +pub enum Version { + IPv4, + IPv6, +} +impl Display for Version { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Version::IPv4 => "IPv4", + Version::IPv6 => "IPv6", + }) + } +} diff --git 
a/plugins/reaction-plugin-nftables/src/main.rs b/plugins/reaction-plugin-nftables/src/main.rs new file mode 100644 index 0000000..6a7f067 --- /dev/null +++ b/plugins/reaction-plugin-nftables/src/main.rs @@ -0,0 +1,176 @@ +use std::{ + borrow::Cow, + collections::{BTreeMap, BTreeSet}, +}; + +use nftables::{ + batch::Batch, + schema::{Chain, NfListObject, Table}, + types::{NfChainType, NfFamily}, +}; +use reaction_plugin::{ + ActionConfig, ActionImpl, Hello, Manifest, PluginInfo, RemoteResult, StreamConfig, StreamImpl, + shutdown::ShutdownController, +}; +use remoc::rtc; + +use crate::{ + action::{Action, ActionOptions, Set, SetOptions}, + nft::NftClient, +}; + +#[cfg(test)] +mod tests; + +mod action; +pub mod helpers; +mod nft; + +#[tokio::main] +async fn main() { + let plugin = Plugin::default(); + reaction_plugin::main_loop(plugin).await; +} + +#[derive(Default)] +struct Plugin { + nft: NftClient, + sets: Vec, + actions: Vec, + shutdown: ShutdownController, +} + +impl PluginInfo for Plugin { + async fn manifest(&mut self) -> Result { + Ok(Manifest { + hello: Hello::new(), + streams: BTreeSet::default(), + actions: BTreeSet::from(["nftables".into()]), + }) + } + + async fn load_config( + &mut self, + streams: Vec, + actions: Vec, + ) -> RemoteResult<(Vec, Vec)> { + if !streams.is_empty() { + return Err("This plugin can't handle any stream type".into()); + } + + let mut ret_actions = Vec::with_capacity(actions.len()); + let mut set_options: BTreeMap = BTreeMap::new(); + + for ActionConfig { + stream_name, + filter_name, + action_name, + action_type, + config, + patterns, + } in actions + { + if &action_type != "nftables" { + return Err("This plugin can't handle other action types than nftables".into()); + } + + let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { + format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}") + })?; + + options.set_ip_index(patterns).map_err(|_| + format!( + "No pattern with 
name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'", + &options.pattern + ) + )?; + + // Merge option + set_options + .entry(options.set.clone()) + .or_default() + .merge(&options.set_options) + .map_err(|err| format!("set {}: {err}", options.set))?; + + let (tx, rx) = remoc::rch::mpsc::channel(1); + self.actions.push(Action::new( + self.nft.clone(), + self.shutdown.token(), + rx, + options, + )?); + + ret_actions.push(ActionImpl { tx }); + } + + // Init all sets + while let Some((name, options)) = set_options.pop_first() { + self.sets.push(Set::from(name, options)); + } + + Ok((vec![], ret_actions)) + } + + async fn start(&mut self) -> RemoteResult<()> { + self.shutdown.delegate().handle_quit_signals()?; + + let mut batch = Batch::new(); + batch.add(reaction_table()); + + // Create a chain for each registered netfilter hook + for hook in self + .sets + .iter() + .flat_map(|set| &set.hooks) + .collect::>() + { + batch.add(NfListObject::Chain(Chain { + family: NfFamily::INet, + table: Cow::Borrowed("reaction"), + name: Cow::from(hook.as_str()), + _type: Some(NfChainType::Filter), + hook: Some(hook.into()), + prio: Some(0), + ..Default::default() + })); + } + + for set in &self.sets { + set.init(&mut batch)?; + } + + // TODO apply batch + self.nft.send(batch).await?; + + // Launch a task that will destroy the table on shutdown + { + let token = self.shutdown.token(); + tokio::spawn(async move { + token.wait().await; + Batch::new().delete(reaction_table()); + }); + } + + // Launch all actions + while let Some(action) = self.actions.pop() { + tokio::spawn(async move { action.serve().await }); + } + self.actions = Default::default(); + + Ok(()) + } + + async fn close(self) -> RemoteResult<()> { + self.shutdown.ask_shutdown(); + self.shutdown.wait_all_task_shutdown().await; + Ok(()) + } +} + +fn reaction_table() -> NfListObject<'static> { + NfListObject::Table(Table { + family: NfFamily::INet, + name: 
Cow::Borrowed("reaction"),
+        handle: None,
+    })
+}
diff --git a/plugins/reaction-plugin-nftables/src/nft.rs b/plugins/reaction-plugin-nftables/src/nft.rs
new file mode 100644
index 0000000..a1641d2
--- /dev/null
+++ b/plugins/reaction-plugin-nftables/src/nft.rs
@@ -0,0 +1,81 @@
+use std::{
+    ffi::{CStr, CString},
+    thread,
+};
+
+use libnftables1_sys::Nftables;
+use nftables::batch::Batch;
+use tokio::sync::{mpsc, oneshot};
+
+/// A client with a dedicated server thread to libnftables.
+/// Calling [`Default::default()`] spawns a new server thread.
+/// Cloning just creates a new client to the same server thread.
+#[derive(Clone)]
+pub struct NftClient {
+    tx: mpsc::Sender<NftCommand>,
+}
+
+impl Default for NftClient {
+    fn default() -> Self {
+        let (tx, mut rx) = mpsc::channel(10);
+
+        // Dedicated OS thread: libnftables calls are blocking, so all
+        // commands are serialized through this single connection owner.
+        thread::spawn(move || {
+            let mut conn = Nftables::new();
+
+            while let Some(NftCommand { json, ret }) = rx.blocking_recv() {
+                let (rc, output, error) = conn.run_cmd(json.as_ptr());
+                let res = match rc {
+                    // Success: forward nftables' output buffer.
+                    0 => to_rust_string(output)
+                        .ok_or_else(|| "unknown ok (rc = 0 but no output buffer)".into()),
+                    // Failure: a non-zero rc must always surface as Err.
+                    // (Previously the error buffer was wrapped in Ok, so
+                    // failed commands were reported as success to callers.)
+                    _ => Err(to_rust_string(error)
+                        .map(|err| format!("error (rc = {rc}: {err})"))
+                        .unwrap_or_else(|| {
+                            format!("unknown error (rc = {rc} but no error buffer)")
+                        })),
+                };
+                // The client may have dropped its receiver; ignore send failure.
+                let _ = ret.send(res);
+            }
+        });
+
+        NftClient { tx }
+    }
+}
+
+impl NftClient {
+    /// Send a batch to nftables. 
+ pub async fn send(&self, batch: Batch<'_>) -> Result { + // convert JSON to CString + let mut json = serde_json::to_vec(&batch.to_nftables()) + .map_err(|err| format!("couldn't build json to send to nftables: {err}"))?; + json.push('\0' as u8); + let json = CString::from_vec_with_nul(json) + .map_err(|err| format!("invalid json with null char: {err}"))?; + + // Send command + let (tx, rx) = oneshot::channel(); + let command = NftCommand { json, ret: tx }; + self.tx + .send(command) + .await + .map_err(|err| format!("nftables thread has quit, can't send command: {err}"))?; + + // Wait for result + rx.await + .map_err(|_| format!("nftables thread has quit, no response for command"))? + } +} + +struct NftCommand { + json: CString, + ret: oneshot::Sender>, +} + +fn to_rust_string(c_ptr: *const i8) -> Option { + if c_ptr.is_null() { + None + } else { + Some( + unsafe { CStr::from_ptr(c_ptr) } + .to_string_lossy() + .into_owned(), + ) + } +} diff --git a/plugins/reaction-plugin-nftables/src/tests.rs b/plugins/reaction-plugin-nftables/src/tests.rs new file mode 100644 index 0000000..7c2de26 --- /dev/null +++ b/plugins/reaction-plugin-nftables/src/tests.rs @@ -0,0 +1,247 @@ +use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig, Value}; +use serde_json::json; + +use crate::Plugin; + +#[tokio::test] +async fn conf_stream() { + // No stream is supported by nftables + assert!( + Plugin::default() + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "nftables".into(), + config: Value::Null + }], + vec![] + ) + .await + .is_err() + ); + + // Empty config is ok + assert!(Plugin::default().load_config(vec![], vec![]).await.is_ok()); +} + +#[tokio::test] +async fn conf_action_standalone() { + let p = vec!["name".into(), "ip".into(), "ip2".into()]; + let p_noip = vec!["name".into(), "ip2".into()]; + + for (is_ok, conf, patterns) in [ + // minimal set + (true, json!({ "set": "test" }), &p), + // missing set key + (false, json!({}), &p), + 
(false, json!({ "version": "ipv4" }), &p), + // unknown key + (false, json!({ "set": "test", "unknown": "yes" }), &p), + (false, json!({ "set": "test", "ip_index": 1 }), &p), + (false, json!({ "set": "test", "timeout_u32": 1 }), &p), + // pattern // + (true, json!({ "set": "test" }), &p), + (true, json!({ "set": "test", "pattern": "ip" }), &p), + (true, json!({ "set": "test", "pattern": "ip2" }), &p), + (true, json!({ "set": "test", "pattern": "ip2" }), &p_noip), + // unknown pattern "ip" + (false, json!({ "set": "test" }), &p_noip), + (false, json!({ "set": "test", "pattern": "ip" }), &p_noip), + // unknown pattern + (false, json!({ "set": "test", "pattern": "unknown" }), &p), + (false, json!({ "set": "test", "pattern": "uwu" }), &p_noip), + // bad type + (false, json!({ "set": "test", "pattern": 0 }), &p_noip), + (false, json!({ "set": "test", "pattern": true }), &p_noip), + // action // + (true, json!({ "set": "test", "action": "add" }), &p), + (true, json!({ "set": "test", "action": "delete" }), &p), + // unknown action + (false, json!({ "set": "test", "action": "create" }), &p), + (false, json!({ "set": "test", "action": "insert" }), &p), + (false, json!({ "set": "test", "action": "del" }), &p), + (false, json!({ "set": "test", "action": "destroy" }), &p), + // bad type + (false, json!({ "set": "test", "action": true }), &p), + (false, json!({ "set": "test", "action": 1 }), &p), + // ip version // + // ok + (true, json!({ "set": "test", "version": "ipv4" }), &p), + (true, json!({ "set": "test", "version": "ipv6" }), &p), + (true, json!({ "set": "test", "version": "ip" }), &p), + // unknown version + (false, json!({ "set": "test", "version": 4 }), &p), + (false, json!({ "set": "test", "version": 6 }), &p), + (false, json!({ "set": "test", "version": 46 }), &p), + (false, json!({ "set": "test", "version": "5" }), &p), + (false, json!({ "set": "test", "version": "ipv5" }), &p), + (false, json!({ "set": "test", "version": "4" }), &p), + (false, json!({ "set": 
"test", "version": "6" }), &p),
+        (false, json!({ "set": "test", "version": "46" }), &p),
+        // bad type
+        (false, json!({ "set": "test", "version": true }), &p),
+        // hooks //
+        // everything is fine really
+        (true, json!({ "set": "test", "hooks": [] }), &p),
+        (
+            true,
+            json!({ "set": "test", "hooks": ["input", "forward", "ingress", "prerouting", "output", "postrouting", "egress"] }),
+            &p,
+        ),
+        (false, json!({ "set": "test", "hooks": ["INPUT"] }), &p),
+        (false, json!({ "set": "test", "hooks": ["FORWARD"] }), &p),
+        (
+            false,
+            json!({ "set": "test", "hooks": ["unknown_hook"] }),
+            &p,
+        ),
+        // timeout //
+        (true, json!({ "set": "test", "timeout": "1m" }), &p),
+        (true, json!({ "set": "test", "timeout": "3 days" }), &p),
+        // bad
+        (false, json!({ "set": "test", "timeout": "3 dayz"}), &p),
+        (false, json!({ "set": "test", "timeout": 12 }), &p),
+        // target //
+        // anything is fine too
+        (true, json!({ "set": "test", "target": "drop" }), &p),
+        (true, json!({ "set": "test", "target": "accept" }), &p),
+        (true, json!({ "set": "test", "target": "return" }), &p),
+        (true, json!({ "set": "test", "target": "continue" }), &p),
+        // bad
+        (false, json!({ "set": "test", "target": "custom" }), &p),
+        (false, json!({ "set": "test", "target": "DROP" }), &p),
+        (false, json!({ "set": "test", "target": 11 }), &p),
+        (false, json!({ "set": "test", "target": ["DROP"] }), &p),
+    ] {
+        let res = Plugin::default()
+            .load_config(
+                vec![],
+                vec![ActionConfig {
+                    stream_name: "stream".into(),
+                    filter_name: "filter".into(),
+                    action_name: "action".into(),
+                    action_type: "nftables".into(),
+                    config: conf.clone().into(),
+                    patterns: patterns.clone(),
+                }],
+            )
+            .await;
+
+        assert!(
+            res.is_ok() == is_ok,
+            "conf: {:?}, must be ok: {is_ok}, result: {:?}",
+            conf,
+            // empty Result::Ok because ActionImpl is not Debug
+            res.map(|_| ())
+        );
+    }
+}
+
+// Check that options of actions targeting the same set are merged,
+// and that conflicting options for one set are rejected.
+#[tokio::test]
+async fn conf_action_merge() {
+    let mut plugin = Plugin::default();
+
+    let set1 = ActionConfig {
stream_name: "stream".into(),
+        filter_name: "filter".into(),
+        action_name: "action1".into(),
+        action_type: "nftables".into(),
+        config: json!({
+            "set": "test",
+            "target": "drop",
+            "hooks": ["input"],
+            "action": "add",
+        })
+        .into(),
+        patterns: vec!["ip".into()],
+    };
+
+    let set2 = ActionConfig {
+        stream_name: "stream".into(),
+        filter_name: "filter".into(),
+        action_name: "action2".into(),
+        action_type: "nftables".into(),
+        config: json!({
+            "set": "test",
+            "target": "drop",
+            "version": "ip",
+            "action": "add",
+        })
+        .into(),
+        patterns: vec!["ip".into()],
+    };
+
+    // NOTE(review): reuses action_name "action2" — looks like a copy-paste
+    // slip for "action3"; harmless here since the name is only informative.
+    let set3 = ActionConfig {
+        stream_name: "stream".into(),
+        filter_name: "filter".into(),
+        action_name: "action2".into(),
+        action_type: "nftables".into(),
+        config: json!({
+            "set": "test",
+            "action": "delete",
+        })
+        .into(),
+        patterns: vec!["ip".into()],
+    };
+
+    let res = plugin
+        .load_config(
+            vec![],
+            vec![
+                // First set
+                set1.clone(),
+                // Same set, adding options, no conflict
+                set2.clone(),
+                // Same set, no new options, no conflict
+                set3.clone(),
+                // Unrelated set, so no conflict
+                ActionConfig {
+                    stream_name: "stream".into(),
+                    filter_name: "filter".into(),
+                    action_name: "action3".into(),
+                    action_type: "nftables".into(),
+                    config: json!({
+                        "set": "test2",
+                        "target": "return",
+                        "version": "ipv6",
+                    })
+                    .into(),
+                    patterns: vec!["ip".into()],
+                },
+            ],
+        )
+        .await;
+
+    assert!(res.is_ok(), "res: {:?}", res.map(|_| ()));
+
+    // Another set with conflict is not ok
+    let res = plugin
+        .load_config(
+            vec![],
+            vec![
+                // First set
+                set1,
+                // Same set, adding options, no conflict
+                set2,
+                // Same set, no new options, no conflict
+                set3,
+                // Another set with conflict
+                ActionConfig {
+                    stream_name: "stream".into(),
+                    filter_name: "filter".into(),
+                    action_name: "action3".into(),
+                    action_type: "nftables".into(),
+                    config: json!({
+                        "set": "test",
+                        "target": "target2",
+                        "action": "del",
+                    })
+                    .into(),
+                    patterns: vec!["ip".into()],
+                },
+            ],
+        )
+        .await;
+
assert!(res.is_err(), "res: {:?}", res.map(|_| ())); +} diff --git a/plugins/reaction-plugin/Cargo.toml b/plugins/reaction-plugin/Cargo.toml index 08d1eb6..0fa3a38 100644 --- a/plugins/reaction-plugin/Cargo.toml +++ b/plugins/reaction-plugin/Cargo.toml @@ -11,7 +11,6 @@ categories = ["security"] description = "Plugin interface for reaction, a daemon that scans logs and takes action (alternative to fail2ban)" [dependencies] -chrono.workspace = true remoc.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/release.py b/release.py index 5fa870c..8359c7a 100644 --- a/release.py +++ b/release.py @@ -102,9 +102,8 @@ def main(): You'll need to install minisign to check the authenticity of the package. -After installing reaction, create your configuration file at -`/etc/reaction.json`, `/etc/reaction.jsonnet` or `/etc/reaction.yml`. -You can also provide a directory containing multiple configuration files in the previous formats. +After installing reaction, create your configuration file(s) in JSON, YAML or JSONnet in the +`/etc/reaction/` directory. See for documentation. 
Reload systemd: @@ -114,8 +113,8 @@ $ sudo systemctl daemon-reload Then enable and start reaction with this command ```bash -# replace `reaction.jsonnet` with the name of your configuration file in /etc/ -$ sudo systemctl enable --now reaction@reaction.jsonnet.service +# write first your configuration file(s) in /etc/reaction/ +$ sudo systemctl enable --now reaction.service ``` """.strip(), ] diff --git a/shell.nix b/shell.nix index 27dac77..ecb4318 100644 --- a/shell.nix +++ b/shell.nix @@ -4,6 +4,7 @@ pkgs.mkShell { name = "libipset"; buildInputs = [ ipset + nftables clang ]; src = null; diff --git a/src/concepts/plugin.rs b/src/concepts/plugin.rs index c5bc330..5e22287 100644 --- a/src/concepts/plugin.rs +++ b/src/concepts/plugin.rs @@ -1,5 +1,7 @@ use std::{collections::BTreeMap, io::Error, path, process::Stdio}; +#[cfg(target_os = "macos")] +use std::os::darwin::fs::MetadataExt; #[cfg(target_os = "freebsd")] use std::os::freebsd::fs::MetadataExt; #[cfg(target_os = "illumos")] @@ -170,6 +172,8 @@ impl Plugin { let mut command = Command::new("run0"); // --pipe gives direct, non-emulated stdio access, for better performance. 
command.arg("--pipe"); + // run the command inside the same slice as reaction + command.arg("--slice-inherit"); // Make path absolute for systemd let full_workdir = path::absolute(&plugin_working_directory)?; diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 3b3fdf5..97e6ece 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -1,6 +1,7 @@ use std::{cmp::Ordering, collections::BTreeMap, hash::Hash}; use reaction_plugin::StreamConfig; +use regex::RegexSet; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -19,6 +20,11 @@ pub struct Stream { #[serde(skip)] pub name: String, + #[serde(skip)] + pub compiled_regex_set: RegexSet, + #[serde(skip)] + pub regex_index_to_filter_name: Vec, + // Plugin-specific #[serde(default, rename = "type", skip_serializing_if = "Option::is_none")] pub stream_type: Option, @@ -90,6 +96,21 @@ impl Stream { filter.setup(name, key, patterns)?; } + let all_regexes: BTreeMap<_, _> = self + .filters + .values() + .flat_map(|filter| { + filter + .regex + .iter() + .map(|regex| (regex, filter.name.clone())) + }) + .collect(); + + self.compiled_regex_set = RegexSet::new(all_regexes.keys()) + .map_err(|err| format!("too much regexes on the filters of this stream: {err}"))?; + self.regex_index_to_filter_name = all_regexes.into_values().collect(); + Ok(()) } diff --git a/src/daemon/plugin/mod.rs b/src/daemon/plugin/mod.rs index 52ad0e8..c32e283 100644 --- a/src/daemon/plugin/mod.rs +++ b/src/daemon/plugin/mod.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, BTreeSet}, + fmt::Display, io, ops::{Deref, DerefMut}, process::ExitStatus, @@ -52,7 +53,7 @@ impl PluginManager { let mut child = plugin .launch(state_directory) .await - .map_err(|err| format!("could not launch plugin: {err}"))?; + .map_err(|err| systemd_error(plugin, "could not launch plugin", err))?; { let stderr = child.stderr.take().unwrap(); @@ -70,10 +71,7 @@ impl PluginManager { ) = Connect::io(remoc::Cfg::default(), stdout, stdin) .await 
.map_err(|err| { - format!( - "could not init communication with plugin {}: {err}", - plugin.name - ) + systemd_error(plugin, "could not init communication with plugin", err) })?; tokio::spawn(conn); @@ -166,6 +164,20 @@ impl PluginManager { } } +fn systemd_error(plugin: &Plugin, message: &str, err: impl Display) -> String { + if plugin.systemd { + format!( + "{message}: {err}. \ +`plugins.{0}.systemd` is set to true, so this may be an issue with systemd's run0. \ +please make sure `sudo run0 ls /` returns the same thing as `sudo ls /` as a test. \ +if run0 can't be found or doesn't output anything, set `plugins.{0}.systemd` to false.", + plugin.name, + ) + } else { + format!("{message}: {err}") + } +} + async fn handle_stderr(stderr: ChildStderr, plugin_name: String) { // read lines until shutdown let lines = reader_to_stream(stderr); diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index 7d50b54..3ed0d56 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -1,11 +1,10 @@ use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeSet, HashMap}, process::Stdio, }; use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all}; use reaction_plugin::{StreamImpl, shutdown::ShutdownToken}; -use regex::RegexSet; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::{Child, ChildStderr, ChildStdout, Command}, @@ -45,7 +44,6 @@ pub fn reader_to_stream( } pub struct StreamManager { - compiled_regex_set: RegexSet, regex_index_to_filter_manager: Vec, stream: &'static Stream, stream_plugin: Option, @@ -59,16 +57,6 @@ impl StreamManager { shutdown: ShutdownToken, plugins: &mut Plugins, ) -> Result { - let all_regexes: BTreeMap<_, _> = filter_managers - .iter() - .flat_map(|(filter, filter_manager)| { - filter - .regex - .iter() - .map(|regex| (regex, filter_manager.clone())) - }) - .collect(); - let stream_plugin = if stream.is_plugin() { Some( plugins @@ -84,11 +72,23 @@ impl StreamManager { None }; + let 
regex_index_to_filter_manager = stream + .regex_index_to_filter_name + .iter() + .map(|filter_name| { + filter_managers + .iter() + .find(|(filter, _)| filter_name == &filter.name) + .unwrap() + .1 + .clone() + }) + .collect(); + debug!("successfully initialized stream {}", stream.name); Ok(StreamManager { - compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?, - regex_index_to_filter_manager: all_regexes.into_values().collect(), + regex_index_to_filter_manager, stream, stream_plugin, shutdown, @@ -230,7 +230,7 @@ impl StreamManager { } fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> { - let matches = self.compiled_regex_set.matches(line); + let matches = self.stream.compiled_regex_set.matches(line); matches .into_iter() .map(|match_| &self.regex_index_to_filter_manager[match_]) diff --git a/tests/notif.jsonnet b/tests/notif.jsonnet index 2e46471..14d76c0 100644 --- a/tests/notif.jsonnet +++ b/tests/notif.jsonnet @@ -41,6 +41,23 @@ }, }, }, + f2: { + regex: [ + "^can't found $", + ], + retry: 2, + retryperiod: '60s', + actions: { + damn: { + cmd: ['notify-send', 'you should not see that', 'ban '], + }, + undamn: { + cmd: ['notify-send', 'you should not see that', 'unban '], + after: '3s', + onexit: true, + }, + }, + }, }, }, },