From 2f7c32aa8cf26c321fd8a6f200d5a24d3e6a7e6a Mon Sep 17 00:00:00 2001 From: ppom Date: Thu, 29 Jan 2026 12:00:00 +0100 Subject: [PATCH] ipset: Add the add/del option, journal orders & deduplicate them --- plugins/reaction-plugin-ipset/src/action.rs | 91 +++++++++++++-------- plugins/reaction-plugin-ipset/src/ipset.rs | 65 +++++++++++---- tests/test-conf/test-ipset.jsonnet | 43 +++------- 3 files changed, 117 insertions(+), 82 deletions(-) diff --git a/plugins/reaction-plugin-ipset/src/action.rs b/plugins/reaction-plugin-ipset/src/action.rs index f4277f4..e3b63b1 100644 --- a/plugins/reaction-plugin-ipset/src/action.rs +++ b/plugins/reaction-plugin-ipset/src/action.rs @@ -2,41 +2,56 @@ use std::u32; use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration}; use remoc::rch::mpsc as remocMpsc; -use serde::{Deserialize, Serialize, de::Deserializer, de::Error}; +use serde::{Deserialize, Serialize}; use crate::ipset::{IpSet, Order, SetChain, SetOptions, Version}; + +#[derive(Default, Serialize, Deserialize)] pub enum IpVersion { + #[serde(alias = "4")] V4, + #[serde(alias = "6")] V6, + #[serde(alias = "46")] + #[default] V46, } -impl<'de> Deserialize<'de> for IpVersion { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - match Option::::deserialize(deserializer)? { - None => Ok(IpVersion::V46), - Some(version) => match version { - 4 => Ok(IpVersion::V4), - 6 => Ok(IpVersion::V6), - 46 => Ok(IpVersion::V46), - _ => Err(D::Error::custom("version must be one of 4, 6 or 46")), - }, - } - } -} -impl Serialize for IpVersion { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_u8(match self { - IpVersion::V4 => 4, - IpVersion::V6 => 6, - IpVersion::V46 => 46, - }) - } +// impl<'de> Deserialize<'de> for IpVersion { +// fn deserialize(deserializer: D) -> Result +// where +// D: Deserializer<'de>, +// { +// match Option::::deserialize(deserializer)? { +// None => Ok(IpVersion::V46), +// Some(version) => match version { +// 4 => Ok(IpVersion::V4), +// 6 => Ok(IpVersion::V6), +// 46 => Ok(IpVersion::V46), +// _ => Err(D::Error::custom("version must be one of 4, 6 or 46")), +// }, +// } +// } +// } +// impl Serialize for IpVersion { +// fn serialize(&self, serializer: S) -> Result +// where +// S: serde::Serializer, +// { +// serializer.serialize_u8(match self { +// IpVersion::V4 => 4, +// IpVersion::V6 => 6, +// IpVersion::V46 => 46, +// }) +// } +// } + +#[derive(Default, Serialize, Deserialize)] +pub enum AddDel { + #[default] + #[serde(alias = "add")] + Add, + #[serde(alias = "del")] + Del, } // FIXME block configs that have different set options for the same name @@ -56,6 +71,7 @@ pub struct ActionOptions { /// If `4`: creates an IPv4 set with this name /// If `6`: creates an IPv6 set with this name /// If `46`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6' + #[serde(default)] version: IpVersion, /// Chains where the IP set should be inserted. /// Defaults to `["INPUT", "FORWARD"]` @@ -69,7 +85,9 @@ pub struct ActionOptions { // Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain #[serde(default = "serde_drop")] target: String, - // TODO add `add`//`remove` option + // Whether the action is to "add" or "del" the ip from the set + #[serde(default)] + action: AddDel, } fn serde_ip() -> String { @@ -93,6 +111,7 @@ pub struct Action { chains: Vec, timeout: Option, target: String, + action: AddDel, } impl Action { @@ -108,6 +127,7 @@ impl Action { rx, shutdown, ip_index, + action: options.action, target: options.target, chains: options.chains, timeout: if let Some(timeout) = options.timeout { @@ -144,7 +164,6 @@ impl Action { (&self.ipv6_set, Version::IPv6), ] { if let Some(set) = set { - println!("INFO creating {version} set {set}"); // create set self.ipset .order(Order::CreateSet(SetOptions { @@ -155,7 +174,6 @@ impl Action { .await?; // insert set in chains for chain in &self.chains { - println!("INFO inserting {version} set {set} in chain {chain}"); self.ipset .order(Order::InsertSet(SetChain { set: set.clone(), @@ -176,7 +194,6 @@ impl Action { ] { if let Some(set) = set { for chain in &self.chains { - println!("INFO removing {version} set {set} from chain {chain}"); if let Err(err) = self .ipset .order(Order::RemoveSet(SetChain { @@ -191,7 +208,6 @@ impl Action { ); } } - println!("INFO destroying {version} set {set}"); if let Err(err) = self.ipset.order(Order::DestroySet(set.clone())).await { println!("ERROR while destroying {version} set {set}: {err}"); } @@ -251,8 +267,13 @@ impl Action { } } }; - // add ip to set - self.ipset.order(Order::Insert(set.clone(), ip)).await?; + // add/remove ip to set + self.ipset + .order(match self.action { + AddDel::Add => Order::Add(set.clone(), ip), + AddDel::Del => Order::Del(set.clone(), ip), + }) + .await?; Ok(()) } } diff --git a/plugins/reaction-plugin-ipset/src/ipset.rs b/plugins/reaction-plugin-ipset/src/ipset.rs index 263addb..3c4ded9 100644 --- a/plugins/reaction-plugin-ipset/src/ipset.rs +++ b/plugins/reaction-plugin-ipset/src/ipset.rs @@ -1,4 +1,10 @@ -use std::{collections::BTreeMap, fmt::Display, net::Ipv4Addr, process::Command, thread}; +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt::Display, + net::Ipv4Addr, + process::Command, + thread, +}; use ipset::{ Session, @@ -6,7 +12,7 @@ use ipset::{ }; use tokio::sync::{mpsc, oneshot}; -#[derive(PartialEq, Eq, Copy, Clone)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] pub enum Version { IPv4, IPv6, @@ -20,25 +26,28 @@ impl Display for Version { } } +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct SetOptions { pub name: String, pub version: Version, pub timeout: Option, } +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct SetChain { pub set: String, pub chain: String, pub target: String, } +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] pub enum Order { CreateSet(SetOptions), DestroySet(String), InsertSet(SetChain), RemoveSet(SetChain), - Insert(String, String), - Remove(String, String), + Add(String, String), + Del(String, String), } #[derive(Clone)] @@ -101,7 +110,14 @@ struct Set { #[derive(Default)] struct IPsetManager { + // IPset sessions sessions: BTreeMap, + // All set-wise commands already run, to ignore duplicates. + // Duplicates are the natural cause of multiple actions + // (eg. add action and del action) manipulating the same sets. + // + // It's fine because no order should be run twice during the runtime of the plugin. + journal: BTreeSet, } impl IPsetManager { @@ -118,12 +134,24 @@ impl IPsetManager { } fn handle_order(&mut self, order: Order) -> Result<(), String> { + // We only journal set-wise orders + // We skip the order if already run. + match order { + Order::Add(_, _) | Order::Del(_, _) => (), + _ => { + if !self.journal.insert(order.clone()) { + return Ok(()); + } + } + }; + match order { Order::CreateSet(SetOptions { name, version, timeout, }) => { + println!("INFO creating {version} set {name}"); let mut session: Session = Session::new(name.clone()); session .create(|builder| { @@ -140,6 +168,7 @@ impl IPsetManager { } Order::DestroySet(set) => { if let Some(mut session) = self.sessions.remove(&set) { + println!("INFO destroying {} set {set}", session.version); session .session .destroy() @@ -147,20 +176,19 @@ impl IPsetManager { } } - Order::InsertSet(options) => insert_remove_set(options, true)?, - Order::RemoveSet(options) => insert_remove_set(options, false)?, + Order::InsertSet(options) => self.insert_remove_set(options, true)?, + Order::RemoveSet(options) => self.insert_remove_set(options, false)?, - Order::Insert(set, ip) => self.insert_remove_ip(set, ip, true)?, - Order::Remove(set, ip) => self.insert_remove_ip(set, ip, false)?, + Order::Add(set, ip) => self.insert_remove_ip(set, ip, true)?, + Order::Del(set, ip) => self.insert_remove_ip(set, ip, false)?, }; Ok(()) } fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> { - let session = self - .sessions - .get_mut(&set) - .ok_or(format!("No set managed by us with this name: {set}"))?; + let session = self.sessions.get_mut(&set).ok_or(format!( + "No set handled by us with this name: {set}. This likely is a bug." + ))?; let mut net_data = NetDataType::new(Ipv4Addr::LOCALHOST, 0); net_data @@ -184,12 +212,19 @@ impl IPsetManager { target: action, } = options; - let command = match self + let version = self .sessions .get(&set) .ok_or(format!("No set managed by us with this name: {set}"))? - .version - { + .version; + + if insert { + println!("INFO inserting {version} set {set} in chain {chain}"); + } else { + println!("INFO removing {version} set {set} from chain {chain}"); + } + + let command = match version { Version::IPv4 => "iptables", Version::IPv6 => "ip6tables", }; diff --git a/tests/test-conf/test-ipset.jsonnet b/tests/test-conf/test-ipset.jsonnet index e749324..d8769aa 100644 --- a/tests/test-conf/test-ipset.jsonnet +++ b/tests/test-conf/test-ipset.jsonnet @@ -29,41 +29,20 @@ type: 'ipset', options: { set: 'reactiontest', - pattern: 'ip', - version: 46, - chains: ['INPUT', 'FORWARD'], + // pattern: 'ip', + // version: 46, + // chains: ['INPUT', 'FORWARD'], + // target: 'DROP', + // action: 'add', }, }, - b0: { - type: 'cluster_send', - options: { - send: 'NODE b0 ', - to: 's1', - }, - after: '1s', - }, - }, - }, - }, - }, - s1: { - type: 'cluster', - options: { - listen_port: 1234, - bind_ipv4: '127.0.0.1', - bind_ipv6: null, - message_timeout: '30s', - nodes: [{ - public_key: 'PUBLIC_KEY', - addresses: ['127.0.0.1:4321'], - }], - }, - filters: { - f1: { - regex: ['^$'], - actions: { a1: { - cmd: ['sh', '-c', 'echo >>./log'], + after: '10s', + type: 'ipset', + options: { + set: 'reactiontest', + action: 'del', + }, }, }, },