mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-05-10 01:26:32 +02:00
Merge pull request #673 from strukturag/reuse-backoff
Reuse backoff waiting code where possible
This commit is contained in:
commit
df477a7856
|
@ -103,23 +103,24 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
|
|||
}()
|
||||
|
||||
go func() {
|
||||
client.WaitForConnection()
|
||||
if err := client.WaitForConnection(context.Background()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
waitDelay := initialWaitDelay
|
||||
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for {
|
||||
response, err := s.getBackends(client, s.keyPrefix)
|
||||
if err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
log.Printf("Timeout getting initial list of backends, retry in %s", waitDelay)
|
||||
log.Printf("Timeout getting initial list of backends, retry in %s", backoff.NextWait())
|
||||
} else {
|
||||
log.Printf("Could not get initial list of backends, retry in %s: %s", waitDelay, err)
|
||||
log.Printf("Could not get initial list of backends, retry in %s: %s", backoff.NextWait(), err)
|
||||
}
|
||||
|
||||
time.Sleep(waitDelay)
|
||||
waitDelay = waitDelay * 2
|
||||
if waitDelay > maxWaitDelay {
|
||||
waitDelay = maxWaitDelay
|
||||
}
|
||||
backoff.Wait(context.Background())
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -212,26 +212,30 @@ func (c *EtcdClient) RemoveListener(listener EtcdClientListener) {
|
|||
delete(c.listeners, listener)
|
||||
}
|
||||
|
||||
func (c *EtcdClient) WaitForConnection() {
|
||||
waitDelay := initialWaitDelay
|
||||
func (c *EtcdClient) WaitForConnection(ctx context.Context) error {
|
||||
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.syncClient(); err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay)
|
||||
log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", backoff.NextWait())
|
||||
} else {
|
||||
log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err)
|
||||
log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", backoff.NextWait(), err)
|
||||
}
|
||||
|
||||
time.Sleep(waitDelay)
|
||||
waitDelay = waitDelay * 2
|
||||
if waitDelay > maxWaitDelay {
|
||||
waitDelay = maxWaitDelay
|
||||
}
|
||||
backoff.Wait(ctx)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Client synced, using endpoints %+v", c.getEtcdClient().Endpoints())
|
||||
return
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -236,7 +236,10 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
|
|||
|
||||
go func() {
|
||||
defer close(l.initial)
|
||||
client.WaitForConnection()
|
||||
if err := client.WaitForConnection(l.ctx); err != nil {
|
||||
l.t.Errorf("error waiting for connection: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(l.ctx, time.Second)
|
||||
defer cancel()
|
||||
|
|
|
@ -594,7 +594,9 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
|
|||
}()
|
||||
|
||||
go func() {
|
||||
client.WaitForConnection()
|
||||
if err := client.WaitForConnection(context.Background()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
backoff, _ := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
|
||||
for {
|
||||
|
|
15
hub.go
15
hub.go
|
@ -417,19 +417,20 @@ func (h *Hub) updateGeoDatabase() {
|
|||
}
|
||||
|
||||
defer h.geoipUpdating.Store(false)
|
||||
delay := time.Second
|
||||
backoff, err := NewExponentialBackoff(time.Second, 5*time.Minute)
|
||||
if err != nil {
|
||||
log.Printf("Could not create exponential backoff: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
for !h.closer.IsClosed() {
|
||||
err := h.geoip.Update()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
log.Printf("Could not update GeoIP database, will retry later (%s)", err)
|
||||
time.Sleep(delay)
|
||||
delay = delay * 2
|
||||
if delay > 5*time.Minute {
|
||||
delay = 5 * time.Minute
|
||||
}
|
||||
log.Printf("Could not update GeoIP database, will retry in %s (%s)", backoff.NextWait(), err)
|
||||
backoff.Wait(context.Background())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
package signaling
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"log"
|
||||
|
@ -74,33 +75,27 @@ func NewNatsClient(url string) (NatsClient, error) {
|
|||
return NewLoopbackNatsClient()
|
||||
}
|
||||
|
||||
backoff, err := NewExponentialBackoff(initialConnectInterval, maxConnectInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &natsClient{}
|
||||
|
||||
var err error
|
||||
client.nc, err = nats.Connect(url,
|
||||
nats.ClosedHandler(client.onClosed),
|
||||
nats.DisconnectHandler(client.onDisconnected),
|
||||
nats.ReconnectHandler(client.onReconnected))
|
||||
|
||||
interrupt := make(chan os.Signal, 1)
|
||||
signal.Notify(interrupt, os.Interrupt)
|
||||
defer signal.Stop(interrupt)
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer stop()
|
||||
|
||||
delay := initialConnectInterval
|
||||
timer := time.NewTimer(delay)
|
||||
// The initial connect must succeed, so we retry in the case of an error.
|
||||
for err != nil {
|
||||
log.Printf("Could not create connection (%s), will retry in %s", err, delay)
|
||||
timer.Reset(delay)
|
||||
select {
|
||||
case <-interrupt:
|
||||
log.Printf("Could not create connection (%s), will retry in %s", err, backoff.NextWait())
|
||||
backoff.Wait(ctx)
|
||||
if ctx.Err() != nil {
|
||||
return nil, fmt.Errorf("interrupted")
|
||||
case <-timer.C:
|
||||
// Retry connection
|
||||
delay = delay * 2
|
||||
if delay > maxConnectInterval {
|
||||
delay = maxConnectInterval
|
||||
}
|
||||
}
|
||||
|
||||
client.nc, err = nats.Connect(url)
|
||||
|
|
|
@ -233,14 +233,15 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error {
|
|||
mcuType = signaling.McuTypeDefault
|
||||
}
|
||||
|
||||
interrupt := make(chan os.Signal, 1)
|
||||
signal.Notify(interrupt, os.Interrupt)
|
||||
defer signal.Stop(interrupt)
|
||||
backoff, err := signaling.NewExponentialBackoff(initialMcuRetry, maxMcuRetry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer stop()
|
||||
|
||||
var err error
|
||||
var mcu signaling.Mcu
|
||||
mcuRetry := initialMcuRetry
|
||||
mcuRetryTimer := time.NewTimer(mcuRetry)
|
||||
for {
|
||||
switch mcuType {
|
||||
case signaling.McuTypeJanus:
|
||||
|
@ -263,17 +264,10 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error {
|
|||
break
|
||||
}
|
||||
|
||||
log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, mcuRetry)
|
||||
mcuRetryTimer.Reset(mcuRetry)
|
||||
select {
|
||||
case <-interrupt:
|
||||
log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, backoff.NextWait())
|
||||
backoff.Wait(ctx)
|
||||
if ctx.Err() != nil {
|
||||
return fmt.Errorf("Cancelled")
|
||||
case <-mcuRetryTimer.C:
|
||||
// Retry connection
|
||||
mcuRetry = mcuRetry * 2
|
||||
if mcuRetry > maxMcuRetry {
|
||||
mcuRetry = maxMcuRetry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -93,23 +93,24 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) {
|
|||
}()
|
||||
|
||||
go func() {
|
||||
client.WaitForConnection()
|
||||
if err := client.WaitForConnection(context.Background()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
waitDelay := initialWaitDelay
|
||||
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for {
|
||||
response, err := p.getProxyUrls(client, p.keyPrefix)
|
||||
if err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay)
|
||||
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", backoff.NextWait())
|
||||
} else {
|
||||
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err)
|
||||
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", backoff.NextWait(), err)
|
||||
}
|
||||
|
||||
time.Sleep(waitDelay)
|
||||
waitDelay = waitDelay * 2
|
||||
if waitDelay > maxWaitDelay {
|
||||
waitDelay = maxWaitDelay
|
||||
}
|
||||
backoff.Wait(context.Background())
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue