Compare commits

...

19 commits

Author SHA1 Message Date
ppom
8a34a1fa11
Remove useless gitlab ci file 2026-03-13 12:00:00 +01:00
ppom
3ca54c6c43
ipset: Better error handling and messages
- Clearer messages.
- Make sure logs are showed in order.
- When cleaning after an error on startup,
  do not try to undo an action that failed.
2026-03-02 12:00:00 +01:00
ppom
16692731f0
Remove useless chrono dependency from reaction-plugin 2026-03-02 12:00:00 +01:00
ppom
938a366576
More useful error message when plugin can't launch and systemd=true 2026-03-01 12:00:00 +01:00
ppom
5a6c203c01
Add system-reaction.slice 2026-02-27 12:00:00 +01:00
ppom
f2b1accec0
Fix slice-inherit option 2026-02-26 12:00:00 +01:00
ppom
00725ed9e2
notif test: add a filter that shouldn't match 2026-02-26 12:00:00 +01:00
ppom
ea0e7177d9
nftables: Fix bad action advertised 2026-02-26 12:00:00 +01:00
ppom
c41c89101d
Fix #151: Move RegexSet creation from StreamManager to config Stream
This move the potential error of a too big regex set to the config setup,
a place where it can be gracefully handled, instead of the place it was,
where this would make reaction mess up with start/stop, etc.
2026-02-26 12:00:00 +01:00
ppom
3d7e647ef7
Adapt tests to nftables configuration 2026-02-25 12:00:00 +01:00
ppom
5b6cc35deb
nftables: Fix compilation errors and actually use libnftables 2026-02-25 12:00:00 +01:00
ppom
0cd765251a
run plugins in the same slice as reaction
And reaction should be started in system-reaction.slice.
The plugins could then be grouped together with the daemon
2026-02-20 12:00:00 +01:00
ppom
26cf3a96e7
First draft of an nftables plugin
Not compiling yet but I'm getting there.
Must be careful on the unsafe, C-wrapping code!
2026-02-20 12:00:00 +01:00
ppom
285954f7cd
Remove outdated FIXME 2026-02-18 12:00:00 +01:00
ppom
dc51d7d432
Add support for macOS 2026-02-17 12:00:00 +01:00
ppom
488dc6c66f
Update release instructions 2026-02-15 12:00:00 +01:00
ppom
88c99fff0f
Fix install instructions 2026-02-12 12:00:00 +01:00
ppom
645d72ac1e
.gitignore cleanup 2026-02-12 12:00:00 +01:00
ppom
a7e958f248
Update ARCHITECTURE.md 2026-02-12 12:00:00 +01:00
24 changed files with 1303 additions and 119 deletions

11
.gitignore vendored
View file

