From b0dc3c56adfb8b5e1e4ba2aa04f8e20f90cc46ae Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 9 Feb 2026 12:00:00 +0100 Subject: [PATCH] ipset: adapt to plugin interface change --- plugins/reaction-plugin-ipset/src/main.rs | 89 +++++----- plugins/reaction-plugin-ipset/src/tests.rs | 197 ++++++++++++--------- 2 files changed, 157 insertions(+), 129 deletions(-) diff --git a/plugins/reaction-plugin-ipset/src/main.rs b/plugins/reaction-plugin-ipset/src/main.rs index 9982b94..268eb95 100644 --- a/plugins/reaction-plugin-ipset/src/main.rs +++ b/plugins/reaction-plugin-ipset/src/main.rs @@ -1,7 +1,8 @@ use std::collections::{BTreeMap, BTreeSet}; use reaction_plugin::{ - ActionImpl, Hello, Manifest, PluginInfo, RemoteError, RemoteResult, StreamImpl, Value, + ActionConfig, ActionImpl, Hello, Manifest, PluginInfo, RemoteError, RemoteResult, StreamConfig, + StreamImpl, shutdown::{ShutdownController, ShutdownToken}, }; use remoc::rtc; @@ -26,7 +27,6 @@ async fn main() { #[derive(Default)] struct Plugin { ipset: IpSet, - set_options: BTreeMap, sets: Vec, actions: Vec, shutdown: ShutdownController, @@ -41,64 +41,69 @@ impl PluginInfo for Plugin { }) } - async fn stream_impl( + async fn load_config( &mut self, - _stream_name: String, - _stream_type: String, - _config: Value, - ) -> RemoteResult { - Err("This plugin can't handle any stream type".into()) - } - - async fn action_impl( - &mut self, - stream_name: String, - filter_name: String, - action_name: String, - action_type: String, - config: Value, - patterns: Vec, - ) -> RemoteResult { - if &action_type != "ipset" { - return Err("This plugin can't handle other action types than ipset".into()); + streams: Vec, + actions: Vec, + ) -> RemoteResult<(Vec, Vec)> { + if !streams.is_empty() { + return Err("This plugin can't handle any stream type".into()); } - let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { + let mut ret_actions = Vec::with_capacity(actions.len()); + let mut set_options: BTreeMap = BTreeMap::new(); + + for ActionConfig { + stream_name, + filter_name, + action_name, + action_type, + config, + patterns, + } in actions + { + if &action_type != "ipset" { + return Err("This plugin can't handle other action types than ipset".into()); + } + + let mut options: ActionOptions = serde_json::from_value(config.into()).map_err(|err| { format!("invalid options for action {stream_name}.{filter_name}.{action_name}: {err}") })?; - options.set_ip_index(patterns).map_err(|_| + options.set_ip_index(patterns).map_err(|_| format!( "No pattern with name {} in filter {stream_name}.{filter_name}. Try setting the option `pattern` to your pattern name of type 'ip'", &options.pattern ) )?; - // Merge option - self.set_options - .entry(options.set.clone()) - .or_default() - .merge(&options.set_options) - .map_err(|err| format!("ipset {}: {err}", options.set))?; + // Merge option + set_options + .entry(options.set.clone()) + .or_default() + .merge(&options.set_options) + .map_err(|err| format!("ipset {}: {err}", options.set))?; - let (tx, rx) = remoc::rch::mpsc::channel(1); - self.actions.push(Action::new( - self.ipset.clone(), - self.shutdown.token(), - rx, - options, - )?); + let (tx, rx) = remoc::rch::mpsc::channel(1); + self.actions.push(Action::new( + self.ipset.clone(), + self.shutdown.token(), + rx, + options, + )?); - Ok(ActionImpl { tx }) - } + ret_actions.push(ActionImpl { tx }); + } - async fn finish_setup(&mut self) -> RemoteResult<()> { // Init all sets - while let Some((name, options)) = self.set_options.pop_first() { + while let Some((name, options)) = set_options.pop_first() { self.sets.push(Set::from(name, options)); } - self.set_options = Default::default(); + Ok((vec![], ret_actions)) + } + + async fn start(&mut self) -> RemoteResult<()> { let mut first_error = None; for (i, set) in self.sets.iter().enumerate() { // Retain if error @@ -138,8 +143,6 @@ impl PluginInfo for Plugin { } } -impl Plugin {} - async fn destroy_sets_at_shutdown(mut ipset: IpSet, sets: Vec, shutdown: ShutdownToken) { shutdown.wait().await; for set in sets { diff --git a/plugins/reaction-plugin-ipset/src/tests.rs b/plugins/reaction-plugin-ipset/src/tests.rs index 397df39..f1cef9e 100644 --- a/plugins/reaction-plugin-ipset/src/tests.rs +++ b/plugins/reaction-plugin-ipset/src/tests.rs @@ -1,4 +1,4 @@ -use reaction_plugin::{PluginInfo, Value}; +use reaction_plugin::{ActionConfig, PluginInfo, StreamConfig, Value}; use serde_json::json; use crate::Plugin; @@ -8,10 +8,20 @@ async fn conf_stream() { // No stream is supported by ipset assert!( Plugin::default() - .stream_impl("stream".into(), "ipset".into(), Value::Null) + .load_config( + vec![StreamConfig { + stream_name: "stream".into(), + stream_type: "ipset".into(), + config: Value::Null + }], + vec![] + ) .await .is_err() ); + + // Nothing is ok + assert!(Plugin::default().load_config(vec![], vec![]).await.is_ok()); } #[tokio::test] @@ -106,13 +116,16 @@ async fn conf_action_standalone() { (false, json!({ "set": "test", "target": ["DROP"] }), &p), ] { let res = Plugin::default() - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "ipset".into(), - conf.clone().into(), - patterns.clone(), + .load_config( + vec![], + vec![ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action".into(), + action_type: "ipset".into(), + config: conf.clone().into(), + patterns: patterns.clone(), + }], ) .await; @@ -131,93 +144,105 @@ async fn conf_action_standalone() { async fn conf_action_merge() { let mut plugin = Plugin::default(); - // First set is ok - let res = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "ipset".into(), - json!({ - "set": "test", - "target": "DROP", - "chains": ["INPUT"], - "action": "add", - }) - .into(), - vec!["ip".into()], - ) - .await; - assert!(res.is_ok(), "res: {:?}", res.map(|_| ())); + let set1 = ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action1".into(), + action_type: "ipset".into(), + config: json!({ + "set": "test", + "target": "DROP", + "chains": ["INPUT"], + "action": "add", + }) + .into(), + patterns: vec!["ip".into()], + }; - // Another set without conflict is ok - let res = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "ipset".into(), - json!({ - "set": "test", - "target": "DROP", - "version": "46", - "action": "add", - }) - .into(), - vec!["ip".into()], - ) - .await; - assert!(res.is_ok(), "res: {:?}", res.map(|_| ())); + let set2 = ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action2".into(), + action_type: "ipset".into(), + config: json!({ + "set": "test", + "target": "DROP", + "version": "46", + "action": "add", + }) + .into(), + patterns: vec!["ip".into()], + }; - // Another set without conflict is ok - let res = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "ipset".into(), - json!({ - "set": "test", - "action": "del", - }) - .into(), - vec!["ip".into()], - ) - .await; - assert!(res.is_ok(), "res: {:?}", res.map(|_| ())); + let set3 = ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action2".into(), + action_type: "ipset".into(), + config: json!({ + "set": "test", + "action": "del", + }) + .into(), + patterns: vec!["ip".into()], + }; - // Unrelated set is ok let res = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action2".into(), - "ipset".into(), - json!({ - "set": "test1", - "target": "target1", - "version": "6", - }) - .into(), - vec!["ip".into()], + .load_config( + vec![], + vec![ + // First set + set1.clone(), + // Same set, adding options, no conflict + set2.clone(), + // Same set, no new options, no conflict + set3.clone(), + // Unrelated set, so no conflict + ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action3".into(), + action_type: "ipset".into(), + config: json!({ + "set": "test2", + "target": "target1", + "version": "6", + }) + .into(), + patterns: vec!["ip".into()], + }, + ], ) .await; + assert!(res.is_ok(), "res: {:?}", res.map(|_| ())); // Another set with conflict is not ok let res = plugin - .action_impl( - "stream".into(), - "filter".into(), - "action".into(), - "ipset".into(), - json!({ - "set": "test", - "target": "target2", - "action": "del", - }) - .into(), - vec!["ip".into()], + .load_config( + vec![], + vec![ + // First set + set1, + // Same set, adding options, no conflict + set2, + // Same set, no new options, no conflict + set3, + // Another set with conflict + ActionConfig { + stream_name: "stream".into(), + filter_name: "filter".into(), + action_name: "action3".into(), + action_type: "ipset".into(), + config: json!({ + "set": "test", + "target": "target2", + "action": "del", + }) + .into(), + patterns: vec!["ip".into()], + }, + ], ) .await; assert!(res.is_err(), "res: {:?}", res.map(|_| ()));