ipset: Add the add/del option, journal orders & deduplicate them

This commit is contained in:
ppom 2026-01-29 12:00:00 +01:00
commit 2f7c32aa8c
No known key found for this signature in database
3 changed files with 116 additions and 81 deletions

View file

@ -2,41 +2,56 @@ use std::u32;
use reaction_plugin::{Exec, shutdown::ShutdownToken, time::parse_duration};
use remoc::rch::mpsc as remocMpsc;
use serde::{Deserialize, Serialize, de::Deserializer, de::Error};
use serde::{Deserialize, Serialize};
use crate::ipset::{IpSet, Order, SetChain, SetOptions, Version};
#[derive(Default, Serialize, Deserialize)]
pub enum IpVersion {
#[serde(alias = "4")]
V4,
#[serde(alias = "6")]
V6,
#[serde(alias = "46")]
#[default]
V46,
}
impl<'de> Deserialize<'de> for IpVersion {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
match Option::<u8>::deserialize(deserializer)? {
None => Ok(IpVersion::V46),
Some(version) => match version {
4 => Ok(IpVersion::V4),
6 => Ok(IpVersion::V6),
46 => Ok(IpVersion::V46),
_ => Err(D::Error::custom("version must be one of 4, 6 or 46")),
},
}
}
}
impl Serialize for IpVersion {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_u8(match self {
IpVersion::V4 => 4,
IpVersion::V6 => 6,
IpVersion::V46 => 46,
})
}
// impl<'de> Deserialize<'de> for IpVersion {
// fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
// where
// D: Deserializer<'de>,
// {
// match Option::<u8>::deserialize(deserializer)? {
// None => Ok(IpVersion::V46),
// Some(version) => match version {
// 4 => Ok(IpVersion::V4),
// 6 => Ok(IpVersion::V6),
// 46 => Ok(IpVersion::V46),
// _ => Err(D::Error::custom("version must be one of 4, 6 or 46")),
// },
// }
// }
// }
// impl Serialize for IpVersion {
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
// where
// S: serde::Serializer,
// {
// serializer.serialize_u8(match self {
// IpVersion::V4 => 4,
// IpVersion::V6 => 6,
// IpVersion::V46 => 46,
// })
// }
// }
#[derive(Default, Serialize, Deserialize)]
pub enum AddDel {
#[default]
#[serde(alias = "add")]
Add,
#[serde(alias = "del")]
Del,
}
// FIXME block configs that have different set options for the same name
@ -56,6 +71,7 @@ pub struct ActionOptions {
/// If `4`: creates an IPv4 set with this name
/// If `6`: creates an IPv6 set with this name
/// If `46`: creates an IPv4 set with its name suffixed by 'v4' AND an IPv6 set with its name suffixed by 'v6'
#[serde(default)]
version: IpVersion,
/// Chains where the IP set should be inserted.
/// Defaults to `["INPUT", "FORWARD"]`
@ -69,7 +85,9 @@ pub struct ActionOptions {
// Defaults to DROP, but can also be ACCEPT, RETURN or any user-defined chain
#[serde(default = "serde_drop")]
target: String,
// TODO add `add`//`remove` option
// Whether the action is to "add" or "del" the ip from the set
#[serde(default)]
action: AddDel,
}
fn serde_ip() -> String {
@ -93,6 +111,7 @@ pub struct Action {
chains: Vec<String>,
timeout: Option<u32>,
target: String,
action: AddDel,
}
impl Action {
@ -108,6 +127,7 @@ impl Action {
rx,
shutdown,
ip_index,
action: options.action,
target: options.target,
chains: options.chains,
timeout: if let Some(timeout) = options.timeout {
@ -144,7 +164,6 @@ impl Action {
(&self.ipv6_set, Version::IPv6),
] {
if let Some(set) = set {
println!("INFO creating {version} set {set}");
// create set
self.ipset
.order(Order::CreateSet(SetOptions {
@ -155,7 +174,6 @@ impl Action {
.await?;
// insert set in chains
for chain in &self.chains {
println!("INFO inserting {version} set {set} in chain {chain}");
self.ipset
.order(Order::InsertSet(SetChain {
set: set.clone(),
@ -176,7 +194,6 @@ impl Action {
] {
if let Some(set) = set {
for chain in &self.chains {
println!("INFO removing {version} set {set} from chain {chain}");
if let Err(err) = self
.ipset
.order(Order::RemoveSet(SetChain {
@ -191,7 +208,6 @@ impl Action {
);
}
}
println!("INFO destroying {version} set {set}");
if let Err(err) = self.ipset.order(Order::DestroySet(set.clone())).await {
println!("ERROR while destroying {version} set {set}: {err}");
}
@ -251,8 +267,13 @@ impl Action {
}
}
};
// add ip to set
self.ipset.order(Order::Insert(set.clone(), ip)).await?;
// add/remove ip to set
self.ipset
.order(match self.action {
AddDel::Add => Order::Add(set.clone(), ip),
AddDel::Del => Order::Del(set.clone(), ip),
})
.await?;
Ok(())
}
}

View file

@ -1,4 +1,10 @@
use std::{collections::BTreeMap, fmt::Display, net::Ipv4Addr, process::Command, thread};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Display,
net::Ipv4Addr,
process::Command,
thread,
};
use ipset::{
Session,
@ -6,7 +12,7 @@ use ipset::{
};
use tokio::sync::{mpsc, oneshot};
#[derive(PartialEq, Eq, Copy, Clone)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
pub enum Version {
IPv4,
IPv6,
@ -20,25 +26,28 @@ impl Display for Version {
}
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct SetOptions {
pub name: String,
pub version: Version,
pub timeout: Option<u32>,
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct SetChain {
pub set: String,
pub chain: String,
pub target: String,
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub enum Order {
CreateSet(SetOptions),
DestroySet(String),
InsertSet(SetChain),
RemoveSet(SetChain),
Insert(String, String),
Remove(String, String),
Add(String, String),
Del(String, String),
}
#[derive(Clone)]
@ -101,7 +110,14 @@ struct Set {
#[derive(Default)]
struct IPsetManager {
// IPset sessions
sessions: BTreeMap<String, Set>,
// All set-wise commands already run, to ignore duplicates.
// Duplicates are the natural cause of multiple actions
// (eg. add action and del action) manipulating the same sets.
//
// It's fine because no order should be run twice during the runtime of the plugin.
journal: BTreeSet<Order>,
}
impl IPsetManager {
@ -118,12 +134,24 @@ impl IPsetManager {
}
fn handle_order(&mut self, order: Order) -> Result<(), String> {
// We only journal set-wise orders
// We skip the order if already run.
match order {
Order::Add(_, _) | Order::Del(_, _) => (),
_ => {
if !self.journal.insert(order.clone()) {
return Ok(());
}
}
};
match order {
Order::CreateSet(SetOptions {
name,
version,
timeout,
}) => {
println!("INFO creating {version} set {name}");
let mut session: Session<HashNet> = Session::new(name.clone());
session
.create(|builder| {
@ -140,6 +168,7 @@ impl IPsetManager {
}
Order::DestroySet(set) => {
if let Some(mut session) = self.sessions.remove(&set) {
println!("INFO destroying {} set {set}", session.version);
session
.session
.destroy()
@ -147,20 +176,19 @@ impl IPsetManager {
}
}
Order::InsertSet(options) => insert_remove_set(options, true)?,
Order::RemoveSet(options) => insert_remove_set(options, false)?,
Order::InsertSet(options) => self.insert_remove_set(options, true)?,
Order::RemoveSet(options) => self.insert_remove_set(options, false)?,
Order::Insert(set, ip) => self.insert_remove_ip(set, ip, true)?,
Order::Remove(set, ip) => self.insert_remove_ip(set, ip, false)?,
Order::Add(set, ip) => self.insert_remove_ip(set, ip, true)?,
Order::Del(set, ip) => self.insert_remove_ip(set, ip, false)?,
};
Ok(())
}
fn insert_remove_ip(&mut self, set: String, ip: String, insert: bool) -> Result<(), String> {
let session = self
.sessions
.get_mut(&set)
.ok_or(format!("No set managed by us with this name: {set}"))?;
let session = self.sessions.get_mut(&set).ok_or(format!(
"No set handled by us with this name: {set}. This likely is a bug."
))?;
let mut net_data = NetDataType::new(Ipv4Addr::LOCALHOST, 0);
net_data
@ -184,12 +212,19 @@ impl IPsetManager {
target: action,
} = options;
let command = match self
let version = self
.sessions
.get(&set)
.ok_or(format!("No set managed by us with this name: {set}"))?
.version
{
.version;
if insert {
println!("INFO inserting {version} set {set} in chain {chain}");
} else {
println!("INFO removing {version} set {set} from chain {chain}");
}
let command = match version {
Version::IPv4 => "iptables",
Version::IPv6 => "ip6tables",
};

View file

@ -29,41 +29,20 @@
type: 'ipset',
options: {
set: 'reactiontest',
pattern: 'ip',
version: 46,
chains: ['INPUT', 'FORWARD'],
// pattern: 'ip',
// version: 46,
// chains: ['INPUT', 'FORWARD'],
// target: 'DROP',
// action: 'add',
},
},
b0: {
type: 'cluster_send',
options: {
send: 'NODE b0 <num>',
to: 's1',
},
after: '1s',
},
},
},
},
},
s1: {
type: 'cluster',
options: {
listen_port: 1234,
bind_ipv4: '127.0.0.1',
bind_ipv6: null,
message_timeout: '30s',
nodes: [{
public_key: 'PUBLIC_KEY',
addresses: ['127.0.0.1:4321'],
}],
},
filters: {
f1: {
regex: ['^<all>$'],
actions: {
a1: {
cmd: ['sh', '-c', 'echo <all> >>./log'],
after: '10s',
type: 'ipset',
options: {
set: 'reactiontest',
action: 'del',
},
},
},
},