Refactor asynchronous events to central location.

This commit is contained in:
Joachim Bauch 2022-06-14 17:01:57 +02:00
parent ddb7ece622
commit 0115c97946
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
16 changed files with 1085 additions and 388 deletions

View file

@ -95,10 +95,10 @@ coverhtml: vet common
PATH="$(GODIR)":$(PATH) "$(GOPATHBIN)/easyjson" -all $*.go
common: \
api_async_easyjson.go \
api_backend_easyjson.go \
api_proxy_easyjson.go \
api_signaling_easyjson.go \
natsclient_easyjson.go
api_signaling_easyjson.go
$(BINDIR):
mkdir -p $(BINDIR)

38
api_async.go Normal file
View file

@ -0,0 +1,38 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import "time"
type AsyncMessage struct {
SendTime time.Time `json:"sendtime"`
Type string `json:"type"`
Message *ServerMessage `json:"message,omitempty"`
Room *BackendServerRoomRequest `json:"room,omitempty"`
Permissions []Permission `json:"permissions,omitempty"`
Id string `json:"id"`
}

210
async_events.go Normal file
View file

@ -0,0 +1,210 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import "sync"
type AsyncBackendRoomEventListener interface {
ProcessBackendRoomRequest(request *BackendServerRoomRequest)
}
type AsyncRoomEventListener interface {
ProcessAsyncRoomMessage(message *AsyncMessage)
}
type AsyncUserEventListener interface {
ProcessAsyncUserMessage(message *AsyncMessage)
}
type AsyncSessionEventListener interface {
ProcessAsyncSessionMessage(message *AsyncMessage)
}
type AsyncEvents interface {
Close()
RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) error
UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener)
RegisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) error
UnregisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener)
RegisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener) error
UnregisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener)
RegisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) error
UnregisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener)
PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
PublishUserMessage(userId string, backend *Backend, message *AsyncMessage) error
PublishSessionMessage(sessionId string, backend *Backend, message *AsyncMessage) error
}
func NewAsyncEvents(url string) (AsyncEvents, error) {
client, err := NewNatsClient(url)
if err != nil {
return nil, err
}
return NewAsyncEventsNats(client)
}
type asyncBackendRoomSubscriber struct {
mu sync.Mutex
listeners map[AsyncBackendRoomEventListener]bool
}
func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *BackendServerRoomRequest) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessBackendRoomRequest(message)
s.mu.Lock()
}
}
func (s *asyncBackendRoomSubscriber) addListener(listener AsyncBackendRoomEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncBackendRoomEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncBackendRoomSubscriber) removeListener(listener AsyncBackendRoomEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}
type asyncRoomSubscriber struct {
mu sync.Mutex
listeners map[AsyncRoomEventListener]bool
}
func (s *asyncRoomSubscriber) processAsyncRoomMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncRoomMessage(message)
s.mu.Lock()
}
}
func (s *asyncRoomSubscriber) addListener(listener AsyncRoomEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncRoomEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncRoomSubscriber) removeListener(listener AsyncRoomEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}
type asyncUserSubscriber struct {
mu sync.Mutex
listeners map[AsyncUserEventListener]bool
}
func (s *asyncUserSubscriber) processAsyncUserMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncUserMessage(message)
s.mu.Lock()
}
}
func (s *asyncUserSubscriber) addListener(listener AsyncUserEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncUserEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncUserSubscriber) removeListener(listener AsyncUserEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}
type asyncSessionSubscriber struct {
mu sync.Mutex
listeners map[AsyncSessionEventListener]bool
}
func (s *asyncSessionSubscriber) processAsyncSessionMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncSessionMessage(message)
s.mu.Lock()
}
}
func (s *asyncSessionSubscriber) addListener(listener AsyncSessionEventListener) {
s.mu.Lock()
defer s.mu.Unlock()
if s.listeners == nil {
s.listeners = make(map[AsyncSessionEventListener]bool)
}
s.listeners[listener] = true
}
func (s *asyncSessionSubscriber) removeListener(listener AsyncSessionEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.listeners, listener)
return len(s.listeners) > 0
}

