Add ability to read config from multiple files in the same directory

This commit is contained in:
Baptiste Careil 2024-12-28 21:04:41 +01:00 committed by ppom
commit 28b3a173bb
No known key found for this signature in database
3 changed files with 521 additions and 14 deletions

View file

@ -1,5 +1,5 @@
use std::{
collections::BTreeMap,
collections::{btree_map, BTreeMap},
fs::File,
io,
path::Path,
@ -9,9 +9,9 @@ use std::{
use serde::Deserialize;
use thiserror::Error;
use tracing::{error, info};
use tracing::{debug, error, info, warn};
use super::{Pattern, Stream};
use super::{Filter, Pattern, Stream};
pub type Patterns = BTreeMap<String, Arc<Pattern>>;
@ -19,8 +19,10 @@ pub type Patterns = BTreeMap<String, Arc<Pattern>>;
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(default)]
patterns: Patterns,
#[serde(default)]
streams: BTreeMap<String, Stream>,
#[serde(default = "num_cpus::get")]
@ -60,7 +62,53 @@ impl Config {
&self.state_directory
}
fn setup(&mut self) -> Result<(), String> {
/// Collect references to every filter declared by every stream.
pub fn filters(&self) -> Vec<&Filter> {
    let mut all = Vec::new();
    for stream in self.streams.values() {
        all.extend(stream.filters().values());
    }
    all
}
/// Look up a filter by its `(stream name, filter name)` pair.
/// Returns `None` when either the stream or the filter does not exist.
pub fn get_filter(&self, name: &(String, String)) -> Option<&Filter> {
    let (stream_name, filter_name) = name;
    let stream = self.streams.get(stream_name)?;
    stream.get_filter(filter_name)
}
/// Fold `other` (one configuration fragment) into `self`.
///
/// Patterns must be globally unique across fragments; a duplicate name is
/// an error. Streams with the same name are merged field-wise via
/// [`Stream::merge`]. `start`/`stop` commands accumulate.
pub fn merge(&mut self, mut other: Config) -> Result<(), String> {
    // BTreeMap implements IntoIterator, so iterate it directly
    // (clippy: explicit_into_iter_loop).
    for (key, pattern) in other.patterns {
        match self.patterns.entry(key) {
            btree_map::Entry::Occupied(e) => {
                return Err(format!("Pattern {} is already defined", e.key()));
            }
            btree_map::Entry::Vacant(e) => {
                e.insert(pattern);
            }
        }
    }
    for (key, stream) in other.streams {
        match self.streams.entry(key) {
            btree_map::Entry::Vacant(e) => {
                e.insert(stream);
            }
            btree_map::Entry::Occupied(mut e) => {
                e.get_mut()
                    .merge(stream)
                    .map_err(|err| format!("Stream {}: {}", e.key(), err))?;
            }
        }
    }
    // Lifecycle commands from every fragment run, in merge order.
    self.start.append(&mut other.start);
    self.stop.append(&mut other.stop);
    // NOTE(review): the last merged fragment wins for state_directory, even
    // if it carries only the default value — confirm this is intended.
    self.state_directory = other.state_directory;
    Ok(())
}
pub fn setup(&mut self) -> Result<(), String> {
if self.concurrency == 0 {
self.concurrency = num_cpus::get();
}
@ -68,6 +116,10 @@ impl Config {
// Nullify this useless field
self._definitions = serde_json::Value::Null;
if self.patterns.is_empty() {
return Err("no patterns configured".into());
}
let mut new_patterns = BTreeMap::new();
for (key, value) in &self.patterns {
let mut value = value.as_ref().clone();
@ -94,32 +146,158 @@ impl Config {
run_commands(&self.stop, "stop")
}
/// Load configuration from `path`, which may be a single file or a
/// directory of fragments.
///
/// Returns the parsed [`Config`] together with the list of file names
/// that were actually read (a single-element list for a plain file).
pub fn from_path(path: &Path) -> Result<(Self, Vec<String>), String> {
    let m = std::fs::metadata(path)
        .map_err(|e| format!("Error accessing {}: {e}", path.to_string_lossy()))?;
    if m.is_file() {
        Self::from_file(path).map(|cfg| {
            let fname = path
                .file_name()
                .map(|s| s.to_string_lossy().to_string())
                // unwrap_or_default() avoids eagerly allocating the
                // fallback String (clippy: or_fun_call).
                .unwrap_or_default();
            (cfg, vec![fname])
        })
    } else if m.is_dir() {
        Self::from_dir(path)
    } else {
        Err(format!(
            "Invalid file type for {}: not a file nor a directory",
            path.to_string_lossy()
        ))
    }
}
/// Load and merge every recognized configuration file in the directory
/// `path`.
///
/// Hidden files (`.`/`_` prefix), files without an extension, and files
/// with an unrecognized extension are skipped. Returns the merged config
/// plus the ordered list of file names that were read.
pub fn from_dir(path: &Path) -> Result<(Self, Vec<String>), String> {
    let dir = std::fs::read_dir(path)
        .map_err(|e| format!("Error accessing directory {}: {e}", path.display()))?;
    // read_dir yields entries in a platform-dependent order, but merge()
    // is order-sensitive (start/stop commands accumulate and the last
    // state_directory wins), so sort by file name to make loading
    // deterministic — and to make the "list of files read, in that
    // order" error message reproducible.
    let mut entries = Vec::new();
    for entry in dir {
        let entry = entry
            .map_err(|e| format!("Error while reading directory {}: {e}", path.display()))?;
        entries.push(entry);
    }
    entries.sort_by_key(|e| e.file_name());
    let mut cfg: Option<Self> = None;
    let mut read_cfg_fname = vec![];
    for f in entries {
        let fname = f.file_name();
        let fname = match fname.to_str() {
            Some(fname) => fname,
            None => {
                // Non-UTF-8 file name: we cannot match its extension, so
                // skip it with a visible warning.
                warn!(
                    "Ignoring file {} in {}",
                    f.file_name().to_string_lossy(),
                    path.display()
                );
                continue;
            }
        };
        if fname.starts_with('.') || fname.starts_with('_') {
            // silently ignore hidden file
            debug!("Ignoring hidden file {fname} in {}", path.display());
            continue;
        }
        let fpath = f.path();
        let ext = match fpath.extension() {
            None => {
                // silently ignore files without extensions (may be directory)
                debug!(
                    "Ignoring file without extension {fname} in {}",
                    path.display()
                );
                continue;
            }
            Some(ext) => match ext.to_str() {
                Some(ext) => ext,
                None => {
                    warn!(
                        "Ignoring file {} in {} with unexpected extension",
                        fname,
                        path.display()
                    );
                    continue;
                }
            },
        };
        let cfg_format = match Self::_extension_to_format(ext) {
            Ok(fmt) => fmt,
            Err(_) => {
                // silently ignore files without an expected extension
                debug!(
                    "Ignoring file with non recognized extension {fname} in {}",
                    path.display()
                );
                continue;
            }
        };
        let cfg_part = Self::_load_file(&fpath, cfg_format)
            .map_err(|e| format!("While reading {fname} in {}: {e}", path.display()))?;
        read_cfg_fname.push(fname.to_string());
        // Merge into the accumulator, or seed it with the first fragment.
        match cfg.as_mut() {
            Some(cfg_agg) => cfg_agg.merge(cfg_part)?,
            None => cfg = Some(cfg_part),
        }
    }
    if let Some(mut cfg) = cfg {
        cfg.setup().map_err(ConfigError::BadConfig).map_err(|e| {
            format!(
                "{e}\nWhile reading files from {}. List of files read, in that order:\n{}",
                path.display(),
                read_cfg_fname.join("\n"),
            )
        })?;
        Ok((cfg, read_cfg_fname))
    } else {
        Err(format!(
            "No valid configuration files found in {}",
            path.display()
        ))
    }
}
pub fn from_file(path: &Path) -> Result<Self, String> {
Config::_from_file(path)
.map_err(|err| format!("Configuration file {}: {}", path.display(), err))
}
/// Map a file extension to its configuration [`Format`].
fn _extension_to_format(extension: &str) -> Result<Format, ConfigError> {
    let format = match extension {
        "yaml" | "yml" => Format::Yaml,
        "json" => Format::Json,
        "jsonnet" => Format::Jsonnet,
        other => {
            return Err(ConfigError::Extension(format!(
                "extension {} is not recognized",
                other
            )))
        }
    };
    Ok(format)
}
/// Deserialize `path` as a [`Config`] using the given format.
/// I/O and parse errors are converted into [`ConfigError`] by `?`.
fn _load_file(path: &Path, format: Format) -> Result<Self, ConfigError> {
    match format {
        Format::Json => Ok(serde_json::from_reader(File::open(path)?)?),
        Format::Yaml => Ok(serde_yaml::from_reader(File::open(path)?)?),
        Format::Jsonnet => Ok(serde_json::from_str(&jsonnet::from_path(path)?)?),
    }
}
fn _from_file(path: &Path) -> Result<Self, ConfigError> {
let extension = path
.extension()
.and_then(|ex| ex.to_str())
.ok_or(ConfigError::Extension("no file extension".into()))?;
let format = match extension {
"yaml" | "yml" => Ok(Format::Yaml),
"json" => Ok(Format::Json),
"jsonnet" => Ok(Format::Jsonnet),
_ => Err(ConfigError::Extension(format!(
"extension {} is not recognized",
extension
))),
}?;
let format = Self::_extension_to_format(extension)?;
let mut config: Config = match format {
Format::Json => serde_json::from_reader(File::open(path)?)?,
Format::Yaml => serde_yaml::from_reader(File::open(path)?)?,
Format::Jsonnet => serde_json::from_str(&jsonnet::from_path(path)?)?,
};
let mut config: Config = Self::_load_file(path, format)?;
config.setup().map_err(ConfigError::BadConfig)?;
@ -213,6 +391,7 @@ fn run_commands(commands: &Vec<Vec<String>>, moment: &str) -> bool {
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
@ -222,4 +401,312 @@ mod tests {
let mut config = Config::default();
assert!(config.setup().is_err());
}
// Test helper: parse a JSON config template, expanding the placeholders
// {{STREAMS}}, {{PATTERNS}} and {{FILTERS}} with minimal valid snippets.
// Panics (unwrap) on invalid JSON, printing the expanded text first.
fn parse_config_json(cfg: &str) -> Config {
const DUMMY_PATTERNS: &str = r#"
"patterns": {
"zero": {
"regex": "0"
}
}"#;
const DUMMY_FILTERS: &str = r#"
"filters": {
"dummy": {
"regex": ["abc"],
"actions": {
"act": {
"cmd": ["echo", "1"]
}
}
}
}"#;
const DUMMY_STREAMS: &str = r#"
"streams": {
"dummy": {
"cmd": ["true"],
{{FILTERS}}
}
}
"#;
// Replacement order matters: DUMMY_STREAMS itself contains {{FILTERS}},
// so {{STREAMS}} must be expanded before the {{FILTERS}} pass.
let cfg = cfg
.to_string()
.replace("{{STREAMS}}", DUMMY_STREAMS)
.replace("{{PATTERNS}}", DUMMY_PATTERNS)
.replace("{{FILTERS}}", DUMMY_FILTERS);
serde_json::from_str(&cfg)
.inspect_err(|_| {
// Dump the expanded config so a parse failure is debuggable.
eprintln!("{cfg}");
})
.unwrap()
}
#[test]
fn config_without_stream() {
    // setup() must reject a config whose `streams` key is absent as well
    // as one whose streams map is explicitly empty.
    let mut without_key = parse_config_json(
        r#"{
        {{PATTERNS}}
        }"#,
    );
    let mut empty_map = parse_config_json(
        r#"{
        {{PATTERNS}},
        "streams": {}
        }"#,
    );
    assert!(without_key.setup().is_err());
    assert!(empty_map.setup().is_err());
}
#[test]
fn config_without_pattern() {
    // setup() must reject a config whose `patterns` key is absent as well
    // as one whose patterns map is explicitly empty.
    let mut without_key = parse_config_json(
        r#"{
        {{STREAMS}}
        }"#,
    );
    let mut empty_map = parse_config_json(
        r#"{
        "patterns": {},
        {{STREAMS}}
        }"#,
    );
    assert!(without_key.setup().is_err());
    assert!(empty_map.setup().is_err());
}
#[test]
fn merge_config_distinct_patterns() {
    // Merging two fragments with different pattern names keeps both.
    let mut base = parse_config_json(
        r#"{
        "patterns": {
            "ip4": {
                "regex": "ip4"
            }
        },
        {{STREAMS}}
        }"#,
    );
    let overlay = parse_config_json(
        r#"{
        "patterns": {
            "ip6": {
                "regex": "ip6"
            }
        }
        }"#,
    );
    base.merge(overlay).unwrap();
    base.setup().unwrap();
    assert_eq!(base.patterns.len(), 2);
    assert!(base.patterns.contains_key("ip4"));
    assert!(base.patterns.contains_key("ip6"));
    assert_eq!(base.streams.len(), 1);
    assert!(base.streams.contains_key("dummy"));
}
#[test]
fn merge_config_same_patterns() {
    // A pattern name defined in both fragments is a merge error.
    let mut base = parse_config_json(
        r#"{
        "patterns": {
            "zero": {
                "regex": "0"
            }
        },
        {{STREAMS}}
        }"#,
    );
    let overlay = parse_config_json(
        r#"{
        "patterns": {
            "zero": {
                "regex": "00"
            }
        }
        }"#,
    );
    assert!(base.merge(overlay).is_err());
}
#[test]
fn merge_config_distinct_streams() {
    // Streams with different names are both kept after a merge.
    let mut base = parse_config_json(
        r#"{
        {{PATTERNS}},
        "streams": {
            "echo1": {
                "cmd": ["echo"],
                {{FILTERS}}
            }
        }
        }"#,
    );
    let overlay = parse_config_json(
        r#"{
        "streams": {
            "echo2": {
                "cmd": ["echo"],
                {{FILTERS}}
            }
        }
        }"#,
    );
    base.merge(overlay).unwrap();
    base.setup().unwrap();
    assert_eq!(base.patterns.len(), 1);
    assert!(base.patterns.contains_key("zero"));
    assert_eq!(base.streams.len(), 2);
    assert!(base.streams.contains_key("echo1"));
    assert!(base.streams.contains_key("echo2"));
}
#[test]
fn merge_config_same_streams_distinct_filters() {
    // Same stream name in both fragments: their filters are unioned.
    let mut base = parse_config_json(
        r#"{
        {{PATTERNS}},
        "streams": {
            "echo": {
                "cmd": ["echo"],
                "filters": {
                    "f1": {
                        "regex": ["abc"],
                        "actions": {
                            "act": {
                                "cmd": ["echo", "1"]
                            }
                        }
                    }
                }
            }
        }
        }"#,
    );
    let overlay = parse_config_json(
        r#"{
        "streams": {
            "echo": {
                "cmd": ["echo"],
                "filters": {
                    "f2": {
                        "regex": ["abc"],
                        "actions": {
                            "act": {
                                "cmd": ["echo", "1"]
                            }
                        }
                    }
                }
            }
        }
        }"#,
    );
    base.merge(overlay).unwrap();
    base.setup().unwrap();
    assert_eq!(base.streams.len(), 1);
    assert!(base.streams.contains_key("echo"));
    let merged_filters = base.streams.get("echo").unwrap().filters();
    assert_eq!(merged_filters.len(), 2);
    assert!(merged_filters.contains_key("f1"));
    assert!(merged_filters.contains_key("f2"));
}
#[test]
fn merge_config_same_streams_distinct_command() {
    // Two fragments defining different commands for the same stream clash.
    let mut base = parse_config_json(
        r#"{
        {{PATTERNS}},
        "streams": {
            "echo": {
                "cmd": ["echo"],
                {{FILTERS}}
            }
        }
        }"#,
    );
    let overlay = parse_config_json(
        r#"{
        "streams": {
            "echo": {
                "cmd": ["true"]
            }
        }
        }"#,
    );
    assert!(base.merge(overlay).is_err());
}
#[test]
fn merge_config_same_streams_command_in_one() {
    // A stream may get its command from one fragment and its filters
    // from another.
    let mut base = parse_config_json(
        r#"{
        {{PATTERNS}},
        "streams": {
            "echo": {
                {{FILTERS}}
            }
        }
        }"#,
    );
    let overlay = parse_config_json(
        r#"{
        "streams": {
            "echo": {
                "cmd": ["echo"]
            }
        }
        }"#,
    );
    base.merge(overlay).unwrap();
    base.setup().unwrap();
    assert_eq!(base.streams.len(), 1);
    assert!(base.streams.contains_key("echo"));
    let merged = base.streams.get("echo").unwrap();
    assert_eq!(merged.cmd().len(), 1);
    assert_eq!(merged.filters().len(), 1);
}
#[test]
fn merge_config_same_streams_same_filters() {
    // The same filter name in the same stream in two fragments is an error.
    let mut base = parse_config_json(
        r#"{
        {{PATTERNS}},
        "streams": {
            "echo": {
                "cmd": ["echo"],
                {{FILTERS}}
            }
        }
        }"#,
    );
    let overlay = parse_config_json(
        r#"{
        "streams": {
            "echo": {
                "cmd": ["echo"],
                {{FILTERS}}
            }
        }
        }"#,
    );
    assert!(base.merge(overlay).is_err());
}
}

