etcd: Fix race in initialized event.

It could happen that the initialized event was triggered even though the
watch was not fully created yet.
This commit is contained in:
Joachim Bauch 2022-07-08 10:56:36 +02:00
parent 0165788fe3
commit 40e1b208c0
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
5 changed files with 30 additions and 2 deletions

View file

@ -27,6 +27,7 @@ import (
"fmt" "fmt"
"log" "log"
"net/url" "net/url"
"sync"
"time" "time"
"github.com/dlintw/goconf" "github.com/dlintw/goconf"
@ -42,6 +43,7 @@ type backendStorageEtcd struct {
initializedCtx context.Context initializedCtx context.Context
initializedFunc context.CancelFunc initializedFunc context.CancelFunc
initializedWg sync.WaitGroup
wakeupChanForTesting chan bool wakeupChanForTesting chan bool
} }
@ -100,6 +102,7 @@ func (s *backendStorageEtcd) wakeupForTesting() {
} }
func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
s.initializedWg.Add(1)
go func() { go func() {
if err := client.Watch(context.Background(), s.keyPrefix, s, clientv3.WithPrefix()); err != nil { if err := client.Watch(context.Background(), s.keyPrefix, s, clientv3.WithPrefix()); err != nil {
log.Printf("Error processing watch for %s: %s", s.keyPrefix, err) log.Printf("Error processing watch for %s: %s", s.keyPrefix, err)
@ -130,12 +133,17 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
for _, ev := range response.Kvs { for _, ev := range response.Kvs {
s.EtcdKeyUpdated(client, string(ev.Key), ev.Value) s.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
} }
s.initializedWg.Wait()
s.initializedFunc() s.initializedFunc()
return return
} }
}() }()
} }
func (s *backendStorageEtcd) EtcdWatchCreated(client *EtcdClient, key string) {
s.initializedWg.Done()
}
func (s *backendStorageEtcd) getBackends(client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) { func (s *backendStorageEtcd) getBackends(client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()

View file

@ -41,6 +41,7 @@ type EtcdClientListener interface {
} }
type EtcdClientWatcher interface { type EtcdClientWatcher interface {
EtcdWatchCreated(client *EtcdClient, key string)
EtcdKeyUpdated(client *EtcdClient, key string, value []byte) EtcdKeyUpdated(client *EtcdClient, key string, value []byte)
EtcdKeyDeleted(client *EtcdClient, key string) EtcdKeyDeleted(client *EtcdClient, key string)
} }
@ -242,6 +243,7 @@ func (c *EtcdClient) Watch(ctx context.Context, key string, watcher EtcdClientWa
log.Printf("Wait for leader and start watching on %s", key) log.Printf("Wait for leader and start watching on %s", key)
ch := c.getEtcdClient().Watch(clientv3.WithRequireLeader(ctx), key, opts...) ch := c.getEtcdClient().Watch(clientv3.WithRequireLeader(ctx), key, opts...)
log.Printf("Watch created for %s", key) log.Printf("Watch created for %s", key)
watcher.EtcdWatchCreated(c, key)
for response := range ch { for response := range ch {
if err := response.Err(); err != nil { if err := response.Err(); err != nil {
return err return err

View file

@ -29,6 +29,7 @@ import (
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"sync"
"syscall" "syscall"
"testing" "testing"
"time" "time"
@ -200,8 +201,9 @@ type EtcdClientTestListener struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
initial chan bool initial chan bool
events chan etcdEvent initialWg sync.WaitGroup
events chan etcdEvent
} }
func NewEtcdClientTestListener(ctx context.Context, t *testing.T) *EtcdClientTestListener { func NewEtcdClientTestListener(ctx context.Context, t *testing.T) *EtcdClientTestListener {
@ -222,6 +224,7 @@ func (l *EtcdClientTestListener) Close() {
} }
func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
l.initialWg.Add(1)
go func() { go func() {
if err := client.Watch(clientv3.WithRequireLeader(l.ctx), "foo", l, clientv3.WithPrefix()); err != nil { if err := client.Watch(clientv3.WithRequireLeader(l.ctx), "foo", l, clientv3.WithPrefix()); err != nil {
l.t.Error(err) l.t.Error(err)
@ -243,10 +246,15 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
} else if string(response.Kvs[0].Value) != "1" { } else if string(response.Kvs[0].Value) != "1" {
l.t.Errorf("expected value \"1\", got \"%s\"", string(response.Kvs[0].Value)) l.t.Errorf("expected value \"1\", got \"%s\"", string(response.Kvs[0].Value))
} }
l.initialWg.Wait()
l.initial <- true l.initial <- true
}() }()
} }
func (l *EtcdClientTestListener) EtcdWatchCreated(client *EtcdClient, key string) {
l.initialWg.Done()
}
func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte) { func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte) {
l.events <- etcdEvent{ l.events <- etcdEvent{
t: clientv3.EventTypePut, t: clientv3.EventTypePut,

View file

@ -259,6 +259,7 @@ type GrpcClients struct {
initializedCtx context.Context initializedCtx context.Context
initializedFunc context.CancelFunc initializedFunc context.CancelFunc
initializedWg sync.WaitGroup
wakeupChanForTesting chan bool wakeupChanForTesting chan bool
selfCheckWaitGroup sync.WaitGroup selfCheckWaitGroup sync.WaitGroup
} }
@ -584,6 +585,7 @@ func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, fromReload bool
} }
func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
c.initializedWg.Add(1)
go func() { go func() {
if err := client.Watch(context.Background(), c.targetPrefix, c, clientv3.WithPrefix()); err != nil { if err := client.Watch(context.Background(), c.targetPrefix, c, clientv3.WithPrefix()); err != nil {
log.Printf("Error processing watch for %s: %s", c.targetPrefix, err) log.Printf("Error processing watch for %s: %s", c.targetPrefix, err)
@ -610,12 +612,17 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
for _, ev := range response.Kvs { for _, ev := range response.Kvs {
c.EtcdKeyUpdated(client, string(ev.Key), ev.Value) c.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
} }
c.initializedWg.Wait()
c.initializedFunc() c.initializedFunc()
return return
} }
}() }()
} }
func (c *GrpcClients) EtcdWatchCreated(client *EtcdClient, key string) {
c.initializedWg.Done()
}
func (c *GrpcClients) getGrpcTargets(client *EtcdClient, targetPrefix string) (*clientv3.GetResponse, error) { func (c *GrpcClients) getGrpcTargets(client *EtcdClient, targetPrefix string) (*clientv3.GetResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()

View file

@ -1561,6 +1561,9 @@ func (m *mcuProxy) EtcdClientCreated(client *EtcdClient) {
}() }()
} }
func (m *mcuProxy) EtcdWatchCreated(client *EtcdClient, key string) {
}
func (m *mcuProxy) getProxyUrls(client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) { func (m *mcuProxy) getProxyUrls(client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()