mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 12:45:47 +01:00
Compare commits
19 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8a34a1fa11 |
||
|
|
3ca54c6c43 |
||
|
|
16692731f0 |
||
|
|
938a366576 |
||
|
|
5a6c203c01 |
||
|
|
f2b1accec0 |
||
|
|
00725ed9e2 |
||
|
|
ea0e7177d9 |
||
|
|
c41c89101d |
||
|
|
3d7e647ef7 |
||
|
|
5b6cc35deb |
||
|
|
0cd765251a |
||
|
|
26cf3a96e7 |
||
|
|
285954f7cd |
||
|
|
dc51d7d432 |
||
|
|
488dc6c66f |
||
|
|
88c99fff0f |
||
|
|
645d72ac1e |
||
|
|
a7e958f248 |
24 changed files with 1303 additions and 119 deletions
11
.gitignore
vendored
11
.gitignore
vendored
|
|
@ -1,22 +1,15 @@
|
|||
/reaction
|
||||
reaction*.db
|
||||
reaction*.db.old
|
||||
reaction.db
|
||||
reaction.db.old
|
||||
/data
|
||||
/lmdb
|
||||
reaction*.export.json
|
||||
/reaction*.sock
|
||||
/result
|
||||
/wiki
|
||||
/deb
|
||||
*.deb
|
||||
*.minisig
|
||||
*.qcow2
|
||||
debian-packaging/*
|
||||
*.swp
|
||||
export-go-db/export-go-db
|
||||
import-rust-db/target
|
||||
/target
|
||||
reaction-plugin/target
|
||||
/local
|
||||
.ccls-cache
|
||||
.direnv
|
||||
|
|
|
|||
|
|
@ -1,15 +0,0 @@
|
|||
---
|
||||
image: golang:1.20-bookworm
|
||||
stages:
|
||||
- build
|
||||
|
||||
variables:
|
||||
DEBIAN_FRONTEND: noninteractive
|
||||
|
||||
test_building:
|
||||
stage: build
|
||||
before_script:
|
||||
- apt-get -qq -y update
|
||||
- apt-get -qq -y install build-essential devscripts debhelper quilt wget
|
||||
script:
|
||||
- make reaction ip46tables nft46
|
||||
|
|
@ -6,6 +6,7 @@ Here is a high-level overview of the codebase.
|
|||
|
||||
## Build
|
||||
|
||||
- `bench/`: Configuration that spawns a very high load on reaction. Useful to test performance improvements and regressions.
|
||||
- `build.rs`: permits to create shell completions and man pages on build.
|
||||
- `Cargo.toml`, `Cargo.lock`: manifest and dependencies.
|
||||
- `config/`: example / test configuration files. Look at its git history to discover more.
|
||||
|
|
@ -15,8 +16,7 @@ Here is a high-level overview of the codebase.
|
|||
|
||||
## Main source code
|
||||
|
||||
- `helpers_c/`: C helpers. I wish to have special IP support in reaction and get rid of them. See #79 and #116.
|
||||
- `tests/`: Integration tests. For now they test basic reaction runtime behavior, persistance, and client-daemon communication.
|
||||
- `tests/`: Integration tests. They test reaction runtime behavior, persistance, client-daemon communication, plugin integrations.
|
||||
- `src/`: The source code, here we go!
|
||||
|
||||
### Top-level files
|
||||
|
|
@ -25,18 +25,13 @@ Here is a high-level overview of the codebase.
|
|||
- `src/lib.rs`: Second main entrypoint
|
||||
- `src/cli.rs`: Command-line arguments
|
||||
- `src/tests.rs`: Test utilities
|
||||
- `src/protocol.rs`: de/serialization and client/daemon protocol messages.
|
||||
|
||||
### `src/concepts/`
|
||||
|
||||
reaction really is about its configuration, which is at the center of the code.
|
||||
|
||||
There is one file for each of its concepts: configuration, streams, filters, actions, patterns.
|
||||
|
||||
### `src/protocol/`
|
||||
|
||||
Low-level serialization/deserialization and client-daemon protocol messages.
|
||||
|
||||
Shared by the client and daemon's socket. Also used by daemon's database.
|
||||
There is one file for each of its concepts: configuration, streams, filters, actions, patterns, plugins.
|
||||
|
||||
### `src/client/`
|
||||
|
||||
|
|
@ -58,9 +53,9 @@ This code has async code, to handle input streams and communication with clients
|
|||
- `mod.rs`: High-level logic
|
||||
- `state.rs`: Inner state operations
|
||||
- `socket.rs`: The socket task, responsible for communication with clients.
|
||||
- `shutdown.rs`: Logic for passing shutdown signal across all tasks
|
||||
- `plugin.rs`: Plugin startup, configuration loading and cleanup.
|
||||
|
||||
### `src/tree`
|
||||
### `crates/treedb`
|
||||
|
||||
Persistence layer.
|
||||
|
||||
|
|
@ -68,5 +63,19 @@ This is a database highly adapted to reaction workload, making reaction faster t
|
|||
(heed, sled and fjall crates have been tested).
|
||||
Its design is explained in the comments of its files:
|
||||
|
||||
- `mod.rs`: main database code, with its two API structs: Tree and Database.
|
||||
- `raw.rs` low-level part, directly interacting with de/serializisation and files.
|
||||
- `lib.rs`: main database code, with its two API structs: Tree and Database.
|
||||
- `raw.rs`: low-level part, directly interacting with de/serializisation and files.
|
||||
- `time.rs`: time definitions shared with reaction.
|
||||
- `helpers.rs`: utilities to ease db deserialization from disk.
|
||||
|
||||
### `plugins/reaction-plugin`
|
||||
|
||||
Shared plugin interface between reaction daemon and its plugins.
|
||||
|
||||
Also defines some shared logic between them:
|
||||
- `shutdown.rs`: Logic for passing shutdown signal across all tasks
|
||||
- `parse_duration.rs` Duration parsing
|
||||
|
||||
### `plugins/reaction-plugin-*`
|
||||
|
||||
All core plugins.
|
||||
|
|
|
|||
106
Cargo.lock
generated
106
Cargo.lock
generated
|
|
@ -1980,6 +1980,15 @@ dependencies = [
|
|||
"windows-link",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libnftables1-sys"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b290d0d41f0ad578660aeed371bcae4cf85f129a6fe31350dbd2e097518cd7f"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.11.0"
|
||||
|
|
@ -2248,6 +2257,22 @@ dependencies = [
|
|||
"wmi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nftables"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c57e7343eed9e9330e084eef12651b15be3c8ed7825915a0ffa33736b852bed"
|
||||
dependencies = [
|
||||
"schemars",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.29.0"
|
||||
|
|
@ -2861,7 +2886,6 @@ dependencies = [
|
|||
name = "reaction-plugin"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"remoc",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
|
@ -2899,6 +2923,19 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reaction-plugin-nftables"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"libnftables1-sys",
|
||||
"nftables",
|
||||
"reaction-plugin",
|
||||
"remoc",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reaction-plugin-virtual"
|
||||
version = "1.0.0"
|
||||
|
|
@ -2919,6 +2956,26 @@ dependencies = [
|
|||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ref-cast"
|
||||
version = "1.0.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d"
|
||||
dependencies = [
|
||||
"ref-cast-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ref-cast-impl"
|
||||
version = "1.0.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.12.2"
|
||||
|
|
@ -3194,6 +3251,31 @@ dependencies = [
|
|||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schemars"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc"
|
||||
dependencies = [
|
||||
"dyn-clone",
|
||||
"ref-cast",
|
||||
"schemars_derive",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schemars_derive"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"serde_derive_internals",
|
||||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scoped-tls"
|
||||
version = "1.0.1"
|
||||
|
|
@ -3287,6 +3369,17 @@ dependencies = [
|
|||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive_internals"
|
||||
version = "0.29.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.149"
|
||||
|
|
@ -3300,6 +3393,17 @@ dependencies = [
|
|||
"zmij",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_path_to_error"
|
||||
version = "0.1.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"serde",
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_urlencoded"
|
||||
version = "0.7.1"
|
||||
|
|
|
|||
|
|
@ -29,6 +29,8 @@ assets = [
|
|||
[ "target/release/reaction.bash", "/usr/share/bash-completion/completions/reaction", "644" ],
|
||||
[ "target/release/reaction.fish", "/usr/share/fish/completions/", "644" ],
|
||||
[ "target/release/_reaction", "/usr/share/zsh/vendor-completions/", "644" ],
|
||||
# Slice
|
||||
[ "packaging/system-reaction.slice", "/usr/lib/systemd/system/", "644" ],
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
|
|
@ -83,6 +85,7 @@ members = [
|
|||
"plugins/reaction-plugin",
|
||||
"plugins/reaction-plugin-cluster",
|
||||
"plugins/reaction-plugin-ipset",
|
||||
"plugins/reaction-plugin-nftables",
|
||||
"plugins/reaction-plugin-virtual"
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# vim: ft=systemd
|
||||
[Unit]
|
||||
Description=A daemon that scans program outputs for repeated patterns, and takes action.
|
||||
Description=reaction daemon
|
||||
Documentation=https://reaction.ppom.me
|
||||
# Ensure reaction will insert its chain after docker has inserted theirs. Only useful when iptables & docker are used
|
||||
# After=docker.service
|
||||
|
|
@ -17,6 +17,8 @@ RuntimeDirectory=reaction
|
|||
WorkingDirectory=/var/lib/reaction
|
||||
# Let reaction kill its child processes first
|
||||
KillMode=mixed
|
||||
# Put reaction in its own slice so that plugins can be grouped within.
|
||||
Slice=system-reaction.slice
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
|
|
|||
1
packaging/system-reaction.slice
Normal file
1
packaging/system-reaction.slice
Normal file
|
|
@ -0,0 +1 @@
|
|||
[Slice]
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
use std::{fmt::Debug, u32};
|
||||
use std::{fmt::Debug, u32, usize};
|
||||
|
||||
use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration};
|
||||
use remoc::rch::mpsc as remocMpsc;
|
||||
|
|
@ -39,9 +39,6 @@ pub enum AddDel {
|
|||
Del,
|
||||
}
|
||||
|
||||
// FIXME block configs that have different set options for the same name
|
||||
// treat default values as none?
|
||||
|
||||
/// User-facing action options
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
|
|
@ -176,7 +173,7 @@ impl Set {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn init(&self, ipset: &mut IpSet) -> Result<(), String> {
|
||||
pub async fn init(&self, ipset: &mut IpSet) -> Result<(), (usize, String)> {
|
||||
for (set, version) in [
|
||||
(&self.sets.ipv4, Version::IPv4),
|
||||
(&self.sets.ipv6, Version::IPv6),
|
||||
|
|
@ -189,44 +186,42 @@ impl Set {
|
|||
version,
|
||||
timeout: self.timeout,
|
||||
}))
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|err| (0, err.to_string()))?;
|
||||
// insert set in chains
|
||||
for chain in &self.chains {
|
||||
for (i, chain) in self.chains.iter().enumerate() {
|
||||
ipset
|
||||
.order(Order::InsertSet(SetChain {
|
||||
set: set.clone(),
|
||||
chain: chain.clone(),
|
||||
target: self.target.clone(),
|
||||
}))
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|err| (i + 1, err.to_string()))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn destroy(&self, ipset: &mut IpSet) {
|
||||
for (set, version) in [
|
||||
(&self.sets.ipv4, Version::IPv4),
|
||||
(&self.sets.ipv6, Version::IPv6),
|
||||
] {
|
||||
pub async fn destroy(&self, ipset: &mut IpSet, until: Option<usize>) {
|
||||
for set in [&self.sets.ipv4, &self.sets.ipv6] {
|
||||
if let Some(set) = set {
|
||||
for chain in &self.chains {
|
||||
if let Err(err) = ipset
|
||||
for chain in self
|
||||
.chains
|
||||
.iter()
|
||||
.take(until.map(|until| until - 1).unwrap_or(usize::MAX))
|
||||
{
|
||||
let _ = ipset
|
||||
.order(Order::RemoveSet(SetChain {
|
||||
set: set.clone(),
|
||||
chain: chain.clone(),
|
||||
target: self.target.clone(),
|
||||
}))
|
||||
.await
|
||||
{
|
||||
eprintln!(
|
||||
"ERROR while removing {version} set {set} from chain {chain}: {err}"
|
||||
);
|
||||
}
|
||||
.await;
|
||||
}
|
||||
if let Err(err) = ipset.order(Order::DestroySet(set.clone())).await {
|
||||
eprintln!("ERROR while destroying {version} set {set}: {err}");
|
||||
if until.is_none_or(|until| until != 0) {
|
||||
let _ = ipset.order(Order::DestroySet(set.clone())).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ impl IpSet {
|
|||
|
||||
pub enum IpSetError {
|
||||
Thread(String),
|
||||
IpSet(String),
|
||||
IpSet(()),
|
||||
}
|
||||
impl Display for IpSetError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
|
|
@ -81,7 +81,7 @@ impl Display for IpSetError {
|
|||
"{}",
|
||||
match self {
|
||||
IpSetError::Thread(err) => err,
|
||||
IpSetError::IpSet(err) => err,
|
||||
IpSetError::IpSet(()) => "ipset error",
|
||||
}
|
||||
)
|
||||
}
|
||||
|
|
@ -90,12 +90,12 @@ impl From<IpSetError> for String {
|
|||
fn from(value: IpSetError) -> Self {
|
||||
match value {
|
||||
IpSetError::Thread(err) => err,
|
||||
IpSetError::IpSet(err) => err,
|
||||
IpSetError::IpSet(()) => "ipset error".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type OrderType = (Order, oneshot::Sender<Result<(), String>>);
|
||||
pub type OrderType = (Order, oneshot::Sender<Result<(), ()>>);
|
||||
|
||||
struct Set {
|
||||
session: Session<HashNet>,
|
||||
|
|
@ -121,7 +121,7 @@ impl IPsetManager {
|
|||
}
|
||||
}
|
||||
|
||||
fn handle_order(&mut self, order: Order) -> Result<(), String> {
|
||||
fn handle_order(&mut self, order: Order) -> Result<(), ()> {
|
||||
match order {
|
||||
Order::CreateSet(CreateSet {
|
||||
name,
|
||||
|
|
@ -139,7 +139,7 @@ impl IPsetManager {
|
|||
};
|
||||
builder.with_ipv6(version == Version::IPv6)?.build()
|
||||
})
|
||||
.map_err(|err| format!("Could not create set {name}: {err}"))?;
|
||||
.map_err(|err| eprintln!("ERROR Could not create set {name}: {err}"))?;
|
||||
|
||||
self.sessions.insert(name, Set { session, version });
|
||||
}
|
||||
|
|
@ -149,7 +149,7 @@ impl IPsetManager {
|
|||
session
|
||||
.session
|
||||
.destroy()
|
||||
.map_err(|err| format!("Could not destroy set {set}: {err}"))?;
|
||||
.map_err(|err| eprintln!("ERROR Could not destroy set {set}: {err}"))?;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -162,9 +162,13 @@ impl IPsetManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> {
|
||||
fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), ()> {
|
||||
self._insert_remove_ip(set, ip, insert)
|
||||
.map_err(|err| eprintln!("ERROR {err}"))
|
||||
}
|
||||
fn _insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> {
|
||||
let session = self.sessions.get_mut(&set).ok_or(format!(
|
||||
"No set handled by us with this name: {set}. This likely is a bug."
|
||||
"No set handled by this plugin with this name: {set}. This likely is a bug."
|
||||
))?;
|
||||
|
||||
let mut net_data = NetDataType::new(Ipv4Addr::LOCALHOST, 0);
|
||||
|
|
@ -182,24 +186,28 @@ impl IPsetManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> {
|
||||
let SetChain {
|
||||
set,
|
||||
chain,
|
||||
target: action,
|
||||
} = options;
|
||||
fn insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), ()> {
|
||||
self._insert_remove_set(options, insert)
|
||||
.map_err(|err| eprintln!("ERROR {err}"))
|
||||
}
|
||||
fn _insert_remove_set(&self, options: SetChain, insert: bool) -> Result<(), String> {
|
||||
let SetChain { set, chain, target } = options;
|
||||
|
||||
let version = self
|
||||
.sessions
|
||||
.get(&set)
|
||||
.ok_or(format!("No set managed by us with this name: {set}"))?
|
||||
.ok_or(format!(
|
||||
"No set managed by this plugin with this name: {set}"
|
||||
))?
|
||||
.version;
|
||||
|
||||
if insert {
|
||||
eprintln!("INFO inserting {version} set {set} in chain {chain}");
|
||||
let (verb, verbing, from) = if insert {
|
||||
("insert", "inserting", "in")
|
||||
} else {
|
||||
eprintln!("INFO removing {version} set {set} from chain {chain}");
|
||||
}
|
||||
("remove", "removing", "from")
|
||||
};
|
||||
|
||||
eprintln!("INFO {verbing} {version} set {set} {from} chain {chain}");
|
||||
|
||||
let command = match version {
|
||||
Version::IPv4 => "iptables",
|
||||
|
|
@ -217,20 +225,20 @@ impl IPsetManager {
|
|||
&set,
|
||||
"src",
|
||||
"-j",
|
||||
&action,
|
||||
&target,
|
||||
])
|
||||
.spawn()
|
||||
.map_err(|err| format!("Could not insert ipset {set} in chain {chain}: {err}"))?;
|
||||
.map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: Could not execute {command}: {err}"))?;
|
||||
|
||||
let exit = child
|
||||
.wait()
|
||||
.map_err(|err| format!("Could not insert ipset: {err}"))?;
|
||||
.map_err(|err| format!("Could not {verb} ipset {set} {from} chain {chain}: {err}"))?;
|
||||
|
||||
if exit.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format!(
|
||||
"Could not insert ipset: exit code {}",
|
||||
"Could not {verb} ipset: exit code {}",
|
||||
exit.code()
|
||||
.map(|c| c.to_string())
|
||||
.unwrap_or_else(|| "<unknown>".to_string())
|
||||
|
|
|
|||
|
|
@ -109,15 +109,21 @@ impl PluginInfo for Plugin {
|
|||
let mut first_error = None;
|
||||
for (i, set) in self.sets.iter().enumerate() {
|
||||
// Retain if error
|
||||
if let Err(err) = set.init(&mut self.ipset).await {
|
||||
first_error = Some((i, RemoteError::Plugin(err)));
|
||||
if let Err((failed_step, err)) = set.init(&mut self.ipset).await {
|
||||
first_error = Some((i, failed_step, RemoteError::Plugin(err)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Destroy initialized sets if error
|
||||
if let Some((i, err)) = first_error {
|
||||
for set in self.sets.iter().take(i + 1) {
|
||||
let _ = set.destroy(&mut self.ipset).await;
|
||||
if let Some((last_set, failed_step, err)) = first_error {
|
||||
eprintln!("DEBUG last_set: {last_set} failed_step: {failed_step} err: {err}");
|
||||
for (curr_set, set) in self.sets.iter().enumerate().take(last_set + 1) {
|
||||
let until = if last_set == curr_set {
|
||||
Some(failed_step)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let _ = set.destroy(&mut self.ipset, until).await;
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
|
|
@ -148,6 +154,6 @@ impl PluginInfo for Plugin {
|
|||
async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec<Set>, shutdown: ShutdownToken) {
|
||||
shutdown.wait().await;
|
||||
for set in sets {
|
||||
set.destroy(&mut ipset).await;
|
||||
set.destroy(&mut ipset, None).await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
13
plugins/reaction-plugin-nftables/Cargo.toml
Normal file
13
plugins/reaction-plugin-nftables/Cargo.toml
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
[package]
|
||||
name = "reaction-plugin-nftables"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
remoc.workspace = true
|
||||
reaction-plugin.path = "../reaction-plugin"
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
nftables = { version = "0.6.3", features = ["tokio"] }
|
||||
libnftables1-sys = { version = "0.1.1" }
|
||||
493
plugins/reaction-plugin-nftables/src/action.rs
Normal file
493
plugins/reaction-plugin-nftables/src/action.rs
Normal file
|
|
@ -0,0 +1,493 @@
|
|||
use std::{
|
||||
borrow::Cow,
|
||||
collections::HashSet,
|
||||
fmt::{Debug, Display},
|
||||
u32,
|
||||
};
|
||||
|
||||
use nftables::{
|
||||
batch::Batch,
|
||||
expr::Expression,
|
||||
schema::{Element, NfListObject, Rule, SetFlag, SetType, SetTypeValue},
|
||||
stmt::Statement,
|
||||
types::{NfFamily, NfHook},
|
||||
};
|
||||
use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration};
|
||||
use remoc::rch::mpsc as remocMpsc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{helpers::Version, nft::NftClient};
|
||||
|
||||
#[derive(Default, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
|
||||
pub enum IpVersion {
|
||||
#[default]
|
||||
#[serde(rename = "ip")]
|
||||
Ip,
|
||||
#[serde(rename = "ipv4")]
|
||||
Ipv4,
|
||||
#[serde(rename = "ipv6")]
|
||||
Ipv6,
|
||||
}
|
||||
impl Debug for IpVersion {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}",
|
||||
match self {
|
||||
IpVersion::Ipv4 => "ipv4",
|
||||
IpVersion::Ipv6 => "ipv6",
|
||||
IpVersion::Ip => "ip",
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Serialize, Deserialize)]
|
||||
pub enum AddDel {
|
||||
#[default]
|
||||
#[serde(alias = "add")]
|
||||
Add,
|
||||
#[serde(alias = "delete")]
|
||||
Delete,
|
||||
}
|
||||
|
||||
/// User-facing action options
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct ActionOptions {
|
||||
/// The set that should be used by this action
|
||||
pub set: String,
|
||||
/// The pattern name of the IP.
|
||||
/// Defaults to "ip"
|
||||
#[serde(default = "serde_ip")]
|
||||
pub pattern: String,
|
||||
#[serde(skip)]
|
||||
ip_index: usize,
|
||||
// Whether the action is to "add" or "del" the ip from the set
|
||||
#[serde(default)]
|
||||
action: AddDel,
|
||||
|
||||
#[serde(flatten)]
|
||||
pub set_options: SetOptions,
|
||||
}
|
||||
|
||||
fn serde_ip() -> String {
|
||||
"ip".into()
|
||||
}
|
||||
|
||||
impl ActionOptions {
|
||||
pub fn set_ip_index(&mut self, patterns: Vec<String>) -> Result<(), ()> {
|
||||
self.ip_index = patterns
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.filter(|(_, name)| name == &self.pattern)
|
||||
.next()
|
||||
.ok_or(())?
|
||||
.0;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Merged set options
|
||||
#[derive(Default, Clone, Deserialize, Serialize, Debug, PartialEq, Eq)]
|
||||
pub struct SetOptions {
|
||||
/// The IP type.
|
||||
/// Defaults to `46`.
|
||||
/// If `ipv4`: creates an IPv4 set with this name
|
||||
/// If `ipv6`: creates an IPv6 set with this name
|
||||
/// If `ip`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6'
|
||||
/// *Merged set-wise*.
|
||||
#[serde(default)]
|
||||
version: Option<IpVersion>,
|
||||
/// Chains where the IP set should be inserted.
|
||||
/// Defaults to `["input", "forward"]`
|
||||
/// *Merged set-wise*.
|
||||
#[serde(default)]
|
||||
hooks: Option<Vec<RHook>>,
|
||||
// Optional timeout, letting linux/netfilter handle set removal instead of reaction
|
||||
// Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action
|
||||
// Same syntax as after and retryperiod in reaction.
|
||||
/// *Merged set-wise*.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
timeout: Option<String>,
|
||||
#[serde(skip)]
|
||||
timeout_u32: Option<u32>,
|
||||
// Target that iptables should use when the IP is encountered.
|
||||
// Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain
|
||||
/// *Merged set-wise*.
|
||||
#[serde(default)]
|
||||
target: Option<RStatement>,
|
||||
}
|
||||
|
||||
impl SetOptions {
|
||||
pub fn merge(&mut self, options: &SetOptions) -> Result<(), String> {
|
||||
// merge two Option<T> and fail if there is conflict
|
||||
fn inner_merge<T: Eq + Clone + std::fmt::Debug>(
|
||||
a: &mut Option<T>,
|
||||
b: &Option<T>,
|
||||
name: &str,
|
||||
) -> Result<(), String> {
|
||||
match (&a, &b) {
|
||||
(Some(aa), Some(bb)) => {
|
||||
if aa != bb {
|
||||
return Err(format!(
|
||||
"Conflicting options for {name}: `{aa:?}` and `{bb:?}`"
|
||||
));
|
||||
}
|
||||
}
|
||||
(None, Some(_)) => {
|
||||
*a = b.clone();
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
inner_merge(&mut self.version, &options.version, "version")?;
|
||||
inner_merge(&mut self.timeout, &options.timeout, "timeout")?;
|
||||
inner_merge(&mut self.hooks, &options.hooks, "chains")?;
|
||||
inner_merge(&mut self.target, &options.target, "target")?;
|
||||
|
||||
if let Some(timeout) = &self.timeout {
|
||||
let duration = parse_duration(timeout)
|
||||
.map_err(|err| format!("failed to parse timeout: {}", err))?
|
||||
.as_secs();
|
||||
if duration > u32::MAX as u64 {
|
||||
return Err(format!(
|
||||
"timeout is limited to {} seconds (approx {} days)",
|
||||
u32::MAX,
|
||||
49_000
|
||||
));
|
||||
}
|
||||
self.timeout_u32 = Some(duration as u32);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum RHook {
|
||||
Ingress,
|
||||
Prerouting,
|
||||
Forward,
|
||||
Input,
|
||||
Output,
|
||||
Postrouting,
|
||||
Egress,
|
||||
}
|
||||
|
||||
impl RHook {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
RHook::Ingress => "ingress",
|
||||
RHook::Prerouting => "prerouting",
|
||||
RHook::Forward => "forward",
|
||||
RHook::Input => "input",
|
||||
RHook::Output => "output",
|
||||
RHook::Postrouting => "postrouting",
|
||||
RHook::Egress => "egress",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for RHook {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&RHook> for NfHook {
|
||||
fn from(value: &RHook) -> Self {
|
||||
match value {
|
||||
RHook::Ingress => Self::Ingress,
|
||||
RHook::Prerouting => Self::Prerouting,
|
||||
RHook::Forward => Self::Forward,
|
||||
RHook::Input => Self::Input,
|
||||
RHook::Output => Self::Output,
|
||||
RHook::Postrouting => Self::Postrouting,
|
||||
RHook::Egress => Self::Egress,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum RStatement {
|
||||
Accept,
|
||||
Drop,
|
||||
Continue,
|
||||
Return,
|
||||
}
|
||||
|
||||
pub struct Set {
|
||||
pub sets: SetNames,
|
||||
pub hooks: Vec<RHook>,
|
||||
pub timeout: Option<u32>,
|
||||
pub target: RStatement,
|
||||
}
|
||||
|
||||
impl Set {
|
||||
pub fn from(name: String, options: SetOptions) -> Self {
|
||||
Self {
|
||||
sets: SetNames::new(name, options.version),
|
||||
timeout: options.timeout_u32,
|
||||
target: options.target.unwrap_or(RStatement::Drop),
|
||||
hooks: options.hooks.unwrap_or(vec![RHook::Input, RHook::Forward]),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init<'a>(&self, batch: &mut Batch<'a>) -> Result<(), String> {
|
||||
for (set, version) in [
|
||||
(&self.sets.ipv4, Version::IPv4),
|
||||
(&self.sets.ipv6, Version::IPv6),
|
||||
] {
|
||||
if let Some(set) = set {
|
||||
let family = NfFamily::INet;
|
||||
let table = Cow::from("reaction");
|
||||
|
||||
// create set
|
||||
batch.add(NfListObject::<'a>::Set(Box::new(nftables::schema::Set::<
|
||||
'a,
|
||||
> {
|
||||
family,
|
||||
table: table.to_owned(),
|
||||
name: Cow::Owned(set.to_owned()),
|
||||
// TODO Try a set which is both ipv4 and ipv6?
|
||||
set_type: SetTypeValue::Single(match version {
|
||||
Version::IPv4 => SetType::Ipv4Addr,
|
||||
Version::IPv6 => SetType::Ipv6Addr,
|
||||
}),
|
||||
flags: Some({
|
||||
let mut flags = HashSet::from([SetFlag::Interval]);
|
||||
if self.timeout.is_some() {
|
||||
flags.insert(SetFlag::Timeout);
|
||||
}
|
||||
flags
|
||||
}),
|
||||
timeout: self.timeout.clone(),
|
||||
..Default::default()
|
||||
})));
|
||||
// insert set in chains
|
||||
let expr = vec![match self.target {
|
||||
RStatement::Accept => Statement::Accept(None),
|
||||
RStatement::Drop => Statement::Drop(None),
|
||||
RStatement::Continue => Statement::Continue(None),
|
||||
RStatement::Return => Statement::Return(None),
|
||||
}];
|
||||
for hook in &self.hooks {
|
||||
batch.add(NfListObject::Rule(Rule {
|
||||
family,
|
||||
table: table.to_owned(),
|
||||
chain: Cow::from(hook.to_string()),
|
||||
expr: Cow::Owned(expr.clone()),
|
||||
..Default::default()
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SetNames {
|
||||
pub ipv4: Option<String>,
|
||||
pub ipv6: Option<String>,
|
||||
}
|
||||
|
||||
impl SetNames {
|
||||
pub fn new(name: String, version: Option<IpVersion>) -> Self {
|
||||
Self {
|
||||
ipv4: match version {
|
||||
Some(IpVersion::Ipv4) => Some(name.clone()),
|
||||
Some(IpVersion::Ipv6) => None,
|
||||
None | Some(IpVersion::Ip) => Some(format!("{}v4", name)),
|
||||
},
|
||||
ipv6: match version {
|
||||
Some(IpVersion::Ipv4) => None,
|
||||
Some(IpVersion::Ipv6) => Some(name),
|
||||
None | Some(IpVersion::Ip) => Some(format!("{}v6", name)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Action {
|
||||
nft: NftClient,
|
||||
rx: remocMpsc::Receiver<Exec>,
|
||||
shutdown: ShutdownToken,
|
||||
sets: SetNames,
|
||||
// index of pattern ip in match vec
|
||||
ip_index: usize,
|
||||
action: AddDel,
|
||||
}
|
||||
|
||||
impl Action {
|
||||
pub fn new(
|
||||
nft: NftClient,
|
||||
shutdown: ShutdownToken,
|
||||
rx: remocMpsc::Receiver<Exec>,
|
||||
options: ActionOptions,
|
||||
) -> Result<Self, String> {
|
||||
Ok(Action {
|
||||
nft,
|
||||
rx,
|
||||
shutdown,
|
||||
sets: SetNames::new(options.set, options.set_options.version),
|
||||
ip_index: options.ip_index,
|
||||
action: options.action,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn serve(mut self) {
|
||||
loop {
|
||||
let event = tokio::select! {
|
||||
exec = self.rx.recv() => Some(exec),
|
||||
_ = self.shutdown.wait() => None,
|
||||
};
|
||||
match event {
|
||||
// shutdown asked
|
||||
None => break,
|
||||
// channel closed
|
||||
Some(Ok(None)) => break,
|
||||
// error from channel
|
||||
Some(Err(err)) => {
|
||||
eprintln!("ERROR {err}");
|
||||
break;
|
||||
}
|
||||
// ok
|
||||
Some(Ok(Some(exec))) => {
|
||||
if let Err(err) = self.handle_exec(exec).await {
|
||||
eprintln!("ERROR {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// eprintln!("DEBUG Asking for shutdown");
|
||||
// self.shutdown.ask_shutdown();
|
||||
}
|
||||
|
||||
async fn handle_exec(&mut self, mut exec: Exec) -> Result<(), String> {
|
||||
// safeguard against Vec::remove's panic
|
||||
if exec.match_.len() <= self.ip_index {
|
||||
return Err(format!(
|
||||
"match received from reaction is smaller than expected. looking for index {} but size is {}. this is a bug!",
|
||||
self.ip_index,
|
||||
exec.match_.len()
|
||||
));
|
||||
}
|
||||
let ip = exec.match_.remove(self.ip_index);
|
||||
// select set
|
||||
let set = match (&self.sets.ipv4, &self.sets.ipv6) {
|
||||
(None, None) => return Err(format!("action is neither IPv4 nor IPv6, this is a bug!")),
|
||||
(None, Some(set)) => set,
|
||||
(Some(set), None) => set,
|
||||
(Some(set4), Some(set6)) => {
|
||||
if ip.contains(':') {
|
||||
set6
|
||||
} else {
|
||||
set4
|
||||
}
|
||||
}
|
||||
};
|
||||
// add/remove ip to set
|
||||
let element = NfListObject::Element(Element {
|
||||
family: NfFamily::INet,
|
||||
table: Cow::from("reaction"),
|
||||
name: Cow::from(set),
|
||||
elem: Cow::from(vec![Expression::String(Cow::from(ip.clone()))]),
|
||||
});
|
||||
let mut batch = Batch::new();
|
||||
match self.action {
|
||||
AddDel::Add => batch.add(element),
|
||||
AddDel::Delete => batch.delete(element),
|
||||
};
|
||||
match self.nft.send(batch).await {
|
||||
Ok(ok) => {
|
||||
eprintln!("DEBUG action ok {:?} {ip}: {ok}", self.action);
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(format!("action ko {:?} {ip}: {err}", self.action)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::action::{IpVersion, RHook, RStatement, SetOptions};
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_options_merge() {
|
||||
let s1 = SetOptions {
|
||||
version: None,
|
||||
hooks: None,
|
||||
timeout: None,
|
||||
timeout_u32: None,
|
||||
target: None,
|
||||
};
|
||||
let s2 = SetOptions {
|
||||
version: Some(IpVersion::Ipv4),
|
||||
hooks: Some(vec![RHook::Input]),
|
||||
timeout: Some("3h".into()),
|
||||
timeout_u32: Some(3 * 3600),
|
||||
target: Some(RStatement::Drop),
|
||||
};
|
||||
assert_ne!(s1, s2);
|
||||
assert_eq!(s1, SetOptions::default());
|
||||
|
||||
{
|
||||
// s2 can be merged in s1
|
||||
let mut s1 = s1.clone();
|
||||
assert!(s1.merge(&s2).is_ok());
|
||||
assert_eq!(s1, s2);
|
||||
}
|
||||
|
||||
{
|
||||
// s1 can be merged in s2
|
||||
let mut s2 = s2.clone();
|
||||
assert!(s2.merge(&s1).is_ok());
|
||||
}
|
||||
|
||||
{
|
||||
// s1 can be merged in itself
|
||||
let mut s3 = s1.clone();
|
||||
assert!(s3.merge(&s1).is_ok());
|
||||
assert_eq!(s1, s3);
|
||||
}
|
||||
|
||||
{
|
||||
// s2 can be merged in itself
|
||||
let mut s3 = s2.clone();
|
||||
assert!(s3.merge(&s2).is_ok());
|
||||
assert_eq!(s2, s3);
|
||||
}
|
||||
|
||||
for s3 in [
|
||||
SetOptions {
|
||||
version: Some(IpVersion::Ipv6),
|
||||
..Default::default()
|
||||
},
|
||||
SetOptions {
|
||||
hooks: Some(vec![RHook::Output]),
|
||||
..Default::default()
|
||||
},
|
||||
SetOptions {
|
||||
timeout: Some("30min".into()),
|
||||
..Default::default()
|
||||
},
|
||||
SetOptions {
|
||||
target: Some(RStatement::Continue),
|
||||
..Default::default()
|
||||
},
|
||||
] {
|
||||
// none with some is ok
|
||||
assert!(s3.clone().merge(&s1).is_ok(), "s3: {s3:?}");
|
||||
assert!(s1.clone().merge(&s3).is_ok(), "s3: {s3:?}");
|
||||
// different some is ko
|
||||
assert!(s3.clone().merge(&s2).is_err(), "s3: {s3:?}");
|
||||
assert!(s2.clone().merge(&s3).is_err(), "s3: {s3:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
15
plugins/reaction-plugin-nftables/src/helpers.rs
Normal file
15
plugins/reaction-plugin-nftables/src/helpers.rs
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
use std::fmt::Display;
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
|
||||
pub enum Version {
|
||||
IPv4,
|
||||
IPv6,
|
||||
}
|
||||
impl Display for Version {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(match self {
|
||||
Version::IPv4 => "IPv4",
|
||||
Version::IPv6 => "IPv6",
|
||||
})
|
||||
}
|
||||
}
|
||||
176
plugins/reaction-plugin-nftables/src/main.rs
Normal file
176
plugins/reaction-plugin-nftables/src/main.rs
Normal file
|
|
@ -0,0 +1,176 @@
|
|||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
};
|
||||
|
||||
use nftables::{
|
||||
batch::Batch,
|
||||
schema::{Chain, NfListObject, Table},
|
||||
types::{NfChainType, NfFamily},
|
||||
};
|
||||
use reaction_plugin::{
|
||||
ActionConfig, ActionImpl, Hello, Manifest, PluginInfo, RemoteResult, StreamConfig, StreamImpl,
|
||||
shutdown::ShutdownController,
|
||||
};
|
||||
use remoc::rtc;
|
||||
|
||||
use crate::{
|
||||
action::{Action, ActionOptions, Set, SetOptions},
|
||||
nft::NftClient,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
mod action;
|
||||
pub mod helpers;
|
||||
mod nft;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let plugin = Plugin::default();
|
||||
reaction_plugin::main_loop(plugin).await;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Plugin {
|
||||
nft: NftClient,
|
||||
sets: Vec<Set>,
|
||||
actions: Vec<Action>,
|
||||
shutdown: ShutdownController,
|
||||
}
|
||||
|
||||
impl PluginInfo for Plugin {
|
||||
async fn manifest(&mut self) -> Result<Manifest, rtc::CallError> {
|
||||
Ok(Manifest {
|
||||
hello: Hello::new(),
|
||||
streams: BTreeSet::default(),
|
||||
actions: BTreeSet::from(["nftables".into()]),
|
||||
})
|
||||
}
|
||||
|
||||
async fn load_config(
|
||||
&mut self,
|
||||
streams: Vec<StreamConfig>,
|
||||
actions: Vec<ActionConfig>,
|
||||
) -> RemoteResult<(Vec<StreamImpl>, Vec<ActionImpl>)> {
|
||||
if !streams.is_empty() {
|
||||
return Err("This plugin can't handle any stream type".into());
|
||||
}
|
||||
|
||||
let mut ret_actions = Vec::with_capacity(actions.len());
|
||||
let mut set_options: BTreeMap<String, SetOptions> = BTreeMap::new();
|
||||
|
||||
for ActionConfig {
|
||||
stream_name,
|
||||
filter_name,
|
||||
action_name,
|
||||
action_type,
|
||||
config,
|
||||
patterns,
|
||||
} in actions
|
||||
{
|
||||
if &action_type != "nftables" {
|
||||
return Err("This plugin can't handle other action types than nftables".into());
|
||||
}
|
||||
|
||||
let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
|
||||
format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}")
|
||||
})?;
|
||||
|
||||
options.set_ip_index(patterns).map_err(|_|
|
||||
format!(
|
||||
"No pattern with name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'",
|
||||
&options.pattern
|
||||
)
|
||||
)?;
|
||||
|
||||
// Merge option
|
||||
set_options
|
||||
.entry(options.set.clone())
|
||||
.or_default()
|
||||
.merge(&options.set_options)
|
||||
.map_err(|err| format!("set {}: {err}", options.set))?;
|
||||
|
||||
let (tx, rx) = remoc::rch::mpsc::channel(1);
|
||||
self.actions.push(Action::new(
|
||||
self.nft.clone(),
|
||||
self.shutdown.token(),
|
||||
rx,
|
||||
options,
|
||||
)?);
|
||||
|
||||
ret_actions.push(ActionImpl { tx });
|
||||
}
|
||||
|
||||
// Init all sets
|
||||
while let Some((name, options)) = set_options.pop_first() {
|
||||
self.sets.push(Set::from(name, options));
|
||||
}
|
||||
|
||||
Ok((vec![], ret_actions))
|
||||
}
|
||||
|
||||
async fn start(&mut self) -> RemoteResult<()> {
|
||||
self.shutdown.delegate().handle_quit_signals()?;
|
||||
|
||||
let mut batch = Batch::new();
|
||||
batch.add(reaction_table());
|
||||
|
||||
// Create a chain for each registered netfilter hook
|
||||
for hook in self
|
||||
.sets
|
||||
.iter()
|
||||
.flat_map(|set| &set.hooks)
|
||||
.collect::<BTreeSet<_>>()
|
||||
{
|
||||
batch.add(NfListObject::Chain(Chain {
|
||||
family: NfFamily::INet,
|
||||
table: Cow::Borrowed("reaction"),
|
||||
name: Cow::from(hook.as_str()),
|
||||
_type: Some(NfChainType::Filter),
|
||||
hook: Some(hook.into()),
|
||||
prio: Some(0),
|
||||
..Default::default()
|
||||
}));
|
||||
}
|
||||
|
||||
for set in &self.sets {
|
||||
set.init(&mut batch)?;
|
||||
}
|
||||
|
||||
// TODO apply batch
|
||||
self.nft.send(batch).await?;
|
||||
|
||||
// Launch a task that will destroy the table on shutdown
|
||||
{
|
||||
let token = self.shutdown.token();
|
||||
tokio::spawn(async move {
|
||||
token.wait().await;
|
||||
Batch::new().delete(reaction_table());
|
||||
});
|
||||
}
|
||||
|
||||
// Launch all actions
|
||||
while let Some(action) = self.actions.pop() {
|
||||
tokio::spawn(async move { action.serve().await });
|
||||
}
|
||||
self.actions = Default::default();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn close(self) -> RemoteResult<()> {
|
||||
self.shutdown.ask_shutdown();
|
||||
self.shutdown.wait_all_task_shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn reaction_table() -> NfListObject<'static> {
|
||||
NfListObject::Table(Table {
|
||||
family: NfFamily::INet,
|
||||
name: Cow::Borrowed("reaction"),
|
||||
handle: None,
|
||||
})
|
||||
}
|
||||
81
plugins/reaction-plugin-nftables/src/nft.rs
Normal file
81
plugins/reaction-plugin-nftables/src/nft.rs
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
use std::{
|
||||
ffi::{CStr, CString},
|
||||
thread,
|
||||
};
|
||||
|
||||
use libnftables1_sys::Nftables;
|
||||
use nftables::batch::Batch;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
/// A client with a dedicated server thread to libnftables.
|
||||
/// Calling [`Default::default()`] spawns a new server thread.
|
||||
/// Cloning just creates a new client to the same server thread.
|
||||
#[derive(Clone)]
|
||||
pub struct NftClient {
|
||||
tx: mpsc::Sender<NftCommand>,
|
||||
}
|
||||
|
||||
impl Default for NftClient {
|
||||
fn default() -> Self {
|
||||
let (tx, mut rx) = mpsc::channel(10);
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut conn = Nftables::new();
|
||||
|
||||
while let Some(NftCommand { json, ret }) = rx.blocking_recv() {
|
||||
let (rc, output, error) = conn.run_cmd(json.as_ptr());
|
||||
let res = match rc {
|
||||
0 => to_rust_string(output)
|
||||
.ok_or_else(|| "unknown ok (rc = 0 but no output buffer)".into()),
|
||||
_ => to_rust_string(error)
|
||||
.map(|err| format!("error (rc = {rc}: {err})"))
|
||||
.ok_or_else(|| format!("unknown error (rc = {rc} but no error buffer)")),
|
||||
};
|
||||
let _ = ret.send(res);
|
||||
}
|
||||
});
|
||||
|
||||
NftClient { tx }
|
||||
}
|
||||
}
|
||||
|
||||
impl NftClient {
|
||||
/// Send a batch to nftables.
|
||||
pub async fn send(&self, batch: Batch<'_>) -> Result<String, String> {
|
||||
// convert JSON to CString
|
||||
let mut json = serde_json::to_vec(&batch.to_nftables())
|
||||
.map_err(|err| format!("couldn't build json to send to nftables: {err}"))?;
|
||||
json.push('\0' as u8);
|
||||
let json = CString::from_vec_with_nul(json)
|
||||
.map_err(|err| format!("invalid json with null char: {err}"))?;
|
||||
|
||||
// Send command
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let command = NftCommand { json, ret: tx };
|
||||
self.tx
|
||||
.send(command)
|
||||
.await
|
||||
.map_err(|err| format!("nftables thread has quit, can't send command: {err}"))?;
|
||||
|
||||
// Wait for result
|
||||
rx.await
|
||||
.map_err(|_| format!("nftables thread has quit, no response for command"))?
|
||||
}
|
||||
}
|
||||
|
||||
struct NftCommand {
|
||||
json: CString,
|
||||
ret: oneshot::Sender<Result<String, String>>,
|
||||
}
|
||||
|
||||
fn to_rust_string(c_ptr: *const i8) -> Option<String> {
|
||||
if c_ptr.is_null() {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
unsafe { CStr::from_ptr(c_ptr) }
|
||||
.to_string_lossy()
|
||||
.into_owned(),
|
||||
)
|
||||
}
|
||||
}
|
||||
247
plugins/reaction-plugin-nftables/src/tests.rs
Normal file
247
plugins/reaction-plugin-nftables/src/tests.rs
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig, Value};
|
||||
use serde_json::json;
|
||||
|
||||
use crate::Plugin;
|
||||
|
||||
#[tokio::test]
|
||||
async fn conf_stream() {
|
||||
// No stream is supported by nftables
|
||||
assert!(
|
||||
Plugin::default()
|
||||
.load_config(
|
||||
vec![StreamConfig {
|
||||
stream_name: "stream".into(),
|
||||
stream_type: "nftables".into(),
|
||||
config: Value::Null
|
||||
}],
|
||||
vec![]
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// Empty config is ok
|
||||
assert!(Plugin::default().load_config(vec![], vec![]).await.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn conf_action_standalone() {
|
||||
let p = vec!["name".into(), "ip".into(), "ip2".into()];
|
||||
let p_noip = vec!["name".into(), "ip2".into()];
|
||||
|
||||
for (is_ok, conf, patterns) in [
|
||||
// minimal set
|
||||
(true, json!({ "set": "test" }), &p),
|
||||
// missing set key
|
||||
(false, json!({}), &p),
|
||||
(false, json!({ "version": "ipv4" }), &p),
|
||||
// unknown key
|
||||
(false, json!({ "set": "test", "unknown": "yes" }), &p),
|
||||
(false, json!({ "set": "test", "ip_index": 1 }), &p),
|
||||
(false, json!({ "set": "test", "timeout_u32": 1 }), &p),
|
||||
// pattern //
|
||||
(true, json!({ "set": "test" }), &p),
|
||||
(true, json!({ "set": "test", "pattern": "ip" }), &p),
|
||||
(true, json!({ "set": "test", "pattern": "ip2" }), &p),
|
||||
(true, json!({ "set": "test", "pattern": "ip2" }), &p_noip),
|
||||
// unknown pattern "ip"
|
||||
(false, json!({ "set": "test" }), &p_noip),
|
||||
(false, json!({ "set": "test", "pattern": "ip" }), &p_noip),
|
||||
// unknown pattern
|
||||
(false, json!({ "set": "test", "pattern": "unknown" }), &p),
|
||||
(false, json!({ "set": "test", "pattern": "uwu" }), &p_noip),
|
||||
// bad type
|
||||
(false, json!({ "set": "test", "pattern": 0 }), &p_noip),
|
||||
(false, json!({ "set": "test", "pattern": true }), &p_noip),
|
||||
// action //
|
||||
(true, json!({ "set": "test", "action": "add" }), &p),
|
||||
(true, json!({ "set": "test", "action": "delete" }), &p),
|
||||
// unknown action
|
||||
(false, json!({ "set": "test", "action": "create" }), &p),
|
||||
(false, json!({ "set": "test", "action": "insert" }), &p),
|
||||
(false, json!({ "set": "test", "action": "del" }), &p),
|
||||
(false, json!({ "set": "test", "action": "destroy" }), &p),
|
||||
// bad type
|
||||
(false, json!({ "set": "test", "action": true }), &p),
|
||||
(false, json!({ "set": "test", "action": 1 }), &p),
|
||||
// ip version //
|
||||
// ok
|
||||
(true, json!({ "set": "test", "version": "ipv4" }), &p),
|
||||
(true, json!({ "set": "test", "version": "ipv6" }), &p),
|
||||
(true, json!({ "set": "test", "version": "ip" }), &p),
|
||||
// unknown version
|
||||
(false, json!({ "set": "test", "version": 4 }), &p),
|
||||
(false, json!({ "set": "test", "version": 6 }), &p),
|
||||
(false, json!({ "set": "test", "version": 46 }), &p),
|
||||
(false, json!({ "set": "test", "version": "5" }), &p),
|
||||
(false, json!({ "set": "test", "version": "ipv5" }), &p),
|
||||
(false, json!({ "set": "test", "version": "4" }), &p),
|
||||
(false, json!({ "set": "test", "version": "6" }), &p),
|
||||
(false, json!({ "set": "test", "version": "46" }), &p),
|
||||
// bad type
|
||||
(false, json!({ "set": "test", "version": true }), &p),
|
||||
// hooks //
|
||||
// everything is fine really
|
||||
(true, json!({ "set": "test", "hooks": [] }), &p),
|
||||
(
|
||||
true,
|
||||
json!({ "set": "test", "hooks": ["input", "forward", "ingress", "prerouting", "output", "postrouting", "egress"] }),
|
||||
&p,
|
||||
),
|
||||
(false, json!({ "set": "test", "hooks": ["INPUT"] }), &p),
|
||||
(false, json!({ "set": "test", "hooks": ["FORWARD"] }), &p),
|
||||
(
|
||||
false,
|
||||
json!({ "set": "test", "hooks": ["unknown_hook"] }),
|
||||
&p,
|
||||
),
|
||||
// timeout //
|
||||
(true, json!({ "set": "test", "timeout": "1m" }), &p),
|
||||
(true, json!({ "set": "test", "timeout": "3 days" }), &p),
|
||||
// bad
|
||||
(false, json!({ "set": "test", "timeout": "3 dayz"}), &p),
|
||||
(false, json!({ "set": "test", "timeout": 12 }), &p),
|
||||
// target //
|
||||
// anything is fine too
|
||||
(true, json!({ "set": "test", "target": "drop" }), &p),
|
||||
(true, json!({ "set": "test", "target": "accept" }), &p),
|
||||
(true, json!({ "set": "test", "target": "return" }), &p),
|
||||
(true, json!({ "set": "test", "target": "continue" }), &p),
|
||||
// bad
|
||||
(false, json!({ "set": "test", "target": "custom" }), &p),
|
||||
(false, json!({ "set": "test", "target": "DROP" }), &p),
|
||||
(false, json!({ "set": "test", "target": 11 }), &p),
|
||||
(false, json!({ "set": "test", "target": ["DROP"] }), &p),
|
||||
] {
|
||||
let res = Plugin::default()
|
||||
.load_config(
|
||||
vec![],
|
||||
vec![ActionConfig {
|
||||
stream_name: "stream".into(),
|
||||
filter_name: "filter".into(),
|
||||
action_name: "action".into(),
|
||||
action_type: "nftables".into(),
|
||||
config: conf.clone().into(),
|
||||
patterns: patterns.clone(),
|
||||
}],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
res.is_ok() == is_ok,
|
||||
"conf: {:?}, must be ok: {is_ok}, result: {:?}",
|
||||
conf,
|
||||
// empty Result::Ok because ActionImpl is not Debug
|
||||
res.map(|_| ())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO
|
||||
#[tokio::test]
|
||||
async fn conf_action_merge() {
|
||||
let mut plugin = Plugin::default();
|
||||
|
||||
let set1 = ActionConfig {
|
||||
stream_name: "stream".into(),
|
||||
filter_name: "filter".into(),
|
||||
action_name: "action1".into(),
|
||||
action_type: "nftables".into(),
|
||||
config: json!({
|
||||
"set": "test",
|
||||
"target": "drop",
|
||||
"hooks": ["input"],
|
||||
"action": "add",
|
||||
})
|
||||
.into(),
|
||||
patterns: vec!["ip".into()],
|
||||
};
|
||||
|
||||
let set2 = ActionConfig {
|
||||
stream_name: "stream".into(),
|
||||
filter_name: "filter".into(),
|
||||
action_name: "action2".into(),
|
||||
action_type: "nftables".into(),
|
||||
config: json!({
|
||||
"set": "test",
|
||||
"target": "drop",
|
||||
"version": "ip",
|
||||
"action": "add",
|
||||
})
|
||||
.into(),
|
||||
patterns: vec!["ip".into()],
|
||||
};
|
||||
|
||||
let set3 = ActionConfig {
|
||||
stream_name: "stream".into(),
|
||||
filter_name: "filter".into(),
|
||||
action_name: "action2".into(),
|
||||
action_type: "nftables".into(),
|
||||
config: json!({
|
||||
"set": "test",
|
||||
"action": "delete",
|
||||
})
|
||||
.into(),
|
||||
patterns: vec!["ip".into()],
|
||||
};
|
||||
|
||||
let res = plugin
|
||||
.load_config(
|
||||
vec![],
|
||||
vec![
|
||||
// First set
|
||||
set1.clone(),
|
||||
// Same set, adding options, no conflict
|
||||
set2.clone(),
|
||||
// Same set, no new options, no conflict
|
||||
set3.clone(),
|
||||
// Unrelated set, so no conflict
|
||||
ActionConfig {
|
||||
stream_name: "stream".into(),
|
||||
filter_name: "filter".into(),
|
||||
action_name: "action3".into(),
|
||||
action_type: "nftables".into(),
|
||||
config: json!({
|
||||
"set": "test2",
|
||||
"target": "return",
|
||||
"version": "ipv6",
|
||||
})
|
||||
.into(),
|
||||
patterns: vec!["ip".into()],
|
||||
},
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(res.is_ok(), "res: {:?}", res.map(|_| ()));
|
||||
|
||||
// Another set with conflict is not ok
|
||||
let res = plugin
|
||||
.load_config(
|
||||
vec![],
|
||||
vec![
|
||||
// First set
|
||||
set1,
|
||||
// Same set, adding options, no conflict
|
||||
set2,
|
||||
// Same set, no new options, no conflict
|
||||
set3,
|
||||
// Another set with conflict
|
||||
ActionConfig {
|
||||
stream_name: "stream".into(),
|
||||
filter_name: "filter".into(),
|
||||
action_name: "action3".into(),
|
||||
action_type: "nftables".into(),
|
||||
config: json!({
|
||||
"set": "test",
|
||||
"target": "target2",
|
||||
"action": "del",
|
||||
})
|
||||
.into(),
|
||||
patterns: vec!["ip".into()],
|
||||
},
|
||||
],
|
||||
)
|
||||
.await;
|
||||
assert!(res.is_err(), "res: {:?}", res.map(|_| ()));
|
||||
}
|
||||
|
|
@ -11,7 +11,6 @@ categories = ["security"]
|
|||
description = "Plugin interface for reaction, a daemon that scans logs and takes action (alternative to fail2ban)"
|
||||
|
||||
[dependencies]
|
||||
chrono.workspace = true
|
||||
remoc.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
|
|
|||
|
|
@ -102,9 +102,8 @@ def main():
|
|||
|
||||
You'll need to install minisign to check the authenticity of the package.
|
||||
|
||||
After installing reaction, create your configuration file at
|
||||
`/etc/reaction.json`, `/etc/reaction.jsonnet` or `/etc/reaction.yml`.
|
||||
You can also provide a directory containing multiple configuration files in the previous formats.
|
||||
After installing reaction, create your configuration file(s) in JSON, YAML or JSONnet in the
|
||||
`/etc/reaction/` directory.
|
||||
See <https://reaction.ppom.me> for documentation.
|
||||
|
||||
Reload systemd:
|
||||
|
|
@ -114,8 +113,8 @@ $ sudo systemctl daemon-reload
|
|||
|
||||
Then enable and start reaction with this command
|
||||
```bash
|
||||
# replace `reaction.jsonnet` with the name of your configuration file in /etc/
|
||||
$ sudo systemctl enable --now reaction@reaction.jsonnet.service
|
||||
# write first your configuration file(s) in /etc/reaction/
|
||||
$ sudo systemctl enable --now reaction.service
|
||||
```
|
||||
""".strip(),
|
||||
]
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ pkgs.mkShell {
|
|||
name = "libipset";
|
||||
buildInputs = [
|
||||
ipset
|
||||
nftables
|
||||
clang
|
||||
];
|
||||
src = null;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
use std::{collections::BTreeMap, io::Error, path, process::Stdio};
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
use std::os::darwin::fs::MetadataExt;
|
||||
#[cfg(target_os = "freebsd")]
|
||||
use std::os::freebsd::fs::MetadataExt;
|
||||
#[cfg(target_os = "illumos")]
|
||||
|
|
@ -170,6 +172,8 @@ impl Plugin {
|
|||
let mut command = Command::new("run0");
|
||||
// --pipe gives direct, non-emulated stdio access, for better performance.
|
||||
command.arg("--pipe");
|
||||
// run the command inside the same slice as reaction
|
||||
command.arg("--slice-inherit");
|
||||
|
||||
// Make path absolute for systemd
|
||||
let full_workdir = path::absolute(&plugin_working_directory)?;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::{cmp::Ordering, collections::BTreeMap, hash::Hash};
|
||||
|
||||
use reaction_plugin::StreamConfig;
|
||||
use regex::RegexSet;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
|
|
@ -19,6 +20,11 @@ pub struct Stream {
|
|||
#[serde(skip)]
|
||||
pub name: String,
|
||||
|
||||
#[serde(skip)]
|
||||
pub compiled_regex_set: RegexSet,
|
||||
#[serde(skip)]
|
||||
pub regex_index_to_filter_name: Vec<String>,
|
||||
|
||||
// Plugin-specific
|
||||
#[serde(default, rename = "type", skip_serializing_if = "Option::is_none")]
|
||||
pub stream_type: Option<String>,
|
||||
|
|
@ -90,6 +96,21 @@ impl Stream {
|
|||
filter.setup(name, key, patterns)?;
|
||||
}
|
||||
|
||||
let all_regexes: BTreeMap<_, _> = self
|
||||
.filters
|
||||
.values()
|
||||
.flat_map(|filter| {
|
||||
filter
|
||||
.regex
|
||||
.iter()
|
||||
.map(|regex| (regex, filter.name.clone()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
self.compiled_regex_set = RegexSet::new(all_regexes.keys())
|
||||
.map_err(|err| format!("too much regexes on the filters of this stream: {err}"))?;
|
||||
self.regex_index_to_filter_name = all_regexes.into_values().collect();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
fmt::Display,
|
||||
io,
|
||||
ops::{Deref, DerefMut},
|
||||
process::ExitStatus,
|
||||
|
|
@ -52,7 +53,7 @@ impl PluginManager {
|
|||
let mut child = plugin
|
||||
.launch(state_directory)
|
||||
.await
|
||||
.map_err(|err| format!("could not launch plugin: {err}"))?;
|
||||
.map_err(|err| systemd_error(plugin, "could not launch plugin", err))?;
|
||||
|
||||
{
|
||||
let stderr = child.stderr.take().unwrap();
|
||||
|
|
@ -70,10 +71,7 @@ impl PluginManager {
|
|||
) = Connect::io(remoc::Cfg::default(), stdout, stdin)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"could not init communication with plugin {}: {err}",
|
||||
plugin.name
|
||||
)
|
||||
systemd_error(plugin, "could not init communication with plugin", err)
|
||||
})?;
|
||||
|
||||
tokio::spawn(conn);
|
||||
|
|
@ -166,6 +164,20 @@ impl PluginManager {
|
|||
}
|
||||
}
|
||||
|
||||
fn systemd_error(plugin: &Plugin, message: &str, err: impl Display) -> String {
|
||||
if plugin.systemd {
|
||||
format!(
|
||||
"{message}: {err}. \
|
||||
`plugins.{0}.systemd` is set to true, so this may be an issue with systemd's run0. \
|
||||
please make sure `sudo run0 ls /` returns the same thing as `sudo ls /` as a test. \
|
||||
if run0 can't be found or doesn't output anything, set `plugins.{0}.systemd` to false.",
|
||||
plugin.name,
|
||||
)
|
||||
} else {
|
||||
format!("{message}: {err}")
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_stderr(stderr: ChildStderr, plugin_name: String) {
|
||||
// read lines until shutdown
|
||||
let lines = reader_to_stream(stderr);
|
||||
|
|
|
|||
|
|
@ -1,11 +1,10 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, HashMap},
|
||||
collections::{BTreeSet, HashMap},
|
||||
process::Stdio,
|
||||
};
|
||||
|
||||
use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all};
|
||||
use reaction_plugin::{StreamImpl, shutdown::ShutdownToken};
|
||||
use regex::RegexSet;
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, BufReader},
|
||||
process::{Child, ChildStderr, ChildStdout, Command},
|
||||
|
|
@ -45,7 +44,6 @@ pub fn reader_to_stream(
|
|||
}
|
||||
|
||||
pub struct StreamManager {
|
||||
compiled_regex_set: RegexSet,
|
||||
regex_index_to_filter_manager: Vec<FilterManager>,
|
||||
stream: &'static Stream,
|
||||
stream_plugin: Option<StreamImpl>,
|
||||
|
|
@ -59,16 +57,6 @@ impl StreamManager {
|
|||
shutdown: ShutdownToken,
|
||||
plugins: &mut Plugins,
|
||||
) -> Result<Self, String> {
|
||||
let all_regexes: BTreeMap<_, _> = filter_managers
|
||||
.iter()
|
||||
.flat_map(|(filter, filter_manager)| {
|
||||
filter
|
||||
.regex
|
||||
.iter()
|
||||
.map(|regex| (regex, filter_manager.clone()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let stream_plugin = if stream.is_plugin() {
|
||||
Some(
|
||||
plugins
|
||||
|
|
@ -84,11 +72,23 @@ impl StreamManager {
|
|||
None
|
||||
};
|
||||
|
||||
let regex_index_to_filter_manager = stream
|
||||
.regex_index_to_filter_name
|
||||
.iter()
|
||||
.map(|filter_name| {
|
||||
filter_managers
|
||||
.iter()
|
||||
.find(|(filter, _)| filter_name == &filter.name)
|
||||
.unwrap()
|
||||
.1
|
||||
.clone()
|
||||
})
|
||||
.collect();
|
||||
|
||||
debug!("successfully initialized stream {}", stream.name);
|
||||
|
||||
Ok(StreamManager {
|
||||
compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?,
|
||||
regex_index_to_filter_manager: all_regexes.into_values().collect(),
|
||||
regex_index_to_filter_manager,
|
||||
stream,
|
||||
stream_plugin,
|
||||
shutdown,
|
||||
|
|
@ -230,7 +230,7 @@ impl StreamManager {
|
|||
}
|
||||
|
||||
fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> {
|
||||
let matches = self.compiled_regex_set.matches(line);
|
||||
let matches = self.stream.compiled_regex_set.matches(line);
|
||||
matches
|
||||
.into_iter()
|
||||
.map(|match_| &self.regex_index_to_filter_manager[match_])
|
||||
|
|
|
|||
|
|
@ -41,6 +41,23 @@
|
|||
},
|
||||
},
|
||||
},
|
||||
f2: {
|
||||
regex: [
|
||||
"^can't found <num>$",
|
||||
],
|
||||
retry: 2,
|
||||
retryperiod: '60s',
|
||||
actions: {
|
||||
damn: {
|
||||
cmd: ['notify-send', 'you should not see that', 'ban <num>'],
|
||||
},
|
||||
undamn: {
|
||||
cmd: ['notify-send', 'you should not see that', 'unban <num>'],
|
||||
after: '3s',
|
||||
onexit: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue