diff --git a/src/concepts/config.rs b/src/concepts/config.rs index 3067a17..68e3de8 100644 --- a/src/concepts/config.rs +++ b/src/concepts/config.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeMap, + collections::{btree_map, BTreeMap}, fs::File, io, path::Path, @@ -9,9 +9,9 @@ use std::{ use serde::Deserialize; use thiserror::Error; -use tracing::{error, info}; +use tracing::{debug, error, info, warn}; -use super::{Pattern, Stream}; +use super::{Filter, Pattern, Stream}; pub type Patterns = BTreeMap>; @@ -19,8 +19,10 @@ pub type Patterns = BTreeMap>; #[cfg_attr(test, derive(Default))] #[serde(deny_unknown_fields)] pub struct Config { + #[serde(default)] patterns: Patterns, + #[serde(default)] streams: BTreeMap, #[serde(default = "num_cpus::get")] @@ -60,7 +62,53 @@ impl Config { &self.state_directory } - fn setup(&mut self) -> Result<(), String> { + pub fn filters(&self) -> Vec<&Filter> { + self.streams + .values() + .flat_map(|stream| stream.filters().values()) + .collect() + } + + pub fn get_filter(&self, name: &(String, String)) -> Option<&Filter> { + self.streams + .get(&name.0) + .and_then(|stream| stream.get_filter(&name.1)) + } + + pub fn merge(&mut self, mut other: Config) -> Result<(), String> { + for (key, pattern) in other.patterns.into_iter() { + match self.patterns.entry(key) { + btree_map::Entry::Occupied(e) => { + return Err(format!("Pattern {} is already defined", e.key())); + } + btree_map::Entry::Vacant(e) => { + e.insert(pattern); + } + } + } + + for (key, stream) in other.streams.into_iter() { + match self.streams.entry(key) { + btree_map::Entry::Vacant(e) => { + e.insert(stream); + } + btree_map::Entry::Occupied(mut e) => { + e.get_mut() + .merge(stream) + .map_err(|err| format!("Stream {}: {}", e.key(), err))?; + } + } + } + + self.start.append(&mut other.start); + self.stop.append(&mut other.stop); + + self.state_directory = other.state_directory; + + Ok(()) + } + + pub fn setup(&mut self) -> Result<(), String> { if self.concurrency == 0 { self.concurrency = num_cpus::get(); } @@ -68,6 +116,10 @@ impl Config { // Nullify this useless field self._definitions = serde_json::Value::Null; + if self.patterns.is_empty() { + return Err("no patterns configured".into()); + } + let mut new_patterns = BTreeMap::new(); for (key, value) in &self.patterns { let mut value = value.as_ref().clone(); @@ -94,32 +146,158 @@ impl Config { run_commands(&self.stop, "stop") } + pub fn from_path(path: &Path) -> Result<(Self, Vec), String> { + match std::fs::metadata(path) { + Err(e) => Err(format!("Error accessing {}: {e}", path.to_string_lossy())), + Ok(m) => { + if m.is_file() { + Self::from_file(path).map(|cfg| { + let fname = path + .file_name() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or("".to_string()); + (cfg, vec![fname]) + }) + } else if m.is_dir() { + Self::from_dir(path) + } else { + Err(format!( + "Invalid file type for {}: not a file nor a directory", + path.to_string_lossy() + )) + } + } + } + } + + pub fn from_dir(path: &Path) -> Result<(Self, Vec), String> { + let dir = std::fs::read_dir(path) + .map_err(|e| format!("Error accessing directory {}: {e}", path.display()))?; + let mut cfg: Option = None; + let mut read_cfg_fname = vec![]; + for f in dir { + let f = + f.map_err(|e| format!("Error while reading directory {}: {e}", path.display()))?; + + let fname = f.file_name(); + let fname = match fname.to_str() { + Some(fname) => fname, + None => { + warn!( + "Ignoring file {} in {}", + f.file_name().to_string_lossy(), + path.display() + ); + continue; + } + }; + + if fname.starts_with(".") || fname.starts_with("_") { + // silently ignore hidden file + debug!("Ignoring hidden file {fname} in {}", path.display()); + continue; + } + + let fpath = f.path(); + let ext = match fpath.extension() { + None => { + // silently ignore files without extensions (may be directory) + debug!( + "Ignoring file without extension {fname} in {}", + path.display() + ); + continue; + } + Some(ext) => { + if let Some(ext) = ext.to_str() { + ext + } else { + warn!( + "Ignoring file {} in {} with unexpected extension", + fname, + path.display() + ); + continue; + } + } + }; + + let cfg_format = match Self::_extension_to_format(ext) { + Ok(fmt) => fmt, + Err(_) => { + // silently ignore files without an expected extension + debug!( + "Ignoring file with non recognized extension {fname} in {}", + path.display() + ); + continue; + } + }; + + let cfg_part = Self::_load_file(&fpath, cfg_format) + .map_err(|e| format!("While reading {fname} in {}: {e}", path.display()))?; + read_cfg_fname.push(fname.to_string()); + + if let Some(mut cfg_agg) = cfg.take() { + cfg_agg.merge(cfg_part)?; + cfg = Some(cfg_agg); + } else { + cfg = Some(cfg_part) + } + } + + if let Some(mut cfg) = cfg { + cfg.setup().map_err(ConfigError::BadConfig).map_err(|e| { + format!( + "{e}\nWhile reading files from {}. List of files read, in that order:\n{}", + path.display(), + read_cfg_fname.join("\n"), + ) + })?; + Ok((cfg, read_cfg_fname)) + } else { + Err(format!( + "No valid configuration files found in {}", + path.display() + )) + } + } + pub fn from_file(path: &Path) -> Result { Config::_from_file(path) .map_err(|err| format!("Configuration file {}: {}", path.display(), err)) } + fn _extension_to_format(extension: &str) -> Result { + match extension { + "yaml" | "yml" => Ok(Format::Yaml), + "json" => Ok(Format::Json), + "jsonnet" => Ok(Format::Jsonnet), + _ => Err(ConfigError::Extension(format!( + "extension {} is not recognized", + extension + ))), + } + } + + fn _load_file(path: &Path, format: Format) -> Result { + let cfg: Self = match format { + Format::Json => serde_json::from_reader(File::open(path)?)?, + Format::Yaml => serde_yaml::from_reader(File::open(path)?)?, + Format::Jsonnet => serde_json::from_str(&jsonnet::from_path(path)?)?, + }; + Ok(cfg) + } + fn _from_file(path: &Path) -> Result { let extension = path .extension() .and_then(|ex| ex.to_str()) .ok_or(ConfigError::Extension("no file extension".into()))?; - let format = match extension { - "yaml" | "yml" => Ok(Format::Yaml), - "json" => Ok(Format::Json), - "jsonnet" => Ok(Format::Jsonnet), - _ => Err(ConfigError::Extension(format!( - "extension {} is not recognized", - extension - ))), - }?; + let format = Self::_extension_to_format(extension)?; - let mut config: Config = match format { - Format::Json => serde_json::from_reader(File::open(path)?)?, - Format::Yaml => serde_yaml::from_reader(File::open(path)?)?, - Format::Jsonnet => serde_json::from_str(&jsonnet::from_path(path)?)?, - }; + let mut config: Config = Self::_load_file(path, format)?; config.setup().map_err(ConfigError::BadConfig)?; @@ -213,6 +391,7 @@ fn run_commands(commands: &Vec>, moment: &str) -> bool { } #[cfg(test)] +#[allow(clippy::unwrap_used)] mod tests { use super::*; @@ -222,4 +401,312 @@ mod tests { let mut config = Config::default(); assert!(config.setup().is_err()); } + + fn parse_config_json(cfg: &str) -> Config { + const DUMMY_PATTERNS: &str = r#" + "patterns": { + "zero": { + "regex": "0" + } + }"#; + const DUMMY_FILTERS: &str = r#" + "filters": { + "dummy": { + "regex": ["abc"], + "actions": { + "act": { + "cmd": ["echo", "1"] + } + } + } + }"#; + const DUMMY_STREAMS: &str = r#" + "streams": { + "dummy": { + "cmd": ["true"], + {{FILTERS}} + } + } + "#; + let cfg = cfg + .to_string() + .replace("{{STREAMS}}", DUMMY_STREAMS) + .replace("{{PATTERNS}}", DUMMY_PATTERNS) + .replace("{{FILTERS}}", DUMMY_FILTERS); + + serde_json::from_str(&cfg) + .inspect_err(|_| { + eprintln!("{cfg}"); + }) + .unwrap() + } + + #[test] + fn config_without_stream() { + let mut cfg1 = parse_config_json( + r#"{ + {{PATTERNS}} + }"#, + ); + let mut cfg2 = parse_config_json( + r#"{ + {{PATTERNS}}, + "streams": {} + }"#, + ); + + let res = cfg1.setup(); + assert!(res.is_err()); + let res = cfg2.setup(); + assert!(res.is_err()); + } + + #[test] + fn config_without_pattern() { + let mut cfg1 = parse_config_json( + r#"{ + {{STREAMS}} + }"#, + ); + let mut cfg2 = parse_config_json( + r#"{ + "patterns": {}, + {{STREAMS}} + }"#, + ); + + let res = cfg1.setup(); + assert!(res.is_err()); + let res = cfg2.setup(); + assert!(res.is_err()); + } + + #[test] + fn merge_config_distinct_patterns() { + let mut cfg_org = parse_config_json( + r#"{ + "patterns": { + "ip4": { + "regex": "ip4" + } + }, + {{STREAMS}} + }"#, + ); + let cfg_oth = parse_config_json( + r#"{ + "patterns": { + "ip6": { + "regex": "ip6" + } + } + }"#, + ); + + cfg_org.merge(cfg_oth).unwrap(); + cfg_org.setup().unwrap(); + assert!(cfg_org.patterns.contains_key("ip4")); + assert!(cfg_org.patterns.contains_key("ip6")); + assert_eq!(cfg_org.patterns.len(), 2); + assert!(cfg_org.streams.contains_key("dummy")); + assert_eq!(cfg_org.streams.len(), 1); + } + + #[test] + fn merge_config_same_patterns() { + let mut cfg_org = parse_config_json( + r#"{ + "patterns": { + "zero": { + "regex": "0" + } + }, + {{STREAMS}} + }"#, + ); + let cfg_oth = parse_config_json( + r#"{ + "patterns": { + "zero": { + "regex": "00" + } + } + }"#, + ); + + let res = cfg_org.merge(cfg_oth); + assert!(res.is_err()); + } + + #[test] + fn merge_config_distinct_streams() { + let mut cfg_org = parse_config_json( + r#"{ + {{PATTERNS}}, + "streams": { + "echo1": { + "cmd": ["echo"], + {{FILTERS}} + } + } + }"#, + ); + let cfg_oth = parse_config_json( + r#"{ + "streams": { + "echo2": { + "cmd": ["echo"], + {{FILTERS}} + } + } + }"#, + ); + + cfg_org.merge(cfg_oth).unwrap(); + cfg_org.setup().unwrap(); + assert!(cfg_org.patterns.contains_key("zero")); + assert_eq!(cfg_org.patterns.len(), 1); + assert!(cfg_org.streams.contains_key("echo1")); + assert!(cfg_org.streams.contains_key("echo2")); + assert_eq!(cfg_org.streams.len(), 2); + } + + #[test] + fn merge_config_same_streams_distinct_filters() { + let mut cfg_org = parse_config_json( + r#"{ + {{PATTERNS}}, + "streams": { + "echo": { + "cmd": ["echo"], + "filters": { + "f1": { + "regex": ["abc"], + "actions": { + "act": { + "cmd": ["echo", "1"] + } + } + } + } + } + } + }"#, + ); + let cfg_oth = parse_config_json( + r#"{ + "streams": { + "echo": { + "cmd": ["echo"], + "filters": { + "f2": { + "regex": ["abc"], + "actions": { + "act": { + "cmd": ["echo", "1"] + } + } + } + } + } + } + }"#, + ); + + cfg_org.merge(cfg_oth).unwrap(); + cfg_org.setup().unwrap(); + assert!(cfg_org.streams.contains_key("echo")); + assert_eq!(cfg_org.streams.len(), 1); + + let filters = cfg_org.streams.get("echo").unwrap().filters(); + assert!(filters.contains_key("f1")); + assert!(filters.contains_key("f2")); + assert_eq!(filters.len(), 2); + } + + #[test] + fn merge_config_same_streams_distinct_command() { + let mut cfg_org = parse_config_json( + r#"{ + {{PATTERNS}}, + "streams": { + "echo": { + "cmd": ["echo"], + {{FILTERS}} + } + } + }"#, + ); + let cfg_oth = parse_config_json( + r#"{ + "streams": { + "echo": { + "cmd": ["true"] + } + } + }"#, + ); + + let res = cfg_org.merge(cfg_oth); + assert!(res.is_err()); + } + + #[test] + fn merge_config_same_streams_command_in_one() { + let mut cfg_org = parse_config_json( + r#"{ + {{PATTERNS}}, + "streams": { + "echo": { + {{FILTERS}} + } + } + }"#, + ); + let cfg_oth = parse_config_json( + r#"{ + "streams": { + "echo": { + "cmd": ["echo"] + } + } + }"#, + ); + + cfg_org.merge(cfg_oth).unwrap(); + cfg_org.setup().unwrap(); + assert!(cfg_org.streams.contains_key("echo")); + assert_eq!(cfg_org.streams.len(), 1); + let stream = cfg_org.streams.get("echo").unwrap(); + assert_eq!(stream.cmd().len(), 1); + assert_eq!(stream.filters().len(), 1); + } + + #[test] + fn merge_config_same_streams_same_filters() { + let mut cfg_org = parse_config_json( + r#"{ + {{PATTERNS}}, + "streams": { + "echo": { + "cmd": ["echo"], + {{FILTERS}} + } + } + }"#, + ); + let cfg_oth = parse_config_json( + r#"{ + "streams": { + "echo": { + "cmd": ["echo"], + {{FILTERS}} + } + } + }"#, + ); + + let res = cfg_org.merge(cfg_oth); + assert!(res.is_err()); + } } diff --git a/src/concepts/stream.rs b/src/concepts/stream.rs index b57c28e..67e2bfe 100644 --- a/src/concepts/stream.rs +++ b/src/concepts/stream.rs @@ -8,7 +8,9 @@ use super::{Filter, Patterns}; #[cfg_attr(test, derive(Default))] #[serde(deny_unknown_fields)] pub struct Stream { + #[serde(default)] cmd: Vec, + #[serde(default)] filters: BTreeMap, #[serde(skip)] @@ -32,6 +34,24 @@ impl Stream { &self.cmd } + pub fn merge(&mut self, other: Stream) -> Result<(), String> { + if !(self.cmd.is_empty() || other.cmd.is_empty() || self.cmd == other.cmd) { + return Err("Command is already defined".into()); + } + + if self.cmd.is_empty() { + self.cmd = other.cmd; + } + + for (key, filter) in other.filters.into_iter() { + if self.filters.insert(key.clone(), filter).is_some() { + return Err(format!("Filter {} already defined", key)); + } + } + + Ok(()) + } + pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> { self._setup(name, patterns) .map_err(|msg| format!("stream {}: {}", name, msg)) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 5a3e407..058165b 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -31,7 +31,7 @@ pub async fn daemon( config_path: PathBuf, socket: PathBuf, ) -> Result<(), Box> { - let config: &'static Config = Box::leak(Box::new(Config::from_file(&config_path)?)); + let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?.0)); if !config.start() { return Err("a start command failed, exiting.".into());