diff --git a/app/client.go b/app/client.go index 598f0e8..ab553de 100644 --- a/app/client.go +++ b/app/client.go @@ -8,29 +8,29 @@ import ( "net" "os" "regexp" + "slices" "strings" + "time" "framagit.org/ppom/reaction/logger" "sigs.k8s.io/yaml" ) const ( - Show = 0 - Flush = 1 - Config = 2 + Info = 0 + Flush = 1 ) type Request struct { Request int - Stream string - Filter string - Pattern Match + Flush PSF } type Response struct { - Err error - ClientStatus ClientStatus - Config Conf + Err error + // Config Conf + Matches MatchesMap + Actions ActionsMap } func SendAndRetrieve(data Request) Response { @@ -63,13 +63,6 @@ type MapPatternStatusFlush MapPatternStatus type ClientStatus map[string]map[string]MapPatternStatus type ClientStatusFlush ClientStatus -type CompiledPattern struct{ - Name string - Regex string - compiledRegex *regexp.Regexp -} -type CPM map[string]CompiledPattern - func (mps MapPatternStatusFlush) MarshalJSON() ([]byte, error) { for _, v := range mps { return json.Marshal(v) @@ -88,137 +81,156 @@ func (csf ClientStatusFlush) MarshalJSON() ([]byte, error) { return json.Marshal(ret) } -func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern []string) { - response := SendAndRetrieve(Request{Show, stream, filter, ""}) - if response.Err != nil { - logger.Fatalln("Received error from daemon:", response.Err) +func pfMatches(streamName string, filterName string, regex *regexp.Regexp, regexes map[string]*regexp.Regexp, match Match, filter *Filter) bool { + // Check stream and filter match + if streamName != "" && streamName != filter.Stream.Name { + return false } - - // Remove empty structs - for streamName := range response.ClientStatus { - for filterName := range response.ClientStatus[streamName] { - for patternName, patternMap := range response.ClientStatus[streamName][filterName] { - if len(patternMap.Actions) == 0 && patternMap.Matches == 0 { - delete(response.ClientStatus[streamName][filterName], patternName) - } - } - if len(response.ClientStatus[streamName][filterName]) == 0 { - delete(response.ClientStatus[streamName], filterName) - } - } - if len(response.ClientStatus[streamName]) == 0 { - delete(response.ClientStatus, streamName) - } + if filterName != "" && filterName != filter.Name { + return false } - - // Limit to stream, filter if exists - if stream != "" { - exists := false - for streamName := range response.ClientStatus { - if stream == streamName { - if filter != "" { - for filterName := range response.ClientStatus[streamName] { - if filter == filterName { - exists = true - } else { - delete(response.ClientStatus[streamName], filterName) - } - } - } else { - exists = true - } - } else { - delete(response.ClientStatus, streamName) - } - } - if !exists { - logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.") - os.Exit(1) - } - } - - // Limit to pattern + // Check regex match if regex != nil { - for streamName := range response.ClientStatus { - for filterName := range response.ClientStatus[streamName] { - for patterns := range response.ClientStatus[streamName][filterName] { - pmatch := false - for _, p := range patterns.Split() { - if regex.MatchString(p) { - pmatch = true - } - } - if !pmatch { - delete(response.ClientStatus[streamName][filterName], patterns) - } - } - if len(response.ClientStatus[streamName][filterName]) == 0 { - delete(response.ClientStatus[streamName], filterName) - } - } - if len(response.ClientStatus[streamName]) == 0 { - delete(response.ClientStatus, streamName) + var pmatch bool + for _, p := range match.Split() { + if regex.MatchString(p) { + pmatch = true } } + if !pmatch { + return false + } } - - // Limit to kvpatterns - if kvpattern != nil { - // Get pattern indices (as stored in DB) from config - responseConfig := SendAndRetrieve(Request{Config, stream, filter, ""}) - if responseConfig.Err != nil { - logger.Fatalln("Received error from daemon:", responseConfig.Err) - } - // Build map from kvpattern - args := make(CPM) - for _, p := range kvpattern { - // p syntax already checked in Main - a := strings.Split(p, "=") - compiled, err := regexp.Compile(fmt.Sprintf("^%v$", a[1])) - if err != nil { - logger.Fatalf("Bad argument: %v: %v", p, err) - } - args[a[0]] = CompiledPattern{Name: a[0], Regex: a[1], compiledRegex: compiled} - } - - for streamName := range response.ClientStatus { - for filterName := range response.ClientStatus[streamName] { - for patterns := range response.ClientStatus[streamName][filterName] { - pmatch := 0 - for ip, p := range patterns.Split() { - // get pattern name from stream.filter.pattern (which was alphabetically sorted at startup) - if v, found := args[responseConfig.Config.Streams[streamName].Filters[filterName].Pattern[ip].Name]; found { - if v.compiledRegex.Match([]byte(p)) { - pmatch++ - } - } - } - if pmatch != len(kvpattern) { - delete(response.ClientStatus[streamName][filterName], patterns) - } - } - if len(response.ClientStatus[streamName][filterName]) == 0 { - delete(response.ClientStatus[streamName], filterName) + // Check regexes match + if len(regexes) != 0 { + // Check that all user requested patterns are in this filter + var nbMatched int + var localMatches = match.Split() + // For each pattern of this filter + for i, pattern := range filter.Pattern { + // Check that this pattern has user requested name + if reg, ok := regexes[pattern.Name]; ok { + // Check that the PF.p[i] matches user requested pattern + if reg.MatchString(localMatches[i]) { + nbMatched++ } } - if len(response.ClientStatus[streamName]) == 0 { - delete(response.ClientStatus, streamName) - } + } + if len(regexes) != nbMatched { + return false } } + // All checks passed + return true +} +func addMatchToCS(cs ClientStatus, pf PF, times map[time.Time]struct{}) { + patterns, streamName, filterName := pf.P, pf.F.Stream.Name, pf.F.Name + if cs[streamName] == nil { + cs[streamName] = make(map[string]MapPatternStatus) + } + if cs[streamName][filterName] == nil { + cs[streamName][filterName] = make(MapPatternStatus) + } + cs[streamName][filterName][patterns] = &PatternStatus{len(times), nil} +} + +func addActionToCS(cs ClientStatus, pa PA, times map[time.Time]struct{}) { + patterns, streamName, filterName, actionName := pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name, pa.A.Name + if cs[streamName] == nil { + cs[streamName] = make(map[string]MapPatternStatus) + } + if cs[streamName][filterName] == nil { + cs[streamName][filterName] = make(MapPatternStatus) + } + if cs[streamName][filterName][patterns] == nil { + cs[streamName][filterName][patterns] = new(PatternStatus) + } + ps := cs[streamName][filterName][patterns] + if ps.Actions == nil { + ps.Actions = make(map[string][]string) + } + for then := range times { + ps.Actions[actionName] = append(ps.Actions[actionName], then.Format(time.DateTime)) + } +} + +func printClientStatus(cs ClientStatus, format string) { var text []byte var err error if format == "json" { - text, err = json.MarshalIndent(response.ClientStatus, "", " ") + text, err = json.MarshalIndent(cs, "", " ") } else { - text, err = yaml.Marshal(response.ClientStatus) + text, err = yaml.Marshal(cs) } if err != nil { logger.Fatalln("Failed to convert daemon binary response to text format:", err) } fmt.Println(strings.ReplaceAll(string(text), "\\0", " ")) +} + +func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern []string) { + response := SendAndRetrieve(Request{Info, PSF{}}) + if response.Err != nil { + logger.Fatalln("Received error from daemon:", response.Err) + } + + cs := make(ClientStatus) + + var regexes map[string]*regexp.Regexp + + if len(kvpattern) != 0 { + // Transform []{"k=v","k=v"} into map["k"]"v" + regexes = make(map[string]*regexp.Regexp) + for _, p := range kvpattern { + // p syntax already checked in Main + key, value, found := strings.Cut(p, "=") + if !found { + logger.Fatalf("Bad argument: no =: %v", p) + } + compiled, err := regexp.Compile(fmt.Sprintf("^%v$", value)) + if err != nil { + logger.Fatalf("Bad argument: %v: %v", p, err) + } + regexes[key] = compiled + } + } + + var found bool + + // Painful data manipulation + for pf, times := range response.Matches { + // Check this PF is not empty + if len(times) == 0 { + continue + } + if !pfMatches(stream, filter, regex, regexes, pf.P, pf.F) { + continue + } + addMatchToCS(cs, pf, times) + found = true + } + + // Painful data manipulation + for pa, times := range response.Actions { + // Check this PF is not empty + if len(times) == 0 { + continue + } + if !pfMatches(stream, filter, regex, regexes, pa.P, pa.A.Filter) { + continue + } + addActionToCS(cs, pa, times) + found = true + } + + if !found { + logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.") + os.Exit(1) + } + + printClientStatus(cs, format) os.Exit(0) } @@ -236,88 +248,126 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern [ * - "2024-04-30 15:27:28" * */ -func ClientFlush(patterns []string, stream, filter, format string) { - responseConfig := SendAndRetrieve(Request{Config, stream, filter, ""}) - if responseConfig.Err != nil { - logger.Fatalln("Received error from daemon:", responseConfig.Err) +func ClientFlush(patterns []string, streamName, filterName, format string) { + // Either we have one pattern that is not of type name=pattern + requestedPatterns := make(map[string]string) + var requestedPattern string + if len(patterns) == 1 && strings.Index(patterns[0], "=") == -1 { + requestedPattern = patterns[0] + // Or we have one ore multiple patterns that must be of type name=pattern + } else { + for _, kvpattern := range patterns { + k, v, found := strings.Cut(kvpattern, "=") + if !found { + logger.Fatalf("Multiple patterns provided must be of type name=pattern: %v", kvpattern) + } + if requestedPatterns[k] != "" { + logger.Fatalf("Can't provide the same pattern name multiple times: %v", k) + } + requestedPatterns[k] = v + } } - if _, found := responseConfig.Config.Streams[stream].Filters[filter]; filter != "" && found == false { + // Remember which Filters are compatible with the query + filterCompatibility := make(map[SF]bool) + isCompatible := func(filter *Filter) bool { + sf := SF{filter.Stream.Name, filter.Name} + compatible, ok := filterCompatibility[sf] + + // already tested + if ok { + return compatible + } + + for k := range requestedPatterns { + if -1 == slices.IndexFunc(filter.Pattern, func(pattern *Pattern) bool { + return pattern.Name == k + }) { + filterCompatibility[sf] = false + return false + } + } + filterCompatibility[sf] = true + return true + } + + // match functions + kvMatch := func(filter *Filter, filterPatterns []string) bool { + // For each user requested pattern + for k, v := range requestedPatterns { + // Find its index on the Filter.Pattern + for i, pattern := range filter.Pattern { + if k == pattern.Name { + // Test the match + if v != filterPatterns[i] { + return false + } + } + } + } + return true + } + + var found bool + fullMatch := func(filter *Filter, match Match) bool { + // Test if we limit by stream + if streamName == "" || filter.Stream.Name == streamName { + // Test if we limit by filter + if filterName == "" || filter.Name == filterName { + found = true + filterPatterns := match.Split() + // If there is only one pattern + if requestedPattern != "" { + for _, filterPattern := range filterPatterns { + if requestedPattern == filterPattern { + return true + } + } + } else { + return isCompatible(filter) && kvMatch(filter, filterPatterns) + } + } + } + return false + } + + response := SendAndRetrieve(Request{Info, PSF{}}) + if response.Err != nil { + logger.Fatalln("Received error from daemon:", response.Err) + } + + commands := make([]PSF, 0) + + cs := make(ClientStatus) + + for pf, times := range response.Matches { + if fullMatch(pf.F, pf.P) { + commands = append(commands, PSF{pf.P, pf.F.Stream.Name, pf.F.Name}) + addMatchToCS(cs, pf, times) + } + } + + for pa, times := range response.Actions { + if fullMatch(pa.A.Filter, pa.P) { + commands = append(commands, PSF{pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name}) + addActionToCS(cs, pa, times) + } + } + + if !found { logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.") os.Exit(1) } - processFilter := func(patterns []string, stream, filter, format string) { - fpqty := len(responseConfig.Config.Streams[stream].Filters[filter].Pattern) - - if len(patterns) > fpqty { - logger.Fatalln("filter have 1 pattern") - } - - for _, kv := range patterns { - if len(strings.Split(kv, "=")) != 2 && fpqty > 1 { - logger.Fatalln("args should be in pattern=value format using more than one pattern in filter") - } - } - - // Transform arg to k=v - if fpqty == 1 && len(strings.Split(patterns[0], "=")) == 1 { - patterns[0] = strings.Join([]string{responseConfig.Config.Streams[stream].Filters[filter].Pattern[0].Name, patterns[0]}, "=") - } - - // arg pattern map - pmap := make(map[string]string) - for _, p := range patterns { - a := strings.Split(p, "=") - pmap[a[0]] = a[1] - } - - // Check every arg pattern exist in filter - for k, _ := range pmap { - found := false - for _, fp := range responseConfig.Config.Streams[stream].Filters[filter].Pattern { - if strings.EqualFold(k, fp.Name) { - found = true - break - } - } - if !found { - logger.Fatalf("pattern <%s> not found in filter %s.%s\n", k, stream, filter) - } - } - - // Order arg patterns before JoinMatch - var orderedPatterns []string - for _, p := range responseConfig.Config.Streams[stream].Filters[filter].Pattern { - if v, found := pmap[p.Name]; found { - orderedPatterns = append(orderedPatterns, v) - } else { - orderedPatterns = append(orderedPatterns, "") - } - } - response := SendAndRetrieve(Request{Flush, stream, filter, JoinMatch(orderedPatterns)}) + for _, psf := range commands { + response := SendAndRetrieve(Request{Flush, psf}) if response.Err != nil { - logger.Fatalln(logger.ERROR, "Received error from daemon:", response.Err) - } - - var text []byte - var err error - if format == "json" { - text, err = json.MarshalIndent(ClientStatusFlush(response.ClientStatus), "", " ") - } else { - text, err = yaml.Marshal(ClientStatusFlush(response.ClientStatus)) - } - if err != nil { - logger.Fatalln("Failed to convert daemon binary response to text format:", err) - } - fmt.Println(string(text)) - } - - for streamName := range responseConfig.Config.Streams { - for filterName := range responseConfig.Config.Streams[streamName].Filters { - processFilter(patterns, streamName, filterName, format) + logger.Fatalln("Received error from daemon:", response.Err) } } + + printClientStatus(cs, format) + os.Exit(0) } func TestRegex(confFilename, regex, line string) { diff --git a/app/daemon.go b/app/daemon.go index b0a3809..7a2ffd0 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -5,7 +5,6 @@ import ( "os" "os/exec" "os/signal" - "regexp" "strings" "sync" "syscall" @@ -91,11 +90,11 @@ func (f *Filter) match(line *string) Match { } } if len(result) == len(f.Pattern) { - logger.Printf(logger.INFO, "%s.%s: match %s", f.stream.name, f.name, WithBrackets(result)) + logger.Printf(logger.INFO, "%s.%s: match %s", f.Stream.Name, f.Name, WithBrackets(result)) return JoinMatch(result) } } else { - logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.stream.name, f.name) + logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.Stream.Name, f.Name) // No pattern, so this match will never actually be used return "." } @@ -115,12 +114,12 @@ func (a *Action) exec(match Match) { var computedCommand []string - if a.filter.Pattern != nil { + if a.Filter.Pattern != nil { computedCommand = make([]string, 0, len(a.Cmd)) matches := match.Split() for _, item := range a.Cmd { - for i, p := range a.filter.Pattern { + for i, p := range a.Filter.Pattern { item = strings.ReplaceAll(item, p.nameWithBraces, matches[i]) } computedCommand = append(computedCommand, item) @@ -129,12 +128,12 @@ func (a *Action) exec(match Match) { computedCommand = a.Cmd } - logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) + logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand) cmd := exec.Command(computedCommand[0], computedCommand[1:]...) if ret := cmd.Run(); ret != nil { - logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) + logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand, ret) } } @@ -147,7 +146,7 @@ func ActionsManager(concurrency int) { var pa PA for { pa = <-execActionsC - pa.a.exec(pa.p) + pa.A.exec(pa.P) } }() } @@ -157,7 +156,7 @@ func ActionsManager(concurrency int) { for { pa = <-execActionsC go func(pa PA) { - pa.a.exec(pa.p) + pa.A.exec(pa.P) }(pa) } }() @@ -172,65 +171,49 @@ func ActionsManager(concurrency int) { for { select { case pat := <-actionsC: - pa := PA{pat.p, pat.a} - pattern, action, then := pat.p, pat.a, pat.t + pa := PA{pat.P, pat.A} + pattern, action, then := pat.P, pat.A, pat.T now := time.Now() // check if must be executed now if then.Compare(now) <= 0 { execAction(action, pattern) } else { - actionsLock.Lock() if actions[pa] == nil { actions[pa] = make(map[time.Time]struct{}) } actions[pa][then] = struct{}{} - actionsLock.Unlock() go func(insidePat PAT, insideNow time.Time) { - time.Sleep(insidePat.t.Sub(insideNow)) + time.Sleep(insidePat.T.Sub(insideNow)) pendingActionsC <- insidePat }(pat, now) } case pat := <-pendingActionsC: - pa := PA{pat.p, pat.a} - pattern, action, then := pat.p, pat.a, pat.t - actionsLock.Lock() + pa := PA{pat.P, pat.A} + pattern, action, then := pat.P, pat.A, pat.T if actions[pa] != nil { delete(actions[pa], then) } - actionsLock.Unlock() execAction(action, pattern) case fo := <-flushToActionsC: - ret := make(ActionsMap) - match := 0 - actionsLock.Lock() for pa := range actions { - ppa, pfo := pa.p.Split(), fo.p.Split() - for i, p := range ppa { - if m, err := regexp.MatchString(pfo[i], p); err == nil && m == true { - match++ - } - } - if match == len(ppa) { + if fo.S == pa.A.Filter.Stream.Name && + fo.F == pa.A.Filter.Name && + fo.P == pa.P { for range actions[pa] { - execAction(pa.a, pa.p) + execAction(pa.A, pa.P) } - ret[pa] = actions[pa] delete(actions, pa) + break } - match = 0 } - actionsLock.Unlock() - fo.ret <- ret case _, _ = <-stopActions: - actionsLock.Lock() for pa := range actions { - if pa.a.OnExit { + if pa.A.OnExit { for range actions[pa] { - execAction(pa.a, pa.p) + execAction(pa.A, pa.P) } } } - actionsLock.Unlock() wgActions.Done() return } @@ -238,7 +221,7 @@ func ActionsManager(concurrency int) { } func MatchesManager() { - var fo FlushMatchOrder + var fo PSF var pft PFT end := false @@ -261,7 +244,7 @@ func MatchesManager() { matchesManagerHandleFlush(fo) case pft = <-matchesC: - entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false} + entry := LogEntry{pft.T, 0, pft.P, pft.F.Stream.Name, pft.F.Name, 0, false} entry.Exec = matchesManagerHandleMatch(pft) @@ -270,37 +253,25 @@ func MatchesManager() { } } -func matchesManagerHandleFlush(fo FlushMatchOrder) { - ret := make(MatchesMap) - match := 0 +func matchesManagerHandleFlush(fo PSF) { matchesLock.Lock() for pf := range matches { - ppf, pfo := pf.p.Split(), fo.p.Split() - for i, p := range ppf { - if m, err := regexp.MatchString(pfo[i], p); err == nil && m == true { - match++ - } - } - if match == len(ppf) { - if fo.ret != nil { - ret[pf] = matches[pf] - } + if fo.S == pf.F.Stream.Name && + fo.F == pf.F.Name && + fo.P == pf.P { delete(matches, pf) + break } - match = 0 } matchesLock.Unlock() - if fo.ret != nil { - fo.ret <- ret - } } func matchesManagerHandleMatch(pft PFT) bool { matchesLock.Lock() defer matchesLock.Unlock() - filter, patterns, then := pft.f, pft.p, pft.t - pf := PF{pft.p, pft.f} + filter, patterns, then := pft.F, pft.P, pft.T + pf := PF{pft.P, pft.F} if filter.Retry > 1 { // make sure map exists @@ -332,7 +303,7 @@ func matchesManagerHandleMatch(pft PFT) bool { func StreamManager(s *Stream, endedSignal chan *Stream) { defer wgStreams.Done() - logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd) + logger.Printf(logger.INFO, "%s: start %s\n", s.Name, s.Cmd) lines := cmdStdout(s.Cmd) for { @@ -356,7 +327,6 @@ func StreamManager(s *Stream, endedSignal chan *Stream) { var actions ActionsMap var matches MatchesMap -var actionsLock sync.Mutex var matchesLock sync.Mutex var stopStreams chan bool @@ -391,10 +361,10 @@ var logsC chan LogEntry var actionsC chan PAT // SocketManager, DatabaseManager → MatchesManager -var flushToMatchesC chan FlushMatchOrder +var flushToMatchesC chan PSF // SocketManager → ActionsManager -var flushToActionsC chan FlushActionOrder +var flushToActionsC chan PSF // SocketManager → DatabaseManager var flushToDatabaseC chan LogEntry @@ -406,8 +376,8 @@ func Daemon(confFilename string) { matchesC = make(chan PFT) logsC = make(chan LogEntry) actionsC = make(chan PAT) - flushToMatchesC = make(chan FlushMatchOrder) - flushToActionsC = make(chan FlushActionOrder) + flushToMatchesC = make(chan PSF) + flushToActionsC = make(chan PSF) flushToDatabaseC = make(chan LogEntry) stopActions = make(chan bool) stopStreams = make(chan bool) @@ -438,7 +408,7 @@ func Daemon(confFilename string) { for { select { case finishedStream := <-endSignals: - logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name) + logger.Printf(logger.ERROR, "%s stream finished", finishedStream.Name) nbStreamsInExecution-- if nbStreamsInExecution == 0 { quit(conf, false) diff --git a/app/main.go b/app/main.go index da5b499..9ff3dbe 100644 --- a/app/main.go +++ b/app/main.go @@ -106,7 +106,8 @@ func basicUsage() { ` + bold + `reaction flush` + reset + ` TARGET [TARGET...] # remove currently active matches and run currently pending actions for the specified TARGET(s) # (then show flushed matches and actions) - # e.g. reaction flush 192.168.1.1 root + # e.g. reaction flush 192.168.1.1 + # e.g. reaction flush ip=192.168.1.1 user=root # options: -s/--socket SOCKET # path to the client-daemon communication socket @@ -192,6 +193,7 @@ func Main(version, commit string) { var regex *regexp.Regexp var err error if *pattern != "" { + // TODO anchor ^$ the pattern? regex, err = regexp.Compile(*pattern) if err != nil { logger.Fatalln("-p/--pattern: ", err) @@ -223,11 +225,12 @@ func Main(version, commit string) { stream, filter := "", "" if *limit != "" { splitSF := strings.Split(*limit, ".") - if len(splitSF) != 2 { + stream = splitSF[0] + if len(splitSF) == 2 { + filter = splitSF[1] + } else if len(splitSF) > 2 { logger.Fatalln("-l/--limit: only one . separator is supported") } - stream = splitSF[0] - filter = splitSF[1] } ClientFlush(f.Args(), stream, filter, *queryFormat) diff --git a/app/persist.go b/app/persist.go index 034e2a7..78e78a1 100644 --- a/app/persist.go +++ b/app/persist.go @@ -125,7 +125,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E defer func() { for sf, t := range discardedEntries { if t > 0 { - logger.Printf(logger.WARN, "info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.s, sf.f) + logger.Printf(logger.WARN, "info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.S, sf.F) } } if malformedEntries > 0 { @@ -186,8 +186,8 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E discardedEntries[SF{"", ""}]++ continue } - entry.Stream = sf.s - entry.Filter = sf.f + entry.Stream = sf.S + entry.Filter = sf.F } if stream := c.Streams[entry.Stream]; stream != nil { filter = stream.Filters[entry.Filter] @@ -231,7 +231,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // replay executions if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { if startup { - flushToMatchesC <- FlushMatchOrder{entry.Pattern, nil} + flushToMatchesC <- PSF{entry.Pattern, entry.Stream, entry.Filter} filter.sendActions(entry.Pattern, entry.T) } diff --git a/app/pipe.go b/app/pipe.go index 7f4f684..0d5b4ac 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -2,58 +2,15 @@ package app import ( "encoding/gob" + "errors" "net" "os" "path" - "sync" "time" "framagit.org/ppom/reaction/logger" ) -func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_actionsLock, local_matchesLock *sync.Mutex) ClientStatus { - cs := make(ClientStatus) - local_matchesLock.Lock() - - // Painful data manipulation - for pf, times := range local_matches { - patterns, filter := pf.p, pf.f - if cs[filter.stream.name] == nil { - cs[filter.stream.name] = make(map[string]MapPatternStatus) - } - if cs[filter.stream.name][filter.name] == nil { - cs[filter.stream.name][filter.name] = make(MapPatternStatus) - } - cs[filter.stream.name][filter.name][patterns] = &PatternStatus{len(times), nil} - } - - local_matchesLock.Unlock() - local_actionsLock.Lock() - - // Painful data manipulation - for pa, times := range local_actions { - patterns, action := pa.p, pa.a - if cs[action.filter.stream.name] == nil { - cs[action.filter.stream.name] = make(map[string]MapPatternStatus) - } - if cs[action.filter.stream.name][action.filter.name] == nil { - cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus) - } - if cs[action.filter.stream.name][action.filter.name][patterns] == nil { - cs[action.filter.stream.name][action.filter.name][patterns] = new(PatternStatus) - } - ps := cs[action.filter.stream.name][action.filter.name][patterns] - if ps.Actions == nil { - ps.Actions = make(map[string][]string) - } - for then := range times { - ps.Actions[action.name] = append(ps.Actions[action.name], then.Format(time.DateTime)) - } - } - local_actionsLock.Unlock() - return cs -} - func createOpenSocket() net.Listener { err := os.MkdirAll(path.Dir(*SocketPath), 0755) if err != nil { @@ -97,24 +54,20 @@ func SocketManager(conf *Conf) { } switch request.Request { - case Show: - response.ClientStatus = genClientStatus(actions, matches, &actionsLock, &matchesLock) + case Info: + // response.Config = *conf + response.Matches = matches + response.Actions = actions case Flush: - le := LogEntry{time.Now(), 0, request.Pattern, request.Stream, request.Filter, 0, false} + le := LogEntry{time.Now(), 0, request.Flush.P, request.Flush.S, request.Flush.F, 0, false} - matchesC := FlushMatchOrder{request.Pattern, make(chan MatchesMap)} - actionsC := FlushActionOrder{request.Pattern, make(chan ActionsMap)} - - flushToMatchesC <- matchesC - flushToActionsC <- actionsC + flushToMatchesC <- request.Flush + flushToActionsC <- request.Flush flushToDatabaseC <- le - var lock sync.Mutex - response.ClientStatus = genClientStatus(<-actionsC.ret, <-matchesC.ret, &lock, &lock) - case Config: - response.Config = *conf default: - logger.Println(logger.ERROR, "Invalid Message from cli: unrecognised Request type") + logger.Println(logger.ERROR, "Invalid Message from cli: unrecognised command type") + response.Err = errors.New("unrecognised command type") return } diff --git a/app/startup.go b/app/startup.go index d414b46..07faab6 100644 --- a/app/startup.go +++ b/app/startup.go @@ -65,39 +65,39 @@ func (c *Conf) setup() { for streamName := range c.Streams { stream := c.Streams[streamName] - stream.name = streamName + stream.Name = streamName - if strings.Contains(stream.name, ".") { - logger.Fatalf("Bad configuration: character '.' is not allowed in stream names: '%v'", stream.name) + if strings.Contains(stream.Name, ".") { + logger.Fatalf("Bad configuration: character '.' is not allowed in stream names: '%v'", stream.Name) } if len(stream.Filters) == 0 { - logger.Fatalf("Bad configuration: no filters configured in %v", stream.name) + logger.Fatalf("Bad configuration: no filters configured in %v", stream.Name) } for filterName := range stream.Filters { filter := stream.Filters[filterName] - filter.stream = stream - filter.name = filterName + filter.Stream = stream + filter.Name = filterName - if strings.Contains(filter.name, ".") { - logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name) + if strings.Contains(filter.Name, ".") { + logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.Name) } // Parse Duration if filter.RetryPeriod == "" { if filter.Retry > 1 { - logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name) + logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.Name, filter.Name) } } else { retryDuration, err := time.ParseDuration(filter.RetryPeriod) if err != nil { - logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err) + logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.Name, filter.Name, err) } filter.retryDuration = retryDuration } if len(filter.Regex) == 0 { - logger.Fatalf("Bad configuration: no regexes configured in %v.%v", stream.name, filter.name) + logger.Fatalf("Bad configuration: no regexes configured in %v.%v", stream.Name, filter.Name) } // Compute Regexes // Look for Patterns inside Regexes @@ -114,32 +114,32 @@ func (c *Conf) setup() { } compiledRegex, err := regexp.Compile(regex) if err != nil { - logger.Fatalf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err) + logger.Fatalf("Bad configuration: regex of filter %s.%s: %v", stream.Name, filter.Name, err) } filter.compiledRegex = append(filter.compiledRegex, *compiledRegex) } if len(filter.Actions) == 0 { - logger.Fatalln("Bad configuration: no actions configured in", stream.name, ".", filter.name) + logger.Fatalln("Bad configuration: no actions configured in", stream.Name, ".", filter.Name) } for actionName := range filter.Actions { action := filter.Actions[actionName] - action.filter = filter - action.name = actionName + action.Filter = filter + action.Name = actionName - if strings.Contains(action.name, ".") { - logger.Fatalln("Bad configuration: character '.' is not allowed in action names", action.name) + if strings.Contains(action.Name, ".") { + logger.Fatalln("Bad configuration: character '.' is not allowed in action names", action.Name) } // Parse Duration if action.After != "" { afterDuration, err := time.ParseDuration(action.After) if err != nil { - logger.Fatalln("Bad configuration: Failed to parse after time in ", stream.name, ".", filter.name, ".", action.name, ":", err) + logger.Fatalln("Bad configuration: Failed to parse after time in ", stream.Name, ".", filter.Name, ".", action.Name, ":", err) } action.afterDuration = afterDuration } else if action.OnExit { - logger.Fatalln("Bad configuration: Cannot have `onexit: true` without an `after` directive in", stream.name, ".", filter.name, ".", action.name) + logger.Fatalln("Bad configuration: Cannot have `onexit: true` without an `after` directive in", stream.Name, ".", filter.Name, ".", action.Name) } if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() { filter.longuestActionDuration = &action.afterDuration diff --git a/app/types.go b/app/types.go index ad455ac..b49b075 100644 --- a/app/types.go +++ b/app/types.go @@ -1,6 +1,7 @@ package app import ( + "bytes" "encoding/gob" "fmt" "os" @@ -32,15 +33,33 @@ type Pattern struct { // They're always referenced through pointers type Stream struct { - name string `json:"-"` + Name string `json:"-"` Cmd []string `json:"cmd"` Filters map[string]*Filter `json:"filters"` } +type LilStream struct { + Name string +} + +func (s *Stream) GobEncode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(LilStream{s.Name}) + return buf.Bytes(), err +} + +func (s *Stream) GobDecode(b []byte)(error) { + var ls LilStream + dec := gob.NewDecoder(bytes.NewReader(b)) + err := dec.Decode(&ls) + s.Name = ls.Name + return err +} type Filter struct { - stream *Stream `json:"-"` - name string `json:"-"` + Stream *Stream `json:"-"` + Name string `json:"-"` Regex []string `json:"regex"` compiledRegex []regexp.Regexp `json:"-"` @@ -54,9 +73,34 @@ type Filter struct { longuestActionDuration *time.Duration } +// those small versions are needed to prevent infinite recursion in gob because of +// data cycles: Stream <-> Filter, Filter <-> Action +type LilFilter struct { + Stream *Stream + Name string + Pattern []*Pattern +} + +func (f *Filter) GobDecode(b []byte)(error) { + var lf LilFilter + dec := gob.NewDecoder(bytes.NewReader(b)) + err := dec.Decode(&lf) + f.Stream = lf.Stream + f.Name = lf.Name + f.Pattern = lf.Pattern + return err +} + +func (f *Filter) GobEncode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(LilFilter{f.Stream, f.Name, f.Pattern}) + return buf.Bytes(), err +} + type Action struct { - filter *Filter `json:"-"` - name string `json:"-"` + Filter *Filter `json:"-"` + Name string `json:"-"` Cmd []string `json:"cmd"` @@ -65,6 +109,26 @@ type Action struct { OnExit bool `json:"onexit"` } +type LilAction struct { + Filter *Filter + Name string +} + +func (a *Action) GobEncode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(LilAction{a.Filter, a.Name}) + return buf.Bytes(), err +} + +func (a *Action) GobDecode(b []byte)(error) { + var la LilAction + dec := gob.NewDecoder(bytes.NewReader(b)) + err := dec.Decode(&la) + a.Filter = la.Filter + a.Name = la.Name + return err +} type LogEntry struct { T time.Time @@ -108,38 +172,29 @@ func WithBrackets(mm []string) string { // Helper structs made to carry information // Stream, Filter -type SF struct{ s, f string } +type SF struct{ S, F string } // Pattern, Stream, Filter type PSF struct { - p Match - s, f string + P Match + S, F string } type PF struct { - p Match - f *Filter + P Match + F *Filter } type PFT struct { - p Match - f *Filter - t time.Time + P Match + F *Filter + T time.Time } type PA struct { - p Match - a *Action + P Match + A *Action } type PAT struct { - p Match - a *Action - t time.Time -} - -type FlushMatchOrder struct { - p Match - ret chan MatchesMap -} -type FlushActionOrder struct { - p Match - ret chan ActionsMap + P Match + A *Action + T time.Time } diff --git a/config/test.jsonnet b/config/test.jsonnet index 503eb63..a4ec77f 100644 --- a/config/test.jsonnet +++ b/config/test.jsonnet @@ -14,7 +14,7 @@ streams: { tailDown1: { - cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done"], + cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done; sleep 30"], filters: { findIP: { regex: [ @@ -29,7 +29,30 @@ }, undamn: { cmd: ['echo', 'undamn', ''], - after: '4s', + after: '28s', + onexit: true, + }, + }, + }, + }, + }, + tailDown2: { + cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done; sleep 30"], + filters: { + findIP: { + regex: [ + '^found _$', + '^found _$', + ], + retry: 2, + retryperiod: '30s', + actions: { + damn: { + cmd: ['echo', ''], + }, + undamn: { + cmd: ['echo', 'undamn', ''], + after: '28s', onexit: true, }, },