diff --git a/rust/src/config.rs b/rust/src/config.rs index 66b40a1..17e07e0 100644 --- a/rust/src/config.rs +++ b/rust/src/config.rs @@ -1,9 +1,8 @@ -#![allow(dead_code)] - +use std::collections::BTreeMap; use std::fs::File; use std::path::PathBuf; +use std::process::Command; use std::process::Stdio; -use std::{collections::BTreeMap, process::Command}; use anyhow::{anyhow, Context, Result}; use log::{error, info}; @@ -18,7 +17,7 @@ pub type Patterns = BTreeMap; pub struct Config { patterns: Patterns, - pub streams: BTreeMap, + streams: BTreeMap, #[serde(default = "num_cpus::get")] concurrency: usize, @@ -30,6 +29,10 @@ pub struct Config { } impl Config { + pub fn streams(&self) -> &BTreeMap { + &self.streams + } + pub fn setup(&mut self) -> Result<()> { self._setup() .or_else(|msg| Err(anyhow!("Bad configuration: {}", msg))) diff --git a/rust/src/daemon.rs b/rust/src/daemon.rs index 138c9be..3387e55 100644 --- a/rust/src/daemon.rs +++ b/rust/src/daemon.rs @@ -1,10 +1,11 @@ +use std::path::PathBuf; use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{channel, sync_channel}; +use std::sync::Arc; use std::thread; -use std::{path::PathBuf, sync::Arc}; -use log::{debug, error, Level, info}; +use log::{debug, error, info, Level}; use crate::{config, logger}; @@ -29,16 +30,20 @@ pub fn daemon(config_path: &PathBuf, loglevel: Level, socket: &PathBuf) { exit(1); } + // TODO match manager + let (match_tx, match_rx) = channel(); + let mut stream_process_child_handles = Vec::new(); let mut stream_thread_handles = Vec::new(); - for (_, stream) in &config.streams { + for (_, stream) in config.streams() { let stream = stream.clone(); - let (tx, rx) = sync_channel(0); + let match_tx = match_tx.clone(); + let (child_tx, child_rx) = sync_channel(0); - stream_thread_handles.push(thread::spawn(move || stream.manager(tx))); + stream_thread_handles.push(thread::spawn(move || stream.manager(child_tx, match_tx))); - if let Ok(Some(child)) = rx.recv() { + if let Ok(Some(child)) = child_rx.recv() { stream_process_child_handles.push(child); } } diff --git a/rust/src/filter.rs b/rust/src/filter.rs index ab47668..3122ff1 100644 --- a/rust/src/filter.rs +++ b/rust/src/filter.rs @@ -1,12 +1,14 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - time::Duration, -}; +use std::collections::{BTreeMap, BTreeSet}; +use std::time::Duration; +use log::info; use regex::Regex; use serde::Deserialize; -use crate::{action::Action, config::Patterns, parse_duration::parse_duration, pattern::Pattern}; +use crate::{ + action::Action, config::Patterns, messages::Match, parse_duration::parse_duration, + pattern::Pattern, +}; #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields)] @@ -81,6 +83,7 @@ impl Filter { return Err("no regex configured".into()); } + let mut first = true; for regex in &self.regex { let mut regex_buf = regex.clone(); for (_pattern_name, pattern) in patterns { @@ -91,12 +94,25 @@ impl Filter { &pattern.name_with_braces )); } - self.patterns.insert(pattern.clone()); + if first { + self.patterns.insert(pattern.clone()); + } else if !self.patterns.contains(&pattern) { + return Err(format!( + "pattern {} is not present in the first regex but is present in a following regex. all regexes should contain the same set of regexes", + &pattern.name_with_braces + )); + } + } else if !first && self.patterns.contains(&pattern) { + return Err(format!( + "pattern {} is present in the first regex but is not present in a following regex. all regexes should contain the same set of regexes", + &pattern.name_with_braces + )); } - regex_buf = regex_buf.replacen(&pattern.name_with_braces, &pattern.regex, 1) + regex_buf = regex_buf.replacen(&pattern.name_with_braces, &pattern.regex, 1); } let compiled = Regex::new(®ex_buf).or_else(|err| Err(err.to_string()))?; self.compiled_regex.push(compiled); + first = false; } self.regex.clear(); @@ -117,13 +133,39 @@ impl Filter { Ok(()) } + + pub fn get_match(&self, line: &String) -> Option { + for regex in &self.compiled_regex { + if let Some(matches) = regex.captures(line) { + if self.patterns.len() > 0 { + let mut result = Match::new(); + for pattern in &self.patterns { + let match_ = matches.name(&pattern.name).expect("logic error"); + if pattern.not_an_ignore(match_.as_str()) { + result.push(match_.as_str().to_string()); + } + } + if result.len() == self.patterns.len() { + info!("{}.{}: match {:?}", self.stream_name, self.name, result); + return Some(result); + } + } else { + info!("{}.{}: match [.]", self.stream_name, self.name); + return Some(vec![".".to_string()]); + } + } + } + None + } } #[cfg(test)] pub mod tests { use crate::action::tests::ok_action; - use crate::pattern::tests::default_pattern; + use crate::pattern::tests::{ + boubou_pattern_with_ignore, default_pattern, ok_pattern_with_ignore, + }; use super::*; @@ -151,7 +193,7 @@ pub mod tests { } #[test] - fn missing_config() { + fn setup_missing_config() { let mut filter; let name = "name".to_string(); let empty_patterns = Patterns::new(); @@ -172,7 +214,7 @@ pub mod tests { } #[test] - fn retry() { + fn setup_retry() { let mut filter; let name = "name".to_string(); let empty_patterns = Patterns::new(); @@ -201,7 +243,7 @@ pub mod tests { } #[test] - fn longuest_action_duration() { + fn setup_longuest_action_duration() { let mut filter; let name = "name".to_string(); let empty_patterns = Patterns::new(); @@ -239,7 +281,7 @@ pub mod tests { } #[test] - fn regexes() { + fn setup_regexes() { let name = "name".to_string(); let mut filter; @@ -258,12 +300,11 @@ pub mod tests { patterns.insert(unused_name.clone(), unused_pattern.clone()); let boubou_name = "boubou".to_string(); - let mut boubou = default_pattern(); - boubou.regex = "(?:bou){2}".to_string(); - assert!(boubou.setup(&boubou_name).is_ok()); + let mut boubou = boubou_pattern_with_ignore(); + boubou.setup(&boubou_name).unwrap(); patterns.insert(boubou_name.clone(), boubou.clone()); - // TODO correct regex replacement + // correct regex replacement filter = default_filter(); filter.actions.insert(name.clone(), ok_action()); filter.regex.push("insert here$".to_string()); @@ -278,7 +319,7 @@ pub mod tests { let stored_pattern = filter.patterns.first().unwrap(); assert_eq!(stored_pattern.regex, pattern.regex); - // TODO same pattern two times in regex + // same pattern two times in regex filter = default_filter(); filter.actions.insert(name.clone(), ok_action()); filter @@ -286,7 +327,7 @@ pub mod tests { .push("there are two s!".to_string()); assert!(filter.setup(&name, &name, &patterns).is_err()); - // TODO two patterns in one regex + // two patterns in one regex filter = default_filter(); filter.actions.insert(name.clone(), ok_action()); filter @@ -295,7 +336,7 @@ pub mod tests { assert!(filter.setup(&name, &name, &patterns).is_ok()); assert_eq!( filter.compiled_regex[0].to_string(), - Regex::new("insert (?P[abc]) here and (?P(?:bou){2}) there") + Regex::new("insert (?P[abc]) here and (?P(?:bou){1,3}) there") .unwrap() .to_string() ); @@ -305,7 +346,7 @@ pub mod tests { let stored_pattern = filter.patterns.last().unwrap(); assert_eq!(stored_pattern.regex, pattern.regex); - // TODO multiple regexes with same pattern + // multiple regexes with same pattern filter = default_filter(); filter.actions.insert(name.clone(), ok_action()); filter.regex.push("insert here".to_string()); @@ -327,7 +368,7 @@ pub mod tests { let stored_pattern = filter.patterns.first().unwrap(); assert_eq!(stored_pattern.regex, pattern.regex); - // TODO multiple regexes with same patterns + // multiple regexes with same patterns filter = default_filter(); filter.actions.insert(name.clone(), ok_action()); filter @@ -339,13 +380,13 @@ pub mod tests { assert!(filter.setup(&name, &name, &patterns).is_ok()); assert_eq!( filter.compiled_regex[0].to_string(), - Regex::new("insert (?P[abc]) here and (?P(?:bou){2}) there") + Regex::new("insert (?P[abc]) here and (?P(?:bou){1,3}) there") .unwrap() .to_string() ); assert_eq!( filter.compiled_regex[1].to_string(), - Regex::new("also add (?P(?:bou){2}) here and (?P[abc]) there") + Regex::new("also add (?P(?:bou){1,3}) here and (?P[abc]) there") .unwrap() .to_string() ); @@ -355,28 +396,132 @@ pub mod tests { let stored_pattern = filter.patterns.last().unwrap(); assert_eq!(stored_pattern.regex, pattern.regex); - // TODO multiple regexes with different patterns + // multiple regexes with different patterns 1 filter = default_filter(); filter.actions.insert(name.clone(), ok_action()); filter.regex.push("insert here".to_string()); filter.regex.push("also add there".to_string()); - assert!(filter.setup(&name, &name, &patterns).is_ok()); + assert!(filter.setup(&name, &name, &patterns).is_err()); + + // multiple regexes with different patterns 2 + filter = default_filter(); + filter.actions.insert(name.clone(), ok_action()); + filter + .regex + .push("insert here and there".to_string()); + filter.regex.push("also add there".to_string()); + assert!(filter.setup(&name, &name, &patterns).is_err()); + + // multiple regexes with different patterns 3 + filter = default_filter(); + filter.actions.insert(name.clone(), ok_action()); + filter.regex.push("also add there".to_string()); + filter + .regex + .push("insert here and there".to_string()); + assert!(filter.setup(&name, &name, &patterns).is_err()); + } + + #[test] + fn get_match() { + let name = "name".to_string(); + let mut filter; + + // make a Patterns + let mut patterns = Patterns::new(); + + let mut pattern = ok_pattern_with_ignore(); + pattern.setup(&name).unwrap(); + patterns.insert(name.clone(), pattern.clone()); + + let boubou_name = "boubou".to_string(); + let mut boubou = boubou_pattern_with_ignore(); + boubou.setup(&boubou_name).unwrap(); + patterns.insert(boubou_name.clone(), boubou.clone()); + + // one simple regex + filter = default_filter(); + filter.actions.insert(name.clone(), ok_action()); + filter.regex.push("insert here$".to_string()); + filter.setup(&name, &name, &patterns).unwrap(); assert_eq!( - filter.compiled_regex[0].to_string(), - Regex::new("insert (?P[abc]) here") - .unwrap() - .to_string() + filter.get_match(&"insert b here".into()), + Some(vec!("b".into())) + ); + assert_eq!(filter.get_match(&"insert a here".into()), None); + assert_eq!(filter.get_match(&"youpi b youpi".into()), None); + assert_eq!(filter.get_match(&"insert here".into()), None); + + // two patterns in one regex + filter = default_filter(); + filter.actions.insert(name.clone(), ok_action()); + filter + .regex + .push("insert here and there".to_string()); + filter.setup(&name, &name, &patterns).unwrap(); + assert_eq!( + filter.get_match(&"insert b here and bouboubou there".into()), + Some(vec!("bouboubou".into(), "b".into())) ); assert_eq!( - filter.compiled_regex[1].to_string(), - Regex::new("also add (?P(?:bou){2}) there") - .unwrap() - .to_string() + filter.get_match(&"insert a here and bouboubou there".into()), + None + ); + assert_eq!( + filter.get_match(&"insert b here and boubou there".into()), + None + ); + + // multiple regexes with same pattern + filter = default_filter(); + filter.actions.insert(name.clone(), ok_action()); + filter.regex.push("insert here".to_string()); + filter.regex.push("also add there".to_string()); + filter.setup(&name, &name, &patterns).unwrap(); + assert_eq!(filter.get_match(&"insert a here".into()), None); + assert_eq!( + filter.get_match(&"insert b here".into()), + Some(vec!("b".into())) + ); + assert_eq!(filter.get_match(&"also add a there".into()), None); + assert_eq!( + filter.get_match(&"also add b there".into()), + Some(vec!("b".into())) + ); + + // multiple regexes with same patterns + filter = default_filter(); + filter.actions.insert(name.clone(), ok_action()); + filter + .regex + .push("insert here and there".to_string()); + filter + .regex + .push("also add here and there".to_string()); + filter.setup(&name, &name, &patterns).unwrap(); + assert_eq!( + filter.get_match(&"insert b here and bouboubou there".into()), + Some(vec!("bouboubou".into(), "b".into())) + ); + assert_eq!( + filter.get_match(&"also add bouboubou here and b there".into()), + Some(vec!("bouboubou".into(), "b".into())) + ); + assert_eq!( + filter.get_match(&"insert a here and bouboubou there".into()), + None + ); + assert_eq!( + filter.get_match(&"also add bouboubou here and a there".into()), + None + ); + assert_eq!( + filter.get_match(&"insert b here and boubou there".into()), + None + ); + assert_eq!( + filter.get_match(&"also add boubou here and b there".into()), + None ); - assert_eq!(filter.patterns.len(), 2); - let stored_pattern = filter.patterns.first().unwrap(); - assert_eq!(stored_pattern.regex, boubou.regex); - let stored_pattern = filter.patterns.last().unwrap(); - assert_eq!(stored_pattern.regex, pattern.regex); } } diff --git a/rust/src/main.rs b/rust/src/main.rs index deb8333..60c82f8 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -13,13 +13,14 @@ use clap::Parser; use regex::Regex; mod action; +mod config; mod filter; +mod messages; mod pattern; mod stream; mod cli; mod client; -mod config; mod daemon; mod logger; mod parse_duration; diff --git a/rust/src/messages.rs b/rust/src/messages.rs new file mode 100644 index 0000000..f3dd567 --- /dev/null +++ b/rust/src/messages.rs @@ -0,0 +1,11 @@ +use std::time::SystemTime; + +use crate::filter::Filter; + +pub type Match = Vec; + +pub struct PFT { + pub m: Match, + pub f: Filter, + pub t: SystemTime, +} diff --git a/rust/src/pattern.rs b/rust/src/pattern.rs index 8e449f5..77ba13f 100644 --- a/rust/src/pattern.rs +++ b/rust/src/pattern.rs @@ -18,7 +18,7 @@ pub struct Pattern { compiled_ignore_regex: Vec, #[serde(skip)] - name: String, + pub name: String, #[serde(skip)] pub name_with_braces: String, } @@ -72,6 +72,20 @@ impl Pattern { Ok(()) } + + pub fn not_an_ignore(&self, match_: &str) -> bool { + for regex in &self.compiled_ignore_regex { + if regex.is_match(match_) { + return false; + } + } + for ignore in &self.ignore { + if ignore == match_ { + return false; + } + } + true + } } // This is required to be added to a BTreeSet @@ -116,8 +130,21 @@ pub mod tests { pattern } + pub fn ok_pattern_with_ignore() -> Pattern { + let mut pattern = ok_pattern(); + pattern.ignore.push("a".into()); + pattern + } + + pub fn boubou_pattern_with_ignore() -> Pattern { + let mut pattern = ok_pattern(); + pattern.regex = "(?:bou){1,3}".to_string(); + pattern.ignore.push("boubou".into()); + pattern + } + #[test] - fn missing_information() { + fn setup_missing_information() { let mut pattern; // Empty name @@ -136,7 +163,7 @@ pub mod tests { } #[test] - fn regex() { + fn setup_regex() { let mut pattern; // regex ok @@ -155,7 +182,7 @@ pub mod tests { } #[test] - fn ignore() { + fn setup_ignore() { let mut pattern; // ignore ok @@ -173,7 +200,7 @@ pub mod tests { } #[test] - fn ignore_regex() { + fn setup_ignore_regex() { let mut pattern; // ignore_regex ok @@ -189,4 +216,27 @@ pub mod tests { pattern.ignore.push("[a".into()); assert!(pattern.setup(&"name".into()).is_err()); } + + #[test] + fn not_an_ignore() { + let mut pattern; + + // ignore ok + pattern = default_pattern(); + pattern.regex = "[abcdefg]".into(); + pattern.ignore.push("a".into()); + pattern.ignore.push("b".into()); + pattern.ignore_regex.push("c".into()); + pattern.ignore_regex.push("[de]".into()); + + pattern.setup(&"name".into()).unwrap(); + assert_eq!(pattern.not_an_ignore("a"), false); + assert_eq!(pattern.not_an_ignore("b"), false); + assert_eq!(pattern.not_an_ignore("c"), false); + assert_eq!(pattern.not_an_ignore("d"), false); + assert_eq!(pattern.not_an_ignore("e"), false); + assert_eq!(pattern.not_an_ignore("f"), true); + assert_eq!(pattern.not_an_ignore("g"), true); + assert_eq!(pattern.not_an_ignore("h"), true); + } } diff --git a/rust/src/stream.rs b/rust/src/stream.rs index cad0172..489dcba 100644 --- a/rust/src/stream.rs +++ b/rust/src/stream.rs @@ -1,14 +1,13 @@ -use std::{ - collections::BTreeMap, - io::{BufRead, BufReader}, - process::{Child, Command, Stdio}, - sync::mpsc, -}; +use std::collections::BTreeMap; +use std::io::{BufRead, BufReader}; +use std::process::{Child, Command, Stdio}; +use std::sync::mpsc::{Sender, SyncSender}; +use std::time::SystemTime; use log::{debug, error, info}; use serde::Deserialize; -use crate::{config::Patterns, filter::Filter}; +use crate::{config::Patterns, filter::Filter, messages::PFT}; #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields)] @@ -54,7 +53,7 @@ impl Stream { Ok(()) } - pub fn manager(&self, childs_channel: mpsc::SyncSender>) { + pub fn manager(&self, child_tx: SyncSender>, match_tx: Sender) { info!("{}: start {:?}", self.name, self.cmd); let mut child = match Command::new(&self.cmd[0]) .args(&self.cmd[1..]) @@ -66,15 +65,17 @@ impl Stream { Ok(child) => child, Err(err) => { error!("could not execute stream {} cmd: {}", self.name, err); - let _ = childs_channel.send(None); + let _ = child_tx.send(None); return; } }; + // keep stdout before sending/moving child to the main thread let mut stdout = BufReader::new(child.stdout.take().unwrap()); - // let main handle the child processus - let _ = childs_channel.send(Some(child)); + // let main handle the child process + let _ = child_tx.send(Some(child)); + drop(child_tx); let mut line: String = "".into(); while let Ok(nb_chars) = stdout.read_line(&mut line) { @@ -83,7 +84,18 @@ impl Stream { } debug!("stream {} stdout: {}", self.name, line); - // TODO apply each filter on the line + for (_, filter) in &self.filters { + if let Some(match_) = filter.get_match(&line) { + match_tx + .send(PFT { + m: match_, + // FIXME this clone is a lot :'( + f: filter.clone(), + t: SystemTime::now(), + }) + .unwrap(); + } + } line.clear(); }