Process all NATS messages for same target from single goroutine.

This commit is contained in:
Joachim Bauch 2025-12-22 15:18:03 +01:00
commit f2eac4c3b3
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
10 changed files with 256 additions and 504 deletions

View file

@ -23,41 +23,41 @@ package signaling
import (
"context"
"sync"
"errors"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
type AsyncBackendRoomEventListener interface {
ProcessBackendRoomRequest(message *AsyncMessage)
}
var (
ErrAlreadyRegistered = errors.New("already registered") // +checklocksignore: Global readonly variable.
)
type AsyncRoomEventListener interface {
ProcessAsyncRoomMessage(message *AsyncMessage)
}
const (
DefaultAsyncChannelSize = 64
)
type AsyncUserEventListener interface {
ProcessAsyncUserMessage(message *AsyncMessage)
}
type AsyncChannel chan *nats.Msg
type AsyncSessionEventListener interface {
ProcessAsyncSessionMessage(message *AsyncMessage)
type AsyncEventListener interface {
AsyncChannel() AsyncChannel
}
type AsyncEvents interface {
Close(ctx context.Context) error
RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) error
UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener)
RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
RegisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) error
UnregisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener)
RegisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
UnregisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error
RegisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener) error
UnregisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener)
RegisterUserListener(userId string, backend *Backend, listener AsyncEventListener) error
UnregisterUserListener(userId string, backend *Backend, listener AsyncEventListener) error
RegisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncSessionEventListener) error
UnregisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncSessionEventListener)
RegisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncEventListener) error
UnregisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncEventListener) error
PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
@ -73,147 +73,3 @@ func NewAsyncEvents(ctx context.Context, url string) (AsyncEvents, error) {
return NewAsyncEventsNats(log.LoggerFromContext(ctx), client)
}
type asyncBackendRoomSubscriber struct {
mu sync.Mutex
// +checklocks:mu
listeners map[AsyncBackendRoomEventListener]bool
}
func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessBackendRoomRequest(message)
s.mu.Lock()
}
}
func (s *asyncBackendRoomSubscriber) addListener(listener AsyncBackendRoomEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncBackendRoomEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncBackendRoomSubscriber) removeListener(listener AsyncBackendRoomEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}
type asyncRoomSubscriber struct {
mu sync.Mutex
// +checklocks:mu
listeners map[AsyncRoomEventListener]bool
}
func (s *asyncRoomSubscriber) processAsyncRoomMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncRoomMessage(message)
s.mu.Lock()
}
}
func (s *asyncRoomSubscriber) addListener(listener AsyncRoomEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncRoomEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncRoomSubscriber) removeListener(listener AsyncRoomEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}
type asyncUserSubscriber struct {
mu sync.Mutex
// +checklocks:mu
listeners map[AsyncUserEventListener]bool
}
func (s *asyncUserSubscriber) processAsyncUserMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncUserMessage(message)
s.mu.Lock()
}
}
func (s *asyncUserSubscriber) addListener(listener AsyncUserEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncUserEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncUserSubscriber) removeListener(listener AsyncUserEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}
type asyncSessionSubscriber struct {
mu sync.Mutex
// +checklocks:mu
listeners map[AsyncSessionEventListener]bool
}
func (s *asyncSessionSubscriber) processAsyncSessionMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncSessionMessage(message)
s.mu.Lock()
}
}
func (s *asyncSessionSubscriber) addListener(listener AsyncSessionEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncSessionEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncSessionSubscriber) removeListener(listener AsyncSessionEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}

View file

