Include previous value with etcd watch events.

This commit is contained in:
Joachim Bauch 2024-04-15 11:57:52 +02:00
parent 9a892a194e
commit d0d68f0d21
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
5 changed files with 48 additions and 18 deletions

View file

@ -129,7 +129,7 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
}
for _, ev := range response.Kvs {
s.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
s.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil)
}
s.initializedFunc()
@ -167,7 +167,7 @@ func (s *backendStorageEtcd) getBackends(ctx context.Context, client *EtcdClient
return client.Get(ctx, keyPrefix, clientv3.WithPrefix())
}
func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) {
func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) {
var info BackendInformationEtcd
if err := json.Unmarshal(data, &info); err != nil {
log.Printf("Could not decode backend information %s: %s", string(data), err)
@ -227,7 +227,7 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data
s.wakeupForTesting()
}
func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string) {
func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
s.mu.Lock()
defer s.mu.Unlock()

View file

@ -45,8 +45,8 @@ type EtcdClientListener interface {
type EtcdClientWatcher interface {
EtcdWatchCreated(client *EtcdClient, key string)
EtcdKeyUpdated(client *EtcdClient, key string, value []byte)
EtcdKeyDeleted(client *EtcdClient, key string)
EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte)
EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte)
}
type EtcdClient struct {
@ -261,7 +261,7 @@ func (c *EtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOpt
func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64, watcher EtcdClientWatcher, opts ...clientv3.OpOption) (int64, error) {
log.Printf("Wait for leader and start watching on %s (rev=%d)", key, nextRevision)
opts = append(opts, clientv3.WithRev(nextRevision))
opts = append(opts, clientv3.WithRev(nextRevision), clientv3.WithPrevKV())
ch := c.getEtcdClient().Watch(clientv3.WithRequireLeader(ctx), key, opts...)
log.Printf("Watch created for %s", key)
watcher.EtcdWatchCreated(c, key)
@ -274,9 +274,17 @@ func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64,
for _, ev := range response.Events {
switch ev.Type {
case clientv3.EventTypePut:
watcher.EtcdKeyUpdated(c, string(ev.Kv.Key), ev.Kv.Value)
var prevValue []byte
if ev.PrevKv != nil {
prevValue = ev.PrevKv.Value
}
watcher.EtcdKeyUpdated(c, string(ev.Kv.Key), ev.Kv.Value, prevValue)
case clientv3.EventTypeDelete:
watcher.EtcdKeyDeleted(c, string(ev.Kv.Key))
var prevValue []byte
if ev.PrevKv != nil {
prevValue = ev.PrevKv.Value
}
watcher.EtcdKeyDeleted(c, string(ev.Kv.Key), prevValue)
default:
log.Printf("Unsupported watch event %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
}

View file

@ -196,6 +196,8 @@ type etcdEvent struct {
t mvccpb.Event_EventType
key string
value string
prevValue string
}
type EtcdClientTestListener struct {
@ -260,19 +262,27 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
func (l *EtcdClientTestListener) EtcdWatchCreated(client *EtcdClient, key string) {
}
func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte) {
l.events <- etcdEvent{
func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte) {
evt := etcdEvent{
t: clientv3.EventTypePut,
key: string(key),
value: string(value),
}
if len(prevValue) > 0 {
evt.prevValue = string(prevValue)
}
l.events <- evt
}
func (l *EtcdClientTestListener) EtcdKeyDeleted(client *EtcdClient, key string) {
l.events <- etcdEvent{
func (l *EtcdClientTestListener) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
evt := etcdEvent{
t: clientv3.EventTypeDelete,
key: string(key),
}
if len(prevValue) > 0 {
evt.prevValue = string(prevValue)
}
l.events <- evt
}
func Test_EtcdClient_Watch(t *testing.T) {
@ -298,11 +308,23 @@ func Test_EtcdClient_Watch(t *testing.T) {
t.Errorf("expected value %s, got %s", "2", event.value)
}
SetEtcdValue(etcd, "foo/a", []byte("3"))
event = <-listener.events
if event.t != clientv3.EventTypePut {
t.Errorf("expected type %d, got %d", clientv3.EventTypePut, event.t)
} else if event.key != "foo/a" {
t.Errorf("expected key %s, got %s", "foo/a", event.key)
} else if event.value != "3" {
t.Errorf("expected value %s, got %s", "3", event.value)
}
DeleteEtcdValue(etcd, "foo/a")
event = <-listener.events
if event.t != clientv3.EventTypeDelete {
t.Errorf("expected type %d, got %d", clientv3.EventTypeDelete, event.t)
} else if event.key != "foo/a" {
t.Errorf("expected key %s, got %s", "foo/a", event.key)
} else if event.prevValue != "3" {
t.Errorf("expected previous value %s, got %s", "3", event.prevValue)
}
}

View file

@ -623,7 +623,7 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
}
for _, ev := range response.Kvs {
c.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
c.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil)
}
c.initializedFunc()
nextRevision = response.Header.Revision + 1
@ -661,7 +661,7 @@ func (c *GrpcClients) getGrpcTargets(ctx context.Context, client *EtcdClient, ta
return client.Get(ctx, targetPrefix, clientv3.WithPrefix())
}
func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) {
func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) {
var info GrpcTargetInformationEtcd
if err := json.Unmarshal(data, &info); err != nil {
log.Printf("Could not decode GRPC target %s=%s: %s", key, string(data), err)
@ -710,7 +710,7 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte
c.wakeupForTesting()
}
func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string) {
func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
c.mu.Lock()
defer c.mu.Unlock()

View file

@ -126,7 +126,7 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) {
}
for _, ev := range response.Kvs {
p.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
p.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil)
}
nextRevision = response.Header.Revision + 1
break
@ -163,7 +163,7 @@ func (p *proxyConfigEtcd) getProxyUrls(ctx context.Context, client *EtcdClient,
return client.Get(ctx, keyPrefix, clientv3.WithPrefix())
}
func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) {
func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) {
var info ProxyInformationEtcd
if err := json.Unmarshal(data, &info); err != nil {
log.Printf("Could not decode proxy information %s: %s", string(data), err)
@ -204,7 +204,7 @@ func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []
}
}
func (p *proxyConfigEtcd) EtcdKeyDeleted(client *EtcdClient, key string) {
func (p *proxyConfigEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
p.mu.Lock()
defer p.mu.Unlock()