From bde0b08eb19c6b6e248d2f4c956a348d80f7add1 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 27 Feb 2024 16:27:17 +0100 Subject: [PATCH] Reuse backoff waiting code in etcd clients. --- backend_storage_etcd.go | 19 ++++++++++--------- etcd_client.go | 24 ++++++++++++++---------- etcd_client_test.go | 5 ++++- grpc_client.go | 4 +++- proxy_config_etcd.go | 19 ++++++++++--------- 5 files changed, 41 insertions(+), 30 deletions(-) diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go index 08717c2..de3e66d 100644 --- a/backend_storage_etcd.go +++ b/backend_storage_etcd.go @@ -103,23 +103,24 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } - waitDelay := initialWaitDelay + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + panic(err) + } for { response, err := s.getBackends(client, s.keyPrefix) if err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout getting initial list of backends, retry in %s", waitDelay) + log.Printf("Timeout getting initial list of backends, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not get initial list of backends, retry in %s: %s", waitDelay, err) + log.Printf("Could not get initial list of backends, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(context.Background()) continue } diff --git a/etcd_client.go b/etcd_client.go index 8da453d..815a20f 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -212,26 +212,30 @@ func (c *EtcdClient) RemoveListener(listener EtcdClientListener) { delete(c.listeners, listener) } -func (c *EtcdClient) WaitForConnection() { - waitDelay := initialWaitDelay +func (c *EtcdClient) WaitForConnection(ctx context.Context) error { + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + return err + } + for { + if err := ctx.Err(); err != nil { + return err + } + if err := c.syncClient(); err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay) + log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err) + log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(ctx) continue } log.Printf("Client synced, using endpoints %+v", c.getEtcdClient().Endpoints()) - return + return nil } } diff --git a/etcd_client_test.go b/etcd_client_test.go index dd85ddb..d40bf28 100644 --- a/etcd_client_test.go +++ b/etcd_client_test.go @@ -236,7 +236,10 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { go func() { defer close(l.initial) - client.WaitForConnection() + if err := client.WaitForConnection(l.ctx); err != nil { + l.t.Errorf("error waiting for connection: %s", err) + return + } ctx, cancel := context.WithTimeout(l.ctx, time.Second) defer cancel() diff --git a/grpc_client.go b/grpc_client.go index 8d50226..0461a23 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -594,7 +594,9 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } backoff, _ := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) for { diff --git a/proxy_config_etcd.go b/proxy_config_etcd.go index d9ce438..b03ee0b 100644 --- a/proxy_config_etcd.go +++ b/proxy_config_etcd.go @@ -93,23 +93,24 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } - waitDelay := initialWaitDelay + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + panic(err) + } for { response, err := p.getProxyUrls(client, p.keyPrefix) if err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay) + log.Printf("Timeout getting initial list of proxy URLs, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err) + log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(context.Background()) continue }