445
async_events_nats.go Normal file
View file

@ -0,0 +1,445 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"log"
"sync"
"time"
"github.com/nats-io/nats.go"
)
func GetSubjectForBackendRoomId(roomId string, backend *Backend) string {
if backend == nil || backend.IsCompat() {
return GetEncodedSubject("backend.room", roomId)
}
return GetEncodedSubject("backend.room", roomId+"|"+backend.Id())
}
func GetSubjectForRoomId(roomId string, backend *Backend) string {
if backend == nil || backend.IsCompat() {
return GetEncodedSubject("room", roomId)
}
return GetEncodedSubject("room", roomId+"|"+backend.Id())
}
func GetSubjectForUserId(userId string, backend *Backend) string {
if backend == nil || backend.IsCompat() {
return GetEncodedSubject("user", userId)
}
return GetEncodedSubject("user", userId+"|"+backend.Id())
}
func GetSubjectForSessionId(sessionId string, backend *Backend) string {
return "session." + sessionId
}
type asyncSubscriberNats struct {
key string
client NatsClient
receiver chan *nats.Msg
closeChan chan bool
subscription NatsSubscription
processMessage func(*nats.Msg)
}
func newAsyncSubscriberNats(key string, client NatsClient) (*asyncSubscriberNats, error) {
receiver := make(chan *nats.Msg, 64)
sub, err := client.Subscribe(key, receiver)
if err != nil {
return nil, err
}
result := &asyncSubscriberNats{
key: key,
client: client,
receiver: receiver,
closeChan: make(chan bool),
subscription: sub,
}
return result, nil
}
func (s *asyncSubscriberNats) run() {
defer func() {
if err := s.subscription.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing %s: %s", s.key, err)
}
}()
for {
select {
case msg := <-s.receiver:
s.processMessage(msg)
for count := len(s.receiver); count > 0; count-- {
s.processMessage(<-s.receiver)
}
case <-s.closeChan:
return
}
}
}
func (s *asyncSubscriberNats) close() {
close(s.closeChan)
}
type asyncBackendRoomSubscriberNats struct {
*asyncSubscriberNats
asyncBackendRoomSubscriber
}
func newAsyncBackendRoomSubscriberNats(key string, client NatsClient) (*asyncBackendRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
if err != nil {
return nil, err
}
result := &asyncBackendRoomSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncBackendRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode NATS message %+v, %s", msg, err)
return
}
switch message.Type {
case "room":
s.processBackendRoomRequest(message.Room)
default:
log.Printf("Unsupported NATS room request with type %s: %+v", message.Type, message)
}
}
type asyncRoomSubscriberNats struct {
asyncRoomSubscriber
*asyncSubscriberNats
}
func newAsyncRoomSubscriberNats(key string, client NatsClient) (*asyncRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
if err != nil {
return nil, err
}
result := &asyncRoomSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
return
}
s.processAsyncRoomMessage(&message)
}
type asyncUserSubscriberNats struct {
*asyncSubscriberNats
asyncUserSubscriber
}
func newAsyncUserSubscriberNats(key string, client NatsClient) (*asyncUserSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
if err != nil {
return nil, err
}
result := &asyncUserSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncUserSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
return
}
s.processAsyncUserMessage(&message)
}
type asyncSessionSubscriberNats struct {
*asyncSubscriberNats
asyncSessionSubscriber
}
func newAsyncSessionSubscriberNats(key string, client NatsClient) (*asyncSessionSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
if err != nil {
return nil, err
}
result := &asyncSessionSubscriberNats{
asyncSubscriberNats: sub,
}
result.processMessage = result.doProcessMessage
go result.run()
return result, nil
}
func (s *asyncSessionSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
return
}
s.processAsyncSessionMessage(&message)
}
type asyncEventsNats struct {
mu sync.Mutex
client NatsClient
backendRoomSubscriptions map[string]*asyncBackendRoomSubscriberNats
roomSubscriptions map[string]*asyncRoomSubscriberNats
userSubscriptions map[string]*asyncUserSubscriberNats
sessionSubscriptions map[string]*asyncSessionSubscriberNats
}
func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) {
events := &asyncEventsNats{
client: client,
backendRoomSubscriptions: make(map[string]*asyncBackendRoomSubscriberNats),
roomSubscriptions: make(map[string]*asyncRoomSubscriberNats),
userSubscriptions: make(map[string]*asyncUserSubscriberNats),
sessionSubscriptions: make(map[string]*asyncSessionSubscriberNats),
}
return events, nil
}
func (e *asyncEventsNats) Close() {
e.mu.Lock()
defer e.mu.Unlock()
go func(subscriptions map[string]*asyncBackendRoomSubscriberNats) {
for _, sub := range subscriptions {
sub.close()
}
}(e.backendRoomSubscriptions)
go func(subscriptions map[string]*asyncRoomSubscriberNats) {
for _, sub := range subscriptions {
sub.close()
}
}(e.roomSubscriptions)
go func(subscriptions map[string]*asyncUserSubscriberNats) {
for _, sub := range subscriptions {
sub.close()
}
}(e.userSubscriptions)
go func(subscriptions map[string]*asyncSessionSubscriberNats) {
for _, sub := range subscriptions {
sub.close()
}
}(e.sessionSubscriptions)
e.backendRoomSubscriptions = make(map[string]*asyncBackendRoomSubscriberNats)
e.roomSubscriptions = make(map[string]*asyncRoomSubscriberNats)
e.userSubscriptions = make(map[string]*asyncUserSubscriberNats)
e.sessionSubscriptions = make(map[string]*asyncSessionSubscriberNats)
e.client.Close()
}
func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) error {
key := GetSubjectForBackendRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.backendRoomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncBackendRoomSubscriberNats(key, e.client); err != nil {
return err
}
e.backendRoomSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
}
func (e *asyncEventsNats) UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) {
key := GetSubjectForBackendRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.backendRoomSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.backendRoomSubscriptions, key)
sub.close()
}
}
func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) error {
key := GetSubjectForRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.roomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncRoomSubscriberNats(key, e.client); err != nil {
return err
}
e.roomSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
}
func (e *asyncEventsNats) UnregisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) {
key := GetSubjectForRoomId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.roomSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.roomSubscriptions, key)
sub.close()
}
}
func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend, listener AsyncUserEventListener) error {
key := GetSubjectForUserId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.userSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncUserSubscriberNats(key, e.client); err != nil {
return err
}
e.userSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
}
func (e *asyncEventsNats) UnregisterUserListener(roomId string, backend *Backend, listener AsyncUserEventListener) {
key := GetSubjectForUserId(roomId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.userSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.userSubscriptions, key)
sub.close()
}
}
func (e *asyncEventsNats) RegisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) error {
key := GetSubjectForSessionId(sessionId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.sessionSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncSessionSubscriberNats(key, e.client); err != nil {
return err
}
e.sessionSubscriptions[key] = sub
}
sub.addListener(listener)
return nil
}
func (e *asyncEventsNats) UnregisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) {
key := GetSubjectForSessionId(sessionId, backend)
e.mu.Lock()
defer e.mu.Unlock()
sub, found := e.sessionSubscriptions[key]
if !found {
return
}
if !sub.removeListener(listener) {
delete(e.sessionSubscriptions, key)
sub.close()
}
}
func (e *asyncEventsNats) publish(subject string, message *AsyncMessage) error {
message.SendTime = time.Now()
return e.client.Publish(subject, message)
}
func (e *asyncEventsNats) PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error {
subject := GetSubjectForBackendRoomId(roomId, backend)
return e.publish(subject, message)
}
func (e *asyncEventsNats) PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error {
subject := GetSubjectForRoomId(roomId, backend)
return e.publish(subject, message)
}
func (e *asyncEventsNats) PublishUserMessage(userId string, backend *Backend, message *AsyncMessage) error {
subject := GetSubjectForUserId(userId, backend)
return e.publish(subject, message)
}
func (e *asyncEventsNats) PublishSessionMessage(sessionId string, backend *Backend, message *AsyncMessage) error {
subject := GetSubjectForSessionId(sessionId, backend)
return e.publish(subject, message)
}

