commit f00ec7fa0817544e143b50305e9c9ef8a15ee274 Author: Simon Vieille Date: Mon May 5 09:25:43 2025 +0200 add tests diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..6bcfbf1 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "time" + + "github.com/hibiken/asynq" + "gitnet.fr/deblan/golang-worker/task/foo" +) + +const redisAddr = "127.0.0.1:6379" + +func main() { + client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) + defer client.Close() + + for { + task, _ := foo.NewFooTask(time.Now(), "foo") + client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute)) + + time.Sleep(1 * time.Second) + } +} diff --git a/cmd/worker/main.go b/cmd/worker/main.go new file mode 100644 index 0000000..1c45581 --- /dev/null +++ b/cmd/worker/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "log" + + "github.com/hibiken/asynq" + "gitnet.fr/deblan/golang-worker/task/foo" +) + +const redisAddr = "127.0.0.1:6379" + +func main() { + srv := asynq.NewServer( + asynq.RedisClientOpt{Addr: redisAddr}, + asynq.Config{ + // Specify how many concurrent workers to use + Concurrency: 10, + // Optionally specify multiple queues with different priority. + Queues: map[string]int{ + "critical": 6, + "default": 3, + "low": 1, + }, + // See the godoc for other configuration options + }, + ) + + // mux maps a type to a handler + mux := asynq.NewServeMux() + mux.HandleFunc("foo", foo.HandleFooTask) + + if err := srv.Run(mux); err != nil { + log.Fatalf("could not run server: %v", err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..982cc35 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module gitnet.fr/deblan/golang-worker + +go 1.23.0 + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hibiken/asynq v0.25.1 // indirect + github.com/redis/go-redis/v9 v9.8.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/spf13/cast v1.8.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/time v0.11.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a500ae3 --- /dev/null +++ b/go.sum @@ -0,0 +1,20 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw= +github.com/hibiken/asynq v0.25.1/go.mod h1:pazWNOLBu0FEynQRBvHA26qdIKRSmfdIfUm4HdsLmXg= +github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= +github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/spf13/cast v1.8.0 h1:gEN9K4b8Xws4EX0+a0reLmhq8moKn7ntRlQYgjPeCDk= +github.com/spf13/cast v1.8.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= diff --git a/task/foo/task.go b/task/foo/task.go new file mode 100644 index 0000000..15aaadc --- /dev/null +++ b/task/foo/task.go @@ -0,0 +1,37 @@ +package foo + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/hibiken/asynq" +) + +type FooTask struct { + Date time.Time + Message string +} + +func NewFooTask(date time.Time, message string) (*asynq.Task, error) { + payload, err := json.Marshal(FooTask{Date: date, Message: message}) + + if err != nil { + return nil, err + } + + return asynq.NewTask("foo", payload), nil +} + +func HandleFooTask(ctx context.Context, t *asynq.Task) error { + var p FooTask + + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + fmt.Printf("%+v\n", p) + + return nil +}