@ -23,6 +23,7 @@ package signaling
import (
"context"
"errors"
"sync"
"time"
@ -59,176 +60,7 @@ func GetSubjectForSessionId(sessionId PublicSessionId, backend *Backend) string
return string("session." + sessionId)
}
type asyncSubscriberNats struct {
key string
client NatsClient
logger log.Logger
receiver chan *nats.Msg
closeChan chan struct{}
subscription NatsSubscription
processMessage func(*nats.Msg)
}
func newAsyncSubscriberNats(logger log.Logger, key string, client NatsClient) (*asyncSubscriberNats, error) {
receiver := make(chan *nats.Msg, 64)
sub, err := client.Subscribe(key, receiver)
if err != nil {
return nil, err
}
result := &asyncSubscriberNats{
key: key,
client: client,
logger: logger,
receiver: receiver,
closeChan: make(chan struct{}),
subscription: sub,
}
return result, nil
}
func (s *asyncSubscriberNats) run() {
defer func() {
if err := s.subscription.Unsubscribe(); err != nil {
s.logger.Printf("Error unsubscribing %s: %s", s.key, err)
}
}()
for {
select {
case msg := <-s.receiver:
s.processMessage(msg)
for count := len(s.receiver); count > 0; count-- {
s.processMessage(<-s.receiver)
}
case <-s.closeChan:
return
}
}
}
func (s *asyncSubscriberNats) close() {
close(s.closeChan)
}
type asyncBackendRoomSubscriberNats struct {
*asyncSubscriberNats
asyncBackendRoomSubscriber
}
func newAsyncBackendRoomSubscriberNats(logger log.Logger, key string, client NatsClient) (*asyncBackendRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(logger, key, client)
if err != nil {
return nil, err
}
result := &asyncBackendRoomSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncBackendRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v, %s", msg, err)
return
}
s.processBackendRoomRequest(&message)
}
type asyncRoomSubscriberNats struct {
asyncRoomSubscriber
*asyncSubscriberNats
}
func newAsyncRoomSubscriberNats(logger log.Logger, key string, client NatsClient) (*asyncRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(logger, key, client)
if err != nil {
return nil, err
}
result := &asyncRoomSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v, %s", msg, err)
return
}
s.processAsyncRoomMessage(&message)
}
type asyncUserSubscriberNats struct {
*asyncSubscriberNats
asyncUserSubscriber
}
func newAsyncUserSubscriberNats(logger log.Logger, key string, client NatsClient) (*asyncUserSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(logger, key, client)
if err != nil {
return nil, err
}
result := &asyncUserSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncUserSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v, %s", msg, err)
return
}
s.processAsyncUserMessage(&message)
}
type asyncSessionSubscriberNats struct {
*asyncSubscriberNats
asyncSessionSubscriber
}
func newAsyncSessionSubscriberNats(logger log.Logger, key string, client NatsClient) (*asyncSessionSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(logger, key, client)
if err != nil {
return nil, err
}
result := &asyncSessionSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncSessionSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v, %s", msg, err)
return
}
s.processAsyncSessionMessage(&message)
}
type asyncEventsNatsSubscriptions map[string]map[AsyncEventListener]NatsSubscription
type asyncEventsNats struct {
mu sync.Mutex
@ -236,13 +68,13 @@ type asyncEventsNats struct {
logger log.Logger // +checklocksignore
// +checklocks:mu
backendRoomSubscriptions map[string]*asyncBackendRoomSubscriberNats
backendRoomSubscriptions asyncEventsNatsSubscriptions
// +checklocks:mu
roomSubscriptions map[string]*asyncRoomSubscriberNats
roomSubscriptions asyncEventsNatsSubscriptions
// +checklocks:mu
userSubscriptions map[string]*asyncUserSubscriberNats
userSubscriptions asyncEventsNatsSubscriptions
// +checklocks:mu
sessionSubscriptions map[string]*asyncSessionSubscriberNats
sessionSubscriptions asyncEventsNatsSubscriptions
}
func NewAsyncEventsNats(logger log.Logger, client NatsClient) (AsyncEvents, error) {
@ -250,10 +82,10 @@ func NewAsyncEventsNats(logger log.Logger, client NatsClient) (AsyncEvents, erro
client: client,
logger: logger,
backendRoomSubscriptions: make(map[string]*asyncBackendRoomSubscriberNats),
roomSubscriptions: make(map[string]*asyncRoomSubscriberNats),
userSubscriptions: make(map[string]*asyncUserSubscriberNats),
sessionSubscriptions: make(map[string]*asyncSessionSubscriberNats),
backendRoomSubscriptions: make(asyncEventsNatsSubscriptions),
roomSubscriptions: make(asyncEventsNatsSubscriptions),
userSubscriptions: make(asyncEventsNatsSubscriptions),
sessionSubscriptions: make(asyncEventsNatsSubscriptions),
}
return events, nil
}
@ -283,186 +115,153 @@ func (e *asyncEventsNats) GetServerInfoNats() *BackendServerInfoNats {
return nats
}
func closeSubscriptions(logger log.Logger, wg *sync.WaitGroup, subscriptions asyncEventsNatsSubscriptions) {
defer wg.Done()
for subject, subs := range subscriptions {
for _, sub := range subs {
if err := sub.Unsubscribe(); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
logger.Printf("Error unsubscribing %s: %s", subject, err)
}
}
}
}
func (e *asyncEventsNats) Close(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()
var wg sync.WaitGroup
wg.Add(1)
go func(subscriptions map[string]*asyncBackendRoomSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
}(e.backendRoomSubscriptions)
go closeSubscriptions(e.logger, &wg, e.backendRoomSubscriptions)
wg.Add(1)
go func(subscriptions map[string]*asyncRoomSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
}(e.roomSubscriptions)
go closeSubscriptions(e.logger, &wg, e.roomSubscriptions)
wg.Add(1)
go func(subscriptions map[string]*asyncUserSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
}(e.userSubscriptions)
go closeSubscriptions(e.logger, &wg, e.userSubscriptions)
wg.Add(1)
go func(subscriptions map[string]*asyncSessionSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
}(e.sessionSubscriptions)
go closeSubscriptions(e.logger, &wg, e.sessionSubscriptions)
// Can't use clear(...) here as the maps are processed asynchronously by the
// goroutines above.
e.backendRoomSubscriptions = make(map[string]*asyncBackendRoomSubscriberNats)
e.roomSubscriptions = make(map[string]*asyncRoomSubscriberNats)
e.userSubscriptions = make(map[string]*asyncUserSubscriberNats)
e.sessionSubscriptions = make(map[string]*asyncSessionSubscriberNats)
e.backendRoomSubscriptions = make(asyncEventsNatsSubscriptions)
e.roomSubscriptions = make(asyncEventsNatsSubscriptions)
e.userSubscriptions = make(asyncEventsNatsSubscriptions)
e.sessionSubscriptions = make(asyncEventsNatsSubscriptions)
wg.Wait()
return e.client.Close(ctx)
}
func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) error {
// +checklocks:e.mu
func (e *asyncEventsNats) registerListener(key string, subscriptions asyncEventsNatsSubscriptions, listener AsyncEventListener) error {
subs, found := subscriptions[key]
if !found {
subs = make(map[AsyncEventListener]NatsSubscription)
subscriptions[key] = subs
} else if _, found := subs[listener]; found {
return ErrAlreadyRegistered
}
sub, err := e.client.Subscribe(key, listener.AsyncChannel())
if err != nil {
return err
}
subs[listener] = sub
return nil
}
// +checklocks:e.mu
func (e *asyncEventsNats) unregisterListener(key string, subscriptions asyncEventsNatsSubscriptions, listener AsyncEventListener) error {
subs, found := subscriptions[key]
if !found {
return nil
}
sub, found := subs[listener]
if !found {
return nil
}
delete(subs, listener)
if len(subs) == 0 {
delete(subscriptions, key)
}
return sub.Unsubscribe()
}
func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForBackendRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.backendRoomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncBackendRoomSubscriberNats(e.logger, key, e.client); err != nil {
return err
}
e.backendRoomSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
return e.registerListener(key, e.backendRoomSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) {
func (e *asyncEventsNats) UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForBackendRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.backendRoomSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.backendRoomSubscriptions, key)
sub.close()
}
return e.unregisterListener(key, e.backendRoomSubscriptions, listener)
}
func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) error {
func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.roomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncRoomSubscriberNats(e.logger, key, e.client); err != nil {
return err
}
e.roomSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
return e.registerListener(key, e.roomSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) {
func (e *asyncEventsNats) UnregisterRoomListener(roomId string, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.roomSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.roomSubscriptions, key)
sub.close()
}
return e.unregisterListener(key, e.roomSubscriptions, listener)
}
func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend, listener AsyncUserEventListener) error {
func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForUserId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.userSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncUserSubscriberNats(e.logger, key, e.client); err != nil {
return err
}
e.userSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
return e.registerListener(key, e.userSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterUserListener(roomId string, backend *Backend, listener AsyncUserEventListener) {
func (e *asyncEventsNats) UnregisterUserListener(roomId string, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForUserId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.userSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.userSubscriptions, key)
sub.close()
}
return e.unregisterListener(key, e.userSubscriptions, listener)
}
func (e *asyncEventsNats) RegisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncSessionEventListener) error {
func (e *asyncEventsNats) RegisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForSessionId(sessionId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.sessionSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncSessionSubscriberNats(e.logger, key, e.client); err != nil {
return err
}
e.sessionSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
return e.registerListener(key, e.sessionSubscriptions, listener)
}
func (e *asyncEventsNats) UnregisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncSessionEventListener) {
func (e *asyncEventsNats) UnregisterSessionListener(sessionId PublicSessionId, backend *Backend, listener AsyncEventListener) error {
key := GetSubjectForSessionId(sessionId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.sessionSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.sessionSubscriptions, key)
sub.close()
}
return e.unregisterListener(key, e.sessionSubscriptions, listener)
}
func (e *asyncEventsNats) publish(subject string, message *AsyncMessage) error {
message.SendTime = time.Now()
message.SendTime = time.Now().Truncate(time.Microsecond)
return e.client.Publish(subject, message)
}

