/** * Standalone signaling server for the Nextcloud Spreed app. * Copyright (C) 2022 struktur AG * * @author Joachim Bauch * * @license GNU AGPL version 3 or any later version * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package signaling import ( "context" "encoding/json" "errors" "fmt" "log" "net/url" "time" "github.com/dlintw/goconf" clientv3 "go.etcd.io/etcd/client/v3" ) type backendStorageEtcd struct { backendStorageCommon etcdClient *EtcdClient keyPrefix string keyInfos map[string]*BackendInformationEtcd initializedCtx context.Context initializedFunc context.CancelFunc wakeupChanForTesting chan struct{} closeCtx context.Context closeFunc context.CancelFunc } func NewBackendStorageEtcd(config *goconf.ConfigFile, etcdClient *EtcdClient) (BackendStorage, error) { if etcdClient == nil || !etcdClient.IsConfigured() { return nil, fmt.Errorf("no etcd endpoints configured") } keyPrefix, _ := config.GetString("backend", "backendprefix") if keyPrefix == "" { return nil, fmt.Errorf("no backend prefix configured") } initializedCtx, initializedFunc := context.WithCancel(context.Background()) closeCtx, closeFunc := context.WithCancel(context.Background()) result := &backendStorageEtcd{ backendStorageCommon: backendStorageCommon{ backends: make(map[string][]*Backend), }, etcdClient: etcdClient, keyPrefix: keyPrefix, keyInfos: make(map[string]*BackendInformationEtcd), initializedCtx: initializedCtx, initializedFunc: initializedFunc, closeCtx: closeCtx, closeFunc: closeFunc, } etcdClient.AddListener(result) return result, nil } func (s *backendStorageEtcd) WaitForInitialized(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() case <-s.initializedCtx.Done(): return nil } } func (s *backendStorageEtcd) wakeupForTesting() { if s.wakeupChanForTesting == nil { return } select { case s.wakeupChanForTesting <- struct{}{}: default: } } func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { go func() { if err := client.WaitForConnection(s.closeCtx); err != nil { if errors.Is(err, context.Canceled) { return } panic(err) } backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) if err != nil { panic(err) } for s.closeCtx.Err() == nil { response, err := s.getBackends(s.closeCtx, client, s.keyPrefix) if err != nil { if errors.Is(err, context.Canceled) { return } else if errors.Is(err, context.DeadlineExceeded) { log.Printf("Timeout getting initial list of backends, retry in %s", backoff.NextWait()) } else { log.Printf("Could not get initial list of backends, retry in %s: %s", backoff.NextWait(), err) } backoff.Wait(s.closeCtx) continue } for _, ev := range response.Kvs { s.EtcdKeyUpdated(client, string(ev.Key), ev.Value, nil) } s.initializedFunc() nextRevision := response.Header.Revision + 1 prevRevision := nextRevision backoff.Reset() for s.closeCtx.Err() == nil { var err error if nextRevision, err = client.Watch(s.closeCtx, s.keyPrefix, nextRevision, s, clientv3.WithPrefix()); err != nil { log.Printf("Error processing watch for %s (%s), retry in %s", s.keyPrefix, err, backoff.NextWait()) backoff.Wait(s.closeCtx) continue } if nextRevision != prevRevision { backoff.Reset() prevRevision = nextRevision } else { log.Printf("Processing watch for %s interrupted, retry in %s", s.keyPrefix, backoff.NextWait()) backoff.Wait(s.closeCtx) } } return } }() } func (s *backendStorageEtcd) EtcdWatchCreated(client *EtcdClient, key string) { } func (s *backendStorageEtcd) getBackends(ctx context.Context, client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() return client.Get(ctx, keyPrefix, clientv3.WithPrefix()) } func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) { var info BackendInformationEtcd if err := json.Unmarshal(data, &info); err != nil { log.Printf("Could not decode backend information %s: %s", string(data), err) return } if err := info.CheckValid(); err != nil { log.Printf("Received invalid backend information %s: %s", string(data), err) return } backend := &Backend{ id: key, url: info.Url, parsedUrl: info.parsedUrl, secret: []byte(info.Secret), allowHttp: info.parsedUrl.Scheme == "http", maxStreamBitrate: info.MaxStreamBitrate, maxScreenBitrate: info.MaxScreenBitrate, sessionLimit: info.SessionLimit, } host := info.parsedUrl.Host s.mu.Lock() defer s.mu.Unlock() s.keyInfos[key] = &info entries, found := s.backends[host] if !found { // Simple case, first backend for this host log.Printf("Added backend %s (from %s)", info.Url, key) s.backends[host] = []*Backend{backend} statsBackendsCurrent.Inc() s.wakeupForTesting() return } // Was the backend changed? replaced := false for idx, entry := range entries { if entry.id == key { log.Printf("Updated backend %s (from %s)", info.Url, key) entries[idx] = backend replaced = true break } } if !replaced { // New backend, add to list. log.Printf("Added backend %s (from %s)", info.Url, key) s.backends[host] = append(entries, backend) statsBackendsCurrent.Inc() } s.wakeupForTesting() } func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { s.mu.Lock() defer s.mu.Unlock() info, found := s.keyInfos[key] if !found { return } delete(s.keyInfos, key) host := info.parsedUrl.Host entries, found := s.backends[host] if !found { return } log.Printf("Removing backend %s (from %s)", info.Url, key) newEntries := make([]*Backend, 0, len(entries)-1) for _, entry := range entries { if entry.id == key { statsBackendsCurrent.Dec() continue } newEntries = append(newEntries, entry) } if len(newEntries) > 0 { s.backends[host] = newEntries } else { delete(s.backends, host) } s.wakeupForTesting() } func (s *backendStorageEtcd) Close() { s.etcdClient.RemoveListener(s) s.closeFunc() } func (s *backendStorageEtcd) Reload(config *goconf.ConfigFile) { // Backend updates are processed through etcd. } func (s *backendStorageEtcd) GetCompatBackend() *Backend { return nil } func (s *backendStorageEtcd) GetBackend(u *url.URL) *Backend { s.mu.RLock() defer s.mu.RUnlock() return s.getBackendLocked(u) }