73
async_events_test.go Normal file
View file

@ -0,0 +1,73 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"context"
"strings"
"testing"
)
var (
eventBackendsForTest = []string{
"loopback",
"nats",
}
)
func getAsyncEventsForTest(t *testing.T) AsyncEvents {
var events AsyncEvents
if strings.HasSuffix(t.Name(), "/nats") {
events = getRealAsyncEventsForTest(t)
} else {
events = getLoopbackAsyncEventsForTest(t)
}
t.Cleanup(func() {
events.Close()
})
return events
}
func getRealAsyncEventsForTest(t *testing.T) AsyncEvents {
url := startLocalNatsServer(t)
events, err := NewAsyncEvents(url)
if err != nil {
t.Fatal(err)
}
return events
}
func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
events, err := NewAsyncEvents(NatsLoopbackUrl)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
nats := (events.(*asyncEventsNats)).client
(nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t)
})
return events
}

View file

@ -53,7 +53,7 @@ const (
type BackendServer struct {
hub *Hub
nats NatsClient
events AsyncEvents
roomSessions RoomSessions
version string
@ -123,7 +123,7 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
return &BackendServer{
hub: hub,
nats: hub.nats,
events: hub.events,
roomSessions: hub.roomSessions,
version: version,
@ -279,40 +279,46 @@ func (b *BackendServer) parseRequestBody(f func(http.ResponseWriter, *http.Reque
}
func (b *BackendServer) sendRoomInvite(roomid string, backend *Backend, userids []string, properties *json.RawMessage) {
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "roomlist",
Type: "invite",
Invite: &RoomEventServerMessage{
RoomId: roomid,
Properties: properties,
msg := &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "roomlist",
Type: "invite",
Invite: &RoomEventServerMessage{
RoomId: roomid,
Properties: properties,
},
},
},
}
for _, userid := range userids {
if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
if err := b.events.PublishUserMessage(userid, backend, msg); err != nil {
log.Printf("Could not publish room invite for user %s in backend %s: %s", userid, backend.Id(), err)
}
}
}
func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reason string, userids []string, sessionids []string) {
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "roomlist",
Type: "disinvite",
Disinvite: &RoomDisinviteEventServerMessage{
RoomEventServerMessage: RoomEventServerMessage{
RoomId: roomid,
msg := &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "roomlist",
Type: "disinvite",
Disinvite: &RoomDisinviteEventServerMessage{
RoomEventServerMessage: RoomEventServerMessage{
RoomId: roomid,
},
Reason: reason,
},
Reason: reason,
},
},
}
for _, userid := range userids {
if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
if err := b.events.PublishUserMessage(userid, backend, msg); err != nil {
log.Printf("Could not publish room disinvite for user %s in backend %s: %s", userid, backend.Id(), err)
}
}
@ -331,7 +337,7 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso
if sid, err := b.lookupByRoomSessionId(sessionid, nil, timeout); err != nil {
log.Printf("Could not lookup by room session %s: %s", sessionid, err)
} else if sid != "" {
if err := b.nats.PublishMessage("session."+sid, msg); err != nil {
if err := b.events.PublishSessionMessage(sid, backend, msg); err != nil {
log.Printf("Could not publish room disinvite for session %s: %s", sid, err)
}
}
@ -341,14 +347,17 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso
}
func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified_userids []string, all_userids []string, properties *json.RawMessage) {
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "roomlist",
Type: "update",
Update: &RoomEventServerMessage{
RoomId: roomid,
Properties: properties,
msg := &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "roomlist",
Type: "update",
Update: &RoomEventServerMessage{
RoomId: roomid,
Properties: properties,
},
},
},
}
@ -362,7 +371,7 @@ func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified
continue
}
if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil {
if err := b.events.PublishUserMessage(userid, backend, msg); err != nil {
log.Printf("Could not publish room update for user %s in backend %s: %s", userid, backend.Id(), err)
}
}
@ -458,7 +467,11 @@ func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request
}
}
return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request)
message := &AsyncMessage{
Type: "room",
Room: request,
}
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) sendRoomParticipantsUpdate(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
@ -500,22 +513,30 @@ loop:
go func(sessionId string, permissions []Permission) {
defer wg.Done()
message := &NatsMessage{
message := &AsyncMessage{
Type: "permissions",
Permissions: permissions,
}
if err := b.nats.Publish("session."+sessionId, message); err != nil {
if err := b.events.PublishSessionMessage(sessionId, backend, message); err != nil {
log.Printf("Could not send permissions update (%+v) to session %s: %s", permissions, sessionId, err)
}
}(sessionId, permissions)
}
wg.Wait()
return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request)
message := &AsyncMessage{
Type: "room",
Room: request,
}
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) sendRoomMessage(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request)
message := &AsyncMessage{
Type: "room",
Room: request,
}
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) {
@ -580,10 +601,18 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
b.sendRoomDisinvite(roomid, backend, DisinviteReasonDisinvited, request.Disinvite.UserIds, request.Disinvite.SessionIds)
b.sendRoomUpdate(roomid, backend, request.Disinvite.UserIds, request.Disinvite.AllUserIds, request.Disinvite.Properties)
case "update":
err = b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), &request)
message := &AsyncMessage{
Type: "room",
Room: &request,
}
err = b.events.PublishBackendRoomMessage(roomid, backend, message)
b.sendRoomUpdate(roomid, backend, nil, request.Update.UserIds, request.Update.Properties)
case "delete":
err = b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), &request)
message := &AsyncMessage{
Type: "room",
Room: &request,
}
err = b.events.PublishBackendRoomMessage(roomid, backend, message)
b.sendRoomDisinvite(roomid, backend, DisinviteReasonDeleted, request.Delete.UserIds, nil)
case "incall":
err = b.sendRoomIncall(roomid, backend, &request)