View file

@ -241,13 +241,15 @@ func performBackendRequest(requestUrl string, body []byte) (*http.Response, erro
return client.Do(request)
}
func expectRoomlistEvent(t *testing.T, ch chan *AsyncMessage, msgType string) (*EventServerMessage, bool) {
func expectRoomlistEvent(t *testing.T, ch AsyncChannel, msgType string) (*EventServerMessage, bool) {
assert := assert.New(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
select {
case message := <-ch:
if !assert.Equal("message", message.Type, "invalid message type, got %+v", message) ||
case natsMsg := <-ch:
var message AsyncMessage
if !assert.NoError(NatsDecode(natsMsg, &message)) ||
!assert.Equal("message", message.Type, "invalid message type, got %+v", message) ||
!assert.NotNil(message.Message, "message missing, got %+v", message) {
return nil, false
}
@ -403,11 +405,11 @@ func TestBackendServer_RoomInvite(t *testing.T) {
}
type channelEventListener struct {
ch chan *AsyncMessage
ch AsyncChannel
}
func (l *channelEventListener) ProcessAsyncUserMessage(message *AsyncMessage) {
l.ch <- message
func (l *channelEventListener) AsyncChannel() AsyncChannel {
return l.ch
}
func RunTestBackendServer_RoomInvite(ctx context.Context, t *testing.T) {
@ -422,13 +424,14 @@ func RunTestBackendServer_RoomInvite(ctx context.Context, t *testing.T) {
roomProperties := json.RawMessage("{\"foo\":\"bar\"}")
backend := hub.backend.GetBackend(u)
eventsChan := make(chan *AsyncMessage, 1)
eventsChan := make(AsyncChannel, 1)
listener := &channelEventListener{
ch: eventsChan,
}
require.NoError(events.RegisterUserListener(userid, backend, listener))
defer events.UnregisterUserListener(userid, backend, listener)
defer func() {
assert.NoError(events.UnregisterUserListener(userid, backend, listener))
}()
msg := &BackendServerRoomRequest{
Type: "invite",
Invite: &BackendRoomInviteRequest{
@ -497,13 +500,14 @@ func RunTestBackendServer_RoomDisinvite(ctx context.Context, t *testing.T) {
roomProperties := json.RawMessage("{\"foo\":\"bar\"}")
eventsChan := make(chan *AsyncMessage, 1)
eventsChan := make(AsyncChannel, 1)
listener := &channelEventListener{
ch: eventsChan,
}
require.NoError(events.RegisterUserListener(testDefaultUserId, backend, listener))
defer events.UnregisterUserListener(testDefaultUserId, backend, listener)
defer func() {
assert.NoError(events.UnregisterUserListener(testDefaultUserId, backend, listener))
}()
msg := &BackendServerRoomRequest{
Type: "disinvite",
Disinvite: &BackendRoomDisinviteRequest{
@ -651,13 +655,14 @@ func RunTestBackendServer_RoomUpdate(ctx context.Context, t *testing.T) {
userid := "test-userid"
roomProperties := json.RawMessage("{\"foo\":\"bar\"}")
eventsChan := make(chan *AsyncMessage, 1)
eventsChan := make(AsyncChannel, 1)
listener := &channelEventListener{
ch: eventsChan,
}
require.NoError(events.RegisterUserListener(userid, backend, listener))
defer events.UnregisterUserListener(userid, backend, listener)
defer func() {
assert.NoError(events.UnregisterUserListener(userid, backend, listener))
}()
msg := &BackendServerRoomRequest{
Type: "update",
Update: &BackendRoomUpdateRequest{
@ -718,13 +723,14 @@ func RunTestBackendServer_RoomDelete(ctx context.Context, t *testing.T) {
require.NoError(err)
userid := "test-userid"
eventsChan := make(chan *AsyncMessage, 1)
eventsChan := make(AsyncChannel, 1)
listener := &channelEventListener{
ch: eventsChan,
}
require.NoError(events.RegisterUserListener(userid, backend, listener))
defer events.UnregisterUserListener(userid, backend, listener)
defer func() {
assert.NoError(events.UnregisterUserListener(userid, backend, listener))
}()
msg := &BackendServerRoomRequest{
Type: "delete",
Delete: &BackendRoomDeleteRequest{

View file

@ -33,6 +33,7 @@ import (
"sync/atomic"
"time"
"github.com/nats-io/nats.go"
"github.com/pion/sdp/v3"
"github.com/strukturag/nextcloud-spreed-signaling/api"
@ -82,7 +83,8 @@ type ClientSession struct {
backendUrl string
parsedBackendUrl *url.URL
mu sync.Mutex
mu sync.Mutex
asyncCh AsyncChannel
// +checklocks:mu
client HandlerClient
@ -140,6 +142,7 @@ func NewClientSession(hub *Hub, privateId PrivateSessionId, publicId PublicSessi
parseUserData: parseUserData(auth.User),
backend: backend,
asyncCh: make(AsyncChannel, DefaultAsyncChannelSize),
}
if s.clientType == HelloClientTypeInternal {
s.backendUrl = hello.Auth.internalParams.Backend
@ -155,6 +158,7 @@ func NewClientSession(hub *Hub, privateId PrivateSessionId, publicId PublicSessi
if err := s.SubscribeEvents(); err != nil {
return nil, err
}
go s.run()
return s, nil
}
@ -391,6 +395,24 @@ func (s *ClientSession) releaseMcuObjects() {
}
}
func (s *ClientSession) AsyncChannel() AsyncChannel {
return s.asyncCh
}
func (s *ClientSession) run() {
for {
select {
case <-s.ctx.Done():
return
case msg := <-s.asyncCh:
s.processAsyncNatsMessage(msg)
for count := len(s.asyncCh); count > 0; count-- {
s.processAsyncNatsMessage(<-s.asyncCh)
}
}
}
}
func (s *ClientSession) Close() {
s.closeAndWait(true)
}
@ -406,9 +428,13 @@ func (s *ClientSession) closeAndWait(wait bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.userId != "" {
s.events.UnregisterUserListener(s.userId, s.backend, s)
if err := s.events.UnregisterUserListener(s.userId, s.backend, s); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
s.logger.Printf("Error unsubscribing user listener for %s in session %s: %s", s.userId, s.publicId, err)
}
}
if err := s.events.UnregisterSessionListener(s.publicId, s.backend, s); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
s.logger.Printf("Error unsubscribing listener in session %s: %s", s.publicId, err)
}
s.events.UnregisterSessionListener(s.publicId, s.backend, s)
go func(virtualSessions map[*VirtualSession]bool) {
for session := range virtualSessions {
session.Close()
@ -543,7 +569,9 @@ func (s *ClientSession) UnsubscribeRoomEvents() {
func (s *ClientSession) doUnsubscribeRoomEvents(notify bool) {
room := s.GetRoom()
if room != nil {
s.events.UnregisterRoomListener(room.Id(), s.Backend(), s)
if err := s.events.UnregisterRoomListener(room.Id(), s.Backend(), s); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
s.logger.Printf("Error unsubscribing room listener for %s in session %s: %s", room.Id(), s.publicId, err)
}
}
s.hub.roomSessions.DeleteRoomSession(s)
@ -1039,16 +1067,14 @@ func (s *ClientSession) GetSubscriber(id PublicSessionId, streamType StreamType)
return s.subscribers[getStreamId(id, streamType)]
}
func (s *ClientSession) ProcessAsyncRoomMessage(message *AsyncMessage) {
s.processAsyncMessage(message)
}
func (s *ClientSession) processAsyncNatsMessage(msg *nats.Msg) {
var message AsyncMessage
if err := NatsDecode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v: %s", msg, err)
return
}
func (s *ClientSession) ProcessAsyncUserMessage(message *AsyncMessage) {
s.processAsyncMessage(message)
}
func (s *ClientSession) ProcessAsyncSessionMessage(message *AsyncMessage) {
s.processAsyncMessage(message)
s.processAsyncMessage(&message)
}
func (s *ClientSession) processAsyncMessage(message *AsyncMessage) {

View file

@ -216,7 +216,7 @@ func TestFeatureChatRelay(t *testing.T) {
require.NoError(err)
// Simulate request from the backend.
room.ProcessBackendRoomRequest(&AsyncMessage{
room.processAsyncMessage(&AsyncMessage{
Type: "room",
Room: &BackendServerRoomRequest{
Type: "message",
@ -413,7 +413,7 @@ func TestFeatureChatRelayFederation(t *testing.T) {
require.NoError(err)
// Simulate request from the backend.
room.ProcessBackendRoomRequest(&AsyncMessage{
room.processAsyncMessage(&AsyncMessage{
Type: "room",
Room: &BackendServerRoomRequest{
Type: "message",
@ -513,7 +513,7 @@ func TestPermissionHideDisplayNames(t *testing.T) {
require.NoError(err)
// Simulate request from the backend.
room.ProcessBackendRoomRequest(&AsyncMessage{
room.processAsyncMessage(&AsyncMessage{
Type: "room",
Room: &BackendServerRoomRequest{
Type: "message",

View file

@ -3361,7 +3361,7 @@ func TestCombineChatRefreshWhileDisconnected(t *testing.T) {
require.NoError(json.Unmarshal([]byte(chat_refresh), &data))
// Simulate requests from the backend.
room.ProcessBackendRoomRequest(&AsyncMessage{
room.processAsyncMessage(&AsyncMessage{
Type: "room",
Room: &BackendServerRoomRequest{
Type: "message",
@ -3370,7 +3370,7 @@ func TestCombineChatRefreshWhileDisconnected(t *testing.T) {
},
},
})
room.ProcessBackendRoomRequest(&AsyncMessage{
room.processAsyncMessage(&AsyncMessage{
Type: "room",
Room: &BackendServerRoomRequest{
Type: "message",

View file

@ -53,8 +53,6 @@ type NatsClient interface {
Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error)
Publish(subject string, message any) error
Decode(msg *nats.Msg, v any) error
}
// The NATS client doesn't work if a subject contains spaces. As the room id
@ -156,7 +154,7 @@ func (c *natsClient) Publish(subject string, message any) error {
return c.conn.Publish(subject, data)
}
func (c *natsClient) Decode(msg *nats.Msg, vPtr any) (err error) {
func NatsDecode(msg *nats.Msg, vPtr any) (err error) {
switch arg := vPtr.(type) {
case *string:
// If they want a string and it is a JSON string, strip quotes

View file

@ -192,7 +192,3 @@ func (c *LoopbackNatsClient) Publish(subject string, message any) error {
c.wakeup.Signal()
return nil
}
func (c *LoopbackNatsClient) Decode(msg *nats.Msg, v any) error {
return json.Unmarshal(msg.Data, v)
}

33
room.go
View file

@ -25,6 +25,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"net/url"
@ -32,6 +33,7 @@ import (
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/strukturag/nextcloud-spreed-signaling/api"
@ -73,8 +75,9 @@ type Room struct {
// +checklocks:mu
properties json.RawMessage
closer *Closer
mu *sync.RWMutex
closer *Closer
mu *sync.RWMutex
asyncCh AsyncChannel
// +checklocks:mu
sessions map[PublicSessionId]Session
// +checklocks:mu
@ -118,6 +121,7 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEv
closer: NewCloser(),
mu: &sync.RWMutex{},
asyncCh: make(AsyncChannel, DefaultAsyncChannelSize),
sessions: make(map[PublicSessionId]Session),
internalSessions: make(map[*ClientSession]bool),
@ -180,6 +184,10 @@ func (r *Room) IsEqual(other *Room) bool {
return b1.Id() == b2.Id()
}
func (r *Room) AsyncChannel() AsyncChannel {
return r.asyncCh
}
func (r *Room) run() {
ticker := time.NewTicker(updateActiveSessionsInterval)
loop:
@ -187,6 +195,11 @@ loop:
select {
case <-r.closer.C:
break loop
case msg := <-r.asyncCh:
r.processAsyncNatsMessage(msg)
for count := len(r.asyncCh); count > 0; count-- {
r.processAsyncNatsMessage(<-r.asyncCh)
}
case <-ticker.C:
r.publishActiveSessions()
}
@ -198,7 +211,9 @@ func (r *Room) doClose() {
}
func (r *Room) unsubscribeBackend() {
r.events.UnregisterBackendRoomListener(r.id, r.backend, r)
if err := r.events.UnregisterBackendRoomListener(r.id, r.backend, r); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
r.logger.Printf("Error unsubscribing room listener in %s: %s", r.id, err)
}
}
func (r *Room) Close() []Session {
@ -218,7 +233,17 @@ func (r *Room) Close() []Session {
return result
}
func (r *Room) ProcessBackendRoomRequest(message *AsyncMessage) {
func (r *Room) processAsyncNatsMessage(msg *nats.Msg) {
var message AsyncMessage
if err := NatsDecode(msg, &message); err != nil {
r.logger.Printf("Could not decode NATS message %+v: %s", msg, err)
return
}
r.processAsyncMessage(&message)
}
func (r *Room) processAsyncMessage(message *AsyncMessage) {
switch message.Type {
case "room":
r.processBackendRoomRequestRoom(message.Room)

View file

@ -24,9 +24,11 @@ package signaling
import (
"context"
"encoding/json"
"errors"
"net/url"
"sync/atomic"
"github.com/nats-io/nats.go"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
@ -44,6 +46,8 @@ type VirtualSession struct {
privateId PrivateSessionId
publicId PublicSessionId
data *SessionIdData
ctx context.Context
closeFunc context.CancelFunc
room atomic.Pointer[Room]
sessionId PublicSessionId
@ -54,6 +58,8 @@ type VirtualSession struct {
options *AddSessionOptions
parseUserData func() (api.StringMap, error)
asyncCh AsyncChannel
}
func GetVirtualSessionId(session Session, sessionId PublicSessionId) PublicSessionId {
@ -61,6 +67,9 @@ func GetVirtualSessionId(session Session, sessionId PublicSessionId) PublicSessi
}
func NewVirtualSession(session *ClientSession, privateId PrivateSessionId, publicId PublicSessionId, data *SessionIdData, msg *AddSessionInternalClientMessage) (*VirtualSession, error) {
ctx := log.NewLoggerContext(session.Context(), session.hub.logger)
ctx, closeFunc := context.WithCancel(ctx)
result := &VirtualSession{
logger: session.hub.logger,
hub: session.hub,
@ -68,12 +77,16 @@ func NewVirtualSession(session *ClientSession, privateId PrivateSessionId, publi
privateId: privateId,
publicId: publicId,
data: data,
ctx: ctx,
closeFunc: closeFunc,
sessionId: msg.SessionId,
userId: msg.UserId,
userData: msg.User,
parseUserData: parseUserData(msg.User),
options: msg.Options,
asyncCh: make(AsyncChannel, DefaultAsyncChannelSize),
}
if err := session.events.RegisterSessionListener(publicId, session.Backend(), result); err != nil {
@ -89,11 +102,12 @@ func NewVirtualSession(session *ClientSession, privateId PrivateSessionId, publi
result.SetFlags(msg.Flags)
}
go result.run()
return result, nil
}
func (s *VirtualSession) Context() context.Context {
return s.session.Context()
return s.ctx
}
func (s *VirtualSession) PrivateId() PrivateSessionId {
@ -178,18 +192,40 @@ func (s *VirtualSession) LeaveRoom(notify bool) *Room {
return room
}
func (s *VirtualSession) AsyncChannel() AsyncChannel {
return s.asyncCh
}
func (s *VirtualSession) run() {
for {
select {
case <-s.ctx.Done():
return
case msg := <-s.asyncCh:
s.processAsyncNatsMessage(msg)
for count := len(s.asyncCh); count > 0; count-- {
s.processAsyncNatsMessage(<-s.asyncCh)
}
}
}
}
func (s *VirtualSession) Close() {
s.CloseWithFeedback(nil, nil)
}
func (s *VirtualSession) CloseWithFeedback(session Session, message *ClientMessage) {
s.closeFunc()
room := s.GetRoom()
s.session.RemoveVirtualSession(s)
removed := s.session.hub.removeSession(s)
if removed && room != nil {
go s.notifyBackendRemoved(room, session, message)
}
s.session.events.UnregisterSessionListener(s.PublicId(), s.session.Backend(), s)
if err := s.session.events.UnregisterSessionListener(s.PublicId(), s.session.Backend(), s); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
s.logger.Printf("Error unsubscribing listener for session %s: %s", s.publicId, err)
}
}
func (s *VirtualSession) notifyBackendRemoved(room *Room, session Session, message *ClientMessage) {
@ -272,7 +308,17 @@ func (s *VirtualSession) Options() *AddSessionOptions {
return s.options
}
func (s *VirtualSession) ProcessAsyncSessionMessage(message *AsyncMessage) {
func (s *VirtualSession) processAsyncNatsMessage(msg *nats.Msg) {
var message AsyncMessage
if err := NatsDecode(msg, &message); err != nil {
s.logger.Printf("Could not decode NATS message %+v: %s", msg, err)
return
}
s.processAsyncMessage(&message)
}
func (s *VirtualSession) processAsyncMessage(message *AsyncMessage) {
if message.Type == "message" && message.Message != nil {
switch message.Message.Type {
case "message":
@ -286,7 +332,7 @@ func (s *VirtualSession) ProcessAsyncSessionMessage(message *AsyncMessage) {
SessionId: s.SessionId(),
UserId: s.UserId(),
}
s.session.ProcessAsyncSessionMessage(message)
s.session.processAsyncMessage(message)
}
case "event":
if room := s.GetRoom(); room != nil &&
@ -307,7 +353,7 @@ func (s *VirtualSession) ProcessAsyncSessionMessage(message *AsyncMessage) {
return
}
s.session.ProcessAsyncSessionMessage(&AsyncMessage{
s.session.processAsyncMessage(&AsyncMessage{
Type: "message",
SendTime: message.SendTime,
Message: &ServerMessage{
@ -334,7 +380,7 @@ func (s *VirtualSession) ProcessAsyncSessionMessage(message *AsyncMessage) {
SessionId: s.SessionId(),
UserId: s.UserId(),
}
s.session.ProcessAsyncSessionMessage(message)
s.session.processAsyncMessage(message)
}
}
}