Untested cli/daemon communication code

This commit is contained in:
ppom 2023-05-03 20:03:22 +02:00
parent 4c86f37b70
commit f3080f5293
7 changed files with 320 additions and 10 deletions

119
app/cli.go Normal file
View file

@ -0,0 +1,119 @@
package app
import (
"encoding/gob"
"errors"
"fmt"
"log"
"math/rand"
"os"
"path"
"time"
)
const (
Query = 0
Flush = 1
)
type Request struct {
Request int
Id int
Pattern string
}
type Response struct {
Err error
Actions ReadableMap
}
// Runtime files:
// /run/user/<uid>/reaction/reaction.pipe
// /run/user/<uid>/reaction/id.response
func RuntimeDirectory() string {
return fmt.Sprintf("/run/user/%v/reaction/", os.Getuid())
}
func PipePath() string {
return path.Join(RuntimeDirectory(), "reaction.pipe")
}
func (r Request) ResponsePath() string {
return path.Join(RuntimeDirectory(), string(r.Id))
}
func Send(data Request) {
pipePath := PipePath()
pipe, err := os.OpenFile(pipePath, os.O_APPEND, os.ModeNamedPipe)
if err != nil {
log.Println("Failed to open", pipePath, ":", err)
log.Fatalln("Is the reaction daemon running? Does the CLI run as the same user?")
}
log.Println("DEBUG opening ok, encoding...")
enc := gob.NewEncoder(pipe)
err = enc.Encode(data)
if err != nil {
log.Fatalf("Failed to write to %s: %s", pipePath, err)
}
}
func SendAndRetrieve(data Request) Response {
if data.Id == 0 {
data.Id = rand.Int()
}
log.Println("DEBUG sending:", data)
Send(data)
responsePath := data.ResponsePath()
d, _ := time.ParseDuration("100ms")
for tries := 20; tries > 0; tries-- {
log.Println("DEBUG waiting for answer...")
file, err := os.Open(responsePath)
if errors.Is(err, os.ErrNotExist) {
time.Sleep(d)
continue
}
defer os.Remove(responsePath)
if err != nil {
log.Fatalf("Error opening daemon answer: %s", err)
}
var response Response
err = gob.NewDecoder(file).Decode(&response)
if err != nil {
log.Fatalf("Error parsing daemon answer: %s", err)
}
return response
}
log.Fatalln("Timeout while waiting answer from the daemon")
return Response{errors.New("unreachable code"), nil}
}
func usage(err string) {
fmt.Println("Usage: reactionc")
fmt.Println("Usage: reactionc flush <PATTERN>")
log.Fatalln(err)
}
func CLI() {
if len(os.Args) <= 1 {
response := SendAndRetrieve(Request{Query, 0, ""})
if response.Err != nil {
log.Fatalln("Received error from daemon:", response.Err)
}
fmt.Println(response.Actions.ToString())
os.Exit(0)
}
switch os.Args[1] {
case "flush":
if len(os.Args) != 3 {
usage("flush takes one <PATTERN> argument")
}
response := SendAndRetrieve(Request{Flush, 0, os.Args[2]})
if response.Err != nil {
log.Fatalln("Received error from daemon:", response.Err)
}
os.Exit(0)
default:
usage("first argument must be `flush`")
}
}

174
app/pipe.go Normal file
View file

@ -0,0 +1,174 @@
package app
import (
"encoding/gob"
"errors"
"io/fs"
"log"
"os"
"sync"
"syscall"
"gopkg.in/yaml.v3"
)
type ActionMap map[string]map[*Action]map[chan bool]bool
type ReadableMap map[string]map[string]map[string]int
type ActionStore struct {
store ActionMap
mutex sync.Mutex
}
// Called by an Action before entering sleep
func (a *ActionStore) Register(action *Action, pattern string) chan bool {
a.mutex.Lock()
defer a.mutex.Unlock()
if a.store[pattern] == nil {
a.store[pattern] = make(map[*Action]map[chan bool]bool)
}
if a.store[pattern][action] == nil {
a.store[pattern][action] = make(map[chan bool]bool)
}
sig := make(chan bool)
a.store[pattern][action][sig] = true
return sig
}
// Called by an Action after sleep
func (a *ActionStore) Unregister(action *Action, pattern string, sig chan bool) {
a.mutex.Lock()
defer a.mutex.Unlock()
if a.store[pattern] == nil || a.store[pattern][action] == nil || len(a.store[pattern][action]) == 0 {
return
}
close(sig)
delete(a.store[pattern][action], sig)
}
// Called by Main
func (a *ActionStore) Quit() {
a.mutex.Lock()
defer a.mutex.Unlock()
for _, actions := range a.store {
for _, sigs := range actions {
for sig := range sigs {
close(sig)
}
}
}
a.store = make(ActionMap)
}
// Called by a CLI
func (a *ActionStore) Flush(pattern string) {
a.mutex.Lock()
defer a.mutex.Unlock()
if a.store[pattern] != nil {
for _, action := range a.store[pattern] {
for sig := range action {
close(sig)
}
}
}
delete(a.store, pattern)
}
// Called by a CLI
func (a *ActionStore) pendingActions() ReadableMap {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.store.ToReadable()
}
func (a ActionMap) ToReadable() ReadableMap {
res := make(ReadableMap)
for pattern, actions := range a {
for action := range actions {
filter := action.filter.name
stream := action.filter.stream.name
if res[stream] == nil {
res[stream] = make(map[string]map[string]int)
}
if res[stream][filter] == nil {
res[stream][filter] = make(map[string]int)
}
res[stream][filter][pattern] = res[stream][filter][pattern] + 1
}
}
return res
}
func (r ReadableMap) ToString() string {
text, err := yaml.Marshal(r)
if err != nil {
log.Fatalln(err)
}
return string(text)
}
// Pipe-related, server-related functions
func createOpenPipe() fs.File {
err := os.Mkdir(RuntimeDirectory(), 0755)
if err != nil && !errors.Is(err, os.ErrExist) {
log.Fatalln("FATAL Failed to create runtime directory", err)
}
pipePath := PipePath()
_, err = os.Stat(pipePath)
if err == nil {
log.Println("WARN Runtime file", pipePath, "already exists: Is the daemon already running? Deleting.")
err = os.Remove(pipePath)
if err != nil {
log.Println("FATAL Failed to remove runtime file:", err)
}
}
err = syscall.Mkfifo(pipePath, 0600)
if err != nil {
log.Println("FATAL Failed to create runtime file:", err)
}
file, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
if err != nil {
log.Println("FATAL Failed to open runtime file:", err)
}
return file
}
func Respond(request Request, response Response) {
file, err := os.Create(request.ResponsePath())
if err != nil {
log.Println("WARN Can't respond to message:", err)
return
}
err = gob.NewEncoder(file).Encode(response)
if err != nil {
log.Println("WARN Can't respond to message:", err)
return
}
}
// Handle connections
func Serve() {
pipe := createOpenPipe()
for {
var request Request
err := gob.NewDecoder(pipe).Decode(&request)
if err != nil {
log.Println("WARN Invalid Message received: ", err)
}
go func(request Request) {
var response Response
switch request.Request {
case Query:
response.Actions = actionStore.store.ToReadable()
case Flush:
actionStore.Flush(request.Pattern)
default:
log.Println("WARN Invalid Message: unrecognised Request type")
}
Respond(request, response)
}(request)
}
}

