From 24eab34da70fadbed0f593354bfaf82889deca9a Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 30 Jun 2022 11:34:32 +0200 Subject: [PATCH] Allow configuring backends through etcd. --- api_backend.go | 39 ++++ backend_client.go | 8 +- backend_client_test.go | 6 +- backend_configuration.go | 351 +++++++++------------------------- backend_configuration_test.go | 188 ++++++++++++++++-- backend_server_test.go | 6 +- backend_storage_etcd.go | 256 +++++++++++++++++++++++++ backend_storage_static.go | 303 +++++++++++++++++++++++++++++ hub.go | 5 +- hub_test.go | 6 +- room_ping_test.go | 2 +- server.conf.in | 26 +++ server/main.go | 2 +- 13 files changed, 903 insertions(+), 295 deletions(-) create mode 100644 backend_storage_etcd.go create mode 100644 backend_storage_static.go diff --git a/api_backend.go b/api_backend.go index 58ee9c7..901e035 100644 --- a/api_backend.go +++ b/api_backend.go @@ -28,7 +28,10 @@ import ( "crypto/subtle" "encoding/hex" "encoding/json" + "fmt" "net/http" + "net/url" + "strings" ) const ( @@ -321,3 +324,39 @@ type TurnCredentials struct { TTL int64 `json:"ttl"` URIs []string `json:"uris"` } + +// Information on a backend in the etcd cluster. + +type BackendInformationEtcd struct { + parsedUrl *url.URL + + Url string `json:"url"` + Secret string `json:"secret"` + + MaxStreamBitrate int `json:"maxstreambitrate,omitempty"` + MaxScreenBitrate int `json:"maxscreenbitrate,omitempty"` + + SessionLimit uint64 `json:"sessionlimit,omitempty"` +} + +func (p *BackendInformationEtcd) CheckValid() error { + if p.Url == "" { + return fmt.Errorf("url missing") + } + if p.Secret == "" { + return fmt.Errorf("secret missing") + } + + parsedUrl, err := url.Parse(p.Url) + if err != nil { + return fmt.Errorf("invalid url: %w", err) + } + + if strings.Contains(parsedUrl.Host, ":") && hasStandardPort(parsedUrl) { + parsedUrl.Host = parsedUrl.Hostname() + p.Url = parsedUrl.String() + } + + p.parsedUrl = parsedUrl + return nil +} diff --git a/backend_client.go b/backend_client.go index f9cf3a9..2e3fb3b 100644 --- a/backend_client.go +++ b/backend_client.go @@ -50,8 +50,8 @@ type BackendClient struct { capabilities *Capabilities } -func NewBackendClient(config *goconf.ConfigFile, maxConcurrentRequestsPerHost int, version string) (*BackendClient, error) { - backends, err := NewBackendConfiguration(config) +func NewBackendClient(config *goconf.ConfigFile, maxConcurrentRequestsPerHost int, version string, etcdClient *EtcdClient) (*BackendClient, error) { + backends, err := NewBackendConfiguration(config, etcdClient) if err != nil { return nil, err } @@ -80,6 +80,10 @@ func NewBackendClient(config *goconf.ConfigFile, maxConcurrentRequestsPerHost in }, nil } +func (b *BackendClient) Close() { + b.backends.Close() +} + func (b *BackendClient) Reload(config *goconf.ConfigFile) { b.backends.Reload(config) } diff --git a/backend_client_test.go b/backend_client_test.go index c9ff35e..5bae7d5 100644 --- a/backend_client_test.go +++ b/backend_client_test.go @@ -95,7 +95,7 @@ func TestPostOnRedirect(t *testing.T) { if u.Scheme == "http" { config.AddOption("backend", "allowhttp", "true") } - client, err := NewBackendClient(config, 1, "0.0") + client, err := NewBackendClient(config, 1, "0.0", nil) if err != nil { t.Fatal(err) } @@ -134,7 +134,7 @@ func TestPostOnRedirectDifferentHost(t *testing.T) { if u.Scheme == "http" { config.AddOption("backend", "allowhttp", "true") } - client, err := NewBackendClient(config, 1, "0.0") + client, err := NewBackendClient(config, 1, "0.0", nil) if err != nil { t.Fatal(err) } @@ -187,7 +187,7 @@ func TestPostOnRedirectStatusFound(t *testing.T) { if u.Scheme == "http" { config.AddOption("backend", "allowhttp", "true") } - client, err := NewBackendClient(config, 1, "0.0") + client, err := NewBackendClient(config, 1, "0.0", nil) if err != nil { t.Fatal(err) } diff --git a/backend_configuration.go b/backend_configuration.go index 4cde69f..bc69ff7 100644 --- a/backend_configuration.go +++ b/backend_configuration.go @@ -22,15 +22,21 @@ package signaling import ( - "log" + "fmt" "net/url" - "reflect" "strings" "sync" "github.com/dlintw/goconf" ) +const ( + BackendTypeStatic = "static" + BackendTypeEtcd = "etcd" + + DefaultBackendType = BackendTypeStatic +) + var ( SessionLimitExceeded = NewError("session_limit_exceeded", "Too many sessions connected for this backend.") ) @@ -105,271 +111,43 @@ func (b *Backend) RemoveSession(session Session) { delete(b.sessions, session.PublicId()) } -type BackendConfiguration struct { +type BackendStorage interface { + Close() + Reload(config *goconf.ConfigFile) + + GetCompatBackend() *Backend + GetBackend(u *url.URL) *Backend + GetBackends() []*Backend +} + +type backendStorageCommon struct { + mu sync.RWMutex backends map[string][]*Backend - - // Deprecated - allowAll bool - commonSecret []byte - compatBackend *Backend } -func NewBackendConfiguration(config *goconf.ConfigFile) (*BackendConfiguration, error) { - allowAll, _ := config.GetBool("backend", "allowall") - allowHttp, _ := config.GetBool("backend", "allowhttp") - commonSecret, _ := config.GetString("backend", "secret") - sessionLimit, err := config.GetInt("backend", "sessionlimit") - if err != nil || sessionLimit < 0 { - sessionLimit = 0 +func (s *backendStorageCommon) GetBackends() []*Backend { + s.mu.RLock() + defer s.mu.RUnlock() + + var result []*Backend + for _, entries := range s.backends { + result = append(result, entries...) } - backends := make(map[string][]*Backend) - var compatBackend *Backend - numBackends := 0 - if allowAll { - log.Println("WARNING: All backend hostnames are allowed, only use for development!") - compatBackend = &Backend{ - id: "compat", - secret: []byte(commonSecret), - compat: true, - - allowHttp: allowHttp, - - sessionLimit: uint64(sessionLimit), - } - if sessionLimit > 0 { - log.Printf("Allow a maximum of %d sessions", sessionLimit) - } - numBackends++ - } else if backendIds, _ := config.GetString("backend", "backends"); backendIds != "" { - for host, configuredBackends := range getConfiguredHosts(backendIds, config) { - backends[host] = append(backends[host], configuredBackends...) - for _, be := range configuredBackends { - log.Printf("Backend %s added for %s", be.id, be.url) - } - numBackends += len(configuredBackends) - } - } else if allowedUrls, _ := config.GetString("backend", "allowed"); allowedUrls != "" { - // Old-style configuration, only hosts are configured and are using a common secret. - allowMap := make(map[string]bool) - for _, u := range strings.Split(allowedUrls, ",") { - u = strings.TrimSpace(u) - if idx := strings.IndexByte(u, '/'); idx != -1 { - log.Printf("WARNING: Removing path from allowed hostname \"%s\", check your configuration!", u) - u = u[:idx] - } - if u != "" { - allowMap[strings.ToLower(u)] = true - } - } - - if len(allowMap) == 0 { - log.Println("WARNING: No backend hostnames are allowed, check your configuration!") - } else { - compatBackend = &Backend{ - id: "compat", - secret: []byte(commonSecret), - compat: true, - - allowHttp: allowHttp, - - sessionLimit: uint64(sessionLimit), - } - hosts := make([]string, 0, len(allowMap)) - for host := range allowMap { - hosts = append(hosts, host) - backends[host] = []*Backend{compatBackend} - } - if len(hosts) > 1 { - log.Println("WARNING: Using deprecated backend configuration. Please migrate the \"allowed\" setting to the new \"backends\" configuration.") - } - log.Printf("Allowed backend hostnames: %s", hosts) - if sessionLimit > 0 { - log.Printf("Allow a maximum of %d sessions", sessionLimit) - } - numBackends++ - } - } - - RegisterBackendConfigurationStats() - statsBackendsCurrent.Add(float64(numBackends)) - - return &BackendConfiguration{ - backends: backends, - - allowAll: allowAll, - commonSecret: []byte(commonSecret), - compatBackend: compatBackend, - }, nil + return result } -func (b *BackendConfiguration) RemoveBackendsForHost(host string) { - if oldBackends := b.backends[host]; len(oldBackends) > 0 { - for _, backend := range oldBackends { - log.Printf("Backend %s removed for %s", backend.id, backend.url) - } - statsBackendsCurrent.Sub(float64(len(oldBackends))) - } - delete(b.backends, host) -} +func (s *backendStorageCommon) getBackendLocked(u *url.URL) *Backend { + s.mu.RLock() + defer s.mu.RUnlock() -func (b *BackendConfiguration) UpsertHost(host string, backends []*Backend) { - for existingIndex, existingBackend := range b.backends[host] { - found := false - index := 0 - for _, newBackend := range backends { - if reflect.DeepEqual(existingBackend, newBackend) { // otherwise we could manually compare the struct members here - found = true - backends = append(backends[:index], backends[index+1:]...) - break - } else if newBackend.id == existingBackend.id { - found = true - b.backends[host][existingIndex] = newBackend - backends = append(backends[:index], backends[index+1:]...) - log.Printf("Backend %s updated for %s", newBackend.id, newBackend.url) - break - } - index++ - } - if !found { - removed := b.backends[host][existingIndex] - log.Printf("Backend %s removed for %s", removed.id, removed.url) - b.backends[host] = append(b.backends[host][:existingIndex], b.backends[host][existingIndex+1:]...) - statsBackendsCurrent.Dec() - } - } - - b.backends[host] = append(b.backends[host], backends...) - for _, added := range backends { - log.Printf("Backend %s added for %s", added.id, added.url) - } - statsBackendsCurrent.Add(float64(len(backends))) -} - -func getConfiguredBackendIDs(backendIds string) (ids []string) { - seen := make(map[string]bool) - - for _, id := range strings.Split(backendIds, ",") { - id = strings.TrimSpace(id) - if id == "" { - continue - } - - if seen[id] { - continue - } - ids = append(ids, id) - seen[id] = true - } - - return ids -} - -func getConfiguredHosts(backendIds string, config *goconf.ConfigFile) (hosts map[string][]*Backend) { - hosts = make(map[string][]*Backend) - for _, id := range getConfiguredBackendIDs(backendIds) { - u, _ := config.GetString(id, "url") - if u == "" { - log.Printf("Backend %s is missing or incomplete, skipping", id) - continue - } - - if u[len(u)-1] != '/' { - u += "/" - } - parsed, err := url.Parse(u) - if err != nil { - log.Printf("Backend %s has an invalid url %s configured (%s), skipping", id, u, err) - continue - } - - if strings.Contains(parsed.Host, ":") && hasStandardPort(parsed) { - parsed.Host = parsed.Hostname() - u = parsed.String() - } - - secret, _ := config.GetString(id, "secret") - if u == "" || secret == "" { - log.Printf("Backend %s is missing or incomplete, skipping", id) - continue - } - - sessionLimit, err := config.GetInt(id, "sessionlimit") - if err != nil || sessionLimit < 0 { - sessionLimit = 0 - } - if sessionLimit > 0 { - log.Printf("Backend %s allows a maximum of %d sessions", id, sessionLimit) - } - - maxStreamBitrate, err := config.GetInt(id, "maxstreambitrate") - if err != nil || maxStreamBitrate < 0 { - maxStreamBitrate = 0 - } - maxScreenBitrate, err := config.GetInt(id, "maxscreenbitrate") - if err != nil || maxScreenBitrate < 0 { - maxScreenBitrate = 0 - } - - hosts[parsed.Host] = append(hosts[parsed.Host], &Backend{ - id: id, - url: u, - secret: []byte(secret), - - allowHttp: parsed.Scheme == "http", - - maxStreamBitrate: maxStreamBitrate, - maxScreenBitrate: maxScreenBitrate, - - sessionLimit: uint64(sessionLimit), - }) - } - - return hosts -} - -func (b *BackendConfiguration) Reload(config *goconf.ConfigFile) { - if b.compatBackend != nil { - log.Println("Old-style configuration active, reload is not supported") - return - } - - if backendIds, _ := config.GetString("backend", "backends"); backendIds != "" { - configuredHosts := getConfiguredHosts(backendIds, config) - - // remove backends that are no longer configured - for hostname := range b.backends { - if _, ok := configuredHosts[hostname]; !ok { - b.RemoveBackendsForHost(hostname) - } - } - - // rewrite backends adding newly configured ones and rewriting existing ones - for hostname, configuredBackends := range configuredHosts { - b.UpsertHost(hostname, configuredBackends) - } - } -} - -func (b *BackendConfiguration) GetCompatBackend() *Backend { - return b.compatBackend -} - -func (b *BackendConfiguration) GetBackend(u *url.URL) *Backend { - if strings.Contains(u.Host, ":") && hasStandardPort(u) { - u.Host = u.Hostname() - } - - entries, found := b.backends[u.Host] + entries, found := s.backends[u.Host] if !found { - if b.allowAll { - return b.compatBackend - } return nil } - s := u.String() - if s[len(s)-1] != '/' { - s += "/" + url := u.String() + if url[len(url)-1] != '/' { + url += "/" } for _, entry := range entries { if !entry.IsUrlAllowed(u) { @@ -379,7 +157,7 @@ func (b *BackendConfiguration) GetBackend(u *url.URL) *Backend { if entry.url == "" { // Old-style configuration, only hosts are configured. return entry - } else if strings.HasPrefix(s, entry.url) { + } else if strings.HasPrefix(url, entry.url) { return entry } } @@ -387,12 +165,59 @@ func (b *BackendConfiguration) GetBackend(u *url.URL) *Backend { return nil } -func (b *BackendConfiguration) GetBackends() []*Backend { - var result []*Backend - for _, entries := range b.backends { - result = append(result, entries...) +type BackendConfiguration struct { + storage BackendStorage +} + +func NewBackendConfiguration(config *goconf.ConfigFile, etcdClient *EtcdClient) (*BackendConfiguration, error) { + backendType, _ := config.GetString("backend", "backendtype") + if backendType == "" { + backendType = DefaultBackendType } - return result + + RegisterBackendConfigurationStats() + + var storage BackendStorage + var err error + switch backendType { + case BackendTypeStatic: + storage, err = NewBackendStorageStatic(config) + case BackendTypeEtcd: + storage, err = NewBackendStorageEtcd(config, etcdClient) + default: + err = fmt.Errorf("unknown backend type: %s", backendType) + } + if err != nil { + return nil, err + } + + return &BackendConfiguration{ + storage: storage, + }, nil +} + +func (b *BackendConfiguration) Close() { + b.storage.Close() +} + +func (b *BackendConfiguration) Reload(config *goconf.ConfigFile) { + b.storage.Reload(config) +} + +func (b *BackendConfiguration) GetCompatBackend() *Backend { + return b.storage.GetCompatBackend() +} + +func (b *BackendConfiguration) GetBackend(u *url.URL) *Backend { + if strings.Contains(u.Host, ":") && hasStandardPort(u) { + u.Host = u.Hostname() + } + + return b.storage.GetBackend(u) +} + +func (b *BackendConfiguration) GetBackends() []*Backend { + return b.storage.GetBackends() } func (b *BackendConfiguration) IsUrlAllowed(u *url.URL) bool { @@ -416,5 +241,5 @@ func (b *BackendConfiguration) GetSecret(u *url.URL) []byte { return nil } - return entry.secret + return entry.Secret() } diff --git a/backend_configuration_test.go b/backend_configuration_test.go index 17e7508..d06cf6b 100644 --- a/backend_configuration_test.go +++ b/backend_configuration_test.go @@ -23,8 +23,10 @@ package signaling import ( "bytes" + "context" "net/url" "reflect" + "sort" "testing" "github.com/dlintw/goconf" @@ -104,7 +106,7 @@ func TestIsUrlAllowed_Compat(t *testing.T) { config.AddOption("backend", "allowed", "domain.invalid") config.AddOption("backend", "allowhttp", "true") config.AddOption("backend", "secret", string(testBackendSecret)) - cfg, err := NewBackendConfiguration(config) + cfg, err := NewBackendConfiguration(config, nil) if err != nil { t.Fatal(err) } @@ -125,7 +127,7 @@ func TestIsUrlAllowed_CompatForceHttps(t *testing.T) { config := goconf.NewConfigFile() config.AddOption("backend", "allowed", "domain.invalid") config.AddOption("backend", "secret", string(testBackendSecret)) - cfg, err := NewBackendConfiguration(config) + cfg, err := NewBackendConfiguration(config, nil) if err != nil { t.Fatal(err) } @@ -170,7 +172,7 @@ func TestIsUrlAllowed(t *testing.T) { config.AddOption("baz", "secret", string(testBackendSecret)+"-baz") config.AddOption("lala", "url", "https://otherdomain.invalid/") config.AddOption("lala", "secret", string(testBackendSecret)+"-lala") - cfg, err := NewBackendConfiguration(config) + cfg, err := NewBackendConfiguration(config, nil) if err != nil { t.Fatal(err) } @@ -187,7 +189,7 @@ func TestIsUrlAllowed_EmptyAllowlist(t *testing.T) { config := goconf.NewConfigFile() config.AddOption("backend", "allowed", "") config.AddOption("backend", "secret", string(testBackendSecret)) - cfg, err := NewBackendConfiguration(config) + cfg, err := NewBackendConfiguration(config, nil) if err != nil { t.Fatal(err) } @@ -207,7 +209,7 @@ func TestIsUrlAllowed_AllowAll(t *testing.T) { config.AddOption("backend", "allowall", "true") config.AddOption("backend", "allowed", "") config.AddOption("backend", "secret", string(testBackendSecret)) - cfg, err := NewBackendConfiguration(config) + cfg, err := NewBackendConfiguration(config, nil) if err != nil { t.Fatal(err) } @@ -247,7 +249,7 @@ func TestBackendReloadNoChange(t *testing.T) { original_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") original_config.AddOption("backend2", "url", "http://domain2.invalid") original_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - o_cfg, err := NewBackendConfiguration(original_config) + o_cfg, err := NewBackendConfiguration(original_config, nil) if err != nil { t.Fatal(err) } @@ -260,7 +262,7 @@ func TestBackendReloadNoChange(t *testing.T) { new_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") new_config.AddOption("backend2", "url", "http://domain2.invalid") new_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - n_cfg, err := NewBackendConfiguration(new_config) + n_cfg, err := NewBackendConfiguration(new_config, nil) if err != nil { t.Fatal(err) } @@ -282,7 +284,7 @@ func TestBackendReloadChangeExistingURL(t *testing.T) { original_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") original_config.AddOption("backend2", "url", "http://domain2.invalid") original_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - o_cfg, err := NewBackendConfiguration(original_config) + o_cfg, err := NewBackendConfiguration(original_config, nil) if err != nil { t.Fatal(err) } @@ -296,7 +298,7 @@ func TestBackendReloadChangeExistingURL(t *testing.T) { new_config.AddOption("backend1", "sessionlimit", "10") new_config.AddOption("backend2", "url", "http://domain2.invalid") new_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - n_cfg, err := NewBackendConfiguration(new_config) + n_cfg, err := NewBackendConfiguration(new_config, nil) if err != nil { t.Fatal(err) } @@ -322,7 +324,7 @@ func TestBackendReloadChangeSecret(t *testing.T) { original_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") original_config.AddOption("backend2", "url", "http://domain2.invalid") original_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - o_cfg, err := NewBackendConfiguration(original_config) + o_cfg, err := NewBackendConfiguration(original_config, nil) if err != nil { t.Fatal(err) } @@ -335,7 +337,7 @@ func TestBackendReloadChangeSecret(t *testing.T) { new_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend3") new_config.AddOption("backend2", "url", "http://domain2.invalid") new_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - n_cfg, err := NewBackendConfiguration(new_config) + n_cfg, err := NewBackendConfiguration(new_config, nil) if err != nil { t.Fatal(err) } @@ -358,7 +360,7 @@ func TestBackendReloadAddBackend(t *testing.T) { original_config.AddOption("backend", "allowall", "false") original_config.AddOption("backend1", "url", "http://domain1.invalid") original_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") - o_cfg, err := NewBackendConfiguration(original_config) + o_cfg, err := NewBackendConfiguration(original_config, nil) if err != nil { t.Fatal(err) } @@ -372,7 +374,7 @@ func TestBackendReloadAddBackend(t *testing.T) { new_config.AddOption("backend2", "url", "http://domain2.invalid") new_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") new_config.AddOption("backend2", "sessionlimit", "10") - n_cfg, err := NewBackendConfiguration(new_config) + n_cfg, err := NewBackendConfiguration(new_config, nil) if err != nil { t.Fatal(err) } @@ -400,7 +402,7 @@ func TestBackendReloadRemoveHost(t *testing.T) { original_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") original_config.AddOption("backend2", "url", "http://domain2.invalid") original_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - o_cfg, err := NewBackendConfiguration(original_config) + o_cfg, err := NewBackendConfiguration(original_config, nil) if err != nil { t.Fatal(err) } @@ -411,7 +413,7 @@ func TestBackendReloadRemoveHost(t *testing.T) { new_config.AddOption("backend", "allowall", "false") new_config.AddOption("backend1", "url", "http://domain1.invalid") new_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") - n_cfg, err := NewBackendConfiguration(new_config) + n_cfg, err := NewBackendConfiguration(new_config, nil) if err != nil { t.Fatal(err) } @@ -437,7 +439,7 @@ func TestBackendReloadRemoveBackendFromSharedHost(t *testing.T) { original_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") original_config.AddOption("backend2", "url", "http://domain1.invalid/bar/") original_config.AddOption("backend2", "secret", string(testBackendSecret)+"-backend2") - o_cfg, err := NewBackendConfiguration(original_config) + o_cfg, err := NewBackendConfiguration(original_config, nil) if err != nil { t.Fatal(err) } @@ -448,7 +450,7 @@ func TestBackendReloadRemoveBackendFromSharedHost(t *testing.T) { new_config.AddOption("backend", "allowall", "false") new_config.AddOption("backend1", "url", "http://domain1.invalid/foo/") new_config.AddOption("backend1", "secret", string(testBackendSecret)+"-backend1") - n_cfg, err := NewBackendConfiguration(new_config) + n_cfg, err := NewBackendConfiguration(new_config, nil) if err != nil { t.Fatal(err) } @@ -464,3 +466,155 @@ func TestBackendReloadRemoveBackendFromSharedHost(t *testing.T) { t.Error("BackendConfiguration should be equal after Reload") } } + +func sortBackends(backends []*Backend) []*Backend { + result := make([]*Backend, len(backends)) + copy(result, backends) + + sort.Slice(result, func(i, j int) bool { + return result[i].Id() < result[j].Id() + }) + return result +} + +func mustParse(s string) *url.URL { + p, err := url.Parse(s) + if err != nil { + panic(err) + } + return p +} + +func TestBackendConfiguration_Etcd(t *testing.T) { + etcd, client := NewEtcdClientForTest(t) + + url1 := "https://domain1.invalid/foo" + initialSecret1 := string(testBackendSecret) + "-backend1-initial" + secret1 := string(testBackendSecret) + "-backend1" + + SetEtcdValue(etcd, "/backends/1_one", []byte("{\"url\":\""+url1+"\",\"secret\":\""+initialSecret1+"\"}")) + + config := goconf.NewConfigFile() + config.AddOption("backend", "backendtype", "etcd") + config.AddOption("backend", "backendprefix", "/backends") + + cfg, err := NewBackendConfiguration(config, client) + if err != nil { + t.Fatal(err) + } + defer cfg.Close() + + storage := cfg.storage.(*backendStorageEtcd) + ch := make(chan bool, 1) + storage.SetWakeupForTesting(ch) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := storage.WaitForInitialized(ctx); err != nil { + t.Fatal(err) + } + + if backends := sortBackends(cfg.GetBackends()); len(backends) != 1 { + t.Errorf("Expected one backend, got %+v", backends) + } else if backends[0].url != url1 { + t.Errorf("Expected backend url %s, got %s", url1, backends[0].url) + } else if string(backends[0].secret) != initialSecret1 { + t.Errorf("Expected backend secret %s, got %s", initialSecret1, string(backends[0].secret)) + } else if backend := cfg.GetBackend(mustParse(url1)); backend != backends[0] { + t.Errorf("Expected backend %+v, got %+v", backends[0], backend) + } + + drainWakeupChannel(ch) + SetEtcdValue(etcd, "/backends/1_one", []byte("{\"url\":\""+url1+"\",\"secret\":\""+secret1+"\"}")) + <-ch + if backends := sortBackends(cfg.GetBackends()); len(backends) != 1 { + t.Errorf("Expected one backend, got %+v", backends) + } else if backends[0].url != url1 { + t.Errorf("Expected backend url %s, got %s", url1, backends[0].url) + } else if string(backends[0].secret) != secret1 { + t.Errorf("Expected backend secret %s, got %s", secret1, string(backends[0].secret)) + } else if backend := cfg.GetBackend(mustParse(url1)); backend != backends[0] { + t.Errorf("Expected backend %+v, got %+v", backends[0], backend) + } + + url2 := "https://domain1.invalid/bar" + secret2 := string(testBackendSecret) + "-backend2" + + drainWakeupChannel(ch) + SetEtcdValue(etcd, "/backends/2_two", []byte("{\"url\":\""+url2+"\",\"secret\":\""+secret2+"\"}")) + <-ch + if backends := sortBackends(cfg.GetBackends()); len(backends) != 2 { + t.Errorf("Expected two backends, got %+v", backends) + } else if backends[0].url != url1 { + t.Errorf("Expected backend url %s, got %s", url1, backends[0].url) + } else if string(backends[0].secret) != secret1 { + t.Errorf("Expected backend secret %s, got %s", secret1, string(backends[0].secret)) + } else if backends[1].url != url2 { + t.Errorf("Expected backend url %s, got %s", url2, backends[1].url) + } else if string(backends[1].secret) != secret2 { + t.Errorf("Expected backend secret %s, got %s", secret2, string(backends[1].secret)) + } else if backend := cfg.GetBackend(mustParse(url1)); backend != backends[0] { + t.Errorf("Expected backend %+v, got %+v", backends[0], backend) + } else if backend := cfg.GetBackend(mustParse(url2)); backend != backends[1] { + t.Errorf("Expected backend %+v, got %+v", backends[1], backend) + } + + url3 := "https://domain2.invalid/foo" + secret3 := string(testBackendSecret) + "-backend3" + + drainWakeupChannel(ch) + SetEtcdValue(etcd, "/backends/3_three", []byte("{\"url\":\""+url3+"\",\"secret\":\""+secret3+"\"}")) + <-ch + if backends := sortBackends(cfg.GetBackends()); len(backends) != 3 { + t.Errorf("Expected three backends, got %+v", backends) + } else if backends[0].url != url1 { + t.Errorf("Expected backend url %s, got %s", url1, backends[0].url) + } else if string(backends[0].secret) != secret1 { + t.Errorf("Expected backend secret %s, got %s", secret1, string(backends[0].secret)) + } else if backends[1].url != url2 { + t.Errorf("Expected backend url %s, got %s", url2, backends[1].url) + } else if string(backends[1].secret) != secret2 { + t.Errorf("Expected backend secret %s, got %s", secret2, string(backends[1].secret)) + } else if backends[2].url != url3 { + t.Errorf("Expected backend url %s, got %s", url3, backends[2].url) + } else if string(backends[2].secret) != secret3 { + t.Errorf("Expected backend secret %s, got %s", secret3, string(backends[2].secret)) + } else if backend := cfg.GetBackend(mustParse(url1)); backend != backends[0] { + t.Errorf("Expected backend %+v, got %+v", backends[0], backend) + } else if backend := cfg.GetBackend(mustParse(url2)); backend != backends[1] { + t.Errorf("Expected backend %+v, got %+v", backends[1], backend) + } else if backend := cfg.GetBackend(mustParse(url3)); backend != backends[2] { + t.Errorf("Expected backend %+v, got %+v", backends[2], backend) + } + + drainWakeupChannel(ch) + DeleteEtcdValue(etcd, "/backends/1_one") + <-ch + if backends := sortBackends(cfg.GetBackends()); len(backends) != 2 { + t.Errorf("Expected two backends, got %+v", backends) + } else if backends[0].url != url2 { + t.Errorf("Expected backend url %s, got %s", url2, backends[0].url) + } else if string(backends[0].secret) != secret2 { + t.Errorf("Expected backend secret %s, got %s", secret2, string(backends[0].secret)) + } else if backends[1].url != url3 { + t.Errorf("Expected backend url %s, got %s", url3, backends[1].url) + } else if string(backends[1].secret) != secret3 { + t.Errorf("Expected backend secret %s, got %s", secret3, string(backends[1].secret)) + } + + drainWakeupChannel(ch) + DeleteEtcdValue(etcd, "/backends/2_two") + <-ch + if backends := sortBackends(cfg.GetBackends()); len(backends) != 1 { + t.Errorf("Expected one backend, got %+v", backends) + } else if backends[0].url != url3 { + t.Errorf("Expected backend url %s, got %s", url3, backends[0].url) + } else if string(backends[0].secret) != secret3 { + t.Errorf("Expected backend secret %s, got %s", secret3, string(backends[0].secret)) + } + + if _, found := storage.backends["domain1.invalid"]; found { + t.Errorf("Should have removed host information for %s", "domain1.invalid") + } +} diff --git a/backend_server_test.go b/backend_server_test.go index 40416f0..e9e0068 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -88,7 +88,7 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil config.AddOption("clients", "internalsecret", string(testInternalSecret)) config.AddOption("geoip", "url", "none") events := getAsyncEventsForTest(t) - hub, err := NewHub(config, events, nil, nil, r, "no-version") + hub, err := NewHub(config, events, nil, nil, nil, r, "no-version") if err != nil { t.Fatal(err) } @@ -162,7 +162,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g events1.Close() }) client1 := NewGrpcClientsForTest(t, addr2) - hub1, err := NewHub(config1, events1, grpcServer1, client1, r1, "no-version") + hub1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version") if err != nil { t.Fatal(err) } @@ -191,7 +191,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g events2.Close() }) client2 := NewGrpcClientsForTest(t, addr1) - hub2, err := NewHub(config2, events2, grpcServer2, client2, r2, "no-version") + hub2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version") if err != nil { t.Fatal(err) } diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go new file mode 100644 index 0000000..4f95043 --- /dev/null +++ b/backend_storage_etcd.go @@ -0,0 +1,256 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/url" + "time" + + "github.com/dlintw/goconf" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type backendStorageEtcd struct { + backendStorageCommon + + etcdClient *EtcdClient + keyPrefix string + keyInfos map[string]*BackendInformationEtcd + + initializedCtx context.Context + initializedFunc context.CancelFunc + wakeupChanForTesting chan bool +} + +func NewBackendStorageEtcd(config *goconf.ConfigFile, etcdClient *EtcdClient) (BackendStorage, error) { + if etcdClient == nil || !etcdClient.IsConfigured() { + return nil, fmt.Errorf("no etcd endpoints configured") + } + + keyPrefix, _ := config.GetString("backend", "backendprefix") + if keyPrefix == "" { + return nil, fmt.Errorf("no backend prefix configured") + } + + initializedCtx, initializedFunc := context.WithCancel(context.Background()) + result := &backendStorageEtcd{ + backendStorageCommon: backendStorageCommon{ + backends: make(map[string][]*Backend), + }, + etcdClient: etcdClient, + keyPrefix: keyPrefix, + keyInfos: make(map[string]*BackendInformationEtcd), + + initializedCtx: initializedCtx, + initializedFunc: initializedFunc, + } + + etcdClient.AddListener(result) + return result, nil +} + +func (s *backendStorageEtcd) WaitForInitialized(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.initializedCtx.Done(): + return nil + } +} + +func (s *backendStorageEtcd) SetWakeupForTesting(ch chan bool) { + s.mu.Lock() + defer s.mu.Unlock() + + s.wakeupChanForTesting = ch +} + +func (s *backendStorageEtcd) wakeupForTesting() { + if s.wakeupChanForTesting == nil { + return + } + + select { + case s.wakeupChanForTesting <- true: + default: + } +} + +func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { + go func() { + if err := client.Watch(context.Background(), s.keyPrefix, s, clientv3.WithPrefix()); err != nil { + log.Printf("Error processing watch for %s: %s", s.keyPrefix, err) + } + }() + + go func() { + client.WaitForConnection() + + waitDelay := initialWaitDelay + for { + response, err := s.getBackends(client, s.keyPrefix) + if err != nil { + if err == context.DeadlineExceeded { + log.Printf("Timeout getting initial list of backends, retry in %s", waitDelay) + } else { + log.Printf("Could not get initial list of backends, retry in %s: %s", waitDelay, err) + } + + time.Sleep(waitDelay) + waitDelay = waitDelay * 2 + if waitDelay > maxWaitDelay { + waitDelay = maxWaitDelay + } + continue + } + + for _, ev := range response.Kvs { + s.EtcdKeyUpdated(client, string(ev.Key), ev.Value) + } + s.initializedFunc() + return + } + }() +} + +func (s *backendStorageEtcd) getBackends(client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + return client.Get(ctx, keyPrefix, clientv3.WithPrefix()) +} + +func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) { + var info BackendInformationEtcd + if err := json.Unmarshal(data, &info); err != nil { + log.Printf("Could not decode backend information %s: %s", string(data), err) + return + } + if err := info.CheckValid(); err != nil { + log.Printf("Received invalid backend information %s: %s", string(data), err) + return + } + + backend := &Backend{ + id: key, + url: info.Url, + secret: []byte(info.Secret), + + allowHttp: info.parsedUrl.Scheme == "http", + + maxStreamBitrate: info.MaxStreamBitrate, + maxScreenBitrate: info.MaxScreenBitrate, + sessionLimit: info.SessionLimit, + } + + host := info.parsedUrl.Host + + s.mu.Lock() + defer s.mu.Unlock() + + s.keyInfos[key] = &info + entries, found := s.backends[host] + if !found { + // Simple case, first backend for this host + log.Printf("Added backend %s (from %s)", info.Url, key) + s.backends[host] = []*Backend{backend} + statsBackendsCurrent.Inc() + s.wakeupForTesting() + return + } + + // Was the backend changed? + replaced := false + for idx, entry := range entries { + if entry.id == key { + log.Printf("Updated backend %s (from %s)", info.Url, key) + entries[idx] = backend + replaced = true + break + } + } + + if !replaced { + // New backend, add to list. + log.Printf("Added backend %s (from %s)", info.Url, key) + s.backends[host] = append(entries, backend) + statsBackendsCurrent.Inc() + } + s.wakeupForTesting() +} + +func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string) { + s.mu.Lock() + defer s.mu.Unlock() + + info, found := s.keyInfos[key] + if !found { + return + } + + delete(s.keyInfos, key) + host := info.parsedUrl.Host + entries, found := s.backends[host] + if !found { + return + } + + log.Printf("Removing backend %s (from %s)", info.Url, key) + newEntries := make([]*Backend, 0, len(entries)-1) + for _, entry := range entries { + if entry.id == key { + statsBackendsCurrent.Dec() + continue + } + + newEntries = append(newEntries, entry) + } + if len(newEntries) > 0 { + s.backends[host] = newEntries + } else { + delete(s.backends, host) + } + s.wakeupForTesting() +} + +func (s *backendStorageEtcd) Close() { + s.etcdClient.RemoveListener(s) +} + +func (s *backendStorageEtcd) Reload(config *goconf.ConfigFile) { + // Backend updates are processed through etcd. +} + +func (s *backendStorageEtcd) GetCompatBackend() *Backend { + return nil +} + +func (s *backendStorageEtcd) GetBackend(u *url.URL) *Backend { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.getBackendLocked(u) +} diff --git a/backend_storage_static.go b/backend_storage_static.go new file mode 100644 index 0000000..e062e60 --- /dev/null +++ b/backend_storage_static.go @@ -0,0 +1,303 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "log" + "net/url" + "reflect" + "strings" + + "github.com/dlintw/goconf" +) + +type backendStorageStatic struct { + backendStorageCommon + + // Deprecated + allowAll bool + commonSecret []byte + compatBackend *Backend +} + +func NewBackendStorageStatic(config *goconf.ConfigFile) (BackendStorage, error) { + allowAll, _ := config.GetBool("backend", "allowall") + allowHttp, _ := config.GetBool("backend", "allowhttp") + commonSecret, _ := config.GetString("backend", "secret") + sessionLimit, err := config.GetInt("backend", "sessionlimit") + if err != nil || sessionLimit < 0 { + sessionLimit = 0 + } + backends := make(map[string][]*Backend) + var compatBackend *Backend + numBackends := 0 + if allowAll { + log.Println("WARNING: All backend hostnames are allowed, only use for development!") + compatBackend = &Backend{ + id: "compat", + secret: []byte(commonSecret), + compat: true, + + allowHttp: allowHttp, + + sessionLimit: uint64(sessionLimit), + } + if sessionLimit > 0 { + log.Printf("Allow a maximum of %d sessions", sessionLimit) + } + numBackends++ + } else if backendIds, _ := config.GetString("backend", "backends"); backendIds != "" { + for host, configuredBackends := range getConfiguredHosts(backendIds, config) { + backends[host] = append(backends[host], configuredBackends...) + for _, be := range configuredBackends { + log.Printf("Backend %s added for %s", be.id, be.url) + } + numBackends += len(configuredBackends) + } + } else if allowedUrls, _ := config.GetString("backend", "allowed"); allowedUrls != "" { + // Old-style configuration, only hosts are configured and are using a common secret. + allowMap := make(map[string]bool) + for _, u := range strings.Split(allowedUrls, ",") { + u = strings.TrimSpace(u) + if idx := strings.IndexByte(u, '/'); idx != -1 { + log.Printf("WARNING: Removing path from allowed hostname \"%s\", check your configuration!", u) + u = u[:idx] + } + if u != "" { + allowMap[strings.ToLower(u)] = true + } + } + + if len(allowMap) == 0 { + log.Println("WARNING: No backend hostnames are allowed, check your configuration!") + } else { + compatBackend = &Backend{ + id: "compat", + secret: []byte(commonSecret), + compat: true, + + allowHttp: allowHttp, + + sessionLimit: uint64(sessionLimit), + } + hosts := make([]string, 0, len(allowMap)) + for host := range allowMap { + hosts = append(hosts, host) + backends[host] = []*Backend{compatBackend} + } + if len(hosts) > 1 { + log.Println("WARNING: Using deprecated backend configuration. Please migrate the \"allowed\" setting to the new \"backends\" configuration.") + } + log.Printf("Allowed backend hostnames: %s", hosts) + if sessionLimit > 0 { + log.Printf("Allow a maximum of %d sessions", sessionLimit) + } + numBackends++ + } + } + + statsBackendsCurrent.Add(float64(numBackends)) + return &backendStorageStatic{ + backendStorageCommon: backendStorageCommon{ + backends: backends, + }, + + allowAll: allowAll, + commonSecret: []byte(commonSecret), + compatBackend: compatBackend, + }, nil +} + +func (s *backendStorageStatic) Close() { +} + +func (s *backendStorageStatic) RemoveBackendsForHost(host string) { + if oldBackends := s.backends[host]; len(oldBackends) > 0 { + for _, backend := range oldBackends { + log.Printf("Backend %s removed for %s", backend.id, backend.url) + } + statsBackendsCurrent.Sub(float64(len(oldBackends))) + } + delete(s.backends, host) +} + +func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend) { + for existingIndex, existingBackend := range s.backends[host] { + found := false + index := 0 + for _, newBackend := range backends { + if reflect.DeepEqual(existingBackend, newBackend) { // otherwise we could manually compare the struct members here + found = true + backends = append(backends[:index], backends[index+1:]...) + break + } else if newBackend.id == existingBackend.id { + found = true + s.backends[host][existingIndex] = newBackend + backends = append(backends[:index], backends[index+1:]...) + log.Printf("Backend %s updated for %s", newBackend.id, newBackend.url) + break + } + index++ + } + if !found { + removed := s.backends[host][existingIndex] + log.Printf("Backend %s removed for %s", removed.id, removed.url) + s.backends[host] = append(s.backends[host][:existingIndex], s.backends[host][existingIndex+1:]...) + statsBackendsCurrent.Dec() + } + } + + s.backends[host] = append(s.backends[host], backends...) + for _, added := range backends { + log.Printf("Backend %s added for %s", added.id, added.url) + } + statsBackendsCurrent.Add(float64(len(backends))) +} + +func getConfiguredBackendIDs(backendIds string) (ids []string) { + seen := make(map[string]bool) + + for _, id := range strings.Split(backendIds, ",") { + id = strings.TrimSpace(id) + if id == "" { + continue + } + + if seen[id] { + continue + } + ids = append(ids, id) + seen[id] = true + } + + return ids +} + +func getConfiguredHosts(backendIds string, config *goconf.ConfigFile) (hosts map[string][]*Backend) { + hosts = make(map[string][]*Backend) + for _, id := range getConfiguredBackendIDs(backendIds) { + u, _ := config.GetString(id, "url") + if u == "" { + log.Printf("Backend %s is missing or incomplete, skipping", id) + continue + } + + if u[len(u)-1] != '/' { + u += "/" + } + parsed, err := url.Parse(u) + if err != nil { + log.Printf("Backend %s has an invalid url %s configured (%s), skipping", id, u, err) + continue + } + + if strings.Contains(parsed.Host, ":") && hasStandardPort(parsed) { + parsed.Host = parsed.Hostname() + u = parsed.String() + } + + secret, _ := config.GetString(id, "secret") + if u == "" || secret == "" { + log.Printf("Backend %s is missing or incomplete, skipping", id) + continue + } + + sessionLimit, err := config.GetInt(id, "sessionlimit") + if err != nil || sessionLimit < 0 { + sessionLimit = 0 + } + if sessionLimit > 0 { + log.Printf("Backend %s allows a maximum of %d sessions", id, sessionLimit) + } + + maxStreamBitrate, err := config.GetInt(id, "maxstreambitrate") + if err != nil || maxStreamBitrate < 0 { + maxStreamBitrate = 0 + } + maxScreenBitrate, err := config.GetInt(id, "maxscreenbitrate") + if err != nil || maxScreenBitrate < 0 { + maxScreenBitrate = 0 + } + + hosts[parsed.Host] = append(hosts[parsed.Host], &Backend{ + id: id, + url: u, + secret: []byte(secret), + + allowHttp: parsed.Scheme == "http", + + maxStreamBitrate: maxStreamBitrate, + maxScreenBitrate: maxScreenBitrate, + + sessionLimit: uint64(sessionLimit), + }) + } + + return hosts +} + +func (s *backendStorageStatic) Reload(config *goconf.ConfigFile) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.compatBackend != nil { + log.Println("Old-style configuration active, reload is not supported") + return + } + + if backendIds, _ := config.GetString("backend", "backends"); backendIds != "" { + configuredHosts := getConfiguredHosts(backendIds, config) + + // remove backends that are no longer configured + for hostname := range s.backends { + if _, ok := configuredHosts[hostname]; !ok { + s.RemoveBackendsForHost(hostname) + } + } + + // rewrite backends adding newly configured ones and rewriting existing ones + for hostname, configuredBackends := range configuredHosts { + s.UpsertHost(hostname, configuredBackends) + } + } +} + +func (s *backendStorageStatic) GetCompatBackend() *Backend { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.compatBackend +} + +func (s *backendStorageStatic) GetBackend(u *url.URL) *Backend { + s.mu.RLock() + defer s.mu.RUnlock() + + if _, found := s.backends[u.Host]; !found { + if s.allowAll { + return s.compatBackend + } + return nil + } + + return s.getBackendLocked(u) +} diff --git a/hub.go b/hub.go index 9edf83e..88e69a3 100644 --- a/hub.go +++ b/hub.go @@ -154,7 +154,7 @@ type Hub struct { rpcClients *GrpcClients } -func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer, rpcClients *GrpcClients, r *mux.Router, version string) (*Hub, error) { +func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer, rpcClients *GrpcClients, etcdClient *EtcdClient, r *mux.Router, version string) (*Hub, error) { hashKey, _ := config.GetString("sessions", "hashkey") switch len(hashKey) { case 32: @@ -185,7 +185,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer maxConcurrentRequestsPerHost = defaultMaxConcurrentRequestsPerHost } - backend, err := NewBackendClient(config, maxConcurrentRequestsPerHost, version) + backend, err := NewBackendClient(config, maxConcurrentRequestsPerHost, version, etcdClient) if err != nil { return nil, err } @@ -447,6 +447,7 @@ func (h *Hub) Run() { go h.updateGeoDatabase() h.roomPing.Start() defer h.roomPing.Stop() + defer h.backend.Close() housekeeping := time.NewTicker(housekeepingInterval) geoipUpdater := time.NewTicker(24 * time.Hour) diff --git a/hub_test.go b/hub_test.go index cb92289..896531b 100644 --- a/hub_test.go +++ b/hub_test.go @@ -122,7 +122,7 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve if err != nil { t.Fatal(err) } - h, err := NewHub(config, events, nil, nil, r, "no-version") + h, err := NewHub(config, events, nil, nil, nil, r, "no-version") if err != nil { t.Fatal(err) } @@ -190,7 +190,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http t.Fatal(err) } client1 := NewGrpcClientsForTest(t, addr2) - h1, err := NewHub(config1, events1, grpcServer1, client1, r1, "no-version") + h1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version") if err != nil { t.Fatal(err) } @@ -210,7 +210,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http t.Fatal(err) } client2 := NewGrpcClientsForTest(t, addr1) - h2, err := NewHub(config2, events2, grpcServer2, client2, r2, "no-version") + h2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version") if err != nil { t.Fatal(err) } diff --git a/room_ping_test.go b/room_ping_test.go index a58fb47..e570d70 100644 --- a/room_ping_test.go +++ b/room_ping_test.go @@ -44,7 +44,7 @@ func NewRoomPingForTest(t *testing.T) (*url.URL, *RoomPing) { t.Fatal(err) } - backend, err := NewBackendClient(config, 1, "0.0") + backend, err := NewBackendClient(config, 1, "0.0", nil) if err != nil { t.Fatal(err) } diff --git a/server.conf.in b/server.conf.in index 9303d31..2f9123a 100644 --- a/server.conf.in +++ b/server.conf.in @@ -50,6 +50,15 @@ blockkey = -encryption-key- internalsecret = the-shared-secret-for-internal-clients [backend] +# Type of backend configuration. +# Defaults to "static". +# +# Possible values: +# - static: A comma-separated list of backends is given in the "backends" option. +# - etcd: Backends are retrieved from an etcd cluster. +#backendtype = static + +# For backend type "static": # Comma-separated list of backend ids from which clients are allowed to connect # from. Each backend will have isolated rooms, i.e. clients connecting to room # "abc12345" on backend 1 will be in a different room than clients connected to @@ -57,6 +66,22 @@ internalsecret = the-shared-secret-for-internal-clients # backends will not be able to communicate with each other. #backends = backend-id, another-backend +# For backend type "etcd": +# Key prefix of backend entries. All keys below will be watched and assumed to +# contain a JSON document with the following entries: +# - "url": Url of the Nextcloud instance. +# - "secret": Shared secret for requests from and to the backend servers. +# +# Additional optional entries: +# - "maxstreambitrate": Maximum bitrate per publishing stream (in bits per second). +# - "maxscreenbitrate": Maximum bitrate per screensharing stream (in bits per second). +# - "sessionlimit": Number of sessions that are allowed to connect. +# +# Example: +# "/signaling/backend/one" -> {"url": "https://nextcloud.domain1.invalid", ...} +# "/signaling/backend/two" -> {"url": "https://domain2.invalid/nextcloud", ...} +#backendprefix = /signaling/backend + # Allow any hostname as backend endpoint. This is extremely insecure and should # only be used while running the benchmark client against the server. allowall = false @@ -77,6 +102,7 @@ connectionsperhost = 8 # certificates. #skipverify = false +# For backendtype "static": # Backend configurations as defined in the "[backend]" section above. The # section names must match the ids used in "backends" above. #[backend-id] diff --git a/server/main.go b/server/main.go index 843ee48..11147a9 100644 --- a/server/main.go +++ b/server/main.go @@ -182,7 +182,7 @@ func main() { defer rpcClients.Close() r := mux.NewRouter() - hub, err := signaling.NewHub(config, events, rpcServer, rpcClients, r, version) + hub, err := signaling.NewHub(config, events, rpcServer, rpcClients, etcdClient, r, version) if err != nil { log.Fatal("Could not create hub: ", err) }