Handle case where etcd cluster is not available during startup.

This commit is contained in:
Joachim Bauch 2020-09-09 14:42:18 +02:00
parent adce45162e
commit 767d283600
Failed to extract signature
1 changed files with 66 additions and 8 deletions

View File

@ -60,6 +60,9 @@ const (
proxyUrlTypeStatic = "static"
proxyUrlTypeEtcd = "etcd"
initialWaitDelay = time.Second
maxWaitDelay = 8 * time.Second
)
type mcuProxyPubSubCommon struct {
@ -1084,20 +1087,38 @@ func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) e
m.client.Store(c)
log.Printf("Using proxy URL endpoints %+v", endpoints)
ch := c.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix())
go m.processWatches(ch)
go func(client *clientv3.Client) {
log.Printf("Wait for leader and start watching on %s", keyPrefix)
ch := client.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix())
log.Printf("Watch created for %s", keyPrefix)
m.processWatches(ch)
}(c)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
m.waitForConnection()
waitDelay := initialWaitDelay
for {
response, err := m.getProxyUrls(keyPrefix)
if err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay)
} else {
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err)
}
time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
continue
}
response, err := c.Get(ctx, keyPrefix, clientv3.WithPrefix())
if err != nil {
log.Printf("Could not get initial list of proxy URLs: %s", err)
} else {
for _, ev := range response.Kvs {
m.addEtcdProxy(string(ev.Key), ev.Value)
}
return
}
}()
}
@ -1106,6 +1127,43 @@ func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) e
return nil
}
func (m *mcuProxy) getProxyUrls(keyPrefix string) (*clientv3.GetResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return m.getEtcdClient().Get(ctx, keyPrefix, clientv3.WithPrefix())
}
func (m *mcuProxy) waitForConnection() {
waitDelay := initialWaitDelay
for {
if err := m.syncClient(); err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay)
} else {
log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err)
}
time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
continue
}
log.Printf("Client using endpoints %+v", m.getEtcdClient().Endpoints())
return
}
}
func (m *mcuProxy) syncClient() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return m.getEtcdClient().Sync(ctx)
}
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()