View file

@ -24,10 +24,10 @@ func cmdStdout(commandline []string) chan *string {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal("couldn't open stdout on command:", err)
log.Fatalln("couldn't open stdout on command:", err)
}
if err := cmd.Start(); err != nil {
log.Fatal("couldn't start command:", err)
log.Fatalln("couldn't start command:", err)
}
defer stdout.Close()
@ -77,14 +77,17 @@ func sleep(d time.Duration) chan bool {
func (a *Action) exec(match string, advance time.Duration) {
defer wgActions.Done()
// Wait for either end of sleep time, or stopActions channel being closed
// Wait for either end of sleep time, or actionStore requesting stop
if a.afterDuration != 0 && a.afterDuration > advance {
stopAction := actionStore.Register(a, match)
select {
case <-sleep(a.afterDuration - advance):
// no-op
case _, _ = <-stopActions:
case _, _ = <-stopAction:
// no-op
}
// Let's not wait for the lock
go actionStore.Unregister(a, match, stopAction)
}
computedCommand := make([]string, 0, len(a.Cmd))
@ -174,7 +177,7 @@ func (s *Stream) handle(endedSignal chan *Stream) {
}
var stopStreams chan bool
var stopActions chan bool
var actionStore ActionStore
var wgActions sync.WaitGroup
var db *gob.Encoder
@ -188,6 +191,8 @@ func Main() {
os.Exit(2)
}
actionStore.store = make(ActionMap)
conf := parseConf(*confFilename)
db = conf.updateFromDB()
@ -197,7 +202,6 @@ func Main() {
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
stopStreams = make(chan bool)
stopActions = make(chan bool)
endSignals := make(chan *Stream)
noStreamsInExecution := len(conf.Streams)
@ -206,6 +210,8 @@ func Main() {
go stream.handle(endSignals)
}
go Serve()
for {
select {
case finishedStream := <-endSignals:
@ -225,9 +231,11 @@ func quit() {
// stop all streams
close(stopStreams)
// stop all actions
close(stopActions)
actionStore.Quit()
// wait for them to complete
wgActions.Wait()
// delete pipe
os.Remove(PipePath())
os.Exit(3)
}

View file

@ -248,7 +248,7 @@ func (c *Conf) updateFromDB() *gob.Encoder {
}
}
func parseConf(filename string) (*Conf, *gob.Encoder) {
func parseConf(filename string) *Conf {
data, err := os.ReadFile(filename)
if err != nil {

9
cli.go Normal file
View file

@ -0,0 +1,9 @@
package main
import (
"reaction/app"
)
func main() {
app.CLI()
}

View file

@ -4,7 +4,7 @@ patterns:
streams:
tailDown:
cmd: [ "sh", "-c", "echo 'found 1.1.1.1' && sleep 2s && echo 'found 1.1.1.2' && sleep 2s && echo 'found 1.1.1.1' && sleep 1s" ]
cmd: [ "sh", "-c", "echo 'found 1.1.1.1' && sleep 2s && echo 'found 1.1.1.2' && sleep 2s && echo 'found 1.1.1.1' && sleep 10m" ]
filters:
findIP:
regex:
@ -16,4 +16,4 @@ streams:
cmd: [ "echo", "<ip>" ]
sleepdamn:
cmd: [ "echo", "sleep", "<ip>" ]
after: 10s
after: 8m

View file