mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 20:55:47 +01:00
Refacto: make all Config structures' fields public
Config is 'static after setup anyways. I don't need to hide all this, it's just cumbersome for tests.
This commit is contained in:
parent
6f63f49acd
commit
a0b804811b
13 changed files with 119 additions and 226 deletions
|
|
@ -19,7 +19,7 @@ pub fn test_regex(
|
|||
|
||||
// Code close to Filter::setup()
|
||||
let mut used_patterns: BTreeSet<Arc<Pattern>> = BTreeSet::new();
|
||||
for pattern in config.patterns().values() {
|
||||
for pattern in config.patterns.values() {
|
||||
if let Some(index) = regex.find(pattern.name_with_braces()) {
|
||||
// we already `find` it, so we must be able to `rfind` it
|
||||
#[allow(clippy::unwrap_used)]
|
||||
|
|
@ -43,7 +43,7 @@ pub fn test_regex(
|
|||
let mut result = Vec::new();
|
||||
if !used_patterns.is_empty() {
|
||||
for pattern in used_patterns.iter() {
|
||||
if let Some(match_) = matches.name(pattern.name()) {
|
||||
if let Some(match_) = matches.name(&pattern.name) {
|
||||
result.push(match_.as_str().to_string());
|
||||
if pattern.is_ignore(match_.as_str()) {
|
||||
ignored = true;
|
||||
|
|
|
|||
|
|
@ -11,36 +11,36 @@ use super::{Match, Pattern};
|
|||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct Action {
|
||||
cmd: Vec<String>,
|
||||
pub cmd: Vec<String>,
|
||||
|
||||
// TODO one shot time deserialization
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
after: Option<String>,
|
||||
pub after: Option<String>,
|
||||
#[serde(skip)]
|
||||
after_duration: Option<TimeDelta>,
|
||||
pub after_duration: Option<TimeDelta>,
|
||||
|
||||
#[serde(
|
||||
rename = "onexit",
|
||||
default = "set_false",
|
||||
skip_serializing_if = "is_false"
|
||||
)]
|
||||
on_exit: bool,
|
||||
pub on_exit: bool,
|
||||
#[serde(default = "set_false", skip_serializing_if = "is_false")]
|
||||
oneshot: bool,
|
||||
pub oneshot: bool,
|
||||
|
||||
#[serde(default = "set_false", skip_serializing_if = "is_false")]
|
||||
ipv4only: bool,
|
||||
pub ipv4only: bool,
|
||||
#[serde(default = "set_false", skip_serializing_if = "is_false")]
|
||||
ipv6only: bool,
|
||||
pub ipv6only: bool,
|
||||
|
||||
#[serde(skip)]
|
||||
patterns: Arc<BTreeSet<Arc<Pattern>>>,
|
||||
pub patterns: Arc<BTreeSet<Arc<Pattern>>>,
|
||||
#[serde(skip)]
|
||||
name: String,
|
||||
pub name: String,
|
||||
#[serde(skip)]
|
||||
filter_name: String,
|
||||
pub filter_name: String,
|
||||
#[serde(skip)]
|
||||
stream_name: String,
|
||||
pub stream_name: String,
|
||||
}
|
||||
|
||||
fn set_false() -> bool {
|
||||
|
|
@ -52,29 +52,6 @@ fn is_false(b: &bool) -> bool {
|
|||
}
|
||||
|
||||
impl Action {
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
pub fn after_duration(&self) -> Option<TimeDelta> {
|
||||
self.after_duration
|
||||
}
|
||||
|
||||
pub fn on_exit(&self) -> bool {
|
||||
self.on_exit
|
||||
}
|
||||
|
||||
pub fn oneshot(&self) -> bool {
|
||||
self.oneshot
|
||||
}
|
||||
|
||||
pub fn ipv4only(&self) -> bool {
|
||||
self.ipv4only
|
||||
}
|
||||
pub fn ipv6only(&self) -> bool {
|
||||
self.ipv6only
|
||||
}
|
||||
|
||||
pub fn setup(
|
||||
&mut self,
|
||||
stream_name: &str,
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
|||
use thiserror::Error;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use super::{Filter, Pattern, Stream};
|
||||
use super::{Pattern, Stream};
|
||||
|
||||
pub type Patterns = BTreeMap<String, Arc<Pattern>>;
|
||||
|
||||
|
|
@ -20,20 +20,20 @@ pub type Patterns = BTreeMap<String, Arc<Pattern>>;
|
|||
#[serde(deny_unknown_fields)]
|
||||
pub struct Config {
|
||||
#[serde(default = "num_cpus::get")]
|
||||
concurrency: usize,
|
||||
pub concurrency: usize,
|
||||
#[serde(default = "dot", skip_serializing_if = "String::is_empty")]
|
||||
state_directory: String,
|
||||
pub state_directory: String,
|
||||
|
||||
#[serde(default)]
|
||||
patterns: Patterns,
|
||||
pub patterns: Patterns,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
start: Vec<Vec<String>>,
|
||||
pub start: Vec<Vec<String>>,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
stop: Vec<Vec<String>>,
|
||||
pub stop: Vec<Vec<String>>,
|
||||
|
||||
#[serde(default)]
|
||||
streams: BTreeMap<String, Stream>,
|
||||
pub streams: BTreeMap<String, Stream>,
|
||||
|
||||
// This field only serve the purpose of having a top-level place for saving YAML variables
|
||||
#[serde(default, skip_serializing, rename = "definitions")]
|
||||
|
|
@ -45,35 +45,6 @@ fn dot() -> String {
|
|||
}
|
||||
|
||||
impl Config {
|
||||
pub fn streams(&self) -> &BTreeMap<String, Stream> {
|
||||
&self.streams
|
||||
}
|
||||
|
||||
pub fn patterns(&self) -> &Patterns {
|
||||
&self.patterns
|
||||
}
|
||||
|
||||
pub fn concurrency(&self) -> usize {
|
||||
self.concurrency
|
||||
}
|
||||
|
||||
pub fn state_directory(&self) -> &str {
|
||||
&self.state_directory
|
||||
}
|
||||
|
||||
pub fn filters(&self) -> Vec<&Filter> {
|
||||
self.streams
|
||||
.values()
|
||||
.flat_map(|stream| stream.filters().values())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn get_filter(&self, name: &(String, String)) -> Option<&Filter> {
|
||||
self.streams
|
||||
.get(&name.0)
|
||||
.and_then(|stream| stream.get_filter(&name.1))
|
||||
}
|
||||
|
||||
fn merge(&mut self, mut other: Config) -> Result<(), String> {
|
||||
for (key, pattern) in other.patterns.into_iter() {
|
||||
match self.patterns.entry(key) {
|
||||
|
|
@ -644,7 +615,7 @@ mod tests {
|
|||
assert!(cfg_org.streams.contains_key("echo"));
|
||||
assert_eq!(cfg_org.streams.len(), 1);
|
||||
|
||||
let filters = cfg_org.streams.get("echo").unwrap().filters();
|
||||
let filters = &cfg_org.streams.get("echo").unwrap().filters;
|
||||
assert!(filters.contains_key("f1"));
|
||||
assert!(filters.contains_key("f2"));
|
||||
assert_eq!(filters.len(), 2);
|
||||
|
|
@ -704,8 +675,8 @@ mod tests {
|
|||
assert!(cfg_org.streams.contains_key("echo"));
|
||||
assert_eq!(cfg_org.streams.len(), 1);
|
||||
let stream = cfg_org.streams.get("echo").unwrap();
|
||||
assert_eq!(stream.cmd().len(), 1);
|
||||
assert_eq!(stream.filters().len(), 1);
|
||||
assert_eq!(stream.cmd.len(), 1);
|
||||
assert_eq!(stream.filters.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
|
|
@ -31,34 +31,34 @@ pub enum Duplicate {
|
|||
#[serde(deny_unknown_fields)]
|
||||
pub struct Filter {
|
||||
#[serde(skip)]
|
||||
longuest_action_duration: TimeDelta,
|
||||
pub longuest_action_duration: TimeDelta,
|
||||
#[serde(skip)]
|
||||
has_ip: bool,
|
||||
pub has_ip: bool,
|
||||
|
||||
regex: Vec<String>,
|
||||
pub regex: Vec<String>,
|
||||
#[serde(skip)]
|
||||
compiled_regex: Vec<Regex>,
|
||||
pub compiled_regex: Vec<Regex>,
|
||||
// We want patterns to be ordered
|
||||
// This is necessary when using matches which contain multiple patterns
|
||||
#[serde(skip)]
|
||||
patterns: Arc<BTreeSet<Arc<Pattern>>>,
|
||||
pub patterns: Arc<BTreeSet<Arc<Pattern>>>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
retry: Option<u32>,
|
||||
pub retry: Option<u32>,
|
||||
#[serde(rename = "retryperiod", skip_serializing_if = "Option::is_none")]
|
||||
retry_period: Option<String>,
|
||||
pub retry_period: Option<String>,
|
||||
#[serde(skip)]
|
||||
retry_duration: Option<TimeDelta>,
|
||||
pub retry_duration: Option<TimeDelta>,
|
||||
|
||||
#[serde(default)]
|
||||
duplicate: Duplicate,
|
||||
pub duplicate: Duplicate,
|
||||
|
||||
actions: BTreeMap<String, Action>,
|
||||
pub actions: BTreeMap<String, Action>,
|
||||
|
||||
#[serde(skip)]
|
||||
name: String,
|
||||
pub name: String,
|
||||
#[serde(skip)]
|
||||
stream_name: String,
|
||||
pub stream_name: String,
|
||||
}
|
||||
|
||||
impl Filter {
|
||||
|
|
@ -85,46 +85,6 @@ impl Filter {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
pub fn stream_name(&self) -> &str {
|
||||
&self.stream_name
|
||||
}
|
||||
|
||||
pub fn retry(&self) -> Option<u32> {
|
||||
self.retry
|
||||
}
|
||||
|
||||
pub fn retry_duration(&self) -> Option<TimeDelta> {
|
||||
self.retry_duration
|
||||
}
|
||||
|
||||
pub fn longuest_action_duration(&self) -> TimeDelta {
|
||||
self.longuest_action_duration
|
||||
}
|
||||
|
||||
pub fn regex(&self) -> &Vec<String> {
|
||||
&self.regex
|
||||
}
|
||||
|
||||
pub fn duplicate(&self) -> Duplicate {
|
||||
self.duplicate
|
||||
}
|
||||
|
||||
pub fn actions(&self) -> &BTreeMap<String, Action> {
|
||||
&self.actions
|
||||
}
|
||||
|
||||
pub fn patterns(&self) -> &BTreeSet<Arc<Pattern>> {
|
||||
&self.patterns
|
||||
}
|
||||
|
||||
pub fn check_ip(&self) -> bool {
|
||||
self.has_ip
|
||||
}
|
||||
|
||||
pub fn setup(
|
||||
&mut self,
|
||||
stream_name: &str,
|
||||
|
|
@ -220,11 +180,11 @@ impl Filter {
|
|||
self.has_ip = self
|
||||
.actions
|
||||
.values()
|
||||
.any(|action| action.ipv4only() || action.ipv6only());
|
||||
.any(|action| action.ipv4only || action.ipv6only);
|
||||
|
||||
self.longuest_action_duration =
|
||||
self.actions.values().fold(TimeDelta::seconds(0), |acc, v| {
|
||||
v.after_duration()
|
||||
v.after_duration
|
||||
.map_or(acc, |v| if v > acc { v } else { acc })
|
||||
});
|
||||
|
||||
|
|
@ -239,7 +199,7 @@ impl Filter {
|
|||
for pattern in self.patterns.as_ref() {
|
||||
// if the pattern is in an optional part of the regex,
|
||||
// there may be no captured group for it.
|
||||
if let Some(match_) = matches.name(pattern.name()) {
|
||||
if let Some(match_) = matches.name(&pattern.name) {
|
||||
if !pattern.is_ignore(match_.as_str()) {
|
||||
let mut match_ = match_.as_str().to_string();
|
||||
pattern.normalize(&mut match_);
|
||||
|
|
@ -267,16 +227,16 @@ impl Filter {
|
|||
mut patterns: BTreeMap<Arc<Pattern>, String>,
|
||||
) -> Result<Match, String> {
|
||||
// Check pattern length
|
||||
if patterns.len() != self.patterns().len() {
|
||||
if patterns.len() != self.patterns.len() {
|
||||
return Err(format!(
|
||||
"{} patterns specified, while the {}.{} filter has {} pattern: ({})",
|
||||
patterns.len(),
|
||||
self.stream_name(),
|
||||
self.name(),
|
||||
self.patterns().len(),
|
||||
self.patterns()
|
||||
self.stream_name,
|
||||
self.name,
|
||||
self.patterns.len(),
|
||||
self.patterns
|
||||
.iter()
|
||||
.map(|pattern| pattern.name().clone())
|
||||
.map(|pattern| pattern.name.clone())
|
||||
.reduce(|acc, pattern| acc + ", " + &pattern)
|
||||
.unwrap_or("".into()),
|
||||
));
|
||||
|
|
@ -286,7 +246,7 @@ impl Filter {
|
|||
if self.patterns.get(pattern).is_none() {
|
||||
return Err(format!(
|
||||
"pattern {} is not present in the filter {}.{}",
|
||||
pattern.name(),
|
||||
pattern.name,
|
||||
self.stream_name,
|
||||
self.name
|
||||
));
|
||||
|
|
@ -296,7 +256,7 @@ impl Filter {
|
|||
return Err(format!(
|
||||
"'{}' doesn't match pattern {}",
|
||||
match_,
|
||||
pattern.name(),
|
||||
pattern.name,
|
||||
));
|
||||
}
|
||||
|
||||
|
|
@ -304,7 +264,7 @@ impl Filter {
|
|||
return Err(format!(
|
||||
"'{}' is explicitly ignored by pattern {}",
|
||||
match_,
|
||||
pattern.name(),
|
||||
pattern.name,
|
||||
));
|
||||
}
|
||||
|
||||
|
|
@ -315,7 +275,7 @@ impl Filter {
|
|||
if !patterns.contains_key(pattern) {
|
||||
return Err(format!(
|
||||
"pattern {} is missing, because it's in the filter {}.{}",
|
||||
pattern.name(),
|
||||
pattern.name,
|
||||
self.stream_name,
|
||||
self.name
|
||||
));
|
||||
|
|
@ -373,7 +333,7 @@ impl Filter {
|
|||
config_patterns: &Patterns,
|
||||
) -> Self {
|
||||
let mut filter = Self {
|
||||
actions: actions.into_iter().map(|a| (a.name().into(), a)).collect(),
|
||||
actions: actions.into_iter().map(|a| (a.name.clone(), a)).collect(),
|
||||
regex: regex.into_iter().map(|s| s.into()).collect(),
|
||||
retry,
|
||||
retry_period: retry_period.map(|s| s.into()),
|
||||
|
|
|
|||
|
|
@ -134,21 +134,21 @@ pub struct PatternIp {
|
|||
rename = "type",
|
||||
skip_serializing_if = "PatternType::is_default"
|
||||
)]
|
||||
pattern_type: PatternType,
|
||||
pub pattern_type: PatternType,
|
||||
|
||||
#[serde(default, rename = "ipv4mask")]
|
||||
ipv4_mask: Option<u8>,
|
||||
pub ipv4_mask: Option<u8>,
|
||||
#[serde(default, rename = "ipv6mask")]
|
||||
ipv6_mask: Option<u8>,
|
||||
pub ipv6_mask: Option<u8>,
|
||||
#[serde(skip)]
|
||||
ipv4_bitmask: Option<Ipv4Addr>,
|
||||
pub ipv4_bitmask: Option<Ipv4Addr>,
|
||||
#[serde(skip)]
|
||||
ipv6_bitmask: Option<Ipv6Addr>,
|
||||
pub ipv6_bitmask: Option<Ipv6Addr>,
|
||||
|
||||
#[serde(default, rename = "ignorecidr", skip_serializing_if = "Vec::is_empty")]
|
||||
ignore_cidr: Vec<String>,
|
||||
pub ignore_cidr: Vec<String>,
|
||||
#[serde(skip)]
|
||||
ignore_cidr_normalized: Vec<Cidr>,
|
||||
pub ignore_cidr_normalized: Vec<Cidr>,
|
||||
}
|
||||
|
||||
impl PatternIp {
|
||||
|
|
|
|||
|
|
@ -15,20 +15,20 @@ pub struct Pattern {
|
|||
pub regex: String,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
ignore: Vec<String>,
|
||||
pub ignore: Vec<String>,
|
||||
|
||||
#[serde(default, rename = "ignoreregex", skip_serializing_if = "Vec::is_empty")]
|
||||
ignore_regex: Vec<String>,
|
||||
pub ignore_regex: Vec<String>,
|
||||
#[serde(skip)]
|
||||
compiled_ignore_regex: RegexSet,
|
||||
pub compiled_ignore_regex: RegexSet,
|
||||
|
||||
#[serde(flatten)]
|
||||
ip: PatternIp,
|
||||
pub ip: PatternIp,
|
||||
|
||||
#[serde(skip)]
|
||||
name: String,
|
||||
pub name: String,
|
||||
#[serde(skip)]
|
||||
name_with_braces: String,
|
||||
pub name_with_braces: String,
|
||||
}
|
||||
|
||||
impl Pattern {
|
||||
|
|
@ -40,15 +40,12 @@ impl Pattern {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &String {
|
||||
&self.name
|
||||
}
|
||||
pub fn name_with_braces(&self) -> &String {
|
||||
&self.name_with_braces
|
||||
}
|
||||
|
||||
pub fn pattern_type(&self) -> PatternType {
|
||||
self.ip.pattern_type()
|
||||
self.ip.pattern_type
|
||||
}
|
||||
|
||||
pub fn setup(&mut self, name: &str) -> Result<(), String> {
|
||||
|
|
|
|||
|
|
@ -9,31 +9,19 @@ use super::{Filter, Patterns};
|
|||
#[serde(deny_unknown_fields)]
|
||||
pub struct Stream {
|
||||
#[serde(default)]
|
||||
cmd: Vec<String>,
|
||||
pub cmd: Vec<String>,
|
||||
#[serde(default)]
|
||||
filters: BTreeMap<String, Filter>,
|
||||
pub filters: BTreeMap<String, Filter>,
|
||||
|
||||
#[serde(skip)]
|
||||
name: String,
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn filters(&self) -> &BTreeMap<String, Filter> {
|
||||
&self.filters
|
||||
}
|
||||
|
||||
pub fn get_filter(&self, filter_name: &str) -> Option<&Filter> {
|
||||
self.filters.get(filter_name)
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
pub fn cmd(&self) -> &Vec<String> {
|
||||
&self.cmd
|
||||
}
|
||||
|
||||
pub fn merge(&mut self, other: Stream) -> Result<(), String> {
|
||||
if !(self.cmd.is_empty() || other.cmd.is_empty() || self.cmd == other.cmd) {
|
||||
return Err("cmd has conflicting definitions".into());
|
||||
|
|
|
|||
|
|
@ -91,11 +91,11 @@ impl FilterManager {
|
|||
let already_triggered = state.triggers.contains_key(&m);
|
||||
|
||||
// if duplicate: ignore and already triggered, skip
|
||||
if already_triggered && Duplicate::Ignore == self.filter.duplicate() {
|
||||
if already_triggered && Duplicate::Ignore == self.filter.duplicate {
|
||||
return false;
|
||||
}
|
||||
|
||||
let trigger = match self.filter.retry() {
|
||||
let trigger = match self.filter.retry {
|
||||
None => true,
|
||||
Some(retry) => {
|
||||
state.add_match(m.clone(), now);
|
||||
|
|
@ -106,7 +106,7 @@ impl FilterManager {
|
|||
|
||||
if trigger {
|
||||
state.remove_match(&m);
|
||||
let extend = already_triggered && Duplicate::Extend == self.filter.duplicate();
|
||||
let extend = already_triggered && Duplicate::Extend == self.filter.duplicate;
|
||||
if extend {
|
||||
state.remove_trigger(&m);
|
||||
}
|
||||
|
|
@ -142,7 +142,7 @@ impl FilterManager {
|
|||
let is_match = |match_: &Match| {
|
||||
match_
|
||||
.iter()
|
||||
.zip(self.filter.patterns())
|
||||
.zip(self.filter.patterns.as_ref())
|
||||
.filter_map(|(a_match, pattern)| {
|
||||
patterns.get(pattern.as_ref()).map(|regex| (a_match, regex))
|
||||
})
|
||||
|
|
@ -205,13 +205,13 @@ impl FilterManager {
|
|||
if remaining > 0 {
|
||||
let pattern_status = cs.entry(m.clone()).or_default();
|
||||
|
||||
for action in self.filter.actions().values() {
|
||||
let action_time = t + action.after_duration().unwrap_or_default();
|
||||
for action in self.filter.actions.values() {
|
||||
let action_time = t + action.after_duration.unwrap_or_default();
|
||||
if action_time > now {
|
||||
// Insert action
|
||||
pattern_status
|
||||
.actions
|
||||
.entry(action.name().into())
|
||||
.entry(action.name.clone())
|
||||
.or_default()
|
||||
.push(action_time.to_rfc3339().chars().take(19).collect());
|
||||
|
||||
|
|
@ -241,9 +241,9 @@ impl FilterManager {
|
|||
only_after: bool,
|
||||
) {
|
||||
// Testing if we have an IPv4 or IPv6
|
||||
let ip_type = if self.filter.check_ip() {
|
||||
let ip_type = if self.filter.has_ip {
|
||||
self.filter
|
||||
.patterns()
|
||||
.patterns
|
||||
.iter()
|
||||
.zip(&m)
|
||||
.find(|(p, _)| p.pattern_type() == PatternType::Ip)
|
||||
|
|
@ -266,17 +266,17 @@ impl FilterManager {
|
|||
// Scheduling each action
|
||||
for action in self
|
||||
.filter
|
||||
.actions()
|
||||
.actions
|
||||
.values()
|
||||
// On startup, skip oneshot actions
|
||||
.filter(|action| !startup || !action.oneshot())
|
||||
.filter(|action| !startup || !action.oneshot)
|
||||
// If only_after, keep only after actions
|
||||
.filter(|action| !only_after || action.after_duration().is_some())
|
||||
.filter(|action| !only_after || action.after_duration.is_some())
|
||||
// If specific ip version, check it
|
||||
.filter(|action| !action.ipv4only() || ip_type == PatternType::Ipv4)
|
||||
.filter(|action| !action.ipv6only() || ip_type == PatternType::Ipv6)
|
||||
.filter(|action| !action.ipv4only || ip_type == PatternType::Ipv4)
|
||||
.filter(|action| !action.ipv6only || ip_type == PatternType::Ipv6)
|
||||
{
|
||||
let exec_time = t + action.after_duration().unwrap_or_default();
|
||||
let exec_time = t + action.after_duration.unwrap_or_default();
|
||||
let m = m.clone();
|
||||
|
||||
if exec_time <= now {
|
||||
|
|
@ -298,7 +298,7 @@ impl FilterManager {
|
|||
_ = this.shutdown.wait() => true,
|
||||
};
|
||||
// Exec action if triggered hasn't been already flushed
|
||||
if !exiting || action.on_exit() {
|
||||
if !exiting || action.on_exit {
|
||||
#[allow(clippy::unwrap_used)] // propagating panics is ok
|
||||
let mut state = this.state.lock().unwrap();
|
||||
if state.decrement_trigger(&m, t) {
|
||||
|
|
@ -311,13 +311,13 @@ impl FilterManager {
|
|||
}
|
||||
|
||||
fn clear_past_triggers_and_schedule_future_actions(&self, now: Time) {
|
||||
let longuest_action_duration = self.filter.longuest_action_duration();
|
||||
let longuest_action_duration = self.filter.longuest_action_duration;
|
||||
let number_of_actions = self
|
||||
.filter
|
||||
.actions()
|
||||
.actions
|
||||
.values()
|
||||
// On startup, skip oneshot actions
|
||||
.filter(|action| !action.oneshot())
|
||||
.filter(|action| !action.oneshot)
|
||||
.count() as u64;
|
||||
|
||||
#[allow(clippy::unwrap_used)] // propagating panics is ok
|
||||
|
|
|
|||
|
|
@ -11,20 +11,20 @@ use crate::{
|
|||
pub fn filter_ordered_times_db_name(filter: &Filter) -> String {
|
||||
format!(
|
||||
"filter_ordered_times_{}.{}",
|
||||
filter.stream_name(),
|
||||
filter.name()
|
||||
filter.stream_name,
|
||||
filter.name
|
||||
)
|
||||
}
|
||||
|
||||
pub fn filter_triggers_old_db_name(filter: &Filter) -> String {
|
||||
format!("filter_triggers_{}.{}", filter.stream_name(), filter.name())
|
||||
format!("filter_triggers_{}.{}", filter.stream_name, filter.name)
|
||||
}
|
||||
|
||||
pub fn filter_triggers_db_name(filter: &Filter) -> String {
|
||||
format!(
|
||||
"filter_triggers2_{}.{}",
|
||||
filter.stream_name(),
|
||||
filter.name()
|
||||
filter.stream_name,
|
||||
filter.name
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -67,18 +67,18 @@ impl State {
|
|||
pub fn new(filter: &'static Filter, db: &mut Database, now: Time) -> Result<Self, String> {
|
||||
let ordered_times = db.open_tree(
|
||||
filter_ordered_times_db_name(filter),
|
||||
filter.retry_duration().unwrap_or_default(),
|
||||
filter.retry_duration.unwrap_or_default(),
|
||||
|(key, value)| Ok((to_time(&key)?, to_match(&value)?)),
|
||||
)?;
|
||||
let mut triggers = db.open_tree(
|
||||
filter_triggers_db_name(filter),
|
||||
filter.longuest_action_duration(),
|
||||
filter.longuest_action_duration,
|
||||
|(key, value)| Ok((to_match(&key)?, to_timemap(&value)?)),
|
||||
)?;
|
||||
if triggers.is_empty() {
|
||||
let old_triggers = db.open_tree(
|
||||
filter_triggers_old_db_name(filter),
|
||||
filter.longuest_action_duration(),
|
||||
filter.longuest_action_duration,
|
||||
|(key, value)| Ok((to_matchtime(&key)?, to_u64(&value)?)),
|
||||
)?;
|
||||
for (mt, n) in old_triggers.iter() {
|
||||
|
|
@ -95,7 +95,7 @@ impl State {
|
|||
}
|
||||
let mut this = Self {
|
||||
filter,
|
||||
has_after: !filter.longuest_action_duration().is_zero(),
|
||||
has_after: !filter.longuest_action_duration.is_zero(),
|
||||
matches: BTreeMap::new(),
|
||||
ordered_times,
|
||||
triggers,
|
||||
|
|
@ -115,7 +115,7 @@ impl State {
|
|||
// We record triggered filters only when there is an action with an `after` directive
|
||||
if self.has_after {
|
||||
// Add the (Match, Time) to the triggers map
|
||||
let n = self.filter.actions().len() as u64;
|
||||
let n = self.filter.actions.len() as u64;
|
||||
self.triggers.fetch_update(m, |map| {
|
||||
Some(match map {
|
||||
None => [(t, n)].into(),
|
||||
|
|
@ -183,7 +183,7 @@ impl State {
|
|||
}
|
||||
|
||||
pub fn clear_past_matches(&mut self, now: Time) {
|
||||
let retry_duration = self.filter.retry_duration().unwrap_or_default();
|
||||
let retry_duration = self.filter.retry_duration.unwrap_or_default();
|
||||
while self
|
||||
.ordered_times
|
||||
.first_key_value()
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ pub async fn daemon(
|
|||
|
||||
let (state, stream_managers) = {
|
||||
// Semaphore limiting action execution concurrency
|
||||
let exec_limit = match config.concurrency() {
|
||||
let exec_limit = match config.concurrency {
|
||||
0 => None,
|
||||
n => Some(Arc::new(Semaphore::new(n))),
|
||||
};
|
||||
|
|
@ -54,9 +54,9 @@ pub async fn daemon(
|
|||
let now = Local::now();
|
||||
let mut state = HashMap::new();
|
||||
let mut stream_managers = Vec::new();
|
||||
for stream in config.streams().values() {
|
||||
for stream in config.streams.values() {
|
||||
let mut filter_managers = HashMap::new();
|
||||
for filter in stream.filters().values() {
|
||||
for filter in stream.filters.values() {
|
||||
let manager =
|
||||
FilterManager::new(filter, exec_limit.clone(), shutdown.token(), &mut db, now)?;
|
||||
filter_managers.insert(filter, manager);
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ fn handle_trigger_order(
|
|||
// Check stream existance
|
||||
let filters = match shared_state
|
||||
.iter()
|
||||
.find(|(stream, _)| stream_name == stream.name())
|
||||
.find(|(stream, _)| stream_name == stream.name)
|
||||
{
|
||||
Some((_, filters)) => filters,
|
||||
None => {
|
||||
|
|
@ -90,7 +90,7 @@ fn handle_trigger_order(
|
|||
// Check filter existance
|
||||
let filter_manager = match filters
|
||||
.iter()
|
||||
.find(|(filter, _)| filter_name == filter.name())
|
||||
.find(|(filter, _)| filter_name == filter.name)
|
||||
{
|
||||
Some((_, filter)) => filter,
|
||||
None => {
|
||||
|
|
@ -122,7 +122,7 @@ fn handle_show_or_flush_order(
|
|||
stream_name.is_none()
|
||||
|| stream_name
|
||||
.clone()
|
||||
.is_some_and(|name| name == stream.name())
|
||||
.is_some_and(|name| name == stream.name)
|
||||
})
|
||||
.fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| {
|
||||
let inner_map = filter_manager
|
||||
|
|
@ -132,22 +132,22 @@ fn handle_show_or_flush_order(
|
|||
filter_name.is_none()
|
||||
|| filter_name
|
||||
.clone()
|
||||
.is_some_and(|name| name == filter.name())
|
||||
.is_some_and(|name| name == filter.name)
|
||||
})
|
||||
// pattern filtering
|
||||
.filter(|(filter, _)| {
|
||||
patterns
|
||||
.iter()
|
||||
.all(|(pattern, _)| filter.patterns().get(pattern).is_some())
|
||||
.all(|(pattern, _)| filter.patterns.get(pattern).is_some())
|
||||
})
|
||||
.map(|(filter, manager)| {
|
||||
(
|
||||
filter.name().to_owned(),
|
||||
filter.name.to_owned(),
|
||||
manager.handle_order(&patterns, order, now),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
acc.insert(stream.name().to_owned(), inner_map);
|
||||
acc.insert(stream.name.to_owned(), inner_map);
|
||||
acc
|
||||
});
|
||||
DaemonResponse::Order(cs)
|
||||
|
|
@ -174,7 +174,7 @@ fn answer_order(
|
|||
.map(|(name, reg)| {
|
||||
// lookup pattern in config.patterns
|
||||
config
|
||||
.patterns()
|
||||
.patterns
|
||||
.iter()
|
||||
// retrieve or Err
|
||||
.find(|(pattern_name, _)| &name == *pattern_name)
|
||||
|
|
@ -196,7 +196,7 @@ fn answer_order(
|
|||
Ok(reg) => Ok((pattern, reg)),
|
||||
Err(err) => Err(format!(
|
||||
"pattern '{}' regex doesn't compile: {err}",
|
||||
pattern.name()
|
||||
pattern.name
|
||||
)),
|
||||
})
|
||||
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ impl StreamManager {
|
|||
.iter()
|
||||
.flat_map(|(filter, filter_manager)| {
|
||||
filter
|
||||
.regex()
|
||||
.regex
|
||||
.iter()
|
||||
.map(|regex| (regex, filter_manager.clone()))
|
||||
})
|
||||
|
|
@ -80,9 +80,9 @@ impl StreamManager {
|
|||
}
|
||||
|
||||
pub async fn start(self) {
|
||||
info!("{}: start {:?}", self.stream.name(), self.stream.cmd());
|
||||
let mut child = match Command::new(&self.stream.cmd()[0])
|
||||
.args(&self.stream.cmd()[1..])
|
||||
info!("{}: start {:?}", self.stream.name, self.stream.cmd);
|
||||
let mut child = match Command::new(&self.stream.cmd[0])
|
||||
.args(&self.stream.cmd[1..])
|
||||
.stdin(Stdio::null())
|
||||
.stderr(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
|
|
@ -92,7 +92,7 @@ impl StreamManager {
|
|||
Err(err) => {
|
||||
error!(
|
||||
"could not execute stream {} cmd: {}",
|
||||
self.stream.name(),
|
||||
self.stream.name,
|
||||
err
|
||||
);
|
||||
return;
|
||||
|
|
@ -120,7 +120,7 @@ impl StreamManager {
|
|||
// wait either for the child process to exit on its own or for the shutdown signal
|
||||
futures::select! {
|
||||
_ = child.wait().fuse() => {
|
||||
error!("stream {} exited: its command returned.", self.stream.name());
|
||||
error!("stream {} exited: its command returned.", self.stream.name);
|
||||
return;
|
||||
}
|
||||
_ = self.shutdown.wait().fuse() => {}
|
||||
|
|
@ -143,7 +143,7 @@ impl StreamManager {
|
|||
} else {
|
||||
warn!(
|
||||
"could not get PID of child process for stream {}",
|
||||
self.stream.name()
|
||||
self.stream.name
|
||||
);
|
||||
// still try to use tokio API to kill and reclaim the child process
|
||||
}
|
||||
|
|
@ -160,7 +160,7 @@ impl StreamManager {
|
|||
futures::select! {
|
||||
_ = child.wait().fuse() => {}
|
||||
_ = sleep(Duration::from_secs(STREAM_PROCESS_KILL_WAIT_TIMEOUT_SEC)).fuse() => {
|
||||
error!("child process of stream {} did not terminate", self.stream.name());
|
||||
error!("child process of stream {} did not terminate", self.stream.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -183,7 +183,7 @@ impl StreamManager {
|
|||
Some(Err(err)) => {
|
||||
error!(
|
||||
"impossible to read output from stream {}: {}",
|
||||
self.stream.name(),
|
||||
self.stream.name,
|
||||
err
|
||||
);
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -55,10 +55,10 @@ const DB_NEW_NAME: &str = "reaction.new.db";
|
|||
|
||||
impl Config {
|
||||
fn path_of(&self, name: &str) -> PathBuf {
|
||||
if self.state_directory().is_empty() {
|
||||
if self.state_directory.is_empty() {
|
||||
name.into()
|
||||
} else {
|
||||
PathBuf::from(self.state_directory()).join(name)
|
||||
PathBuf::from(&self.state_directory).join(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue