From c41c89101dddfc8c7fdcbbfea16d456ac5051f00 Mon Sep 17 00:00:00 2001 From: ppom Date: Thu, 26 Feb 2026 12:00:00 +0100 Subject: [PATCH] Fix #151: Move RegexSet creation from StreamManager to config Stream This move the potential error of a too big regex set to the config setup, a place where it can be gracefully handled, instead of the place it was, where this would make reaction mess up with start/stop, etc. --- src/concepts/stream.rs | 21 +++++++++++++++++++++ src/daemon/stream.rs | 32 ++++++++++++++++---------------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index 3b3fdf5..97e6ece 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -1,6 +1,7 @@ use std::{cmp::Ordering, collections::BTreeMap, hash::Hash}; use reaction_plugin::StreamConfig; +use regex::RegexSet; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -19,6 +20,11 @@ pub struct Stream { #[serde(skip)] pub name: String, + #[serde(skip)] + pub compiled_regex_set: RegexSet, + #[serde(skip)] + pub regex_index_to_filter_name: Vec, + // Plugin-specific #[serde(default, rename = "type", skip_serializing_if = "Option::is_none")] pub stream_type: Option, @@ -90,6 +96,21 @@ impl Stream { filter.setup(name, key, patterns)?; } + let all_regexes: BTreeMap<_, _> = self + .filters + .values() + .flat_map(|filter| { + filter + .regex + .iter() + .map(|regex| (regex, filter.name.clone())) + }) + .collect(); + + self.compiled_regex_set = RegexSet::new(all_regexes.keys()) + .map_err(|err| format!("too much regexes on the filters of this stream: {err}"))?; + self.regex_index_to_filter_name = all_regexes.into_values().collect(); + Ok(()) } diff --git a/src/daemon/stream.rs b/src/daemon/stream.rs index 7d50b54..3ed0d56 100644 --- a/src/daemon/stream.rs +++ b/src/daemon/stream.rs @@ -1,11 +1,10 @@ use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeSet, HashMap}, process::Stdio, }; use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all}; use reaction_plugin::{StreamImpl, shutdown::ShutdownToken}; -use regex::RegexSet; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::{Child, ChildStderr, ChildStdout, Command}, @@ -45,7 +44,6 @@ pub fn reader_to_stream( } pub struct StreamManager { - compiled_regex_set: RegexSet, regex_index_to_filter_manager: Vec, stream: &'static Stream, stream_plugin: Option, @@ -59,16 +57,6 @@ impl StreamManager { shutdown: ShutdownToken, plugins: &mut Plugins, ) -> Result { - let all_regexes: BTreeMap<_, _> = filter_managers - .iter() - .flat_map(|(filter, filter_manager)| { - filter - .regex - .iter() - .map(|regex| (regex, filter_manager.clone())) - }) - .collect(); - let stream_plugin = if stream.is_plugin() { Some( plugins @@ -84,11 +72,23 @@ impl StreamManager { None }; + let regex_index_to_filter_manager = stream + .regex_index_to_filter_name + .iter() + .map(|filter_name| { + filter_managers + .iter() + .find(|(filter, _)| filter_name == &filter.name) + .unwrap() + .1 + .clone() + }) + .collect(); + debug!("successfully initialized stream {}", stream.name); Ok(StreamManager { - compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?, - regex_index_to_filter_manager: all_regexes.into_values().collect(), + regex_index_to_filter_manager, stream, stream_plugin, shutdown, @@ -230,7 +230,7 @@ impl StreamManager { } fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> { - let matches = self.compiled_regex_set.matches(line); + let matches = self.stream.compiled_regex_set.matches(line); matches .into_iter() .map(|match_| &self.regex_index_to_filter_manager[match_])