From 767d2836008cc219a72b1384279f60457271a589 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 9 Sep 2020 14:42:18 +0200 Subject: [PATCH] Handle case where etcd cluster is not available during startup. --- src/signaling/mcu_proxy.go | 74 +++++++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/src/signaling/mcu_proxy.go b/src/signaling/mcu_proxy.go index 191492f..2b83207 100644 --- a/src/signaling/mcu_proxy.go +++ b/src/signaling/mcu_proxy.go @@ -60,6 +60,9 @@ const ( proxyUrlTypeStatic = "static" proxyUrlTypeEtcd = "etcd" + + initialWaitDelay = time.Second + maxWaitDelay = 8 * time.Second ) type mcuProxyPubSubCommon struct { @@ -1084,20 +1087,38 @@ func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) e m.client.Store(c) log.Printf("Using proxy URL endpoints %+v", endpoints) - ch := c.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix()) - go m.processWatches(ch) + go func(client *clientv3.Client) { + log.Printf("Wait for leader and start watching on %s", keyPrefix) + ch := client.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix()) + log.Printf("Watch created for %s", keyPrefix) + m.processWatches(ch) + }(c) go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() + m.waitForConnection() + + waitDelay := initialWaitDelay + for { + response, err := m.getProxyUrls(keyPrefix) + if err != nil { + if err == context.DeadlineExceeded { + log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay) + } else { + log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err) + } + + time.Sleep(waitDelay) + waitDelay = waitDelay * 2 + if waitDelay > maxWaitDelay { + waitDelay = maxWaitDelay + } + continue + } - response, err := c.Get(ctx, keyPrefix, clientv3.WithPrefix()) - if err != nil { - log.Printf("Could not get initial list of proxy URLs: %s", err) - } else { for _, ev := range response.Kvs { m.addEtcdProxy(string(ev.Key), ev.Value) } + return } }() } @@ -1106,6 +1127,43 @@ func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) e return nil } +func (m *mcuProxy) getProxyUrls(keyPrefix string) (*clientv3.GetResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + return m.getEtcdClient().Get(ctx, keyPrefix, clientv3.WithPrefix()) +} + +func (m *mcuProxy) waitForConnection() { + waitDelay := initialWaitDelay + for { + if err := m.syncClient(); err != nil { + if err == context.DeadlineExceeded { + log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay) + } else { + log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err) + } + + time.Sleep(waitDelay) + waitDelay = waitDelay * 2 + if waitDelay > maxWaitDelay { + waitDelay = maxWaitDelay + } + continue + } + + log.Printf("Client using endpoints %+v", m.getEtcdClient().Endpoints()) + return + } +} + +func (m *mcuProxy) syncClient() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + return m.getEtcdClient().Sync(ctx) +} + func (m *mcuProxy) Reload(config *goconf.ConfigFile) { m.connectionsMu.Lock() defer m.connectionsMu.Unlock()