ipset: adapt to plugin interface change

This commit is contained in:
ppom 2026-02-09 12:00:00 +01:00
commit 16dc41f825
No known key found for this signature in database
2 changed files with 159 additions and 131 deletions

View file

@ -1,7 +1,8 @@
use std::collections::{BTreeMap, BTreeSet};
use reaction_plugin::{
ActionImpl, Hello, Manifest, PluginInfo, RemoteError, RemoteResult, StreamImpl, Value,
ActionConfig, ActionImpl, Hello, Manifest, PluginInfo, RemoteError, RemoteResult, StreamConfig,
StreamImpl,
shutdown::{ShutdownController, ShutdownToken},
};
use remoc::rtc;
@ -26,7 +27,6 @@ async fn main() {
#[derive(Default)]
struct Plugin {
ipset: IpSet,
set_options: BTreeMap<String, SetOptions>,
sets: Vec<Set>,
actions: Vec<Action>,
shutdown: ShutdownController,
@ -41,64 +41,69 @@ impl PluginInfo for Plugin {
})
}
async fn stream_impl(
async fn load_config(
&mut self,
_stream_name: String,
_stream_type: String,
_config: Value,
) -> RemoteResult<StreamImpl> {
Err("This plugin can't handle any stream type".into())
}
async fn action_impl(
&mut self,
stream_name: String,
filter_name: String,
action_name: String,
action_type: String,
config: Value,
patterns: Vec<String>,
) -> RemoteResult<ActionImpl> {
if &action_type != "ipset" {
return Err("This plugin can't handle other action types than ipset".into());
streams: Vec<StreamConfig>,
actions: Vec<ActionConfig>,
) -> RemoteResult<(Vec<StreamImpl>, Vec<ActionImpl>)> {
if !streams.is_empty() {
return Err("This plugin can't handle any stream type".into());
}
let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
let mut ret_actions = Vec::with_capacity(actions.len());
let mut set_options: BTreeMap<String, SetOptions> = BTreeMap::new();
for ActionConfig {
stream_name,
filter_name,
action_name,
action_type,
config,
patterns,
} in actions
{
if &action_type != "ipset" {
return Err("This plugin can't handle other action types than ipset".into());
}
let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| {
format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}")
})?;
options.set_ip_index(patterns).map_err(|_|
options.set_ip_index(patterns).map_err(|_|
format!(
"No pattern with name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'",
&options.pattern
)
)?;
// Merge option
self.set_options
.entry(options.set.clone())
.or_default()
.merge(&options.set_options)
.map_err(|err| format!("ipset {}: {err}", options.set))?;
// Merge option
set_options
.entry(options.set.clone())
.or_default()
.merge(&options.set_options)
.map_err(|err| format!("ipset {}: {err}", options.set))?;
let (tx, rx) = remoc::rch::mpsc::channel(1);
self.actions.push(Action::new(
self.ipset.clone(),
self.shutdown.token(),
rx,
options,
)?);
let (tx, rx) = remoc::rch::mpsc::channel(1);
self.actions.push(Action::new(
self.ipset.clone(),
self.shutdown.token(),
rx,
options,
)?);
Ok(ActionImpl { tx })
}
ret_actions.push(ActionImpl { tx });
}
async fn finish_setup(&mut self) -> RemoteResult<()> {
// Init all sets
while let Some((name, options)) = self.set_options.pop_first() {
while let Some((name, options)) = set_options.pop_first() {
self.sets.push(Set::from(name, options));
}
self.set_options = Default::default();
Ok((vec![], ret_actions))
}
async fn start(&mut self) -> RemoteResult<()> {
let mut first_error = None;
for (i, set) in self.sets.iter().enumerate() {
// Retain if error
@ -138,8 +143,6 @@ impl PluginInfo for Plugin {
}
}
impl Plugin {}
async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec<Set>, shutdown: ShutdownToken) {
shutdown.wait().await;
for set in sets {

View file

@ -1,4 +1,4 @@
use reaction_plugin::{PluginInfo, Value};
use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig, Value};
use serde_json::json;
use crate::Plugin;
@ -8,10 +8,20 @@ async fn conf_stream() {
// No stream is supported by ipset
assert!(
Plugin::default()
.stream_impl("stream".into(), "ipset".into(), Value::Null)
.load_config(
vec![StreamConfig {
stream_name: "stream".into(),
stream_type: "ipset".into(),
config: Value::Null
}],
vec![]
)
.await
.is_err()
);
// Nothing is ok
assert!(Plugin::default().load_config(vec![], vec![]).await.is_ok());
}
#[tokio::test]
@ -106,13 +116,16 @@ async fn conf_action_standalone() {
(false, json!({ "set": "test", "target": ["DROP"] }), &p),
] {
let res = Plugin::default()
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"ipset".into(),
conf.clone().into(),
patterns.clone(),
.load_config(
vec![],
vec![ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action".into(),
action_type: "ipset".into(),
config: conf.clone().into(),
patterns: patterns.clone(),
}],
)
.await;
@ -131,93 +144,105 @@ async fn conf_action_standalone() {
async fn conf_action_merge() {
let mut plugin = Plugin::default();
// First set is ok
let res = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"ipset".into(),
json!({
"set": "test",
"target": "DROP",
"chains": ["INPUT"],
"action": "add",
})
.into(),
vec!["ip".into()],
)
.await;
assert!(res.is_ok(), "res: {:?}", res.map(|_| ()));
let set1 = ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action1".into(),
action_type: "ipset".into(),
config: json!({
"set": "test",
"target": "DROP",
"chains": ["INPUT"],
"action": "add",
})
.into(),
patterns: vec!["ip".into()],
};
// Another set without conflict is ok
let res = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"ipset".into(),
json!({
"set": "test",
"target": "DROP",
"version": "46",
"action": "add",
})
.into(),
vec!["ip".into()],
)
.await;
assert!(res.is_ok(), "res: {:?}", res.map(|_| ()));
let set2 = ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action2".into(),
action_type: "ipset".into(),
config: json!({
"set": "test",
"target": "DROP",
"version": "46",
"action": "add",
})
.into(),
patterns: vec!["ip".into()],
};
// Another set without conflict is ok
let res = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"ipset".into(),
json!({
"set": "test",
"action": "del",
})
.into(),
vec!["ip".into()],
)
.await;
assert!(res.is_ok(), "res: {:?}", res.map(|_| ()));
let set3 = ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action2".into(),
action_type: "ipset".into(),
config: json!({
"set": "test",
"action": "del",
})
.into(),
patterns: vec!["ip".into()],
};
// Unrelated set is ok
let res = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action2".into(),
"ipset".into(),
json!({
"set": "test1",
"target": "target1",
"version": "6",
})
.into(),
vec!["ip".into()],
.load_config(
vec![],
vec![
// First set
set1.clone(),
// Same set, adding options, no conflict
set2.clone(),
// Same set, no new options, no conflict
set3.clone(),
// Unrelated set, so no conflict
ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action3".into(),
action_type: "ipset".into(),
config: json!({
"set": "test2",
"target": "target1",
"version": "6",
})
.into(),
patterns: vec!["ip".into()],
},
],
)
.await;
assert!(res.is_ok(), "res: {:?}", res.map(|_| ()));
// Another set with conflict is not ok
let res = plugin
.action_impl(
"stream".into(),
"filter".into(),
"action".into(),
"ipset".into(),
json!({
"set": "test",
"target": "target2",
"action": "del",
})
.into(),
vec!["ip".into()],
.load_config(
vec![],
vec![
// First set
set1,
// Same set, adding options, no conflict
set2,
// Same set, no new options, no conflict
set3,
// Another set with conflict
ActionConfig {
stream_name: "stream".into(),
filter_name: "filter".into(),
action_name: "action3".into(),
action_type: "ipset".into(),
config: json!({
"set": "test",
"target": "target2",
"action": "del",
})
.into(),
patterns: vec!["ip".into()],
},
],
)
.await;
assert!(res.is_err(), "res: {:?}", res.map(|_| ()));