From d0d68f0d2175152e4f851f4b60e7e5a23d9345be Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 15 Apr 2024 11:57:52 +0200 Subject: [PATCH] Include previous value with etcd watch events. --- backend_storage_etcd.go | 6 +++--- etcd_client.go | 18 +++++++++++++----- etcd_client_test.go | 30 ++++++++++++++++++++++++++---- grpc_client.go | 6 +++--- proxy_config_etcd.go | 6 +++--- 5 files changed, 48 insertions(+), 18 deletions(-) diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go index 321b8c3..cd669c8 100644 --- a/backend_storage_etcd.go +++ b/backend_storage_etcd.go @@ -129,7 +129,7 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { } for _, ev := range response.Kvs { - s.EtcdKeyUpdated(client, string(ev.Key), ev.Value) + s.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil) } s.initializedFunc() @@ -167,7 +167,7 @@ func (s *backendStorageEtcd) getBackends(ctx context.Context, client *EtcdClient return client.Get(ctx, keyPrefix, clientv3.WithPrefix()) } -func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) { +func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) { var info BackendInformationEtcd if err := json.Unmarshal(data, &info); err != nil { log.Printf("Could not decode backend information %s: %s", string(data), err) @@ -227,7 +227,7 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data s.wakeupForTesting() } -func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string) { +func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { s.mu.Lock() defer s.mu.Unlock() diff --git a/etcd_client.go b/etcd_client.go index 6443701..ea1b64d 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -45,8 +45,8 @@ type EtcdClientListener interface { type EtcdClientWatcher interface { EtcdWatchCreated(client *EtcdClient, key string) - EtcdKeyUpdated(client *EtcdClient, key string, value []byte) - EtcdKeyDeleted(client *EtcdClient, key string) + EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte) + EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) } type EtcdClient struct { @@ -261,7 +261,7 @@ func (c *EtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOpt func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64, watcher EtcdClientWatcher, opts ...clientv3.OpOption) (int64, error) { log.Printf("Wait for leader and start watching on %s (rev=%d)", key, nextRevision) - opts = append(opts, clientv3.WithRev(nextRevision)) + opts = append(opts, clientv3.WithRev(nextRevision), clientv3.WithPrevKV()) ch := c.getEtcdClient().Watch(clientv3.WithRequireLeader(ctx), key, opts...) log.Printf("Watch created for %s", key) watcher.EtcdWatchCreated(c, key) @@ -274,9 +274,17 @@ func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64, for _, ev := range response.Events { switch ev.Type { case clientv3.EventTypePut: - watcher.EtcdKeyUpdated(c, string(ev.Kv.Key), ev.Kv.Value) + var prevValue []byte + if ev.PrevKv != nil { + prevValue = ev.PrevKv.Value + } + watcher.EtcdKeyUpdated(c, string(ev.Kv.Key), ev.Kv.Value, prevValue) case clientv3.EventTypeDelete: - watcher.EtcdKeyDeleted(c, string(ev.Kv.Key)) + var prevValue []byte + if ev.PrevKv != nil { + prevValue = ev.PrevKv.Value + } + watcher.EtcdKeyDeleted(c, string(ev.Kv.Key), prevValue) default: log.Printf("Unsupported watch event %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value) } diff --git a/etcd_client_test.go b/etcd_client_test.go index 14f718b..7849c38 100644 --- a/etcd_client_test.go +++ b/etcd_client_test.go @@ -196,6 +196,8 @@ type etcdEvent struct { t mvccpb.Event_EventType key string value string + + prevValue string } type EtcdClientTestListener struct { @@ -260,19 +262,27 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { func (l *EtcdClientTestListener) EtcdWatchCreated(client *EtcdClient, key string) { } -func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte) { - l.events <- etcdEvent{ +func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte) { + evt := etcdEvent{ t: clientv3.EventTypePut, key: string(key), value: string(value), } + if len(prevValue) > 0 { + evt.prevValue = string(prevValue) + } + l.events <- evt } -func (l *EtcdClientTestListener) EtcdKeyDeleted(client *EtcdClient, key string) { - l.events <- etcdEvent{ +func (l *EtcdClientTestListener) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { + evt := etcdEvent{ t: clientv3.EventTypeDelete, key: string(key), } + if len(prevValue) > 0 { + evt.prevValue = string(prevValue) + } + l.events <- evt } func Test_EtcdClient_Watch(t *testing.T) { @@ -298,11 +308,23 @@ func Test_EtcdClient_Watch(t *testing.T) { t.Errorf("expected value %s, got %s", "2", event.value) } + SetEtcdValue(etcd, "foo/a", []byte("3")) + event = <-listener.events + if event.t != clientv3.EventTypePut { + t.Errorf("expected type %d, got %d", clientv3.EventTypePut, event.t) + } else if event.key != "foo/a" { + t.Errorf("expected key %s, got %s", "foo/a", event.key) + } else if event.value != "3" { + t.Errorf("expected value %s, got %s", "3", event.value) + } + DeleteEtcdValue(etcd, "foo/a") event = <-listener.events if event.t != clientv3.EventTypeDelete { t.Errorf("expected type %d, got %d", clientv3.EventTypeDelete, event.t) } else if event.key != "foo/a" { t.Errorf("expected key %s, got %s", "foo/a", event.key) + } else if event.prevValue != "3" { + t.Errorf("expected previous value %s, got %s", "3", event.prevValue) } } diff --git a/grpc_client.go b/grpc_client.go index 312e056..fccdd97 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -623,7 +623,7 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { } for _, ev := range response.Kvs { - c.EtcdKeyUpdated(client, string(ev.Key), ev.Value) + c.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil) } c.initializedFunc() nextRevision = response.Header.Revision + 1 @@ -661,7 +661,7 @@ func (c *GrpcClients) getGrpcTargets(ctx context.Context, client *EtcdClient, ta return client.Get(ctx, targetPrefix, clientv3.WithPrefix()) } -func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) { +func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) { var info GrpcTargetInformationEtcd if err := json.Unmarshal(data, &info); err != nil { log.Printf("Could not decode GRPC target %s=%s: %s", key, string(data), err) @@ -710,7 +710,7 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte c.wakeupForTesting() } -func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string) { +func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { c.mu.Lock() defer c.mu.Unlock() diff --git a/proxy_config_etcd.go b/proxy_config_etcd.go index ede9883..35ccade 100644 --- a/proxy_config_etcd.go +++ b/proxy_config_etcd.go @@ -126,7 +126,7 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) { } for _, ev := range response.Kvs { - p.EtcdKeyUpdated(client, string(ev.Key), ev.Value) + p.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil) } nextRevision = response.Header.Revision + 1 break @@ -163,7 +163,7 @@ func (p *proxyConfigEtcd) getProxyUrls(ctx context.Context, client *EtcdClient, return client.Get(ctx, keyPrefix, clientv3.WithPrefix()) } -func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) { +func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) { var info ProxyInformationEtcd if err := json.Unmarshal(data, &info); err != nil { log.Printf("Could not decode proxy information %s: %s", string(data), err) @@ -204,7 +204,7 @@ func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data [] } } -func (p *proxyConfigEtcd) EtcdKeyDeleted(client *EtcdClient, key string) { +func (p *proxyConfigEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { p.mu.Lock() defer p.mu.Unlock()