Extract ipset options from action options so that it's globally merged

Actions don't manage sets anymore.
Set options are merged at each new action,
then Sets are managed by themselves.
This commit is contained in:
ppom 2026-01-31 12:00:00 +01:00
commit 87a25cf04c
No known key found for this signature in database
4 changed files with 222 additions and 159 deletions

View file

@ -1,12 +1,12 @@
use std::u32;
use std::{fmt::Debug, u32};
use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration};
use remoc::rch::mpsc as remocMpsc;
use serde::{Deserialize, Serialize};
use crate::ipset::{IpSet, Order, SetChain, SetOptions, Version};
use crate::ipset::{CreateSet, IpSet, Order, SetChain, Version};
#[derive(Default, Serialize, Deserialize)]
#[derive(Default, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
pub enum IpVersion {
#[serde(alias = "4")]
V4,
@ -16,34 +16,19 @@ pub enum IpVersion {
#[default]
V46,
}
// impl<'de> Deserialize<'de> for IpVersion {
// fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
// where
// D: Deserializer<'de>,
// {
// match Option::<u8>::deserialize(deserializer)? {
// None => Ok(IpVersion::V46),
// Some(version) => match version {
// 4 => Ok(IpVersion::V4),
// 6 => Ok(IpVersion::V6),
// 46 => Ok(IpVersion::V46),
// _ => Err(D::Error::custom("version must be one of 4, 6 or 46")),
// },
// }
// }
// }
// impl Serialize for IpVersion {
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
// where
// S: serde::Serializer,
// {
// serializer.serialize_u8(match self {
// IpVersion::V4 => 4,
// IpVersion::V6 => 6,
// IpVersion::V46 => 46,
// })
// }
// }
impl Debug for IpVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
IpVersion::V4 => "4",
IpVersion::V6 => "6",
IpVersion::V46 => "46",
}
)
}
}
#[derive(Default, Serialize, Deserialize)]
pub enum AddDel {
@ -57,116 +42,148 @@ pub enum AddDel {
// FIXME block configs that have different set options for the same name
// treat default values as none?
/// User-facing action options
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ActionOptions {
/// The set that should be used by this action
set: String,
pub set: String,
/// The pattern name of the IP.
/// Defaults to "ip"
#[serde(default = "serde_ip")]
pub pattern: String,
/// The IP type.
/// Defaults to `46`.
/// If `4`: creates an IPv4 set with this name
/// If `6`: creates an IPv6 set with this name
/// If `46`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6'
#[serde(default)]
version: IpVersion,
/// Chains where the IP set should be inserted.
/// Defaults to `["INPUT", "FORWARD"]`
#[serde(default = "serde_chains")]
chains: Vec<String>,
// Optional timeout, letting linux/netfilter handle set removal instead of reaction
// Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action
#[serde(skip_serializing_if = "Option::is_none")]
timeout: Option<String>,
// Target that iptables should use when the IP is encountered.
// Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain
#[serde(default = "serde_drop")]
target: String,
#[serde(skip)]
ip_index: usize,
// Whether the action is to "add" or "del" the ip from the set
#[serde(default)]
action: AddDel,
#[serde(flatten)]
pub set_options: SetOptions,
}
fn serde_ip() -> String {
"ip".into()
}
fn serde_drop() -> String {
"DROP".into()
}
fn serde_chains() -> Vec<String> {
vec!["INPUT".into(), "FORWARD".into()]
impl ActionOptions {
pub fn set_ip_index(&mut self, patterns: Vec<String>) -> Result<(), ()> {
self.ip_index = patterns
.into_iter()
.enumerate()
.filter(|(_, name)| name == &self.pattern)
.next()
.ok_or(())?
.0;
Ok(())
}
}
pub struct Action {
ipset: IpSet,
rx: remocMpsc::Receiver<Exec>,
shutdown: ShutdownToken,
ipv4_set: Option<String>,
ipv6_set: Option<String>,
// index of pattern ip in match vec
ip_index: usize,
/// Merged set options
#[derive(Default, Deserialize, Serialize)]
pub struct SetOptions {
/// The IP type.
/// Defaults to `46`.
/// If `4`: creates an IPv4 set with this name
/// If `6`: creates an IPv6 set with this name
/// If `46`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6'
/// *Merged set-wise*.
#[serde(default)]
version: Option<IpVersion>,
/// Chains where the IP set should be inserted.
/// Defaults to `["INPUT", "FORWARD"]`
/// *Merged set-wise*.
#[serde(default)]
chains: Option<Vec<String>>,
// Optional timeout, letting linux/netfilter handle set removal instead of reaction
// Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action
/// *Merged set-wise*.
#[serde(skip_serializing_if = "Option::is_none")]
timeout: Option<String>,
#[serde(skip)]
timeout_u32: Option<u32>,
// Target that iptables should use when the IP is encountered.
// Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain
/// *Merged set-wise*.
#[serde(default)]
target: Option<String>,
}
impl SetOptions {
pub fn merge(&mut self, options: &SetOptions) -> Result<(), String> {
// merge two Option<T> and fail if there is conflict
fn inner_merge<T: Eq + Clone + std::fmt::Debug>(
a: &mut Option<T>,
b: &Option<T>,
name: &str,
) -> Result<(), String> {
match (&a, &b) {
(Some(aa), Some(bb)) => {
if aa != bb {
return Err(format!(
"Conflicting options for {name}: `{aa:?}` and `{bb:?}`"
));
}
}
(None, Some(_)) => {
*a = b.clone();
}
_ => (),
};
Ok(())
}
inner_merge(&mut self.version, &options.version, "version")?;
inner_merge(&mut self.timeout, &options.timeout, "timeout")?;
inner_merge(&mut self.chains, &options.chains, "chains")?;
inner_merge(&mut self.target, &options.target, "target")?;
if let Some(timeout) = &self.timeout {
let duration = parse_duration(timeout)
.map_err(|err| format!("failed to parse timeout: {}", err))?
.as_secs();
if duration > u32::MAX as u64 {
return Err(format!(
"timeout is limited to {} seconds (approx {} days)",
u32::MAX,
49_000
));
}
self.timeout_u32 = Some(duration as u32);
}
Ok(())
}
}
pub struct Set {
sets: SetNames,
chains: Vec<String>,
timeout: Option<u32>,
target: String,
action: AddDel,
}
impl Action {
pub fn new(
ipset: IpSet,
shutdown: ShutdownToken,
ip_index: usize,
rx: remocMpsc::Receiver<Exec>,
options: ActionOptions,
) -> Result<Self, String> {
Ok(Action {
ipset,
rx,
shutdown,
ip_index,
action: options.action,
target: options.target,
chains: options.chains,
timeout: if let Some(timeout) = options.timeout {
let duration = parse_duration(&timeout)
.map_err(|err| format!("failed to parse timeout: {}", err))?
.as_secs();
if duration > u32::MAX as u64 {
return Err(format!(
"timeout is limited to {} seconds (approx {} days)",
u32::MAX,
49_000
));
}
Some(duration as u32)
} else {
None
},
ipv4_set: match options.version {
IpVersion::V4 => Some(options.set.clone()),
IpVersion::V6 => None,
IpVersion::V46 => Some(format!("{}v4", options.set)),
},
ipv6_set: match options.version {
IpVersion::V4 => None,
IpVersion::V6 => Some(options.set),
IpVersion::V46 => Some(format!("{}v6", options.set)),
},
})
impl Set {
pub fn from(name: String, options: SetOptions) -> Self {
Self {
sets: SetNames::new(name, options.version),
timeout: options.timeout_u32,
target: options.target.unwrap_or("DROP".into()),
chains: options
.chains
.unwrap_or(vec!["INPUT".into(), "FORWARD".into()]),
}
}
pub async fn init(&mut self) -> Result<(), String> {
pub async fn init(&self, ipset: &mut IpSet) -> Result<(), String> {
for (set, version) in [
(&self.ipv4_set, Version::IPv4),
(&self.ipv6_set, Version::IPv6),
(&self.sets.ipv4, Version::IPv4),
(&self.sets.ipv6, Version::IPv6),
] {
if let Some(set) = set {
// create set
self.ipset
.order(Order::CreateSet(SetOptions {
ipset
.order(Order::CreateSet(CreateSet {
name: set.clone(),
version,
timeout: self.timeout,
@ -174,7 +191,7 @@ impl Action {
.await?;
// insert set in chains
for chain in &self.chains {
self.ipset
ipset
.order(Order::InsertSet(SetChain {
set: set.clone(),
chain: chain.clone(),
@ -187,15 +204,14 @@ impl Action {
Ok(())
}
pub async fn destroy(&mut self) {
pub async fn destroy(&self, ipset: &mut IpSet) {
for (set, version) in [
(&self.ipv4_set, Version::IPv4),
(&self.ipv6_set, Version::IPv6),
(&self.sets.ipv4, Version::IPv4),
(&self.sets.ipv6, Version::IPv6),
] {
if let Some(set) = set {
for chain in &self.chains {
if let Err(err) = self
.ipset
if let Err(err) = ipset
.order(Order::RemoveSet(SetChain {
set: set.clone(),
chain: chain.clone(),
@ -208,12 +224,62 @@ impl Action {
);
}
}
if let Err(err) = self.ipset.order(Order::DestroySet(set.clone())).await {
if let Err(err) = ipset.order(Order::DestroySet(set.clone())).await {
println!("ERROR while destroying {version} set {set}: {err}");
}
}
}
}
}
pub struct SetNames {
pub ipv4: Option<String>,
pub ipv6: Option<String>,
}
impl SetNames {
pub fn new(name: String, version: Option<IpVersion>) -> Self {
Self {
ipv4: match version {
Some(IpVersion::V4) => Some(name.clone()),
Some(IpVersion::V6) => None,
None | Some(IpVersion::V46) => Some(format!("{}v4", name)),
},
ipv6: match version {
Some(IpVersion::V4) => None,
Some(IpVersion::V6) => Some(name),
None | Some(IpVersion::V46) => Some(format!("{}v6", name)),
},
}
}
}
pub struct Action {
ipset: IpSet,
rx: remocMpsc::Receiver<Exec>,
shutdown: ShutdownToken,
sets: SetNames,
// index of pattern ip in match vec
ip_index: usize,
action: AddDel,
}
impl Action {
pub fn new(
ipset: IpSet,
shutdown: ShutdownToken,
rx: remocMpsc::Receiver<Exec>,
options: ActionOptions,
) -> Result<Self, String> {
Ok(Action {
ipset,
rx,
shutdown,
sets: SetNames::new(options.set, options.set_options.version),
ip_index: options.ip_index,
action: options.action,
})
}
pub async fn serve(mut self) {
loop {
@ -241,7 +307,6 @@ impl Action {
}
}
self.shutdown.ask_shutdown();
self.destroy().await;
}
async fn handle_exec(&mut self, mut exec: Exec) -> Result<(), String> {
@ -255,7 +320,7 @@ impl Action {
}
let ip = exec.match_.remove(self.ip_index);
// select set
let set = match (&self.ipv4_set, &self.ipv6_set) {
let set = match (&self.sets.ipv4, &self.sets.ipv6) {
(None, None) => return Err(format!("action is neither IPv4 nor IPv6, this is a bug!")),
(None, Some(set)) => set,
(Some(set), None) => set,

View file

@ -1,10 +1,4 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Display,
net::Ipv4Addr,
process::Command,
thread,
};
use std::{collections::BTreeMap, fmt::Display, net::Ipv4Addr, process::Command, thread};
use ipset::{
Session,
@ -27,7 +21,7 @@ impl Display for Version {
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct SetOptions {
pub struct CreateSet {
pub name: String,
pub version: Version,
pub timeout: Option<u32>,
@ -42,7 +36,7 @@ pub struct SetChain {
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub enum Order {
CreateSet(SetOptions),
CreateSet(CreateSet),
DestroySet(String),
InsertSet(SetChain),
RemoveSet(SetChain),
@ -112,12 +106,6 @@ struct Set {
struct IPsetManager {
// IPset sessions
sessions: BTreeMap<String, Set>,
// All set-wise commands already run, to ignore duplicates.
// Duplicates are the natural cause of multiple actions
// (eg. add action and del action) manipulating the same sets.
//
// It's fine because no order should be run twice during the runtime of the plugin.
journal: BTreeSet<Order>,
}
impl IPsetManager {
@ -134,19 +122,8 @@ impl IPsetManager {
}
fn handle_order(&mut self, order: Order) -> Result<(), String> {
// We only journal set-wise orders
// We skip the order if already run.
match order {
Order::Add(_, _) | Order::Del(_, _) => (),
_ => {
if !self.journal.insert(order.clone()) {
return Ok(());
}
}
};
match order {
Order::CreateSet(SetOptions {
Order::CreateSet(CreateSet {
name,
version,
timeout,

View file

@ -1,13 +1,13 @@
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};
use reaction_plugin::{
ActionImpl, Hello, Manifest, PluginInfo, RemoteError, RemoteResult, StreamImpl, Value,
shutdown::ShutdownController,
shutdown::{ShutdownController, ShutdownToken},
};
use remoc::rtc;
use crate::{
action::{Action, ActionOptions},
action::{Action, ActionOptions, Set, SetOptions},
ipset::IpSet,
};
@ -26,6 +26,8 @@ async fn main() {
#[derive(Default)]
struct Plugin {
ipset: IpSet,
set_options: BTreeMap<String, SetOptions>,
sets: Vec<Set>,
actions: Vec<Action>,
shutdown: ShutdownController,
}
@ -61,28 +63,28 @@ impl PluginInfo for Plugin {
return Err("This plugin can't handle other action types than ipset".into());
}
let options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}")
})?;
let ip_index = patterns
.into_iter()
.enumerate()
.filter(|(_, name)| name == &options.pattern)
.next()
.ok_or_else(|| {
options.set_ip_index(patterns).map_err(|_|
format!(
"No pattern with name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'",
options.pattern
&options.pattern
)
})?
.0;
)?;
// Merge option
self.set_options
.entry(options.set.clone())
.or_default()
.merge(&options.set_options)
.map_err(|err| format!("ipset {}: {err}", options.set))?;
let (tx, rx) = remoc::rch::mpsc::channel(1);
self.actions.push(Action::new(
self.ipset.clone(),
self.shutdown.token(),
ip_index,
rx,
options,
)?);
@ -92,28 +94,40 @@ impl PluginInfo for Plugin {
async fn finish_setup(&mut self) -> RemoteResult<()> {
// Init all sets
while let Some((name, options)) = self.set_options.pop_first() {
self.sets.push(Set::from(name, options));
}
self.set_options = Default::default();
let mut first_error = None;
for (i, action) in self.actions.iter_mut().enumerate() {
for (i, set) in self.sets.iter().enumerate() {
// Retain if error
if let Err(err) = action.init().await {
if let Err(err) = set.init(&mut self.ipset).await {
first_error = Some((i, RemoteError::Plugin(err)));
break;
}
}
// Destroy initialized sets if error
if let Some((i, err)) = first_error {
for action in self.actions.iter_mut().take(i + 1) {
let _ = action.destroy().await;
for set in self.sets.iter().take(i + 1) {
let _ = set.destroy(&mut self.ipset).await;
}
return Err(err);
}
// Launch a task that will destroy the sets on shutdown
tokio::spawn(destroy_sets_at_shutdown(
self.ipset.clone(),
std::mem::take(&mut self.sets),
self.shutdown.token(),
));
// Launch all actions
while let Some(action) = self.actions.pop() {
tokio::spawn(async move { action.serve() });
}
self.actions = Default::default();
Ok(())
}
@ -123,3 +137,10 @@ impl PluginInfo for Plugin {
Ok(())
}
}
async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec<Set>, shutdown: ShutdownToken) {
shutdown.wait().await;
for set in sets {
set.destroy(&mut ipset).await;
}
}

View file

@ -9,7 +9,7 @@
},
plugins: {
cluster: {
ipset: {
path: './target/debug/reaction-plugin-ipset',
check_root: false,
systemd_options: {