diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go index a88f71a..321b8c3 100644 --- a/backend_storage_etcd.go +++ b/backend_storage_etcd.go @@ -134,10 +134,22 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { s.initializedFunc() nextRevision := response.Header.Revision + 1 + prevRevision := nextRevision + backoff.Reset() for s.closeCtx.Err() == nil { var err error if nextRevision, err = client.Watch(s.closeCtx, s.keyPrefix, nextRevision, s, clientv3.WithPrefix()); err != nil { - log.Printf("Error processing watch for %s: %s", s.keyPrefix, err) + log.Printf("Error processing watch for %s (%s), retry in %s", s.keyPrefix, err, backoff.NextWait()) + backoff.Wait(s.closeCtx) + continue + } + + if nextRevision != prevRevision { + backoff.Reset() + prevRevision = nextRevision + } else { + log.Printf("Processing watch for %s interrupted, retry in %s", s.keyPrefix, backoff.NextWait()) + backoff.Wait(s.closeCtx) } } return diff --git a/grpc_client.go b/grpc_client.go index 2e50a83..312e056 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -630,10 +630,22 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { break } + prevRevision := nextRevision + backoff.Reset() for c.closeCtx.Err() == nil { var err error if nextRevision, err = client.Watch(c.closeCtx, c.targetPrefix, nextRevision, c, clientv3.WithPrefix()); err != nil { - log.Printf("Error processing watch for %s: %s", c.targetPrefix, err) + log.Printf("Error processing watch for %s (%s), retry in %s", c.targetPrefix, err, backoff.NextWait()) + backoff.Wait(c.closeCtx) + continue + } + + if nextRevision != prevRevision { + backoff.Reset() + prevRevision = nextRevision + } else { + log.Printf("Processing watch for %s interrupted, retry in %s", c.targetPrefix, backoff.NextWait()) + backoff.Wait(c.closeCtx) } } }() diff --git a/proxy_config_etcd.go b/proxy_config_etcd.go index 3d40ef1..ede9883 100644 --- a/proxy_config_etcd.go +++ b/proxy_config_etcd.go @@ -132,10 +132,22 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) { break } + prevRevision := nextRevision + backoff.Reset() for p.closeCtx.Err() == nil { var err error if nextRevision, err = client.Watch(p.closeCtx, p.keyPrefix, nextRevision, p, clientv3.WithPrefix()); err != nil { - log.Printf("Error processing watch for %s: %s", p.keyPrefix, err) + log.Printf("Error processing watch for %s (%s), retry in %s", p.keyPrefix, err, backoff.NextWait()) + backoff.Wait(p.closeCtx) + continue + } + + if nextRevision != prevRevision { + backoff.Reset() + prevRevision = nextRevision + } else { + log.Printf("Processing watch for %s interrupted, retry in %s", p.keyPrefix, backoff.NextWait()) + backoff.Wait(p.closeCtx) } } }()