View file

@ -8,7 +8,9 @@ use super::{Filter, Patterns};
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct Stream {
#[serde(default)]
cmd: Vec<String>,
#[serde(default)]
filters: BTreeMap<String, Filter>,
#[serde(skip)]
@ -32,6 +34,24 @@ impl Stream {
&self.cmd
}
/// Fold another fragment of the same stream into `self`.
///
/// The command may be set by at most one side (or be identical in both);
/// filter names must not collide.
pub fn merge(&mut self, other: Stream) -> Result<(), String> {
    if !(self.cmd.is_empty() || other.cmd.is_empty() || self.cmd == other.cmd) {
        return Err("Command is already defined".into());
    }
    if self.cmd.is_empty() {
        self.cmd = other.cmd;
    }
    for (key, filter) in other.filters {
        // Check before inserting: the previous insert-then-error code
        // replaced the existing filter before reporting the duplicate,
        // leaving `self` mutated (and cloned every key unconditionally).
        if self.filters.contains_key(&key) {
            return Err(format!("Filter {} already defined", key));
        }
        self.filters.insert(key, filter);
    }
    Ok(())
}
pub fn setup(&mut self, name: &str, patterns: &Patterns) -> Result<(), String> {
self._setup(name, patterns)
.map_err(|msg| format!("stream {}: {}", name, msg))

View file

@ -31,7 +31,7 @@ pub async fn daemon(
config_path: PathBuf,
socket: PathBuf,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config: &'static Config = Box::leak(Box::new(Config::from_file(&config_path)?));
let config: &'static Config = Box::leak(Box::new(Config::from_path(&config_path)?.0));
if !config.start() {
return Err("a start command failed, exiting.".into());