From 131d8eced93e6e0cd480b241740c19ff4123d305 Mon Sep 17 00:00:00 2001 From: ppom Date: Mon, 20 May 2024 12:00:00 +0200 Subject: [PATCH] Big client-daemon refacto Now most of the work is done on the client side. Daemon sends its Config, Matches and Actions structures. Client does all data conversions, and filtering. For flush, Client sends to the Daemon individual flush instructions. --- app/client.go | 448 ++++++++++++++++++++++++-------------------- app/daemon.go | 100 ++++------ app/main.go | 11 +- app/persist.go | 8 +- app/pipe.go | 67 +------ app/startup.go | 38 ++-- app/types.go | 109 ++++++++--- config/test.jsonnet | 27 ++- 8 files changed, 431 insertions(+), 377 deletions(-) diff --git a/app/client.go b/app/client.go index 598f0e8..ab553de 100644 --- a/app/client.go +++ b/app/client.go @@ -8,29 +8,29 @@ import ( "net" "os" "regexp" + "slices" "strings" + "time" "framagit.org/ppom/reaction/logger" "sigs.k8s.io/yaml" ) const ( - Show = 0 - Flush = 1 - Config = 2 + Info = 0 + Flush = 1 ) type Request struct { Request int - Stream string - Filter string - Pattern Match + Flush PSF } type Response struct { - Err error - ClientStatus ClientStatus - Config Conf + Err error + // Config Conf + Matches MatchesMap + Actions ActionsMap } func SendAndRetrieve(data Request) Response { @@ -63,13 +63,6 @@ type MapPatternStatusFlush MapPatternStatus type ClientStatus map[string]map[string]MapPatternStatus type ClientStatusFlush ClientStatus -type CompiledPattern struct{ - Name string - Regex string - compiledRegex *regexp.Regexp -} -type CPM map[string]CompiledPattern - func (mps MapPatternStatusFlush) MarshalJSON() ([]byte, error) { for _, v := range mps { return json.Marshal(v) @@ -88,137 +81,156 @@ func (csf ClientStatusFlush) MarshalJSON() ([]byte, error) { return json.Marshal(ret) } -func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern []string) { - response := SendAndRetrieve(Request{Show, stream, filter, ""}) - if response.Err != nil { - logger.Fatalln("Received error from daemon:", response.Err) +func pfMatches(streamName string, filterName string, regex *regexp.Regexp, regexes map[string]*regexp.Regexp, match Match, filter *Filter) bool { + // Check stream and filter match + if streamName != "" && streamName != filter.Stream.Name { + return false } - - // Remove empty structs - for streamName := range response.ClientStatus { - for filterName := range response.ClientStatus[streamName] { - for patternName, patternMap := range response.ClientStatus[streamName][filterName] { - if len(patternMap.Actions) == 0 && patternMap.Matches == 0 { - delete(response.ClientStatus[streamName][filterName], patternName) - } - } - if len(response.ClientStatus[streamName][filterName]) == 0 { - delete(response.ClientStatus[streamName], filterName) - } - } - if len(response.ClientStatus[streamName]) == 0 { - delete(response.ClientStatus, streamName) - } + if filterName != "" && filterName != filter.Name { + return false } - - // Limit to stream, filter if exists - if stream != "" { - exists := false - for streamName := range response.ClientStatus { - if stream == streamName { - if filter != "" { - for filterName := range response.ClientStatus[streamName] { - if filter == filterName { - exists = true - } else { - delete(response.ClientStatus[streamName], filterName) - } - } - } else { - exists = true - } - } else { - delete(response.ClientStatus, streamName) - } - } - if !exists { - logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.") - os.Exit(1) - } - } - - // Limit to pattern + // Check regex match if regex != nil { - for streamName := range response.ClientStatus { - for filterName := range response.ClientStatus[streamName] { - for patterns := range response.ClientStatus[streamName][filterName] { - pmatch := false - for _, p := range patterns.Split() { - if regex.MatchString(p) { - pmatch = true - } - } - if !pmatch { - delete(response.ClientStatus[streamName][filterName], patterns) - } - } - if len(response.ClientStatus[streamName][filterName]) == 0 { - delete(response.ClientStatus[streamName], filterName) - } - } - if len(response.ClientStatus[streamName]) == 0 { - delete(response.ClientStatus, streamName) + var pmatch bool + for _, p := range match.Split() { + if regex.MatchString(p) { + pmatch = true } } + if !pmatch { + return false + } } - - // Limit to kvpatterns - if kvpattern != nil { - // Get pattern indices (as stored in DB) from config - responseConfig := SendAndRetrieve(Request{Config, stream, filter, ""}) - if responseConfig.Err != nil { - logger.Fatalln("Received error from daemon:", responseConfig.Err) - } - // Build map from kvpattern - args := make(CPM) - for _, p := range kvpattern { - // p syntax already checked in Main - a := strings.Split(p, "=") - compiled, err := regexp.Compile(fmt.Sprintf("^%v$", a[1])) - if err != nil { - logger.Fatalf("Bad argument: %v: %v", p, err) - } - args[a[0]] = CompiledPattern{Name: a[0], Regex: a[1], compiledRegex: compiled} - } - - for streamName := range response.ClientStatus { - for filterName := range response.ClientStatus[streamName] { - for patterns := range response.ClientStatus[streamName][filterName] { - pmatch := 0 - for ip, p := range patterns.Split() { - // get pattern name from stream.filter.pattern (which was alphabetically sorted at startup) - if v, found := args[responseConfig.Config.Streams[streamName].Filters[filterName].Pattern[ip].Name]; found { - if v.compiledRegex.Match([]byte(p)) { - pmatch++ - } - } - } - if pmatch != len(kvpattern) { - delete(response.ClientStatus[streamName][filterName], patterns) - } - } - if len(response.ClientStatus[streamName][filterName]) == 0 { - delete(response.ClientStatus[streamName], filterName) + // Check regexes match + if len(regexes) != 0 { + // Check that all user requested patterns are in this filter + var nbMatched int + var localMatches = match.Split() + // For each pattern of this filter + for i, pattern := range filter.Pattern { + // Check that this pattern has user requested name + if reg, ok := regexes[pattern.Name]; ok { + // Check that the PF.p[i] matches user requested pattern + if reg.MatchString(localMatches[i]) { + nbMatched++ } } - if len(response.ClientStatus[streamName]) == 0 { - delete(response.ClientStatus, streamName) - } + } + if len(regexes) != nbMatched { + return false } } + // All checks passed + return true +} +func addMatchToCS(cs ClientStatus, pf PF, times map[time.Time]struct{}) { + patterns, streamName, filterName := pf.P, pf.F.Stream.Name, pf.F.Name + if cs[streamName] == nil { + cs[streamName] = make(map[string]MapPatternStatus) + } + if cs[streamName][filterName] == nil { + cs[streamName][filterName] = make(MapPatternStatus) + } + cs[streamName][filterName][patterns] = &PatternStatus{len(times), nil} +} + +func addActionToCS(cs ClientStatus, pa PA, times map[time.Time]struct{}) { + patterns, streamName, filterName, actionName := pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name, pa.A.Name + if cs[streamName] == nil { + cs[streamName] = make(map[string]MapPatternStatus) + } + if cs[streamName][filterName] == nil { + cs[streamName][filterName] = make(MapPatternStatus) + } + if cs[streamName][filterName][patterns] == nil { + cs[streamName][filterName][patterns] = new(PatternStatus) + } + ps := cs[streamName][filterName][patterns] + if ps.Actions == nil { + ps.Actions = make(map[string][]string) + } + for then := range times { + ps.Actions[actionName] = append(ps.Actions[actionName], then.Format(time.DateTime)) + } +} + +func printClientStatus(cs ClientStatus, format string) { var text []byte var err error if format == "json" { - text, err = json.MarshalIndent(response.ClientStatus, "", " ") + text, err = json.MarshalIndent(cs, "", " ") } else { - text, err = yaml.Marshal(response.ClientStatus) + text, err = yaml.Marshal(cs) } if err != nil { logger.Fatalln("Failed to convert daemon binary response to text format:", err) } fmt.Println(strings.ReplaceAll(string(text), "\\0", " ")) +} + +func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern []string) { + response := SendAndRetrieve(Request{Info, PSF{}}) + if response.Err != nil { + logger.Fatalln("Received error from daemon:", response.Err) + } + + cs := make(ClientStatus) + + var regexes map[string]*regexp.Regexp + + if len(kvpattern) != 0 { + // Transform []{"k=v","k=v"} into map["k"]"v" + regexes = make(map[string]*regexp.Regexp) + for _, p := range kvpattern { + // p syntax already checked in Main + key, value, found := strings.Cut(p, "=") + if !found { + logger.Fatalf("Bad argument: no =: %v", p) + } + compiled, err := regexp.Compile(fmt.Sprintf("^%v$", value)) + if err != nil { + logger.Fatalf("Bad argument: %v: %v", p, err) + } + regexes[key] = compiled + } + } + + var found bool + + // Painful data manipulation + for pf, times := range response.Matches { + // Check this PF is not empty + if len(times) == 0 { + continue + } + if !pfMatches(stream, filter, regex, regexes, pf.P, pf.F) { + continue + } + addMatchToCS(cs, pf, times) + found = true + } + + // Painful data manipulation + for pa, times := range response.Actions { + // Check this PF is not empty + if len(times) == 0 { + continue + } + if !pfMatches(stream, filter, regex, regexes, pa.P, pa.A.Filter) { + continue + } + addActionToCS(cs, pa, times) + found = true + } + + if !found { + logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.") + os.Exit(1) + } + + printClientStatus(cs, format) os.Exit(0) } @@ -236,88 +248,126 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern [ * - "2024-04-30 15:27:28" * */ -func ClientFlush(patterns []string, stream, filter, format string) { - responseConfig := SendAndRetrieve(Request{Config, stream, filter, ""}) - if responseConfig.Err != nil { - logger.Fatalln("Received error from daemon:", responseConfig.Err) +func ClientFlush(patterns []string, streamName, filterName, format string) { + // Either we have one pattern that is not of type name=pattern + requestedPatterns := make(map[string]string) + var requestedPattern string + if len(patterns) == 1 && strings.Index(patterns[0], "=") == -1 { + requestedPattern = patterns[0] + // Or we have one ore multiple patterns that must be of type name=pattern + } else { + for _, kvpattern := range patterns { + k, v, found := strings.Cut(kvpattern, "=") + if !found { + logger.Fatalf("Multiple patterns provided must be of type name=pattern: %v", kvpattern) + } + if requestedPatterns[k] != "" { + logger.Fatalf("Can't provide the same pattern name multiple times: %v", k) + } + requestedPatterns[k] = v + } } - if _, found := responseConfig.Config.Streams[stream].Filters[filter]; filter != "" && found == false { + // Remember which Filters are compatible with the query + filterCompatibility := make(map[SF]bool) + isCompatible := func(filter *Filter) bool { + sf := SF{filter.Stream.Name, filter.Name} + compatible, ok := filterCompatibility[sf] + + // already tested + if ok { + return compatible + } + + for k := range requestedPatterns { + if -1 == slices.IndexFunc(filter.Pattern, func(pattern *Pattern) bool { + return pattern.Name == k + }) { + filterCompatibility[sf] = false + return false + } + } + filterCompatibility[sf] = true + return true + } + + // match functions + kvMatch := func(filter *Filter, filterPatterns []string) bool { + // For each user requested pattern + for k, v := range requestedPatterns { + // Find its index on the Filter.Pattern + for i, pattern := range filter.Pattern { + if k == pattern.Name { + // Test the match + if v != filterPatterns[i] { + return false + } + } + } + } + return true + } + + var found bool + fullMatch := func(filter *Filter, match Match) bool { + // Test if we limit by stream + if streamName == "" || filter.Stream.Name == streamName { + // Test if we limit by filter + if filterName == "" || filter.Name == filterName { + found = true + filterPatterns := match.Split() + // If there is only one pattern + if requestedPattern != "" { + for _, filterPattern := range filterPatterns { + if requestedPattern == filterPattern { + return true + } + } + } else { + return isCompatible(filter) && kvMatch(filter, filterPatterns) + } + } + } + return false + } + + response := SendAndRetrieve(Request{Info, PSF{}}) + if response.Err != nil { + logger.Fatalln("Received error from daemon:", response.Err) + } + + commands := make([]PSF, 0) + + cs := make(ClientStatus) + + for pf, times := range response.Matches { + if fullMatch(pf.F, pf.P) { + commands = append(commands, PSF{pf.P, pf.F.Stream.Name, pf.F.Name}) + addMatchToCS(cs, pf, times) + } + } + + for pa, times := range response.Actions { + if fullMatch(pa.A.Filter, pa.P) { + commands = append(commands, PSF{pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name}) + addActionToCS(cs, pa, times) + } + } + + if !found { logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.") os.Exit(1) } - processFilter := func(patterns []string, stream, filter, format string) { - fpqty := len(responseConfig.Config.Streams[stream].Filters[filter].Pattern) - - if len(patterns) > fpqty { - logger.Fatalln("filter have 1 pattern") - } - - for _, kv := range patterns { - if len(strings.Split(kv, "=")) != 2 && fpqty > 1 { - logger.Fatalln("args should be in pattern=value format using more than one pattern in filter") - } - } - - // Transform arg to k=v - if fpqty == 1 && len(strings.Split(patterns[0], "=")) == 1 { - patterns[0] = strings.Join([]string{responseConfig.Config.Streams[stream].Filters[filter].Pattern[0].Name, patterns[0]}, "=") - } - - // arg pattern map - pmap := make(map[string]string) - for _, p := range patterns { - a := strings.Split(p, "=") - pmap[a[0]] = a[1] - } - - // Check every arg pattern exist in filter - for k, _ := range pmap { - found := false - for _, fp := range responseConfig.Config.Streams[stream].Filters[filter].Pattern { - if strings.EqualFold(k, fp.Name) { - found = true - break - } - } - if !found { - logger.Fatalf("pattern <%s> not found in filter %s.%s\n", k, stream, filter) - } - } - - // Order arg patterns before JoinMatch - var orderedPatterns []string - for _, p := range responseConfig.Config.Streams[stream].Filters[filter].Pattern { - if v, found := pmap[p.Name]; found { - orderedPatterns = append(orderedPatterns, v) - } else { - orderedPatterns = append(orderedPatterns, "") - } - } - response := SendAndRetrieve(Request{Flush, stream, filter, JoinMatch(orderedPatterns)}) + for _, psf := range commands { + response := SendAndRetrieve(Request{Flush, psf}) if response.Err != nil { - logger.Fatalln(logger.ERROR, "Received error from daemon:", response.Err) - } - - var text []byte - var err error - if format == "json" { - text, err = json.MarshalIndent(ClientStatusFlush(response.ClientStatus), "", " ") - } else { - text, err = yaml.Marshal(ClientStatusFlush(response.ClientStatus)) - } - if err != nil { - logger.Fatalln("Failed to convert daemon binary response to text format:", err) - } - fmt.Println(string(text)) - } - - for streamName := range responseConfig.Config.Streams { - for filterName := range responseConfig.Config.Streams[streamName].Filters { - processFilter(patterns, streamName, filterName, format) + logger.Fatalln("Received error from daemon:", response.Err) } } + + printClientStatus(cs, format) + os.Exit(0) } func TestRegex(confFilename, regex, line string) { diff --git a/app/daemon.go b/app/daemon.go index b0a3809..7a2ffd0 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -5,7 +5,6 @@ import ( "os" "os/exec" "os/signal" - "regexp" "strings" "sync" "syscall" @@ -91,11 +90,11 @@ func (f *Filter) match(line *string) Match { } } if len(result) == len(f.Pattern) { - logger.Printf(logger.INFO, "%s.%s: match %s", f.stream.name, f.name, WithBrackets(result)) + logger.Printf(logger.INFO, "%s.%s: match %s", f.Stream.Name, f.Name, WithBrackets(result)) return JoinMatch(result) } } else { - logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.stream.name, f.name) + logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.Stream.Name, f.Name) // No pattern, so this match will never actually be used return "." } @@ -115,12 +114,12 @@ func (a *Action) exec(match Match) { var computedCommand []string - if a.filter.Pattern != nil { + if a.Filter.Pattern != nil { computedCommand = make([]string, 0, len(a.Cmd)) matches := match.Split() for _, item := range a.Cmd { - for i, p := range a.filter.Pattern { + for i, p := range a.Filter.Pattern { item = strings.ReplaceAll(item, p.nameWithBraces, matches[i]) } computedCommand = append(computedCommand, item) @@ -129,12 +128,12 @@ func (a *Action) exec(match Match) { computedCommand = a.Cmd } - logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) + logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand) cmd := exec.Command(computedCommand[0], computedCommand[1:]...) if ret := cmd.Run(); ret != nil { - logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) + logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand, ret) } } @@ -147,7 +146,7 @@ func ActionsManager(concurrency int) { var pa PA for { pa = <-execActionsC - pa.a.exec(pa.p) + pa.A.exec(pa.P) } }() } @@ -157,7 +156,7 @@ func ActionsManager(concurrency int) { for { pa = <-execActionsC go func(pa PA) { - pa.a.exec(pa.p) + pa.A.exec(pa.P) }(pa) } }() @@ -172,65 +171,49 @@ func ActionsManager(concurrency int) { for { select { case pat := <-actionsC: - pa := PA{pat.p, pat.a} - pattern, action, then := pat.p, pat.a, pat.t + pa := PA{pat.P, pat.A} + pattern, action, then := pat.P, pat.A, pat.T now := time.Now() // check if must be executed now if then.Compare(now) <= 0 { execAction(action, pattern) } else { - actionsLock.Lock() if actions[pa] == nil { actions[pa] = make(map[time.Time]struct{}) } actions[pa][then] = struct{}{} - actionsLock.Unlock() go func(insidePat PAT, insideNow time.Time) { - time.Sleep(insidePat.t.Sub(insideNow)) + time.Sleep(insidePat.T.Sub(insideNow)) pendingActionsC <- insidePat }(pat, now) } case pat := <-pendingActionsC: - pa := PA{pat.p, pat.a} - pattern, action, then := pat.p, pat.a, pat.t - actionsLock.Lock() + pa := PA{pat.P, pat.A} + pattern, action, then := pat.P, pat.A, pat.T if actions[pa] != nil { delete(actions[pa], then) } - actionsLock.Unlock() execAction(action, pattern) case fo := <-flushToActionsC: - ret := make(ActionsMap) - match := 0 - actionsLock.Lock() for pa := range actions { - ppa, pfo := pa.p.Split(), fo.p.Split() - for i, p := range ppa { - if m, err := regexp.MatchString(pfo[i], p); err == nil && m == true { - match++ - } - } - if match == len(ppa) { + if fo.S == pa.A.Filter.Stream.Name && + fo.F == pa.A.Filter.Name && + fo.P == pa.P { for range actions[pa] { - execAction(pa.a, pa.p) + execAction(pa.A, pa.P) } - ret[pa] = actions[pa] delete(actions, pa) + break } - match = 0 } - actionsLock.Unlock() - fo.ret <- ret case _, _ = <-stopActions: - actionsLock.Lock() for pa := range actions { - if pa.a.OnExit { + if pa.A.OnExit { for range actions[pa] { - execAction(pa.a, pa.p) + execAction(pa.A, pa.P) } } } - actionsLock.Unlock() wgActions.Done() return } @@ -238,7 +221,7 @@ func ActionsManager(concurrency int) { } func MatchesManager() { - var fo FlushMatchOrder + var fo PSF var pft PFT end := false @@ -261,7 +244,7 @@ func MatchesManager() { matchesManagerHandleFlush(fo) case pft = <-matchesC: - entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false} + entry := LogEntry{pft.T, 0, pft.P, pft.F.Stream.Name, pft.F.Name, 0, false} entry.Exec = matchesManagerHandleMatch(pft) @@ -270,37 +253,25 @@ func MatchesManager() { } } -func matchesManagerHandleFlush(fo FlushMatchOrder) { - ret := make(MatchesMap) - match := 0 +func matchesManagerHandleFlush(fo PSF) { matchesLock.Lock() for pf := range matches { - ppf, pfo := pf.p.Split(), fo.p.Split() - for i, p := range ppf { - if m, err := regexp.MatchString(pfo[i], p); err == nil && m == true { - match++ - } - } - if match == len(ppf) { - if fo.ret != nil { - ret[pf] = matches[pf] - } + if fo.S == pf.F.Stream.Name && + fo.F == pf.F.Name && + fo.P == pf.P { delete(matches, pf) + break } - match = 0 } matchesLock.Unlock() - if fo.ret != nil { - fo.ret <- ret - } } func matchesManagerHandleMatch(pft PFT) bool { matchesLock.Lock() defer matchesLock.Unlock() - filter, patterns, then := pft.f, pft.p, pft.t - pf := PF{pft.p, pft.f} + filter, patterns, then := pft.F, pft.P, pft.T + pf := PF{pft.P, pft.F} if filter.Retry > 1 { // make sure map exists @@ -332,7 +303,7 @@ func matchesManagerHandleMatch(pft PFT) bool { func StreamManager(s *Stream, endedSignal chan *Stream) { defer wgStreams.Done() - logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd) + logger.Printf(logger.INFO, "%s: start %s\n", s.Name, s.Cmd) lines := cmdStdout(s.Cmd) for { @@ -356,7 +327,6 @@ func StreamManager(s *Stream, endedSignal chan *Stream) { var actions ActionsMap var matches MatchesMap -var actionsLock sync.Mutex var matchesLock sync.Mutex var stopStreams chan bool @@ -391,10 +361,10 @@ var logsC chan LogEntry var actionsC chan PAT // SocketManager, DatabaseManager → MatchesManager -var flushToMatchesC chan FlushMatchOrder +var flushToMatchesC chan PSF // SocketManager → ActionsManager -var flushToActionsC chan FlushActionOrder +var flushToActionsC chan PSF // SocketManager → DatabaseManager var flushToDatabaseC chan LogEntry @@ -406,8 +376,8 @@ func Daemon(confFilename string) { matchesC = make(chan PFT) logsC = make(chan LogEntry) actionsC = make(chan PAT) - flushToMatchesC = make(chan FlushMatchOrder) - flushToActionsC = make(chan FlushActionOrder) + flushToMatchesC = make(chan PSF) + flushToActionsC = make(chan PSF) flushToDatabaseC = make(chan LogEntry) stopActions = make(chan bool) stopStreams = make(chan bool) @@ -438,7 +408,7 @@ func Daemon(confFilename string) { for { select { case finishedStream := <-endSignals: - logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name) + logger.Printf(logger.ERROR, "%s stream finished", finishedStream.Name) nbStreamsInExecution-- if nbStreamsInExecution == 0 { quit(conf, false) diff --git a/app/main.go b/app/main.go index da5b499..9ff3dbe 100644 --- a/app/main.go +++ b/app/main.go @@ -106,7 +106,8 @@ func basicUsage() { ` + bold + `reaction flush` + reset + ` TARGET [TARGET...] # remove currently active matches and run currently pending actions for the specified TARGET(s) # (then show flushed matches and actions) - # e.g. reaction flush 192.168.1.1 root + # e.g. reaction flush 192.168.1.1 + # e.g. reaction flush ip=192.168.1.1 user=root # options: -s/--socket SOCKET # path to the client-daemon communication socket @@ -192,6 +193,7 @@ func Main(version, commit string) { var regex *regexp.Regexp var err error if *pattern != "" { + // TODO anchor ^$ the pattern? regex, err = regexp.Compile(*pattern) if err != nil { logger.Fatalln("-p/--pattern: ", err) @@ -223,11 +225,12 @@ func Main(version, commit string) { stream, filter := "", "" if *limit != "" { splitSF := strings.Split(*limit, ".") - if len(splitSF) != 2 { + stream = splitSF[0] + if len(splitSF) == 2 { + filter = splitSF[1] + } else if len(splitSF) > 2 { logger.Fatalln("-l/--limit: only one . separator is supported") } - stream = splitSF[0] - filter = splitSF[1] } ClientFlush(f.Args(), stream, filter, *queryFormat) diff --git a/app/persist.go b/app/persist.go index 034e2a7..78e78a1 100644 --- a/app/persist.go +++ b/app/persist.go @@ -125,7 +125,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E defer func() { for sf, t := range discardedEntries { if t > 0 { - logger.Printf(logger.WARN, "info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.s, sf.f) + logger.Printf(logger.WARN, "info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.S, sf.F) } } if malformedEntries > 0 { @@ -186,8 +186,8 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E discardedEntries[SF{"", ""}]++ continue } - entry.Stream = sf.s - entry.Filter = sf.f + entry.Stream = sf.S + entry.Filter = sf.F } if stream := c.Streams[entry.Stream]; stream != nil { filter = stream.Filters[entry.Filter] @@ -231,7 +231,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // replay executions if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { if startup { - flushToMatchesC <- FlushMatchOrder{entry.Pattern, nil} + flushToMatchesC <- PSF{entry.Pattern, entry.Stream, entry.Filter} filter.sendActions(entry.Pattern, entry.T) } diff --git a/app/pipe.go b/app/pipe.go index 7f4f684..0d5b4ac 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -2,58 +2,15 @@ package app import ( "encoding/gob" + "errors" "net" "os" "path" - "sync" "time" "framagit.org/ppom/reaction/logger" ) -func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_actionsLock, local_matchesLock *sync.Mutex) ClientStatus { - cs := make(ClientStatus) - local_matchesLock.Lock() - - // Painful data manipulation - for pf, times := range local_matches { - patterns, filter := pf.p, pf.f - if cs[filter.stream.name] == nil { - cs[filter.stream.name] = make(map[string]MapPatternStatus) - } - if cs[filter.stream.name][filter.name] == nil { - cs[filter.stream.name][filter.name] = make(MapPatternStatus) - } - cs[filter.stream.name][filter.name][patterns] = &PatternStatus{len(times), nil} - } - - local_matchesLock.Unlock() - local_actionsLock.Lock() - - // Painful data manipulation - for pa, times := range local_actions { - patterns, action := pa.p, pa.a - if cs[action.filter.stream.name] == nil { - cs[action.filter.stream.name] = make(map[string]MapPatternStatus) - } - if cs[action.filter.stream.name][action.filter.name] == nil { - cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus) - } - if cs[action.filter.stream.name][action.filter.name][patterns] == nil { - cs[action.filter.stream.name][action.filter.name][patterns] = new(PatternStatus) - } - ps := cs[action.filter.stream.name][action.filter.name][patterns] - if ps.Actions == nil { - ps.Actions = make(map[string][]string) - } - for then := range times { - ps.Actions[action.name] = append(ps.Actions[action.name], then.Format(time.DateTime)) - } - } - local_actionsLock.Unlock() - return cs -} - func createOpenSocket() net.Listener { err := os.MkdirAll(path.Dir(*SocketPath), 0755) if err != nil { @@ -97,24 +54,20 @@ func SocketManager(conf *Conf) { } switch request.Request { - case Show: - response.ClientStatus = genClientStatus(actions, matches, &actionsLock, &matchesLock) + case Info: + // response.Config = *conf + response.Matches = matches + response.Actions = actions case Flush: - le := LogEntry{time.Now(), 0, request.Pattern, request.Stream, request.Filter, 0, false} + le := LogEntry{time.Now(), 0, request.Flush.P, request.Flush.S, request.Flush.F, 0, false} - matchesC := FlushMatchOrder{request.Pattern, make(chan MatchesMap)} - actionsC := FlushActionOrder{request.Pattern, make(chan ActionsMap)} - - flushToMatchesC <- matchesC - flushToActionsC <- actionsC + flushToMatchesC <- request.Flush + flushToActionsC <- request.Flush flushToDatabaseC <- le - var lock sync.Mutex - response.ClientStatus = genClientStatus(<-actionsC.ret, <-matchesC.ret, &lock, &lock) - case Config: - response.Config = *conf default: - logger.Println(logger.ERROR, "Invalid Message from cli: unrecognised Request type") + logger.Println(logger.ERROR, "Invalid Message from cli: unrecognised command type") + response.Err = errors.New("unrecognised command type") return } diff --git a/app/startup.go b/app/startup.go index d414b46..07faab6 100644 --- a/app/startup.go +++ b/app/startup.go @@ -65,39 +65,39 @@ func (c *Conf) setup() { for streamName := range c.Streams { stream := c.Streams[streamName] - stream.name = streamName + stream.Name = streamName - if strings.Contains(stream.name, ".") { - logger.Fatalf("Bad configuration: character '.' is not allowed in stream names: '%v'", stream.name) + if strings.Contains(stream.Name, ".") { + logger.Fatalf("Bad configuration: character '.' is not allowed in stream names: '%v'", stream.Name) } if len(stream.Filters) == 0 { - logger.Fatalf("Bad configuration: no filters configured in %v", stream.name) + logger.Fatalf("Bad configuration: no filters configured in %v", stream.Name) } for filterName := range stream.Filters { filter := stream.Filters[filterName] - filter.stream = stream - filter.name = filterName + filter.Stream = stream + filter.Name = filterName - if strings.Contains(filter.name, ".") { - logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name) + if strings.Contains(filter.Name, ".") { + logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.Name) } // Parse Duration if filter.RetryPeriod == "" { if filter.Retry > 1 { - logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name) + logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.Name, filter.Name) } } else { retryDuration, err := time.ParseDuration(filter.RetryPeriod) if err != nil { - logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err) + logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.Name, filter.Name, err) } filter.retryDuration = retryDuration } if len(filter.Regex) == 0 { - logger.Fatalf("Bad configuration: no regexes configured in %v.%v", stream.name, filter.name) + logger.Fatalf("Bad configuration: no regexes configured in %v.%v", stream.Name, filter.Name) } // Compute Regexes // Look for Patterns inside Regexes @@ -114,32 +114,32 @@ func (c *Conf) setup() { } compiledRegex, err := regexp.Compile(regex) if err != nil { - logger.Fatalf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err) + logger.Fatalf("Bad configuration: regex of filter %s.%s: %v", stream.Name, filter.Name, err) } filter.compiledRegex = append(filter.compiledRegex, *compiledRegex) } if len(filter.Actions) == 0 { - logger.Fatalln("Bad configuration: no actions configured in", stream.name, ".", filter.name) + logger.Fatalln("Bad configuration: no actions configured in", stream.Name, ".", filter.Name) } for actionName := range filter.Actions { action := filter.Actions[actionName] - action.filter = filter - action.name = actionName + action.Filter = filter + action.Name = actionName - if strings.Contains(action.name, ".") { - logger.Fatalln("Bad configuration: character '.' is not allowed in action names", action.name) + if strings.Contains(action.Name, ".") { + logger.Fatalln("Bad configuration: character '.' is not allowed in action names", action.Name) } // Parse Duration if action.After != "" { afterDuration, err := time.ParseDuration(action.After) if err != nil { - logger.Fatalln("Bad configuration: Failed to parse after time in ", stream.name, ".", filter.name, ".", action.name, ":", err) + logger.Fatalln("Bad configuration: Failed to parse after time in ", stream.Name, ".", filter.Name, ".", action.Name, ":", err) } action.afterDuration = afterDuration } else if action.OnExit { - logger.Fatalln("Bad configuration: Cannot have `onexit: true` without an `after` directive in", stream.name, ".", filter.name, ".", action.name) + logger.Fatalln("Bad configuration: Cannot have `onexit: true` without an `after` directive in", stream.Name, ".", filter.Name, ".", action.Name) } if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() { filter.longuestActionDuration = &action.afterDuration diff --git a/app/types.go b/app/types.go index ad455ac..b49b075 100644 --- a/app/types.go +++ b/app/types.go @@ -1,6 +1,7 @@ package app import ( + "bytes" "encoding/gob" "fmt" "os" @@ -32,15 +33,33 @@ type Pattern struct { // They're always referenced through pointers type Stream struct { - name string `json:"-"` + Name string `json:"-"` Cmd []string `json:"cmd"` Filters map[string]*Filter `json:"filters"` } +type LilStream struct { + Name string +} + +func (s *Stream) GobEncode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(LilStream{s.Name}) + return buf.Bytes(), err +} + +func (s *Stream) GobDecode(b []byte)(error) { + var ls LilStream + dec := gob.NewDecoder(bytes.NewReader(b)) + err := dec.Decode(&ls) + s.Name = ls.Name + return err +} type Filter struct { - stream *Stream `json:"-"` - name string `json:"-"` + Stream *Stream `json:"-"` + Name string `json:"-"` Regex []string `json:"regex"` compiledRegex []regexp.Regexp `json:"-"` @@ -54,9 +73,34 @@ type Filter struct { longuestActionDuration *time.Duration } +// those small versions are needed to prevent infinite recursion in gob because of +// data cycles: Stream <-> Filter, Filter <-> Action +type LilFilter struct { + Stream *Stream + Name string + Pattern []*Pattern +} + +func (f *Filter) GobDecode(b []byte)(error) { + var lf LilFilter + dec := gob.NewDecoder(bytes.NewReader(b)) + err := dec.Decode(&lf) + f.Stream = lf.Stream + f.Name = lf.Name + f.Pattern = lf.Pattern + return err +} + +func (f *Filter) GobEncode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(LilFilter{f.Stream, f.Name, f.Pattern}) + return buf.Bytes(), err +} + type Action struct { - filter *Filter `json:"-"` - name string `json:"-"` + Filter *Filter `json:"-"` + Name string `json:"-"` Cmd []string `json:"cmd"` @@ -65,6 +109,26 @@ type Action struct { OnExit bool `json:"onexit"` } +type LilAction struct { + Filter *Filter + Name string +} + +func (a *Action) GobEncode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(LilAction{a.Filter, a.Name}) + return buf.Bytes(), err +} + +func (a *Action) GobDecode(b []byte)(error) { + var la LilAction + dec := gob.NewDecoder(bytes.NewReader(b)) + err := dec.Decode(&la) + a.Filter = la.Filter + a.Name = la.Name + return err +} type LogEntry struct { T time.Time @@ -108,38 +172,29 @@ func WithBrackets(mm []string) string { // Helper structs made to carry information // Stream, Filter -type SF struct{ s, f string } +type SF struct{ S, F string } // Pattern, Stream, Filter type PSF struct { - p Match - s, f string + P Match + S, F string } type PF struct { - p Match - f *Filter + P Match + F *Filter } type PFT struct { - p Match - f *Filter - t time.Time + P Match + F *Filter + T time.Time } type PA struct { - p Match - a *Action + P Match + A *Action } type PAT struct { - p Match - a *Action - t time.Time -} - -type FlushMatchOrder struct { - p Match - ret chan MatchesMap -} -type FlushActionOrder struct { - p Match - ret chan ActionsMap + P Match + A *Action + T time.Time } diff --git a/config/test.jsonnet b/config/test.jsonnet index 503eb63..a4ec77f 100644 --- a/config/test.jsonnet +++ b/config/test.jsonnet @@ -14,7 +14,7 @@ streams: { tailDown1: { - cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done"], + cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done; sleep 30"], filters: { findIP: { regex: [ @@ -29,7 +29,30 @@ }, undamn: { cmd: ['echo', 'undamn', ''], - after: '4s', + after: '28s', + onexit: true, + }, + }, + }, + }, + }, + tailDown2: { + cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done; sleep 30"], + filters: { + findIP: { + regex: [ + '^found _$', + '^found _$', + ], + retry: 2, + retryperiod: '30s', + actions: { + damn: { + cmd: ['echo', ''], + }, + undamn: { + cmd: ['echo', 'undamn', ''], + after: '28s', onexit: true, }, },