From 87a25cf04c7fb3d915899f2d7178782304e5a5d7 Mon Sep 17 00:00:00 2001 From: ppom Date: Sat, 31 Jan 2026 12:00:00 +0100 Subject: [PATCH] Extract ipset options from action options so that it's globally merged Actions don't manage sets anymore. Set options are merged at each new action, then Sets are managed by themselves. --- plugins/reaction-plugin-ipset/src/action.rs | 307 ++++++++++++-------- plugins/reaction-plugin-ipset/src/ipset.rs | 31 +- plugins/reaction-plugin-ipset/src/main.rs | 59 ++-- tests/test-conf/test-ipset.jsonnet | 2 +- 4 files changed, 231 insertions(+), 168 deletions(-) diff --git a/plugins/reaction-plugin-ipset/src/action.rs b/plugins/reaction-plugin-ipset/src/action.rs index e3b63b1..32f2a0a 100644 --- a/plugins/reaction-plugin-ipset/src/action.rs +++ b/plugins/reaction-plugin-ipset/src/action.rs @@ -1,12 +1,12 @@ -use std::u32; +use std::{fmt::Debug, u32}; use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration}; use remoc::rch::mpsc as remocMpsc; use serde::{Deserialize, Serialize}; -use crate::ipset::{IpSet, Order, SetChain, SetOptions, Version}; +use crate::ipset::{CreateSet, IpSet, Order, SetChain, Version}; -#[derive(Default, Serialize, Deserialize)] +#[derive(Default, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)] pub enum IpVersion { #[serde(alias = "4")] V4, @@ -16,34 +16,19 @@ pub enum IpVersion { #[default] V46, } -// impl<'de> Deserialize<'de> for IpVersion { -// fn deserialize(deserializer: D) -> Result -// where -// D: Deserializer<'de>, -// { -// match Option::::deserialize(deserializer)? { -// None => Ok(IpVersion::V46), -// Some(version) => match version { -// 4 => Ok(IpVersion::V4), -// 6 => Ok(IpVersion::V6), -// 46 => Ok(IpVersion::V46), -// _ => Err(D::Error::custom("version must be one of 4, 6 or 46")), -// }, -// } -// } -// } -// impl Serialize for IpVersion { -// fn serialize(&self, serializer: S) -> Result -// where -// S: serde::Serializer, -// { -// serializer.serialize_u8(match self { -// IpVersion::V4 => 4, -// IpVersion::V6 => 6, -// IpVersion::V46 => 46, -// }) -// } -// } +impl Debug for IpVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + IpVersion::V4 => "4", + IpVersion::V6 => "6", + IpVersion::V46 => "46", + } + ) + } +} #[derive(Default, Serialize, Deserialize)] pub enum AddDel { @@ -57,116 +42,148 @@ pub enum AddDel { // FIXME block configs that have different set options for the same name // treat default values as none? +/// User-facing action options #[derive(Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct ActionOptions { /// The set that should be used by this action - set: String, + pub set: String, /// The pattern name of the IP. /// Defaults to "ip" #[serde(default = "serde_ip")] pub pattern: String, - /// The IP type. - /// Defaults to `46`. - /// If `4`: creates an IPv4 set with this name - /// If `6`: creates an IPv6 set with this name - /// If `46`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6' - #[serde(default)] - version: IpVersion, - /// Chains where the IP set should be inserted. - /// Defaults to `["INPUT", "FORWARD"]` - #[serde(default = "serde_chains")] - chains: Vec, - // Optional timeout, letting linux/netfilter handle set removal instead of reaction - // Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action - #[serde(skip_serializing_if = "Option::is_none")] - timeout: Option, - // Target that iptables should use when the IP is encountered. - // Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain - #[serde(default = "serde_drop")] - target: String, + #[serde(skip)] + ip_index: usize, // Whether the action is to "add" or "del" the ip from the set #[serde(default)] action: AddDel, + + #[serde(flatten)] + pub set_options: SetOptions, } fn serde_ip() -> String { "ip".into() } -fn serde_drop() -> String { - "DROP".into() -} -fn serde_chains() -> Vec { - vec!["INPUT".into(), "FORWARD".into()] + +impl ActionOptions { + pub fn set_ip_index(&mut self, patterns: Vec) -> Result<(), ()> { + self.ip_index = patterns + .into_iter() + .enumerate() + .filter(|(_, name)| name == &self.pattern) + .next() + .ok_or(())? + .0; + Ok(()) + } } -pub struct Action { - ipset: IpSet, - rx: remocMpsc::Receiver, - shutdown: ShutdownToken, - ipv4_set: Option, - ipv6_set: Option, - // index of pattern ip in match vec - ip_index: usize, +/// Merged set options +#[derive(Default, Deserialize, Serialize)] +pub struct SetOptions { + /// The IP type. + /// Defaults to `46`. + /// If `4`: creates an IPv4 set with this name + /// If `6`: creates an IPv6 set with this name + /// If `46`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6' + /// *Merged set-wise*. + #[serde(default)] + version: Option, + /// Chains where the IP set should be inserted. + /// Defaults to `["INPUT", "FORWARD"]` + /// *Merged set-wise*. + #[serde(default)] + chains: Option>, + // Optional timeout, letting linux/netfilter handle set removal instead of reaction + // Note that `reaction show` and `reaction flush` won't work if set instead of an `after` action + /// *Merged set-wise*. + #[serde(skip_serializing_if = "Option::is_none")] + timeout: Option, + #[serde(skip)] + timeout_u32: Option, + // Target that iptables should use when the IP is encountered. + // Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain + /// *Merged set-wise*. + #[serde(default)] + target: Option, +} + +impl SetOptions { + pub fn merge(&mut self, options: &SetOptions) -> Result<(), String> { + // merge two Option and fail if there is conflict + fn inner_merge( + a: &mut Option, + b: &Option, + name: &str, + ) -> Result<(), String> { + match (&a, &b) { + (Some(aa), Some(bb)) => { + if aa != bb { + return Err(format!( + "Conflicting options for {name}: `{aa:?}` and `{bb:?}`" + )); + } + } + (None, Some(_)) => { + *a = b.clone(); + } + _ => (), + }; + Ok(()) + } + + inner_merge(&mut self.version, &options.version, "version")?; + inner_merge(&mut self.timeout, &options.timeout, "timeout")?; + inner_merge(&mut self.chains, &options.chains, "chains")?; + inner_merge(&mut self.target, &options.target, "target")?; + + if let Some(timeout) = &self.timeout { + let duration = parse_duration(timeout) + .map_err(|err| format!("failed to parse timeout: {}", err))? + .as_secs(); + if duration > u32::MAX as u64 { + return Err(format!( + "timeout is limited to {} seconds (approx {} days)", + u32::MAX, + 49_000 + )); + } + self.timeout_u32 = Some(duration as u32); + } + + Ok(()) + } +} + +pub struct Set { + sets: SetNames, chains: Vec, timeout: Option, target: String, - action: AddDel, } -impl Action { - pub fn new( - ipset: IpSet, - shutdown: ShutdownToken, - ip_index: usize, - rx: remocMpsc::Receiver, - options: ActionOptions, - ) -> Result { - Ok(Action { - ipset, - rx, - shutdown, - ip_index, - action: options.action, - target: options.target, - chains: options.chains, - timeout: if let Some(timeout) = options.timeout { - let duration = parse_duration(&timeout) - .map_err(|err| format!("failed to parse timeout: {}", err))? - .as_secs(); - if duration > u32::MAX as u64 { - return Err(format!( - "timeout is limited to {} seconds (approx {} days)", - u32::MAX, - 49_000 - )); - } - Some(duration as u32) - } else { - None - }, - ipv4_set: match options.version { - IpVersion::V4 => Some(options.set.clone()), - IpVersion::V6 => None, - IpVersion::V46 => Some(format!("{}v4", options.set)), - }, - ipv6_set: match options.version { - IpVersion::V4 => None, - IpVersion::V6 => Some(options.set), - IpVersion::V46 => Some(format!("{}v6", options.set)), - }, - }) +impl Set { + pub fn from(name: String, options: SetOptions) -> Self { + Self { + sets: SetNames::new(name, options.version), + timeout: options.timeout_u32, + target: options.target.unwrap_or("DROP".into()), + chains: options + .chains + .unwrap_or(vec!["INPUT".into(), "FORWARD".into()]), + } } - pub async fn init(&mut self) -> Result<(), String> { + pub async fn init(&self, ipset: &mut IpSet) -> Result<(), String> { for (set, version) in [ - (&self.ipv4_set, Version::IPv4), - (&self.ipv6_set, Version::IPv6), + (&self.sets.ipv4, Version::IPv4), + (&self.sets.ipv6, Version::IPv6), ] { if let Some(set) = set { // create set - self.ipset - .order(Order::CreateSet(SetOptions { + ipset + .order(Order::CreateSet(CreateSet { name: set.clone(), version, timeout: self.timeout, @@ -174,7 +191,7 @@ impl Action { .await?; // insert set in chains for chain in &self.chains { - self.ipset + ipset .order(Order::InsertSet(SetChain { set: set.clone(), chain: chain.clone(), @@ -187,15 +204,14 @@ impl Action { Ok(()) } - pub async fn destroy(&mut self) { + pub async fn destroy(&self, ipset: &mut IpSet) { for (set, version) in [ - (&self.ipv4_set, Version::IPv4), - (&self.ipv6_set, Version::IPv6), + (&self.sets.ipv4, Version::IPv4), + (&self.sets.ipv6, Version::IPv6), ] { if let Some(set) = set { for chain in &self.chains { - if let Err(err) = self - .ipset + if let Err(err) = ipset .order(Order::RemoveSet(SetChain { set: set.clone(), chain: chain.clone(), @@ -208,12 +224,62 @@ impl Action { ); } } - if let Err(err) = self.ipset.order(Order::DestroySet(set.clone())).await { + if let Err(err) = ipset.order(Order::DestroySet(set.clone())).await { println!("ERROR while destroying {version} set {set}: {err}"); } } } } +} + +pub struct SetNames { + pub ipv4: Option, + pub ipv6: Option, +} + +impl SetNames { + pub fn new(name: String, version: Option) -> Self { + Self { + ipv4: match version { + Some(IpVersion::V4) => Some(name.clone()), + Some(IpVersion::V6) => None, + None | Some(IpVersion::V46) => Some(format!("{}v4", name)), + }, + ipv6: match version { + Some(IpVersion::V4) => None, + Some(IpVersion::V6) => Some(name), + None | Some(IpVersion::V46) => Some(format!("{}v6", name)), + }, + } + } +} + +pub struct Action { + ipset: IpSet, + rx: remocMpsc::Receiver, + shutdown: ShutdownToken, + sets: SetNames, + // index of pattern ip in match vec + ip_index: usize, + action: AddDel, +} + +impl Action { + pub fn new( + ipset: IpSet, + shutdown: ShutdownToken, + rx: remocMpsc::Receiver, + options: ActionOptions, + ) -> Result { + Ok(Action { + ipset, + rx, + shutdown, + sets: SetNames::new(options.set, options.set_options.version), + ip_index: options.ip_index, + action: options.action, + }) + } pub async fn serve(mut self) { loop { @@ -241,7 +307,6 @@ impl Action { } } self.shutdown.ask_shutdown(); - self.destroy().await; } async fn handle_exec(&mut self, mut exec: Exec) -> Result<(), String> { @@ -255,7 +320,7 @@ impl Action { } let ip = exec.match_.remove(self.ip_index); // select set - let set = match (&self.ipv4_set, &self.ipv6_set) { + let set = match (&self.sets.ipv4, &self.sets.ipv6) { (None, None) => return Err(format!("action is neither IPv4 nor IPv6, this is a bug!")), (None, Some(set)) => set, (Some(set), None) => set, diff --git a/plugins/reaction-plugin-ipset/src/ipset.rs b/plugins/reaction-plugin-ipset/src/ipset.rs index 3c4ded9..46a2c97 100644 --- a/plugins/reaction-plugin-ipset/src/ipset.rs +++ b/plugins/reaction-plugin-ipset/src/ipset.rs @@ -1,10 +1,4 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::Display, - net::Ipv4Addr, - process::Command, - thread, -}; +use std::{collections::BTreeMap, fmt::Display, net::Ipv4Addr, process::Command, thread}; use ipset::{ Session, @@ -27,7 +21,7 @@ impl Display for Version { } #[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] -pub struct SetOptions { +pub struct CreateSet { pub name: String, pub version: Version, pub timeout: Option, @@ -42,7 +36,7 @@ pub struct SetChain { #[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] pub enum Order { - CreateSet(SetOptions), + CreateSet(CreateSet), DestroySet(String), InsertSet(SetChain), RemoveSet(SetChain), @@ -112,12 +106,6 @@ struct Set { struct IPsetManager { // IPset sessions sessions: BTreeMap, - // All set-wise commands already run, to ignore duplicates. - // Duplicates are the natural cause of multiple actions - // (eg. add action and del action) manipulating the same sets. - // - // It's fine because no order should be run twice during the runtime of the plugin. - journal: BTreeSet, } impl IPsetManager { @@ -134,19 +122,8 @@ impl IPsetManager { } fn handle_order(&mut self, order: Order) -> Result<(), String> { - // We only journal set-wise orders - // We skip the order if already run. match order { - Order::Add(_, _) | Order::Del(_, _) => (), - _ => { - if !self.journal.insert(order.clone()) { - return Ok(()); - } - } - }; - - match order { - Order::CreateSet(SetOptions { + Order::CreateSet(CreateSet { name, version, timeout, diff --git a/plugins/reaction-plugin-ipset/src/main.rs b/plugins/reaction-plugin-ipset/src/main.rs index 74e494e..7be98b0 100644 --- a/plugins/reaction-plugin-ipset/src/main.rs +++ b/plugins/reaction-plugin-ipset/src/main.rs @@ -1,13 +1,13 @@ -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use reaction_plugin::{ ActionImpl, Hello, Manifest, PluginInfo, RemoteError, RemoteResult, StreamImpl, Value, - shutdown::ShutdownController, + shutdown::{ShutdownController, ShutdownToken}, }; use remoc::rtc; use crate::{ - action::{Action, ActionOptions}, + action::{Action, ActionOptions, Set, SetOptions}, ipset::IpSet, }; @@ -26,6 +26,8 @@ async fn main() { #[derive(Default)] struct Plugin { ipset: IpSet, + set_options: BTreeMap, + sets: Vec, actions: Vec, shutdown: ShutdownController, } @@ -61,28 +63,28 @@ impl PluginInfo for Plugin { return Err("This plugin can't handle other action types than ipset".into()); } - let options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { + let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}") })?; - let ip_index = patterns - .into_iter() - .enumerate() - .filter(|(_, name)| name == &options.pattern) - .next() - .ok_or_else(|| { + options.set_ip_index(patterns).map_err(|_| format!( "No pattern with name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'", - options.pattern + &options.pattern ) - })? - .0; + )?; + + // Merge option + self.set_options + .entry(options.set.clone()) + .or_default() + .merge(&options.set_options) + .map_err(|err| format!("ipset {}: {err}", options.set))?; let (tx, rx) = remoc::rch::mpsc::channel(1); self.actions.push(Action::new( self.ipset.clone(), self.shutdown.token(), - ip_index, rx, options, )?); @@ -92,28 +94,40 @@ impl PluginInfo for Plugin { async fn finish_setup(&mut self) -> RemoteResult<()> { // Init all sets + while let Some((name, options)) = self.set_options.pop_first() { + self.sets.push(Set::from(name, options)); + } + self.set_options = Default::default(); + let mut first_error = None; - for (i, action) in self.actions.iter_mut().enumerate() { + for (i, set) in self.sets.iter().enumerate() { // Retain if error - if let Err(err) = action.init().await { + if let Err(err) = set.init(&mut self.ipset).await { first_error = Some((i, RemoteError::Plugin(err))); break; } } // Destroy initialized sets if error if let Some((i, err)) = first_error { - for action in self.actions.iter_mut().take(i + 1) { - let _ = action.destroy().await; + for set in self.sets.iter().take(i + 1) { + let _ = set.destroy(&mut self.ipset).await; } return Err(err); } + // Launch a task that will destroy the sets on shutdown + tokio::spawn(destroy_sets_at_shutdown( + self.ipset.clone(), + std::mem::take(&mut self.sets), + self.shutdown.token(), + )); + // Launch all actions while let Some(action) = self.actions.pop() { tokio::spawn(async move { action.serve() }); } - self.actions = Default::default(); + Ok(()) } @@ -123,3 +137,10 @@ impl PluginInfo for Plugin { Ok(()) } } + +async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec, shutdown: ShutdownToken) { + shutdown.wait().await; + for set in sets { + set.destroy(&mut ipset).await; + } +} diff --git a/tests/test-conf/test-ipset.jsonnet b/tests/test-conf/test-ipset.jsonnet index d8769aa..25e316d 100644 --- a/tests/test-conf/test-ipset.jsonnet +++ b/tests/test-conf/test-ipset.jsonnet @@ -9,7 +9,7 @@ }, plugins: { - cluster: { + ipset: { path: './target/debug/reaction-plugin-ipset', check_root: false, systemd_options: {