diff --git a/Makefile b/Makefile index f9085d9..4fae2e7 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ PROTO_FILES := $(filter-out $(GRPC_PROTO_FILES),$(basename $(wildcard *.proto))) PROTO_GO_FILES := $(addsuffix .pb.go,$(PROTO_FILES)) GRPC_PROTO_GO_FILES := $(addsuffix .pb.go,$(GRPC_PROTO_FILES)) $(addsuffix _grpc.pb.go,$(GRPC_PROTO_FILES)) TEST_GO_FILES := $(wildcard *_test.go)) -EASYJSON_FILES := $(filter-out $(TEST_GO_FILES),$(wildcard api*.go api/signaling.go */api.go talk/ocs.go)) +EASYJSON_FILES := $(filter-out $(TEST_GO_FILES),$(wildcard api*.go api/signaling.go */api.go */*/api.go talk/ocs.go)) EASYJSON_GO_FILES := $(patsubst %.go,%_easyjson.go,$(EASYJSON_FILES)) COMMON_GO_FILES := $(filter-out geoip/continentmap.go $(PROTO_GO_FILES) $(GRPC_PROTO_GO_FILES) $(EASYJSON_GO_FILES) $(TEST_GO_FILES),$(wildcard *.go)) CLIENT_TEST_GO_FILES := $(wildcard client/*_test.go)) diff --git a/api_async.go b/async/events/api.go similarity index 99% rename from api_async.go rename to async/events/api.go index d0a3f81..d91049b 100644 --- a/api_async.go +++ b/async/events/api.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package events import ( "encoding/json" diff --git a/async_events.go b/async/events/async_events.go similarity index 99% rename from async_events.go rename to async/events/async_events.go index d5fb7ab..30efb9c 100644 --- a/async_events.go +++ b/async/events/async_events.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package events import ( "context" diff --git a/async_events_nats.go b/async/events/async_events_nats.go similarity index 99% rename from async_events_nats.go rename to async/events/async_events_nats.go index 565f655..86856c1 100644 --- a/async_events_nats.go +++ b/async/events/async_events_nats.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package events import ( "context" @@ -91,6 +91,10 @@ func NewAsyncEventsNats(logger log.Logger, client nats.Client) (AsyncEvents, err return events, nil } +func (e *asyncEventsNats) GetNatsClient() nats.Client { + return e.client +} + func (e *asyncEventsNats) GetServerInfoNats() *talk.BackendServerInfoNats { // TODO: This should call a method on "e.client" directly instead of having a type switch. var result *talk.BackendServerInfoNats diff --git a/async_events_nats_test.go b/async/events/async_events_nats_test.go similarity index 71% rename from async_events_nats_test.go rename to async/events/async_events_nats_test.go index 24bd50f..82d0dda 100644 --- a/async_events_nats_test.go +++ b/async/events/async_events_nats_test.go @@ -19,29 +19,19 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package events import ( "testing" - "time" - - "github.com/stretchr/testify/require" + "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/talk" ) func Benchmark_GetSubjectForSessionId(b *testing.B) { - require := require.New(b) backend := talk.NewCompatBackend(nil) - data := &SessionIdData{ - Sid: 1, - Created: time.Now().UnixMicro(), - BackendId: backend.Id(), - } - codec, err := NewSessionIdCodec([]byte("12345678901234567890123456789012"), []byte("09876543210987654321098765432109")) - require.NoError(err) - sid, err := codec.EncodePublic(data) - require.NoError(err, "could not create session id") + sid := api.PublicSessionId(internal.RandomString(256)) for b.Loop() { GetSubjectForSessionId(sid, backend) } diff --git a/async/events/async_events_test.go b/async/events/async_events_test.go new file mode 100644 index 0000000..23beb5b --- /dev/null +++ b/async/events/async_events_test.go @@ -0,0 +1,168 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2025 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package events + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/nats" + "github.com/strukturag/nextcloud-spreed-signaling/talk" +) + +type TestBackendRoomListener struct { + events AsyncChannel +} + +func (l *TestBackendRoomListener) AsyncChannel() AsyncChannel { + return l.events +} + +func testAsyncEvents(t *testing.T, events AsyncEvents) { + require := require.New(t) + assert := assert.New(t) + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + assert.NoError(events.Close(ctx)) + }) + + listener := &TestBackendRoomListener{ + events: make(AsyncChannel, 1), + } + + roomId := "1234" + backend := talk.NewCompatBackend(nil) + require.NoError(events.RegisterBackendRoomListener(roomId, backend, listener)) + defer func() { + assert.NoError(events.UnregisterBackendRoomListener(roomId, backend, listener)) + }() + + msg := &AsyncMessage{ + Type: "room", + Room: &talk.BackendServerRoomRequest{ + Type: "test", + }, + } + if assert.NoError(events.PublishBackendRoomMessage(roomId, backend, msg)) { + received := <-listener.events + var receivedMsg AsyncMessage + if assert.NoError(nats.Decode(received, &receivedMsg)) { + assert.True(msg.SendTime.Equal(receivedMsg.SendTime), "send times don't match, expected %s, got %s", msg.SendTime, receivedMsg.SendTime) + receivedMsg.SendTime = msg.SendTime + assert.Equal(msg, &receivedMsg) + } + } + + require.NoError(events.RegisterRoomListener(roomId, backend, listener)) + defer func() { + assert.NoError(events.UnregisterRoomListener(roomId, backend, listener)) + }() + + roomMessage := &AsyncMessage{ + Type: "room", + Room: &talk.BackendServerRoomRequest{ + Type: "other-test", + }, + } + if assert.NoError(events.PublishRoomMessage(roomId, backend, roomMessage)) { + received := <-listener.events + var receivedMsg AsyncMessage + if assert.NoError(nats.Decode(received, &receivedMsg)) { + assert.True(roomMessage.SendTime.Equal(receivedMsg.SendTime), "send times don't match, expected %s, got %s", roomMessage.SendTime, receivedMsg.SendTime) + receivedMsg.SendTime = roomMessage.SendTime + assert.Equal(roomMessage, &receivedMsg) + } + } + + userId := "the-user" + require.NoError(events.RegisterUserListener(userId, backend, listener)) + defer func() { + assert.NoError(events.UnregisterUserListener(userId, backend, listener)) + }() + + userMessage := &AsyncMessage{ + Type: "room", + Room: &talk.BackendServerRoomRequest{ + Type: "user-test", + }, + } + if assert.NoError(events.PublishUserMessage(userId, backend, userMessage)) { + received := <-listener.events + var receivedMsg AsyncMessage + if assert.NoError(nats.Decode(received, &receivedMsg)) { + assert.True(userMessage.SendTime.Equal(receivedMsg.SendTime), "send times don't match, expected %s, got %s", userMessage.SendTime, receivedMsg.SendTime) + receivedMsg.SendTime = userMessage.SendTime + assert.Equal(userMessage, &receivedMsg) + } + } + + sessionId := api.PublicSessionId("the-session") + require.NoError(events.RegisterSessionListener(sessionId, backend, listener)) + defer func() { + assert.NoError(events.UnregisterSessionListener(sessionId, backend, listener)) + }() + + sessionMessage := &AsyncMessage{ + Type: "room", + Room: &talk.BackendServerRoomRequest{ + Type: "session-test", + }, + } + if assert.NoError(events.PublishSessionMessage(sessionId, backend, sessionMessage)) { + received := <-listener.events + var receivedMsg AsyncMessage + if assert.NoError(nats.Decode(received, &receivedMsg)) { + assert.True(sessionMessage.SendTime.Equal(receivedMsg.SendTime), "send times don't match, expected %s, got %s", sessionMessage.SendTime, receivedMsg.SendTime) + receivedMsg.SendTime = sessionMessage.SendTime + assert.Equal(sessionMessage, &receivedMsg) + } + } +} + +func TestAsyncEvents_Loopback(t *testing.T) { + t.Parallel() + + logger := log.NewLoggerForTest(t) + ctx := log.NewLoggerContext(t.Context(), logger) + events, err := NewAsyncEvents(ctx, nats.LoopbackUrl) + require.NoError(t, err) + testAsyncEvents(t, events) +} + +func TestAsyncEvents_NATS(t *testing.T) { + t.Parallel() + + server, _ := nats.StartLocalServer(t) + logger := log.NewLoggerForTest(t) + ctx := log.NewLoggerContext(t.Context(), logger) + events, err := NewAsyncEvents(ctx, server.ClientURL()) + require.NoError(t, err) + testAsyncEvents(t, events) +} diff --git a/async_events_test.go b/async/eventstest/eventstest.go similarity index 69% rename from async_events_test.go rename to async/eventstest/eventstest.go index dead7c0..5f551cd 100644 --- a/async_events_test.go +++ b/async/eventstest/eventstest.go @@ -1,6 +1,6 @@ /** * Standalone signaling server for the Nextcloud Spreed app. - * Copyright (C) 2022 struktur AG + * Copyright (C) 2025 struktur AG * * @author Joachim Bauch * @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package eventstest import ( "context" @@ -30,19 +30,22 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/nats" ) var ( - eventBackendsForTest = []string{ + testTimeout = 10 * time.Second + + EventBackendsForTest = []string{ "loopback", "nats", } ) -func getAsyncEventsForTest(t *testing.T) AsyncEvents { - var events AsyncEvents +func GetAsyncEventsForTest(t *testing.T) events.AsyncEvents { + var events events.AsyncEvents if strings.HasSuffix(t.Name(), "/nats") { events = getRealAsyncEventsForTest(t) } else { @@ -56,21 +59,25 @@ func getAsyncEventsForTest(t *testing.T) AsyncEvents { return events } -func getRealAsyncEventsForTest(t *testing.T) AsyncEvents { +func getRealAsyncEventsForTest(t *testing.T) events.AsyncEvents { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) server, _ := nats.StartLocalServer(t) - events, err := NewAsyncEvents(ctx, server.ClientURL()) + events, err := events.NewAsyncEvents(ctx, server.ClientURL()) if err != nil { require.NoError(t, err) } return events } -func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { +type natsEvents interface { + GetNatsClient() nats.Client +} + +func getLoopbackAsyncEventsForTest(t *testing.T) events.AsyncEvents { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) - events, err := NewAsyncEvents(ctx, nats.LoopbackUrl) + events, err := events.NewAsyncEvents(ctx, nats.LoopbackUrl) if err != nil { require.NoError(t, err) } @@ -79,22 +86,27 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - client := (events.(*asyncEventsNats)).client - nats.WaitForSubscriptionsEmpty(ctx, t, client) + e, ok := (events.(natsEvents)) + if !ok { + // Only can wait for NATS events. + return + } + + nats.WaitForSubscriptionsEmpty(ctx, t, e.GetNatsClient()) }) return events } -func waitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events AsyncEvents) { +func WaitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events events.AsyncEvents) { t.Helper() - e, ok := (events.(*asyncEventsNats)) + e, ok := (events.(natsEvents)) if !ok { // Only can wait for NATS events. return } - client, ok := e.client.(*nats.NativeClient) + client, ok := e.GetNatsClient().(*nats.NativeClient) if !ok { // The loopback NATS clients is executing all events synchronously. return diff --git a/backend_server.go b/backend_server.go index b0e23ce..c550231 100644 --- a/backend_server.go +++ b/backend_server.go @@ -50,6 +50,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/async" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/config" "github.com/strukturag/nextcloud-spreed-signaling/container" "github.com/strukturag/nextcloud-spreed-signaling/internal" @@ -71,7 +72,7 @@ const ( type BackendServer struct { logger log.Logger hub *Hub - events AsyncEvents + events events.AsyncEvents roomSessions RoomSessions version string @@ -329,7 +330,7 @@ func (b *BackendServer) parseRequestBody(f func(context.Context, http.ResponseWr } func (b *BackendServer) sendRoomInvite(roomid string, backend *talk.Backend, userids []string, properties json.RawMessage) { - msg := &AsyncMessage{ + msg := &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Type: "event", @@ -351,7 +352,7 @@ func (b *BackendServer) sendRoomInvite(roomid string, backend *talk.Backend, use } func (b *BackendServer) sendRoomDisinvite(roomid string, backend *talk.Backend, reason string, userids []string, sessionids []api.RoomSessionId) { - msg := &AsyncMessage{ + msg := &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Type: "event", @@ -400,7 +401,7 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *talk.Backend, } func (b *BackendServer) sendRoomUpdate(roomid string, backend *talk.Backend, notified_userids []string, all_userids []string, properties json.RawMessage) { - msg := &AsyncMessage{ + msg := &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Type: "event", @@ -518,7 +519,7 @@ func (b *BackendServer) sendRoomIncall(roomid string, backend *talk.Backend, req } } - message := &AsyncMessage{ + message := &events.AsyncMessage{ Type: "room", Room: request, } @@ -571,7 +572,7 @@ loop: go func(sessionId api.PublicSessionId, permissions []api.Permission) { defer wg.Done() - message := &AsyncMessage{ + message := &events.AsyncMessage{ Type: "permissions", Permissions: permissions, } @@ -582,7 +583,7 @@ loop: } wg.Wait() - message := &AsyncMessage{ + message := &events.AsyncMessage{ Type: "room", Room: request, } @@ -590,7 +591,7 @@ loop: } func (b *BackendServer) sendRoomMessage(roomid string, backend *talk.Backend, request *talk.BackendServerRoomRequest) error { - message := &AsyncMessage{ + message := &events.AsyncMessage{ Type: "room", Room: request, } @@ -688,7 +689,7 @@ func (b *BackendServer) sendRoomSwitchTo(ctx context.Context, roomid string, bac } request.SwitchTo.Sessions = nil - message := &AsyncMessage{ + message := &events.AsyncMessage{ Type: "room", Room: request, } @@ -922,14 +923,14 @@ func (b *BackendServer) roomHandler(ctx context.Context, w http.ResponseWriter, b.sendRoomDisinvite(roomid, backend, api.DisinviteReasonDisinvited, request.Disinvite.UserIds, request.Disinvite.SessionIds) b.sendRoomUpdate(roomid, backend, request.Disinvite.UserIds, request.Disinvite.AllUserIds, request.Disinvite.Properties) case "update": - message := &AsyncMessage{ + message := &events.AsyncMessage{ Type: "room", Room: &request, } err = b.events.PublishBackendRoomMessage(roomid, backend, message) b.sendRoomUpdate(roomid, backend, nil, request.Update.UserIds, request.Update.Properties) case "delete": - message := &AsyncMessage{ + message := &events.AsyncMessage{ Type: "room", Room: &request, } @@ -1016,6 +1017,10 @@ func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) { w.Write(statsData) // nolint } +type withServerInfoNats interface { + GetServerInfoNats() *talk.BackendServerInfoNats +} + func (b *BackendServer) serverinfoHandler(w http.ResponseWriter, r *http.Request) { info := talk.BackendServerInfo{ Version: b.version, @@ -1026,7 +1031,7 @@ func (b *BackendServer) serverinfoHandler(w http.ResponseWriter, r *http.Request if mcu := b.hub.mcu; mcu != nil { info.Sfu = mcu.GetServerInfoSfu() } - if e, ok := b.events.(*asyncEventsNats); ok { + if e, ok := b.events.(withServerInfoNats); ok { info.Nats = e.GetServerInfoNats() } if rpcClients := b.hub.rpcClients; rpcClients != nil { diff --git a/backend_server_test.go b/backend_server_test.go index 7eda72f..59b5fc4 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -47,6 +47,8 @@ import ( "github.com/stretchr/testify/require" "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" + "github.com/strukturag/nextcloud-spreed-signaling/async/eventstest" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/nats" @@ -60,11 +62,11 @@ var ( turnServers = strings.Split(turnServersString, ",") ) -func CreateBackendServerForTest(t *testing.T) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) { +func CreateBackendServerForTest(t *testing.T) (*goconf.ConfigFile, *BackendServer, events.AsyncEvents, *Hub, *mux.Router, *httptest.Server) { return CreateBackendServerForTestFromConfig(t, nil) } -func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) { +func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *BackendServer, events.AsyncEvents, *Hub, *mux.Router, *httptest.Server) { config := goconf.NewConfigFile() config.AddOption("turn", "apikey", turnApiKey) config.AddOption("turn", "secret", turnSecret) @@ -72,7 +74,7 @@ func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *Back return CreateBackendServerForTestFromConfig(t, config) } -func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFile) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) { +func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFile) (*goconf.ConfigFile, *BackendServer, events.AsyncEvents, *Hub, *mux.Router, *httptest.Server) { require := require.New(t) r := mux.NewRouter() registerBackendHandler(t, r) @@ -102,7 +104,7 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil config.AddOption("sessions", "blockkey", "09876543210987654321098765432109") config.AddOption("clients", "internalsecret", string(testInternalSecret)) config.AddOption("geoip", "url", "none") - events := getAsyncEventsForTest(t) + events := eventstest.GetAsyncEventsForTest(t) logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) hub, err := NewHub(ctx, config, events, nil, nil, nil, r, "no-version") @@ -168,7 +170,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) - events1, err := NewAsyncEvents(ctx, nats.ClientURL()) + events1, err := events.NewAsyncEvents(ctx, nats.ClientURL()) require.NoError(err) t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -193,7 +195,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g config2.AddOption("sessions", "blockkey", "09876543210987654321098765432109") config2.AddOption("clients", "internalsecret", string(testInternalSecret)) config2.AddOption("geoip", "url", "none") - events2, err := NewAsyncEvents(ctx, nats.ClientURL()) + events2, err := events.NewAsyncEvents(ctx, nats.ClientURL()) require.NoError(err) t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -244,13 +246,13 @@ func performBackendRequest(requestUrl string, body []byte) (*http.Response, erro return client.Do(request) } -func expectRoomlistEvent(t *testing.T, ch AsyncChannel, msgType string) (*api.EventServerMessage, bool) { +func expectRoomlistEvent(t *testing.T, ch events.AsyncChannel, msgType string) (*api.EventServerMessage, bool) { assert := assert.New(t) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() select { case natsMsg := <-ch: - var message AsyncMessage + var message events.AsyncMessage if !assert.NoError(nats.Decode(natsMsg, &message)) || !assert.Equal("message", message.Type, "invalid message type, got %+v", message) || !assert.NotNil(message.Message, "message missing, got %+v", message) { @@ -397,7 +399,7 @@ func TestBackendServer_UnsupportedRequest(t *testing.T) { func TestBackendServer_RoomInvite(t *testing.T) { t.Parallel() - for _, backend := range eventBackendsForTest { + for _, backend := range eventstest.EventBackendsForTest { t.Run(backend, func(t *testing.T) { t.Parallel() logger := log.NewLoggerForTest(t) @@ -408,17 +410,17 @@ func TestBackendServer_RoomInvite(t *testing.T) { } type channelEventListener struct { - ch AsyncChannel + ch events.AsyncChannel } -func (l *channelEventListener) AsyncChannel() AsyncChannel { +func (l *channelEventListener) AsyncChannel() events.AsyncChannel { return l.ch } func RunTestBackendServer_RoomInvite(ctx context.Context, t *testing.T) { require := require.New(t) assert := assert.New(t) - _, _, events, hub, _, server := CreateBackendServerForTest(t) + _, _, asyncEvents, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) require.NoError(err) @@ -427,14 +429,15 @@ func RunTestBackendServer_RoomInvite(ctx context.Context, t *testing.T) { roomProperties := json.RawMessage("{\"foo\":\"bar\"}") backend := hub.backend.GetBackend(u) - eventsChan := make(AsyncChannel, 1) + eventsChan := make(events.AsyncChannel, 1) listener := &channelEventListener{ ch: eventsChan, } - require.NoError(events.RegisterUserListener(userid, backend, listener)) + require.NoError(asyncEvents.RegisterUserListener(userid, backend, listener)) defer func() { - assert.NoError(events.UnregisterUserListener(userid, backend, listener)) + assert.NoError(asyncEvents.UnregisterUserListener(userid, backend, listener)) }() + msg := &talk.BackendServerRoomRequest{ Type: "invite", Invite: &talk.BackendRoomInviteRequest{ @@ -466,7 +469,7 @@ func RunTestBackendServer_RoomInvite(ctx context.Context, t *testing.T) { func TestBackendServer_RoomDisinvite(t *testing.T) { t.Parallel() - for _, backend := range eventBackendsForTest { + for _, backend := range eventstest.EventBackendsForTest { t.Run(backend, func(t *testing.T) { t.Parallel() logger := log.NewLoggerForTest(t) @@ -479,7 +482,7 @@ func TestBackendServer_RoomDisinvite(t *testing.T) { func RunTestBackendServer_RoomDisinvite(ctx context.Context, t *testing.T) { require := require.New(t) assert := assert.New(t) - _, _, events, hub, _, server := CreateBackendServerForTest(t) + _, _, asyncEvents, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) require.NoError(err) @@ -503,14 +506,15 @@ func RunTestBackendServer_RoomDisinvite(ctx context.Context, t *testing.T) { roomProperties := json.RawMessage("{\"foo\":\"bar\"}") - eventsChan := make(AsyncChannel, 1) + eventsChan := make(events.AsyncChannel, 1) listener := &channelEventListener{ ch: eventsChan, } - require.NoError(events.RegisterUserListener(testDefaultUserId, backend, listener)) + require.NoError(asyncEvents.RegisterUserListener(testDefaultUserId, backend, listener)) defer func() { - assert.NoError(events.UnregisterUserListener(testDefaultUserId, backend, listener)) + assert.NoError(asyncEvents.UnregisterUserListener(testDefaultUserId, backend, listener)) }() + msg := &talk.BackendServerRoomRequest{ Type: "disinvite", Disinvite: &talk.BackendRoomDisinviteRequest{ @@ -629,7 +633,7 @@ func TestBackendServer_RoomDisinviteDifferentRooms(t *testing.T) { func TestBackendServer_RoomUpdate(t *testing.T) { t.Parallel() - for _, backend := range eventBackendsForTest { + for _, backend := range eventstest.EventBackendsForTest { t.Run(backend, func(t *testing.T) { t.Parallel() logger := log.NewLoggerForTest(t) @@ -642,7 +646,7 @@ func TestBackendServer_RoomUpdate(t *testing.T) { func RunTestBackendServer_RoomUpdate(ctx context.Context, t *testing.T) { require := require.New(t) assert := assert.New(t) - _, _, events, hub, _, server := CreateBackendServerForTest(t) + _, _, asyncEvents, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) require.NoError(err) @@ -658,14 +662,15 @@ func RunTestBackendServer_RoomUpdate(ctx context.Context, t *testing.T) { userid := "test-userid" roomProperties := json.RawMessage("{\"foo\":\"bar\"}") - eventsChan := make(AsyncChannel, 1) + eventsChan := make(events.AsyncChannel, 1) listener := &channelEventListener{ ch: eventsChan, } - require.NoError(events.RegisterUserListener(userid, backend, listener)) + require.NoError(asyncEvents.RegisterUserListener(userid, backend, listener)) defer func() { - assert.NoError(events.UnregisterUserListener(userid, backend, listener)) + assert.NoError(asyncEvents.UnregisterUserListener(userid, backend, listener)) }() + msg := &talk.BackendServerRoomRequest{ Type: "update", Update: &talk.BackendRoomUpdateRequest{ @@ -700,7 +705,7 @@ func RunTestBackendServer_RoomUpdate(ctx context.Context, t *testing.T) { func TestBackendServer_RoomDelete(t *testing.T) { t.Parallel() - for _, backend := range eventBackendsForTest { + for _, backend := range eventstest.EventBackendsForTest { t.Run(backend, func(t *testing.T) { t.Parallel() logger := log.NewLoggerForTest(t) @@ -713,7 +718,7 @@ func TestBackendServer_RoomDelete(t *testing.T) { func RunTestBackendServer_RoomDelete(ctx context.Context, t *testing.T) { require := require.New(t) assert := assert.New(t) - _, _, events, hub, _, server := CreateBackendServerForTest(t) + _, _, asyncEvents, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) require.NoError(err) @@ -726,14 +731,15 @@ func RunTestBackendServer_RoomDelete(ctx context.Context, t *testing.T) { require.NoError(err) userid := "test-userid" - eventsChan := make(AsyncChannel, 1) + eventsChan := make(events.AsyncChannel, 1) listener := &channelEventListener{ ch: eventsChan, } - require.NoError(events.RegisterUserListener(userid, backend, listener)) + require.NoError(asyncEvents.RegisterUserListener(userid, backend, listener)) defer func() { - assert.NoError(events.UnregisterUserListener(userid, backend, listener)) + assert.NoError(asyncEvents.UnregisterUserListener(userid, backend, listener)) }() + msg := &talk.BackendServerRoomRequest{ Type: "delete", Delete: &talk.BackendRoomDeleteRequest{ diff --git a/clientsession.go b/clientsession.go index e12f08c..68a60e4 100644 --- a/clientsession.go +++ b/clientsession.go @@ -37,6 +37,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/async" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/nats" "github.com/strukturag/nextcloud-spreed-signaling/talk" @@ -57,7 +58,7 @@ type ResponseHandlerFunc func(message *api.ClientMessage) bool type ClientSession struct { logger log.Logger hub *Hub - events AsyncEvents + events events.AsyncEvents privateId api.PrivateSessionId publicId api.PublicSessionId data *SessionIdData @@ -82,7 +83,7 @@ type ClientSession struct { parsedBackendUrl *url.URL mu sync.Mutex - asyncCh AsyncChannel + asyncCh events.AsyncChannel // +checklocks:mu client HandlerClient @@ -140,7 +141,7 @@ func NewClientSession(hub *Hub, privateId api.PrivateSessionId, publicId api.Pub parseUserData: parseUserData(auth.User), backend: backend, - asyncCh: make(AsyncChannel, DefaultAsyncChannelSize), + asyncCh: make(events.AsyncChannel, events.DefaultAsyncChannelSize), } if s.clientType == api.HelloClientTypeInternal { s.backendUrl = hello.Auth.InternalParams.Backend @@ -398,7 +399,7 @@ func (s *ClientSession) releaseMcuObjects() { } } -func (s *ClientSession) AsyncChannel() AsyncChannel { +func (s *ClientSession) AsyncChannel() events.AsyncChannel { return s.asyncCh } @@ -1071,7 +1072,7 @@ func (s *ClientSession) GetSubscriber(id api.PublicSessionId, streamType StreamT } func (s *ClientSession) processAsyncNatsMessage(msg *nats.Msg) { - var message AsyncMessage + var message events.AsyncMessage if err := nats.Decode(msg, &message); err != nil { s.logger.Printf("Could not decode NATS message %+v: %s", msg, err) return @@ -1080,7 +1081,7 @@ func (s *ClientSession) processAsyncNatsMessage(msg *nats.Msg) { s.processAsyncMessage(&message) } -func (s *ClientSession) processAsyncMessage(message *AsyncMessage) { +func (s *ClientSession) processAsyncMessage(message *events.AsyncMessage) { switch message.Type { case "permissions": s.SetPermissions(message.Permissions) @@ -1128,7 +1129,7 @@ func (s *ClientSession) processAsyncMessage(message *AsyncMessage) { mc, err := s.GetOrCreateSubscriber(ctx, s.hub.mcu, message.SendOffer.SessionId, StreamType(message.SendOffer.Data.RoomType)) if err != nil { s.logger.Printf("Could not create MCU subscriber for session %s to process sendoffer in %s: %s", message.SendOffer.SessionId, s.PublicId(), err) - if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{ + if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Id: message.SendOffer.MessageId, @@ -1141,7 +1142,7 @@ func (s *ClientSession) processAsyncMessage(message *AsyncMessage) { return } else if mc == nil { s.logger.Printf("No MCU subscriber found for session %s to process sendoffer in %s", message.SendOffer.SessionId, s.PublicId()) - if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{ + if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Id: message.SendOffer.MessageId, @@ -1157,7 +1158,7 @@ func (s *ClientSession) processAsyncMessage(message *AsyncMessage) { mc.SendMessage(s.Context(), nil, message.SendOffer.Data, func(err error, response api.StringMap) { if err != nil { s.logger.Printf("Could not send MCU message %+v for session %s to %s: %s", message.SendOffer.Data, message.SendOffer.SessionId, s.PublicId(), err) - if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{ + if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Id: message.SendOffer.MessageId, @@ -1413,7 +1414,7 @@ func (s *ClientSession) filterMessage(message *api.ServerMessage) *api.ServerMes return message } -func (s *ClientSession) filterAsyncMessage(msg *AsyncMessage) *api.ServerMessage { +func (s *ClientSession) filterAsyncMessage(msg *events.AsyncMessage) *api.ServerMessage { switch msg.Type { case "message": if msg.Message == nil { diff --git a/clientsession_test.go b/clientsession_test.go index a308aee..f59b45d 100644 --- a/clientsession_test.go +++ b/clientsession_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/require" "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/mock" "github.com/strukturag/nextcloud-spreed-signaling/talk" ) @@ -218,7 +219,7 @@ func TestFeatureChatRelay(t *testing.T) { require.NoError(err) // Simulate request from the backend. - room.processAsyncMessage(&AsyncMessage{ + room.processAsyncMessage(&events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "message", @@ -415,7 +416,7 @@ func TestFeatureChatRelayFederation(t *testing.T) { require.NoError(err) // Simulate request from the backend. - room.processAsyncMessage(&AsyncMessage{ + room.processAsyncMessage(&events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "message", @@ -515,7 +516,7 @@ func TestPermissionHideDisplayNames(t *testing.T) { require.NoError(err) // Simulate request from the backend. - room.processAsyncMessage(&AsyncMessage{ + room.processAsyncMessage(&events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "message", diff --git a/hub.go b/hub.go index 88867a6..96b8802 100644 --- a/hub.go +++ b/hub.go @@ -53,6 +53,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/async" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/config" "github.com/strukturag/nextcloud-spreed-signaling/container" "github.com/strukturag/nextcloud-spreed-signaling/etcd" @@ -146,7 +147,7 @@ func init() { type Hub struct { version string logger log.Logger - events AsyncEvents + events events.AsyncEvents upgrader websocket.Upgrader sessionIds *SessionIdCodec info *api.WelcomeServerMessage @@ -225,7 +226,7 @@ type Hub struct { blockedCandidates atomic.Pointer[container.IPList] } -func NewHub(ctx context.Context, cfg *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer, rpcClients *GrpcClients, etcdClient etcd.Client, r *mux.Router, version string) (*Hub, error) { +func NewHub(ctx context.Context, cfg *goconf.ConfigFile, events events.AsyncEvents, rpcServer *GrpcServer, rpcClients *GrpcClients, etcdClient etcd.Client, r *mux.Router, version string) (*Hub, error) { logger := log.LoggerFromContext(ctx) hashKey, _ := config.GetStringOptionWithEnv(cfg, "sessions", "hashkey") switch len(hashKey) { @@ -1626,7 +1627,7 @@ func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId api.R if session == nil { // Session is located on a different server. Should already have been closed // but send "bye" again as additional safeguard. - msg := &AsyncMessage{ + msg := &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Type: "bye", @@ -2150,7 +2151,7 @@ func (h *Hub) processMessageMsg(sess Session, message *api.ClientMessage) { return } - subject = GetSubjectForSessionId(msg.Recipient.SessionId, sess.Backend()) + subject = events.GetSubjectForSessionId(msg.Recipient.SessionId, sess.Backend()) recipientSessionId = msg.Recipient.SessionId if sess, ok := sess.(*ClientSession); ok { recipient = sess @@ -2160,7 +2161,7 @@ func (h *Hub) processMessageMsg(sess Session, message *api.ClientMessage) { if sess.ClientType() == api.HelloClientTypeVirtual { virtualSession := sess.(*VirtualSession) clientSession := virtualSession.Session() - subject = GetSubjectForSessionId(clientSession.PublicId(), sess.Backend()) + subject = events.GetSubjectForSessionId(clientSession.PublicId(), sess.Backend()) recipientSessionId = clientSession.PublicId() recipient = clientSession // The client should see his session id as recipient. @@ -2170,7 +2171,7 @@ func (h *Hub) processMessageMsg(sess Session, message *api.ClientMessage) { } } } else { - subject = GetSubjectForSessionId(msg.Recipient.SessionId, nil) + subject = events.GetSubjectForSessionId(msg.Recipient.SessionId, nil) recipientSessionId = msg.Recipient.SessionId serverRecipient = &msg.Recipient } @@ -2183,14 +2184,14 @@ func (h *Hub) processMessageMsg(sess Session, message *api.ClientMessage) { return } - subject = GetSubjectForUserId(msg.Recipient.UserId, session.Backend()) + subject = events.GetSubjectForUserId(msg.Recipient.UserId, session.Backend()) } case api.RecipientTypeRoom: fallthrough case api.RecipientTypeCall: if session != nil { if room = session.GetRoom(); room != nil { - subject = GetSubjectForRoomId(room.Id(), room.Backend()) + subject = events.GetSubjectForRoomId(room.Id(), room.Backend()) if h.mcu != nil { var data api.MessageClientMessageData @@ -2283,9 +2284,9 @@ func (h *Hub) processMessageMsg(sess Session, message *api.ClientMessage) { return } - async := &AsyncMessage{ + async := &events.AsyncMessage{ Type: "sendoffer", - SendOffer: &SendOfferMessage{ + SendOffer: &events.SendOfferMessage{ MessageId: message.Id, SessionId: session.PublicId(), Data: clientData, @@ -2297,7 +2298,7 @@ func (h *Hub) processMessageMsg(sess Session, message *api.ClientMessage) { return } - async := &AsyncMessage{ + async := &events.AsyncMessage{ Type: "message", Message: response, } @@ -2356,7 +2357,7 @@ func (h *Hub) processControlMsg(session Session, message *api.ClientMessage) { return } - subject = GetSubjectForSessionId(msg.Recipient.SessionId, nil) + subject = events.GetSubjectForSessionId(msg.Recipient.SessionId, nil) recipientSessionId = msg.Recipient.SessionId h.mu.RLock() sess, found := h.sessions[data.Sid] @@ -2369,7 +2370,7 @@ func (h *Hub) processControlMsg(session Session, message *api.ClientMessage) { if sess.ClientType() == api.HelloClientTypeVirtual { virtualSession := sess.(*VirtualSession) clientSession := virtualSession.Session() - subject = GetSubjectForSessionId(clientSession.PublicId(), sess.Backend()) + subject = events.GetSubjectForSessionId(clientSession.PublicId(), sess.Backend()) recipientSessionId = clientSession.PublicId() recipient = clientSession // The client should see his session id as recipient. @@ -2394,14 +2395,14 @@ func (h *Hub) processControlMsg(session Session, message *api.ClientMessage) { return } - subject = GetSubjectForUserId(msg.Recipient.UserId, session.Backend()) + subject = events.GetSubjectForUserId(msg.Recipient.UserId, session.Backend()) } case api.RecipientTypeRoom: fallthrough case api.RecipientTypeCall: if session != nil { if room = session.GetRoom(); room != nil { - subject = GetSubjectForRoomId(room.Id(), room.Backend()) + subject = events.GetSubjectForRoomId(room.Id(), room.Backend()) } } } @@ -2425,7 +2426,7 @@ func (h *Hub) processControlMsg(session Session, message *api.ClientMessage) { if recipient != nil { recipient.SendMessage(response) } else { - async := &AsyncMessage{ + async := &events.AsyncMessage{ Type: "message", Message: response, } @@ -2618,7 +2619,7 @@ func (h *Hub) processInternalMsg(sess Session, message *api.ClientMessage) { roomId := msg.Dialout.RoomId msg.Dialout.RoomId = "" // Don't send room id to recipients. if msg.Dialout.Type == "status" { - asyncMessage := &AsyncMessage{ + asyncMessage := &events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "transient", @@ -2636,7 +2637,7 @@ func (h *Hub) processInternalMsg(sess Session, message *api.ClientMessage) { h.logger.Printf("Error publishing dialout message %+v to room %s", msg.Dialout, roomId) } } else { - if err := h.events.PublishRoomMessage(roomId, session.Backend(), &AsyncMessage{ + if err := h.events.PublishRoomMessage(roomId, session.Backend(), &events.AsyncMessage{ Type: "message", Message: &api.ServerMessage{ Type: "dialout", diff --git a/hub_test.go b/hub_test.go index f420cac..9ee4129 100644 --- a/hub_test.go +++ b/hub_test.go @@ -54,6 +54,8 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/async" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" + "github.com/strukturag/nextcloud-spreed-signaling/async/eventstest" "github.com/strukturag/nextcloud-spreed-signaling/container" "github.com/strukturag/nextcloud-spreed-signaling/geoip" "github.com/strukturag/nextcloud-spreed-signaling/internal" @@ -158,7 +160,7 @@ func getTestConfigWithMultipleUrls(server *httptest.Server) (*goconf.ConfigFile, return config, nil } -func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) { +func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, events.AsyncEvents, *mux.Router, *httptest.Server) { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) require := require.New(t) @@ -170,7 +172,7 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve server.Close() }) - events := getAsyncEventsForTest(t) + events := eventstest.GetAsyncEventsForTest(t) config, err := getConfigFunc(server) require.NoError(err) h, err := NewHub(ctx, config, events, nil, nil, nil, r, "no-version") @@ -191,18 +193,18 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve return h, events, r, server } -func CreateHubForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) { +func CreateHubForTest(t *testing.T) (*Hub, events.AsyncEvents, *mux.Router, *httptest.Server) { return CreateHubForTestWithConfig(t, getTestConfig) } -func CreateHubWithMultipleBackendsForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) { +func CreateHubWithMultipleBackendsForTest(t *testing.T) (*Hub, events.AsyncEvents, *mux.Router, *httptest.Server) { h, events, r, server := CreateHubForTestWithConfig(t, getTestConfigWithMultipleBackends) registerBackendHandlerUrl(t, r, "/one") registerBackendHandlerUrl(t, r, "/two") return h, events, r, server } -func CreateHubWithMultipleUrlsForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) { +func CreateHubWithMultipleUrlsForTest(t *testing.T) (*Hub, events.AsyncEvents, *mux.Router, *httptest.Server) { h, events, r, server := CreateHubForTestWithConfig(t, getTestConfigWithMultipleUrls) registerBackendHandlerUrl(t, r, "/one") registerBackendHandlerUrl(t, r, "/two") @@ -245,7 +247,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http addr1, addr2 = addr2, addr1 } - events1, err := NewAsyncEvents(ctx, nats1.ClientURL()) + events1, err := events.NewAsyncEvents(ctx, nats1.ClientURL()) require.NoError(err) t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -259,7 +261,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http require.NoError(err) b1, err := NewBackendServer(ctx, config1, h1, "no-version") require.NoError(err) - events2, err := NewAsyncEvents(ctx, nats2.ClientURL()) + events2, err := events.NewAsyncEvents(ctx, nats2.ClientURL()) require.NoError(err) t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -2005,8 +2007,8 @@ func TestClientMessageToSessionId(t *testing.T) { require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId) // Make sure the session subscription events are processed. - waitForAsyncEventsFlushed(ctx, t, hub1.events) - waitForAsyncEventsFlushed(ctx, t, hub2.events) + eventstest.WaitForAsyncEventsFlushed(ctx, t, hub1.events) + eventstest.WaitForAsyncEventsFlushed(ctx, t, hub2.events) recipient1 := api.MessageClientMessageRecipient{ Type: "session", @@ -2066,8 +2068,8 @@ func TestClientControlToSessionId(t *testing.T) { require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId) // Make sure the session subscription events are processed. - waitForAsyncEventsFlushed(ctx, t, hub1.events) - waitForAsyncEventsFlushed(ctx, t, hub2.events) + eventstest.WaitForAsyncEventsFlushed(ctx, t, hub1.events) + eventstest.WaitForAsyncEventsFlushed(ctx, t, hub2.events) recipient1 := api.MessageClientMessageRecipient{ Type: "session", @@ -3380,7 +3382,7 @@ func TestCombineChatRefreshWhileDisconnected(t *testing.T) { require.NoError(json.Unmarshal([]byte(chat_refresh), &data)) // Simulate requests from the backend. - room.processAsyncMessage(&AsyncMessage{ + room.processAsyncMessage(&events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "message", @@ -3389,7 +3391,7 @@ func TestCombineChatRefreshWhileDisconnected(t *testing.T) { }, }, }) - room.processAsyncMessage(&AsyncMessage{ + room.processAsyncMessage(&events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "message", diff --git a/room.go b/room.go index 7484962..775d29e 100644 --- a/room.go +++ b/room.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/nats" @@ -70,7 +71,7 @@ type Room struct { id string logger log.Logger hub *Hub - events AsyncEvents + events events.AsyncEvents backend *talk.Backend // +checklocks:mu @@ -78,7 +79,7 @@ type Room struct { closer *internal.Closer mu *sync.RWMutex - asyncCh AsyncChannel + asyncCh events.AsyncChannel // +checklocks:mu sessions map[api.PublicSessionId]Session // +checklocks:mu @@ -110,19 +111,19 @@ func getRoomIdForBackend(id string, backend *talk.Backend) string { return backend.Id() + "|" + id } -func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEvents, backend *talk.Backend) (*Room, error) { +func NewRoom(roomId string, properties json.RawMessage, hub *Hub, asyncEvents events.AsyncEvents, backend *talk.Backend) (*Room, error) { room := &Room{ id: roomId, logger: hub.logger, hub: hub, - events: events, + events: asyncEvents, backend: backend, properties: properties, closer: internal.NewCloser(), mu: &sync.RWMutex{}, - asyncCh: make(AsyncChannel, DefaultAsyncChannelSize), + asyncCh: make(events.AsyncChannel, events.DefaultAsyncChannelSize), sessions: make(map[api.PublicSessionId]Session), internalSessions: make(map[*ClientSession]bool), @@ -140,7 +141,7 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEv transientData: NewTransientData(), } - if err := events.RegisterBackendRoomListener(roomId, backend, room); err != nil { + if err := asyncEvents.RegisterBackendRoomListener(roomId, backend, room); err != nil { return nil, err } @@ -185,7 +186,7 @@ func (r *Room) IsEqual(other *Room) bool { return b1.Id() == b2.Id() } -func (r *Room) AsyncChannel() AsyncChannel { +func (r *Room) AsyncChannel() events.AsyncChannel { return r.asyncCh } @@ -235,7 +236,7 @@ func (r *Room) Close() []Session { } func (r *Room) processAsyncNatsMessage(msg *nats.Msg) { - var message AsyncMessage + var message events.AsyncMessage if err := nats.Decode(msg, &message); err != nil { r.logger.Printf("Could not decode NATS message %+v: %s", msg, err) return @@ -244,7 +245,7 @@ func (r *Room) processAsyncNatsMessage(msg *nats.Msg) { r.processAsyncMessage(&message) } -func (r *Room) processAsyncMessage(message *AsyncMessage) { +func (r *Room) processAsyncMessage(message *events.AsyncMessage) { switch message.Type { case "room": r.processBackendRoomRequestRoom(message.Room) @@ -301,7 +302,7 @@ func (r *Room) processBackendRoomRequestRoom(message *talk.BackendServerRoomRequ } } -func (r *Room) processBackendRoomRequestAsyncRoom(message *AsyncRoomMessage) { +func (r *Room) processBackendRoomRequestAsyncRoom(message *events.AsyncRoomMessage) { switch message.Type { case "sessionjoined": r.notifySessionJoined(message.SessionId) @@ -373,9 +374,9 @@ func (r *Room) AddSession(session Session, sessionData json.RawMessage) { } // Trigger notifications that the session joined. - if err := r.events.PublishBackendRoomMessage(r.id, r.backend, &AsyncMessage{ + if err := r.events.PublishBackendRoomMessage(r.id, r.backend, &events.AsyncMessage{ Type: "asyncroom", - AsyncRoom: &AsyncRoomMessage{ + AsyncRoom: &events.AsyncRoomMessage{ Type: "sessionjoined", SessionId: sid, ClientType: session.ClientType(), @@ -411,7 +412,7 @@ func (r *Room) notifySessionJoined(sessionId api.PublicSessionId) { session = nil } - events := make([]api.EventServerMessageSessionEntry, 0, len(sessions)) + joinEvents := make([]api.EventServerMessageSessionEntry, 0, len(sessions)) for _, s := range sessions { entry := api.EventServerMessageSessionEntry{ SessionId: s.PublicId(), @@ -423,7 +424,7 @@ func (r *Room) notifySessionJoined(sessionId api.PublicSessionId) { entry.RoomSessionId = s.RoomSessionId() entry.Federated = s.ClientType() == api.HelloClientTypeFederation } - events = append(events, entry) + joinEvents = append(joinEvents, entry) } msg := &api.ServerMessage{ @@ -431,11 +432,11 @@ func (r *Room) notifySessionJoined(sessionId api.PublicSessionId) { Event: &api.EventServerMessage{ Target: "room", Type: "join", - Join: events, + Join: joinEvents, }, } - if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + if err := r.events.PublishSessionMessage(sessionId, r.backend, &events.AsyncMessage{ Type: "message", Message: msg, }); err != nil { @@ -467,7 +468,7 @@ func (r *Room) notifySessionJoined(sessionId api.PublicSessionId) { }, } - if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + if err := r.events.PublishSessionMessage(sessionId, r.backend, &events.AsyncMessage{ Type: "message", Message: msg, }); err != nil { @@ -545,7 +546,7 @@ func (r *Room) RemoveSession(session Session) bool { } func (r *Room) publish(message *api.ServerMessage) error { - return r.events.PublishRoomMessage(r.id, r.backend, &AsyncMessage{ + return r.events.PublishRoomMessage(r.id, r.backend, &events.AsyncMessage{ Type: "message", Message: message, }) @@ -1188,7 +1189,7 @@ func (r *Room) publishSwitchTo(message *talk.BackendRoomSwitchToMessageRequest) go func(sessionId api.PublicSessionId) { defer wg.Done() - if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + if err := r.events.PublishSessionMessage(sessionId, r.backend, &events.AsyncMessage{ Type: "message", Message: msg, }); err != nil { @@ -1216,7 +1217,7 @@ func (r *Room) publishSwitchTo(message *talk.BackendRoomSwitchToMessageRequest) }, } - if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + if err := r.events.PublishSessionMessage(sessionId, r.backend, &events.AsyncMessage{ Type: "message", Message: msg, }); err != nil { @@ -1249,7 +1250,7 @@ func (r *Room) SetTransientData(key string, value any) error { return r.RemoveTransientData(key) } - return r.events.PublishBackendRoomMessage(r.Id(), r.Backend(), &AsyncMessage{ + return r.events.PublishBackendRoomMessage(r.Id(), r.Backend(), &events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "transient", @@ -1273,7 +1274,7 @@ func (r *Room) SetTransientDataTTL(key string, value any, ttl time.Duration) err return r.SetTransientData(key, value) } - return r.events.PublishBackendRoomMessage(r.Id(), r.Backend(), &AsyncMessage{ + return r.events.PublishBackendRoomMessage(r.Id(), r.Backend(), &events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "transient", @@ -1292,7 +1293,7 @@ func (r *Room) doSetTransientDataTTL(key string, value any, ttl time.Duration) { } func (r *Room) RemoveTransientData(key string) error { - return r.events.PublishBackendRoomMessage(r.Id(), r.Backend(), &AsyncMessage{ + return r.events.PublishBackendRoomMessage(r.Id(), r.Backend(), &events.AsyncMessage{ Type: "room", Room: &talk.BackendServerRoomRequest{ Type: "transient", diff --git a/server/main.go b/server/main.go index 341f395..7320b77 100644 --- a/server/main.go +++ b/server/main.go @@ -42,6 +42,7 @@ import ( "github.com/gorilla/mux" signaling "github.com/strukturag/nextcloud-spreed-signaling" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/config" "github.com/strukturag/nextcloud-spreed-signaling/dns" "github.com/strukturag/nextcloud-spreed-signaling/etcd" @@ -185,7 +186,7 @@ func main() { natsUrl = nats.DefaultURL } - events, err := signaling.NewAsyncEvents(stopCtx, natsUrl) + events, err := events.NewAsyncEvents(stopCtx, natsUrl) if err != nil { logger.Fatal("Could not create async events client: ", err) } diff --git a/virtualsession.go b/virtualsession.go index 59c6e8e..bfb85bb 100644 --- a/virtualsession.go +++ b/virtualsession.go @@ -29,6 +29,7 @@ import ( "sync/atomic" "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/async/events" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/nats" "github.com/strukturag/nextcloud-spreed-signaling/talk" @@ -60,7 +61,7 @@ type VirtualSession struct { parseUserData func() (api.StringMap, error) - asyncCh AsyncChannel + asyncCh events.AsyncChannel } func GetVirtualSessionId(session Session, sessionId api.PublicSessionId) api.PublicSessionId { @@ -87,7 +88,7 @@ func NewVirtualSession(session *ClientSession, privateId api.PrivateSessionId, p parseUserData: parseUserData(msg.User), options: msg.Options, - asyncCh: make(AsyncChannel, DefaultAsyncChannelSize), + asyncCh: make(events.AsyncChannel, events.DefaultAsyncChannelSize), } if err := session.events.RegisterSessionListener(publicId, session.Backend(), result); err != nil { @@ -198,7 +199,7 @@ func (s *VirtualSession) LeaveRoom(notify bool) *Room { return room } -func (s *VirtualSession) AsyncChannel() AsyncChannel { +func (s *VirtualSession) AsyncChannel() events.AsyncChannel { return s.asyncCh } @@ -315,7 +316,7 @@ func (s *VirtualSession) Options() *api.AddSessionOptions { } func (s *VirtualSession) processAsyncNatsMessage(msg *nats.Msg) { - var message AsyncMessage + var message events.AsyncMessage if err := nats.Decode(msg, &message); err != nil { s.logger.Printf("Could not decode NATS message %+v: %s", msg, err) return @@ -324,7 +325,7 @@ func (s *VirtualSession) processAsyncNatsMessage(msg *nats.Msg) { s.processAsyncMessage(&message) } -func (s *VirtualSession) processAsyncMessage(message *AsyncMessage) { +func (s *VirtualSession) processAsyncMessage(message *events.AsyncMessage) { if message.Type == "message" && message.Message != nil { switch message.Message.Type { case "message": @@ -359,7 +360,7 @@ func (s *VirtualSession) processAsyncMessage(message *AsyncMessage) { return } - s.session.processAsyncMessage(&AsyncMessage{ + s.session.processAsyncMessage(&events.AsyncMessage{ Type: "message", SendTime: message.SendTime, Message: &api.ServerMessage{