@ -1,22 +1,15 @@
/reaction
reaction*.db
reaction*.db.old
reaction.db
reaction.db.old
/data
/lmdb
reaction*.export.json
/reaction*.sock
/result
/wiki
/deb
*.deb
*.minisig
*.qcow2
debian-packaging/*
*.swp
export-go-db/export-go-db
import-rust-db/target
/target
reaction-plugin/target
/local
.ccls-cache
.direnv

View file

@ -1,15 +0,0 @@
---
image: golang:1.20-bookworm
stages:
- build
variables:
DEBIAN_FRONTEND: noninteractive
test_building:
stage: build
before_script:
- apt-get -qq -y update
- apt-get -qq -y install build-essential devscripts debhelper quilt wget
script:
- make reaction ip46tables nft46

View file

@ -6,6 +6,7 @@ Here is a high-level overview of the codebase.
## Build
- `bench/`: Configuration that spawns a very high load on reaction. Useful to test performance improvements and regressions.
- `build.rs`: permits to create shell completions and man pages on build.
- `Cargo.toml`, `Cargo.lock`: manifest and dependencies.
- `config/`: example / test configuration files. Look at its git history to discover more.
@ -15,8 +16,7 @@ Here is a high-level overview of the codebase.
## Main source code
- `helpers_c/`: C helpers. I wish to have special IP support in reaction and get rid of them. See #79 and #116.
- `tests/`: Integration tests. For now they test basic reaction runtime behavior, persistance, and client-daemon communication.
- `tests/`: Integration tests. They test reaction runtime behavior, persistance, client-daemon communication, plugin integrations.
- `src/`: The source code, here we go!
### Top-level files
@ -25,18 +25,13 @@ Here is a high-level overview of the codebase.
- `src/lib.rs`: Second main entrypoint
- `src/cli.rs`: Command-line arguments
- `src/tests.rs`: Test utilities
- `src/protocol.rs`: de/serialization and client/daemon protocol messages.
### `src/concepts/`
reaction really is about its configuration, which is at the center of the code.
There is one file for each of its concepts: configuration, streams, filters, actions, patterns.
### `src/protocol/`
Low-level serialization/deserialization and client-daemon protocol messages.
Shared by the client and daemon's socket. Also used by daemon's database.
There is one file for each of its concepts: configuration, streams, filters, actions, patterns, plugins.
### `src/client/`
@ -58,9 +53,9 @@ This code has async code, to handle input streams and communication with clients
- `mod.rs`: High-level logic
- `state.rs`: Inner state operations
- `socket.rs`: The socket task, responsible for communication with clients.
- `shutdown.rs`: Logic for passing shutdown signal across all tasks
- `plugin.rs`: Plugin startup, configuration loading and cleanup.
### `src/tree`
### `crates/treedb`
Persistence layer.
@ -68,5 +63,19 @@ This is a database highly adapted to reaction workload, making reaction faster t
(heed, sled and fjall crates have been tested).
Its design is explained in the comments of its files:
- `mod.rs`: main database code, with its two API structs: Tree and Database.
- `raw.rs` low-level part, directly interacting with de/serializisation and files.
- `lib.rs`: main database code, with its two API structs: Tree and Database.
- `raw.rs`: low-level part, directly interacting with de/serializisation and files.
- `time.rs`: time definitions shared with reaction.
- `helpers.rs`: utilities to ease db deserialization from disk.
### `plugins/reaction-plugin`
Shared plugin interface between reaction daemon and its plugins.
Also defines some shared logic between them:
- `shutdown.rs`: Logic for passing shutdown signal across all tasks
- `parse_duration.rs` Duration parsing
### `plugins/reaction-plugin-*`
All core plugins.

106
Cargo.lock generated
View file

@ -1980,6 +1980,15 @@ dependencies = [
"windows-link",
]
[[package]]
name = "libnftables1-sys"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b290d0d41f0ad578660aeed371bcae4cf85f129a6fe31350dbd2e097518cd7f"
dependencies = [
"bindgen",
]
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
@ -2248,6 +2257,22 @@ dependencies = [
"wmi",
]
[[package]]
name = "nftables"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c57e7343eed9e9330e084eef12651b15be3c8ed7825915a0ffa33736b852bed"
dependencies = [
"schemars",
"serde",
"serde_json",
"serde_path_to_error",
"strum",
"strum_macros",
"thiserror 2.0.18",
"tokio",
]
[[package]]
name = "nix"
version = "0.29.0"
@ -2861,7 +2886,6 @@ dependencies = [
name = "reaction-plugin"
version = "1.0.0"
dependencies = [
"chrono",
"remoc",
"serde",
"serde_json",
@ -2899,6 +2923,19 @@ dependencies = [
"tokio",
]
[[package]]
name = "reaction-plugin-nftables"
version = "0.1.0"
dependencies = [
"libnftables1-sys",
"nftables",
"reaction-plugin",
"remoc",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "reaction-plugin-virtual"
version = "1.0.0"
@ -2919,6 +2956,26 @@ dependencies = [
"bitflags",
]
[[package]]
name = "ref-cast"
version = "1.0.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d"
dependencies = [
"ref-cast-impl",
]
[[package]]
name = "ref-cast-impl"
version = "1.0.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "regex"
version = "1.12.2"
@ -3194,6 +3251,31 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "schemars"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc"
dependencies = [
"dyn-clone",
"ref-cast",
"schemars_derive",
"serde",
"serde_json",
]
[[package]]
name = "schemars_derive"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.114",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
@ -3287,6 +3369,17 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "serde_derive_internals"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "serde_json"
version = "1.0.149"
@ -3300,6 +3393,17 @@ dependencies = [
"zmij",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457"
dependencies = [
"itoa",
"serde",
"serde_core",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"

View file

@ -29,6 +29,8 @@ assets = [
[ "target/release/reaction.bash", "/usr/share/bash-completion/completions/reaction", "644" ],
[ "target/release/reaction.fish", "/usr/share/fish/completions/", "644" ],
[ "target/release/_reaction", "/usr/share/zsh/vendor-completions/", "644" ],
# Slice
[ "packaging/system-reaction.slice", "/usr/lib/systemd/system/", "644" ],
]
[dependencies]
@ -83,6 +85,7 @@ members = [
"plugins/reaction-plugin",
"plugins/reaction-plugin-cluster",
"plugins/reaction-plugin-ipset",
"plugins/reaction-plugin-nftables",
"plugins/reaction-plugin-virtual"
]

View file

@ -1,6 +1,6 @@
# vim: ft=systemd
[Unit]
Description=A daemon that scans program outputs for repeated patterns, and takes action.
Description=reaction daemon
Documentation=https://reaction.ppom.me
# Ensure reaction will insert its chain after docker has inserted theirs. Only useful when iptables & docker are used
# After=docker.service
@ -17,6 +17,8 @@ RuntimeDirectory=reaction
WorkingDirectory=/var/lib/reaction
# Let reaction kill its child processes first
KillMode=mixed
# Put reaction in its own slice so that plugins can be grouped within.
Slice=system-reaction.slice
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1 @@
[Slice]

View file

@ -1,4 +1,4 @@
use std::{fmt::Debug, u32};
use std::{fmt::Debug, u32, usize};
use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration};
use remoc::rch::mpsc as remocMpsc;
@ -39,9 +39,6 @@ pub enum AddDel {
Del,
}
// FIXME block configs that have different set options for the same name
// treat default values as none?
/// User-facing action options
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
@ -176,7 +173,7 @@ impl Set {
}
}
pub async fn init(&self, ipset: &mut IpSet) -> Result<(), String> {
pub async fn init(&self, ipset: &mut IpSet) -> Result<(), (usize, String)> {
for (set, version) in [
(&self.sets.ipv4, Version::IPv4),
(&self.sets.ipv6, Version::IPv6),
@ -189,44 +186,42 @@ impl Set {
version,
timeout: self.timeout,
}))
.await?;
.await
.map_err(|err| (0, err.to_string()))?;
// insert set in chains
for chain in &self.chains {
for (i, chain) in self.chains.iter().enumerate() {
ipset
.order(Order::InsertSet(SetChain {
set: set.clone(),
chain: chain.clone(),
target: self.target.clone(),
}))
.await?;
.await
.map_err(|err| (i + 1, err.to_string()))?;
}
}
}
Ok(())
}
pub async fn destroy(&self, ipset: &mut IpSet) {
for (set, version) in [
(&self.sets.ipv4, Version::IPv4),
(&self.sets.ipv6, Version::IPv6),
] {
pub async fn destroy(&self, ipset: &mut IpSet, until: Option<usize>) {
for set in [&self.sets.ipv4, &self.sets.ipv6] {
if let Some(set) = set {
for chain in &self.chains {
if let Err(err) = ipset
for chain in self
.chains
.iter()
.take(until.map(|until| until - 1).unwrap_or(usize::MAX))
{
let _ = ipset
.order(Order::RemoveSet(SetChain {
set: set.clone(),
chain: chain.clone(),
target: self.target.clone(),
}))
.await
{
eprintln!(
"ERROR while removing {version} set {set} from chain {chain}: {err}"
);
}
.await;
}
if let Err(err) = ipset.order(Order::DestroySet(set.clone())).await {
eprintln!("ERROR while destroying {version} set {set}: {err}");
if until.is_none_or(|until| until != 0) {
let _ = ipset.order(Order::DestroySet(set.clone())).await;
}
}
}

View file

@ -72,7 +72,7 @@ impl IpSet {
pub enum IpSetError {
Thread(String),
IpSet(String),
IpSet(()),
}
impl Display for IpSetError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@ -81,7 +81,7 @@ impl Display for IpSetError {
"{}",
match self {
IpSetError::Thread(err) => err,
IpSetError::IpSet(err) => err,
IpSetError::IpSet(()) => "ipset error",
}
)
}
@ -90,12 +90,12 @@ impl From<IpSetError> for String {
fn from(value: IpSetError) -> Self {
match value {
IpSetError::Thread(err) => err,
IpSetError::IpSet(err) => err,
IpSetError::IpSet(()) => "ipset error".to_string(),
}
}
}
pub type OrderType = (Order, oneshot::Sender<Result<(), String>>);
pub type OrderType = (Order, oneshot::Sender<Result<(), ()>>);
struct Set {
session: Session<HashNet>,
@ -121,7 +121,7 @@ impl IPsetManager {
}
}
fn handle_order(&mut self, order: Order) -> Result<(), String> {
fn handle_order(&mut self, order: Order) -> Result<(), ()> {
match order {
Order::CreateSet(CreateSet {
name,
@ -139,7 +139,7 @@ impl IPsetManager {
};
builder.with_ipv6(version == Version::IPv6)?.build()
})
.map_err(|err| format!("Could not create set {name}: {err}"))?;
.map_err(|err| eprintln!("ERROR Could not create set {name}: {err}"))?;
self.sessions.insert(name, Set { session, version });
}
@ -149,7 +149,7 @@ impl IPsetManager {
session
.session
.destroy()
.map_err(|err| format!("Could not destroy set {set}: {err}"))?;
.map_err(|err| eprintln!("ERROR Could not destroy set {set}: {err}"))?;
}
}
@ -162,9 +162,13 @@ impl IPsetManager {
Ok(())
}
fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> {
fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), ()> {
self._insert_remove_ip(set, ip, insert)
.map_err(|err| eprintln!("ERROR {err}"))
}
fn _insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> {
let session = self.sessions.get_mut(&set).ok_or(format!(
"No set handled by us with this name: {set}. This likely is a bug."
"No set handled by this plugin with this name: {set}. This likely is a bug."
))?;
let mut net_data = NetDataType::new(Ipv4Addr::LOCALHOST, 0);
@ -182,24 +186,28 @@ impl IPsetManager {
Ok(())
}
fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> {
let SetChain {
set,
chain,
target: action,
} = options;
fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), ()> {
self._insert_remove_set(options, insert)
.map_err(|err| eprintln!("ERROR {err}"))
}
fn _insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> {
let SetChain { set, chain, target } = options;
let version = self
.sessions
.get(&set)
.ok_or(format!("No set managed by us with this name: {set}"))?
.ok_or(format!(
"No set managed by this plugin with this name: {set}"
))?
.version;
if insert {
eprintln!("INFO inserting {version} set {set} in chain {chain}");
let (verb, verbing, from) = if insert {
("insert", "inserting", "in")
} else {
eprintln!("INFO removing {version} set {set} from chain {chain}");
}
("remove", "removing", "from")
};
eprintln!("INFO {verbing} {version} set {set} {from} chain {chain}");
let command = match version {
Version::IPv4 => "iptables",
@ -217,20 +225,20 @@ impl IPsetManager {
&set,
"src",
"-j",
&action,
&target,
])
.spawn()
.map_err(|err| format!("Could not insert ipset {set} in chain {chain}: {err}"))?;
.map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: Could not execute {command}: {err}"))?;
let exit = child
.wait()
.map_err(|err| format!("Could not insert ipset: {err}"))?;
.map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: {err}"))?;
if exit.success() {
Ok(())
} else {
Err(format!(
"Could not insert ipset: exit code {}",
"Could not {verb} ipset: exit code {}",
exit.code()
.map(|c| c.to_string())
.unwrap_or_else(|| "<unknown>".to_string())

View file

@ -109,15 +109,21 @@ impl PluginInfo for Plugin {
let mut first_error = None;
for (i, set) in self.sets.iter().enumerate() {
// Retain if error
if let Err(err) = set.init(&mut self.ipset).await {
first_error = Some((i, RemoteError::Plugin(err)));
if let Err((failed_step, err)) = set.init(&mut self.ipset).await {
first_error = Some((i, failed_step, RemoteError::Plugin(err)));
break;
}
}
// Destroy initialized sets if error
if let Some((i, err)) = first_error {
for set in self.sets.iter().take(i + 1) {
let _ = set.destroy(&mut self.ipset).await;
if let Some((last_set, failed_step, err)) = first_error {
eprintln!("DEBUG last_set: {last_set} failed_step: {failed_step} err: {err}");
for (curr_set, set) in self.sets.iter().enumerate().take(last_set + 1) {
let until = if last_set == curr_set {
Some(failed_step)
} else {
None
};
let _ = set.destroy(&mut self.ipset, until).await;
}
return Err(err);
}
@ -148,6 +154,6 @@ impl PluginInfo for Plugin {
async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec<Set>, shutdown: ShutdownToken) {
shutdown.wait().await;
for set in sets {
set.destroy(&mut ipset).await;
set.destroy(&mut ipset, None).await;
}
}

View file

@ -0,0 +1,13 @@
[package]
name = "reaction-plugin-nftables"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }
remoc.workspace = true
reaction-plugin.path = "../reaction-plugin"
serde.workspace = true
serde_json.workspace = true
nftables = { version = "0.6.3", features = ["tokio"] }
libnftables1-sys = { version = "0.1.1" }

View file

@ -0,0 +1,493 @@
use std::{
borrow::Cow,
collections::HashSet,
fmt::{Debug, Display},
u32,
};
use nftables::{
batch::Batch,
expr::Expression,
schema::{Element, NfListObject, Rule, SetFlag, SetType, SetTypeValue},
stmt::Statement,
types::{NfFamily, NfHook},
};
use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration};
use remoc::rch::mpsc as remocMpsc;
use serde::{Deserialize, Serialize};
use crate::{helpers::Version, nft::NftClient};
#[derive(Default, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
pub enum IpVersion {
#[default]
#[serde(rename = "ip")]
Ip,
#[serde(rename = "ipv4")]
Ipv4,
#[serde(rename = "ipv6")]
Ipv6,
}
impl Debug for IpVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
IpVersion::Ipv4 => "ipv4",
IpVersion::Ipv6 => "ipv6",
IpVersion::Ip => "ip",
}
)
}
}
#[derive(Default, Debug, Serialize, Deserialize)]
pub enum AddDel {
#[default]
#[serde(alias = "add")]
Add,
#[serde(alias = "delete")]
Delete,
}
/// User-facing action options
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ActionOptions {
/// The set that should be used by this action
pub set: String,
/// The pattern name of the IP.
/// Defaults to "ip"
#[serde(default = "serde_ip")]
pub pattern: String,
#[serde(skip)]
ip_index: usize,
// Whether the action is to "add" or "del" the ip from the set
#[serde(default)]
action: AddDel,
#[serde(flatten)]
pub set_options: SetOptions,
}
fn serde_ip() -> String {
"ip".into()
}
impl ActionOptions {
pub fn set_ip_index(&mut self, patterns: Vec<String>) -> Result<(), ()> {
self.ip_index = patterns
.into_iter()
.enumerate()
.filter(|(_, name)| name == &self.pattern)
.next()
.ok_or(())?
.0;
Ok(())
}
}
/// Merged set options
#[derive(Default, Clone, Deserialize, Serialize, Debug, PartialEq, Eq)]
pub struct SetOptions {
/// The IP type.
/// Defaults to `46`.
/// If `ipv4`: creates an IPv4 set with this name
/// If `ipv6`: creates an IPv6 set with this name
/// If `ip`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6'
/// *Merged set-wise*.
#[serde(default)]
version: Option<IpVersion>,
/// Chains where the IP set should be inserted.
/// Defaults to `["input", "forward"]`
/// *Merged set-wise*.
#[serde(default)]
hooks: Option<Vec<RHook>>,
// Optional timeout, letting linux/netfilter handle set removal instead of reaction
// Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action
// Same syntax as after and retryperiod in reaction.
/// *Merged set-wise*.
#[serde(skip_serializing_if = "Option::is_none")]
timeout: Option<String>,
#[serde(skip)]
timeout_u32: Option<u32>,
// Target that iptables should use when the IP is encountered.
// Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain
/// *Merged set-wise*.
#[serde(default)]
target: Option<RStatement>,
}
impl SetOptions {
pub fn merge(&mut self, options: &SetOptions) -> Result<(), String> {
// merge two Option<T> and fail if there is conflict
fn inner_merge<T: Eq + Clone + std::fmt::Debug>(
a: &mut Option<T>,
b: &Option<T>,
name: &str,
) -> Result<(), String> {
match (&a, &b) {
(Some(aa), Some(bb)) => {
if aa != bb {
return Err(format!(
"Conflicting options for {name}: `{aa:?}` and `{bb:?}`"
));
}
}
(None, Some(_)) => {
*a = b.clone();
}
_ => (),
};
Ok(())
}
inner_merge(&mut self.version, &options.version, "version")?;
inner_merge(&mut self.timeout, &options.timeout, "timeout")?;
inner_merge(&mut self.hooks, &options.hooks, "chains")?;
inner_merge(&mut self.target, &options.target, "target")?;
if let Some(timeout) = &self.timeout {
let duration = parse_duration(timeout)
.map_err(|err| format!("failed to parse timeout: {}", err))?
.as_secs();
if duration > u32::MAX as u64 {
return Err(format!(
"timeout is limited to {} seconds (approx {} days)",
u32::MAX,
49_000
));
}
self.timeout_u32 = Some(duration as u32);
}
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RHook {
Ingress,
Prerouting,
Forward,
Input,
Output,
Postrouting,
Egress,
}
impl RHook {
pub fn as_str(&self) -> &'static str {
match self {
RHook::Ingress => "ingress",
RHook::Prerouting => "prerouting",
RHook::Forward => "forward",
RHook::Input => "input",
RHook::Output => "output",
RHook::Postrouting => "postrouting",
RHook::Egress => "egress",
}
}
}
impl Display for RHook {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
impl From<&RHook> for NfHook {
fn from(value: &RHook) -> Self {
match value {
RHook::Ingress => Self::Ingress,
RHook::Prerouting => Self::Prerouting,
RHook::Forward => Self::Forward,
RHook::Input => Self::Input,
RHook::Output => Self::Output,
RHook::Postrouting => Self::Postrouting,
RHook::Egress => Self::Egress,
}
}
}
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RStatement {
Accept,
Drop,
Continue,
Return,
}
pub struct Set {
pub sets: SetNames,
pub hooks: Vec<RHook>,
pub timeout: Option<u32>,
pub target: RStatement,
}
impl Set {
pub fn from(name: String, options: SetOptions) -> Self {
Self {
sets: SetNames::new(name, options.version),
timeout: options.timeout_u32,
target: options.target.unwrap_or(RStatement::Drop),
hooks: options.hooks.unwrap_or(vec![RHook::Input, RHook::Forward]),
}
}
pub fn init<'a>(&self, batch: &mut Batch<'a>) -> Result<(), String> {
for (set, version) in [
(&self.sets.ipv4, Version::IPv4),
(&self.sets.ipv6, Version::IPv6),
] {
if let Some(set) = set {
let family = NfFamily::INet;
let table = Cow::from("reaction");
// create set
batch.add(NfListObject::<'a>::Set(Box::new(nftables::schema::Set::<
'a,
> {
family,
table: table.to_owned(),
name: Cow::Owned(set.to_owned()),
// TODO Try a set which is both ipv4 and ipv6?
set_type: SetTypeValue::Single(match version {
Version::IPv4 => SetType::Ipv4Addr,
Version::IPv6 => SetType::Ipv6Addr,
}),
flags: Some({
let mut flags = HashSet::from([SetFlag::Interval]);
if self.timeout.is_some() {
flags.insert(SetFlag::Timeout);
}
flags
}),
timeout: self.timeout.clone(),
..Default::default()
})));
// insert set in chains
let expr = vec![match self.target {
RStatement::Accept => Statement::Accept(None),
RStatement::Drop => Statement::Drop(None),
RStatement::Continue => Statement::Continue(None),
RStatement::Return => Statement::Return(None),
}];
for hook in &self.hooks {
batch.add(NfListObject::Rule(Rule {
family,
table: table.to_owned(),
chain: Cow::from(hook.to_string()),
expr: Cow::Owned(expr.clone()),
..Default::default()
}));
}
}
}
Ok(())
}
}
pub struct SetNames {
pub ipv4: Option<String>,
pub ipv6: Option<String>,
}
impl SetNames {
pub fn new(name: String, version: Option<IpVersion>) -> Self {
Self {
ipv4: match version {
Some(IpVersion::Ipv4) => Some(name.clone()),
Some(IpVersion::Ipv6) => None,
None | Some(IpVersion::Ip) => Some(format!("{}v4", name)),
},
ipv6: match version {
Some(IpVersion::Ipv4) => None,
Some(IpVersion::Ipv6) => Some(name),
None | Some(IpVersion::Ip) => Some(format!("{}v6", name)),
},
}
}
}
pub struct Action {
nft: NftClient,
rx: remocMpsc::Receiver<Exec>,
shutdown: ShutdownToken,
sets: SetNames,
// index of pattern ip in match vec
ip_index: usize,
action: AddDel,
}
impl Action {
pub fn new(
nft: NftClient,
shutdown: ShutdownToken,
rx: remocMpsc::Receiver<Exec>,
options: ActionOptions,
) -> Result<Self, String> {
Ok(Action {
nft,
rx,
shutdown,
sets: SetNames::new(options.set, options.set_options.version),
ip_index: options.ip_index,
action: options.action,
})
}
pub async fn serve(mut self) {
loop {
let event = tokio::select! {
exec = self.rx.recv() => Some(exec),
_ = self.shutdown.wait() => None,
};
match event {
// shutdown asked
None => break,
// channel closed
Some(Ok(None)) => break,
// error from channel
Some(Err(err)) => {
eprintln!("ERROR {err}");
break;
}
// ok
Some(Ok(Some(exec))) => {
if let Err(err) = self.handle_exec(exec).await {
eprintln!("ERROR {err}");
break;
}
}
}
}
// eprintln!("DEBUG Asking for shutdown");
// self.shutdown.ask_shutdown();
}
async fn handle_exec(&mut self, mut exec: Exec) -> Result<(), String> {
// safeguard against Vec::remove's panic
if exec.match_.len() <= self.ip_index {
return Err(format!(
"match received from reaction is smaller than expected. looking for index {} but size is {}. this is a bug!",
self.ip_index,
exec.match_.len()
));
}
let ip = exec.match_.remove(self.ip_index);
// select set
let set = match (&self.sets.ipv4, &self.sets.ipv6) {
(None, None) => return Err(format!("action is neither IPv4 nor IPv6, this is a bug!")),
(None, Some(set)) => set,
(Some(set), None) => set,
(Some(set4), Some(set6)) => {
if ip.contains(':') {
set6
} else {
set4
}
}
};
// add/remove ip to set
let element = NfListObject::Element(Element {
family: NfFamily::INet,
table: Cow::from("reaction"),
name: Cow::from(set),
elem: Cow::from(vec![Expression::String(Cow::from(ip.clone()))]),
});
let mut batch = Batch::new();
match self.action {
AddDel::Add => batch.add(element),
AddDel::Delete => batch.delete(element),
};
match self.nft.send(batch).await {
Ok(ok) => {
eprintln!("DEBUG action ok {:?} {ip}: {ok}", self.action);
Ok(())
}
Err(err) => Err(format!("action ko {:?} {ip}: {err}", self.action)),
}
}
}
#[cfg(test)]
mod tests {
use crate::action::{IpVersion, RHook, RStatement, SetOptions};
#[tokio::test]
async fn set_options_merge() {
let s1 = SetOptions {
version: None,
hooks: None,
timeout: None,
timeout_u32: None,
target: None,
};
let s2 = SetOptions {
version: Some(IpVersion::Ipv4),
hooks: Some(vec![RHook::Input]),
timeout: Some("3h".into()),
timeout_u32: Some(3 * 3600),
target: Some(RStatement::Drop),
};
assert_ne!(s1, s2);
assert_eq!(s1, SetOptions::default());
{
// s2 can be merged in s1
let mut s1 = s1.clone();
assert!(s1.merge(&s2).is_ok());
assert_eq!(s1, s2);
}
{
// s1 can be merged in s2
let mut s2 = s2.clone();
assert!(s2.merge(&s1).is_ok());
}
{
// s1 can be merged in itself
let mut s3 = s1.clone();
assert!(s3.merge(&s1).is_ok());
assert_eq!(s1, s3);
}
{
// s2 can be merged in itself
let mut s3 = s2.clone();
assert!(s3.merge(&s2).is_ok());
assert_eq!(s2, s3);
}
for s3 in [
SetOptions {
version: Some(IpVersion::Ipv6),
..Default::default()
},
SetOptions {
hooks: Some(vec![RHook::Output]),
..Default::default()
},
SetOptions {
timeout: Some("30min".into()),
..Default::default()
},
SetOptions {
target: Some(RStatement::Continue),
..Default::default()
},
] {
// none with some is ok
assert!(s3.clone().merge(&s1).is_ok(), "s3: {s3:?}");
assert!(s1.clone().merge(&s3).is_ok(), "s3: {s3:?}");
// different some is ko
assert!(s3.clone().merge(&s2).is_err(), "s3: {s3:?}");
assert!(s2.clone().merge(&s3).is_err(), "s3: {s3:?}");
}
}
}

View file

@ -0,0 +1,15 @@
use std::fmt::Display;
#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
pub enum Version {
IPv4,
IPv6,
}
impl Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Version::IPv4 => "IPv4",
Version::IPv6 => "IPv6",
})
}
}

View file

@ -0,0 +1,176 @@
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet},
};
use nftables::{
batch::Batch,
schema::{Chain, NfListObject, Table},
types::{NfChainType, NfFamily},
};
use reaction_plugin::{
ActionConfig, ActionImpl, Hello, Manifest, PluginInfo, RemoteResult, StreamConfig, StreamImpl,
shutdown::ShutdownController,
};
use remoc::rtc;
use crate::{
action::{Action, ActionOptions, Set, SetOptions},
nft::NftClient,
};
#[cfg(test)]
mod tests;
mod action;
pub mod helpers;
mod nft;
#[tokio::main]
async fn main() {
let plugin = Plugin::default();
reaction_plugin::main_loop(plugin).await;
}
#[derive(Default)]
struct Plugin {
nft: NftClient,
sets: Vec<Set>,
actions: Vec<Action>,
shutdown: ShutdownController,
}
impl PluginInfo for Plugin {
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError> {
Ok(Manifest {
hello: Hello::new(),
streams: BTreeSet::default(),
actions: BTreeSet::from(["nftables".into()]),
})
}
async fn load_config(
&mut self,
streams: Vec<StreamConfig>,
actions: Vec<ActionConfig>,
) -> RemoteResult<(Vec<StreamImpl>, Vec<ActionImpl>)> {
if !streams.is_empty() {
return Err("This plugin can't handle any stream type".into());
}
let mut ret_actions = Vec::with_capacity(actions.len());
let mut set_options: BTreeMap<String, SetOptions> = BTreeMap::new();
for ActionConfig {
stream_name,
filter_name,
action_name,
action_type,
config,
patterns,
} in actions
{
if &action_type != "nftables" {
return Err("This plugin can't handle other action types than nftables".into());
}
let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}")
})?;
options.set_ip_index(patterns).map_err(|_|
format!(
"No pattern with name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'",
&options.pattern
)
)?;
// Merge option
set_options
.entry(options.set.clone())
.or_default()
.merge(&options.set_options)
.map_err(|err| format!("set {}: {err}", options.set))?;
let (tx, rx) = remoc::rch::mpsc::channel(1);
self.actions.push(Action::new(
self.nft.clone(),
self.shutdown.token(),
rx,
options,
)?);
ret_actions.push(ActionImpl { tx });
}
// Init all sets
while let Some((name, options)) = set_options.pop_first() {
self.sets.push(Set::from(name, options));
}
Ok((vec![], ret_actions))
}
async fn start(&mut self) -> RemoteResult<()> {
self.shutdown.delegate().handle_quit_signals()?;
let mut batch = Batch::new();
batch.add(reaction_table());
// Create a chain for each registered netfilter hook
for hook in self
.sets
.iter()
.flat_map(|set| &set.hooks)
.collect::<BTreeSet<_>>()
{
batch.add(NfListObject::Chain(Chain {
family: NfFamily::INet,
table: Cow::Borrowed("reaction"),
name: Cow::from(hook.as_str()),
_type: Some(NfChainType::Filter),
hook: Some(hook.into()),
prio: Some(0),
..Default::default()
}));
}
for set in &self.sets {
set.init(&mut batch)?;
}
// TODO apply batch
self.nft.send(batch).await?;
// Launch a task that will destroy the table on shutdown
{
let token = self.shutdown.token();
tokio::spawn(async move {
token.wait().await;
Batch::new().delete(reaction_table());
});
}
// Launch all actions
while let Some(action) = self.actions.pop() {
tokio::spawn(async move { action.serve().await });
}
self.actions = Default::default();
Ok(())
}
async fn close(self) -> RemoteResult<()> {
self.shutdown.ask_shutdown();
self.shutdown.wait_all_task_shutdown().await;
Ok(())
}
}
fn reaction_table() -> NfListObject<'static> {
NfListObject::Table(Table {
family: NfFamily::INet,
name: Cow::Borrowed("reaction"),
handle: None,
})
}

View file

@ -0,0 +1,81 @@
use std::{
ffi::{CStr, CString},
thread,
};
use libnftables1_sys::Nftables;
use nftables::batch::Batch;
use tokio::sync::{mpsc, oneshot};
/// A client with a dedicated server thread to libnftables.
/// Calling [`Default::default()`] spawns a new server thread.
/// Cloning just creates a new client to the same server thread.
#[derive(Clone)]
pub struct NftClient {
tx: mpsc::Sender<NftCommand>,
}
impl Default for NftClient {
fn default() -> Self {
let (tx, mut rx) = mpsc::channel(10);
thread::spawn(move || {
let mut conn = Nftables::new();
while let Some(NftCommand { json, ret }) = rx.blocking_recv() {
let (rc, output, error) = conn.run_cmd(json.as_ptr());
let res = match rc {
0 => to_rust_string(output)
.ok_or_else(|| "unknown ok (rc = 0 but no output buffer)".into()),
_ => to_rust_string(error)
.map(|err| format!("error (rc = {rc}: {err})"))
.ok_or_else(|| format!("unknown error (rc = {rc} but no error buffer)")),
};
let _ = ret.send(res);
}
});
NftClient { tx }
}
}
impl NftClient {
/// Send a batch to nftables.
pub async fn send(&self, batch: Batch<'_>) -> Result<String, String> {
// convert JSON to CString
let mut json = serde_json::to_vec(&batch.to_nftables())
.map_err(|err| format!("couldn't build json to send to nftables: {err}"))?;
json.push('\0' as u8);
let json = CString::from_vec_with_nul(json)
.map_err(|err| format!("invalid json with null char: {err}"))?;
// Send command
let (tx, rx) = oneshot::channel();
let command = NftCommand { json, ret: tx };
self.tx
.send(command)
.await
.map_err(|err| format!("nftables thread has quit, can't send command: {err}"))?;
// Wait for result
rx.await
.map_err(|_| format!("nftables thread has quit, no response for command"))?
}
}
struct NftCommand {
json: CString,
ret: oneshot::Sender<Result<String, String>>,
}
fn to_rust_string(c_ptr: *const i8) -> Option<String> {
if c_ptr.is_null() {
None
} else {
Some(
unsafe { CStr::from_ptr(c_ptr) }
.to_string_lossy()
.into_owned(),
)
}
}

View file

@ -0,0 +1,247 @@
use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig, Value};
use serde_json::json;
use crate::Plugin;
#[tokio::test]
async fn conf_stream() {
// No stream is supported by nftables
assert!(
Plugin::default()
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "nftables".into(),
config: Value::Null
}],
vec![]
)
.await
.is_err()
);
// Empty config is ok
assert!(Plugin::default().load_config(vec![], vec![]).await.is_ok());
}
#[tokio::test]
async fn conf_action_standalone() {
let p = vec!["name".into(), "ip".into(), "ip2".into()];
let p_noip = vec!["name".into(), "ip2".into()];
for (is_ok, conf, patterns) in [
// minimal set
(true, json!({ "set": "test" }), &p),
// missing set key
(false, json!({}), &p),
(false, json!({ "version": "ipv4" }), &p),
// unknown key
(false, json!({ "set": "test", "unknown": "yes" }), &p),
(false, json!({ "set": "test", "ip_index": 1 }), &p),
(false, json!({ "set": "test", "timeout_u32": 1 }), &p),
// pattern //
(true, json!({ "set": "test" }), &p),
(true, json!({ "set": "test", "pattern": "ip" }), &p),
(true, json!({ "set": "test", "pattern": "ip2" }), &p),
(true, json!({ "set": "test", "pattern": "ip2" }), &p_noip),
// unknown pattern "ip"
(false, json!({ "set": "test" }), &p_noip),
(false, json!({ "set": "test", "pattern": "ip" }), &p_noip),
// unknown pattern
(false, json!({ "set": "test", "pattern": "unknown" }), &p),
(false, json!({ "set": "test", "pattern": "uwu" }), &p_noip),
// bad type
(false, json!({ "set": "test", "pattern": 0 }), &p_noip),
(false, json!({ "set": "test", "pattern": true }), &p_noip),
// action //
(true, json!({ "set": "test", "action": "add" }), &p),
(true, json!({ "set": "test", "action": "delete" }), &p),
// unknown action
(false, json!({ "set": "test", "action": "create" }), &p),
(false, json!({ "set": "test", "action": "insert" }), &p),
(false, json!({ "set": "test", "action": "del" }), &p),
(false, json!({ "set": "test", "action": "destroy" }), &p),
// bad type
(false, json!({ "set": "test", "action": true }), &p),
(false, json!({ "set": "test", "action": 1 }), &p),
// ip version //
// ok
(true, json!({ "set": "test", "version": "ipv4" }), &p),
(true, json!({ "set": "test", "version": "ipv6" }), &p),
(true, json!({ "set": "test", "version": "ip" }), &p),
// unknown version
(false, json!({ "set": "test", "version": 4 }), &p),
(false, json!({ "set": "test", "version": 6 }), &p),
(false, json!({ "set": "test", "version": 46 }), &p),
(false, json!({ "set": "test", "version": "5" }), &p),
(false, json!({ "set": "test", "version": "ipv5" }), &p),
(false, json!({ "set": "test", "version": "4" }), &p),
(false, json!({ "set": "test", "version": "6" }), &p),
(false, json!({ "set": "test", "version": "46" }), &p),
// bad type
(false, json!({ "set": "test", "version": true }), &p),
// hooks //
// everything is fine really
(true, json!({ "set": "test", "hooks": [] }), &p),
(
true,
json!({ "set": "test", "hooks": ["input", "forward", "ingress", "prerouting", "output", "postrouting", "egress"] }),
&p,
),
(false, json!({ "set": "test", "hooks": ["INPUT"] }), &p),
(false, json!({ "set": "test", "hooks": ["FORWARD"] }), &p),
(
false,
json!({ "set": "test", "hooks": ["unknown_hook"] }),
&p,
),
// timeout //
(true, json!({ "set": "test", "timeout": "1m" }), &p),
(true, json!({ "set": "test", "timeout": "3 days" }), &p),
// bad
(false, json!({ "set": "test", "timeout": "3 dayz"}), &p),
(false, json!({ "set": "test", "timeout": 12 }), &p),
// target //
// anything is fine too
(true, json!({ "set": "test", "target": "drop" }), &p),
(true, json!({ "set": "test", "target": "accept" }), &p),
(true, json!({ "set": "test", "target": "return" }), &p),
(true, json!({ "set": "test", "target": "continue" }), &p),
// bad
(false, json!({ "set": "test", "target": "custom" }), &p),
(false, json!({ "set": "test", "target": "DROP" }), &p),
(false, json!({ "set": "test", "target": 11 }), &p),
(false, json!({ "set": "test", "target": ["DROP"] }), &p),
] {
let res = Plugin::default()
.load_config(
vec![],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "nftables".into(),
config: conf.clone().into(),
patterns: patterns.clone(),
}],
)
.await;
assert!(
res.is_ok() == is_ok,
"conf: {:?}, must be ok: {is_ok}, result: {:?}",
conf,
// empty Result::Ok because ActionImpl is not Debug
res.map(|_| ())
);
}
}
// TODO
#[tokio::test]
async fn conf_action_merge() {
let mut plugin = Plugin::default();
let set1 = ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action1".into(),
action_type: "nftables".into(),
config: json!({
"set": "test",
"target": "drop",
"hooks": ["input"],
"action": "add",
})
.into(),
patterns: vec!["ip".into()],
};
let set2 = ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action2".into(),
action_type: "nftables".into(),
config: json!({
"set": "test",
"target": "drop",
"version": "ip",
"action": "add",
})
.into(),
patterns: vec!["ip".into()],
};
let set3 = ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action2".into(),
action_type: "nftables".into(),
config: json!({
"set": "test",
"action": "delete",
})
.into(),
patterns: vec!["ip".into()],
};
let res = plugin
.load_config(
vec![],
vec![
// First set
set1.clone(),
// Same set, adding options, no conflict
set2.clone(),
// Same set, no new options, no conflict
set3.clone(),
// Unrelated set, so no conflict
ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action3".into(),
action_type: "nftables".into(),
config: json!({
"set": "test2",
"target": "return",
"version": "ipv6",
})
.into(),
patterns: vec!["ip".into()],
},
],
)
.await;
assert!(res.is_ok(), "res: {:?}", res.map(|_| ()));
// Another set with conflict is not ok
let res = plugin
.load_config(
vec![],
vec![
// First set
set1,
// Same set, adding options, no conflict
set2,
// Same set, no new options, no conflict
set3,
// Another set with conflict
ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action3".into(),
action_type: "nftables".into(),
config: json!({
"set": "test",
"target": "target2",
"action": "del",
})
.into(),
patterns: vec!["ip".into()],
},
],
)
.await;
assert!(res.is_err(), "res: {:?}", res.map(|_| ()));
}

View file

@ -11,7 +11,6 @@ categories = ["security"]
description = "Plugin interface for reaction, a daemon that scans logs and takes action (alternative to fail2ban)"
[dependencies]
chrono.workspace = true
remoc.workspace = true
serde.workspace = true
serde_json.workspace = true

View file

@ -102,9 +102,8 @@ def main():
You'll need to install minisign to check the authenticity of the package.
After installing reaction, create your configuration file at
`/etc/reaction.json`, `/etc/reaction.jsonnet` or `/etc/reaction.yml`.
You can also provide a directory containing multiple configuration files in the previous formats.
After installing reaction, create your configuration file(s) in JSON, YAML or JSONnet in the
`/etc/reaction/` directory.
See <https://reaction.ppom.me> for documentation.
Reload systemd:
@ -114,8 +113,8 @@ $ sudo systemctl daemon-reload
Then enable and start reaction with this command
```bash
# replace `reaction.jsonnet` with the name of your configuration file in /etc/
$ sudo systemctl enable --now reaction@reaction.jsonnet.service
# write first your configuration file(s) in /etc/reaction/
$ sudo systemctl enable --now reaction.service
```
""".strip(),
]

View file

@ -4,6 +4,7 @@ pkgs.mkShell {
name = "libipset";
buildInputs = [
ipset
nftables
clang
];
src = null;

View file

@ -1,5 +1,7 @@
use std::{collections::BTreeMap, io::Error, path, process::Stdio};
#[cfg(target_os = "macos")]
use std::os::darwin::fs::MetadataExt;
#[cfg(target_os = "freebsd")]
use std::os::freebsd::fs::MetadataExt;
#[cfg(target_os = "illumos")]
@ -170,6 +172,8 @@ impl Plugin {
let mut command = Command::new("run0");
// --pipe gives direct, non-emulated stdio access, for better performance.
command.arg("--pipe");
// run the command inside the same slice as reaction
command.arg("--slice-inherit");
// Make path absolute for systemd
let full_workdir = path::absolute(&plugin_working_directory)?;

View file

@ -1,6 +1,7 @@
use std::{cmp::Ordering, collections::BTreeMap, hash::Hash};
use reaction_plugin::StreamConfig;
use regex::RegexSet;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -19,6 +20,11 @@ pub struct Stream {
#[serde(skip)]
pub name: String,
#[serde(skip)]
pub compiled_regex_set: RegexSet,
#[serde(skip)]
pub regex_index_to_filter_name: Vec<String>,
// Plugin-specific
#[serde(default, rename = "type", skip_serializing_if = "Option::is_none")]
pub stream_type: Option<String>,
@ -90,6 +96,21 @@ impl Stream {
filter.setup(name, key, patterns)?;
}
let all_regexes: BTreeMap<_, _> = self
.filters
.values()
.flat_map(|filter| {
filter
.regex
.iter()
.map(|regex| (regex, filter.name.clone()))
})
.collect();
self.compiled_regex_set = RegexSet::new(all_regexes.keys())
.map_err(|err| format!("too much regexes on the filters of this stream: {err}"))?;
self.regex_index_to_filter_name = all_regexes.into_values().collect();
Ok(())
}

View file

@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Display,
io,
ops::{Deref, DerefMut},
process::ExitStatus,
@ -52,7 +53,7 @@ impl PluginManager {
let mut child = plugin
.launch(state_directory)
.await
.map_err(|err| format!("could not launch plugin: {err}"))?;
.map_err(|err| systemd_error(plugin, "could not launch plugin", err))?;
{
let stderr = child.stderr.take().unwrap();
@ -70,10 +71,7 @@ impl PluginManager {
) = Connect::io(remoc::Cfg::default(), stdout, stdin)
.await
.map_err(|err| {
format!(
"could not init communication with plugin {}: {err}",
plugin.name
)
systemd_error(plugin, "could not init communication with plugin", err)
})?;
tokio::spawn(conn);
@ -166,6 +164,20 @@ impl PluginManager {
}
}
fn systemd_error(plugin: &Plugin, message: &str, err: impl Display) -> String {
if plugin.systemd {
format!(
"{message}: {err}. \
`plugins.{0}.systemd` is set to true, so this may be an issue with systemd's run0. \
please make sure `sudo run0 ls /` returns the same thing as `sudo ls /` as a test. \
if run0 can't be found or doesn't output anything, set `plugins.{0}.systemd` to false.",
plugin.name,
)
} else {
format!("{message}: {err}")
}
}
async fn handle_stderr(stderr: ChildStderr, plugin_name: String) {
// read lines until shutdown
let lines = reader_to_stream(stderr);

View file

@ -1,11 +1,10 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
collections::{BTreeSet, HashMap},
process::Stdio,
};
use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all};
use reaction_plugin::{StreamImpl, shutdown::ShutdownToken};
use regex::RegexSet;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, ChildStderr, ChildStdout, Command},
@ -45,7 +44,6 @@ pub fn reader_to_stream(
}
pub struct StreamManager {
compiled_regex_set: RegexSet,
regex_index_to_filter_manager: Vec<FilterManager>,
stream: &'static Stream,
stream_plugin: Option<StreamImpl>,
@ -59,16 +57,6 @@ impl StreamManager {
shutdown: ShutdownToken,
plugins: &mut Plugins,
) -> Result<Self, String> {
let all_regexes: BTreeMap<_, _> = filter_managers
.iter()
.flat_map(|(filter, filter_manager)| {
filter
.regex
.iter()
.map(|regex| (regex, filter_manager.clone()))
})
.collect();
let stream_plugin = if stream.is_plugin() {
Some(
plugins
@ -84,11 +72,23 @@ impl StreamManager {
None
};
let regex_index_to_filter_manager = stream
.regex_index_to_filter_name
.iter()
.map(|filter_name| {
filter_managers
.iter()
.find(|(filter, _)| filter_name == &filter.name)
.unwrap()
.1
.clone()
})
.collect();
debug!("successfully initialized stream {}", stream.name);
Ok(StreamManager {
compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?,
regex_index_to_filter_manager: all_regexes.into_values().collect(),
regex_index_to_filter_manager,
stream,
stream_plugin,
shutdown,
@ -230,7 +230,7 @@ impl StreamManager {
}
fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> {
let matches = self.compiled_regex_set.matches(line);
let matches = self.stream.compiled_regex_set.matches(line);
matches
.into_iter()
.map(|match_| &self.regex_index_to_filter_manager[match_])

View file

@ -41,6 +41,23 @@
},
},
},
f2: {
regex: [
"^can't found <num>$",
],
retry: 2,
retryperiod: '60s',
actions: {
damn: {
cmd: ['notify-send', 'you should not see that', 'ban <num>'],
},
undamn: {
cmd: ['notify-send', 'you should not see that', 'unban <num>'],
after: '3s',
onexit: true,
},
},
},
},
},
},