Move NATS client to nats package.

This commit is contained in:
Joachim Bauch 2025-12-10 20:44:51 +01:00
commit 5543046305
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
17 changed files with 484 additions and 219 deletions

View file

@ -46,6 +46,10 @@ component_management:
name: metrics
paths:
- metrics/**
- component_id: module_nats
name: nats
paths:
- nats/**
- component_id: module_proxy
name: proxy
paths:

View file

@ -25,9 +25,8 @@ import (
"context"
"errors"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
var (
@ -66,7 +65,7 @@ type AsyncEvents interface {
}
func NewAsyncEvents(ctx context.Context, url string) (AsyncEvents, error) {
client, err := NewNatsClient(ctx, url)
client, err := nats.NewClient(ctx, url)
if err != nil {
return nil, err
}

View file

@ -27,44 +27,43 @@ import (
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
func GetSubjectForBackendRoomId(roomId string, backend *Backend) string {
if backend == nil || backend.IsCompat() {
return GetEncodedSubject("backend.room", roomId)
return nats.GetEncodedSubject("backend.room", roomId)
}
return GetEncodedSubject("backend.room", roomId+"|"+backend.Id())
return nats.GetEncodedSubject("backend.room", roomId+"|"+backend.Id())
}
func GetSubjectForRoomId(roomId string, backend *Backend) string {
if backend == nil || backend.IsCompat() {
return GetEncodedSubject("room", roomId)
return nats.GetEncodedSubject("room", roomId)
}
return GetEncodedSubject("room", roomId+"|"+backend.Id())
return nats.GetEncodedSubject("room", roomId+"|"+backend.Id())
}
func GetSubjectForUserId(userId string, backend *Backend) string {
if backend == nil || backend.IsCompat() {
return GetEncodedSubject("user", userId)
return nats.GetEncodedSubject("user", userId)
}
return GetEncodedSubject("user", userId+"|"+backend.Id())
return nats.GetEncodedSubject("user", userId+"|"+backend.Id())
}
func GetSubjectForSessionId(sessionId PublicSessionId, backend *Backend) string {
return string("session." + sessionId)
}
type asyncEventsNatsSubscriptions map[string]map[AsyncEventListener]NatsSubscription
type asyncEventsNatsSubscriptions map[string]map[AsyncEventListener]nats.Subscription
type asyncEventsNats struct {
mu sync.Mutex
client NatsClient
client nats.Client
logger log.Logger // +checklocksignore
// +checklocks:mu
@ -77,7 +76,7 @@ type asyncEventsNats struct {
sessionSubscriptions asyncEventsNatsSubscriptions
}
func NewAsyncEventsNats(logger log.Logger, client NatsClient) (AsyncEvents, error) {
func NewAsyncEventsNats(logger log.Logger, client nats.Client) (AsyncEvents, error) {
events := &asyncEventsNats{
client: client,
logger: logger,
@ -91,28 +90,29 @@ func NewAsyncEventsNats(logger log.Logger, client NatsClient) (AsyncEvents, erro
}
func (e *asyncEventsNats) GetServerInfoNats() *BackendServerInfoNats {
var nats *BackendServerInfoNats
// TODO: This should call a method on "e.client" directly instead of having a type switch.
var result *BackendServerInfoNats
switch n := e.client.(type) {
case *natsClient:
nats = &BackendServerInfoNats{
Urls: n.conn.Servers(),
case *nats.NativeClient:
result = &BackendServerInfoNats{
Urls: n.URLs(),
}
if c := n.conn; c.IsConnected() {
nats.Connected = true
nats.ServerUrl = c.ConnectedUrl()
nats.ServerID = c.ConnectedServerId()
nats.ServerVersion = c.ConnectedServerVersion()
nats.ClusterName = c.ConnectedClusterName()
if n.IsConnected() {
result.Connected = true
result.ServerUrl = n.ConnectedUrl()
result.ServerID = n.ConnectedServerId()
result.ServerVersion = n.ConnectedServerVersion()
result.ClusterName = n.ConnectedClusterName()
}
case *LoopbackNatsClient:
nats = &BackendServerInfoNats{
Urls: []string{NatsLoopbackUrl},
case *nats.LoopbackClient:
result = &BackendServerInfoNats{
Urls: []string{nats.LoopbackUrl},
Connected: true,
ServerUrl: NatsLoopbackUrl,
ServerUrl: nats.LoopbackUrl,
}
}
return nats
return result
}
func closeSubscriptions(logger log.Logger, wg *sync.WaitGroup, subscriptions asyncEventsNatsSubscriptions) {
@ -153,7 +153,7 @@ func (e *asyncEventsNats) Close(ctx context.Context) error {
func (e *asyncEventsNats) registerListener(key string, subscriptions asyncEventsNatsSubscriptions, listener AsyncEventListener) error {
subs, found := subscriptions[key]
if !found {
subs = make(map[AsyncEventListener]NatsSubscription)
subs = make(map[AsyncEventListener]nats.Subscription)
subscriptions[key] = subs
} else if _, found := subs[listener]; found {
return ErrAlreadyRegistered

View file

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
var (
@ -58,7 +59,7 @@ func getAsyncEventsForTest(t *testing.T) AsyncEvents {
func getRealAsyncEventsForTest(t *testing.T) AsyncEvents {
logger := log.NewLoggerForTest(t)
ctx := log.NewLoggerContext(t.Context(), logger)
server, _ := startLocalNatsServer(t)
server, _ := nats.StartLocalServer(t)
events, err := NewAsyncEvents(ctx, server.ClientURL())
if err != nil {
require.NoError(t, err)
@ -69,7 +70,7 @@ func getRealAsyncEventsForTest(t *testing.T) AsyncEvents {
func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
logger := log.NewLoggerForTest(t)
ctx := log.NewLoggerContext(t.Context(), logger)
events, err := NewAsyncEvents(ctx, NatsLoopbackUrl)
events, err := NewAsyncEvents(ctx, nats.LoopbackUrl)
if err != nil {
require.NoError(t, err)
}
@ -78,8 +79,8 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
nats := (events.(*asyncEventsNats)).client
(nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t)
client := (events.(*asyncEventsNats)).client
nats.WaitForSubscriptionsEmpty(ctx, t, client)
})
return events
}
@ -87,17 +88,17 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
func waitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events AsyncEvents) {
t.Helper()
nats, ok := (events.(*asyncEventsNats))
e, ok := (events.(*asyncEventsNats))
if !ok {
// Only can wait for NATS events.
return
}
client, ok := nats.client.(*natsClient)
client, ok := e.client.(*nats.NativeClient)
if !ok {
// The loopback NATS clients is executing all events synchronously.
return
}
assert.NoError(t, client.conn.FlushWithContext(ctx))
assert.NoError(t, client.FlushWithContext(ctx))
}

View file

@ -48,6 +48,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
var (
@ -143,7 +144,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g
server2.Close()
})
nats, _ := startLocalNatsServer(t)
nats, _ := nats.StartLocalServer(t)
grpcServer1, addr1 := NewGrpcServerForTest(t)
grpcServer2, addr2 := NewGrpcServerForTest(t)
@ -248,7 +249,7 @@ func expectRoomlistEvent(t *testing.T, ch AsyncChannel, msgType string) (*EventS
select {
case natsMsg := <-ch:
var message AsyncMessage
if !assert.NoError(NatsDecode(natsMsg, &message)) ||
if !assert.NoError(nats.Decode(natsMsg, &message)) ||
!assert.Equal("message", message.Type, "invalid message type, got %+v", message) ||
!assert.NotNil(message.Message, "message missing, got %+v", message) {
return nil, false

View file

@ -33,12 +33,12 @@ import (
"sync/atomic"
"time"
"github.com/nats-io/nats.go"
"github.com/pion/sdp/v3"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/async"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
var (
@ -1070,7 +1070,7 @@ func (s *ClientSession) GetSubscriber(id PublicSessionId, streamType StreamType)
func (s *ClientSession) processAsyncNatsMessage(msg *nats.Msg) {
var message AsyncMessage
if err := NatsDecode(msg, &message); err != nil {
if err := nats.Decode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v: %s", msg, err)
return
}

View file

@ -57,6 +57,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/container"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
@ -226,10 +227,10 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http
server2.Close()
})
nats1, _ := startLocalNatsServer(t)
nats1, _ := nats.StartLocalServer(t)
var nats2 *server.Server
if strings.Contains(t.Name(), "Federation") {
nats2, _ = startLocalNatsServer(t)
nats2, _ = nats.StartLocalServer(t)
} else {
nats2 = nats1
}

View file

@ -1,6 +1,6 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2017 struktur AG
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
@ -19,21 +19,19 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
package nats
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"net/url"
"os"
"os/signal"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/async"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
@ -42,17 +40,25 @@ const (
initialConnectInterval = time.Second
maxConnectInterval = 8 * time.Second
NatsLoopbackUrl = "nats://loopback"
LoopbackUrl = "nats://loopback"
DefaultURL = nats.DefaultURL
)
type NatsSubscription interface {
var (
ErrConnectionClosed = nats.ErrConnectionClosed
)
type Msg = nats.Msg
type Subscription interface {
Unsubscribe() error
}
type NatsClient interface {
type Client interface {
Close(ctx context.Context) error
Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error)
Subscribe(subject string, ch chan *Msg) (Subscription, error)
Publish(subject string, message any) error
}
@ -63,21 +69,15 @@ func GetEncodedSubject(prefix string, suffix string) string {
return prefix + "." + base64.StdEncoding.EncodeToString([]byte(suffix))
}
type natsClient struct {
logger log.Logger
conn *nats.Conn
closed chan struct{}
}
func NewNatsClient(ctx context.Context, url string, options ...nats.Option) (NatsClient, error) {
func NewClient(ctx context.Context, url string, options ...nats.Option) (Client, error) {
logger := log.LoggerFromContext(ctx)
if url == ":loopback:" {
logger.Printf("WARNING: events url %s is deprecated, please use %s instead", url, NatsLoopbackUrl)
url = NatsLoopbackUrl
logger.Printf("WARNING: events url %s is deprecated, please use %s instead", url, LoopbackUrl)
url = LoopbackUrl
}
if url == NatsLoopbackUrl {
if url == LoopbackUrl {
logger.Println("Using internal NATS loopback client")
return NewLoopbackNatsClient(logger)
return NewLoopbackClient(logger)
}
backoff, err := async.NewExponentialBackoff(initialConnectInterval, maxConnectInterval)
@ -85,7 +85,7 @@ func NewNatsClient(ctx context.Context, url string, options ...nats.Option) (Nat
return nil, err
}
client := &natsClient{
client := &NativeClient{
logger: logger,
closed: make(chan struct{}),
}
@ -115,47 +115,7 @@ func NewNatsClient(ctx context.Context, url string, options ...nats.Option) (Nat
return client, nil
}
func (c *natsClient) Close(ctx context.Context) error {
c.conn.Close()
select {
case <-c.closed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (c *natsClient) onClosed(conn *nats.Conn) {
if err := conn.LastError(); err != nil {
c.logger.Printf("NATS client closed, last error %s", conn.LastError())
} else {
c.logger.Println("NATS client closed")
}
close(c.closed)
}
func (c *natsClient) onDisconnected(conn *nats.Conn) {
c.logger.Println("NATS client disconnected")
}
func (c *natsClient) onReconnected(conn *nats.Conn) {
c.logger.Printf("NATS client reconnected to %s (%s)", conn.ConnectedUrl(), conn.ConnectedServerId())
}
func (c *natsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) {
return c.conn.ChanSubscribe(subject, ch)
}
func (c *natsClient) Publish(subject string, message any) error {
data, err := json.Marshal(message)
if err != nil {
return err
}
return c.conn.Publish(subject, data)
}
func NatsDecode(msg *nats.Msg, vPtr any) (err error) {
func Decode(msg *nats.Msg, vPtr any) (err error) {
switch arg := vPtr.(type) {
case *string:
// If they want a string and it is a JSON string, strip quotes
@ -174,11 +134,3 @@ func NatsDecode(msg *nats.Msg, vPtr any) (err error) {
}
return
}
func removeURLCredentials(u string) string {
if u, err := url.Parse(u); err == nil && u.User != nil {
u.User = url.User("***")
return u.String()
}
return u
}

159
nats/client_test.go Normal file
View file

@ -0,0 +1,159 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2026 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package nats
import (
"testing"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
)
func TestGetEncodedSubject(t *testing.T) {
t.Parallel()
assert := assert.New(t)
encoded := GetEncodedSubject("foo", "this is the subject")
assert.NotContains(encoded, " ")
encoded = GetEncodedSubject("foo", "this-is-the-subject")
assert.NotContains(encoded, "this-is")
}
func TestDecodeToString(t *testing.T) {
t.Parallel()
assert := assert.New(t)
testcases := []struct {
data []byte
expected string
}{
{
[]byte(`""`),
"",
},
{
[]byte(`"foo"`),
"foo",
},
{
[]byte(`{"type":"foo"}`),
`{"type":"foo"}`,
},
{
[]byte(`1234`),
"1234",
},
}
for idx, tc := range testcases {
var dest string
if assert.NoError(Decode(&nats.Msg{
Data: tc.data,
}, &dest), "decoding failed for test %d (%s)", idx, string(tc.data)) {
assert.Equal(tc.expected, dest, "failed for test %s (%s)", idx, string(tc.data))
}
}
}
func TestDecodeToByteSlice(t *testing.T) {
t.Parallel()
assert := assert.New(t)
testcases := []struct {
data []byte
expected []byte
}{
{
[]byte(``),
[]byte{},
},
{
[]byte(`""`),
[]byte(`""`),
},
{
[]byte(`"foo"`),
[]byte(`"foo"`),
},
{
[]byte(`{"type":"foo"}`),
[]byte(`{"type":"foo"}`),
},
{
[]byte(`1234`),
[]byte(`1234`),
},
}
for idx, tc := range testcases {
var dest []byte
if assert.NoError(Decode(&nats.Msg{
Data: tc.data,
}, &dest), "decoding failed for test %d (%s)", idx, string(tc.data)) {
assert.Equal(tc.expected, dest, "failed for test %s (%s)", idx, string(tc.data))
}
}
}
func TestDecodeRegular(t *testing.T) {
t.Parallel()
assert := assert.New(t)
type testdata struct {
Type string `json:"type"`
Value any `json:"value"`
}
testcases := []struct {
data []byte
expected *testdata
}{
{
[]byte(`null`),
nil,
},
{
[]byte(`{"value":"bar","type":"foo"}`),
&testdata{
Type: "foo",
Value: "bar",
},
},
{
[]byte(`{"value":123,"type":"foo"}`),
&testdata{
Type: "foo",
Value: float64(123),
},
},
}
for idx, tc := range testcases {
var dest *testdata
if assert.NoError(Decode(&nats.Msg{
Data: tc.data,
}, &dest), "decoding failed for test %d (%s)", idx, string(tc.data)) {
assert.Equal(tc.expected, dest, "failed for test %s (%s)", idx, string(tc.data))
}
}
}

View file

@ -19,7 +19,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
package nats
import (
"container/list"
@ -33,14 +33,14 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
type LoopbackNatsClient struct {
type LoopbackClient struct {
logger log.Logger
mu sync.Mutex
closed chan struct{}
// +checklocks:mu
subscriptions map[string]map[*loopbackNatsSubscription]bool
subscriptions map[string]map[*loopbackSubscription]bool
// +checklocks:mu
wakeup sync.Cond
@ -48,19 +48,19 @@ type LoopbackNatsClient struct {
incoming list.List
}
func NewLoopbackNatsClient(logger log.Logger) (NatsClient, error) {
client := &LoopbackNatsClient{
func NewLoopbackClient(logger log.Logger) (Client, error) {
client := &LoopbackClient{
logger: logger,
closed: make(chan struct{}),
subscriptions: make(map[string]map[*loopbackNatsSubscription]bool),
subscriptions: make(map[string]map[*loopbackSubscription]bool),
}
client.wakeup.L = &client.mu
go client.processMessages()
return client, nil
}
func (c *LoopbackNatsClient) processMessages() {
func (c *LoopbackClient) processMessages() {
defer close(c.closed)
c.mu.Lock()
@ -74,19 +74,19 @@ func (c *LoopbackNatsClient) processMessages() {
break
}
msg := c.incoming.Remove(c.incoming.Front()).(*nats.Msg)
msg := c.incoming.Remove(c.incoming.Front()).(*Msg)
c.processMessage(msg)
}
}
// +checklocks:c.mu
func (c *LoopbackNatsClient) processMessage(msg *nats.Msg) {
func (c *LoopbackClient) processMessage(msg *Msg) {
subs, found := c.subscriptions[msg.Subject]
if !found {
return
}
channels := make([]chan *nats.Msg, 0, len(subs))
channels := make([]chan *Msg, 0, len(subs))
for sub := range subs {
channels = append(channels, sub.ch)
}
@ -101,7 +101,7 @@ func (c *LoopbackNatsClient) processMessage(msg *nats.Msg) {
}
}
func (c *LoopbackNatsClient) doClose() {
func (c *LoopbackClient) doClose() {
c.mu.Lock()
defer c.mu.Unlock()
@ -110,7 +110,7 @@ func (c *LoopbackNatsClient) doClose() {
c.wakeup.Signal()
}
func (c *LoopbackNatsClient) Close(ctx context.Context) error {
func (c *LoopbackClient) Close(ctx context.Context) error {
c.doClose()
select {
case <-c.closed:
@ -120,19 +120,19 @@ func (c *LoopbackNatsClient) Close(ctx context.Context) error {
}
}
type loopbackNatsSubscription struct {
type loopbackSubscription struct {
subject string
client *LoopbackNatsClient
client *LoopbackClient
ch chan *nats.Msg
ch chan *Msg
}
func (s *loopbackNatsSubscription) Unsubscribe() error {
func (s *loopbackSubscription) Unsubscribe() error {
s.client.unsubscribe(s)
return nil
}
func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) {
func (c *LoopbackClient) Subscribe(subject string, ch chan *Msg) (Subscription, error) {
if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") {
return nil, nats.ErrBadSubject
}
@ -143,14 +143,14 @@ func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsS
return nil, nats.ErrConnectionClosed
}
s := &loopbackNatsSubscription{
s := &loopbackSubscription{
subject: subject,
client: c,
ch: ch,
}
subs, found := c.subscriptions[subject]
if !found {
subs = make(map[*loopbackNatsSubscription]bool)
subs = make(map[*loopbackSubscription]bool)
c.subscriptions[subject] = subs
}
subs[s] = true
@ -158,7 +158,7 @@ func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsS
return s, nil
}
func (c *LoopbackNatsClient) unsubscribe(s *loopbackNatsSubscription) {
func (c *LoopbackClient) unsubscribe(s *loopbackSubscription) {
c.mu.Lock()
defer c.mu.Unlock()
@ -170,7 +170,7 @@ func (c *LoopbackNatsClient) unsubscribe(s *loopbackNatsSubscription) {
}
}
func (c *LoopbackNatsClient) Publish(subject string, message any) error {
func (c *LoopbackClient) Publish(subject string, message any) error {
if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") {
return nats.ErrBadSubject
}
@ -181,7 +181,7 @@ func (c *LoopbackNatsClient) Publish(subject string, message any) error {
return nats.ErrConnectionClosed
}
msg := &nats.Msg{
msg := &Msg{
Subject: subject,
}
var err error

View file

@ -19,7 +19,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
package nats
import (
"context"
@ -32,30 +32,9 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
func (c *LoopbackNatsClient) waitForSubscriptionsEmpty(ctx context.Context, t *testing.T) {
for {
c.mu.Lock()
count := len(c.subscriptions)
c.mu.Unlock()
if count == 0 {
break
}
select {
case <-ctx.Done():
c.mu.Lock()
assert.NoError(t, ctx.Err(), "Error waiting for subscriptions %+v to terminate", c.subscriptions)
c.mu.Unlock()
return
default:
time.Sleep(time.Millisecond)
}
}
}
func CreateLoopbackNatsClientForTest(t *testing.T) NatsClient {
func CreateLoopbackClientForTest(t *testing.T) Client {
logger := log.NewLoggerForTest(t)
result, err := NewLoopbackNatsClient(logger)
result, err := NewLoopbackClient(logger)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@ -65,30 +44,30 @@ func CreateLoopbackNatsClientForTest(t *testing.T) NatsClient {
return result
}
func TestLoopbackNatsClient_Subscribe(t *testing.T) {
func TestLoopbackClient_Subscribe(t *testing.T) {
t.Parallel()
client := CreateLoopbackNatsClientForTest(t)
testNatsClient_Subscribe(t, client)
client := CreateLoopbackClientForTest(t)
testClient_Subscribe(t, client)
}
func TestLoopbackClient_PublishAfterClose(t *testing.T) {
t.Parallel()
client := CreateLoopbackNatsClientForTest(t)
testNatsClient_PublishAfterClose(t, client)
client := CreateLoopbackClientForTest(t)
test_PublishAfterClose(t, client)
}
func TestLoopbackClient_SubscribeAfterClose(t *testing.T) {
t.Parallel()
client := CreateLoopbackNatsClientForTest(t)
testNatsClient_SubscribeAfterClose(t, client)
client := CreateLoopbackClientForTest(t)
testClient_SubscribeAfterClose(t, client)
}
func TestLoopbackClient_BadSubjects(t *testing.T) {
t.Parallel()
client := CreateLoopbackNatsClientForTest(t)
testNatsClient_BadSubjects(t, client)
client := CreateLoopbackClientForTest(t)
testClient_BadSubjects(t, client)
}

114
nats/native.go Normal file
View file

@ -0,0 +1,114 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2017 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package nats
import (
"context"
"encoding/json"
"net/url"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
type NativeClient struct {
logger log.Logger
conn *nats.Conn
closed chan struct{}
}
func (c *NativeClient) URLs() []string {
return c.conn.Servers()
}
func (c *NativeClient) IsConnected() bool {
return c.conn.IsConnected()
}
func (c *NativeClient) ConnectedUrl() string {
return c.conn.ConnectedUrl()
}
func (c *NativeClient) ConnectedServerId() string {
return c.conn.ConnectedServerId()
}
func (c *NativeClient) ConnectedServerVersion() string {
return c.conn.ConnectedServerVersion()
}
func (c *NativeClient) ConnectedClusterName() string {
return c.conn.ConnectedClusterName()
}
func (c *NativeClient) Close(ctx context.Context) error {
c.conn.Close()
select {
case <-c.closed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (c *NativeClient) FlushWithContext(ctx context.Context) error {
return c.conn.FlushWithContext(ctx)
}
func (c *NativeClient) onClosed(conn *nats.Conn) {
if err := conn.LastError(); err != nil {
c.logger.Printf("NATS client closed, last error %s", conn.LastError())
} else {
c.logger.Println("NATS client closed")
}
close(c.closed)
}
func (c *NativeClient) onDisconnected(conn *nats.Conn) {
c.logger.Println("NATS client disconnected")
}
func (c *NativeClient) onReconnected(conn *nats.Conn) {
c.logger.Printf("NATS client reconnected to %s (%s)", conn.ConnectedUrl(), conn.ConnectedServerId())
}
func (c *NativeClient) Subscribe(subject string, ch chan *Msg) (Subscription, error) {
return c.conn.ChanSubscribe(subject, ch)
}
func (c *NativeClient) Publish(subject string, message any) error {
data, err := json.Marshal(message)
if err != nil {
return err
}
return c.conn.Publish(subject, data)
}
func removeURLCredentials(u string) string {
if u, err := url.Parse(u); err == nil && u.User != nil {
u.User = url.User("***")
return u.String()
}
return u
}

View file

@ -19,7 +19,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
package nats
import (
"context"
@ -27,41 +27,22 @@ import (
"testing"
"time"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
func startLocalNatsServer(t *testing.T) (*server.Server, int) {
func CreateLocalClientForTest(t *testing.T, options ...nats.Option) (*server.Server, int, Client) {
t.Helper()
return startLocalNatsServerPort(t, server.RANDOM_PORT)
}
func startLocalNatsServerPort(t *testing.T, port int) (*server.Server, int) {
t.Helper()
opts := natsserver.DefaultTestOptions
opts.Port = port
opts.Cluster.Name = "testing"
srv := natsserver.RunServer(&opts)
t.Cleanup(func() {
srv.Shutdown()
srv.WaitForShutdown()
})
return srv, opts.Port
}
func CreateLocalNatsClientForTest(t *testing.T, options ...nats.Option) (*server.Server, int, NatsClient) {
t.Helper()
server, port := startLocalNatsServer(t)
server, port := StartLocalServer(t)
logger := log.NewLoggerForTest(t)
ctx := log.NewLoggerContext(t.Context(), logger)
result, err := NewNatsClient(ctx, server.ClientURL(), options...)
result, err := NewClient(ctx, server.ClientURL(), options...)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@ -71,10 +52,10 @@ func CreateLocalNatsClientForTest(t *testing.T, options ...nats.Option) (*server
return server, port, result
}
func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
func testClient_Subscribe(t *testing.T, client Client) {
require := require.New(t)
assert := assert.New(t)
dest := make(chan *nats.Msg)
dest := make(chan *Msg)
sub, err := client.Subscribe("foo", dest)
require.NoError(err)
ch := make(chan struct{})
@ -113,76 +94,76 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
require.Equal(maxPublish, received.Load(), "Received wrong # of messages")
}
func TestNatsClient_Subscribe(t *testing.T) { // nolint:paralleltest
func TestClient_Subscribe(t *testing.T) { // nolint:paralleltest
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
_, _, client := CreateLocalClientForTest(t)
testNatsClient_Subscribe(t, client)
testClient_Subscribe(t, client)
})
}
func testNatsClient_PublishAfterClose(t *testing.T, client NatsClient) {
func test_PublishAfterClose(t *testing.T, client Client) {
assert.NoError(t, client.Close(t.Context()))
assert.ErrorIs(t, client.Publish("foo", "bar"), nats.ErrConnectionClosed)
}
func TestNatsClient_PublishAfterClose(t *testing.T) { // nolint:paralleltest
func TestClient_PublishAfterClose(t *testing.T) { // nolint:paralleltest
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
_, _, client := CreateLocalClientForTest(t)
testNatsClient_PublishAfterClose(t, client)
test_PublishAfterClose(t, client)
})
}
func testNatsClient_SubscribeAfterClose(t *testing.T, client NatsClient) {
func testClient_SubscribeAfterClose(t *testing.T, client Client) {
assert.NoError(t, client.Close(t.Context()))
ch := make(chan *nats.Msg)
ch := make(chan *Msg)
_, err := client.Subscribe("foo", ch)
assert.ErrorIs(t, err, nats.ErrConnectionClosed)
}
func TestNatsClient_SubscribeAfterClose(t *testing.T) { // nolint:paralleltest
func TestClient_SubscribeAfterClose(t *testing.T) { // nolint:paralleltest
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
_, _, client := CreateLocalClientForTest(t)
testNatsClient_SubscribeAfterClose(t, client)
testClient_SubscribeAfterClose(t, client)
})
}
func testNatsClient_BadSubjects(t *testing.T, client NatsClient) {
func testClient_BadSubjects(t *testing.T, client Client) {
assert := assert.New(t)
subjects := []string{
"foo bar",
"foo.",
}
ch := make(chan *nats.Msg)
ch := make(chan *Msg)
for _, s := range subjects {
_, err := client.Subscribe(s, ch)
assert.ErrorIs(err, nats.ErrBadSubject, "Expected error for subject %s", s)
}
}
func TestNatsClient_BadSubjects(t *testing.T) { // nolint:paralleltest
func TestClient_BadSubjects(t *testing.T) { // nolint:paralleltest
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
_, _, client := CreateLocalClientForTest(t)
testNatsClient_BadSubjects(t, client)
testClient_BadSubjects(t, client)
})
}
func TestNatsClient_MaxReconnects(t *testing.T) { // nolint:paralleltest
func TestClient_MaxReconnects(t *testing.T) { // nolint:paralleltest
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
reconnectWait := time.Millisecond
server, port, client := CreateLocalNatsClientForTest(t,
server, port, client := CreateLocalClientForTest(t,
nats.ReconnectWait(reconnectWait),
nats.ReconnectJitter(0, 0),
)
c, ok := client.(*natsClient)
c, ok := client.(*NativeClient)
require.True(ok, "wrong class: %T", client)
require.True(c.conn.IsConnected(), "not connected initially")
assert.Equal(server.ID(), c.conn.ConnectedServerId())
@ -197,7 +178,7 @@ func TestNatsClient_MaxReconnects(t *testing.T) { // nolint:paralleltest
}
require.False(c.conn.IsConnected(), "should be disconnected after server shutdown")
server, _ = startLocalNatsServerPort(t, port)
server, _ = StartLocalServerPort(t, port)
// Wait for automatic reconnection
for i := 0; i < 1000 && !c.conn.IsConnected(); i++ {

74
nats/test_helpers.go Normal file
View file

@ -0,0 +1,74 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package nats
import (
"context"
"testing"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-server/v2/test"
"github.com/stretchr/testify/assert"
)
func StartLocalServer(t *testing.T) (*server.Server, int) {
t.Helper()
return StartLocalServerPort(t, server.RANDOM_PORT)
}
func StartLocalServerPort(t *testing.T, port int) (*server.Server, int) {
t.Helper()
opts := test.DefaultTestOptions
opts.Port = port
opts.Cluster.Name = "testing"
srv := test.RunServer(&opts)
t.Cleanup(func() {
srv.Shutdown()
srv.WaitForShutdown()
})
return srv, opts.Port
}
func WaitForSubscriptionsEmpty(ctx context.Context, t *testing.T, client Client) {
t.Helper()
if c, ok := client.(*LoopbackClient); assert.True(t, ok, "expected LoopbackNatsClient, got %T", client) {
for {
c.mu.Lock()
count := len(c.subscriptions)
c.mu.Unlock()
if count == 0 {
break
}
select {
case <-ctx.Done():
c.mu.Lock()
assert.NoError(t, ctx.Err(), "Error waiting for subscriptions %+v to terminate", c.subscriptions)
c.mu.Unlock()
return
default:
time.Sleep(time.Millisecond)
}
}
}
}

View file

@ -33,12 +33,12 @@ import (
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
const (
@ -235,7 +235,7 @@ func (r *Room) Close() []Session {
func (r *Room) processAsyncNatsMessage(msg *nats.Msg) {
var message AsyncMessage
if err := NatsDecode(msg, &message); err != nil {
if err := nats.Decode(msg, &message); err != nil {
r.logger.Printf("Could not decode NATS message %+v: %s", msg, err)
return
}

View file

@ -40,11 +40,11 @@ import (
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
"github.com/nats-io/nats.go"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
signalinglog "github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
var (

View file

@ -28,9 +28,9 @@ import (
"net/url"
"sync/atomic"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
const (
@ -310,7 +310,7 @@ func (s *VirtualSession) Options() *AddSessionOptions {
func (s *VirtualSession) processAsyncNatsMessage(msg *nats.Msg) {
var message AsyncMessage
if err := NatsDecode(msg, &message); err != nil {
if err := nats.Decode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v: %s", msg, err)
return
}