diff --git a/.codecov.yml b/.codecov.yml index ef152ef..b3f0da5 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -46,6 +46,10 @@ component_management: name: metrics paths: - metrics/** + - component_id: module_nats + name: nats + paths: + - nats/** - component_id: module_proxy name: proxy paths: diff --git a/async_events.go b/async_events.go index c8e2284..e86a07c 100644 --- a/async_events.go +++ b/async_events.go @@ -25,9 +25,8 @@ import ( "context" "errors" - "github.com/nats-io/nats.go" - "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) var ( @@ -66,7 +65,7 @@ type AsyncEvents interface { } func NewAsyncEvents(ctx context.Context, url string) (AsyncEvents, error) { - client, err := NewNatsClient(ctx, url) + client, err := nats.NewClient(ctx, url) if err != nil { return nil, err } diff --git a/async_events_nats.go b/async_events_nats.go index b89a4ab..81edba0 100644 --- a/async_events_nats.go +++ b/async_events_nats.go @@ -27,44 +27,43 @@ import ( "sync" "time" - "github.com/nats-io/nats.go" - "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) func GetSubjectForBackendRoomId(roomId string, backend *Backend) string { if backend == nil || backend.IsCompat() { - return GetEncodedSubject("backend.room", roomId) + return nats.GetEncodedSubject("backend.room", roomId) } - return GetEncodedSubject("backend.room", roomId+"|"+backend.Id()) + return nats.GetEncodedSubject("backend.room", roomId+"|"+backend.Id()) } func GetSubjectForRoomId(roomId string, backend *Backend) string { if backend == nil || backend.IsCompat() { - return GetEncodedSubject("room", roomId) + return nats.GetEncodedSubject("room", roomId) } - return GetEncodedSubject("room", roomId+"|"+backend.Id()) + return nats.GetEncodedSubject("room", roomId+"|"+backend.Id()) } func GetSubjectForUserId(userId string, backend *Backend) string { if backend == nil || backend.IsCompat() { - return GetEncodedSubject("user", userId) + return nats.GetEncodedSubject("user", userId) } - return GetEncodedSubject("user", userId+"|"+backend.Id()) + return nats.GetEncodedSubject("user", userId+"|"+backend.Id()) } func GetSubjectForSessionId(sessionId PublicSessionId, backend *Backend) string { return string("session." + sessionId) } -type asyncEventsNatsSubscriptions map[string]map[AsyncEventListener]NatsSubscription +type asyncEventsNatsSubscriptions map[string]map[AsyncEventListener]nats.Subscription type asyncEventsNats struct { mu sync.Mutex - client NatsClient + client nats.Client logger log.Logger // +checklocksignore // +checklocks:mu @@ -77,7 +76,7 @@ type asyncEventsNats struct { sessionSubscriptions asyncEventsNatsSubscriptions } -func NewAsyncEventsNats(logger log.Logger, client NatsClient) (AsyncEvents, error) { +func NewAsyncEventsNats(logger log.Logger, client nats.Client) (AsyncEvents, error) { events := &asyncEventsNats{ client: client, logger: logger, @@ -91,28 +90,29 @@ func NewAsyncEventsNats(logger log.Logger, client NatsClient) (AsyncEvents, erro } func (e *asyncEventsNats) GetServerInfoNats() *BackendServerInfoNats { - var nats *BackendServerInfoNats + // TODO: This should call a method on "e.client" directly instead of having a type switch. + var result *BackendServerInfoNats switch n := e.client.(type) { - case *natsClient: - nats = &BackendServerInfoNats{ - Urls: n.conn.Servers(), + case *nats.NativeClient: + result = &BackendServerInfoNats{ + Urls: n.URLs(), } - if c := n.conn; c.IsConnected() { - nats.Connected = true - nats.ServerUrl = c.ConnectedUrl() - nats.ServerID = c.ConnectedServerId() - nats.ServerVersion = c.ConnectedServerVersion() - nats.ClusterName = c.ConnectedClusterName() + if n.IsConnected() { + result.Connected = true + result.ServerUrl = n.ConnectedUrl() + result.ServerID = n.ConnectedServerId() + result.ServerVersion = n.ConnectedServerVersion() + result.ClusterName = n.ConnectedClusterName() } - case *LoopbackNatsClient: - nats = &BackendServerInfoNats{ - Urls: []string{NatsLoopbackUrl}, + case *nats.LoopbackClient: + result = &BackendServerInfoNats{ + Urls: []string{nats.LoopbackUrl}, Connected: true, - ServerUrl: NatsLoopbackUrl, + ServerUrl: nats.LoopbackUrl, } } - return nats + return result } func closeSubscriptions(logger log.Logger, wg *sync.WaitGroup, subscriptions asyncEventsNatsSubscriptions) { @@ -153,7 +153,7 @@ func (e *asyncEventsNats) Close(ctx context.Context) error { func (e *asyncEventsNats) registerListener(key string, subscriptions asyncEventsNatsSubscriptions, listener AsyncEventListener) error { subs, found := subscriptions[key] if !found { - subs = make(map[AsyncEventListener]NatsSubscription) + subs = make(map[AsyncEventListener]nats.Subscription) subscriptions[key] = subs } else if _, found := subs[listener]; found { return ErrAlreadyRegistered diff --git a/async_events_test.go b/async_events_test.go index 107a41d..dead7c0 100644 --- a/async_events_test.go +++ b/async_events_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) var ( @@ -58,7 +59,7 @@ func getAsyncEventsForTest(t *testing.T) AsyncEvents { func getRealAsyncEventsForTest(t *testing.T) AsyncEvents { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) - server, _ := startLocalNatsServer(t) + server, _ := nats.StartLocalServer(t) events, err := NewAsyncEvents(ctx, server.ClientURL()) if err != nil { require.NoError(t, err) @@ -69,7 +70,7 @@ func getRealAsyncEventsForTest(t *testing.T) AsyncEvents { func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) - events, err := NewAsyncEvents(ctx, NatsLoopbackUrl) + events, err := NewAsyncEvents(ctx, nats.LoopbackUrl) if err != nil { require.NoError(t, err) } @@ -78,8 +79,8 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - nats := (events.(*asyncEventsNats)).client - (nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t) + client := (events.(*asyncEventsNats)).client + nats.WaitForSubscriptionsEmpty(ctx, t, client) }) return events } @@ -87,17 +88,17 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { func waitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events AsyncEvents) { t.Helper() - nats, ok := (events.(*asyncEventsNats)) + e, ok := (events.(*asyncEventsNats)) if !ok { // Only can wait for NATS events. return } - client, ok := nats.client.(*natsClient) + client, ok := e.client.(*nats.NativeClient) if !ok { // The loopback NATS clients is executing all events synchronously. return } - assert.NoError(t, client.conn.FlushWithContext(ctx)) + assert.NoError(t, client.FlushWithContext(ctx)) } diff --git a/backend_server_test.go b/backend_server_test.go index aaddcf5..502f6ab 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -48,6 +48,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) var ( @@ -143,7 +144,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g server2.Close() }) - nats, _ := startLocalNatsServer(t) + nats, _ := nats.StartLocalServer(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) @@ -248,7 +249,7 @@ func expectRoomlistEvent(t *testing.T, ch AsyncChannel, msgType string) (*EventS select { case natsMsg := <-ch: var message AsyncMessage - if !assert.NoError(NatsDecode(natsMsg, &message)) || + if !assert.NoError(nats.Decode(natsMsg, &message)) || !assert.Equal("message", message.Type, "invalid message type, got %+v", message) || !assert.NotNil(message.Message, "message missing, got %+v", message) { return nil, false diff --git a/clientsession.go b/clientsession.go index 47c5165..fad2331 100644 --- a/clientsession.go +++ b/clientsession.go @@ -33,12 +33,12 @@ import ( "sync/atomic" "time" - "github.com/nats-io/nats.go" "github.com/pion/sdp/v3" "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/async" "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) var ( @@ -1070,7 +1070,7 @@ func (s *ClientSession) GetSubscriber(id PublicSessionId, streamType StreamType) func (s *ClientSession) processAsyncNatsMessage(msg *nats.Msg) { var message AsyncMessage - if err := NatsDecode(msg, &message); err != nil { + if err := nats.Decode(msg, &message); err != nil { s.logger.Printf("Could not decode NATS message %+v: %s", msg, err) return } diff --git a/hub_test.go b/hub_test.go index ef1bd84..2fc6f3a 100644 --- a/hub_test.go +++ b/hub_test.go @@ -57,6 +57,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/container" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" "github.com/strukturag/nextcloud-spreed-signaling/test" ) @@ -226,10 +227,10 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http server2.Close() }) - nats1, _ := startLocalNatsServer(t) + nats1, _ := nats.StartLocalServer(t) var nats2 *server.Server if strings.Contains(t.Name(), "Federation") { - nats2, _ = startLocalNatsServer(t) + nats2, _ = nats.StartLocalServer(t) } else { nats2 = nats1 } diff --git a/natsclient.go b/nats/client.go similarity index 65% rename from natsclient.go rename to nats/client.go index 070d5e0..12dd75f 100644 --- a/natsclient.go +++ b/nats/client.go @@ -1,6 +1,6 @@ /** * Standalone signaling server for the Nextcloud Spreed app. - * Copyright (C) 2017 struktur AG + * Copyright (C) 2025 struktur AG * * @author Joachim Bauch * @@ -19,21 +19,19 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package nats import ( "context" "encoding/base64" "encoding/json" "errors" - "net/url" "os" "os/signal" "strings" "time" "github.com/nats-io/nats.go" - "github.com/strukturag/nextcloud-spreed-signaling/async" "github.com/strukturag/nextcloud-spreed-signaling/log" ) @@ -42,17 +40,25 @@ const ( initialConnectInterval = time.Second maxConnectInterval = 8 * time.Second - NatsLoopbackUrl = "nats://loopback" + LoopbackUrl = "nats://loopback" + + DefaultURL = nats.DefaultURL ) -type NatsSubscription interface { +var ( + ErrConnectionClosed = nats.ErrConnectionClosed +) + +type Msg = nats.Msg + +type Subscription interface { Unsubscribe() error } -type NatsClient interface { +type Client interface { Close(ctx context.Context) error - Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) + Subscribe(subject string, ch chan *Msg) (Subscription, error) Publish(subject string, message any) error } @@ -63,21 +69,15 @@ func GetEncodedSubject(prefix string, suffix string) string { return prefix + "." + base64.StdEncoding.EncodeToString([]byte(suffix)) } -type natsClient struct { - logger log.Logger - conn *nats.Conn - closed chan struct{} -} - -func NewNatsClient(ctx context.Context, url string, options ...nats.Option) (NatsClient, error) { +func NewClient(ctx context.Context, url string, options ...nats.Option) (Client, error) { logger := log.LoggerFromContext(ctx) if url == ":loopback:" { - logger.Printf("WARNING: events url %s is deprecated, please use %s instead", url, NatsLoopbackUrl) - url = NatsLoopbackUrl + logger.Printf("WARNING: events url %s is deprecated, please use %s instead", url, LoopbackUrl) + url = LoopbackUrl } - if url == NatsLoopbackUrl { + if url == LoopbackUrl { logger.Println("Using internal NATS loopback client") - return NewLoopbackNatsClient(logger) + return NewLoopbackClient(logger) } backoff, err := async.NewExponentialBackoff(initialConnectInterval, maxConnectInterval) @@ -85,7 +85,7 @@ func NewNatsClient(ctx context.Context, url string, options ...nats.Option) (Nat return nil, err } - client := &natsClient{ + client := &NativeClient{ logger: logger, closed: make(chan struct{}), } @@ -115,47 +115,7 @@ func NewNatsClient(ctx context.Context, url string, options ...nats.Option) (Nat return client, nil } -func (c *natsClient) Close(ctx context.Context) error { - c.conn.Close() - select { - case <-c.closed: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (c *natsClient) onClosed(conn *nats.Conn) { - if err := conn.LastError(); err != nil { - c.logger.Printf("NATS client closed, last error %s", conn.LastError()) - } else { - c.logger.Println("NATS client closed") - } - close(c.closed) -} - -func (c *natsClient) onDisconnected(conn *nats.Conn) { - c.logger.Println("NATS client disconnected") -} - -func (c *natsClient) onReconnected(conn *nats.Conn) { - c.logger.Printf("NATS client reconnected to %s (%s)", conn.ConnectedUrl(), conn.ConnectedServerId()) -} - -func (c *natsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) { - return c.conn.ChanSubscribe(subject, ch) -} - -func (c *natsClient) Publish(subject string, message any) error { - data, err := json.Marshal(message) - if err != nil { - return err - } - - return c.conn.Publish(subject, data) -} - -func NatsDecode(msg *nats.Msg, vPtr any) (err error) { +func Decode(msg *nats.Msg, vPtr any) (err error) { switch arg := vPtr.(type) { case *string: // If they want a string and it is a JSON string, strip quotes @@ -174,11 +134,3 @@ func NatsDecode(msg *nats.Msg, vPtr any) (err error) { } return } - -func removeURLCredentials(u string) string { - if u, err := url.Parse(u); err == nil && u.User != nil { - u.User = url.User("***") - return u.String() - } - return u -} diff --git a/nats/client_test.go b/nats/client_test.go new file mode 100644 index 0000000..06894c5 --- /dev/null +++ b/nats/client_test.go @@ -0,0 +1,159 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2026 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package nats + +import ( + "testing" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" +) + +func TestGetEncodedSubject(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + encoded := GetEncodedSubject("foo", "this is the subject") + assert.NotContains(encoded, " ") + + encoded = GetEncodedSubject("foo", "this-is-the-subject") + assert.NotContains(encoded, "this-is") +} + +func TestDecodeToString(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + testcases := []struct { + data []byte + expected string + }{ + { + []byte(`""`), + "", + }, + { + []byte(`"foo"`), + "foo", + }, + { + []byte(`{"type":"foo"}`), + `{"type":"foo"}`, + }, + { + []byte(`1234`), + "1234", + }, + } + + for idx, tc := range testcases { + var dest string + if assert.NoError(Decode(&nats.Msg{ + Data: tc.data, + }, &dest), "decoding failed for test %d (%s)", idx, string(tc.data)) { + assert.Equal(tc.expected, dest, "failed for test %s (%s)", idx, string(tc.data)) + } + } +} + +func TestDecodeToByteSlice(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + testcases := []struct { + data []byte + expected []byte + }{ + { + []byte(``), + []byte{}, + }, + { + []byte(`""`), + []byte(`""`), + }, + { + []byte(`"foo"`), + []byte(`"foo"`), + }, + { + []byte(`{"type":"foo"}`), + []byte(`{"type":"foo"}`), + }, + { + []byte(`1234`), + []byte(`1234`), + }, + } + + for idx, tc := range testcases { + var dest []byte + if assert.NoError(Decode(&nats.Msg{ + Data: tc.data, + }, &dest), "decoding failed for test %d (%s)", idx, string(tc.data)) { + assert.Equal(tc.expected, dest, "failed for test %s (%s)", idx, string(tc.data)) + } + } +} + +func TestDecodeRegular(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + type testdata struct { + Type string `json:"type"` + Value any `json:"value"` + } + + testcases := []struct { + data []byte + expected *testdata + }{ + { + []byte(`null`), + nil, + }, + { + []byte(`{"value":"bar","type":"foo"}`), + &testdata{ + Type: "foo", + Value: "bar", + }, + }, + { + []byte(`{"value":123,"type":"foo"}`), + &testdata{ + Type: "foo", + Value: float64(123), + }, + }, + } + + for idx, tc := range testcases { + var dest *testdata + if assert.NoError(Decode(&nats.Msg{ + Data: tc.data, + }, &dest), "decoding failed for test %d (%s)", idx, string(tc.data)) { + assert.Equal(tc.expected, dest, "failed for test %s (%s)", idx, string(tc.data)) + } + } +} diff --git a/natsclient_loopback.go b/nats/loopback.go similarity index 73% rename from natsclient_loopback.go rename to nats/loopback.go index bd2deef..dba2955 100644 --- a/natsclient_loopback.go +++ b/nats/loopback.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package nats import ( "container/list" @@ -33,14 +33,14 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/log" ) -type LoopbackNatsClient struct { +type LoopbackClient struct { logger log.Logger mu sync.Mutex closed chan struct{} // +checklocks:mu - subscriptions map[string]map[*loopbackNatsSubscription]bool + subscriptions map[string]map[*loopbackSubscription]bool // +checklocks:mu wakeup sync.Cond @@ -48,19 +48,19 @@ type LoopbackNatsClient struct { incoming list.List } -func NewLoopbackNatsClient(logger log.Logger) (NatsClient, error) { - client := &LoopbackNatsClient{ +func NewLoopbackClient(logger log.Logger) (Client, error) { + client := &LoopbackClient{ logger: logger, closed: make(chan struct{}), - subscriptions: make(map[string]map[*loopbackNatsSubscription]bool), + subscriptions: make(map[string]map[*loopbackSubscription]bool), } client.wakeup.L = &client.mu go client.processMessages() return client, nil } -func (c *LoopbackNatsClient) processMessages() { +func (c *LoopbackClient) processMessages() { defer close(c.closed) c.mu.Lock() @@ -74,19 +74,19 @@ func (c *LoopbackNatsClient) processMessages() { break } - msg := c.incoming.Remove(c.incoming.Front()).(*nats.Msg) + msg := c.incoming.Remove(c.incoming.Front()).(*Msg) c.processMessage(msg) } } // +checklocks:c.mu -func (c *LoopbackNatsClient) processMessage(msg *nats.Msg) { +func (c *LoopbackClient) processMessage(msg *Msg) { subs, found := c.subscriptions[msg.Subject] if !found { return } - channels := make([]chan *nats.Msg, 0, len(subs)) + channels := make([]chan *Msg, 0, len(subs)) for sub := range subs { channels = append(channels, sub.ch) } @@ -101,7 +101,7 @@ func (c *LoopbackNatsClient) processMessage(msg *nats.Msg) { } } -func (c *LoopbackNatsClient) doClose() { +func (c *LoopbackClient) doClose() { c.mu.Lock() defer c.mu.Unlock() @@ -110,7 +110,7 @@ func (c *LoopbackNatsClient) doClose() { c.wakeup.Signal() } -func (c *LoopbackNatsClient) Close(ctx context.Context) error { +func (c *LoopbackClient) Close(ctx context.Context) error { c.doClose() select { case <-c.closed: @@ -120,19 +120,19 @@ func (c *LoopbackNatsClient) Close(ctx context.Context) error { } } -type loopbackNatsSubscription struct { +type loopbackSubscription struct { subject string - client *LoopbackNatsClient + client *LoopbackClient - ch chan *nats.Msg + ch chan *Msg } -func (s *loopbackNatsSubscription) Unsubscribe() error { +func (s *loopbackSubscription) Unsubscribe() error { s.client.unsubscribe(s) return nil } -func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) { +func (c *LoopbackClient) Subscribe(subject string, ch chan *Msg) (Subscription, error) { if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") { return nil, nats.ErrBadSubject } @@ -143,14 +143,14 @@ func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsS return nil, nats.ErrConnectionClosed } - s := &loopbackNatsSubscription{ + s := &loopbackSubscription{ subject: subject, client: c, ch: ch, } subs, found := c.subscriptions[subject] if !found { - subs = make(map[*loopbackNatsSubscription]bool) + subs = make(map[*loopbackSubscription]bool) c.subscriptions[subject] = subs } subs[s] = true @@ -158,7 +158,7 @@ func (c *LoopbackNatsClient) Subscribe(subject string, ch chan *nats.Msg) (NatsS return s, nil } -func (c *LoopbackNatsClient) unsubscribe(s *loopbackNatsSubscription) { +func (c *LoopbackClient) unsubscribe(s *loopbackSubscription) { c.mu.Lock() defer c.mu.Unlock() @@ -170,7 +170,7 @@ func (c *LoopbackNatsClient) unsubscribe(s *loopbackNatsSubscription) { } } -func (c *LoopbackNatsClient) Publish(subject string, message any) error { +func (c *LoopbackClient) Publish(subject string, message any) error { if strings.HasSuffix(subject, ".") || strings.Contains(subject, " ") { return nats.ErrBadSubject } @@ -181,7 +181,7 @@ func (c *LoopbackNatsClient) Publish(subject string, message any) error { return nats.ErrConnectionClosed } - msg := &nats.Msg{ + msg := &Msg{ Subject: subject, } var err error diff --git a/natsclient_loopback_test.go b/nats/loopback_test.go similarity index 61% rename from natsclient_loopback_test.go rename to nats/loopback_test.go index bfb3bf0..01357f0 100644 --- a/natsclient_loopback_test.go +++ b/nats/loopback_test.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package nats import ( "context" @@ -32,30 +32,9 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/log" ) -func (c *LoopbackNatsClient) waitForSubscriptionsEmpty(ctx context.Context, t *testing.T) { - for { - c.mu.Lock() - count := len(c.subscriptions) - c.mu.Unlock() - if count == 0 { - break - } - - select { - case <-ctx.Done(): - c.mu.Lock() - assert.NoError(t, ctx.Err(), "Error waiting for subscriptions %+v to terminate", c.subscriptions) - c.mu.Unlock() - return - default: - time.Sleep(time.Millisecond) - } - } -} - -func CreateLoopbackNatsClientForTest(t *testing.T) NatsClient { +func CreateLoopbackClientForTest(t *testing.T) Client { logger := log.NewLoggerForTest(t) - result, err := NewLoopbackNatsClient(logger) + result, err := NewLoopbackClient(logger) require.NoError(t, err) t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -65,30 +44,30 @@ func CreateLoopbackNatsClientForTest(t *testing.T) NatsClient { return result } -func TestLoopbackNatsClient_Subscribe(t *testing.T) { +func TestLoopbackClient_Subscribe(t *testing.T) { t.Parallel() - client := CreateLoopbackNatsClientForTest(t) - testNatsClient_Subscribe(t, client) + client := CreateLoopbackClientForTest(t) + testClient_Subscribe(t, client) } func TestLoopbackClient_PublishAfterClose(t *testing.T) { t.Parallel() - client := CreateLoopbackNatsClientForTest(t) - testNatsClient_PublishAfterClose(t, client) + client := CreateLoopbackClientForTest(t) + test_PublishAfterClose(t, client) } func TestLoopbackClient_SubscribeAfterClose(t *testing.T) { t.Parallel() - client := CreateLoopbackNatsClientForTest(t) - testNatsClient_SubscribeAfterClose(t, client) + client := CreateLoopbackClientForTest(t) + testClient_SubscribeAfterClose(t, client) } func TestLoopbackClient_BadSubjects(t *testing.T) { t.Parallel() - client := CreateLoopbackNatsClientForTest(t) - testNatsClient_BadSubjects(t, client) + client := CreateLoopbackClientForTest(t) + testClient_BadSubjects(t, client) } diff --git a/nats/native.go b/nats/native.go new file mode 100644 index 0000000..fdf36a7 --- /dev/null +++ b/nats/native.go @@ -0,0 +1,114 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2017 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package nats + +import ( + "context" + "encoding/json" + "net/url" + + "github.com/nats-io/nats.go" + + "github.com/strukturag/nextcloud-spreed-signaling/log" +) + +type NativeClient struct { + logger log.Logger + conn *nats.Conn + closed chan struct{} +} + +func (c *NativeClient) URLs() []string { + return c.conn.Servers() +} + +func (c *NativeClient) IsConnected() bool { + return c.conn.IsConnected() +} + +func (c *NativeClient) ConnectedUrl() string { + return c.conn.ConnectedUrl() +} + +func (c *NativeClient) ConnectedServerId() string { + return c.conn.ConnectedServerId() +} + +func (c *NativeClient) ConnectedServerVersion() string { + return c.conn.ConnectedServerVersion() +} + +func (c *NativeClient) ConnectedClusterName() string { + return c.conn.ConnectedClusterName() +} + +func (c *NativeClient) Close(ctx context.Context) error { + c.conn.Close() + select { + case <-c.closed: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (c *NativeClient) FlushWithContext(ctx context.Context) error { + return c.conn.FlushWithContext(ctx) +} + +func (c *NativeClient) onClosed(conn *nats.Conn) { + if err := conn.LastError(); err != nil { + c.logger.Printf("NATS client closed, last error %s", conn.LastError()) + } else { + c.logger.Println("NATS client closed") + } + close(c.closed) +} + +func (c *NativeClient) onDisconnected(conn *nats.Conn) { + c.logger.Println("NATS client disconnected") +} + +func (c *NativeClient) onReconnected(conn *nats.Conn) { + c.logger.Printf("NATS client reconnected to %s (%s)", conn.ConnectedUrl(), conn.ConnectedServerId()) +} + +func (c *NativeClient) Subscribe(subject string, ch chan *Msg) (Subscription, error) { + return c.conn.ChanSubscribe(subject, ch) +} + +func (c *NativeClient) Publish(subject string, message any) error { + data, err := json.Marshal(message) + if err != nil { + return err + } + + return c.conn.Publish(subject, data) +} + +func removeURLCredentials(u string) string { + if u, err := url.Parse(u); err == nil && u.User != nil { + u.User = url.User("***") + return u.String() + } + return u +} diff --git a/natsclient_test.go b/nats/native_test.go similarity index 67% rename from natsclient_test.go rename to nats/native_test.go index e4ae1ae..fe7a56c 100644 --- a/natsclient_test.go +++ b/nats/native_test.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package nats import ( "context" @@ -27,41 +27,22 @@ import ( "testing" "time" - "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/nats-io/nats-server/v2/server" - natsserver "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/test" ) -func startLocalNatsServer(t *testing.T) (*server.Server, int) { +func CreateLocalClientForTest(t *testing.T, options ...nats.Option) (*server.Server, int, Client) { t.Helper() - return startLocalNatsServerPort(t, server.RANDOM_PORT) -} - -func startLocalNatsServerPort(t *testing.T, port int) (*server.Server, int) { - t.Helper() - opts := natsserver.DefaultTestOptions - opts.Port = port - opts.Cluster.Name = "testing" - srv := natsserver.RunServer(&opts) - t.Cleanup(func() { - srv.Shutdown() - srv.WaitForShutdown() - }) - return srv, opts.Port -} - -func CreateLocalNatsClientForTest(t *testing.T, options ...nats.Option) (*server.Server, int, NatsClient) { - t.Helper() - server, port := startLocalNatsServer(t) + server, port := StartLocalServer(t) logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) - result, err := NewNatsClient(ctx, server.ClientURL(), options...) + result, err := NewClient(ctx, server.ClientURL(), options...) require.NoError(t, err) t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -71,10 +52,10 @@ func CreateLocalNatsClientForTest(t *testing.T, options ...nats.Option) (*server return server, port, result } -func testNatsClient_Subscribe(t *testing.T, client NatsClient) { +func testClient_Subscribe(t *testing.T, client Client) { require := require.New(t) assert := assert.New(t) - dest := make(chan *nats.Msg) + dest := make(chan *Msg) sub, err := client.Subscribe("foo", dest) require.NoError(err) ch := make(chan struct{}) @@ -113,76 +94,76 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) { require.Equal(maxPublish, received.Load(), "Received wrong # of messages") } -func TestNatsClient_Subscribe(t *testing.T) { // nolint:paralleltest +func TestClient_Subscribe(t *testing.T) { // nolint:paralleltest test.EnsureNoGoroutinesLeak(t, func(t *testing.T) { - _, _, client := CreateLocalNatsClientForTest(t) + _, _, client := CreateLocalClientForTest(t) - testNatsClient_Subscribe(t, client) + testClient_Subscribe(t, client) }) } -func testNatsClient_PublishAfterClose(t *testing.T, client NatsClient) { +func test_PublishAfterClose(t *testing.T, client Client) { assert.NoError(t, client.Close(t.Context())) assert.ErrorIs(t, client.Publish("foo", "bar"), nats.ErrConnectionClosed) } -func TestNatsClient_PublishAfterClose(t *testing.T) { // nolint:paralleltest +func TestClient_PublishAfterClose(t *testing.T) { // nolint:paralleltest test.EnsureNoGoroutinesLeak(t, func(t *testing.T) { - _, _, client := CreateLocalNatsClientForTest(t) + _, _, client := CreateLocalClientForTest(t) - testNatsClient_PublishAfterClose(t, client) + test_PublishAfterClose(t, client) }) } -func testNatsClient_SubscribeAfterClose(t *testing.T, client NatsClient) { +func testClient_SubscribeAfterClose(t *testing.T, client Client) { assert.NoError(t, client.Close(t.Context())) - ch := make(chan *nats.Msg) + ch := make(chan *Msg) _, err := client.Subscribe("foo", ch) assert.ErrorIs(t, err, nats.ErrConnectionClosed) } -func TestNatsClient_SubscribeAfterClose(t *testing.T) { // nolint:paralleltest +func TestClient_SubscribeAfterClose(t *testing.T) { // nolint:paralleltest test.EnsureNoGoroutinesLeak(t, func(t *testing.T) { - _, _, client := CreateLocalNatsClientForTest(t) + _, _, client := CreateLocalClientForTest(t) - testNatsClient_SubscribeAfterClose(t, client) + testClient_SubscribeAfterClose(t, client) }) } -func testNatsClient_BadSubjects(t *testing.T, client NatsClient) { +func testClient_BadSubjects(t *testing.T, client Client) { assert := assert.New(t) subjects := []string{ "foo bar", "foo.", } - ch := make(chan *nats.Msg) + ch := make(chan *Msg) for _, s := range subjects { _, err := client.Subscribe(s, ch) assert.ErrorIs(err, nats.ErrBadSubject, "Expected error for subject %s", s) } } -func TestNatsClient_BadSubjects(t *testing.T) { // nolint:paralleltest +func TestClient_BadSubjects(t *testing.T) { // nolint:paralleltest test.EnsureNoGoroutinesLeak(t, func(t *testing.T) { - _, _, client := CreateLocalNatsClientForTest(t) + _, _, client := CreateLocalClientForTest(t) - testNatsClient_BadSubjects(t, client) + testClient_BadSubjects(t, client) }) } -func TestNatsClient_MaxReconnects(t *testing.T) { // nolint:paralleltest +func TestClient_MaxReconnects(t *testing.T) { // nolint:paralleltest test.EnsureNoGoroutinesLeak(t, func(t *testing.T) { assert := assert.New(t) require := require.New(t) reconnectWait := time.Millisecond - server, port, client := CreateLocalNatsClientForTest(t, + server, port, client := CreateLocalClientForTest(t, nats.ReconnectWait(reconnectWait), nats.ReconnectJitter(0, 0), ) - c, ok := client.(*natsClient) + c, ok := client.(*NativeClient) require.True(ok, "wrong class: %T", client) require.True(c.conn.IsConnected(), "not connected initially") assert.Equal(server.ID(), c.conn.ConnectedServerId()) @@ -197,7 +178,7 @@ func TestNatsClient_MaxReconnects(t *testing.T) { // nolint:paralleltest } require.False(c.conn.IsConnected(), "should be disconnected after server shutdown") - server, _ = startLocalNatsServerPort(t, port) + server, _ = StartLocalServerPort(t, port) // Wait for automatic reconnection for i := 0; i < 1000 && !c.conn.IsConnected(); i++ { diff --git a/nats/test_helpers.go b/nats/test_helpers.go new file mode 100644 index 0000000..3dc7d9f --- /dev/null +++ b/nats/test_helpers.go @@ -0,0 +1,74 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2025 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package nats + +import ( + "context" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats-server/v2/test" + "github.com/stretchr/testify/assert" +) + +func StartLocalServer(t *testing.T) (*server.Server, int) { + t.Helper() + return StartLocalServerPort(t, server.RANDOM_PORT) +} + +func StartLocalServerPort(t *testing.T, port int) (*server.Server, int) { + t.Helper() + opts := test.DefaultTestOptions + opts.Port = port + opts.Cluster.Name = "testing" + srv := test.RunServer(&opts) + t.Cleanup(func() { + srv.Shutdown() + srv.WaitForShutdown() + }) + return srv, opts.Port +} + +func WaitForSubscriptionsEmpty(ctx context.Context, t *testing.T, client Client) { + t.Helper() + if c, ok := client.(*LoopbackClient); assert.True(t, ok, "expected LoopbackNatsClient, got %T", client) { + for { + c.mu.Lock() + count := len(c.subscriptions) + c.mu.Unlock() + if count == 0 { + break + } + + select { + case <-ctx.Done(): + c.mu.Lock() + assert.NoError(t, ctx.Err(), "Error waiting for subscriptions %+v to terminate", c.subscriptions) + c.mu.Unlock() + return + default: + time.Sleep(time.Millisecond) + } + } + } +} diff --git a/room.go b/room.go index 5688316..218d42e 100644 --- a/room.go +++ b/room.go @@ -33,12 +33,12 @@ import ( "sync" "time" - "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) const ( @@ -235,7 +235,7 @@ func (r *Room) Close() []Session { func (r *Room) processAsyncNatsMessage(msg *nats.Msg) { var message AsyncMessage - if err := NatsDecode(msg, &message); err != nil { + if err := nats.Decode(msg, &message); err != nil { r.logger.Printf("Could not decode NATS message %+v: %s", msg, err) return } diff --git a/server/main.go b/server/main.go index 02c8835..4f2b7a4 100644 --- a/server/main.go +++ b/server/main.go @@ -40,11 +40,11 @@ import ( "github.com/dlintw/goconf" "github.com/gorilla/mux" - "github.com/nats-io/nats.go" signaling "github.com/strukturag/nextcloud-spreed-signaling" "github.com/strukturag/nextcloud-spreed-signaling/internal" signalinglog "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) var ( diff --git a/virtualsession.go b/virtualsession.go index 536655a..112c441 100644 --- a/virtualsession.go +++ b/virtualsession.go @@ -28,9 +28,9 @@ import ( "net/url" "sync/atomic" - "github.com/nats-io/nats.go" "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" ) const ( @@ -310,7 +310,7 @@ func (s *VirtualSession) Options() *AddSessionOptions { func (s *VirtualSession) processAsyncNatsMessage(msg *nats.Msg) { var message AsyncMessage - if err := NatsDecode(msg, &message); err != nil { + if err := nats.Decode(msg, &message); err != nil { s.logger.Printf("Could not decode NATS message %+v: %s", msg, err) return }