View file

@ -42,7 +42,6 @@ import (
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/nats-io/nats.go"
)
var (
@ -52,11 +51,11 @@ var (
turnServers = strings.Split(turnServersString, ",")
)
func CreateBackendServerForTest(t *testing.T) (*goconf.ConfigFile, *BackendServer, NatsClient, *Hub, *mux.Router, *httptest.Server) {
func CreateBackendServerForTest(t *testing.T) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) {
return CreateBackendServerForTestFromConfig(t, nil)
}
func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *BackendServer, NatsClient, *Hub, *mux.Router, *httptest.Server) {
func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) {
config := goconf.NewConfigFile()
config.AddOption("turn", "apikey", turnApiKey)
config.AddOption("turn", "secret", turnSecret)
@ -64,11 +63,14 @@ func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *Back
return CreateBackendServerForTestFromConfig(t, config)
}
func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFile) (*goconf.ConfigFile, *BackendServer, NatsClient, *Hub, *mux.Router, *httptest.Server) {
func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFile) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) {
r := mux.NewRouter()
registerBackendHandler(t, r)
server := httptest.NewServer(r)
t.Cleanup(func() {
server.Close()
})
if config == nil {
config = goconf.NewConfigFile()
}
@ -85,11 +87,8 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil
config.AddOption("sessions", "blockkey", "09876543210987654321098765432109")
config.AddOption("clients", "internalsecret", string(testInternalSecret))
config.AddOption("geoip", "url", "none")
nats, err := NewLoopbackNatsClient()
if err != nil {
t.Fatal(err)
}
hub, err := NewHub(config, nats, r, "no-version")
events := getAsyncEventsForTest(t)
hub, err := NewHub(config, events, r, "no-version")
if err != nil {
t.Fatal(err)
}
@ -108,12 +107,9 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil
defer cancel()
WaitForHub(ctx, t, hub)
(nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t)
nats.Close()
server.Close()
})
return config, b, nats, hub, r, server
return config, b, events, hub, r, server
}
func performBackendRequest(url string, body []byte) (*http.Response, error) {
@ -131,23 +127,16 @@ func performBackendRequest(url string, body []byte) (*http.Response, error) {
return client.Do(request)
}
func expectRoomlistEvent(n NatsClient, ch chan *nats.Msg, subject string, msgType string) (*EventServerMessage, error) {
func expectRoomlistEvent(ch chan *AsyncMessage, msgType string) (*EventServerMessage, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
select {
case message := <-ch:
if message.Subject != subject {
return nil, fmt.Errorf("Expected subject %s, got %s", subject, message.Subject)
}
var natsMsg NatsMessage
if err := n.Decode(message, &natsMsg); err != nil {
return nil, err
}
if natsMsg.Type != "message" || natsMsg.Message == nil {
return nil, fmt.Errorf("Expected message type message, got %+v", natsMsg)
if message.Type != "message" || message.Message == nil {
return nil, fmt.Errorf("Expected message type message, got %+v", message)
}
msg := natsMsg.Message
msg := message.Message
if msg.Type != "event" || msg.Event == nil {
return nil, fmt.Errorf("Expected message type event, got %+v", msg)
}
@ -309,7 +298,23 @@ func TestBackendServer_UnsupportedRequest(t *testing.T) {
}
func TestBackendServer_RoomInvite(t *testing.T) {
_, _, n, hub, _, server := CreateBackendServerForTest(t)
for _, backend := range eventBackendsForTest {
t.Run(backend, func(t *testing.T) {
RunTestBackendServer_RoomInvite(t)
})
}
}
type channelEventListener struct {
ch chan *AsyncMessage
}
func (l *channelEventListener) ProcessAsyncUserMessage(message *AsyncMessage) {
l.ch <- message
}
func RunTestBackendServer_RoomInvite(t *testing.T) {
_, _, events, hub, _, server := CreateBackendServerForTest(t)
u, err := url.Parse(server.URL)
if err != nil {
@ -320,17 +325,14 @@ func TestBackendServer_RoomInvite(t *testing.T) {
roomProperties := json.RawMessage("{\"foo\":\"bar\"}")
backend := hub.backend.GetBackend(u)
natsChan := make(chan *nats.Msg, 1)
subject := GetSubjectForUserId(userid, backend)
sub, err := n.Subscribe(subject, natsChan)
if err != nil {
eventsChan := make(chan *AsyncMessage, 1)
listener := &channelEventListener{
ch: eventsChan,
}
if err := events.RegisterUserListener(userid, backend, listener); err != nil {
t.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
t.Error(err)
}
}()
defer events.UnregisterUserListener(userid, backend, listener)
msg := &BackendServerRoomRequest{
Type: "invite",
@ -363,7 +365,7 @@ func TestBackendServer_RoomInvite(t *testing.T) {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
event, err := expectRoomlistEvent(n, natsChan, subject, "invite")
event, err := expectRoomlistEvent(eventsChan, "invite")
if err != nil {
t.Error(err)
} else if event.Invite == nil {
@ -376,7 +378,15 @@ func TestBackendServer_RoomInvite(t *testing.T) {
}
func TestBackendServer_RoomDisinvite(t *testing.T) {
_, _, n, hub, _, server := CreateBackendServerForTest(t)
for _, backend := range eventBackendsForTest {
t.Run(backend, func(t *testing.T) {
RunTestBackendServer_RoomDisinvite(t)
})
}
}
func RunTestBackendServer_RoomDisinvite(t *testing.T) {
_, _, events, hub, _, server := CreateBackendServerForTest(t)
u, err := url.Parse(server.URL)
if err != nil {
@ -414,17 +424,14 @@ func TestBackendServer_RoomDisinvite(t *testing.T) {
roomProperties := json.RawMessage("{\"foo\":\"bar\"}")
natsChan := make(chan *nats.Msg, 1)
subject := GetSubjectForUserId(testDefaultUserId, backend)
sub, err := n.Subscribe(subject, natsChan)
if err != nil {
eventsChan := make(chan *AsyncMessage, 1)
listener := &channelEventListener{
ch: eventsChan,
}
if err := events.RegisterUserListener(testDefaultUserId, backend, listener); err != nil {
t.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
t.Error(err)
}
}()
defer events.UnregisterUserListener(testDefaultUserId, backend, listener)
msg := &BackendServerRoomRequest{
Type: "disinvite",
@ -457,7 +464,7 @@ func TestBackendServer_RoomDisinvite(t *testing.T) {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
event, err := expectRoomlistEvent(n, natsChan, subject, "disinvite")
event, err := expectRoomlistEvent(eventsChan, "disinvite")
if err != nil {
t.Error(err)
} else if event.Disinvite == nil {
@ -606,11 +613,18 @@ func TestBackendServer_RoomDisinviteDifferentRooms(t *testing.T) {
} else if message.RoomId != roomId2 {
t.Errorf("Expected message for room %s, got %s", roomId2, message.RoomId)
}
}
func TestBackendServer_RoomUpdate(t *testing.T) {
_, _, n, hub, _, server := CreateBackendServerForTest(t)
for _, backend := range eventBackendsForTest {
t.Run(backend, func(t *testing.T) {
RunTestBackendServer_RoomUpdate(t)
})
}
}
func RunTestBackendServer_RoomUpdate(t *testing.T) {
_, _, events, hub, _, server := CreateBackendServerForTest(t)
u, err := url.Parse(server.URL)
if err != nil {
@ -632,17 +646,14 @@ func TestBackendServer_RoomUpdate(t *testing.T) {
userid := "test-userid"
roomProperties := json.RawMessage("{\"foo\":\"bar\"}")
natsChan := make(chan *nats.Msg, 1)
subject := GetSubjectForUserId(userid, backend)
sub, err := n.Subscribe(subject, natsChan)
if err != nil {
eventsChan := make(chan *AsyncMessage, 1)
listener := &channelEventListener{
ch: eventsChan,
}
if err := events.RegisterUserListener(userid, backend, listener); err != nil {
t.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
t.Error(err)
}
}()
defer events.UnregisterUserListener(userid, backend, listener)
msg := &BackendServerRoomRequest{
Type: "update",
@ -671,7 +682,7 @@ func TestBackendServer_RoomUpdate(t *testing.T) {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
event, err := expectRoomlistEvent(n, natsChan, subject, "update")
event, err := expectRoomlistEvent(eventsChan, "update")
if err != nil {
t.Error(err)
} else if event.Update == nil {
@ -682,7 +693,7 @@ func TestBackendServer_RoomUpdate(t *testing.T) {
t.Errorf("Room properties don't match: expected %s, got %s", string(roomProperties), string(*event.Update.Properties))
}
// TODO: Use event to wait for NATS messages.
// TODO: Use event to wait for asynchronous messages.
time.Sleep(10 * time.Millisecond)
room = hub.getRoom(roomId)
@ -695,7 +706,15 @@ func TestBackendServer_RoomUpdate(t *testing.T) {
}
func TestBackendServer_RoomDelete(t *testing.T) {
_, _, n, hub, _, server := CreateBackendServerForTest(t)
for _, backend := range eventBackendsForTest {
t.Run(backend, func(t *testing.T) {
RunTestBackendServer_RoomDelete(t)
})
}
}
func RunTestBackendServer_RoomDelete(t *testing.T) {
_, _, events, hub, _, server := CreateBackendServerForTest(t)
u, err := url.Parse(server.URL)
if err != nil {
@ -713,18 +732,14 @@ func TestBackendServer_RoomDelete(t *testing.T) {
}
userid := "test-userid"
natsChan := make(chan *nats.Msg, 1)
subject := GetSubjectForUserId(userid, backend)
sub, err := n.Subscribe(subject, natsChan)
if err != nil {
eventsChan := make(chan *AsyncMessage, 1)
listener := &channelEventListener{
ch: eventsChan,
}
if err := events.RegisterUserListener(userid, backend, listener); err != nil {
t.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
t.Error(err)
}
}()
defer events.UnregisterUserListener(userid, backend, listener)
msg := &BackendServerRoomRequest{
Type: "delete",
@ -753,7 +768,7 @@ func TestBackendServer_RoomDelete(t *testing.T) {
}
// A deleted room is signalled as a "disinvite" event.
event, err := expectRoomlistEvent(n, natsChan, subject, "disinvite")
event, err := expectRoomlistEvent(eventsChan, "disinvite")
if err != nil {
t.Error(err)
} else if event.Disinvite == nil {
@ -766,7 +781,7 @@ func TestBackendServer_RoomDelete(t *testing.T) {
t.Errorf("Reason should be deleted, got %s", event.Disinvite.Reason)
}
// TODO: Use event to wait for NATS messages.
// TODO: Use event to wait for asynchronous messages.
time.Sleep(10 * time.Millisecond)
room := hub.getRoom(roomId)
@ -880,7 +895,7 @@ func TestBackendServer_ParticipantsUpdatePermissions(t *testing.T) {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
// TODO: Use event to wait for NATS messages.
// TODO: Use event to wait for asynchronous messages.
time.Sleep(10 * time.Millisecond)
assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA)
@ -967,7 +982,7 @@ func TestBackendServer_ParticipantsUpdateEmptyPermissions(t *testing.T) {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
// TODO: Use event to wait for NATS messages.
// TODO: Use event to wait for asynchronous messages.
time.Sleep(10 * time.Millisecond)
assertSessionHasNotPermission(t, session, PERMISSION_MAY_PUBLISH_MEDIA)

View file

@ -33,7 +33,6 @@ import (
"time"
"unsafe"
"github.com/nats-io/nats.go"
"github.com/pion/sdp"
)
@ -50,8 +49,8 @@ var (
type ClientSession struct {
roomJoinTime int64
running int32
hub *Hub
events AsyncEvents
privateId string
publicId string
data *SessionIdData
@ -68,10 +67,7 @@ type ClientSession struct {
backendUrl string
parsedBackendUrl *url.URL
natsReceiver chan *nats.Msg
stopRun chan bool
runStopped chan bool
expires time.Time
expires time.Time
mu sync.Mutex
@ -79,10 +75,6 @@ type ClientSession struct {
room unsafe.Pointer
roomSessionId string
userSubscription NatsSubscription
sessionSubscription NatsSubscription
roomSubscription NatsSubscription
publishers map[string]McuPublisher
subscribers map[string]McuSubscriber
@ -96,6 +88,7 @@ type ClientSession struct {
func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
s := &ClientSession{
hub: hub,
events: hub.events,
privateId: privateId,
publicId: publicId,
data: data,
@ -106,10 +99,6 @@ func NewClientSession(hub *Hub, privateId string, publicId string, data *Session
userData: auth.User,
backend: backend,
natsReceiver: make(chan *nats.Msg, 64),
stopRun: make(chan bool, 1),