Backoff when retrying watch.

This commit is contained in:
Joachim Bauch 2024-04-11 22:53:56 +02:00
parent 88a575c36c
commit 26102e7acb
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
3 changed files with 39 additions and 3 deletions

View file

@ -134,10 +134,22 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
s.initializedFunc()
nextRevision := response.Header.Revision + 1
prevRevision := nextRevision
backoff.Reset()
for s.closeCtx.Err() == nil {
var err error
if nextRevision, err = client.Watch(s.closeCtx, s.keyPrefix, nextRevision, s, clientv3.WithPrefix()); err != nil {
log.Printf("Error processing watch for %s: %s", s.keyPrefix, err)
log.Printf("Error processing watch for %s (%s), retry in %s", s.keyPrefix, err, backoff.NextWait())
backoff.Wait(s.closeCtx)
continue
}
if nextRevision != prevRevision {
backoff.Reset()
prevRevision = nextRevision
} else {
log.Printf("Processing watch for %s interrupted, retry in %s", s.keyPrefix, backoff.NextWait())
backoff.Wait(s.closeCtx)
}
}
return

View file

@ -630,10 +630,22 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
break
}
prevRevision := nextRevision
backoff.Reset()
for c.closeCtx.Err() == nil {
var err error
if nextRevision, err = client.Watch(c.closeCtx, c.targetPrefix, nextRevision, c, clientv3.WithPrefix()); err != nil {
log.Printf("Error processing watch for %s: %s", c.targetPrefix, err)
log.Printf("Error processing watch for %s (%s), retry in %s", c.targetPrefix, err, backoff.NextWait())
backoff.Wait(c.closeCtx)
continue
}
if nextRevision != prevRevision {
backoff.Reset()
prevRevision = nextRevision
} else {
log.Printf("Processing watch for %s interrupted, retry in %s", c.targetPrefix, backoff.NextWait())
backoff.Wait(c.closeCtx)
}
}
}()

View file

@ -132,10 +132,22 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) {
break
}
prevRevision := nextRevision
backoff.Reset()
for p.closeCtx.Err() == nil {
var err error
if nextRevision, err = client.Watch(p.closeCtx, p.keyPrefix, nextRevision, p, clientv3.WithPrefix()); err != nil {
log.Printf("Error processing watch for %s: %s", p.keyPrefix, err)
log.Printf("Error processing watch for %s (%s), retry in %s", p.keyPrefix, err, backoff.NextWait())
backoff.Wait(p.closeCtx)
continue
}
if nextRevision != prevRevision {
backoff.Reset()
prevRevision = nextRevision
} else {
log.Printf("Processing watch for %s interrupted, retry in %s", p.keyPrefix, backoff.NextWait())
backoff.Wait(p.closeCtx)
}
}
}()