Move backend object to talk package.

This commit is contained in:
Joachim Bauch 2025-12-11 20:20:45 +01:00
commit ff69ee5c91
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
29 changed files with 823 additions and 640 deletions

View file

@ -38,6 +38,10 @@ component_management:
name: container
paths:
- container/**
- component_id: module_etcd
name: etcd
paths:
- etcd/**
- component_id: module_internal
name: internal
paths:

View file

@ -28,16 +28,12 @@ import (
"crypto/subtle"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"regexp"
"slices"
"time"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
)
const (
@ -426,73 +422,6 @@ type TurnCredentials struct {
URIs []string `json:"uris"`
}
// Information on a backend in the etcd cluster.
type BackendInformationEtcd struct {
// Compat setting.
Url string `json:"url,omitempty"`
Urls []string `json:"urls,omitempty"`
parsedUrls []*url.URL
Secret string `json:"secret"`
MaxStreamBitrate api.Bandwidth `json:"maxstreambitrate,omitempty"`
MaxScreenBitrate api.Bandwidth `json:"maxscreenbitrate,omitempty"`
SessionLimit uint64 `json:"sessionlimit,omitempty"`
}
func (p *BackendInformationEtcd) CheckValid() (err error) {
if p.Secret == "" {
return errors.New("secret missing")
}
if len(p.Urls) > 0 {
slices.Sort(p.Urls)
p.Urls = slices.Compact(p.Urls)
seen := make(map[string]bool)
outIdx := 0
for _, u := range p.Urls {
parsedUrl, err := url.Parse(u)
if err != nil {
return fmt.Errorf("invalid url %s: %w", u, err)
}
var changed bool
if parsedUrl, changed = internal.CanonicalizeUrl(parsedUrl); changed {
u = parsedUrl.String()
}
p.Urls[outIdx] = u
if seen[u] {
continue
}
seen[u] = true
p.parsedUrls = append(p.parsedUrls, parsedUrl)
outIdx++
}
if len(p.Urls) != outIdx {
clear(p.Urls[outIdx:])
p.Urls = p.Urls[:outIdx]
}
} else if p.Url != "" {
parsedUrl, err := url.Parse(p.Url)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}
var changed bool
if parsedUrl, changed = internal.CanonicalizeUrl(parsedUrl); changed {
p.Url = parsedUrl.String()
}
p.Urls = append(p.Urls, p.Url)
p.parsedUrls = append(p.parsedUrls, parsedUrl)
} else {
return errors.New("urls missing")
}
return nil
}
type BackendServerInfoVideoRoom struct {
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`

View file

@ -72,110 +72,3 @@ func TestValidNumbers(t *testing.T) {
assert.False(isValidNumber(number), "number %s should not be valid", number)
}
}
func TestValidateBackendInformationEtcd(t *testing.T) {
t.Parallel()
assert := assert.New(t)
testcases := []struct {
b BackendInformationEtcd
expectedError string
expectedUrls []string
}{
{
b: BackendInformationEtcd{},
expectedError: "secret missing",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
},
expectedError: "urls missing",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Url: "https://foo\n",
},
expectedError: "invalid url",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo\n"},
},
expectedError: "invalid url",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo", "https://foo\n"},
},
expectedError: "invalid url",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Url: "https://foo:443",
},
expectedUrls: []string{"https://foo"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo:443"},
},
expectedUrls: []string{"https://foo"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Url: "https://foo:8443",
},
expectedUrls: []string{"https://foo:8443"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo:8443"},
},
expectedUrls: []string{"https://foo:8443"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo", "https://bar", "https://foo"},
},
expectedUrls: []string{"https://bar", "https://foo"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo", "https://bar", "https://foo:443", "https://zaz"},
},
expectedUrls: []string{"https://bar", "https://foo", "https://zaz"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo:443", "https://bar", "https://foo", "https://zaz"},
},
expectedUrls: []string{"https://bar", "https://foo", "https://zaz"},
},
}
for idx, tc := range testcases {
if tc.expectedError == "" {
if assert.NoError(tc.b.CheckValid(), "failed for testcase %d", idx) {
assert.Equal(tc.expectedUrls, tc.b.Urls, "failed for testcase %d", idx)
var urls []string
for _, u := range tc.b.parsedUrls {
urls = append(urls, u.String())
}
assert.Equal(tc.expectedUrls, urls, "failed for testcase %d", idx)
}
} else {
assert.ErrorContains(tc.b.CheckValid(), tc.expectedError, "failed for testcase %d, got %+v", idx, tc.b.parsedUrls)
}
}
}

View file

@ -28,6 +28,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
var (
@ -47,22 +48,22 @@ type AsyncEventListener interface {
type AsyncEvents interface {
Close(ctx context.Context) error
RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
RegisterBackendRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error
UnregisterBackendRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error
RegisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
UnregisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
RegisterRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error
UnregisterRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error
RegisterUserListener(userId string, backend *Backend, listener AsyncEventListener) error
UnregisterUserListener(userId string, backend *Backend, listener AsyncEventListener) error
RegisterUserListener(userId string, backend *talk.Backend, listener AsyncEventListener) error
UnregisterUserListener(userId string, backend *talk.Backend, listener AsyncEventListener) error
RegisterSessionListener(sessionId api.PublicSessionId, backend *Backend, listener AsyncEventListener) error
UnregisterSessionListener(sessionId api.PublicSessionId, backend *Backend, listener AsyncEventListener) error
RegisterSessionListener(sessionId api.PublicSessionId, backend *talk.Backend, listener AsyncEventListener) error
UnregisterSessionListener(sessionId api.PublicSessionId, backend *talk.Backend, listener AsyncEventListener) error
PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
PublishUserMessage(userId string, backend *Backend, message *AsyncMessage) error
PublishSessionMessage(sessionId api.PublicSessionId, backend *Backend, message *AsyncMessage) error
PublishBackendRoomMessage(roomId string, backend *talk.Backend, message *AsyncMessage) error
PublishRoomMessage(roomId string, backend *talk.Backend, message *AsyncMessage) error
PublishUserMessage(userId string, backend *talk.Backend, message *AsyncMessage) error
PublishSessionMessage(sessionId api.PublicSessionId, backend *talk.Backend, message *AsyncMessage) error
}
func NewAsyncEvents(ctx context.Context, url string) (AsyncEvents, error) {

View file

@ -30,9 +30,10 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
func GetSubjectForBackendRoomId(roomId string, backend *Backend) string {
func GetSubjectForBackendRoomId(roomId string, backend *talk.Backend) string {
if backend == nil || backend.IsCompat() {
return nats.GetEncodedSubject("backend.room", roomId)
}
@ -40,7 +41,7 @@ func GetSubjectForBackendRoomId(roomId string, backend *Backend) string {
return nats.GetEncodedSubject("backend.room", roomId+"|"+backend.Id())
}
func GetSubjectForRoomId(roomId string, backend *Backend) string {
func GetSubjectForRoomId(roomId string, backend *talk.Backend) string {
if backend == nil || backend.IsCompat() {
return nats.GetEncodedSubject("room", roomId)
}
@ -48,7 +49,7 @@ func GetSubjectForRoomId(roomId string, backend *Backend) string {
return nats.GetEncodedSubject("room", roomId+"|"+backend.Id())
}
func GetSubjectForUserId(userId string, backend *Backend) string {
func GetSubjectForUserId(userId string, backend *talk.Backend) string {
if backend == nil || backend.IsCompat() {
return nats.GetEncodedSubject("user", userId)
}
@ -56,7 +57,7 @@ func GetSubjectForUserId(userId string, backend *Backend) string {
return nats.GetEncodedSubject("user", userId+"|"+backend.Id())
}
func GetSubjectForSessionId(sessionId api.PublicSessionId, backend *Backend) string {
func GetSubjectForSessionId(sessionId api.PublicSessionId, backend *talk.Backend) string {
return string("session." + sessionId)
}
@ -189,7 +190,7 @@ func (e *asyncEventsNats) unregisterListener(key string, subscriptions asyncEven
return sub.Unsubscribe()
}
func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForBackendRoomId(roomId, backend)
e.mu.Lock()
@ -198,7 +199,7 @@ func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Ba
return e.registerListener(key, e.backendRoomSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) UnregisterBackendRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForBackendRoomId(roomId, backend)
e.mu.Lock()
@ -207,7 +208,7 @@ func (e *asyncEventsNats) UnregisterBackendRoomListener(roomId string, backend *
return e.unregisterListener(key, e.backendRoomSubscriptions, listener)
}
func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForRoomId(roomId, backend)
e.mu.Lock()
@ -216,7 +217,7 @@ func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend,
return e.registerListener(key, e.roomSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) UnregisterRoomListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForRoomId(roomId, backend)
e.mu.Lock()
@ -225,7 +226,7 @@ func (e *asyncEventsNats) UnregisterRoomListener(roomId string, backend *Backend
return e.unregisterListener(key, e.roomSubscriptions, listener)
}
func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForUserId(roomId, backend)
e.mu.Lock()
@ -234,7 +235,7 @@ func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend,
return e.registerListener(key, e.userSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterUserListener(roomId string, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) UnregisterUserListener(roomId string, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForUserId(roomId, backend)
e.mu.Lock()
@ -243,7 +244,7 @@ func (e *asyncEventsNats) UnregisterUserListener(roomId string, backend *Backend
return e.unregisterListener(key, e.userSubscriptions, listener)
}
func (e *asyncEventsNats) RegisterSessionListener(sessionId api.PublicSessionId, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) RegisterSessionListener(sessionId api.PublicSessionId, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForSessionId(sessionId, backend)
e.mu.Lock()
@ -252,7 +253,7 @@ func (e *asyncEventsNats) RegisterSessionListener(sessionId api.PublicSessionId,
return e.registerListener(key, e.sessionSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterSessionListener(sessionId api.PublicSessionId, backend *Backend, listener AsyncEventListener) error {
func (e *asyncEventsNats) UnregisterSessionListener(sessionId api.PublicSessionId, backend *talk.Backend, listener AsyncEventListener) error {
key := GetSubjectForSessionId(sessionId, backend)
e.mu.Lock()
@ -266,22 +267,22 @@ func (e *asyncEventsNats) publish(subject string, message *AsyncMessage) error {
return e.client.Publish(subject, message)
}
func (e *asyncEventsNats) PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error {
func (e *asyncEventsNats) PublishBackendRoomMessage(roomId string, backend *talk.Backend, message *AsyncMessage) error {
subject := GetSubjectForBackendRoomId(roomId, backend)
return e.publish(subject, message)
}
func (e *asyncEventsNats) PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error {
func (e *asyncEventsNats) PublishRoomMessage(roomId string, backend *talk.Backend, message *AsyncMessage) error {
subject := GetSubjectForRoomId(roomId, backend)
return e.publish(subject, message)
}
func (e *asyncEventsNats) PublishUserMessage(userId string, backend *Backend, message *AsyncMessage) error {
func (e *asyncEventsNats) PublishUserMessage(userId string, backend *talk.Backend, message *AsyncMessage) error {
subject := GetSubjectForUserId(userId, backend)
return e.publish(subject, message)
}
func (e *asyncEventsNats) PublishSessionMessage(sessionId api.PublicSessionId, backend *Backend, message *AsyncMessage) error {
func (e *asyncEventsNats) PublishSessionMessage(sessionId api.PublicSessionId, backend *talk.Backend, message *AsyncMessage) error {
subject := GetSubjectForSessionId(sessionId, backend)
return e.publish(subject, message)
}

View file

@ -26,13 +26,13 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
func Benchmark_GetSubjectForSessionId(b *testing.B) {
require := require.New(b)
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
data := &SessionIdData{
Sid: 1,
Created: time.Now().UnixMicro(),

View file

@ -98,15 +98,15 @@ func (b *BackendClient) Reload(config *goconf.ConfigFile) {
b.backends.Reload(config)
}
func (b *BackendClient) GetCompatBackend() *Backend {
func (b *BackendClient) GetCompatBackend() *talk.Backend {
return b.backends.GetCompatBackend()
}
func (b *BackendClient) GetBackend(u *url.URL) *Backend {
func (b *BackendClient) GetBackend(u *url.URL) *talk.Backend {
return b.backends.GetBackend(u)
}
func (b *BackendClient) GetBackends() []*Backend {
func (b *BackendClient) GetBackends() []*talk.Backend {
return b.backends.GetBackends()
}

View file

@ -22,7 +22,6 @@
package signaling
import (
"bytes"
"fmt"
"net/url"
"slices"
@ -31,9 +30,9 @@ import (
"github.com/dlintw/goconf"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
const (
@ -43,134 +42,13 @@ const (
DefaultBackendType = BackendTypeStatic
)
var (
SessionLimitExceeded = api.NewError("session_limit_exceeded", "Too many sessions connected for this backend.")
)
type Backend struct {
id string
urls []string
secret []byte
allowHttp bool
maxStreamBitrate api.Bandwidth
maxScreenBitrate api.Bandwidth
sessionLimit uint64
sessionsLock sync.Mutex
// +checklocks:sessionsLock
sessions map[api.PublicSessionId]bool
counted bool
}
func (b *Backend) Id() string {
return b.id
}
func (b *Backend) Secret() []byte {
return b.secret
}
func (b *Backend) IsCompat() bool {
return len(b.urls) == 0
}
func (b *Backend) Equal(other *Backend) bool {
if b == other {
return true
} else if b == nil || other == nil {
return false
}
return b.id == other.id &&
b.allowHttp == other.allowHttp &&
b.maxStreamBitrate == other.maxStreamBitrate &&
b.maxScreenBitrate == other.maxScreenBitrate &&
b.sessionLimit == other.sessionLimit &&
bytes.Equal(b.secret, other.secret) &&
slices.Equal(b.urls, other.urls)
}
func (b *Backend) IsUrlAllowed(u *url.URL) bool {
switch u.Scheme {
case "https":
return true
case "http":
return b.allowHttp
default:
return false
}
}
func (b *Backend) HasUrl(url string) bool {
if b.IsCompat() {
// Old-style configuration, only hosts are configured.
return true
}
for _, u := range b.urls {
if strings.HasPrefix(url, u) {
return true
}
}
return false
}
func (b *Backend) Urls() []string {
return b.urls
}
func (b *Backend) Limit() int {
return int(b.sessionLimit)
}
func (b *Backend) Len() int {
b.sessionsLock.Lock()
defer b.sessionsLock.Unlock()
return len(b.sessions)
}
func (b *Backend) AddSession(session Session) error {
if session.ClientType() == api.HelloClientTypeInternal || session.ClientType() == api.HelloClientTypeVirtual {
// Internal and virtual sessions are not counting to the limit.
return nil
}
if b.sessionLimit == 0 {
// Not limited
return nil
}
b.sessionsLock.Lock()
defer b.sessionsLock.Unlock()
if b.sessions == nil {
b.sessions = make(map[api.PublicSessionId]bool)
} else if uint64(len(b.sessions)) >= b.sessionLimit {
statsBackendLimitExceededTotal.WithLabelValues(b.id).Inc()
return SessionLimitExceeded
}
b.sessions[session.PublicId()] = true
return nil
}
func (b *Backend) RemoveSession(session Session) {
b.sessionsLock.Lock()
defer b.sessionsLock.Unlock()
delete(b.sessions, session.PublicId())
}
type BackendStorage interface {
Close()
Reload(config *goconf.ConfigFile)
Reload(cfg *goconf.ConfigFile)
GetCompatBackend() *Backend
GetBackend(u *url.URL) *Backend
GetBackends() []*Backend
GetCompatBackend() *talk.Backend
GetBackend(u *url.URL) *talk.Backend
GetBackends() []*talk.Backend
}
type BackendStorageStats interface {
@ -183,29 +61,29 @@ type BackendStorageStats interface {
type backendStorageCommon struct {
mu sync.RWMutex
// +checklocks:mu
backends map[string][]*Backend
backends map[string][]*talk.Backend
stats BackendStorageStats // +checklocksignore: Only written to from constructor
}
func (s *backendStorageCommon) GetBackends() []*Backend {
func (s *backendStorageCommon) GetBackends() []*talk.Backend {
s.mu.RLock()
defer s.mu.RUnlock()
var result []*Backend
var result []*talk.Backend
for _, entries := range s.backends {
result = append(result, entries...)
}
slices.SortFunc(result, func(a, b *Backend) int {
slices.SortFunc(result, func(a, b *talk.Backend) int {
return strings.Compare(a.Id(), b.Id())
})
result = slices.CompactFunc(result, func(a, b *Backend) bool {
result = slices.CompactFunc(result, func(a, b *talk.Backend) bool {
return a.Id() == b.Id()
})
return result
}
func (s *backendStorageCommon) getBackendLocked(u *url.URL) *Backend {
func (s *backendStorageCommon) getBackendLocked(u *url.URL) *talk.Backend {
s.mu.RLock()
defer s.mu.RUnlock()
@ -299,16 +177,16 @@ func (b *BackendConfiguration) Reload(config *goconf.ConfigFile) {
b.storage.Reload(config)
}
func (b *BackendConfiguration) GetCompatBackend() *Backend {
func (b *BackendConfiguration) GetCompatBackend() *talk.Backend {
return b.storage.GetCompatBackend()
}
func (b *BackendConfiguration) GetBackend(u *url.URL) *Backend {
func (b *BackendConfiguration) GetBackend(u *url.URL) *talk.Backend {
u, _ = internal.CanonicalizeUrl(u)
return b.storage.GetBackend(u)
}
func (b *BackendConfiguration) GetBackends() []*Backend {
func (b *BackendConfiguration) GetBackends() []*talk.Backend {
return b.storage.GetBackends()
}

View file

@ -28,18 +28,6 @@ import (
)
var (
statsBackendLimit = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "backend",
Name: "session_limit",
Help: "The session limit of a backend",
}, []string{"backend"})
statsBackendLimitExceededTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ // +checklocksignore: Global readonly variable.
Namespace: "signaling",
Subsystem: "backend",
Name: "session_limit_exceeded_total",
Help: "The number of times the session limit exceeded",
}, []string{"backend"})
statsBackendsCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "backend",
@ -48,8 +36,6 @@ var (
})
backendConfigurationStats = []prometheus.Collector{
statsBackendLimit,
statsBackendLimitExceededTotal,
statsBackendsCurrent,
}
)
@ -57,15 +43,3 @@ var (
func RegisterBackendConfigurationStats() {
metrics.RegisterAll(backendConfigurationStats...)
}
func updateBackendStats(backend *Backend) {
if backend.sessionLimit > 0 {
statsBackendLimit.WithLabelValues(backend.id).Set(float64(backend.sessionLimit))
} else {
statsBackendLimit.DeleteLabelValues(backend.id)
}
}
func deleteBackendStats(backend *Backend) {
statsBackendLimit.DeleteLabelValues(backend.id)
}

View file

@ -34,6 +34,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
func testUrls(t *testing.T, config *BackendConfiguration, valid_urls []string, invalid_urls []string) {
@ -486,9 +487,9 @@ func TestBackendReloadRemoveBackendFromSharedHost(t *testing.T) {
}
}
func sortBackends(backends []*Backend) []*Backend {
func sortBackends(backends []*talk.Backend) []*talk.Backend {
result := slices.Clone(backends)
slices.SortFunc(result, func(a, b *Backend) int {
slices.SortFunc(result, func(a, b *talk.Backend) int {
return strings.Compare(a.Id(), b.Id())
})
return result
@ -534,8 +535,8 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) {
require.NoError(storage.WaitForInitialized(ctx))
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(initialSecret1, string(backends[0].secret)) {
assert.Equal([]string{url1}, backends[0].Urls()) &&
assert.Equal(initialSecret1, string(backends[0].Secret())) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
}
@ -546,8 +547,8 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) {
<-ch
assert.Equal(1, stats.value)
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(secret1, string(backends[0].secret)) {
assert.Equal([]string{url1}, backends[0].Urls()) &&
assert.Equal(secret1, string(backends[0].Secret())) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
}
@ -561,10 +562,10 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) {
<-ch
assert.Equal(2, stats.value)
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 2) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(secret1, string(backends[0].secret)) &&
assert.Equal([]string{url2}, backends[1].urls) &&
assert.Equal(secret2, string(backends[1].secret)) {
assert.Equal([]string{url1}, backends[0].Urls()) &&
assert.Equal(secret1, string(backends[0].Secret())) &&
assert.Equal([]string{url2}, backends[1].Urls()) &&
assert.Equal(secret2, string(backends[1].Secret())) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
} else if backend := cfg.GetBackend(mustParse(url2)); assert.NotNil(backend) {
@ -580,12 +581,12 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) {
<-ch
assert.Equal(3, stats.value)
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 3) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(secret1, string(backends[0].secret)) &&
assert.Equal([]string{url2}, backends[1].urls) &&
assert.Equal(secret2, string(backends[1].secret)) &&
assert.Equal([]string{url3}, backends[2].urls) &&
assert.Equal(secret3, string(backends[2].secret)) {
assert.Equal([]string{url1}, backends[0].Urls()) &&
assert.Equal(secret1, string(backends[0].Secret())) &&
assert.Equal([]string{url2}, backends[1].Urls()) &&
assert.Equal(secret2, string(backends[1].Secret())) &&
assert.Equal([]string{url3}, backends[2].Urls()) &&
assert.Equal(secret3, string(backends[2].Secret())) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
} else if backend := cfg.GetBackend(mustParse(url2)); assert.NotNil(backend) {
@ -600,10 +601,10 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) {
<-ch
assert.Equal(2, stats.value)
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 2) {
assert.Equal([]string{url2}, backends[0].urls)
assert.Equal(secret2, string(backends[0].secret))
assert.Equal([]string{url3}, backends[1].urls)
assert.Equal(secret3, string(backends[1].secret))
assert.Equal([]string{url2}, backends[0].Urls())
assert.Equal(secret2, string(backends[0].Secret()))
assert.Equal([]string{url3}, backends[1].Urls())
assert.Equal(secret3, string(backends[1].Secret()))
}
drainWakeupChannel(ch)
@ -611,8 +612,8 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) {
<-ch
assert.Equal(1, stats.value)
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) {
assert.Equal([]string{url3}, backends[0].urls)
assert.Equal(secret3, string(backends[0].secret))
assert.Equal([]string{url3}, backends[0].Urls())
assert.Equal(secret3, string(backends[0].Secret()))
}
storage.mu.RLock()

View file

@ -55,6 +55,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/pool"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
const (
@ -327,7 +328,7 @@ func (b *BackendServer) parseRequestBody(f func(context.Context, http.ResponseWr
}
}
func (b *BackendServer) sendRoomInvite(roomid string, backend *Backend, userids []string, properties json.RawMessage) {
func (b *BackendServer) sendRoomInvite(roomid string, backend *talk.Backend, userids []string, properties json.RawMessage) {
msg := &AsyncMessage{
Type: "message",
Message: &api.ServerMessage{
@ -349,7 +350,7 @@ func (b *BackendServer) sendRoomInvite(roomid string, backend *Backend, userids
}
}
func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reason string, userids []string, sessionids []api.RoomSessionId) {
func (b *BackendServer) sendRoomDisinvite(roomid string, backend *talk.Backend, reason string, userids []string, sessionids []api.RoomSessionId) {
msg := &AsyncMessage{
Type: "message",
Message: &api.ServerMessage{
@ -398,7 +399,7 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso
wg.Wait()
}
func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified_userids []string, all_userids []string, properties json.RawMessage) {
func (b *BackendServer) sendRoomUpdate(roomid string, backend *talk.Backend, notified_userids []string, all_userids []string, properties json.RawMessage) {
msg := &AsyncMessage{
Type: "message",
Message: &api.ServerMessage{
@ -499,7 +500,7 @@ func (b *BackendServer) fixupUserSessions(ctx context.Context, cache *container.
return result
}
func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
func (b *BackendServer) sendRoomIncall(roomid string, backend *talk.Backend, request *BackendServerRoomRequest) error {
if !request.InCall.All {
timeout := time.Second
@ -524,7 +525,7 @@ func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) sendRoomParticipantsUpdate(ctx context.Context, roomid string, backend *Backend, request *BackendServerRoomRequest) error {
func (b *BackendServer) sendRoomParticipantsUpdate(ctx context.Context, roomid string, backend *talk.Backend, request *BackendServerRoomRequest) error {
timeout := time.Second
// Convert (Nextcloud) session ids to signaling session ids.
@ -588,7 +589,7 @@ loop:
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) sendRoomMessage(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
func (b *BackendServer) sendRoomMessage(roomid string, backend *talk.Backend, request *BackendServerRoomRequest) error {
message := &AsyncMessage{
Type: "room",
Room: request,
@ -596,7 +597,7 @@ func (b *BackendServer) sendRoomMessage(roomid string, backend *Backend, request
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) sendRoomSwitchTo(ctx context.Context, roomid string, backend *Backend, request *BackendServerRoomRequest) error {
func (b *BackendServer) sendRoomSwitchTo(ctx context.Context, roomid string, backend *talk.Backend, request *BackendServerRoomRequest) error {
timeout := time.Second
// Convert (Nextcloud) session ids to signaling session ids.
@ -728,7 +729,7 @@ func isNumeric(s string) bool {
return checkNumeric.MatchString(s)
}
func (b *BackendServer) startDialoutInSession(ctx context.Context, session *ClientSession, roomid string, backend *Backend, backendUrl string, request *BackendServerRoomRequest) (any, error) {
func (b *BackendServer) startDialoutInSession(ctx context.Context, session *ClientSession, roomid string, backend *talk.Backend, backendUrl string, request *BackendServerRoomRequest) (any, error) {
url := backendUrl
if url != "" && url[len(url)-1] != '/' {
url += "/"
@ -809,7 +810,7 @@ func (b *BackendServer) startDialoutInSession(ctx context.Context, session *Clie
}
}
func (b *BackendServer) startDialout(ctx context.Context, roomid string, backend *Backend, backendUrl string, request *BackendServerRoomRequest) (any, error) {
func (b *BackendServer) startDialout(ctx context.Context, roomid string, backend *talk.Backend, backendUrl string, request *BackendServerRoomRequest) (any, error) {
if err := request.Dialout.ValidateNumber(); err != nil {
return returnDialoutError(http.StatusBadRequest, err)
}
@ -860,7 +861,7 @@ func (b *BackendServer) roomHandler(ctx context.Context, w http.ResponseWriter,
v := mux.Vars(r)
roomid := v["roomid"]
var backend *Backend
var backend *talk.Backend
backendUrl := r.Header.Get(HeaderBackendServer)
if backendUrl != "" {
if u, err := url.Parse(backendUrl); err == nil {

View file

@ -33,7 +33,9 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/strukturag/nextcloud-spreed-signaling/async"
"github.com/strukturag/nextcloud-spreed-signaling/etcd"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
type backendStorageEtcd struct {
@ -42,7 +44,7 @@ type backendStorageEtcd struct {
logger log.Logger
etcdClient *EtcdClient
keyPrefix string
keyInfos map[string]*BackendInformationEtcd
keyInfos map[string]*etcd.BackendInformationEtcd
initializedCtx context.Context
initializedFunc context.CancelFunc
@ -66,13 +68,13 @@ func NewBackendStorageEtcd(logger log.Logger, config *goconf.ConfigFile, etcdCli
closeCtx, closeFunc := context.WithCancel(context.Background())
result := &backendStorageEtcd{
backendStorageCommon: backendStorageCommon{
backends: make(map[string][]*Backend),
backends: make(map[string][]*talk.Backend),
stats: stats,
},
logger: logger,
etcdClient: etcdClient,
keyPrefix: keyPrefix,
keyInfos: make(map[string]*BackendInformationEtcd),
keyInfos: make(map[string]*etcd.BackendInformationEtcd),
initializedCtx: initializedCtx,
initializedFunc: initializedFunc,
@ -173,7 +175,7 @@ func (s *backendStorageEtcd) getBackends(ctx context.Context, client *EtcdClient
}
func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) {
var info BackendInformationEtcd
var info etcd.BackendInformationEtcd
if err := json.Unmarshal(data, &info); err != nil {
s.logger.Printf("Could not decode backend information %s: %s", string(data), err)
return
@ -183,34 +185,20 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data
return
}
allowHttp := slices.ContainsFunc(info.parsedUrls, func(u *url.URL) bool {
return u.Scheme == "http"
})
backend := &Backend{
id: key,
urls: info.Urls,
secret: []byte(info.Secret),
allowHttp: allowHttp,
maxStreamBitrate: info.MaxStreamBitrate,
maxScreenBitrate: info.MaxScreenBitrate,
sessionLimit: info.SessionLimit,
}
backend := talk.NewBackendFromEtcd(key, &info)
s.mu.Lock()
defer s.mu.Unlock()
s.keyInfos[key] = &info
added := false
for idx, u := range info.parsedUrls {
for idx, u := range info.ParsedUrls {
host := u.Host
entries, found := s.backends[host]
if !found {
// Simple case, first backend for this host
s.logger.Printf("Added backend %s (from %s)", info.Urls[idx], key)
s.backends[host] = []*Backend{backend}
s.backends[host] = []*talk.Backend{backend}
added = true
continue
}
@ -218,7 +206,7 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data
// Was the backend changed?
replaced := false
for idx, entry := range entries {
if entry.id == key {
if entry.Id() == key {
s.logger.Printf("Updated backend %s (from %s)", info.Urls[idx], key)
entries[idx] = backend
replaced = true
@ -233,7 +221,7 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data
added = true
}
}
updateBackendStats(backend)
backend.UpdateStats()
if added {
s.stats.IncBackends()
}
@ -250,15 +238,15 @@ func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prev
}
delete(s.keyInfos, key)
var deleted map[string][]*Backend
var deleted map[string][]*talk.Backend
seen := make(map[string]bool)
for idx, u := range info.parsedUrls {
for idx, u := range info.ParsedUrls {
host := u.Host
entries, found := s.backends[host]
if !found {
if d, ok := deleted[host]; ok {
if slices.ContainsFunc(d, func(b *Backend) bool {
return slices.Contains(b.urls, u.String())
if slices.ContainsFunc(d, func(b *talk.Backend) bool {
return slices.Contains(b.Urls(), u.String())
}) {
s.logger.Printf("Removing backend %s (from %s)", info.Urls[idx], key)
}
@ -267,18 +255,18 @@ func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prev
}
s.logger.Printf("Removing backend %s (from %s)", info.Urls[idx], key)
newEntries := make([]*Backend, 0, len(entries)-1)
newEntries := make([]*talk.Backend, 0, len(entries)-1)
for _, entry := range entries {
if entry.id == key {
if len(info.parsedUrls) > 1 {
if entry.Id() == key {
if len(info.ParsedUrls) > 1 {
if deleted == nil {
deleted = make(map[string][]*Backend)
deleted = make(map[string][]*talk.Backend)
}
deleted[host] = append(deleted[host], entry)
}
if !seen[entry.Id()] {
seen[entry.Id()] = true
updateBackendStats(entry)
entry.UpdateStats()
s.stats.DecBackends()
}
continue
@ -304,11 +292,11 @@ func (s *backendStorageEtcd) Reload(config *goconf.ConfigFile) {
// Backend updates are processed through etcd.
}
func (s *backendStorageEtcd) GetCompatBackend() *Backend {
func (s *backendStorageEtcd) GetCompatBackend() *talk.Backend {
return nil
}
func (s *backendStorageEtcd) GetBackend(u *url.URL) *Backend {
func (s *backendStorageEtcd) GetBackend(u *url.URL) *talk.Backend {
s.mu.RLock()
defer s.mu.RUnlock()

View file

@ -28,77 +28,53 @@ import (
"github.com/dlintw/goconf"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/config"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
type backendStorageStatic struct {
backendStorageCommon
logger log.Logger
backendsById map[string]*Backend
backendsById map[string]*talk.Backend
// Deprecated
allowAll bool
commonSecret []byte
compatBackend *Backend
compatBackend *talk.Backend
}
func NewBackendStorageStatic(logger log.Logger, cfg *goconf.ConfigFile, stats BackendStorageStats) (BackendStorage, error) {
allowAll, _ := cfg.GetBool("backend", "allowall")
allowHttp, _ := cfg.GetBool("backend", "allowhttp")
commonSecret, _ := config.GetStringOptionWithEnv(cfg, "backend", "secret")
sessionLimit, err := cfg.GetInt("backend", "sessionlimit")
if err != nil || sessionLimit < 0 {
sessionLimit = 0
}
backends := make(map[string][]*Backend)
backendsById := make(map[string]*Backend)
var compatBackend *Backend
backends := make(map[string][]*talk.Backend)
backendsById := make(map[string]*talk.Backend)
var compatBackend *talk.Backend
numBackends := 0
if allowAll {
logger.Println("WARNING: All backend hostnames are allowed, only use for development!")
maxStreamBitrate, err := cfg.GetInt("backend", "maxstreambitrate")
if err != nil || maxStreamBitrate < 0 {
maxStreamBitrate = 0
}
maxScreenBitrate, err := cfg.GetInt("backend", "maxscreenbitrate")
if err != nil || maxScreenBitrate < 0 {
maxScreenBitrate = 0
}
compatBackend = &Backend{
id: "compat",
secret: []byte(commonSecret),
allowHttp: allowHttp,
sessionLimit: uint64(sessionLimit),
counted: true,
maxStreamBitrate: api.BandwidthFromBits(uint64(maxStreamBitrate)),
maxScreenBitrate: api.BandwidthFromBits(uint64(maxScreenBitrate)),
}
if sessionLimit > 0 {
compatBackend = talk.NewCompatBackend(cfg)
if sessionLimit := compatBackend.Limit(); sessionLimit > 0 {
logger.Printf("Allow a maximum of %d sessions", sessionLimit)
}
updateBackendStats(compatBackend)
backendsById[compatBackend.id] = compatBackend
compatBackend.UpdateStats()
backendsById[compatBackend.Id()] = compatBackend
numBackends++
} else if backendIds, _ := cfg.GetString("backend", "backends"); backendIds != "" {
added := make(map[string]*Backend)
added := make(map[string]*talk.Backend)
for host, configuredBackends := range getConfiguredHosts(logger, backendIds, cfg, commonSecret) {
backends[host] = append(backends[host], configuredBackends...)
for _, be := range configuredBackends {
added[be.id] = be
added[be.Id()] = be
}
}
for _, be := range added {
logger.Printf("Backend %s added for %s", be.id, strings.Join(be.urls, ", "))
backendsById[be.id] = be
updateBackendStats(be)
be.counted = true
logger.Printf("Backend %s added for %s", be.Id(), strings.Join(be.Urls(), ", "))
backendsById[be.Id()] = be
be.UpdateStats()
be.Count()
}
numBackends += len(added)
} else if allowedUrls, _ := cfg.GetString("backend", "allowed"); allowedUrls != "" {
@ -118,40 +94,21 @@ func NewBackendStorageStatic(logger log.Logger, cfg *goconf.ConfigFile, stats Ba
if len(allowMap) == 0 {
logger.Println("WARNING: No backend hostnames are allowed, check your configuration!")
} else {
maxStreamBitrate, err := cfg.GetInt("backend", "maxstreambitrate")
if err != nil || maxStreamBitrate < 0 {
maxStreamBitrate = 0
}
maxScreenBitrate, err := cfg.GetInt("backend", "maxscreenbitrate")
if err != nil || maxScreenBitrate < 0 {
maxScreenBitrate = 0
}
compatBackend = &Backend{
id: "compat",
secret: []byte(commonSecret),
allowHttp: allowHttp,
sessionLimit: uint64(sessionLimit),
counted: true,
maxStreamBitrate: api.BandwidthFromBits(uint64(maxStreamBitrate)),
maxScreenBitrate: api.BandwidthFromBits(uint64(maxScreenBitrate)),
}
compatBackend = talk.NewCompatBackend(cfg)
hosts := make([]string, 0, len(allowMap))
for host := range allowMap {
hosts = append(hosts, host)
backends[host] = []*Backend{compatBackend}
backends[host] = []*talk.Backend{compatBackend}
}
if len(hosts) > 1 {
logger.Println("WARNING: Using deprecated backend configuration. Please migrate the \"allowed\" setting to the new \"backends\" configuration.")
}
logger.Printf("Allowed backend hostnames: %s", hosts)
if sessionLimit > 0 {
if sessionLimit := compatBackend.Limit(); sessionLimit > 0 {
logger.Printf("Allow a maximum of %d sessions", sessionLimit)
}
updateBackendStats(compatBackend)
backendsById[compatBackend.id] = compatBackend
compatBackend.UpdateStats()
backendsById[compatBackend.Id()] = compatBackend
numBackends++
}
}
@ -189,15 +146,14 @@ func (s *backendStorageStatic) RemoveBackendsForHost(host string, seen map[strin
}
seen[backend.Id()] = seenDeleted
urls := slices.DeleteFunc(backend.urls, func(s string) bool {
urls := slices.DeleteFunc(backend.Urls(), func(s string) bool {
return !strings.Contains(s, "://"+host)
})
s.logger.Printf("Backend %s removed for %s", backend.id, strings.Join(urls, ", "))
if len(urls) == len(backend.urls) && backend.counted {
deleteBackendStats(backend)
s.logger.Printf("Backend %s removed for %s", backend.Id(), strings.Join(urls, ", "))
if len(urls) == len(backend.Urls()) && backend.Uncount() {
backend.DeleteStats()
delete(s.backendsById, backend.Id())
deleted++
backend.counted = false
}
}
s.stats.RemoveBackends(deleted)
@ -215,7 +171,7 @@ const (
)
// +checklocks:s.mu
func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend, seen map[string]seenState) {
func (s *backendStorageStatic) UpsertHost(host string, backends []*talk.Backend, seen map[string]seenState) {
for existingIndex, existingBackend := range s.backends[host] {
found := false
index := 0
@ -224,16 +180,16 @@ func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend, seen
found = true
backends = slices.Delete(backends, index, index+1)
break
} else if newBackend.id == existingBackend.id {
} else if newBackend.Id() == existingBackend.Id() {
found = true
s.backends[host][existingIndex] = newBackend
backends = slices.Delete(backends, index, index+1)
if seen[newBackend.id] != seenUpdated {
seen[newBackend.id] = seenUpdated
s.logger.Printf("Backend %s updated for %s", newBackend.id, strings.Join(newBackend.urls, ", "))
updateBackendStats(newBackend)
newBackend.counted = existingBackend.counted
s.backendsById[newBackend.id] = newBackend
if seen[newBackend.Id()] != seenUpdated {
seen[newBackend.Id()] = seenUpdated
s.logger.Printf("Backend %s updated for %s", newBackend.Id(), strings.Join(newBackend.Urls(), ", "))
newBackend.UpdateStats()
newBackend.CopyCount(existingBackend)
s.backendsById[newBackend.Id()] = newBackend
}
break
}
@ -242,17 +198,16 @@ func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend, seen
if !found {
removed := s.backends[host][existingIndex]
s.backends[host] = slices.Delete(s.backends[host], existingIndex, existingIndex+1)
if seen[removed.id] != seenDeleted {
seen[removed.id] = seenDeleted
urls := slices.DeleteFunc(removed.urls, func(s string) bool {
if seen[removed.Id()] != seenDeleted {
seen[removed.Id()] = seenDeleted
urls := slices.DeleteFunc(removed.Urls(), func(s string) bool {
return !strings.Contains(s, "://"+host)
})
s.logger.Printf("Backend %s removed for %s", removed.id, strings.Join(urls, ", "))
if len(urls) == len(removed.urls) && removed.counted {
deleteBackendStats(removed)
s.logger.Printf("Backend %s removed for %s", removed.Id(), strings.Join(urls, ", "))
if len(urls) == len(removed.Urls()) && removed.Uncount() {
removed.DeleteStats()
delete(s.backendsById, removed.Id())
s.stats.DecBackends()
removed.counted = false
}
}
}
@ -262,22 +217,21 @@ func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend, seen
addedBackends := 0
for _, added := range backends {
if seen[added.id] == seenAdded {
if seen[added.Id()] == seenAdded {
continue
}
seen[added.id] = seenAdded
if prev, found := s.backendsById[added.id]; found {
added.counted = prev.counted
seen[added.Id()] = seenAdded
if prev, found := s.backendsById[added.Id()]; found {
added.CopyCount(prev)
} else {
s.backendsById[added.id] = added
s.backendsById[added.Id()] = added
}
s.logger.Printf("Backend %s added for %s", added.id, strings.Join(added.urls, ", "))
if !added.counted {
updateBackendStats(added)
s.logger.Printf("Backend %s added for %s", added.Id(), strings.Join(added.Urls(), ", "))
if added.Count() {
added.UpdateStats()
addedBackends++
added.counted = true
}
}
s.stats.AddBackends(addedBackends)
@ -298,37 +252,10 @@ func getConfiguredBackendIDs(backendIds string) (ids []string) {
return ids
}
func getConfiguredHosts(logger log.Logger, backendIds string, cfg *goconf.ConfigFile, commonSecret string) (hosts map[string][]*Backend) {
hosts = make(map[string][]*Backend)
func getConfiguredHosts(logger log.Logger, backendIds string, cfg *goconf.ConfigFile, commonSecret string) (hosts map[string][]*talk.Backend) {
hosts = make(map[string][]*talk.Backend)
seenUrls := make(map[string]string)
for _, id := range getConfiguredBackendIDs(backendIds) {
secret, _ := config.GetStringOptionWithEnv(cfg, id, "secret")
if secret == "" && commonSecret != "" {
logger.Printf("Backend %s has no own shared secret set, using common shared secret", id)
secret = commonSecret
}
if secret == "" {
logger.Printf("Backend %s is missing or incomplete, skipping", id)
continue
}
sessionLimit, err := cfg.GetInt(id, "sessionlimit")
if err != nil || sessionLimit < 0 {
sessionLimit = 0
}
if sessionLimit > 0 {
logger.Printf("Backend %s allows a maximum of %d sessions", id, sessionLimit)
}
maxStreamBitrate, err := cfg.GetInt(id, "maxstreambitrate")
if err != nil || maxStreamBitrate < 0 {
maxStreamBitrate = 0
}
maxScreenBitrate, err := cfg.GetInt(id, "maxscreenbitrate")
if err != nil || maxScreenBitrate < 0 {
maxScreenBitrate = 0
}
var urls []string
if u, _ := config.GetStringOptionWithEnv(cfg, id, "urls"); u != "" {
urls = slices.Sorted(internal.SplitEntries(u, ","))
@ -344,14 +271,14 @@ func getConfiguredHosts(logger log.Logger, backendIds string, cfg *goconf.Config
continue
}
backend := &Backend{
id: id,
secret: []byte(secret),
backend, err := talk.NewBackendFromConfig(logger, id, cfg, commonSecret)
if err != nil {
logger.Printf("%s", err)
continue
}
maxStreamBitrate: api.BandwidthFromBits(uint64(maxStreamBitrate)),
maxScreenBitrate: api.BandwidthFromBits(uint64(maxScreenBitrate)),
sessionLimit: uint64(sessionLimit),
if sessionLimit := backend.Limit(); sessionLimit > 0 {
logger.Printf("Backend %s allows a maximum of %d sessions", id, sessionLimit)
}
added := make(map[string]bool)
@ -377,10 +304,7 @@ func getConfiguredHosts(logger log.Logger, backendIds string, cfg *goconf.Config
}
seenUrls[u] = id
backend.urls = append(backend.urls, u)
if parsed.Scheme == "http" {
backend.allowHttp = true
}
backend.AddUrl(parsed)
if !added[parsed.Host] {
hosts[parsed.Host] = append(hosts[parsed.Host], backend)
@ -427,14 +351,14 @@ func (s *backendStorageStatic) Reload(cfg *goconf.ConfigFile) {
}
}
func (s *backendStorageStatic) GetCompatBackend() *Backend {
func (s *backendStorageStatic) GetCompatBackend() *talk.Backend {
s.mu.RLock()
defer s.mu.RUnlock()
return s.compatBackend
}
func (s *backendStorageStatic) GetBackend(u *url.URL) *Backend {
func (s *backendStorageStatic) GetBackend(u *url.URL) *talk.Backend {
s.mu.RLock()
defer s.mu.RUnlock()

View file

@ -39,6 +39,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/async"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
var (
@ -76,7 +77,7 @@ type ClientSession struct {
// +checklocks:mu
permissions map[Permission]bool
backend *Backend
backend *talk.Backend
backendUrl string
parsedBackendUrl *url.URL
@ -119,7 +120,7 @@ type ClientSession struct {
responseHandlers map[string]ResponseHandlerFunc
}
func NewClientSession(hub *Hub, privateId api.PrivateSessionId, publicId api.PublicSessionId, data *SessionIdData, backend *Backend, hello *api.HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
func NewClientSession(hub *Hub, privateId api.PrivateSessionId, publicId api.PublicSessionId, data *SessionIdData, backend *talk.Backend, hello *api.HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
ctx := log.NewLoggerContext(context.Background(), hub.logger)
ctx, closeFunc := context.WithCancel(ctx)
s := &ClientSession{
@ -284,7 +285,7 @@ func (s *ClientSession) SetPermissions(permissions []Permission) {
s.logger.Printf("Permissions of session %s changed: %s", s.PublicId(), permissions)
}
func (s *ClientSession) Backend() *Backend {
func (s *ClientSession) Backend() *talk.Backend {
return s.backend
}
@ -947,9 +948,9 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
if backend := s.Backend(); backend != nil {
var maxBitrate api.Bandwidth
if streamType == StreamTypeScreen {
maxBitrate = backend.maxScreenBitrate
maxBitrate = backend.MaxScreenBitrate()
} else {
maxBitrate = backend.maxStreamBitrate
maxBitrate = backend.MaxStreamBitrate()
}
if settings.Bitrate <= 0 {
settings.Bitrate = maxBitrate

View file

@ -105,8 +105,8 @@ func TestBandwidth_Backend(t *testing.T) {
backend := hub.backend.GetBackend(u)
require.NotNil(backend, "Could not get backend")
backend.maxScreenBitrate = 1000
backend.maxStreamBitrate = 2000
backend.SetMaxScreenBitrate(1000)
backend.SetMaxStreamBitrate(2000)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -157,9 +157,9 @@ func TestBandwidth_Backend(t *testing.T) {
var expectBitrate api.Bandwidth
if streamType == StreamTypeVideo {
expectBitrate = backend.maxStreamBitrate
expectBitrate = backend.MaxStreamBitrate()
} else {
expectBitrate = backend.maxScreenBitrate
expectBitrate = backend.MaxScreenBitrate()
}
assert.Equal(expectBitrate, pub.settings.Bitrate)
})

99
etcd/api.go Normal file
View file

@ -0,0 +1,99 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package etcd
import (
"errors"
"fmt"
"net/url"
"slices"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
)
// Information on a backend in the etcd cluster.
type BackendInformationEtcd struct {
// Compat setting.
Url string `json:"url,omitempty"`
Urls []string `json:"urls,omitempty"`
ParsedUrls []*url.URL `json:"-"`
Secret string `json:"secret"`
MaxStreamBitrate api.Bandwidth `json:"maxstreambitrate,omitempty"`
MaxScreenBitrate api.Bandwidth `json:"maxscreenbitrate,omitempty"`
SessionLimit uint64 `json:"sessionlimit,omitempty"`
}
func (p *BackendInformationEtcd) CheckValid() (err error) {
if p.Secret == "" {
return errors.New("secret missing")
}
if len(p.Urls) > 0 {
slices.Sort(p.Urls)
p.Urls = slices.Compact(p.Urls)
seen := make(map[string]bool)
outIdx := 0
for _, u := range p.Urls {
parsedUrl, err := url.Parse(u)
if err != nil {
return fmt.Errorf("invalid url %s: %w", u, err)
}
var changed bool
if parsedUrl, changed = internal.CanonicalizeUrl(parsedUrl); changed {
u = parsedUrl.String()
}
p.Urls[outIdx] = u
if seen[u] {
continue
}
seen[u] = true
p.ParsedUrls = append(p.ParsedUrls, parsedUrl)
outIdx++
}
if len(p.Urls) != outIdx {
clear(p.Urls[outIdx:])
p.Urls = p.Urls[:outIdx]
}
} else if p.Url != "" {
parsedUrl, err := url.Parse(p.Url)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}
var changed bool
if parsedUrl, changed = internal.CanonicalizeUrl(parsedUrl); changed {
p.Url = parsedUrl.String()
}
p.Urls = append(p.Urls, p.Url)
p.ParsedUrls = append(p.ParsedUrls, parsedUrl)
} else {
return errors.New("urls missing")
}
return nil
}

135
etcd/api_test.go Normal file
View file

@ -0,0 +1,135 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package etcd
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestValidateBackendInformationEtcd(t *testing.T) {
t.Parallel()
assert := assert.New(t)
testcases := []struct {
b BackendInformationEtcd
expectedError string
expectedUrls []string
}{
{
b: BackendInformationEtcd{},
expectedError: "secret missing",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
},
expectedError: "urls missing",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Url: "https://foo\n",
},
expectedError: "invalid url",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo\n"},
},
expectedError: "invalid url",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo", "https://foo\n"},
},
expectedError: "invalid url",
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Url: "https://foo:443",
},
expectedUrls: []string{"https://foo"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo:443"},
},
expectedUrls: []string{"https://foo"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Url: "https://foo:8443",
},
expectedUrls: []string{"https://foo:8443"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo:8443"},
},
expectedUrls: []string{"https://foo:8443"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo", "https://bar", "https://foo"},
},
expectedUrls: []string{"https://bar", "https://foo"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo", "https://bar", "https://foo:443", "https://zaz"},
},
expectedUrls: []string{"https://bar", "https://foo", "https://zaz"},
},
{
b: BackendInformationEtcd{
Secret: "verysecret",
Urls: []string{"https://foo:443", "https://bar", "https://foo", "https://zaz"},
},
expectedUrls: []string{"https://bar", "https://foo", "https://zaz"},
},
}
for idx, tc := range testcases {
if tc.expectedError == "" {
if assert.NoError(tc.b.CheckValid(), "failed for testcase %d", idx) {
assert.Equal(tc.expectedUrls, tc.b.Urls, "failed for testcase %d", idx)
var urls []string
for _, u := range tc.b.ParsedUrls {
urls = append(urls, u.String())
}
assert.Equal(tc.expectedUrls, urls, "failed for testcase %d", idx)
}
} else {
assert.ErrorContains(tc.b.CheckValid(), tc.expectedError, "failed for testcase %d, got %+v", idx, tc.b.ParsedUrls)
}
}
}

View file

@ -41,6 +41,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/config"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
var (
@ -65,9 +66,9 @@ type GrpcServerHub interface {
GetSessionByResumeId(resumeId api.PrivateSessionId) Session
GetSessionByPublicId(sessionId api.PublicSessionId) Session
GetSessionIdByRoomSessionId(roomSessionId api.RoomSessionId) (api.PublicSessionId, error)
GetRoomForBackend(roomId string, backend *Backend) *Room
GetRoomForBackend(roomId string, backend *talk.Backend) *Room
GetBackend(u *url.URL) *Backend
GetBackend(u *url.URL) *talk.Backend
CreateProxyToken(publisherId string) (string, error)
}

30
hub.go
View file

@ -742,7 +742,7 @@ func (h *Hub) GetSessionIdByRoomSessionId(roomSessionId api.RoomSessionId) (api.
return h.roomSessions.GetSessionId(roomSessionId)
}
func (h *Hub) GetDialoutSessions(roomId string, backend *Backend) (result []*ClientSession) {
func (h *Hub) GetDialoutSessions(roomId string, backend *talk.Backend) (result []*ClientSession) {
h.mu.RLock()
defer h.mu.RUnlock()
for session := range h.dialoutSessions {
@ -758,7 +758,7 @@ func (h *Hub) GetDialoutSessions(roomId string, backend *Backend) (result []*Cli
return
}
func (h *Hub) GetBackend(u *url.URL) *Backend {
func (h *Hub) GetBackend(u *url.URL) *talk.Backend {
if u == nil {
return h.backend.GetCompatBackend()
}
@ -942,7 +942,7 @@ func (h *Hub) unregisterRemoteSession(session *RemoteSession) {
delete(h.remoteSessions, session)
}
func (h *Hub) newSessionIdData(backend *Backend) *SessionIdData {
func (h *Hub) newSessionIdData(backend *talk.Backend) *SessionIdData {
sid := h.sid.Add(1)
for sid == 0 {
sid = h.sid.Add(1)
@ -955,7 +955,7 @@ func (h *Hub) newSessionIdData(backend *Backend) *SessionIdData {
return sessionIdData
}
func (h *Hub) processRegister(c HandlerClient, message *api.ClientMessage, backend *Backend, auth *BackendClientResponse) {
func (h *Hub) processRegister(c HandlerClient, message *api.ClientMessage, backend *talk.Backend, auth *BackendClientResponse) {
if !c.IsConnected() {
// Client disconnected while waiting for "hello" response.
return
@ -1036,9 +1036,9 @@ func (h *Hub) processRegister(c HandlerClient, message *api.ClientMessage, backe
wg.Wait()
if totalCount.Load() > limit {
backend.RemoveSession(session)
h.logger.Printf("Error adding session %s to backend %s: %s", session.PublicId(), backend.Id(), SessionLimitExceeded)
h.logger.Printf("Error adding session %s to backend %s: %s", session.PublicId(), backend.Id(), talk.SessionLimitExceeded)
session.Close()
client.SendMessage(message.NewWrappedErrorServerMessage(SessionLimitExceeded))
client.SendMessage(message.NewWrappedErrorServerMessage(talk.SessionLimitExceeded))
return
}
}
@ -1365,7 +1365,7 @@ func (h *Hub) processHello(client HandlerClient, message *api.ClientMessage) {
}
}
func (h *Hub) processHelloV1(ctx context.Context, client HandlerClient, message *api.ClientMessage) (*Backend, *BackendClientResponse, error) {
func (h *Hub) processHelloV1(ctx context.Context, client HandlerClient, message *api.ClientMessage) (*talk.Backend, *BackendClientResponse, error) {
url := message.Hello.Auth.ParsedUrl
backend := h.backend.GetBackend(url)
if backend == nil {
@ -1389,7 +1389,7 @@ func (h *Hub) processHelloV1(ctx context.Context, client HandlerClient, message
return backend, &auth, nil
}
func (h *Hub) processHelloV2(ctx context.Context, client HandlerClient, message *api.ClientMessage) (*Backend, *BackendClientResponse, error) {
func (h *Hub) processHelloV2(ctx context.Context, client HandlerClient, message *api.ClientMessage) (*talk.Backend, *BackendClientResponse, error) {
url := message.Hello.Auth.ParsedUrl
backend := h.backend.GetBackend(url)
if backend == nil {
@ -1546,7 +1546,7 @@ func (h *Hub) processHelloClient(client HandlerClient, message *api.ClientMessag
// Make sure the client must send another "hello" in case of errors.
defer h.startExpectHello(client)
var authFunc func(context.Context, HandlerClient, *api.ClientMessage) (*Backend, *BackendClientResponse, error)
var authFunc func(context.Context, HandlerClient, *api.ClientMessage) (*talk.Backend, *BackendClientResponse, error)
switch message.Hello.Version {
case api.HelloVersionV1:
// Auth information contains a ticket that must be validated against the
@ -1616,7 +1616,7 @@ func (h *Hub) processHelloInternal(client HandlerClient, message *api.ClientMess
h.processRegister(client, message, backend, auth)
}
func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId api.RoomSessionId, backend *Backend) {
func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId api.RoomSessionId, backend *talk.Backend) {
sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId, "room_session_reconnected")
if err == ErrNoSuchRoomSession {
return
@ -1680,8 +1680,8 @@ func (h *Hub) sendRoom(session *ClientSession, message *api.ClientMessage, room
var backendStreamBitrate api.Bandwidth
var backendScreenBitrate api.Bandwidth
if backend := room.Backend(); backend != nil {
backendStreamBitrate = backend.maxStreamBitrate
backendScreenBitrate = backend.maxScreenBitrate
backendStreamBitrate = backend.MaxStreamBitrate()
backendScreenBitrate = backend.MaxScreenBitrate()
}
var maxStreamBitrate api.Bandwidth
@ -1967,7 +1967,7 @@ func (h *Hub) publishFederatedSessions() (int, *sync.WaitGroup) {
return count, &wg
}
func (h *Hub) GetRoomForBackend(id string, backend *Backend) *Room {
func (h *Hub) GetRoomForBackend(id string, backend *talk.Backend) *Room {
internalRoomId := getRoomIdForBackend(id, backend)
h.ru.RLock()
@ -1986,7 +1986,7 @@ func (h *Hub) removeRoom(room *Room) {
h.roomPing.DeleteRoom(room.Id())
}
func (h *Hub) CreateRoom(id string, properties json.RawMessage, backend *Backend) (*Room, error) {
func (h *Hub) CreateRoom(id string, properties json.RawMessage, backend *talk.Backend) (*Room, error) {
h.ru.Lock()
defer h.ru.Unlock()
@ -1994,7 +1994,7 @@ func (h *Hub) CreateRoom(id string, properties json.RawMessage, backend *Backend
}
// +checklocks:h.ru
func (h *Hub) createRoomLocked(id string, properties json.RawMessage, backend *Backend) (*Room, error) {
func (h *Hub) createRoomLocked(id string, properties json.RawMessage, backend *talk.Backend) (*Room, error) {
// Note the write lock must be held.
room, err := NewRoom(id, properties, h, h.events, backend)
if err != nil {

View file

@ -822,9 +822,7 @@ func Benchmark_DecodePrivateSessionIdCached(b *testing.B) {
for range numDecodeCaches {
decodeCaches = append(decodeCaches, container.NewLruCache[*SessionIdData](decodeCacheSize))
}
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
data := &SessionIdData{
Sid: 1,
Created: time.Now().UnixMicro(),
@ -851,9 +849,7 @@ func Benchmark_DecodePublicSessionIdCached(b *testing.B) {
for range numDecodeCaches {
decodeCaches = append(decodeCaches, container.NewLruCache[*SessionIdData](decodeCacheSize))
}
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
data := &SessionIdData{
Sid: 1,
Created: time.Now().UnixMicro(),

View file

@ -52,6 +52,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
const (
@ -1787,11 +1788,11 @@ func (h *mockGrpcServerHub) GetSessionIdByRoomSessionId(roomSessionId api.RoomSe
return "", nil
}
func (h *mockGrpcServerHub) GetBackend(u *url.URL) *Backend {
func (h *mockGrpcServerHub) GetBackend(u *url.URL) *talk.Backend {
return nil
}
func (h *mockGrpcServerHub) GetRoomForBackend(roomId string, backend *Backend) *Room {
func (h *mockGrpcServerHub) GetRoomForBackend(roomId string, backend *talk.Backend) *Room {
return nil
}

View file

@ -39,6 +39,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
const (
@ -70,7 +71,7 @@ type Room struct {
logger log.Logger
hub *Hub
events AsyncEvents
backend *Backend
backend *talk.Backend
// +checklocks:mu
properties json.RawMessage
@ -101,7 +102,7 @@ type Room struct {
transientData *TransientData
}
func getRoomIdForBackend(id string, backend *Backend) string {
func getRoomIdForBackend(id string, backend *talk.Backend) string {
if id == "" {
return ""
}
@ -109,7 +110,7 @@ func getRoomIdForBackend(id string, backend *Backend) string {
return backend.Id() + "|" + id
}
func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEvents, backend *Backend) (*Room, error) {
func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEvents, backend *talk.Backend) (*Room, error) {
room := &Room{
id: roomId,
logger: hub.logger,
@ -158,7 +159,7 @@ func (r *Room) Properties() json.RawMessage {
return r.properties
}
func (r *Room) Backend() *Backend {
func (r *Room) Backend() *talk.Backend {
return r.backend
}

View file

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
type DummySession struct {
@ -69,7 +70,7 @@ func (s *DummySession) ParsedUserData() (api.StringMap, error) {
return nil, nil
}
func (s *DummySession) Backend() *Backend {
func (s *DummySession) Backend() *talk.Backend {
return nil
}

View file

@ -28,6 +28,7 @@ import (
"sync"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
type Permission string
@ -60,7 +61,7 @@ type Session interface {
UserData() json.RawMessage
ParsedUserData() (api.StringMap, error)
Backend() *Backend
Backend() *talk.Backend
BackendUrl() string
ParsedBackendUrl() *url.URL

View file

@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
func TestReverseSessionId(t *testing.T) {
@ -47,9 +48,7 @@ func TestReverseSessionId(t *testing.T) {
func Benchmark_EncodePrivateSessionId(b *testing.B) {
require := require.New(b)
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
data := &SessionIdData{
Sid: 1,
Created: time.Now().UnixMicro(),
@ -66,9 +65,7 @@ func Benchmark_EncodePrivateSessionId(b *testing.B) {
func Benchmark_DecodePrivateSessionId(b *testing.B) {
require := require.New(b)
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
data := &SessionIdData{
Sid: 1,
Created: time.Now().UnixMicro(),
@ -89,9 +86,7 @@ func Benchmark_DecodePrivateSessionId(b *testing.B) {
func Benchmark_EncodePublicSessionId(b *testing.B) {
require := require.New(b)
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
data := &SessionIdData{
Sid: 1,
Created: time.Now().UnixMicro(),
@ -108,9 +103,7 @@ func Benchmark_EncodePublicSessionId(b *testing.B) {
func Benchmark_DecodePublicSessionId(b *testing.B) {
require := require.New(b)
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
data := &SessionIdData{
Sid: 1,
Created: time.Now().UnixMicro(),

314
talk/backend.go Normal file
View file

@ -0,0 +1,314 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package talk
import (
"bytes"
"fmt"
"net/url"
"slices"
"strings"
"sync"
"github.com/dlintw/goconf"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/config"
"github.com/strukturag/nextcloud-spreed-signaling/etcd"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
var (
SessionLimitExceeded = api.NewError("session_limit_exceeded", "Too many sessions connected for this backend.") // +checklocksignore: Global readonly variable.
)
func init() {
registerBackendStats()
}
type Backend struct {
id string
urls []string
secret []byte
allowHttp bool
maxStreamBitrate api.Bandwidth
maxScreenBitrate api.Bandwidth
sessionLimit uint64
sessionsLock sync.Mutex
// +checklocks:sessionsLock
sessions map[api.PublicSessionId]bool
counted bool
}
func NewCompatBackend(cfg *goconf.ConfigFile) *Backend {
if cfg == nil {
return &Backend{
id: "compat",
}
}
allowHttp, _ := cfg.GetBool("backend", "allowhttp")
commonSecret, _ := config.GetStringOptionWithEnv(cfg, "backend", "secret")
sessionLimit, err := cfg.GetInt("backend", "sessionlimit")
if err != nil || sessionLimit < 0 {
sessionLimit = 0
}
maxStreamBitrate, err := cfg.GetInt("backend", "maxstreambitrate")
if err != nil || maxStreamBitrate < 0 {
maxStreamBitrate = 0
}
maxScreenBitrate, err := cfg.GetInt("backend", "maxscreenbitrate")
if err != nil || maxScreenBitrate < 0 {
maxScreenBitrate = 0
}
return &Backend{
id: "compat",
secret: []byte(commonSecret),
allowHttp: allowHttp,
sessionLimit: uint64(sessionLimit),
counted: true,
maxStreamBitrate: api.BandwidthFromBits(uint64(maxStreamBitrate)),
maxScreenBitrate: api.BandwidthFromBits(uint64(maxScreenBitrate)),
}
}
func NewBackendFromConfig(logger log.Logger, id string, cfg *goconf.ConfigFile, commonSecret string) (*Backend, error) {
secret, _ := config.GetStringOptionWithEnv(cfg, id, "secret")
if secret == "" && commonSecret != "" {
logger.Printf("Backend %s has no own shared secret set, using common shared secret", id)
secret = commonSecret
}
if secret == "" {
return nil, fmt.Errorf("backend %s is missing or incomplete, skipping", id)
}
sessionLimit, err := cfg.GetInt(id, "sessionlimit")
if err != nil || sessionLimit < 0 {
sessionLimit = 0
}
maxStreamBitrate, err := cfg.GetInt(id, "maxstreambitrate")
if err != nil || maxStreamBitrate < 0 {
maxStreamBitrate = 0
}
maxScreenBitrate, err := cfg.GetInt(id, "maxscreenbitrate")
if err != nil || maxScreenBitrate < 0 {
maxScreenBitrate = 0
}
return &Backend{
id: id,
secret: []byte(secret),
maxStreamBitrate: api.BandwidthFromBits(uint64(maxStreamBitrate)),
maxScreenBitrate: api.BandwidthFromBits(uint64(maxScreenBitrate)),
sessionLimit: uint64(sessionLimit),
}, nil
}
func NewBackendFromEtcd(key string, info *etcd.BackendInformationEtcd) *Backend {
allowHttp := slices.ContainsFunc(info.ParsedUrls, func(u *url.URL) bool {
return u.Scheme == "http"
})
return &Backend{
id: key,
urls: info.Urls,
secret: []byte(info.Secret),
allowHttp: allowHttp,
maxStreamBitrate: info.MaxStreamBitrate,
maxScreenBitrate: info.MaxScreenBitrate,
sessionLimit: info.SessionLimit,
}
}
func (b *Backend) Id() string {
return b.id
}
func (b *Backend) Secret() []byte {
return b.secret
}
func (b *Backend) IsCompat() bool {
return len(b.urls) == 0
}
func (b *Backend) Equal(other *Backend) bool {
if b == other {
return true
} else if b == nil || other == nil {
return false
}
return b.id == other.id &&
b.allowHttp == other.allowHttp &&
b.maxStreamBitrate == other.maxStreamBitrate &&
b.maxScreenBitrate == other.maxScreenBitrate &&
b.sessionLimit == other.sessionLimit &&
bytes.Equal(b.secret, other.secret) &&
slices.Equal(b.urls, other.urls)
}
func (b *Backend) IsUrlAllowed(u *url.URL) bool {
switch u.Scheme {
case "https":
return true
case "http":
return b.allowHttp
default:
return false
}
}
func (b *Backend) HasUrl(url string) bool {
if b.IsCompat() {
// Old-style configuration, only hosts are configured.
return true
}
for _, u := range b.urls {
if strings.HasPrefix(url, u) {
return true
}
}
return false
}
func (b *Backend) Urls() []string {
return b.urls
}
func (b *Backend) AddUrl(u *url.URL) {
b.urls = append(b.urls, u.String())
if u.Scheme == "http" {
b.allowHttp = true
}
}
func (b *Backend) Limit() int {
return int(b.sessionLimit)
}
func (b *Backend) SetMaxStreamBitrate(bitrate api.Bandwidth) {
b.maxStreamBitrate = bitrate
}
func (b *Backend) MaxStreamBitrate() api.Bandwidth {
return b.maxStreamBitrate
}
func (b *Backend) SetMaxScreenBitrate(bitrate api.Bandwidth) {
b.maxScreenBitrate = bitrate
}
func (b *Backend) MaxScreenBitrate() api.Bandwidth {
return b.maxScreenBitrate
}
func (b *Backend) CopyCount(other *Backend) {
b.counted = other.counted
}
func (b *Backend) Count() bool {
if b.counted {
return false
}
b.counted = true
return true
}
func (b *Backend) Uncount() bool {
if !b.counted {
return false
}
b.counted = false
return true
}
func (b *Backend) Len() int {
b.sessionsLock.Lock()
defer b.sessionsLock.Unlock()
return len(b.sessions)
}
type BackendSession interface {
PublicId() api.PublicSessionId
ClientType() api.ClientType
}
func (b *Backend) AddSession(session BackendSession) error {
if session.ClientType() == api.HelloClientTypeInternal || session.ClientType() == api.HelloClientTypeVirtual {
// Internal and virtual sessions are not counting to the limit.
return nil
}
if b.sessionLimit == 0 {
// Not limited
return nil
}
b.sessionsLock.Lock()
defer b.sessionsLock.Unlock()
if b.sessions == nil {
b.sessions = make(map[api.PublicSessionId]bool)
} else if uint64(len(b.sessions)) >= b.sessionLimit {
registerBackendStats()
statsBackendLimitExceededTotal.WithLabelValues(b.id).Inc()
return SessionLimitExceeded
}
b.sessions[session.PublicId()] = true
return nil
}
func (b *Backend) RemoveSession(session BackendSession) {
b.sessionsLock.Lock()
defer b.sessionsLock.Unlock()
delete(b.sessions, session.PublicId())
}
func (b *Backend) UpdateStats() {
if b.sessionLimit > 0 {
statsBackendLimit.WithLabelValues(b.id).Set(float64(b.sessionLimit))
} else {
statsBackendLimit.DeleteLabelValues(b.id)
}
}
func (b *Backend) DeleteStats() {
statsBackendLimit.DeleteLabelValues(b.id)
}

View file

@ -0,0 +1,52 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package talk
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/strukturag/nextcloud-spreed-signaling/metrics"
)
var (
statsBackendLimit = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "backend",
Name: "session_limit",
Help: "The session limit of a backend",
}, []string{"backend"})
statsBackendLimitExceededTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ // +checklocksignore: Global readonly variable.
Namespace: "signaling",
Subsystem: "backend",
Name: "session_limit_exceeded_total",
Help: "The number of times the session limit exceeded",
}, []string{"backend"})
backendStats = []prometheus.Collector{
statsBackendLimit,
statsBackendLimitExceededTotal,
}
)
func registerBackendStats() {
metrics.RegisterAll(backendStats...)
}

View file

@ -31,6 +31,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
const (
@ -138,7 +139,7 @@ func (s *VirtualSession) Data() *SessionIdData {
return s.data
}
func (s *VirtualSession) Backend() *Backend {
func (s *VirtualSession) Backend() *talk.Backend {
return s.session.Backend()
}

View file

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
func TestVirtualSession(t *testing.T) {
@ -41,9 +42,7 @@ func TestVirtualSession(t *testing.T) {
roomId := "the-room-id"
emptyProperties := json.RawMessage("{}")
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
room, err := hub.CreateRoom(roomId, emptyProperties, backend)
require.NoError(err)
defer room.Close()
@ -224,9 +223,7 @@ func TestVirtualSessionActorInformation(t *testing.T) {
roomId := "the-room-id"
emptyProperties := json.RawMessage("{}")
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
room, err := hub.CreateRoom(roomId, emptyProperties, backend)
require.NoError(err)
defer room.Close()
@ -433,9 +430,7 @@ func TestVirtualSessionCustomInCall(t *testing.T) {
roomId := "the-room-id"
emptyProperties := json.RawMessage("{}")
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
room, err := hub.CreateRoom(roomId, emptyProperties, backend)
require.NoError(err)
defer room.Close()
@ -574,9 +569,7 @@ func TestVirtualSessionCleanup(t *testing.T) {
roomId := "the-room-id"
emptyProperties := json.RawMessage("{}")
backend := &Backend{
id: "compat",
}
backend := talk.NewCompatBackend(nil)
room, err := hub.CreateRoom(roomId, emptyProperties, backend)
require.NoError(err)
defer room.Close()