reaction/app/daemon.go

448 lines
9.8 KiB
Go
Raw Permalink Normal View History

2023-03-25 18:27:01 +01:00
package app
2023-03-19 23:10:18 +01:00
import (
2023-03-20 23:25:57 +01:00
"bufio"
2023-03-24 17:52:00 +01:00
"os"
2023-03-19 23:10:18 +01:00
"os/exec"
2023-04-27 10:42:19 +02:00
"os/signal"
2023-03-24 17:36:41 +01:00
"strings"
"sync"
2023-10-12 12:00:00 +02:00
"syscall"
2023-03-24 17:36:41 +01:00
"time"
2023-10-12 12:00:00 +02:00
"framagit.org/ppom/reaction/logger"
2023-03-19 23:10:18 +01:00
)
2023-03-24 17:36:41 +01:00
// Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string {
lines := make(chan *string)
2023-03-19 23:10:18 +01:00
2023-03-24 00:27:51 +01:00
go func() {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdout, err := cmd.StdoutPipe()
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't open stdout on command:", err)
}
2023-03-24 00:27:51 +01:00
if err := cmd.Start(); err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't start command:", err)
2023-03-24 00:27:51 +01:00
}
defer stdout.Close()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
lines <- &line
2023-10-12 12:00:00 +02:00
logger.Println(logger.DEBUG, "stdout:", line)
2023-03-24 00:27:51 +01:00
}
close(lines)
}()
return lines
2023-03-19 23:10:18 +01:00
}
2023-11-23 12:00:00 +01:00
func runCommands(commands [][]string, moment string) bool {
ok := true
for _, command := range commands {
cmd := exec.Command(command[0], command[1:]...)
cmd.WaitDelay = time.Minute
logger.Printf(logger.INFO, "%v command: run %v\n", moment, command)
if err := cmd.Start(); err != nil {
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
2023-11-23 12:00:00 +01:00
ok = false
} else {
err := cmd.Wait()
if err != nil {
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
2023-11-23 12:00:00 +01:00
ok = false
}
}
}
2023-11-23 12:00:00 +01:00
return ok
}
2023-08-21 23:33:56 +02:00
func (p *Pattern) notAnIgnore(match *string) bool {
for _, regex := range p.compiledIgnoreRegex {
if regex.MatchString(*match) {
return false
}
}
2023-08-21 23:33:56 +02:00
for _, ignore := range p.Ignore {
if ignore == *match {
return false
}
}
return true
}
2023-03-24 17:36:41 +01:00
// Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line *string) Match {
2023-03-24 00:27:51 +01:00
for _, regex := range f.compiledRegex {
2023-03-24 17:36:41 +01:00
if matches := regex.FindStringSubmatch(*line); matches != nil {
2024-04-28 12:45:47 +02:00
if f.Pattern != nil {
2024-04-09 12:00:00 +02:00
var result []string
2024-04-28 12:45:47 +02:00
for _, p := range f.Pattern {
match := matches[regex.SubexpIndex(p.Name)]
2024-04-09 12:00:00 +02:00
if p.notAnIgnore(&match) {
result = append(result, match)
}
}
2024-04-28 12:45:47 +02:00
if len(result) == len(f.Pattern) {
logger.Printf(logger.INFO, "%s.%s: match %s", f.Stream.Name, f.Name, WithBrackets(result))
return JoinMatch(result)
2024-04-09 12:00:00 +02:00
}
} else {
logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.Stream.Name, f.Name)
// No pattern, so this match will never actually be used
2024-04-09 12:00:00 +02:00
return "."
2023-08-21 23:33:56 +02:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
}
2024-04-09 12:00:00 +02:00
return ""
2023-03-20 23:25:57 +01:00
}
func (f *Filter) sendActions(match Match, at time.Time) {
2023-03-24 00:27:51 +01:00
for _, a := range f.Actions {
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
2023-03-19 23:10:18 +01:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
func (a *Action) exec(match Match) {
defer wgActions.Done()
var computedCommand []string
if a.Filter.Pattern != nil {
computedCommand = make([]string, 0, len(a.Cmd))
matches := match.Split()
for _, item := range a.Cmd {
for i, p := range a.Filter.Pattern {
item = strings.ReplaceAll(item, p.nameWithBraces, matches[i])
}
computedCommand = append(computedCommand, item)
}
} else {
computedCommand = a.Cmd
}
2023-03-24 17:36:41 +01:00
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand)
2023-03-24 17:36:41 +01:00
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
2023-03-24 18:06:57 +01:00
if ret := cmd.Run(); ret != nil {
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.Filter.Stream.Name, a.Filter.Name, a.Name, computedCommand, ret)
}
2023-03-19 23:10:18 +01:00
}
func ActionsManager(concurrency int) {
// concurrency init
execActionsC := make(chan PA)
if concurrency > 0 {
for i := 0; i < concurrency; i++ {
go func() {
var pa PA
for {
pa = <-execActionsC
pa.A.exec(pa.P)
}
}()
}
} else {
go func() {
var pa PA
for {
pa = <-execActionsC
go func(pa PA) {
pa.A.exec(pa.P)
}(pa)
}
}()
}
execAction := func(a *Action, p Match) {
wgActions.Add(1)
execActionsC <- PA{p, a}
}
// main
pendingActionsC := make(chan PAT)
for {
select {
2023-10-01 12:00:00 +02:00
case pat := <-actionsC:
pa := PA{pat.P, pat.A}
pattern, action, then := pat.P, pat.A, pat.T
2023-10-01 12:00:00 +02:00
now := time.Now()
// check if must be executed now
2023-09-23 00:14:20 +02:00
if then.Compare(now) <= 0 {
execAction(action, pattern)
} else {
if actions[pa] == nil {
actions[pa] = make(map[time.Time]struct{})
2023-10-01 12:00:00 +02:00
}
actions[pa][then] = struct{}{}
2023-10-01 12:00:00 +02:00
go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.T.Sub(insideNow))
2023-10-01 12:00:00 +02:00
pendingActionsC <- insidePat
2023-09-23 00:14:20 +02:00
}(pat, now)
}
2023-10-01 12:00:00 +02:00
case pat := <-pendingActionsC:
pa := PA{pat.P, pat.A}
pattern, action, then := pat.P, pat.A, pat.T
if actions[pa] != nil {
delete(actions[pa], then)
execAction(action, pattern)
2023-10-01 12:00:00 +02:00
}
case fo := <-flushToActionsC:
for pa := range actions {
if fo.S == pa.A.Filter.Stream.Name &&
fo.F == pa.A.Filter.Name &&
fo.P == pa.P {
2023-10-01 12:00:00 +02:00
for range actions[pa] {
execAction(pa.A, pa.P)
2023-10-01 12:00:00 +02:00
}
delete(actions, pa)
break
2023-10-01 12:00:00 +02:00
}
}
case _, _ = <-stopActions:
for pa := range actions {
if pa.A.OnExit {
for range actions[pa] {
execAction(pa.A, pa.P)
}
}
}
wgActions.Done()
return
}
}
}
func MatchesManager() {
var fo PSF
var pft PFT
end := false
for !end {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case fo, ok := <-startupMatchesC:
if !ok {
end = true
} else {
2023-10-01 12:00:00 +02:00
_ = matchesManagerHandleMatch(fo)
}
2023-03-25 19:12:11 +01:00
}
}
for {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case pft = <-matchesC:
2023-04-26 17:18:55 +02:00
entry := LogEntry{pft.T, 0, pft.P, pft.F.Stream.Name, pft.F.Name, 0, false}
2023-03-25 19:12:11 +01:00
entry.Exec = matchesManagerHandleMatch(pft)
2023-03-25 19:12:11 +01:00
logsC <- entry
}
}
}
2023-04-26 17:18:55 +02:00
func matchesManagerHandleFlush(fo PSF) {
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
for pf := range matches {
if fo.S == pf.F.Stream.Name &&
fo.F == pf.F.Name &&
fo.P == pf.P {
2023-10-01 12:00:00 +02:00
delete(matches, pf)
break
2023-10-01 12:00:00 +02:00
}
}
matchesLock.Unlock()
}
func matchesManagerHandleMatch(pft PFT) bool {
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
defer matchesLock.Unlock()
filter, patterns, then := pft.F, pft.P, pft.T
pf := PF{pft.P, pft.F}
if filter.Retry > 1 {
// make sure map exists
if matches[pf] == nil {
matches[pf] = make(map[time.Time]struct{})
}
// add new match
matches[pf][then] = struct{}{}
// remove match when expired
go func(pf PF, then time.Time) {
2023-10-01 12:00:00 +02:00
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock()
if matches[pf] != nil {
// FIXME replace this and all similar occurences
// by clear() when switching to go 1.21
delete(matches[pf], then)
}
matchesLock.Unlock()
}(pf, then)
}
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
delete(matches, pf)
filter.sendActions(patterns, then)
return true
}
return false
}
func StreamManager(s *Stream, endedSignal chan *Stream) {
defer wgStreams.Done()
logger.Printf(logger.INFO, "%s: start %s\n", s.Name, s.Cmd)
2023-03-24 00:27:51 +01:00
lines := cmdStdout(s.Cmd)
for {
select {
case line, ok := <-lines:
if !ok {
endedSignal <- s
return
}
for _, filter := range s.Filters {
2024-04-09 12:00:00 +02:00
if match := filter.match(line); match != "" {
matchesC <- PFT{match, filter, time.Now()}
}
}
case _, _ = <-stopStreams:
return
}
}
2023-03-19 23:10:18 +01:00
}
var actions ActionsMap
var matches MatchesMap
var matchesLock sync.Mutex
var stopStreams chan bool
var stopActions chan bool
var wgActions sync.WaitGroup
var wgStreams sync.WaitGroup
2023-09-25 20:42:42 +02:00
/*
2023-10-01 12:00:00 +02:00
<StreamCmds>
2023-09-25 20:42:42 +02:00
StreamManager onstartup:matches
matches MatchesManager logs DatabaseManager ·
actions ActionsManager
SocketManager flushes··
2023-10-01 12:00:00 +02:00
<Clients>
2023-09-25 20:42:42 +02:00
*/
// DatabaseManager → MatchesManager
var startupMatchesC chan PFT
// StreamManager → MatchesManager
var matchesC chan PFT
2023-09-25 20:42:42 +02:00
// MatchesManager → DatabaseManager
var logsC chan LogEntry
2023-09-25 20:42:42 +02:00
// MatchesManager → ActionsManager
var actionsC chan PAT
2023-04-26 17:18:55 +02:00
2023-09-25 20:42:42 +02:00
// SocketManager, DatabaseManager → MatchesManager
var flushToMatchesC chan PSF
2023-09-25 20:42:42 +02:00
// SocketManager → ActionsManager
var flushToActionsC chan PSF
2023-09-25 20:42:42 +02:00
// SocketManager → DatabaseManager
var flushToDatabaseC chan LogEntry
func Daemon(confFilename string) {
conf := parseConf(confFilename)
2023-09-06 02:00:33 +02:00
startupMatchesC = make(chan PFT)
2023-09-25 20:42:42 +02:00
matchesC = make(chan PFT)
logsC = make(chan LogEntry)
actionsC = make(chan PAT)
flushToMatchesC = make(chan PSF)
flushToActionsC = make(chan PSF)
2023-09-25 20:42:42 +02:00
flushToDatabaseC = make(chan LogEntry)
stopActions = make(chan bool)
stopStreams = make(chan bool)
actions = make(ActionsMap)
matches = make(MatchesMap)
2023-11-23 12:00:00 +01:00
_ = runCommands(conf.Start, "start")
2023-09-09 20:42:47 +02:00
go DatabaseManager(conf)
go MatchesManager()
go ActionsManager(conf.Concurrency)
2023-05-01 18:21:31 +02:00
// Ready to start
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
2023-04-26 17:18:55 +02:00
endSignals := make(chan *Stream)
nbStreamsInExecution := len(conf.Streams)
2023-03-24 00:27:51 +01:00
for _, stream := range conf.Streams {
wgStreams.Add(1)
go StreamManager(stream, endSignals)
}
go SocketManager(conf)
2023-05-03 20:03:22 +02:00
2023-04-27 10:42:19 +02:00
for {
select {
case finishedStream := <-endSignals:
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.Name)
nbStreamsInExecution--
if nbStreamsInExecution == 0 {
2023-11-23 12:00:00 +01:00
quit(conf, false)
2023-04-27 10:42:19 +02:00
}
case <-sigs:
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "Received SIGINT/SIGTERM, exiting")
2023-11-23 12:00:00 +01:00
quit(conf, true)
2023-04-27 10:42:19 +02:00
}
2023-03-24 00:27:51 +01:00
}
2023-04-27 10:42:19 +02:00
}
2023-11-23 12:00:00 +01:00
func quit(conf *Conf, graceful bool) {
// send stop to StreamManager·s
close(stopStreams)
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Streams to finish...")
wgStreams.Wait()
// ActionsManager calls wgActions.Done() when it has launched all pending actions
wgActions.Add(1)
// send stop to ActionsManager
close(stopActions)
// stop all actions
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Actions to finish...")
wgActions.Wait()
// run stop commands
2023-11-23 12:00:00 +01:00
stopOk := runCommands(conf.Stop, "stop")
2023-05-03 20:03:22 +02:00
// delete pipe
err := os.Remove(*SocketPath)
2023-05-05 15:33:00 +02:00
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Println(logger.ERROR, "Failed to remove socket:", err)
2023-05-05 15:33:00 +02:00
}
2023-11-23 12:00:00 +01:00
if !stopOk || !graceful {
os.Exit(1)
}
os.Exit(0)
2023-03-19 23:10:18 +01:00
}