Fix #151: Move RegexSet creation from StreamManager to config Stream

This move the potential error of a too big regex set to the config setup,
a place where it can be gracefully handled, instead of the place it was,
where this would make reaction mess up with start/stop, etc.
This commit is contained in:
ppom 2026-02-26 12:00:00 +01:00
commit c41c89101d
No known key found for this signature in database
2 changed files with 37 additions and 16 deletions

View file

@ -1,6 +1,7 @@
use std::{cmp::Ordering, collections::BTreeMap, hash::Hash};
use reaction_plugin::StreamConfig;
use regex::RegexSet;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -19,6 +20,11 @@ pub struct Stream {
#[serde(skip)]
pub name: String,
#[serde(skip)]
pub compiled_regex_set: RegexSet,
#[serde(skip)]
pub regex_index_to_filter_name: Vec<String>,
// Plugin-specific
#[serde(default, rename = "type", skip_serializing_if = "Option::is_none")]
pub stream_type: Option<String>,
@ -90,6 +96,21 @@ impl Stream {
filter.setup(name, key, patterns)?;
}
let all_regexes: BTreeMap<_, _> = self
.filters
.values()
.flat_map(|filter| {
filter
.regex
.iter()
.map(|regex| (regex, filter.name.clone()))
})
.collect();
self.compiled_regex_set = RegexSet::new(all_regexes.keys())
.map_err(|err| format!("too much regexes on the filters of this stream: {err}"))?;
self.regex_index_to_filter_name = all_regexes.into_values().collect();
Ok(())
}

View file

@ -1,11 +1,10 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
collections::{BTreeSet, HashMap},
process::Stdio,
};
use futures::{FutureExt, Stream as AsyncStream, StreamExt, future::join_all};
use reaction_plugin::{StreamImpl, shutdown::ShutdownToken};
use regex::RegexSet;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, ChildStderr, ChildStdout, Command},
@ -45,7 +44,6 @@ pub fn reader_to_stream(
}
pub struct StreamManager {
compiled_regex_set: RegexSet,
regex_index_to_filter_manager: Vec<FilterManager>,
stream: &'static Stream,
stream_plugin: Option<StreamImpl>,
@ -59,16 +57,6 @@ impl StreamManager {
shutdown: ShutdownToken,
plugins: &mut Plugins,
) -> Result<Self, String> {
let all_regexes: BTreeMap<_, _> = filter_managers
.iter()
.flat_map(|(filter, filter_manager)| {
filter
.regex
.iter()
.map(|regex| (regex, filter_manager.clone()))
})
.collect();
let stream_plugin = if stream.is_plugin() {
Some(
plugins
@ -84,11 +72,23 @@ impl StreamManager {
None
};
let regex_index_to_filter_manager = stream
.regex_index_to_filter_name
.iter()
.map(|filter_name| {
filter_managers
.iter()
.find(|(filter, _)| filter_name == &filter.name)
.unwrap()
.1
.clone()
})
.collect();
debug!("successfully initialized stream {}", stream.name);
Ok(StreamManager {
compiled_regex_set: RegexSet::new(all_regexes.keys()).map_err(|err| err.to_string())?,
regex_index_to_filter_manager: all_regexes.into_values().collect(),
regex_index_to_filter_manager,
stream,
stream_plugin,
shutdown,
@ -230,7 +230,7 @@ impl StreamManager {
}
fn matching_filters(&self, line: &str) -> BTreeSet<&FilterManager> {
let matches = self.compiled_regex_set.matches(line);
let matches = self.stream.compiled_regex_set.matches(line);
matches
.into_iter()
.map(|match_| &self.regex_index_to_filter_manager[match_])