mirror of
https://framagit.org/ppom/reaction
synced 2026-03-14 12:45:47 +01:00
Remove old go codebase
This commit is contained in:
parent
e642620ae3
commit
2e9e7a2a7b
13 changed files with 0 additions and 2024 deletions
|
|
@ -1,4 +0,0 @@
|
|||
This is the old Go codebase of reaction, ie. all 0.x and 1.x versions.
|
||||
This codebase most probably won't be updated.
|
||||
|
||||
Development now continues in Rust for reaction 2.x.
|
||||
|
|
@ -1,393 +0,0 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"framagit.org/ppom/reaction/logger"
|
||||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
||||
const (
|
||||
Info = 0
|
||||
Flush = 1
|
||||
)
|
||||
|
||||
type Request struct {
|
||||
Request int
|
||||
Flush PSF
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Err error
|
||||
// Config Conf
|
||||
Matches MatchesMap
|
||||
Actions ActionsMap
|
||||
}
|
||||
|
||||
func SendAndRetrieve(data Request) Response {
|
||||
conn, err := net.Dial("unix", *SocketPath)
|
||||
if err != nil {
|
||||
logger.Fatalln("Error opening connection to daemon:", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
err = gob.NewEncoder(conn).Encode(data)
|
||||
if err != nil {
|
||||
logger.Fatalln("Can't send message:", err)
|
||||
}
|
||||
|
||||
var response Response
|
||||
err = gob.NewDecoder(conn).Decode(&response)
|
||||
if err != nil {
|
||||
logger.Fatalln("Invalid answer from daemon:", err)
|
||||
}
|
||||
return response
|
||||
}
|
||||
|
||||
type PatternStatus struct {
|
||||
Matches int `json:"matches,omitempty"`
|
||||
Actions map[string][]string `json:"actions,omitempty"`
|
||||
}
|
||||
type MapPatternStatus map[Match]*PatternStatus
|
||||
type MapPatternStatusFlush MapPatternStatus
|
||||
|
||||
type ClientStatus map[string]map[string]MapPatternStatus
|
||||
type ClientStatusFlush ClientStatus
|
||||
|
||||
func (mps MapPatternStatusFlush) MarshalJSON() ([]byte, error) {
|
||||
for _, v := range mps {
|
||||
return json.Marshal(v)
|
||||
}
|
||||
return []byte(""), nil
|
||||
}
|
||||
|
||||
func (csf ClientStatusFlush) MarshalJSON() ([]byte, error) {
|
||||
ret := make(map[string]map[string]MapPatternStatusFlush)
|
||||
for k, v := range csf {
|
||||
ret[k] = make(map[string]MapPatternStatusFlush)
|
||||
for kk, vv := range v {
|
||||
ret[k][kk] = MapPatternStatusFlush(vv)
|
||||
}
|
||||
}
|
||||
return json.Marshal(ret)
|
||||
}
|
||||
|
||||
func pfMatches(streamName string, filterName string, regexes map[string]*regexp.Regexp, match Match, filter *Filter) bool {
|
||||
// Check stream and filter match
|
||||
if streamName != "" && streamName != filter.Stream.Name {
|
||||
return false
|
||||
}
|
||||
if filterName != "" && filterName != filter.Name {
|
||||
return false
|
||||
}
|
||||
// Check that all user requested patterns are in this filter
|
||||
var nbMatched int
|
||||
var localMatches = match.Split()
|
||||
// For each pattern of this filter
|
||||
for i, pattern := range filter.Pattern {
|
||||
// Check that this pattern has user requested name
|
||||
if reg, ok := regexes[pattern.Name]; ok {
|
||||
// Check that the PF.p[i] matches user requested pattern
|
||||
if reg.MatchString(localMatches[i]) {
|
||||
nbMatched++
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(regexes) != nbMatched {
|
||||
return false
|
||||
}
|
||||
// All checks passed
|
||||
return true
|
||||
}
|
||||
|
||||
func addMatchToCS(cs ClientStatus, pf PF, times map[time.Time]struct{}) {
|
||||
patterns, streamName, filterName := pf.P, pf.F.Stream.Name, pf.F.Name
|
||||
if cs[streamName] == nil {
|
||||
cs[streamName] = make(map[string]MapPatternStatus)
|
||||
}
|
||||
if cs[streamName][filterName] == nil {
|
||||
cs[streamName][filterName] = make(MapPatternStatus)
|
||||
}
|
||||
cs[streamName][filterName][patterns] = &PatternStatus{len(times), nil}
|
||||
}
|
||||
|
||||
func addActionToCS(cs ClientStatus, pa PA, times map[time.Time]struct{}) {
|
||||
patterns, streamName, filterName, actionName := pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name, pa.A.Name
|
||||
if cs[streamName] == nil {
|
||||
cs[streamName] = make(map[string]MapPatternStatus)
|
||||
}
|
||||
if cs[streamName][filterName] == nil {
|
||||
cs[streamName][filterName] = make(MapPatternStatus)
|
||||
}
|
||||
if cs[streamName][filterName][patterns] == nil {
|
||||
cs[streamName][filterName][patterns] = new(PatternStatus)
|
||||
}
|
||||
ps := cs[streamName][filterName][patterns]
|
||||
if ps.Actions == nil {
|
||||
ps.Actions = make(map[string][]string)
|
||||
}
|
||||
for then := range times {
|
||||
ps.Actions[actionName] = append(ps.Actions[actionName], then.Format(time.DateTime))
|
||||
}
|
||||
}
|
||||
|
||||
func printClientStatus(cs ClientStatus, format string) {
|
||||
var text []byte
|
||||
var err error
|
||||
if format == "json" {
|
||||
text, err = json.MarshalIndent(cs, "", " ")
|
||||
} else {
|
||||
text, err = yaml.Marshal(cs)
|
||||
}
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to convert daemon binary response to text format:", err)
|
||||
}
|
||||
|
||||
fmt.Println(strings.ReplaceAll(string(text), "\\0", " "))
|
||||
}
|
||||
|
||||
func compileKVPatterns(kvpatterns []string) map[string]*regexp.Regexp {
|
||||
var regexes map[string]*regexp.Regexp
|
||||
regexes = make(map[string]*regexp.Regexp)
|
||||
for _, p := range kvpatterns {
|
||||
// p syntax already checked in Main
|
||||
key, value, found := strings.Cut(p, "=")
|
||||
if !found {
|
||||
logger.Printf(logger.ERROR, "Bad argument: no `=` in %v", p)
|
||||
logger.Fatalln("Patterns must be prefixed by their name (e.g. ip=1.1.1.1)")
|
||||
}
|
||||
if regexes[key] != nil {
|
||||
logger.Fatalf("Bad argument: same pattern name provided multiple times: %v", key)
|
||||
}
|
||||
compiled, err := regexp.Compile(fmt.Sprintf("^%v$", value))
|
||||
if err != nil {
|
||||
logger.Fatalf("Bad argument: Could not compile: `%v`: %v", value, err)
|
||||
}
|
||||
regexes[key] = compiled
|
||||
}
|
||||
return regexes
|
||||
}
|
||||
|
||||
func ClientShow(format, stream, filter string, kvpatterns []string) {
|
||||
response := SendAndRetrieve(Request{Info, PSF{}})
|
||||
if response.Err != nil {
|
||||
logger.Fatalln("Received error from daemon:", response.Err)
|
||||
}
|
||||
|
||||
cs := make(ClientStatus)
|
||||
|
||||
var regexes map[string]*regexp.Regexp
|
||||
|
||||
if len(kvpatterns) != 0 {
|
||||
regexes = compileKVPatterns(kvpatterns)
|
||||
}
|
||||
|
||||
var found bool
|
||||
|
||||
// Painful data manipulation
|
||||
for pf, times := range response.Matches {
|
||||
// Check this PF is not empty
|
||||
if len(times) == 0 {
|
||||
continue
|
||||
}
|
||||
if !pfMatches(stream, filter, regexes, pf.P, pf.F) {
|
||||
continue
|
||||
}
|
||||
addMatchToCS(cs, pf, times)
|
||||
found = true
|
||||
}
|
||||
|
||||
// Painful data manipulation
|
||||
for pa, times := range response.Actions {
|
||||
// Check this PF is not empty
|
||||
if len(times) == 0 {
|
||||
continue
|
||||
}
|
||||
if !pfMatches(stream, filter, regexes, pa.P, pa.A.Filter) {
|
||||
continue
|
||||
}
|
||||
addActionToCS(cs, pa, times)
|
||||
found = true
|
||||
}
|
||||
|
||||
if !found {
|
||||
logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
printClientStatus(cs, format)
|
||||
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// TODO : Show values we just flushed - for now we got no details :
|
||||
/*
|
||||
* % ./reaction flush -l ssh.failedlogin login=".*t"
|
||||
* ssh:
|
||||
* failedlogin:
|
||||
* actions:
|
||||
* unban:
|
||||
* - "2024-04-30 15:27:28"
|
||||
* - "2024-04-30 15:27:28"
|
||||
* - "2024-04-30 15:27:28"
|
||||
* - "2024-04-30 15:27:28"
|
||||
*
|
||||
*/
|
||||
func ClientFlush(format, streamName, filterName string, patterns []string) {
|
||||
requestedPatterns := compileKVPatterns(patterns)
|
||||
|
||||
// Remember which Filters are compatible with the query
|
||||
filterCompatibility := make(map[SF]bool)
|
||||
isCompatible := func(filter *Filter) bool {
|
||||
sf := SF{filter.Stream.Name, filter.Name}
|
||||
compatible, ok := filterCompatibility[sf]
|
||||
|
||||
// already tested
|
||||
if ok {
|
||||
return compatible
|
||||
}
|
||||
|
||||
for k := range requestedPatterns {
|
||||
if -1 == slices.IndexFunc(filter.Pattern, func(pattern *Pattern) bool {
|
||||
return pattern.Name == k
|
||||
}) {
|
||||
filterCompatibility[sf] = false
|
||||
return false
|
||||
}
|
||||
}
|
||||
filterCompatibility[sf] = true
|
||||
return true
|
||||
}
|
||||
|
||||
// match functions
|
||||
kvMatch := func(filter *Filter, filterPatterns []string) bool {
|
||||
// For each user requested pattern
|
||||
for k, v := range requestedPatterns {
|
||||
// Find its index on the Filter.Pattern
|
||||
for i, pattern := range filter.Pattern {
|
||||
if k == pattern.Name {
|
||||
// Test the match
|
||||
if !v.MatchString(filterPatterns[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
var found bool
|
||||
fullMatch := func(filter *Filter, match Match) bool {
|
||||
// Test if we limit by stream
|
||||
if streamName == "" || filter.Stream.Name == streamName {
|
||||
// Test if we limit by filter
|
||||
if filterName == "" || filter.Name == filterName {
|
||||
found = true
|
||||
filterPatterns := match.Split()
|
||||
return isCompatible(filter) && kvMatch(filter, filterPatterns)
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
response := SendAndRetrieve(Request{Info, PSF{}})
|
||||
if response.Err != nil {
|
||||
logger.Fatalln("Received error from daemon:", response.Err)
|
||||
}
|
||||
|
||||
commands := make([]PSF, 0)
|
||||
|
||||
cs := make(ClientStatus)
|
||||
|
||||
for pf, times := range response.Matches {
|
||||
if fullMatch(pf.F, pf.P) {
|
||||
commands = append(commands, PSF{pf.P, pf.F.Stream.Name, pf.F.Name})
|
||||
addMatchToCS(cs, pf, times)
|
||||
}
|
||||
}
|
||||
|
||||
for pa, times := range response.Actions {
|
||||
if fullMatch(pa.A.Filter, pa.P) {
|
||||
commands = append(commands, PSF{pa.P, pa.A.Filter.Stream.Name, pa.A.Filter.Name})
|
||||
addActionToCS(cs, pa, times)
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
logger.Println(logger.WARN, "No matching stream.filter items found. This does not mean it doesn't exist, maybe it just didn't receive any match.")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
for _, psf := range commands {
|
||||
response := SendAndRetrieve(Request{Flush, psf})
|
||||
if response.Err != nil {
|
||||
logger.Fatalln("Received error from daemon:", response.Err)
|
||||
}
|
||||
}
|
||||
|
||||
printClientStatus(cs, format)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
func TestRegex(confFilename, regex, line string) {
|
||||
conf := parseConf(confFilename)
|
||||
|
||||
// Code close to app/startup.go
|
||||
var usedPatterns []*Pattern
|
||||
for _, pattern := range conf.Patterns {
|
||||
if strings.Contains(regex, pattern.nameWithBraces) {
|
||||
usedPatterns = append(usedPatterns, pattern)
|
||||
regex = strings.Replace(regex, pattern.nameWithBraces, pattern.Regex, 1)
|
||||
}
|
||||
}
|
||||
reg, err := regexp.Compile(regex)
|
||||
if err != nil {
|
||||
logger.Fatalln("ERROR the specified regex is invalid: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Code close to app/daemon.go
|
||||
match := func(line string) {
|
||||
var ignored bool
|
||||
if matches := reg.FindStringSubmatch(line); matches != nil {
|
||||
if usedPatterns != nil {
|
||||
var result []string
|
||||
for _, p := range usedPatterns {
|
||||
match := matches[reg.SubexpIndex(p.Name)]
|
||||
result = append(result, match)
|
||||
if !p.notAnIgnore(&match) {
|
||||
ignored = true
|
||||
}
|
||||
}
|
||||
if !ignored {
|
||||
fmt.Printf("\033[32mmatching\033[0m %v: %v\n", WithBrackets(result), line)
|
||||
} else {
|
||||
fmt.Printf("\033[33mignore matching\033[0m %v: %v\n", WithBrackets(result), line)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("\033[32mmatching\033[0m [%v]:\n", line)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("\033[31mno match\033[0m: %v\n", line)
|
||||
}
|
||||
}
|
||||
|
||||
if line != "" {
|
||||
match(line)
|
||||
} else {
|
||||
logger.Println(logger.INFO, "no second argument: reading from stdin")
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
for scanner.Scan() {
|
||||
match(scanner.Text())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,454 +0,0 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"framagit.org/ppom/reaction/logger"
|
||||
)
|
||||
|
||||
// Executes a command and channel-send its stdout
|
||||
func cmdStdout(commandline []string) chan *string {
|
||||
lines := make(chan *string)
|
||||
|
||||
go func() {
|
||||
cmd := exec.Command(commandline[0], commandline[1:]...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
logger.Fatalln("couldn't open stdout on command:", err)
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
logger.Fatalln("couldn't start command:", err)
|
||||
}
|
||||
defer stdout.Close()
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
lines <- &line
|
||||
logger.Println(logger.DEBUG, "stdout:", line)
|
||||
}
|
||||
close(lines)
|
||||
}()
|
||||
|
||||
return lines
|
||||
}
|
||||
|
||||
func runCommands(commands [][]string, moment string) bool {
|
||||
ok := true
|
||||
for _, command := range commands {
|
||||
cmd := exec.Command(command[0], command[1:]...)
|
||||
cmd.WaitDelay = time.Minute
|
||||
|
||||
logger.Printf(logger.INFO, "%v command: run %v\n", moment, command)
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
|
||||
ok = false
|
||||
} else {
|
||||
err := cmd.Wait()
|
||||
if err != nil {
|
||||
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
func (p *Pattern) notAnIgnore(match *string) bool {
|
||||
for _, regex := range p.compiledIgnoreRegex {
|
||||
if regex.MatchString(*match) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
for _, ignore := range p.Ignore {
|
||||
if ignore == *match {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Whether one of the filter's regexes is matched on a line
|
||||
func (f *Filter) match(line *string) Match {
|
||||
for _, regex := range f.compiledRegex {
|
||||
|
||||
if matches := regex.FindStringSubmatch(*line); matches != nil {
|
||||
if f.Pattern != nil {
|
||||
var result []string
|
||||
for _, p := range f.Pattern {
|
||||
match := matches[regex.SubexpIndex(p.Name)]
|
||||
if p.notAnIgnore(&match) {
|
||||
result = append(result, match)
|
||||
}
|
||||
}
|
||||
if len(result) == len(f.Pattern) {
|
||||
logger.Printf(logger.INFO, "%s.%s: match %s", f.Stream.Name, f.Name, WithBrackets(result))
|
||||
return JoinMatch(result)
|
||||
}
|
||||
} else {
|
||||
logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.Stream.Name, f.Name)
|
||||
// No pattern, so this match will never actually be used
|
||||
return "."
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (f *Filter) sendActions(match Match, at time.Time) {
|
||||
for _, a := range f.Actions {
|
||||
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Action) exec(match Match) {
|
||||
defer wgActions.Done()
|
||||
|
||||
var computedCommand []string
|
||||
|
||||
if a.Filter.Pattern != nil {
|
||||
computedCommand = make([]string, 0, len(a.Cmd))
|
||||
matches := match.Split()
|
||||
|
||||
for _, item := range a.Cmd {
|
||||
for i, p := range a.Filter.Pattern {
|
||||
item = strings.ReplaceAll(item, p.nameWithBraces, matches[i])
|
||||
}
|
||||
computedCommand = append(computedCommand, item)
|
||||
}
|
||||
} else {
|
||||
computedCommand = a.Cmd
|
||||
}
|
||||
|
||||
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand)
|
||||
|
||||
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
|
||||
|
||||
if ret := cmd.Run(); ret != nil {
|
||||
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand, ret)
|
||||
}
|
||||
}
|
||||
|
||||
func ActionsManager(concurrency int) {
|
||||
// concurrency init
|
||||
execActionsC := make(chan PA)
|
||||
if concurrency > 0 {
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
var pa PA
|
||||
for {
|
||||
pa = <-execActionsC
|
||||
pa.A.exec(pa.P)
|
||||
}
|
||||
}()
|
||||
}
|
||||
} else {
|
||||
go func() {
|
||||
var pa PA
|
||||
for {
|
||||
pa = <-execActionsC
|
||||
go func(pa PA) {
|
||||
pa.A.exec(pa.P)
|
||||
}(pa)
|
||||
}
|
||||
}()
|
||||
}
|
||||
execAction := func(a *Action, p Match) {
|
||||
wgActions.Add(1)
|
||||
execActionsC <- PA{p, a}
|
||||
}
|
||||
|
||||
// main
|
||||
pendingActionsC := make(chan PAT)
|
||||
for {
|
||||
select {
|
||||
case pat := <-actionsC:
|
||||
pa := PA{pat.P, pat.A}
|
||||
pattern, action, then := pat.P, pat.A, pat.T
|
||||
now := time.Now()
|
||||
// check if must be executed now
|
||||
if then.Compare(now) <= 0 {
|
||||
execAction(action, pattern)
|
||||
} else {
|
||||
if actions[pa] == nil {
|
||||
actions[pa] = make(map[time.Time]struct{})
|
||||
}
|
||||
actions[pa][then] = struct{}{}
|
||||
go func(insidePat PAT, insideNow time.Time) {
|
||||
time.Sleep(insidePat.T.Sub(insideNow))
|
||||
pendingActionsC <- insidePat
|
||||
}(pat, now)
|
||||
}
|
||||
case pat := <-pendingActionsC:
|
||||
pa := PA{pat.P, pat.A}
|
||||
pattern, action, then := pat.P, pat.A, pat.T
|
||||
if actions[pa] != nil {
|
||||
delete(actions[pa], then)
|
||||
execAction(action, pattern)
|
||||
}
|
||||
case fo := <-flushToActionsC:
|
||||
for pa := range actions {
|
||||
if fo.S == pa.A.Filter.Stream.Name &&
|
||||
fo.F == pa.A.Filter.Name &&
|
||||
fo.P == pa.P {
|
||||
for range actions[pa] {
|
||||
execAction(pa.A, pa.P)
|
||||
}
|
||||
delete(actions, pa)
|
||||
break
|
||||
}
|
||||
}
|
||||
case _, _ = <-stopActions:
|
||||
for pa := range actions {
|
||||
if pa.A.OnExit {
|
||||
for range actions[pa] {
|
||||
execAction(pa.A, pa.P)
|
||||
}
|
||||
}
|
||||
}
|
||||
wgActions.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func MatchesManager() {
|
||||
var fo PSF
|
||||
var pft PFT
|
||||
end := false
|
||||
|
||||
for !end {
|
||||
select {
|
||||
case fo = <-flushToMatchesC:
|
||||
matchesManagerHandleFlush(fo)
|
||||
case fo, ok := <-startupMatchesC:
|
||||
if !ok {
|
||||
end = true
|
||||
} else {
|
||||
_ = matchesManagerHandleMatch(fo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case fo = <-flushToMatchesC:
|
||||
matchesManagerHandleFlush(fo)
|
||||
case pft = <-matchesC:
|
||||
|
||||
entry := LogEntry{pft.T, 0, pft.P, pft.F.Stream.Name, pft.F.Name, 0, false}
|
||||
|
||||
entry.Exec = matchesManagerHandleMatch(pft)
|
||||
|
||||
logsC <- entry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func matchesManagerHandleFlush(fo PSF) {
|
||||
matchesLock.Lock()
|
||||
for pf := range matches {
|
||||
if fo.S == pf.F.Stream.Name &&
|
||||
fo.F == pf.F.Name &&
|
||||
fo.P == pf.P {
|
||||
delete(matches, pf)
|
||||
break
|
||||
}
|
||||
}
|
||||
matchesLock.Unlock()
|
||||
}
|
||||
|
||||
func matchesManagerHandleMatch(pft PFT) bool {
|
||||
matchesLock.Lock()
|
||||
defer matchesLock.Unlock()
|
||||
|
||||
filter, patterns, then := pft.F, pft.P, pft.T
|
||||
pf := PF{pft.P, pft.F}
|
||||
|
||||
if filter.Retry > 1 {
|
||||
// make sure map exists
|
||||
if matches[pf] == nil {
|
||||
matches[pf] = make(map[time.Time]struct{})
|
||||
}
|
||||
// add new match
|
||||
matches[pf][then] = struct{}{}
|
||||
// remove match when expired
|
||||
go func(pf PF, then time.Time) {
|
||||
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
|
||||
matchesLock.Lock()
|
||||
if matches[pf] != nil {
|
||||
// FIXME replace this and all similar occurences
|
||||
// by clear() when switching to go 1.21
|
||||
delete(matches[pf], then)
|
||||
}
|
||||
matchesLock.Unlock()
|
||||
}(pf, then)
|
||||
}
|
||||
|
||||
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
|
||||
delete(matches, pf)
|
||||
filter.sendActions(patterns, then)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func StreamManager(s *Stream, endedSignal chan *Stream) {
|
||||
defer wgStreams.Done()
|
||||
logger.Printf(logger.INFO, "%s: start %s\n", s.Name, s.Cmd)
|
||||
|
||||
lines := cmdStdout(s.Cmd)
|
||||
for {
|
||||
select {
|
||||
case line, ok := <-lines:
|
||||
if !ok {
|
||||
endedSignal <- s
|
||||
return
|
||||
}
|
||||
for _, filter := range s.Filters {
|
||||
if match := filter.match(line); match != "" {
|
||||
matchesC <- PFT{match, filter, time.Now()}
|
||||
}
|
||||
}
|
||||
case _, _ = <-stopStreams:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var actions ActionsMap
|
||||
var matches MatchesMap
|
||||
var matchesLock sync.Mutex
|
||||
|
||||
var stopStreams chan bool
|
||||
var stopActions chan bool
|
||||
var wgActions sync.WaitGroup
|
||||
var wgStreams sync.WaitGroup
|
||||
|
||||
/*
|
||||
<StreamCmds>
|
||||
↓
|
||||
StreamManager onstartup:matches
|
||||
↓ ↓ ↑
|
||||
matches→ MatchesManager →logs→ DatabaseManager ←·
|
||||
↑ ↓ ↑
|
||||
↑ actions→ ActionsManager ↑
|
||||
↑ ↑ ↑
|
||||
SocketManager →flushes→→→→→→→→→→·→→→→→→→→→→→→→→→→·
|
||||
↑
|
||||
<Clients>
|
||||
*/
|
||||
|
||||
// DatabaseManager → MatchesManager
|
||||
var startupMatchesC chan PFT
|
||||
|
||||
// StreamManager → MatchesManager
|
||||
var matchesC chan PFT
|
||||
|
||||
// MatchesManager → DatabaseManager
|
||||
var logsC chan LogEntry
|
||||
|
||||
// MatchesManager → ActionsManager
|
||||
var actionsC chan PAT
|
||||
|
||||
// SocketManager, DatabaseManager → MatchesManager
|
||||
var flushToMatchesC chan PSF
|
||||
|
||||
// SocketManager → ActionsManager
|
||||
var flushToActionsC chan PSF
|
||||
|
||||
// SocketManager → DatabaseManager
|
||||
var flushToDatabaseC chan LogEntry
|
||||
|
||||
func Daemon(confFilename string) {
|
||||
conf := parseConf(confFilename)
|
||||
|
||||
startupMatchesC = make(chan PFT)
|
||||
matchesC = make(chan PFT)
|
||||
logsC = make(chan LogEntry)
|
||||
actionsC = make(chan PAT)
|
||||
flushToMatchesC = make(chan PSF)
|
||||
flushToActionsC = make(chan PSF)
|
||||
flushToDatabaseC = make(chan LogEntry)
|
||||
stopActions = make(chan bool)
|
||||
stopStreams = make(chan bool)
|
||||
actions = make(ActionsMap)
|
||||
matches = make(MatchesMap)
|
||||
|
||||
_ = runCommands(conf.Start, "start")
|
||||
|
||||
go DatabaseManager(conf)
|
||||
go MatchesManager()
|
||||
go ActionsManager(conf.Concurrency)
|
||||
|
||||
// Ready to start
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
endSignals := make(chan *Stream)
|
||||
nbStreamsInExecution := len(conf.Streams)
|
||||
|
||||
for _, stream := range conf.Streams {
|
||||
wgStreams.Add(1)
|
||||
go StreamManager(stream, endSignals)
|
||||
}
|
||||
|
||||
go SocketManager(conf)
|
||||
|
||||
for {
|
||||
select {
|
||||
case finishedStream := <-endSignals:
|
||||
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.Name)
|
||||
nbStreamsInExecution--
|
||||
if nbStreamsInExecution == 0 {
|
||||
quit(conf, false)
|
||||
}
|
||||
case <-sigs:
|
||||
// Trap endSignals, which may cause a deadlock otherwise
|
||||
go func() {
|
||||
ok := true
|
||||
for ok {
|
||||
_, ok = <-endSignals
|
||||
}
|
||||
}()
|
||||
logger.Printf(logger.INFO, "Received SIGINT/SIGTERM, exiting")
|
||||
quit(conf, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func quit(conf *Conf, graceful bool) {
|
||||
// send stop to StreamManager·s
|
||||
close(stopStreams)
|
||||
logger.Println(logger.INFO, "Waiting for Streams to finish...")
|
||||
wgStreams.Wait()
|
||||
// ActionsManager calls wgActions.Done() when it has launched all pending actions
|
||||
wgActions.Add(1)
|
||||
// send stop to ActionsManager
|
||||
close(stopActions)
|
||||
// stop all actions
|
||||
logger.Println(logger.INFO, "Waiting for Actions to finish...")
|
||||
wgActions.Wait()
|
||||
// run stop commands
|
||||
stopOk := runCommands(conf.Stop, "stop")
|
||||
// delete pipe
|
||||
err := os.Remove(*SocketPath)
|
||||
if err != nil {
|
||||
logger.Println(logger.ERROR, "Failed to remove socket:", err)
|
||||
}
|
||||
|
||||
if !stopOk || !graceful {
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
|
@ -1,108 +0,0 @@
|
|||
---
|
||||
# This example configuration file is a good starting point, but you're
|
||||
# strongly encouraged to take a look at the full documentation: https://reaction.ppom.me
|
||||
#
|
||||
# This file is using the well-established YAML configuration language.
|
||||
# Note that the more powerful JSONnet configuration language is also supported
|
||||
# and that the documentation uses JSONnet
|
||||
|
||||
# definitions are just a place to put chunks of conf you want to reuse in another place
|
||||
# using YAML anchors `&name` and pointers `*name`
|
||||
# definitions are not readed by reaction
|
||||
definitions:
|
||||
- &iptablesban [ 'ip46tables', '-w', '-A', 'reaction', '-s', '<ip>', '-j', 'DROP' ]
|
||||
- &iptablesunban [ 'ip46tables', '-w', '-D', 'reaction', '-s', '<ip>', '-j', 'DROP' ]
|
||||
# ip46tables is a minimal C program (only POSIX dependencies) present as a subdirectory.
|
||||
# it permits to handle both ipv4/iptables and ipv6/ip6tables commands
|
||||
|
||||
# if set to a positive number → max number of concurrent actions
|
||||
# if set to a negative number → no limit
|
||||
# if not specified or set to 0 → defaults to the number of CPUs on the system
|
||||
concurrency: 0
|
||||
|
||||
# patterns are substitued in regexes.
|
||||
# when a filter performs an action, it replaces the found pattern
|
||||
patterns:
|
||||
ip:
|
||||
# reaction regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
|
||||
# simple version: regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
|
||||
regex: '(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
|
||||
ignore:
|
||||
- 127.0.0.1
|
||||
- ::1
|
||||
# Patterns can be ignored based on regexes, it will try to match the whole string detected by the pattern
|
||||
# ignoreregex:
|
||||
# - '10\.0\.[0-9]{1,3}\.[0-9]{1,3}'
|
||||
|
||||
# Those commands will be executed in order at start, before everything else
|
||||
start:
|
||||
- [ 'ip46tables', '-w', '-N', 'reaction' ]
|
||||
- [ 'ip46tables', '-w', '-I', 'INPUT', '-p', 'all', '-j', 'reaction' ]
|
||||
- [ 'ip46tables', '-w', '-I', 'FORWARD', '-p', 'all', '-j', 'reaction' ]
|
||||
|
||||
# Those commands will be executed in order at stop, after everything else
|
||||
stop:
|
||||
- [ 'ip46tables', '-w,', '-D', 'INPUT', '-p', 'all', '-j', 'reaction' ]
|
||||
- [ 'ip46tables', '-w,', '-D', 'FORWARD', '-p', 'all', '-j', 'reaction' ]
|
||||
- [ 'ip46tables', '-w', '-F', 'reaction' ]
|
||||
- [ 'ip46tables', '-w', '-X', 'reaction' ]
|
||||
|
||||
# streams are commands
|
||||
# they are run and their ouptut is captured
|
||||
# *example:* `tail -f /var/log/nginx/access.log`
|
||||
# their output will be used by one or more filters
|
||||
streams:
|
||||
# streams have a user-defined name
|
||||
ssh:
|
||||
# note that if the command is not in environment's `PATH`
|
||||
# its full path must be given.
|
||||
cmd: [ 'journalctl', '-n0', '-fu', 'sshd.service' ]
|
||||
# filters run actions when they match regexes on a stream
|
||||
filters:
|
||||
# filters have a user-defined name
|
||||
failedlogin:
|
||||
# reaction's regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
|
||||
regex:
|
||||
# <ip> is predefined in the patterns section
|
||||
# ip's regex is inserted in the following regex
|
||||
- 'authentication failure;.*rhost=<ip>'
|
||||
- 'Failed password for .* from <ip>'
|
||||
- 'Connection (reset|closed) by (authenticating|invalid) user .* <ip>'
|
||||
# if retry and retryperiod are defined,
|
||||
# the actions will only take place if a same pattern is
|
||||
# found `retry` times in a `retryperiod` interval
|
||||
retry: 3
|
||||
# format is defined here: https://pkg.go.dev/time#ParseDuration
|
||||
retryperiod: 6h
|
||||
# actions are run by the filter when regexes are matched
|
||||
actions:
|
||||
# actions have a user-defined name
|
||||
ban:
|
||||
# YAML substitutes *reference by the value anchored at &reference
|
||||
cmd: *iptablesban
|
||||
unban:
|
||||
cmd: *iptablesunban
|
||||
# if after is defined, the action will not take place immediately, but after a specified duration
|
||||
# same format as retryperiod
|
||||
after: 48h
|
||||
# let's say reaction is quitting. does it run all those pending commands which had an `after` duration set?
|
||||
# if you want reaction to run those pending commands before exiting, you can set this:
|
||||
# onexit: true
|
||||
# (defaults to false)
|
||||
# here it is not useful because we will flush and delete the chain containing the bans anyway
|
||||
# (with the stop commands)
|
||||
|
||||
# persistence
|
||||
# tldr; when an `after` action is set in a filter, such filter acts as a 'jail',
|
||||
# which is persisted after reboots.
|
||||
#
|
||||
# when a filter is triggered, there are 2 flows:
|
||||
#
|
||||
# if none of its actions have an `after` directive set:
|
||||
# no action will be replayed.
|
||||
#
|
||||
# else (if at least one action has an `after` directive set):
|
||||
# if reaction stops while `after` actions are pending:
|
||||
# and reaction starts again while those actions would still be pending:
|
||||
# reaction executes the past actions (actions without after or with then+after < now)
|
||||
# and plans the execution of future actions (actions with then+after > now)
|
||||
|
|
@ -1,230 +0,0 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
_ "embed"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"framagit.org/ppom/reaction/logger"
|
||||
)
|
||||
|
||||
func addStringFlag(names []string, defvalue string, f *flag.FlagSet) *string {
|
||||
var value string
|
||||
for _, name := range names {
|
||||
f.StringVar(&value, name, defvalue, "")
|
||||
}
|
||||
return &value
|
||||
}
|
||||
|
||||
func addBoolFlag(names []string, f *flag.FlagSet) *bool {
|
||||
var value bool
|
||||
for _, name := range names {
|
||||
f.BoolVar(&value, name, false, "")
|
||||
}
|
||||
return &value
|
||||
}
|
||||
|
||||
var SocketPath *string
|
||||
|
||||
func addSocketFlag(f *flag.FlagSet) *string {
|
||||
return addStringFlag([]string{"s", "socket"}, "/run/reaction/reaction.sock", f)
|
||||
}
|
||||
|
||||
func addConfFlag(f *flag.FlagSet) *string {
|
||||
return addStringFlag([]string{"c", "config"}, "", f)
|
||||
}
|
||||
|
||||
func addFormatFlag(f *flag.FlagSet) *string {
|
||||
return addStringFlag([]string{"f", "format"}, "yaml", f)
|
||||
}
|
||||
|
||||
func addLimitFlag(f *flag.FlagSet) *string {
|
||||
return addStringFlag([]string{"l", "limit"}, "", f)
|
||||
}
|
||||
|
||||
func addLevelFlag(f *flag.FlagSet) *string {
|
||||
return addStringFlag([]string{"l", "loglevel"}, "INFO", f)
|
||||
}
|
||||
|
||||
func subCommandParse(f *flag.FlagSet, maxRemainingArgs int) {
|
||||
help := addBoolFlag([]string{"h", "help"}, f)
|
||||
f.Parse(os.Args[2:])
|
||||
if *help {
|
||||
basicUsage()
|
||||
os.Exit(0)
|
||||
}
|
||||
// -1 = no limit to remaining args
|
||||
if maxRemainingArgs > -1 && len(f.Args()) > maxRemainingArgs {
|
||||
fmt.Printf("ERROR unrecognized argument(s): %v\n", f.Args()[maxRemainingArgs:])
|
||||
basicUsage()
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func basicUsage() {
|
||||
const (
|
||||
bold = "\033[1m"
|
||||
reset = "\033[0m"
|
||||
)
|
||||
fmt.Print(
|
||||
bold + `reaction help` + reset + `
|
||||
# print this help message
|
||||
|
||||
` + bold + `reaction start` + reset + `
|
||||
# start the daemon
|
||||
|
||||
# options:
|
||||
-c/--config CONFIG_FILE # configuration file in json, jsonnet or yaml format (required)
|
||||
-l/--loglevel LEVEL # minimum log level to show, in DEBUG, INFO, WARN, ERROR, FATAL
|
||||
# (default: INFO)
|
||||
-s/--socket SOCKET # path to the client-daemon communication socket
|
||||
# (default: /run/reaction/reaction.sock)
|
||||
|
||||
` + bold + `reaction example-conf` + reset + `
|
||||
# print a configuration file example
|
||||
|
||||
` + bold + `reaction show` + reset + ` [NAME=PATTERN...]
|
||||
# show current matches and which actions are still to be run for the specified PATTERN regexe(s)
|
||||
# (e.g know what is currenly banned)
|
||||
|
||||
reaction show
|
||||
reaction show "ip=192.168.1.1"
|
||||
reaction show "ip=192\.168\..*" login=root
|
||||
|
||||
# options:
|
||||
-s/--socket SOCKET # path to the client-daemon communication socket
|
||||
-f/--format yaml|json # (default: yaml)
|
||||
-l/--limit STREAM[.FILTER] # only show items related to this STREAM (or STREAM.FILTER)
|
||||
|
||||
` + bold + `reaction flush` + reset + ` NAME=PATTERN [NAME=PATTERN...]
|
||||
# remove currently active matches and run currently pending actions for the specified PATTERN regexe(s)
|
||||
# (then show flushed matches and actions)
|
||||
|
||||
reaction flush "ip=192.168.1.1"
|
||||
reaction flush "ip=192\.168\..*" login=root
|
||||
|
||||
# options:
|
||||
-s/--socket SOCKET # path to the client-daemon communication socket
|
||||
-f/--format yaml|json # (default: yaml)
|
||||
-l/--limit STREAM.FILTER # flush only items related to this STREAM.FILTER
|
||||
|
||||
` + bold + `reaction test-regex` + reset + ` REGEX LINE # test REGEX against LINE
|
||||
cat FILE | ` + bold + `reaction test-regex` + reset + ` REGEX # test REGEX against each line of FILE
|
||||
|
||||
# options:
|
||||
-c/--config CONFIG_FILE # configuration file in json, jsonnet or yaml format
|
||||
# optional: permits to use configured patterns like <ip> in regex
|
||||
|
||||
` + bold + `reaction version` + reset + `
|
||||
# print version information
|
||||
|
||||
see usage examples, service configurations and good practices
|
||||
on the ` + bold + `wiki` + reset + `: https://reaction.ppom.me
|
||||
`)
|
||||
}
|
||||
|
||||
//go:embed example.yml
|
||||
var exampleConf string
|
||||
|
||||
func Main(version string) {
|
||||
if len(os.Args) <= 1 {
|
||||
logger.Fatalln("No argument provided. Try `reaction help`")
|
||||
basicUsage()
|
||||
os.Exit(1)
|
||||
}
|
||||
f := flag.NewFlagSet(os.Args[1], flag.ExitOnError)
|
||||
switch os.Args[1] {
|
||||
case "help", "-h", "-help", "--help":
|
||||
basicUsage()
|
||||
|
||||
case "version", "-v", "--version":
|
||||
fmt.Printf("reaction version %v\n", version)
|
||||
|
||||
case "example-conf":
|
||||
subCommandParse(f, 0)
|
||||
fmt.Print(exampleConf)
|
||||
|
||||
case "start":
|
||||
SocketPath = addSocketFlag(f)
|
||||
confFilename := addConfFlag(f)
|
||||
logLevel := addLevelFlag(f)
|
||||
subCommandParse(f, 0)
|
||||
if *confFilename == "" {
|
||||
logger.Fatalln("no configuration file provided")
|
||||
basicUsage()
|
||||
os.Exit(1)
|
||||
}
|
||||
logLevelType := logger.FromString(*logLevel)
|
||||
if logLevelType == logger.UNKNOWN {
|
||||
logger.Fatalf("Log Level %v not recognized", logLevel)
|
||||
basicUsage()
|
||||
os.Exit(1)
|
||||
}
|
||||
logger.SetLogLevel(logLevelType)
|
||||
Daemon(*confFilename)
|
||||
|
||||
case "show":
|
||||
SocketPath = addSocketFlag(f)
|
||||
queryFormat := addFormatFlag(f)
|
||||
limit := addLimitFlag(f)
|
||||
subCommandParse(f, -1)
|
||||
if *queryFormat != "yaml" && *queryFormat != "json" {
|
||||
logger.Fatalln("only yaml and json formats are supported")
|
||||
}
|
||||
stream, filter := "", ""
|
||||
if *limit != "" {
|
||||
splitSF := strings.Split(*limit, ".")
|
||||
stream = splitSF[0]
|
||||
if len(splitSF) == 2 {
|
||||
filter = splitSF[1]
|
||||
} else if len(splitSF) > 2 {
|
||||
logger.Fatalln("-l/--limit: only one . separator is supported")
|
||||
}
|
||||
}
|
||||
ClientShow(*queryFormat, stream, filter, f.Args())
|
||||
|
||||
case "flush":
|
||||
SocketPath = addSocketFlag(f)
|
||||
queryFormat := addFormatFlag(f)
|
||||
limit := addLimitFlag(f)
|
||||
subCommandParse(f, -1)
|
||||
if *queryFormat != "yaml" && *queryFormat != "json" {
|
||||
logger.Fatalln("only yaml and json formats are supported")
|
||||
}
|
||||
if len(f.Args()) == 0 {
|
||||
logger.Fatalln("subcommand flush takes at least one TARGET argument")
|
||||
}
|
||||
stream, filter := "", ""
|
||||
if *limit != "" {
|
||||
splitSF := strings.Split(*limit, ".")
|
||||
stream = splitSF[0]
|
||||
if len(splitSF) == 2 {
|
||||
filter = splitSF[1]
|
||||
} else if len(splitSF) > 2 {
|
||||
logger.Fatalln("-l/--limit: only one . separator is supported")
|
||||
}
|
||||
}
|
||||
ClientFlush(*queryFormat, stream, filter, f.Args())
|
||||
|
||||
case "test-regex":
|
||||
// socket not needed, no interaction with the daemon
|
||||
confFilename := addConfFlag(f)
|
||||
subCommandParse(f, 2)
|
||||
if *confFilename == "" {
|
||||
logger.Println(logger.WARN, "no configuration file provided. Can't make use of registered patterns.")
|
||||
}
|
||||
if f.Arg(0) == "" {
|
||||
logger.Fatalln("subcommand test-regex takes at least one REGEX argument")
|
||||
basicUsage()
|
||||
os.Exit(1)
|
||||
}
|
||||
TestRegex(*confFilename, f.Arg(0), f.Arg(1))
|
||||
|
||||
default:
|
||||
logger.Fatalf("subcommand %v not recognized. Try `reaction help`", os.Args[1])
|
||||
basicUsage()
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,264 +0,0 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"framagit.org/ppom/reaction/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
logDBName = "./reaction-matches.db"
|
||||
logDBNewName = "./reaction-matches.new.db"
|
||||
flushDBName = "./reaction-flushes.db"
|
||||
)
|
||||
|
||||
func openDB(path string) (bool, *ReadDB) {
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
logger.Printf(logger.WARN, "No DB found at %s. It's ok if this is the first time reaction is running.\n", path)
|
||||
return true, nil
|
||||
}
|
||||
logger.Fatalln("Failed to open DB:", err)
|
||||
}
|
||||
return false, &ReadDB{file, gob.NewDecoder(file)}
|
||||
}
|
||||
|
||||
func createDB(path string) *WriteDB {
|
||||
file, err := os.Create(path)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to create DB:", err)
|
||||
}
|
||||
return &WriteDB{file, gob.NewEncoder(file)}
|
||||
}
|
||||
|
||||
func DatabaseManager(c *Conf) {
|
||||
logDB, flushDB := c.RotateDB(true)
|
||||
close(startupMatchesC)
|
||||
c.manageLogs(logDB, flushDB)
|
||||
}
|
||||
|
||||
func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) {
|
||||
cpt := 0
|
||||
writeSF2int := make(map[SF]int)
|
||||
writeCpt := 1
|
||||
for {
|
||||
select {
|
||||
case entry := <-flushToDatabaseC:
|
||||
flushDB.enc.Encode(entry)
|
||||
case entry := <-logsC:
|
||||
encodeOrFatal(logDB.enc, entry, writeSF2int, &writeCpt)
|
||||
cpt++
|
||||
// let's say 100 000 entries ~ 10 MB
|
||||
if cpt == 500_000 {
|
||||
cpt = 0
|
||||
logger.Printf(logger.INFO, "Rotating database...")
|
||||
logDB.file.Close()
|
||||
flushDB.file.Close()
|
||||
logDB, flushDB = c.RotateDB(false)
|
||||
logger.Printf(logger.INFO, "Rotated database")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) {
|
||||
var (
|
||||
doesntExist bool
|
||||
err error
|
||||
logReadDB *ReadDB
|
||||
flushReadDB *ReadDB
|
||||
logWriteDB *WriteDB
|
||||
flushWriteDB *WriteDB
|
||||
)
|
||||
doesntExist, logReadDB = openDB(logDBName)
|
||||
if doesntExist {
|
||||
return createDB(logDBName), createDB(flushDBName)
|
||||
}
|
||||
doesntExist, flushReadDB = openDB(flushDBName)
|
||||
if doesntExist {
|
||||
logger.Println(logger.WARN, "Strange! No flushes db, opening /dev/null instead")
|
||||
doesntExist, flushReadDB = openDB("/dev/null")
|
||||
if doesntExist {
|
||||
logger.Fatalln("Opening dummy /dev/null failed")
|
||||
}
|
||||
}
|
||||
|
||||
logWriteDB = createDB(logDBNewName)
|
||||
|
||||
rotateDB(c, logReadDB.dec, flushReadDB.dec, logWriteDB.enc, startup)
|
||||
|
||||
err = logReadDB.file.Close()
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to close old DB:", err)
|
||||
}
|
||||
|
||||
// It should be ok to rename an open file
|
||||
err = os.Rename(logDBNewName, logDBName)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to replace old DB with new one:", err)
|
||||
}
|
||||
|
||||
err = os.Remove(flushDBName)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
logger.Fatalln("Failed to delete old DB:", err)
|
||||
}
|
||||
|
||||
flushWriteDB = createDB(flushDBName)
|
||||
return logWriteDB, flushWriteDB
|
||||
}
|
||||
|
||||
func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.Encoder, startup bool) {
|
||||
// This mapping is a space optimization feature
|
||||
// It permits to compress stream+filter to a small number (which is a byte in gob)
|
||||
// We do this only for matches, not for flushes
|
||||
readSF2int := make(map[int]SF)
|
||||
writeSF2int := make(map[SF]int)
|
||||
writeCounter := 1
|
||||
// This extra code is made to warn only one time for each non-existant filter
|
||||
discardedEntries := make(map[SF]int)
|
||||
malformedEntries := 0
|
||||
defer func() {
|
||||
for sf, t := range discardedEntries {
|
||||
if t > 0 {
|
||||
logger.Printf(logger.WARN, "info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.S, sf.F)
|
||||
}
|
||||
}
|
||||
if malformedEntries > 0 {
|
||||
logger.Printf(logger.WARN, "%v malformed entries discarded from the DBs\n", malformedEntries)
|
||||
}
|
||||
}()
|
||||
|
||||
// pattern, stream, fitler → last flush
|
||||
flushes := make(map[*PSF]time.Time)
|
||||
for {
|
||||
var entry LogEntry
|
||||
var filter *Filter
|
||||
// decode entry
|
||||
err := flushDec.Decode(&entry)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
malformedEntries++
|
||||
continue
|
||||
}
|
||||
|
||||
// retrieve related filter
|
||||
if entry.Stream != "" || entry.Filter != "" {
|
||||
if stream := c.Streams[entry.Stream]; stream != nil {
|
||||
filter = stream.Filters[entry.Filter]
|
||||
}
|
||||
if filter == nil {
|
||||
discardedEntries[SF{entry.Stream, entry.Filter}]++
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// store
|
||||
flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T
|
||||
}
|
||||
|
||||
lastTimeCpt := int64(0)
|
||||
now := time.Now()
|
||||
for {
|
||||
var entry LogEntry
|
||||
var filter *Filter
|
||||
|
||||
// decode entry
|
||||
err := logDec.Decode(&entry)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
malformedEntries++
|
||||
continue
|
||||
}
|
||||
|
||||
// retrieve related stream & filter
|
||||
if entry.Stream == "" && entry.Filter == "" {
|
||||
sf, ok := readSF2int[entry.SF]
|
||||
if !ok {
|
||||
discardedEntries[SF{"", ""}]++
|
||||
continue
|
||||
}
|
||||
entry.Stream = sf.S
|
||||
entry.Filter = sf.F
|
||||
}
|
||||
if stream := c.Streams[entry.Stream]; stream != nil {
|
||||
filter = stream.Filters[entry.Filter]
|
||||
}
|
||||
if filter == nil {
|
||||
discardedEntries[SF{entry.Stream, entry.Filter}]++
|
||||
continue
|
||||
}
|
||||
if entry.SF != 0 {
|
||||
readSF2int[entry.SF] = SF{entry.Stream, entry.Filter}
|
||||
}
|
||||
|
||||
// check if number of patterns is in sync
|
||||
if len(entry.Pattern.Split()) != len(filter.Pattern) {
|
||||
continue
|
||||
}
|
||||
|
||||
// check if it hasn't been flushed
|
||||
lastGlobalFlush := flushes[&PSF{entry.Pattern, "", ""}].Unix()
|
||||
lastLocalFlush := flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix()
|
||||
entryTime := entry.T.Unix()
|
||||
if lastLocalFlush > entryTime || lastGlobalFlush > entryTime {
|
||||
continue
|
||||
}
|
||||
|
||||
// restore time
|
||||
if entry.T.IsZero() {
|
||||
entry.T = time.Unix(entry.S, lastTimeCpt)
|
||||
}
|
||||
lastTimeCpt++
|
||||
|
||||
// store matches
|
||||
if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() {
|
||||
if startup {
|
||||
startupMatchesC <- PFT{entry.Pattern, filter, entry.T}
|
||||
}
|
||||
|
||||
encodeOrFatal(logEnc, entry, writeSF2int, &writeCounter)
|
||||
}
|
||||
|
||||
// replay executions
|
||||
if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() {
|
||||
if startup {
|
||||
flushToMatchesC <- PSF{entry.Pattern, entry.Stream, entry.Filter}
|
||||
filter.sendActions(entry.Pattern, entry.T)
|
||||
}
|
||||
|
||||
encodeOrFatal(logEnc, entry, writeSF2int, &writeCounter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func encodeOrFatal(enc *gob.Encoder, entry LogEntry, writeSF2int map[SF]int, writeCounter *int) {
|
||||
// Stream/Filter reduction
|
||||
sf, ok := writeSF2int[SF{entry.Stream, entry.Filter}]
|
||||
if ok {
|
||||
entry.SF = sf
|
||||
entry.Stream = ""
|
||||
entry.Filter = ""
|
||||
} else {
|
||||
entry.SF = *writeCounter
|
||||
writeSF2int[SF{entry.Stream, entry.Filter}] = *writeCounter
|
||||
*writeCounter++
|
||||
}
|
||||
// Time reduction
|
||||
if !entry.T.IsZero() {
|
||||
entry.S = entry.T.Unix()
|
||||
entry.T = time.Time{}
|
||||
}
|
||||
err := enc.Encode(entry)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to write to new DB:", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,81 +0,0 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"framagit.org/ppom/reaction/logger"
|
||||
)
|
||||
|
||||
func createOpenSocket() net.Listener {
|
||||
err := os.MkdirAll(path.Dir(*SocketPath), 0755)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to create socket directory")
|
||||
}
|
||||
_, err = os.Stat(*SocketPath)
|
||||
if err == nil {
|
||||
logger.Println(logger.WARN, "socket", SocketPath, "already exists: Is the daemon already running? Deleting.")
|
||||
err = os.Remove(*SocketPath)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to remove socket:", err)
|
||||
}
|
||||
}
|
||||
ln, err := net.Listen("unix", *SocketPath)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to create socket:", err)
|
||||
}
|
||||
return ln
|
||||
}
|
||||
|
||||
// Handle connections
|
||||
//func SocketManager(streams map[string]*Stream) {
|
||||
func SocketManager(conf *Conf) {
|
||||
ln := createOpenSocket()
|
||||
defer ln.Close()
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
logger.Println(logger.ERROR, "Failed to open connection from cli:", err)
|
||||
continue
|
||||
}
|
||||
go func(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
var request Request
|
||||
var response Response
|
||||
|
||||
err := gob.NewDecoder(conn).Decode(&request)
|
||||
if err != nil {
|
||||
logger.Println(logger.ERROR, "Invalid Message from cli:", err)
|
||||
return
|
||||
}
|
||||
|
||||
switch request.Request {
|
||||
case Info:
|
||||
// response.Config = *conf
|
||||
response.Matches = matches
|
||||
response.Actions = actions
|
||||
case Flush:
|
||||
le := LogEntry{time.Now(), 0, request.Flush.P, request.Flush.S, request.Flush.F, 0, false}
|
||||
|
||||
flushToMatchesC <- request.Flush
|
||||
flushToActionsC <- request.Flush
|
||||
flushToDatabaseC <- le
|
||||
|
||||
default:
|
||||
logger.Println(logger.ERROR, "Invalid Message from cli: unrecognised command type")
|
||||
response.Err = errors.New("unrecognised command type")
|
||||
return
|
||||
}
|
||||
|
||||
err = gob.NewEncoder(conn).Encode(response)
|
||||
if err != nil {
|
||||
logger.Println(logger.ERROR, "Can't respond to cli:", err)
|
||||
return
|
||||
}
|
||||
}(conn)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,178 +0,0 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"framagit.org/ppom/reaction/logger"
|
||||
|
||||
"github.com/google/go-jsonnet"
|
||||
)
|
||||
|
||||
func (c *Conf) setup() {
|
||||
if c.Concurrency == 0 {
|
||||
c.Concurrency = runtime.NumCPU()
|
||||
}
|
||||
|
||||
// Assure we iterate through c.Patterns map in reproductible order
|
||||
sortedPatternNames := make([]string, 0, len(c.Patterns))
|
||||
for k := range c.Patterns {
|
||||
sortedPatternNames = append(sortedPatternNames, k)
|
||||
}
|
||||
slices.Sort(sortedPatternNames)
|
||||
|
||||
for _, patternName := range sortedPatternNames {
|
||||
pattern := c.Patterns[patternName]
|
||||
pattern.Name = patternName
|
||||
pattern.nameWithBraces = fmt.Sprintf("<%s>", pattern.Name)
|
||||
|
||||
if pattern.Regex == "" {
|
||||
logger.Fatalf("Bad configuration: pattern's regex %v is empty!", patternName)
|
||||
}
|
||||
|
||||
compiled, err := regexp.Compile(fmt.Sprintf("^%v$", pattern.Regex))
|
||||
if err != nil {
|
||||
logger.Fatalf("Bad configuration: pattern %v: %v", patternName, err)
|
||||
}
|
||||
pattern.Regex = fmt.Sprintf("(?P<%s>%s)", patternName, pattern.Regex)
|
||||
for _, ignore := range pattern.Ignore {
|
||||
if !compiled.MatchString(ignore) {
|
||||
logger.Fatalf("Bad configuration: pattern ignore '%v' doesn't match pattern %v! It should be fixed or removed.", ignore, pattern.nameWithBraces)
|
||||
}
|
||||
}
|
||||
|
||||
// Compile ignore regexes
|
||||
for _, regex := range pattern.IgnoreRegex {
|
||||
// Enclose the regex to make sure that it matches the whole detected string
|
||||
compiledRegex, err := regexp.Compile("^" + regex + "$")
|
||||
if err != nil {
|
||||
logger.Fatalf("Bad configuration: in ignoreregex of pattern %s: %v", pattern.Name, err)
|
||||
}
|
||||
|
||||
pattern.compiledIgnoreRegex = append(pattern.compiledIgnoreRegex, *compiledRegex)
|
||||
}
|
||||
}
|
||||
|
||||
if len(c.Streams) == 0 {
|
||||
logger.Fatalln("Bad configuration: no streams configured!")
|
||||
}
|
||||
for streamName := range c.Streams {
|
||||
|
||||
stream := c.Streams[streamName]
|
||||
stream.Name = streamName
|
||||
|
||||
if strings.Contains(stream.Name, ".") {
|
||||
logger.Fatalf("Bad configuration: character '.' is not allowed in stream names: '%v'", stream.Name)
|
||||
}
|
||||
|
||||
if len(stream.Filters) == 0 {
|
||||
logger.Fatalf("Bad configuration: no filters configured in %v", stream.Name)
|
||||
}
|
||||
for filterName := range stream.Filters {
|
||||
|
||||
filter := stream.Filters[filterName]
|
||||
filter.Stream = stream
|
||||
filter.Name = filterName
|
||||
|
||||
if strings.Contains(filter.Name, ".") {
|
||||
logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.Name)
|
||||
}
|
||||
// Parse Duration
|
||||
if filter.RetryPeriod == "" {
|
||||
if filter.Retry > 1 {
|
||||
logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.Name, filter.Name)
|
||||
}
|
||||
} else {
|
||||
retryDuration, err := time.ParseDuration(filter.RetryPeriod)
|
||||
if err != nil {
|
||||
logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.Name, filter.Name, err)
|
||||
}
|
||||
filter.retryDuration = retryDuration
|
||||
}
|
||||
|
||||
if len(filter.Regex) == 0 {
|
||||
logger.Fatalf("Bad configuration: no regexes configured in %v.%v", stream.Name, filter.Name)
|
||||
}
|
||||
// Compute Regexes
|
||||
// Look for Patterns inside Regexes
|
||||
for _, regex := range filter.Regex {
|
||||
// iterate through patterns in reproductible order
|
||||
for _, patternName := range sortedPatternNames {
|
||||
pattern := c.Patterns[patternName]
|
||||
if strings.Contains(regex, pattern.nameWithBraces) {
|
||||
if !slices.Contains(filter.Pattern, pattern) {
|
||||
filter.Pattern = append(filter.Pattern, pattern)
|
||||
}
|
||||
regex = strings.Replace(regex, pattern.nameWithBraces, pattern.Regex, 1)
|
||||
}
|
||||
}
|
||||
compiledRegex, err := regexp.Compile(regex)
|
||||
if err != nil {
|
||||
logger.Fatalf("Bad configuration: regex of filter %s.%s: %v", stream.Name, filter.Name, err)
|
||||
}
|
||||
filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
|
||||
}
|
||||
|
||||
if len(filter.Actions) == 0 {
|
||||
logger.Fatalln("Bad configuration: no actions configured in", stream.Name, ".", filter.Name)
|
||||
}
|
||||
for actionName := range filter.Actions {
|
||||
|
||||
action := filter.Actions[actionName]
|
||||
action.Filter = filter
|
||||
action.Name = actionName
|
||||
|
||||
if strings.Contains(action.Name, ".") {
|
||||
logger.Fatalln("Bad configuration: character '.' is not allowed in action names", action.Name)
|
||||
}
|
||||
// Parse Duration
|
||||
if action.After != "" {
|
||||
afterDuration, err := time.ParseDuration(action.After)
|
||||
if err != nil {
|
||||
logger.Fatalln("Bad configuration: Failed to parse after time in ", stream.Name, ".", filter.Name, ".", action.Name, ":", err)
|
||||
}
|
||||
action.afterDuration = afterDuration
|
||||
} else if action.OnExit {
|
||||
logger.Fatalln("Bad configuration: Cannot have `onexit: true` without an `after` directive in", stream.Name, ".", filter.Name, ".", action.Name)
|
||||
}
|
||||
if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
|
||||
filter.longuestActionDuration = &action.afterDuration
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseConf(filename string) *Conf {
|
||||
|
||||
data, err := os.Open(filename)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to read configuration file:", err)
|
||||
}
|
||||
|
||||
var conf Conf
|
||||
if filename[len(filename)-4:] == ".yml" || filename[len(filename)-5:] == ".yaml" {
|
||||
err = jsonnet.NewYAMLToJSONDecoder(data).Decode(&conf)
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to parse yaml configuration file:", err)
|
||||
}
|
||||
} else {
|
||||
var jsondata string
|
||||
jsondata, err = jsonnet.MakeVM().EvaluateFile(filename)
|
||||
if err == nil {
|
||||
err = json.Unmarshal([]byte(jsondata), &conf)
|
||||
}
|
||||
if err != nil {
|
||||
logger.Fatalln("Failed to parse json configuration file:", err)
|
||||
}
|
||||
}
|
||||
|
||||
conf.setup()
|
||||
return &conf
|
||||
}
|
||||
|
|
@ -1,200 +0,0 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Conf struct {
|
||||
Concurrency int `json:"concurrency"`
|
||||
Patterns map[string]*Pattern `json:"patterns"`
|
||||
Streams map[string]*Stream `json:"streams"`
|
||||
Start [][]string `json:"start"`
|
||||
Stop [][]string `json:"stop"`
|
||||
}
|
||||
|
||||
type Pattern struct {
|
||||
Regex string `json:"regex"`
|
||||
Ignore []string `json:"ignore"`
|
||||
|
||||
IgnoreRegex []string `json:"ignoreregex"`
|
||||
compiledIgnoreRegex []regexp.Regexp `json:"-"`
|
||||
|
||||
Name string `json:"-"`
|
||||
nameWithBraces string `json:"-"`
|
||||
}
|
||||
|
||||
// Stream, Filter & Action structures must never be copied.
|
||||
// They're always referenced through pointers
|
||||
|
||||
type Stream struct {
|
||||
Name string `json:"-"`
|
||||
|
||||
Cmd []string `json:"cmd"`
|
||||
Filters map[string]*Filter `json:"filters"`
|
||||
}
|
||||
type LilStream struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
func (s *Stream) GobEncode() ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
enc := gob.NewEncoder(&buf)
|
||||
err := enc.Encode(LilStream{s.Name})
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
func (s *Stream) GobDecode(b []byte)(error) {
|
||||
var ls LilStream
|
||||
dec := gob.NewDecoder(bytes.NewReader(b))
|
||||
err := dec.Decode(&ls)
|
||||
s.Name = ls.Name
|
||||
return err
|
||||
}
|
||||
|
||||
type Filter struct {
|
||||
Stream *Stream `json:"-"`
|
||||
Name string `json:"-"`
|
||||
|
||||
Regex []string `json:"regex"`
|
||||
compiledRegex []regexp.Regexp `json:"-"`
|
||||
Pattern []*Pattern `json:"-"`
|
||||
|
||||
Retry int `json:"retry"`
|
||||
RetryPeriod string `json:"retryperiod"`
|
||||
retryDuration time.Duration `json:"-"`
|
||||
|
||||
Actions map[string]*Action `json:"actions"`
|
||||
longuestActionDuration *time.Duration
|
||||
}
|
||||
|
||||
// those small versions are needed to prevent infinite recursion in gob because of
|
||||
// data cycles: Stream <-> Filter, Filter <-> Action
|
||||
type LilFilter struct {
|
||||
Stream *Stream
|
||||
Name string
|
||||
Pattern []*Pattern
|
||||
}
|
||||
|
||||
func (f *Filter) GobDecode(b []byte)(error) {
|
||||
var lf LilFilter
|
||||
dec := gob.NewDecoder(bytes.NewReader(b))
|
||||
err := dec.Decode(&lf)
|
||||
f.Stream = lf.Stream
|
||||
f.Name = lf.Name
|
||||
f.Pattern = lf.Pattern
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *Filter) GobEncode() ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
enc := gob.NewEncoder(&buf)
|
||||
err := enc.Encode(LilFilter{f.Stream, f.Name, f.Pattern})
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
type Action struct {
|
||||
Filter *Filter `json:"-"`
|
||||
Name string `json:"-"`
|
||||
|
||||
Cmd []string `json:"cmd"`
|
||||
|
||||
After string `json:"after"`
|
||||
afterDuration time.Duration `json:"-"`
|
||||
|
||||
OnExit bool `json:"onexit"`
|
||||
}
|
||||
type LilAction struct {
|
||||
Filter *Filter
|
||||
Name string
|
||||
}
|
||||
|
||||
func (a *Action) GobEncode() ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
enc := gob.NewEncoder(&buf)
|
||||
err := enc.Encode(LilAction{a.Filter, a.Name})
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
func (a *Action) GobDecode(b []byte)(error) {
|
||||
var la LilAction
|
||||
dec := gob.NewDecoder(bytes.NewReader(b))
|
||||
err := dec.Decode(&la)
|
||||
a.Filter = la.Filter
|
||||
a.Name = la.Name
|
||||
return err
|
||||
}
|
||||
|
||||
type LogEntry struct {
|
||||
T time.Time
|
||||
S int64
|
||||
Pattern Match
|
||||
Stream, Filter string
|
||||
SF int
|
||||
Exec bool
|
||||
}
|
||||
|
||||
type ReadDB struct {
|
||||
file *os.File
|
||||
dec *gob.Decoder
|
||||
}
|
||||
|
||||
type WriteDB struct {
|
||||
file *os.File
|
||||
enc *gob.Encoder
|
||||
}
|
||||
|
||||
type MatchesMap map[PF]map[time.Time]struct{}
|
||||
type ActionsMap map[PA]map[time.Time]struct{}
|
||||
|
||||
// This is a "\x00" Joined string
|
||||
// which contains all matches on a line.
|
||||
type Match string
|
||||
|
||||
func (m *Match) Split() []string {
|
||||
return strings.Split(string(*m), "\x00")
|
||||
}
|
||||
func JoinMatch(mm []string) Match {
|
||||
return Match(strings.Join(mm, "\x00"))
|
||||
}
|
||||
func WithBrackets(mm []string) string {
|
||||
var b strings.Builder
|
||||
for _, match := range mm {
|
||||
fmt.Fprintf(&b, "[%s]", match)
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// Helper structs made to carry information
|
||||
// Stream, Filter
|
||||
type SF struct{ S, F string }
|
||||
|
||||
// Pattern, Stream, Filter
|
||||
type PSF struct {
|
||||
P Match
|
||||
S, F string
|
||||
}
|
||||
|
||||
type PF struct {
|
||||
P Match
|
||||
F *Filter
|
||||
}
|
||||
type PFT struct {
|
||||
P Match
|
||||
F *Filter
|
||||
T time.Time
|
||||
}
|
||||
type PA struct {
|
||||
P Match
|
||||
A *Action
|
||||
}
|
||||
type PAT struct {
|
||||
P Match
|
||||
A *Action
|
||||
T time.Time
|
||||
}
|
||||
|
|
@ -1,10 +0,0 @@
|
|||
module framagit.org/ppom/reaction
|
||||
|
||||
go 1.21
|
||||
|
||||
require (
|
||||
github.com/google/go-jsonnet v0.20.0
|
||||
sigs.k8s.io/yaml v1.1.0
|
||||
)
|
||||
|
||||
require gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g=
|
||||
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
|
||||
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
|
||||
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
|
||||
|
|
@ -1,80 +0,0 @@
|
|||
package logger
|
||||
|
||||
import "log"
|
||||
|
||||
type Level int
|
||||
|
||||
const (
|
||||
UNKNOWN = Level(-1)
|
||||
DEBUG = Level(1)
|
||||
INFO = Level(2)
|
||||
WARN = Level(3)
|
||||
ERROR = Level(4)
|
||||
FATAL = Level(5)
|
||||
)
|
||||
|
||||
func (l Level) String() string {
|
||||
switch l {
|
||||
case DEBUG:
|
||||
return "DEBUG "
|
||||
case INFO:
|
||||
return "INFO "
|
||||
case WARN:
|
||||
return "WARN "
|
||||
case ERROR:
|
||||
return "ERROR "
|
||||
case FATAL:
|
||||
return "FATAL "
|
||||
default:
|
||||
return "????? "
|
||||
}
|
||||
}
|
||||
|
||||
func FromString(s string) Level {
|
||||
switch s {
|
||||
case "DEBUG":
|
||||
return DEBUG
|
||||
case "INFO":
|
||||
return INFO
|
||||
case "WARN":
|
||||
return WARN
|
||||
case "ERROR":
|
||||
return ERROR
|
||||
case "FATAL":
|
||||
return FATAL
|
||||
default:
|
||||
return UNKNOWN
|
||||
}
|
||||
}
|
||||
|
||||
var LogLevel Level = 2
|
||||
|
||||
func SetLogLevel(level Level) {
|
||||
LogLevel = level
|
||||
}
|
||||
|
||||
func Println(level Level, args ...any) {
|
||||
if level >= LogLevel {
|
||||
newargs := make([]any, 0)
|
||||
newargs = append(newargs, level)
|
||||
newargs = append(newargs, args...)
|
||||
log.Println(newargs...)
|
||||
}
|
||||
}
|
||||
|
||||
func Printf(level Level, format string, args ...any) {
|
||||
if level >= LogLevel {
|
||||
log.Printf(level.String()+format, args...)
|
||||
}
|
||||
}
|
||||
|
||||
func Fatalln(args ...any) {
|
||||
newargs := make([]any, 0)
|
||||
newargs = append(newargs, FATAL)
|
||||
newargs = append(newargs, args...)
|
||||
log.Fatalln(newargs...)
|
||||
}
|
||||
|
||||
func Fatalf(format string, args ...any) {
|
||||
log.Fatalf(FATAL.String()+format, args...)
|
||||
}
|
||||
|
|
@ -1,13 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"framagit.org/ppom/reaction/app"
|
||||
)
|
||||
|
||||
func main() {
|
||||
app.Main(version)
|
||||
}
|
||||
|
||||
var (
|
||||
version = "v1.4.2"
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue