Big client-daemon refacto

Now most of the work is done on the client side.

Daemon sends its Config, Matches and Actions structures.
Client does all data conversions, and filtering.
For flush, Client sends to the Daemon individual flush instructions.
This commit is contained in:
ppom 2024-05-20 12:00:00 +02:00
commit 131d8eced9
8 changed files with 414 additions and 360 deletions

View file

@ -8,29 +8,29 @@ import (
"net"
"os"
"regexp"
"slices"
"strings"
"time"
"framagit.org/ppom/reaction/logger"
"sigs.k8s.io/yaml"
)
const (
Show = 0
Flush = 1
Config = 2
Info = 0
Flush = 1
)
type Request struct {
Request int
Stream string
Filter string
Pattern Match
Flush PSF
}
type Response struct {
Err error
ClientStatus ClientStatus
Config Conf
Err error
// Config Conf
Matches MatchesMap
Actions ActionsMap
}
func SendAndRetrieve(data Request) Response {
@ -63,13 +63,6 @@ type MapPatternStatusFlush MapPatternStatus
type ClientStatus map[string]map[string]MapPatternStatus
type ClientStatusFlush ClientStatus
type CompiledPattern struct{
Name string
Regex string
compiledRegex *regexp.Regexp
}
type CPM map[string]CompiledPattern
func (mps MapPatternStatusFlush) MarshalJSON() ([]byte, error) {
for _, v := range mps {
return json.Marshal(v)
@ -88,137 +81,156 @@ func (csf ClientStatusFlush) MarshalJSON() ([]byte, error) {
return json.Marshal(ret)
}
func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern []string) {
response := SendAndRetrieve(Request{Show, stream, filter, ""})
if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err)
func pfMatches(streamName string, filterName string, regex *regexp.Regexp, regexes map[string]*regexp.Regexp, match Match, filter *Filter) bool {
// Check stream and filter match
if streamName != "" && streamName != filter.Stream.Name {
return false
}
// Remove empty structs
for streamName := range response.ClientStatus {
for filterName := range response.ClientStatus[streamName] {
for patternName, patternMap := range response.ClientStatus[streamName][filterName] {
if len(patternMap.Actions) == 0 && patternMap.Matches == 0 {
delete(response.ClientStatus[streamName][filterName], patternName)
}
}
if len(response.ClientStatus[streamName][filterName]) == 0 {
delete(response.ClientStatus[streamName], filterName)
}
}
if len(response.ClientStatus[streamName]) == 0 {
delete(response.ClientStatus, streamName)
}
if filterName != "" && filterName != filter.Name {
return false
}
// Limit to stream, filter if exists
if stream != "" {
exists := false
for streamName := range response.ClientStatus {
if stream == streamName {
if filter != "" {
for filterName := range response.ClientStatus[streamName] {
if filter == filterName {
exists = true
} else {
delete(response.ClientStatus[streamName], filterName)
}
}
} else {
exists = true
}
} else {
delete(response.ClientStatus, streamName)
}
}
if !exists {
logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.")
os.Exit(1)
}
}
// Limit to pattern
// Check regex match
if regex != nil {
for streamName := range response.ClientStatus {
for filterName := range response.ClientStatus[streamName] {
for patterns := range response.ClientStatus[streamName][filterName] {
pmatch := false
for _, p := range patterns.Split() {
if regex.MatchString(p) {
pmatch = true
}
}
if !pmatch {
delete(response.ClientStatus[streamName][filterName], patterns)
}
}
if len(response.ClientStatus[streamName][filterName]) == 0 {
delete(response.ClientStatus[streamName], filterName)
}
}
if len(response.ClientStatus[streamName]) == 0 {
delete(response.ClientStatus, streamName)
var pmatch bool
for _, p := range match.Split() {
if regex.MatchString(p) {
pmatch = true
}
}
if !pmatch {
return false
}
}
// Limit to kvpatterns
if kvpattern != nil {
// Get pattern indices (as stored in DB) from config
responseConfig := SendAndRetrieve(Request{Config, stream, filter, ""})
if responseConfig.Err != nil {
logger.Fatalln("Received error from daemon:", responseConfig.Err)
}
// Build map from kvpattern
args := make(CPM)
for _, p := range kvpattern {
// p syntax already checked in Main
a := strings.Split(p, "=")
compiled, err := regexp.Compile(fmt.Sprintf("^%v$", a[1]))
if err != nil {
logger.Fatalf("Bad argument: %v: %v", p, err)
}
args[a[0]] = CompiledPattern{Name: a[0], Regex: a[1], compiledRegex: compiled}
}
for streamName := range response.ClientStatus {
for filterName := range response.ClientStatus[streamName] {
for patterns := range response.ClientStatus[streamName][filterName] {
pmatch := 0
for ip, p := range patterns.Split() {
// get pattern name from stream.filter.pattern (which was alphabetically sorted at startup)
if v, found := args[responseConfig.Config.Streams[streamName].Filters[filterName].Pattern[ip].Name]; found {
if v.compiledRegex.Match([]byte(p)) {
pmatch++
}
}
}
if pmatch != len(kvpattern) {
delete(response.ClientStatus[streamName][filterName], patterns)
}
}
if len(response.ClientStatus[streamName][filterName]) == 0 {
delete(response.ClientStatus[streamName], filterName)
// Check regexes match
if len(regexes) != 0 {
// Check that all user requested patterns are in this filter
var nbMatched int
var localMatches = match.Split()
// For each pattern of this filter
for i, pattern := range filter.Pattern {
// Check that this pattern has user requested name
if reg, ok := regexes[pattern.Name]; ok {
// Check that the PF.p[i] matches user requested pattern
if reg.MatchString(localMatches[i]) {
nbMatched++
}
}
if len(response.ClientStatus[streamName]) == 0 {
delete(response.ClientStatus, streamName)
}
}
if len(regexes) != nbMatched {
return false
}
}
// All checks passed
return true
}
func addMatchToCS(cs ClientStatus, pf PF, times map[time.Time]struct{}) {
patterns, streamName, filterName := pf.P, pf.F.Stream.Name, pf.F.Name
if cs[streamName] == nil {
cs[streamName] = make(map[string]MapPatternStatus)
}
if cs[streamName][filterName] == nil {
cs[streamName][filterName] = make(MapPatternStatus)
}
cs[streamName][filterName][patterns] = &PatternStatus{len(times), nil}
}
func addActionToCS(cs ClientStatus, pa PA, times map[time.Time]struct{}) {
patterns, streamName, filterName, actionName := pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name, pa.A.Name
if cs[streamName] == nil {
cs[streamName] = make(map[string]MapPatternStatus)
}
if cs[streamName][filterName] == nil {
cs[streamName][filterName] = make(MapPatternStatus)
}
if cs[streamName][filterName][patterns] == nil {
cs[streamName][filterName][patterns] = new(PatternStatus)
}
ps := cs[streamName][filterName][patterns]
if ps.Actions == nil {
ps.Actions = make(map[string][]string)
}
for then := range times {
ps.Actions[actionName] = append(ps.Actions[actionName], then.Format(time.DateTime))
}
}
func printClientStatus(cs ClientStatus, format string) {
var text []byte
var err error
if format == "json" {
text, err = json.MarshalIndent(response.ClientStatus, "", " ")
text, err = json.MarshalIndent(cs, "", " ")
} else {
text, err = yaml.Marshal(response.ClientStatus)
text, err = yaml.Marshal(cs)
}
if err != nil {
logger.Fatalln("Failed to convert daemon binary response to text format:", err)
}
fmt.Println(strings.ReplaceAll(string(text), "\\0", " "))
}
func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern []string) {
response := SendAndRetrieve(Request{Info, PSF{}})
if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err)
}
cs := make(ClientStatus)
var regexes map[string]*regexp.Regexp
if len(kvpattern) != 0 {
// Transform []{"k=v","k=v"} into map["k"]"v"
regexes = make(map[string]*regexp.Regexp)
for _, p := range kvpattern {
// p syntax already checked in Main
key, value, found := strings.Cut(p, "=")
if !found {
logger.Fatalf("Bad argument: no =: %v", p)
}
compiled, err := regexp.Compile(fmt.Sprintf("^%v$", value))
if err != nil {
logger.Fatalf("Bad argument: %v: %v", p, err)
}
regexes[key] = compiled
}
}
var found bool
// Painful data manipulation
for pf, times := range response.Matches {
// Check this PF is not empty
if len(times) == 0 {
continue
}
if !pfMatches(stream, filter, regex, regexes, pf.P, pf.F) {
continue
}
addMatchToCS(cs, pf, times)
found = true
}
// Painful data manipulation
for pa, times := range response.Actions {
// Check this PF is not empty
if len(times) == 0 {
continue
}
if !pfMatches(stream, filter, regex, regexes, pa.P, pa.A.Filter) {
continue
}
addActionToCS(cs, pa, times)
found = true
}
if !found {
logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.")
os.Exit(1)
}
printClientStatus(cs, format)
os.Exit(0)
}
@ -236,88 +248,126 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp, kvpattern [
* - "2024-04-30 15:27:28"
*
*/
func ClientFlush(patterns []string, stream, filter, format string) {
responseConfig := SendAndRetrieve(Request{Config, stream, filter, ""})
if responseConfig.Err != nil {
logger.Fatalln("Received error from daemon:", responseConfig.Err)
func ClientFlush(patterns []string, streamName, filterName, format string) {
// Either we have one pattern that is not of type name=pattern
requestedPatterns := make(map[string]string)
var requestedPattern string
if len(patterns) == 1 && strings.Index(patterns[0], "=") == -1 {
requestedPattern = patterns[0]
// Or we have one ore multiple patterns that must be of type name=pattern
} else {
for _, kvpattern := range patterns {
k, v, found := strings.Cut(kvpattern, "=")
if !found {
logger.Fatalf("Multiple patterns provided must be of type name=pattern: %v", kvpattern)
}
if requestedPatterns[k] != "" {
logger.Fatalf("Can't provide the same pattern name multiple times: %v", k)
}
requestedPatterns[k] = v
}
}
if _, found := responseConfig.Config.Streams[stream].Filters[filter]; filter != "" && found == false {
// Remember which Filters are compatible with the query
filterCompatibility := make(map[SF]bool)
isCompatible := func(filter *Filter) bool {
sf := SF{filter.Stream.Name, filter.Name}
compatible, ok := filterCompatibility[sf]
// already tested
if ok {
return compatible
}
for k := range requestedPatterns {
if -1 == slices.IndexFunc(filter.Pattern, func(pattern *Pattern) bool {
return pattern.Name == k
}) {
filterCompatibility[sf] = false
return false
}
}
filterCompatibility[sf] = true
return true
}
// match functions
kvMatch := func(filter *Filter, filterPatterns []string) bool {
// For each user requested pattern
for k, v := range requestedPatterns {
// Find its index on the Filter.Pattern
for i, pattern := range filter.Pattern {
if k == pattern.Name {
// Test the match
if v != filterPatterns[i] {
return false
}
}
}
}
return true
}
var found bool
fullMatch := func(filter *Filter, match Match) bool {
// Test if we limit by stream
if streamName == "" || filter.Stream.Name == streamName {
// Test if we limit by filter
if filterName == "" || filter.Name == filterName {
found = true
filterPatterns := match.Split()
// If there is only one pattern
if requestedPattern != "" {
for _, filterPattern := range filterPatterns {
if requestedPattern == filterPattern {
return true
}
}
} else {
return isCompatible(filter) && kvMatch(filter, filterPatterns)
}
}
}
return false
}
response := SendAndRetrieve(Request{Info, PSF{}})
if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err)
}
commands := make([]PSF, 0)
cs := make(ClientStatus)
for pf, times := range response.Matches {
if fullMatch(pf.F, pf.P) {
commands = append(commands, PSF{pf.P, pf.F.Stream.Name, pf.F.Name})
addMatchToCS(cs, pf, times)
}
}
for pa, times := range response.Actions {
if fullMatch(pa.A.Filter, pa.P) {
commands = append(commands, PSF{pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name})
addActionToCS(cs, pa, times)
}
}
if !found {
logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.")
os.Exit(1)
}
processFilter := func(patterns []string, stream, filter, format string) {
fpqty := len(responseConfig.Config.Streams[stream].Filters[filter].Pattern)
if len(patterns) > fpqty {
logger.Fatalln("filter have 1 pattern")
}
for _, kv := range patterns {
if len(strings.Split(kv, "=")) != 2 && fpqty > 1 {
logger.Fatalln("args should be in pattern=value format using more than one pattern in filter")
}
}
// Transform arg to k=v
if fpqty == 1 && len(strings.Split(patterns[0], "=")) == 1 {
patterns[0] = strings.Join([]string{responseConfig.Config.Streams[stream].Filters[filter].Pattern[0].Name, patterns[0]}, "=")
}
// arg pattern map
pmap := make(map[string]string)
for _, p := range patterns {
a := strings.Split(p, "=")
pmap[a[0]] = a[1]
}
// Check every arg pattern exist in filter
for k, _ := range pmap {
found := false
for _, fp := range responseConfig.Config.Streams[stream].Filters[filter].Pattern {
if strings.EqualFold(k, fp.Name) {
found = true
break
}
}
if !found {
logger.Fatalf("pattern <%s> not found in filter %s.%s\n", k, stream, filter)
}
}
// Order arg patterns before JoinMatch
var orderedPatterns []string
for _, p := range responseConfig.Config.Streams[stream].Filters[filter].Pattern {
if v, found := pmap[p.Name]; found {
orderedPatterns = append(orderedPatterns, v)
} else {
orderedPatterns = append(orderedPatterns, "")
}
}
response := SendAndRetrieve(Request{Flush, stream, filter, JoinMatch(orderedPatterns)})
for _, psf := range commands {
response := SendAndRetrieve(Request{Flush, psf})
if response.Err != nil {
logger.Fatalln(logger.ERROR, "Received error from daemon:", response.Err)
}
var text []byte
var err error
if format == "json" {
text, err = json.MarshalIndent(ClientStatusFlush(response.ClientStatus), "", " ")
} else {
text, err = yaml.Marshal(ClientStatusFlush(response.ClientStatus))
}
if err != nil {
logger.Fatalln("Failed to convert daemon binary response to text format:", err)
}
fmt.Println(string(text))
}
for streamName := range responseConfig.Config.Streams {
for filterName := range responseConfig.Config.Streams[streamName].Filters {
processFilter(patterns, streamName, filterName, format)
logger.Fatalln("Received error from daemon:", response.Err)
}
}
printClientStatus(cs, format)
os.Exit(0)
}
func TestRegex(confFilename, regex, line string) {

View file

@ -5,7 +5,6 @@ import (
"os"
"os/exec"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
@ -91,11 +90,11 @@ func (f *Filter) match(line *string) Match {
}
}
if len(result) == len(f.Pattern) {
logger.Printf(logger.INFO, "%s.%s: match %s", f.stream.name, f.name, WithBrackets(result))
logger.Printf(logger.INFO, "%s.%s: match %s", f.Stream.Name, f.Name, WithBrackets(result))
return JoinMatch(result)
}
} else {
logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.stream.name, f.name)
logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.Stream.Name, f.Name)
// No pattern, so this match will never actually be used
return "."
}
@ -115,12 +114,12 @@ func (a *Action) exec(match Match) {
var computedCommand []string
if a.filter.Pattern != nil {
if a.Filter.Pattern != nil {
computedCommand = make([]string, 0, len(a.Cmd))
matches := match.Split()
for _, item := range a.Cmd {
for i, p := range a.filter.Pattern {
for i, p := range a.Filter.Pattern {
item = strings.ReplaceAll(item, p.nameWithBraces, matches[i])
}
computedCommand = append(computedCommand, item)
@ -129,12 +128,12 @@ func (a *Action) exec(match Match) {
computedCommand = a.Cmd
}
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand)
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
if ret := cmd.Run(); ret != nil {
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand, ret)
}
}
@ -147,7 +146,7 @@ func ActionsManager(concurrency int) {
var pa PA
for {
pa = <-execActionsC
pa.a.exec(pa.p)
pa.A.exec(pa.P)
}
}()
}
@ -157,7 +156,7 @@ func ActionsManager(concurrency int) {
for {
pa = <-execActionsC
go func(pa PA) {
pa.a.exec(pa.p)
pa.A.exec(pa.P)
}(pa)
}
}()
@ -172,65 +171,49 @@ func ActionsManager(concurrency int) {
for {
select {
case pat := <-actionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
pa := PA{pat.P, pat.A}
pattern, action, then := pat.P, pat.A, pat.T
now := time.Now()
// check if must be executed now
if then.Compare(now) <= 0 {
execAction(action, pattern)
} else {
actionsLock.Lock()
if actions[pa] == nil {
actions[pa] = make(map[time.Time]struct{})
}
actions[pa][then] = struct{}{}
actionsLock.Unlock()
go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow))
time.Sleep(insidePat.T.Sub(insideNow))
pendingActionsC <- insidePat
}(pat, now)
}
case pat := <-pendingActionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock()
pa := PA{pat.P, pat.A}
pattern, action, then := pat.P, pat.A, pat.T
if actions[pa] != nil {
delete(actions[pa], then)
}
actionsLock.Unlock()
execAction(action, pattern)
case fo := <-flushToActionsC:
ret := make(ActionsMap)
match := 0
actionsLock.Lock()
for pa := range actions {
ppa, pfo := pa.p.Split(), fo.p.Split()
for i, p := range ppa {
if m, err := regexp.MatchString(pfo[i], p); err == nil && m == true {
match++
}
}
if match == len(ppa) {
if fo.S == pa.A.Filter.Stream.Name &&
fo.F == pa.A.Filter.Name &&
fo.P == pa.P {
for range actions[pa] {
execAction(pa.a, pa.p)
execAction(pa.A, pa.P)
}
ret[pa] = actions[pa]
delete(actions, pa)
break
}
match = 0
}
actionsLock.Unlock()
fo.ret <- ret
case _, _ = <-stopActions:
actionsLock.Lock()
for pa := range actions {
if pa.a.OnExit {
if pa.A.OnExit {
for range actions[pa] {
execAction(pa.a, pa.p)
execAction(pa.A, pa.P)
}
}
}
actionsLock.Unlock()
wgActions.Done()
return
}
@ -238,7 +221,7 @@ func ActionsManager(concurrency int) {
}
func MatchesManager() {
var fo FlushMatchOrder
var fo PSF
var pft PFT
end := false
@ -261,7 +244,7 @@ func MatchesManager() {
matchesManagerHandleFlush(fo)
case pft = <-matchesC:
entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false}
entry := LogEntry{pft.T, 0, pft.P, pft.F.Stream.Name, pft.F.Name, 0, false}
entry.Exec = matchesManagerHandleMatch(pft)
@ -270,37 +253,25 @@ func MatchesManager() {
}
}
func matchesManagerHandleFlush(fo FlushMatchOrder) {
ret := make(MatchesMap)
match := 0
func matchesManagerHandleFlush(fo PSF) {
matchesLock.Lock()
for pf := range matches {
ppf, pfo := pf.p.Split(), fo.p.Split()
for i, p := range ppf {
if m, err := regexp.MatchString(pfo[i], p); err == nil && m == true {
match++
}
}
if match == len(ppf) {
if fo.ret != nil {
ret[pf] = matches[pf]
}
if fo.S == pf.F.Stream.Name &&
fo.F == pf.F.Name &&
fo.P == pf.P {
delete(matches, pf)
break
}
match = 0
}
matchesLock.Unlock()
if fo.ret != nil {
fo.ret <- ret
}
}
func matchesManagerHandleMatch(pft PFT) bool {
matchesLock.Lock()
defer matchesLock.Unlock()
filter, patterns, then := pft.f, pft.p, pft.t
pf := PF{pft.p, pft.f}
filter, patterns, then := pft.F, pft.P, pft.T
pf := PF{pft.P, pft.F}
if filter.Retry > 1 {
// make sure map exists
@ -332,7 +303,7 @@ func matchesManagerHandleMatch(pft PFT) bool {
func StreamManager(s *Stream, endedSignal chan *Stream) {
defer wgStreams.Done()
logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd)
logger.Printf(logger.INFO, "%s: start %s\n", s.Name, s.Cmd)
lines := cmdStdout(s.Cmd)
for {
@ -356,7 +327,6 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
var actions ActionsMap
var matches MatchesMap
var actionsLock sync.Mutex
var matchesLock sync.Mutex
var stopStreams chan bool
@ -391,10 +361,10 @@ var logsC chan LogEntry
var actionsC chan PAT
// SocketManager, DatabaseManager → MatchesManager
var flushToMatchesC chan FlushMatchOrder
var flushToMatchesC chan PSF
// SocketManager → ActionsManager
var flushToActionsC chan FlushActionOrder
var flushToActionsC chan PSF
// SocketManager → DatabaseManager
var flushToDatabaseC chan LogEntry
@ -406,8 +376,8 @@ func Daemon(confFilename string) {
matchesC = make(chan PFT)
logsC = make(chan LogEntry)
actionsC = make(chan PAT)
flushToMatchesC = make(chan FlushMatchOrder)
flushToActionsC = make(chan FlushActionOrder)
flushToMatchesC = make(chan PSF)
flushToActionsC = make(chan PSF)
flushToDatabaseC = make(chan LogEntry)
stopActions = make(chan bool)
stopStreams = make(chan bool)
@ -438,7 +408,7 @@ func Daemon(confFilename string) {
for {
select {
case finishedStream := <-endSignals:
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name)
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.Name)
nbStreamsInExecution--
if nbStreamsInExecution == 0 {
quit(conf, false)

View file

@ -106,7 +106,8 @@ func basicUsage() {
` + bold + `reaction flush` + reset + ` TARGET [TARGET...]
# remove currently active matches and run currently pending actions for the specified TARGET(s)
# (then show flushed matches and actions)
# e.g. reaction flush 192.168.1.1 root
# e.g. reaction flush 192.168.1.1
# e.g. reaction flush ip=192.168.1.1 user=root
# options:
-s/--socket SOCKET # path to the client-daemon communication socket
@ -192,6 +193,7 @@ func Main(version, commit string) {
var regex *regexp.Regexp
var err error
if *pattern != "" {
// TODO anchor ^$ the pattern?
regex, err = regexp.Compile(*pattern)
if err != nil {
logger.Fatalln("-p/--pattern: ", err)
@ -223,11 +225,12 @@ func Main(version, commit string) {
stream, filter := "", ""
if *limit != "" {
splitSF := strings.Split(*limit, ".")
if len(splitSF) != 2 {
stream = splitSF[0]
if len(splitSF) == 2 {
filter = splitSF[1]
} else if len(splitSF) > 2 {
logger.Fatalln("-l/--limit: only one . separator is supported")
}
stream = splitSF[0]
filter = splitSF[1]
}
ClientFlush(f.Args(), stream, filter, *queryFormat)

View file

@ -125,7 +125,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
defer func() {
for sf, t := range discardedEntries {
if t > 0 {
logger.Printf(logger.WARN, "info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.s, sf.f)
logger.Printf(logger.WARN, "info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.S, sf.F)
}
}
if malformedEntries > 0 {
@ -186,8 +186,8 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
discardedEntries[SF{"", ""}]++
continue
}
entry.Stream = sf.s
entry.Filter = sf.f
entry.Stream = sf.S
entry.Filter = sf.F
}
if stream := c.Streams[entry.Stream]; stream != nil {
filter = stream.Filters[entry.Filter]
@ -231,7 +231,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
// replay executions
if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() {
if startup {
flushToMatchesC <- FlushMatchOrder{entry.Pattern, nil}
flushToMatchesC <- PSF{entry.Pattern, entry.Stream, entry.Filter}
filter.sendActions(entry.Pattern, entry.T)
}

View file

@ -2,58 +2,15 @@ package app
import (
"encoding/gob"
"errors"
"net"
"os"
"path"
"sync"
"time"
"framagit.org/ppom/reaction/logger"
)
func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_actionsLock, local_matchesLock *sync.Mutex) ClientStatus {
cs := make(ClientStatus)
local_matchesLock.Lock()
// Painful data manipulation
for pf, times := range local_matches {
patterns, filter := pf.p, pf.f
if cs[filter.stream.name] == nil {
cs[filter.stream.name] = make(map[string]MapPatternStatus)
}
if cs[filter.stream.name][filter.name] == nil {
cs[filter.stream.name][filter.name] = make(MapPatternStatus)
}
cs[filter.stream.name][filter.name][patterns] = &PatternStatus{len(times), nil}
}
local_matchesLock.Unlock()
local_actionsLock.Lock()
// Painful data manipulation
for pa, times := range local_actions {
patterns, action := pa.p, pa.a
if cs[action.filter.stream.name] == nil {
cs[action.filter.stream.name] = make(map[string]MapPatternStatus)
}
if cs[action.filter.stream.name][action.filter.name] == nil {
cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
}
if cs[action.filter.stream.name][action.filter.name][patterns] == nil {
cs[action.filter.stream.name][action.filter.name][patterns] = new(PatternStatus)
}
ps := cs[action.filter.stream.name][action.filter.name][patterns]
if ps.Actions == nil {
ps.Actions = make(map[string][]string)
}
for then := range times {
ps.Actions[action.name] = append(ps.Actions[action.name], then.Format(time.DateTime))
}
}
local_actionsLock.Unlock()
return cs
}
func createOpenSocket() net.Listener {
err := os.MkdirAll(path.Dir(*SocketPath), 0755)
if err != nil {
@ -97,24 +54,20 @@ func SocketManager(conf *Conf) {
}
switch request.Request {
case Show:
response.ClientStatus = genClientStatus(actions, matches, &actionsLock, &matchesLock)
case Info:
// response.Config = *conf
response.Matches = matches
response.Actions = actions
case Flush:
le := LogEntry{time.Now(), 0, request.Pattern, request.Stream, request.Filter, 0, false}
le := LogEntry{time.Now(), 0, request.Flush.P, request.Flush.S, request.Flush.F, 0, false}
matchesC := FlushMatchOrder{request.Pattern, make(chan MatchesMap)}
actionsC := FlushActionOrder{request.Pattern, make(chan ActionsMap)}
flushToMatchesC <- matchesC
flushToActionsC <- actionsC
flushToMatchesC <- request.Flush
flushToActionsC <- request.Flush
flushToDatabaseC <- le
var lock sync.Mutex
response.ClientStatus = genClientStatus(<-actionsC.ret, <-matchesC.ret, &lock, &lock)
case Config:
response.Config = *conf
default:
logger.Println(logger.ERROR, "Invalid Message from cli: unrecognised Request type")
logger.Println(logger.ERROR, "Invalid Message from cli: unrecognised command type")
response.Err = errors.New("unrecognised command type")
return
}

View file

@ -65,39 +65,39 @@ func (c *Conf) setup() {
for streamName := range c.Streams {
stream := c.Streams[streamName]
stream.name = streamName
stream.Name = streamName
if strings.Contains(stream.name, ".") {
logger.Fatalf("Bad configuration: character '.' is not allowed in stream names: '%v'", stream.name)
if strings.Contains(stream.Name, ".") {
logger.Fatalf("Bad configuration: character '.' is not allowed in stream names: '%v'", stream.Name)
}
if len(stream.Filters) == 0 {
logger.Fatalf("Bad configuration: no filters configured in %v", stream.name)
logger.Fatalf("Bad configuration: no filters configured in %v", stream.Name)
}
for filterName := range stream.Filters {
filter := stream.Filters[filterName]
filter.stream = stream
filter.name = filterName
filter.Stream = stream
filter.Name = filterName
if strings.Contains(filter.name, ".") {
logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name)
if strings.Contains(filter.Name, ".") {
logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.Name)
}
// Parse Duration
if filter.RetryPeriod == "" {
if filter.Retry > 1 {
logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name)
logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.Name, filter.Name)
}
} else {
retryDuration, err := time.ParseDuration(filter.RetryPeriod)
if err != nil {
logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err)
logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.Name, filter.Name, err)
}
filter.retryDuration = retryDuration
}
if len(filter.Regex) == 0 {
logger.Fatalf("Bad configuration: no regexes configured in %v.%v", stream.name, filter.name)
logger.Fatalf("Bad configuration: no regexes configured in %v.%v", stream.Name, filter.Name)
}
// Compute Regexes
// Look for Patterns inside Regexes
@ -114,32 +114,32 @@ func (c *Conf) setup() {
}
compiledRegex, err := regexp.Compile(regex)
if err != nil {
logger.Fatalf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err)
logger.Fatalf("Bad configuration: regex of filter %s.%s: %v", stream.Name, filter.Name, err)
}
filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
}
if len(filter.Actions) == 0 {
logger.Fatalln("Bad configuration: no actions configured in", stream.name, ".", filter.name)
logger.Fatalln("Bad configuration: no actions configured in", stream.Name, ".", filter.Name)
}
for actionName := range filter.Actions {
action := filter.Actions[actionName]
action.filter = filter
action.name = actionName
action.Filter = filter
action.Name = actionName
if strings.Contains(action.name, ".") {
logger.Fatalln("Bad configuration: character '.' is not allowed in action names", action.name)
if strings.Contains(action.Name, ".") {
logger.Fatalln("Bad configuration: character '.' is not allowed in action names", action.Name)
}
// Parse Duration
if action.After != "" {
afterDuration, err := time.ParseDuration(action.After)
if err != nil {
logger.Fatalln("Bad configuration: Failed to parse after time in ", stream.name, ".", filter.name, ".", action.name, ":", err)
logger.Fatalln("Bad configuration: Failed to parse after time in ", stream.Name, ".", filter.Name, ".", action.Name, ":", err)
}
action.afterDuration = afterDuration
} else if action.OnExit {
logger.Fatalln("Bad configuration: Cannot have `onexit: true` without an `after` directive in", stream.name, ".", filter.name, ".", action.name)
logger.Fatalln("Bad configuration: Cannot have `onexit: true` without an `after` directive in", stream.Name, ".", filter.Name, ".", action.Name)
}
if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
filter.longuestActionDuration = &action.afterDuration

View file

@ -1,6 +1,7 @@
package app
import (
"bytes"
"encoding/gob"
"fmt"
"os"
@ -32,15 +33,33 @@ type Pattern struct {
// They're always referenced through pointers
type Stream struct {
name string `json:"-"`
Name string `json:"-"`
Cmd []string `json:"cmd"`
Filters map[string]*Filter `json:"filters"`
}
type LilStream struct {
Name string
}
func (s *Stream) GobEncode() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(LilStream{s.Name})
return buf.Bytes(), err
}
func (s *Stream) GobDecode(b []byte)(error) {
var ls LilStream
dec := gob.NewDecoder(bytes.NewReader(b))
err := dec.Decode(&ls)
s.Name = ls.Name
return err
}
type Filter struct {
stream *Stream `json:"-"`
name string `json:"-"`
Stream *Stream `json:"-"`
Name string `json:"-"`
Regex []string `json:"regex"`
compiledRegex []regexp.Regexp `json:"-"`
@ -54,9 +73,34 @@ type Filter struct {
longuestActionDuration *time.Duration
}
// those small versions are needed to prevent infinite recursion in gob because of
// data cycles: Stream <-> Filter, Filter <-> Action
type LilFilter struct {
Stream *Stream
Name string
Pattern []*Pattern
}
func (f *Filter) GobDecode(b []byte)(error) {
var lf LilFilter
dec := gob.NewDecoder(bytes.NewReader(b))
err := dec.Decode(&lf)
f.Stream = lf.Stream
f.Name = lf.Name
f.Pattern = lf.Pattern
return err
}
func (f *Filter) GobEncode() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(LilFilter{f.Stream, f.Name, f.Pattern})
return buf.Bytes(), err
}
type Action struct {
filter *Filter `json:"-"`
name string `json:"-"`
Filter *Filter `json:"-"`
Name string `json:"-"`
Cmd []string `json:"cmd"`
@ -65,6 +109,26 @@ type Action struct {
OnExit bool `json:"onexit"`
}
type LilAction struct {
Filter *Filter
Name string
}
func (a *Action) GobEncode() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(LilAction{a.Filter, a.Name})
return buf.Bytes(), err
}
func (a *Action) GobDecode(b []byte)(error) {
var la LilAction
dec := gob.NewDecoder(bytes.NewReader(b))
err := dec.Decode(&la)
a.Filter = la.Filter
a.Name = la.Name
return err
}
type LogEntry struct {
T time.Time
@ -108,38 +172,29 @@ func WithBrackets(mm []string) string {
// Helper structs made to carry information
// Stream, Filter
type SF struct{ s, f string }
type SF struct{ S, F string }
// Pattern, Stream, Filter
type PSF struct {
p Match
s, f string
P Match
S, F string
}
type PF struct {
p Match
f *Filter
P Match
F *Filter
}
type PFT struct {
p Match
f *Filter
t time.Time
P Match
F *Filter
T time.Time
}
type PA struct {
p Match
a *Action
P Match
A *Action
}
type PAT struct {
p Match
a *Action
t time.Time
}
type FlushMatchOrder struct {
p Match
ret chan MatchesMap
}
type FlushActionOrder struct {
p Match
ret chan ActionsMap
P Match
A *Action
T time.Time
}

View file

@ -14,7 +14,7 @@
streams: {
tailDown1: {
cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done"],
cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done; sleep 30"],
filters: {
findIP: {
regex: [
@ -29,7 +29,30 @@
},
undamn: {
cmd: ['echo', 'undamn', '<num>'],
after: '4s',
after: '28s',
onexit: true,
},
},
},
},
},
tailDown2: {
cmd: ['sh', '-c', "echo 1_a 2_a 3_a a_1 a_2 a_3 | tr ' ' '\n' | while read i; do sleep 1; echo found $i; done; sleep 30"],
filters: {
findIP: {
regex: [
'^found <num>_<letter>$',
'^found <letter>_<num>$',
],
retry: 2,
retryperiod: '30s',
actions: {
damn: {
cmd: ['echo', '<num>'],
},
undamn: {
cmd: ['echo', 'undamn', '<num>'],
after: '28s',
onexit: true,
},
},