Reuse backoff waiting code in etcd clients.

This commit is contained in:
Joachim Bauch 2024-02-27 16:27:17 +01:00
parent a68454ceec
commit bde0b08eb1
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
5 changed files with 41 additions and 30 deletions

View file

@ -103,23 +103,24 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
}()
go func() {
client.WaitForConnection()
if err := client.WaitForConnection(context.Background()); err != nil {
panic(err)
}
waitDelay := initialWaitDelay
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
if err != nil {
panic(err)
}
for {
response, err := s.getBackends(client, s.keyPrefix)
if err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout getting initial list of backends, retry in %s", waitDelay)
log.Printf("Timeout getting initial list of backends, retry in %s", backoff.NextWait())
} else {
log.Printf("Could not get initial list of backends, retry in %s: %s", waitDelay, err)
log.Printf("Could not get initial list of backends, retry in %s: %s", backoff.NextWait(), err)
}
time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
backoff.Wait(context.Background())
continue
}

View file

@ -212,26 +212,30 @@ func (c *EtcdClient) RemoveListener(listener EtcdClientListener) {
delete(c.listeners, listener)
}
func (c *EtcdClient) WaitForConnection() {
waitDelay := initialWaitDelay
func (c *EtcdClient) WaitForConnection(ctx context.Context) error {
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
if err != nil {
return err
}
for {
if err := ctx.Err(); err != nil {
return err
}
if err := c.syncClient(); err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay)
log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", backoff.NextWait())
} else {
log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err)
log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", backoff.NextWait(), err)
}
time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
backoff.Wait(ctx)
continue
}
log.Printf("Client synced, using endpoints %+v", c.getEtcdClient().Endpoints())
return
return nil
}
}

View file

@ -236,7 +236,10 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
go func() {
defer close(l.initial)
client.WaitForConnection()
if err := client.WaitForConnection(l.ctx); err != nil {
l.t.Errorf("error waiting for connection: %s", err)
return
}
ctx, cancel := context.WithTimeout(l.ctx, time.Second)
defer cancel()

View file

@ -594,7 +594,9 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
}()
go func() {
client.WaitForConnection()
if err := client.WaitForConnection(context.Background()); err != nil {
panic(err)
}
backoff, _ := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
for {

View file

@ -93,23 +93,24 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) {
}()
go func() {
client.WaitForConnection()
if err := client.WaitForConnection(context.Background()); err != nil {
panic(err)
}
waitDelay := initialWaitDelay
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
if err != nil {
panic(err)
}
for {
response, err := p.getProxyUrls(client, p.keyPrefix)
if err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay)
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", backoff.NextWait())
} else {
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err)
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", backoff.NextWait(), err)
}
time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
backoff.Wait(context.Background())
continue
}