mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-05-27 02:52:27 +02:00
Merge pull request #704 from strukturag/etcd-prev-value
Include previous value with etcd watch events.
This commit is contained in:
commit
5bc9ada233
|
@ -129,7 +129,7 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ev := range response.Kvs {
|
for _, ev := range response.Kvs {
|
||||||
s.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
|
s.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil)
|
||||||
}
|
}
|
||||||
s.initializedFunc()
|
s.initializedFunc()
|
||||||
|
|
||||||
|
@ -167,7 +167,7 @@ func (s *backendStorageEtcd) getBackends(ctx context.Context, client *EtcdClient
|
||||||
return client.Get(ctx, keyPrefix, clientv3.WithPrefix())
|
return client.Get(ctx, keyPrefix, clientv3.WithPrefix())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) {
|
func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) {
|
||||||
var info BackendInformationEtcd
|
var info BackendInformationEtcd
|
||||||
if err := json.Unmarshal(data, &info); err != nil {
|
if err := json.Unmarshal(data, &info); err != nil {
|
||||||
log.Printf("Could not decode backend information %s: %s", string(data), err)
|
log.Printf("Could not decode backend information %s: %s", string(data), err)
|
||||||
|
@ -227,7 +227,7 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data
|
||||||
s.wakeupForTesting()
|
s.wakeupForTesting()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string) {
|
func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -45,8 +45,8 @@ type EtcdClientListener interface {
|
||||||
|
|
||||||
type EtcdClientWatcher interface {
|
type EtcdClientWatcher interface {
|
||||||
EtcdWatchCreated(client *EtcdClient, key string)
|
EtcdWatchCreated(client *EtcdClient, key string)
|
||||||
EtcdKeyUpdated(client *EtcdClient, key string, value []byte)
|
EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte)
|
||||||
EtcdKeyDeleted(client *EtcdClient, key string)
|
EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte)
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdClient struct {
|
type EtcdClient struct {
|
||||||
|
@ -261,7 +261,7 @@ func (c *EtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOpt
|
||||||
|
|
||||||
func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64, watcher EtcdClientWatcher, opts ...clientv3.OpOption) (int64, error) {
|
func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64, watcher EtcdClientWatcher, opts ...clientv3.OpOption) (int64, error) {
|
||||||
log.Printf("Wait for leader and start watching on %s (rev=%d)", key, nextRevision)
|
log.Printf("Wait for leader and start watching on %s (rev=%d)", key, nextRevision)
|
||||||
opts = append(opts, clientv3.WithRev(nextRevision))
|
opts = append(opts, clientv3.WithRev(nextRevision), clientv3.WithPrevKV())
|
||||||
ch := c.getEtcdClient().Watch(clientv3.WithRequireLeader(ctx), key, opts...)
|
ch := c.getEtcdClient().Watch(clientv3.WithRequireLeader(ctx), key, opts...)
|
||||||
log.Printf("Watch created for %s", key)
|
log.Printf("Watch created for %s", key)
|
||||||
watcher.EtcdWatchCreated(c, key)
|
watcher.EtcdWatchCreated(c, key)
|
||||||
|
@ -274,9 +274,17 @@ func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64,
|
||||||
for _, ev := range response.Events {
|
for _, ev := range response.Events {
|
||||||
switch ev.Type {
|
switch ev.Type {
|
||||||
case clientv3.EventTypePut:
|
case clientv3.EventTypePut:
|
||||||
watcher.EtcdKeyUpdated(c, string(ev.Kv.Key), ev.Kv.Value)
|
var prevValue []byte
|
||||||
|
if ev.PrevKv != nil {
|
||||||
|
prevValue = ev.PrevKv.Value
|
||||||
|
}
|
||||||
|
watcher.EtcdKeyUpdated(c, string(ev.Kv.Key), ev.Kv.Value, prevValue)
|
||||||
case clientv3.EventTypeDelete:
|
case clientv3.EventTypeDelete:
|
||||||
watcher.EtcdKeyDeleted(c, string(ev.Kv.Key))
|
var prevValue []byte
|
||||||
|
if ev.PrevKv != nil {
|
||||||
|
prevValue = ev.PrevKv.Value
|
||||||
|
}
|
||||||
|
watcher.EtcdKeyDeleted(c, string(ev.Kv.Key), prevValue)
|
||||||
default:
|
default:
|
||||||
log.Printf("Unsupported watch event %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
log.Printf("Unsupported watch event %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
||||||
}
|
}
|
||||||
|
|
|
@ -196,6 +196,8 @@ type etcdEvent struct {
|
||||||
t mvccpb.Event_EventType
|
t mvccpb.Event_EventType
|
||||||
key string
|
key string
|
||||||
value string
|
value string
|
||||||
|
|
||||||
|
prevValue string
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdClientTestListener struct {
|
type EtcdClientTestListener struct {
|
||||||
|
@ -260,19 +262,27 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
|
||||||
func (l *EtcdClientTestListener) EtcdWatchCreated(client *EtcdClient, key string) {
|
func (l *EtcdClientTestListener) EtcdWatchCreated(client *EtcdClient, key string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte) {
|
func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte) {
|
||||||
l.events <- etcdEvent{
|
evt := etcdEvent{
|
||||||
t: clientv3.EventTypePut,
|
t: clientv3.EventTypePut,
|
||||||
key: string(key),
|
key: string(key),
|
||||||
value: string(value),
|
value: string(value),
|
||||||
}
|
}
|
||||||
|
if len(prevValue) > 0 {
|
||||||
|
evt.prevValue = string(prevValue)
|
||||||
|
}
|
||||||
|
l.events <- evt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *EtcdClientTestListener) EtcdKeyDeleted(client *EtcdClient, key string) {
|
func (l *EtcdClientTestListener) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
|
||||||
l.events <- etcdEvent{
|
evt := etcdEvent{
|
||||||
t: clientv3.EventTypeDelete,
|
t: clientv3.EventTypeDelete,
|
||||||
key: string(key),
|
key: string(key),
|
||||||
}
|
}
|
||||||
|
if len(prevValue) > 0 {
|
||||||
|
evt.prevValue = string(prevValue)
|
||||||
|
}
|
||||||
|
l.events <- evt
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_EtcdClient_Watch(t *testing.T) {
|
func Test_EtcdClient_Watch(t *testing.T) {
|
||||||
|
@ -298,11 +308,23 @@ func Test_EtcdClient_Watch(t *testing.T) {
|
||||||
t.Errorf("expected value %s, got %s", "2", event.value)
|
t.Errorf("expected value %s, got %s", "2", event.value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SetEtcdValue(etcd, "foo/a", []byte("3"))
|
||||||
|
event = <-listener.events
|
||||||
|
if event.t != clientv3.EventTypePut {
|
||||||
|
t.Errorf("expected type %d, got %d", clientv3.EventTypePut, event.t)
|
||||||
|
} else if event.key != "foo/a" {
|
||||||
|
t.Errorf("expected key %s, got %s", "foo/a", event.key)
|
||||||
|
} else if event.value != "3" {
|
||||||
|
t.Errorf("expected value %s, got %s", "3", event.value)
|
||||||
|
}
|
||||||
|
|
||||||
DeleteEtcdValue(etcd, "foo/a")
|
DeleteEtcdValue(etcd, "foo/a")
|
||||||
event = <-listener.events
|
event = <-listener.events
|
||||||
if event.t != clientv3.EventTypeDelete {
|
if event.t != clientv3.EventTypeDelete {
|
||||||
t.Errorf("expected type %d, got %d", clientv3.EventTypeDelete, event.t)
|
t.Errorf("expected type %d, got %d", clientv3.EventTypeDelete, event.t)
|
||||||
} else if event.key != "foo/a" {
|
} else if event.key != "foo/a" {
|
||||||
t.Errorf("expected key %s, got %s", "foo/a", event.key)
|
t.Errorf("expected key %s, got %s", "foo/a", event.key)
|
||||||
|
} else if event.prevValue != "3" {
|
||||||
|
t.Errorf("expected previous value %s, got %s", "3", event.prevValue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -623,7 +623,7 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ev := range response.Kvs {
|
for _, ev := range response.Kvs {
|
||||||
c.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
|
c.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil)
|
||||||
}
|
}
|
||||||
c.initializedFunc()
|
c.initializedFunc()
|
||||||
nextRevision = response.Header.Revision + 1
|
nextRevision = response.Header.Revision + 1
|
||||||
|
@ -661,7 +661,7 @@ func (c *GrpcClients) getGrpcTargets(ctx context.Context, client *EtcdClient, ta
|
||||||
return client.Get(ctx, targetPrefix, clientv3.WithPrefix())
|
return client.Get(ctx, targetPrefix, clientv3.WithPrefix())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) {
|
func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) {
|
||||||
var info GrpcTargetInformationEtcd
|
var info GrpcTargetInformationEtcd
|
||||||
if err := json.Unmarshal(data, &info); err != nil {
|
if err := json.Unmarshal(data, &info); err != nil {
|
||||||
log.Printf("Could not decode GRPC target %s=%s: %s", key, string(data), err)
|
log.Printf("Could not decode GRPC target %s=%s: %s", key, string(data), err)
|
||||||
|
@ -710,7 +710,7 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte
|
||||||
c.wakeupForTesting()
|
c.wakeupForTesting()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string) {
|
func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -126,7 +126,7 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ev := range response.Kvs {
|
for _, ev := range response.Kvs {
|
||||||
p.EtcdKeyUpdated(client, string(ev.Key), ev.Value)
|
p.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil)
|
||||||
}
|
}
|
||||||
nextRevision = response.Header.Revision + 1
|
nextRevision = response.Header.Revision + 1
|
||||||
break
|
break
|
||||||
|
@ -163,7 +163,7 @@ func (p *proxyConfigEtcd) getProxyUrls(ctx context.Context, client *EtcdClient,
|
||||||
return client.Get(ctx, keyPrefix, clientv3.WithPrefix())
|
return client.Get(ctx, keyPrefix, clientv3.WithPrefix())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) {
|
func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) {
|
||||||
var info ProxyInformationEtcd
|
var info ProxyInformationEtcd
|
||||||
if err := json.Unmarshal(data, &info); err != nil {
|
if err := json.Unmarshal(data, &info); err != nil {
|
||||||
log.Printf("Could not decode proxy information %s: %s", string(data), err)
|
log.Printf("Could not decode proxy information %s: %s", string(data), err)
|
||||||
|
@ -204,7 +204,7 @@ func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *proxyConfigEtcd) EtcdKeyDeleted(client *EtcdClient, key string) {
|
func (p *proxyConfigEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue