From 0115c979460c0e4da5c98bb8056402215a271614 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 14 Jun 2022 17:01:57 +0200 Subject: [PATCH] Refactor asynchronous events to central location. --- Makefile | 4 +- api_async.go | 38 ++++ async_events.go | 210 +++++++++++++++++++ async_events_nats.go | 445 +++++++++++++++++++++++++++++++++++++++++ async_events_test.go | 73 +++++++ backend_server.go | 105 ++++++---- backend_server_test.go | 163 ++++++++------- clientsession.go | 131 ++++-------- hub.go | 83 ++++++-- hub_test.go | 39 ++-- natsclient.go | 48 +---- natsclient_loopback.go | 23 --- room.go | 94 ++------- room_test.go | 8 +- server.conf.in | 2 +- server/main.go | 7 +- 16 files changed, 1085 insertions(+), 388 deletions(-) create mode 100644 api_async.go create mode 100644 async_events.go create mode 100644 async_events_nats.go create mode 100644 async_events_test.go diff --git a/Makefile b/Makefile index fb23cd9..e1ca9ea 100644 --- a/Makefile +++ b/Makefile @@ -95,10 +95,10 @@ coverhtml: vet common PATH="$(GODIR)":$(PATH) "$(GOPATHBIN)/easyjson" -all $*.go common: \ + api_async_easyjson.go \ api_backend_easyjson.go \ api_proxy_easyjson.go \ - api_signaling_easyjson.go \ - natsclient_easyjson.go + api_signaling_easyjson.go $(BINDIR): mkdir -p $(BINDIR) diff --git a/api_async.go b/api_async.go new file mode 100644 index 0000000..2b5e954 --- /dev/null +++ b/api_async.go @@ -0,0 +1,38 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import "time" + +type AsyncMessage struct { + SendTime time.Time `json:"sendtime"` + + Type string `json:"type"` + + Message *ServerMessage `json:"message,omitempty"` + + Room *BackendServerRoomRequest `json:"room,omitempty"` + + Permissions []Permission `json:"permissions,omitempty"` + + Id string `json:"id"` +} diff --git a/async_events.go b/async_events.go new file mode 100644 index 0000000..5f7938b --- /dev/null +++ b/async_events.go @@ -0,0 +1,210 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import "sync" + +type AsyncBackendRoomEventListener interface { + ProcessBackendRoomRequest(request *BackendServerRoomRequest) +} + +type AsyncRoomEventListener interface { + ProcessAsyncRoomMessage(message *AsyncMessage) +} + +type AsyncUserEventListener interface { + ProcessAsyncUserMessage(message *AsyncMessage) +} + +type AsyncSessionEventListener interface { + ProcessAsyncSessionMessage(message *AsyncMessage) +} + +type AsyncEvents interface { + Close() + + RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) error + UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) + + RegisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) error + UnregisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) + + RegisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener) error + UnregisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener) + + RegisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) error + UnregisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) + + PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error + PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error + PublishUserMessage(userId string, backend *Backend, message *AsyncMessage) error + PublishSessionMessage(sessionId string, backend *Backend, message *AsyncMessage) error +} + +func NewAsyncEvents(url string) (AsyncEvents, error) { + client, err := NewNatsClient(url) + if err != nil { + return nil, err + } + + return NewAsyncEventsNats(client) +} + +type asyncBackendRoomSubscriber struct { + mu sync.Mutex + + listeners map[AsyncBackendRoomEventListener]bool +} + +func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *BackendServerRoomRequest) { + s.mu.Lock() + defer s.mu.Unlock() + + for listener := range s.listeners { + s.mu.Unlock() + listener.ProcessBackendRoomRequest(message) + s.mu.Lock() + } +} + +func (s *asyncBackendRoomSubscriber) addListener(listener AsyncBackendRoomEventListener) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.listeners == nil { + s.listeners = make(map[AsyncBackendRoomEventListener]bool) + } + s.listeners[listener] = true +} + +func (s *asyncBackendRoomSubscriber) removeListener(listener AsyncBackendRoomEventListener) bool { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.listeners, listener) + return len(s.listeners) > 0 +} + +type asyncRoomSubscriber struct { + mu sync.Mutex + + listeners map[AsyncRoomEventListener]bool +} + +func (s *asyncRoomSubscriber) processAsyncRoomMessage(message *AsyncMessage) { + s.mu.Lock() + defer s.mu.Unlock() + + for listener := range s.listeners { + s.mu.Unlock() + listener.ProcessAsyncRoomMessage(message) + s.mu.Lock() + } +} + +func (s *asyncRoomSubscriber) addListener(listener AsyncRoomEventListener) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.listeners == nil { + s.listeners = make(map[AsyncRoomEventListener]bool) + } + s.listeners[listener] = true +} + +func (s *asyncRoomSubscriber) removeListener(listener AsyncRoomEventListener) bool { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.listeners, listener) + return len(s.listeners) > 0 +} + +type asyncUserSubscriber struct { + mu sync.Mutex + + listeners map[AsyncUserEventListener]bool +} + +func (s *asyncUserSubscriber) processAsyncUserMessage(message *AsyncMessage) { + s.mu.Lock() + defer s.mu.Unlock() + + for listener := range s.listeners { + s.mu.Unlock() + listener.ProcessAsyncUserMessage(message) + s.mu.Lock() + } +} + +func (s *asyncUserSubscriber) addListener(listener AsyncUserEventListener) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.listeners == nil { + s.listeners = make(map[AsyncUserEventListener]bool) + } + s.listeners[listener] = true +} + +func (s *asyncUserSubscriber) removeListener(listener AsyncUserEventListener) bool { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.listeners, listener) + return len(s.listeners) > 0 +} + +type asyncSessionSubscriber struct { + mu sync.Mutex + + listeners map[AsyncSessionEventListener]bool +} + +func (s *asyncSessionSubscriber) processAsyncSessionMessage(message *AsyncMessage) { + s.mu.Lock() + defer s.mu.Unlock() + + for listener := range s.listeners { + s.mu.Unlock() + listener.ProcessAsyncSessionMessage(message) + s.mu.Lock() + } +} + +func (s *asyncSessionSubscriber) addListener(listener AsyncSessionEventListener) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.listeners == nil { + s.listeners = make(map[AsyncSessionEventListener]bool) + } + s.listeners[listener] = true +} + +func (s *asyncSessionSubscriber) removeListener(listener AsyncSessionEventListener) bool { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.listeners, listener) + return len(s.listeners) > 0 +} diff --git a/async_events_nats.go b/async_events_nats.go new file mode 100644 index 0000000..00af5ed --- /dev/null +++ b/async_events_nats.go @@ -0,0 +1,445 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "log" + "sync" + "time" + + "github.com/nats-io/nats.go" +) + +func GetSubjectForBackendRoomId(roomId string, backend *Backend) string { + if backend == nil || backend.IsCompat() { + return GetEncodedSubject("backend.room", roomId) + } + + return GetEncodedSubject("backend.room", roomId+"|"+backend.Id()) +} + +func GetSubjectForRoomId(roomId string, backend *Backend) string { + if backend == nil || backend.IsCompat() { + return GetEncodedSubject("room", roomId) + } + + return GetEncodedSubject("room", roomId+"|"+backend.Id()) +} + +func GetSubjectForUserId(userId string, backend *Backend) string { + if backend == nil || backend.IsCompat() { + return GetEncodedSubject("user", userId) + } + + return GetEncodedSubject("user", userId+"|"+backend.Id()) +} + +func GetSubjectForSessionId(sessionId string, backend *Backend) string { + return "session." + sessionId +} + +type asyncSubscriberNats struct { + key string + client NatsClient + + receiver chan *nats.Msg + closeChan chan bool + subscription NatsSubscription + + processMessage func(*nats.Msg) +} + +func newAsyncSubscriberNats(key string, client NatsClient) (*asyncSubscriberNats, error) { + receiver := make(chan *nats.Msg, 64) + sub, err := client.Subscribe(key, receiver) + if err != nil { + return nil, err + } + + result := &asyncSubscriberNats{ + key: key, + client: client, + + receiver: receiver, + closeChan: make(chan bool), + subscription: sub, + } + return result, nil +} + +func (s *asyncSubscriberNats) run() { + defer func() { + if err := s.subscription.Unsubscribe(); err != nil { + log.Printf("Error unsubscribing %s: %s", s.key, err) + } + }() + + for { + select { + case msg := <-s.receiver: + s.processMessage(msg) + for count := len(s.receiver); count > 0; count-- { + s.processMessage(<-s.receiver) + } + case <-s.closeChan: + return + } + } +} + +func (s *asyncSubscriberNats) close() { + close(s.closeChan) +} + +type asyncBackendRoomSubscriberNats struct { + *asyncSubscriberNats + asyncBackendRoomSubscriber +} + +func newAsyncBackendRoomSubscriberNats(key string, client NatsClient) (*asyncBackendRoomSubscriberNats, error) { + sub, err := newAsyncSubscriberNats(key, client) + if err != nil { + return nil, err + } + + result := &asyncBackendRoomSubscriberNats{ + asyncSubscriberNats: sub, + } + result.processMessage = result.doProcessMessage + go result.run() + return result, nil +} + +func (s *asyncBackendRoomSubscriberNats) doProcessMessage(msg *nats.Msg) { + var message AsyncMessage + if err := s.client.Decode(msg, &message); err != nil { + log.Printf("Could not decode NATS message %+v, %s", msg, err) + return + } + + switch message.Type { + case "room": + s.processBackendRoomRequest(message.Room) + default: + log.Printf("Unsupported NATS room request with type %s: %+v", message.Type, message) + } +} + +type asyncRoomSubscriberNats struct { + asyncRoomSubscriber + *asyncSubscriberNats +} + +func newAsyncRoomSubscriberNats(key string, client NatsClient) (*asyncRoomSubscriberNats, error) { + sub, err := newAsyncSubscriberNats(key, client) + if err != nil { + return nil, err + } + + result := &asyncRoomSubscriberNats{ + asyncSubscriberNats: sub, + } + result.processMessage = result.doProcessMessage + go result.run() + return result, nil +} + +func (s *asyncRoomSubscriberNats) doProcessMessage(msg *nats.Msg) { + var message AsyncMessage + if err := s.client.Decode(msg, &message); err != nil { + log.Printf("Could not decode nats message %+v, %s", msg, err) + return + } + + s.processAsyncRoomMessage(&message) +} + +type asyncUserSubscriberNats struct { + *asyncSubscriberNats + asyncUserSubscriber +} + +func newAsyncUserSubscriberNats(key string, client NatsClient) (*asyncUserSubscriberNats, error) { + sub, err := newAsyncSubscriberNats(key, client) + if err != nil { + return nil, err + } + + result := &asyncUserSubscriberNats{ + asyncSubscriberNats: sub, + } + result.processMessage = result.doProcessMessage + go result.run() + return result, nil +} + +func (s *asyncUserSubscriberNats) doProcessMessage(msg *nats.Msg) { + var message AsyncMessage + if err := s.client.Decode(msg, &message); err != nil { + log.Printf("Could not decode nats message %+v, %s", msg, err) + return + } + + s.processAsyncUserMessage(&message) +} + +type asyncSessionSubscriberNats struct { + *asyncSubscriberNats + asyncSessionSubscriber +} + +func newAsyncSessionSubscriberNats(key string, client NatsClient) (*asyncSessionSubscriberNats, error) { + sub, err := newAsyncSubscriberNats(key, client) + if err != nil { + return nil, err + } + + result := &asyncSessionSubscriberNats{ + asyncSubscriberNats: sub, + } + result.processMessage = result.doProcessMessage + go result.run() + return result, nil +} + +func (s *asyncSessionSubscriberNats) doProcessMessage(msg *nats.Msg) { + var message AsyncMessage + if err := s.client.Decode(msg, &message); err != nil { + log.Printf("Could not decode nats message %+v, %s", msg, err) + return + } + + s.processAsyncSessionMessage(&message) +} + +type asyncEventsNats struct { + mu sync.Mutex + client NatsClient + + backendRoomSubscriptions map[string]*asyncBackendRoomSubscriberNats + roomSubscriptions map[string]*asyncRoomSubscriberNats + userSubscriptions map[string]*asyncUserSubscriberNats + sessionSubscriptions map[string]*asyncSessionSubscriberNats +} + +func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) { + events := &asyncEventsNats{ + client: client, + + backendRoomSubscriptions: make(map[string]*asyncBackendRoomSubscriberNats), + roomSubscriptions: make(map[string]*asyncRoomSubscriberNats), + userSubscriptions: make(map[string]*asyncUserSubscriberNats), + sessionSubscriptions: make(map[string]*asyncSessionSubscriberNats), + } + return events, nil +} + +func (e *asyncEventsNats) Close() { + e.mu.Lock() + defer e.mu.Unlock() + go func(subscriptions map[string]*asyncBackendRoomSubscriberNats) { + for _, sub := range subscriptions { + sub.close() + } + }(e.backendRoomSubscriptions) + go func(subscriptions map[string]*asyncRoomSubscriberNats) { + for _, sub := range subscriptions { + sub.close() + } + }(e.roomSubscriptions) + go func(subscriptions map[string]*asyncUserSubscriberNats) { + for _, sub := range subscriptions { + sub.close() + } + }(e.userSubscriptions) + go func(subscriptions map[string]*asyncSessionSubscriberNats) { + for _, sub := range subscriptions { + sub.close() + } + }(e.sessionSubscriptions) + e.backendRoomSubscriptions = make(map[string]*asyncBackendRoomSubscriberNats) + e.roomSubscriptions = make(map[string]*asyncRoomSubscriberNats) + e.userSubscriptions = make(map[string]*asyncUserSubscriberNats) + e.sessionSubscriptions = make(map[string]*asyncSessionSubscriberNats) + e.client.Close() +} + +func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) error { + key := GetSubjectForBackendRoomId(roomId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.backendRoomSubscriptions[key] + if !found { + var err error + if sub, err = newAsyncBackendRoomSubscriberNats(key, e.client); err != nil { + return err + } + + e.backendRoomSubscriptions[key] = sub + } + sub.addListener(listener) + return nil +} + +func (e *asyncEventsNats) UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) { + key := GetSubjectForBackendRoomId(roomId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.backendRoomSubscriptions[key] + if !found { + return + } + + if !sub.removeListener(listener) { + delete(e.backendRoomSubscriptions, key) + sub.close() + } +} + +func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) error { + key := GetSubjectForRoomId(roomId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.roomSubscriptions[key] + if !found { + var err error + if sub, err = newAsyncRoomSubscriberNats(key, e.client); err != nil { + return err + } + + e.roomSubscriptions[key] = sub + } + sub.addListener(listener) + return nil +} + +func (e *asyncEventsNats) UnregisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) { + key := GetSubjectForRoomId(roomId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.roomSubscriptions[key] + if !found { + return + } + + if !sub.removeListener(listener) { + delete(e.roomSubscriptions, key) + sub.close() + } +} + +func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend, listener AsyncUserEventListener) error { + key := GetSubjectForUserId(roomId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.userSubscriptions[key] + if !found { + var err error + if sub, err = newAsyncUserSubscriberNats(key, e.client); err != nil { + return err + } + + e.userSubscriptions[key] = sub + } + sub.addListener(listener) + return nil +} + +func (e *asyncEventsNats) UnregisterUserListener(roomId string, backend *Backend, listener AsyncUserEventListener) { + key := GetSubjectForUserId(roomId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.userSubscriptions[key] + if !found { + return + } + + if !sub.removeListener(listener) { + delete(e.userSubscriptions, key) + sub.close() + } +} + +func (e *asyncEventsNats) RegisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) error { + key := GetSubjectForSessionId(sessionId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.sessionSubscriptions[key] + if !found { + var err error + if sub, err = newAsyncSessionSubscriberNats(key, e.client); err != nil { + return err + } + + e.sessionSubscriptions[key] = sub + } + sub.addListener(listener) + return nil +} + +func (e *asyncEventsNats) UnregisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) { + key := GetSubjectForSessionId(sessionId, backend) + + e.mu.Lock() + defer e.mu.Unlock() + sub, found := e.sessionSubscriptions[key] + if !found { + return + } + + if !sub.removeListener(listener) { + delete(e.sessionSubscriptions, key) + sub.close() + } +} + +func (e *asyncEventsNats) publish(subject string, message *AsyncMessage) error { + message.SendTime = time.Now() + return e.client.Publish(subject, message) +} + +func (e *asyncEventsNats) PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error { + subject := GetSubjectForBackendRoomId(roomId, backend) + return e.publish(subject, message) +} + +func (e *asyncEventsNats) PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error { + subject := GetSubjectForRoomId(roomId, backend) + return e.publish(subject, message) +} + +func (e *asyncEventsNats) PublishUserMessage(userId string, backend *Backend, message *AsyncMessage) error { + subject := GetSubjectForUserId(userId, backend) + return e.publish(subject, message) +} + +func (e *asyncEventsNats) PublishSessionMessage(sessionId string, backend *Backend, message *AsyncMessage) error { + subject := GetSubjectForSessionId(sessionId, backend) + return e.publish(subject, message) +} diff --git a/async_events_test.go b/async_events_test.go new file mode 100644 index 0000000..3253c83 --- /dev/null +++ b/async_events_test.go @@ -0,0 +1,73 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "strings" + "testing" +) + +var ( + eventBackendsForTest = []string{ + "loopback", + "nats", + } +) + +func getAsyncEventsForTest(t *testing.T) AsyncEvents { + var events AsyncEvents + if strings.HasSuffix(t.Name(), "/nats") { + events = getRealAsyncEventsForTest(t) + } else { + events = getLoopbackAsyncEventsForTest(t) + } + t.Cleanup(func() { + events.Close() + }) + return events +} + +func getRealAsyncEventsForTest(t *testing.T) AsyncEvents { + url := startLocalNatsServer(t) + events, err := NewAsyncEvents(url) + if err != nil { + t.Fatal(err) + } + return events +} + +func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { + events, err := NewAsyncEvents(NatsLoopbackUrl) + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + nats := (events.(*asyncEventsNats)).client + (nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t) + }) + return events +} diff --git a/backend_server.go b/backend_server.go index 7f1fb19..81120d2 100644 --- a/backend_server.go +++ b/backend_server.go @@ -53,7 +53,7 @@ const ( type BackendServer struct { hub *Hub - nats NatsClient + events AsyncEvents roomSessions RoomSessions version string @@ -123,7 +123,7 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac return &BackendServer{ hub: hub, - nats: hub.nats, + events: hub.events, roomSessions: hub.roomSessions, version: version, @@ -279,40 +279,46 @@ func (b *BackendServer) parseRequestBody(f func(http.ResponseWriter, *http.Reque } func (b *BackendServer) sendRoomInvite(roomid string, backend *Backend, userids []string, properties *json.RawMessage) { - msg := &ServerMessage{ - Type: "event", - Event: &EventServerMessage{ - Target: "roomlist", - Type: "invite", - Invite: &RoomEventServerMessage{ - RoomId: roomid, - Properties: properties, + msg := &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "roomlist", + Type: "invite", + Invite: &RoomEventServerMessage{ + RoomId: roomid, + Properties: properties, + }, }, }, } for _, userid := range userids { - if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil { + if err := b.events.PublishUserMessage(userid, backend, msg); err != nil { log.Printf("Could not publish room invite for user %s in backend %s: %s", userid, backend.Id(), err) } } } func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reason string, userids []string, sessionids []string) { - msg := &ServerMessage{ - Type: "event", - Event: &EventServerMessage{ - Target: "roomlist", - Type: "disinvite", - Disinvite: &RoomDisinviteEventServerMessage{ - RoomEventServerMessage: RoomEventServerMessage{ - RoomId: roomid, + msg := &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "roomlist", + Type: "disinvite", + Disinvite: &RoomDisinviteEventServerMessage{ + RoomEventServerMessage: RoomEventServerMessage{ + RoomId: roomid, + }, + Reason: reason, }, - Reason: reason, }, }, } for _, userid := range userids { - if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil { + if err := b.events.PublishUserMessage(userid, backend, msg); err != nil { log.Printf("Could not publish room disinvite for user %s in backend %s: %s", userid, backend.Id(), err) } } @@ -331,7 +337,7 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso if sid, err := b.lookupByRoomSessionId(sessionid, nil, timeout); err != nil { log.Printf("Could not lookup by room session %s: %s", sessionid, err) } else if sid != "" { - if err := b.nats.PublishMessage("session."+sid, msg); err != nil { + if err := b.events.PublishSessionMessage(sid, backend, msg); err != nil { log.Printf("Could not publish room disinvite for session %s: %s", sid, err) } } @@ -341,14 +347,17 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso } func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified_userids []string, all_userids []string, properties *json.RawMessage) { - msg := &ServerMessage{ - Type: "event", - Event: &EventServerMessage{ - Target: "roomlist", - Type: "update", - Update: &RoomEventServerMessage{ - RoomId: roomid, - Properties: properties, + msg := &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "roomlist", + Type: "update", + Update: &RoomEventServerMessage{ + RoomId: roomid, + Properties: properties, + }, }, }, } @@ -362,7 +371,7 @@ func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified continue } - if err := b.nats.PublishMessage(GetSubjectForUserId(userid, backend), msg); err != nil { + if err := b.events.PublishUserMessage(userid, backend, msg); err != nil { log.Printf("Could not publish room update for user %s in backend %s: %s", userid, backend.Id(), err) } } @@ -458,7 +467,11 @@ func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request } } - return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request) + message := &AsyncMessage{ + Type: "room", + Room: request, + } + return b.events.PublishBackendRoomMessage(roomid, backend, message) } func (b *BackendServer) sendRoomParticipantsUpdate(roomid string, backend *Backend, request *BackendServerRoomRequest) error { @@ -500,22 +513,30 @@ loop: go func(sessionId string, permissions []Permission) { defer wg.Done() - message := &NatsMessage{ + message := &AsyncMessage{ Type: "permissions", Permissions: permissions, } - if err := b.nats.Publish("session."+sessionId, message); err != nil { + if err := b.events.PublishSessionMessage(sessionId, backend, message); err != nil { log.Printf("Could not send permissions update (%+v) to session %s: %s", permissions, sessionId, err) } }(sessionId, permissions) } wg.Wait() - return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request) + message := &AsyncMessage{ + Type: "room", + Room: request, + } + return b.events.PublishBackendRoomMessage(roomid, backend, message) } func (b *BackendServer) sendRoomMessage(roomid string, backend *Backend, request *BackendServerRoomRequest) error { - return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request) + message := &AsyncMessage{ + Type: "room", + Room: request, + } + return b.events.PublishBackendRoomMessage(roomid, backend, message) } func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) { @@ -580,10 +601,18 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body b.sendRoomDisinvite(roomid, backend, DisinviteReasonDisinvited, request.Disinvite.UserIds, request.Disinvite.SessionIds) b.sendRoomUpdate(roomid, backend, request.Disinvite.UserIds, request.Disinvite.AllUserIds, request.Disinvite.Properties) case "update": - err = b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), &request) + message := &AsyncMessage{ + Type: "room", + Room: &request, + } + err = b.events.PublishBackendRoomMessage(roomid, backend, message) b.sendRoomUpdate(roomid, backend, nil, request.Update.UserIds, request.Update.Properties) case "delete": - err = b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), &request) + message := &AsyncMessage{ + Type: "room", + Room: &request, + } + err = b.events.PublishBackendRoomMessage(roomid, backend, message) b.sendRoomDisinvite(roomid, backend, DisinviteReasonDeleted, request.Delete.UserIds, nil) case "incall": err = b.sendRoomIncall(roomid, backend, &request) diff --git a/backend_server_test.go b/backend_server_test.go index 82716dd..e14b740 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -42,7 +42,6 @@ import ( "github.com/dlintw/goconf" "github.com/gorilla/mux" "github.com/gorilla/websocket" - "github.com/nats-io/nats.go" ) var ( @@ -52,11 +51,11 @@ var ( turnServers = strings.Split(turnServersString, ",") ) -func CreateBackendServerForTest(t *testing.T) (*goconf.ConfigFile, *BackendServer, NatsClient, *Hub, *mux.Router, *httptest.Server) { +func CreateBackendServerForTest(t *testing.T) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) { return CreateBackendServerForTestFromConfig(t, nil) } -func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *BackendServer, NatsClient, *Hub, *mux.Router, *httptest.Server) { +func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) { config := goconf.NewConfigFile() config.AddOption("turn", "apikey", turnApiKey) config.AddOption("turn", "secret", turnSecret) @@ -64,11 +63,14 @@ func CreateBackendServerForTestWithTurn(t *testing.T) (*goconf.ConfigFile, *Back return CreateBackendServerForTestFromConfig(t, config) } -func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFile) (*goconf.ConfigFile, *BackendServer, NatsClient, *Hub, *mux.Router, *httptest.Server) { +func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFile) (*goconf.ConfigFile, *BackendServer, AsyncEvents, *Hub, *mux.Router, *httptest.Server) { r := mux.NewRouter() registerBackendHandler(t, r) server := httptest.NewServer(r) + t.Cleanup(func() { + server.Close() + }) if config == nil { config = goconf.NewConfigFile() } @@ -85,11 +87,8 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil config.AddOption("sessions", "blockkey", "09876543210987654321098765432109") config.AddOption("clients", "internalsecret", string(testInternalSecret)) config.AddOption("geoip", "url", "none") - nats, err := NewLoopbackNatsClient() - if err != nil { - t.Fatal(err) - } - hub, err := NewHub(config, nats, r, "no-version") + events := getAsyncEventsForTest(t) + hub, err := NewHub(config, events, r, "no-version") if err != nil { t.Fatal(err) } @@ -108,12 +107,9 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil defer cancel() WaitForHub(ctx, t, hub) - (nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t) - nats.Close() - server.Close() }) - return config, b, nats, hub, r, server + return config, b, events, hub, r, server } func performBackendRequest(url string, body []byte) (*http.Response, error) { @@ -131,23 +127,16 @@ func performBackendRequest(url string, body []byte) (*http.Response, error) { return client.Do(request) } -func expectRoomlistEvent(n NatsClient, ch chan *nats.Msg, subject string, msgType string) (*EventServerMessage, error) { +func expectRoomlistEvent(ch chan *AsyncMessage, msgType string) (*EventServerMessage, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() select { case message := <-ch: - if message.Subject != subject { - return nil, fmt.Errorf("Expected subject %s, got %s", subject, message.Subject) - } - var natsMsg NatsMessage - if err := n.Decode(message, &natsMsg); err != nil { - return nil, err - } - if natsMsg.Type != "message" || natsMsg.Message == nil { - return nil, fmt.Errorf("Expected message type message, got %+v", natsMsg) + if message.Type != "message" || message.Message == nil { + return nil, fmt.Errorf("Expected message type message, got %+v", message) } - msg := natsMsg.Message + msg := message.Message if msg.Type != "event" || msg.Event == nil { return nil, fmt.Errorf("Expected message type event, got %+v", msg) } @@ -309,7 +298,23 @@ func TestBackendServer_UnsupportedRequest(t *testing.T) { } func TestBackendServer_RoomInvite(t *testing.T) { - _, _, n, hub, _, server := CreateBackendServerForTest(t) + for _, backend := range eventBackendsForTest { + t.Run(backend, func(t *testing.T) { + RunTestBackendServer_RoomInvite(t) + }) + } +} + +type channelEventListener struct { + ch chan *AsyncMessage +} + +func (l *channelEventListener) ProcessAsyncUserMessage(message *AsyncMessage) { + l.ch <- message +} + +func RunTestBackendServer_RoomInvite(t *testing.T) { + _, _, events, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) if err != nil { @@ -320,17 +325,14 @@ func TestBackendServer_RoomInvite(t *testing.T) { roomProperties := json.RawMessage("{\"foo\":\"bar\"}") backend := hub.backend.GetBackend(u) - natsChan := make(chan *nats.Msg, 1) - subject := GetSubjectForUserId(userid, backend) - sub, err := n.Subscribe(subject, natsChan) - if err != nil { + eventsChan := make(chan *AsyncMessage, 1) + listener := &channelEventListener{ + ch: eventsChan, + } + if err := events.RegisterUserListener(userid, backend, listener); err != nil { t.Fatal(err) } - defer func() { - if err := sub.Unsubscribe(); err != nil { - t.Error(err) - } - }() + defer events.UnregisterUserListener(userid, backend, listener) msg := &BackendServerRoomRequest{ Type: "invite", @@ -363,7 +365,7 @@ func TestBackendServer_RoomInvite(t *testing.T) { t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) } - event, err := expectRoomlistEvent(n, natsChan, subject, "invite") + event, err := expectRoomlistEvent(eventsChan, "invite") if err != nil { t.Error(err) } else if event.Invite == nil { @@ -376,7 +378,15 @@ func TestBackendServer_RoomInvite(t *testing.T) { } func TestBackendServer_RoomDisinvite(t *testing.T) { - _, _, n, hub, _, server := CreateBackendServerForTest(t) + for _, backend := range eventBackendsForTest { + t.Run(backend, func(t *testing.T) { + RunTestBackendServer_RoomDisinvite(t) + }) + } +} + +func RunTestBackendServer_RoomDisinvite(t *testing.T) { + _, _, events, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) if err != nil { @@ -414,17 +424,14 @@ func TestBackendServer_RoomDisinvite(t *testing.T) { roomProperties := json.RawMessage("{\"foo\":\"bar\"}") - natsChan := make(chan *nats.Msg, 1) - subject := GetSubjectForUserId(testDefaultUserId, backend) - sub, err := n.Subscribe(subject, natsChan) - if err != nil { + eventsChan := make(chan *AsyncMessage, 1) + listener := &channelEventListener{ + ch: eventsChan, + } + if err := events.RegisterUserListener(testDefaultUserId, backend, listener); err != nil { t.Fatal(err) } - defer func() { - if err := sub.Unsubscribe(); err != nil { - t.Error(err) - } - }() + defer events.UnregisterUserListener(testDefaultUserId, backend, listener) msg := &BackendServerRoomRequest{ Type: "disinvite", @@ -457,7 +464,7 @@ func TestBackendServer_RoomDisinvite(t *testing.T) { t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) } - event, err := expectRoomlistEvent(n, natsChan, subject, "disinvite") + event, err := expectRoomlistEvent(eventsChan, "disinvite") if err != nil { t.Error(err) } else if event.Disinvite == nil { @@ -606,11 +613,18 @@ func TestBackendServer_RoomDisinviteDifferentRooms(t *testing.T) { } else if message.RoomId != roomId2 { t.Errorf("Expected message for room %s, got %s", roomId2, message.RoomId) } - } func TestBackendServer_RoomUpdate(t *testing.T) { - _, _, n, hub, _, server := CreateBackendServerForTest(t) + for _, backend := range eventBackendsForTest { + t.Run(backend, func(t *testing.T) { + RunTestBackendServer_RoomUpdate(t) + }) + } +} + +func RunTestBackendServer_RoomUpdate(t *testing.T) { + _, _, events, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) if err != nil { @@ -632,17 +646,14 @@ func TestBackendServer_RoomUpdate(t *testing.T) { userid := "test-userid" roomProperties := json.RawMessage("{\"foo\":\"bar\"}") - natsChan := make(chan *nats.Msg, 1) - subject := GetSubjectForUserId(userid, backend) - sub, err := n.Subscribe(subject, natsChan) - if err != nil { + eventsChan := make(chan *AsyncMessage, 1) + listener := &channelEventListener{ + ch: eventsChan, + } + if err := events.RegisterUserListener(userid, backend, listener); err != nil { t.Fatal(err) } - defer func() { - if err := sub.Unsubscribe(); err != nil { - t.Error(err) - } - }() + defer events.UnregisterUserListener(userid, backend, listener) msg := &BackendServerRoomRequest{ Type: "update", @@ -671,7 +682,7 @@ func TestBackendServer_RoomUpdate(t *testing.T) { t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) } - event, err := expectRoomlistEvent(n, natsChan, subject, "update") + event, err := expectRoomlistEvent(eventsChan, "update") if err != nil { t.Error(err) } else if event.Update == nil { @@ -682,7 +693,7 @@ func TestBackendServer_RoomUpdate(t *testing.T) { t.Errorf("Room properties don't match: expected %s, got %s", string(roomProperties), string(*event.Update.Properties)) } - // TODO: Use event to wait for NATS messages. + // TODO: Use event to wait for asynchronous messages. time.Sleep(10 * time.Millisecond) room = hub.getRoom(roomId) @@ -695,7 +706,15 @@ func TestBackendServer_RoomUpdate(t *testing.T) { } func TestBackendServer_RoomDelete(t *testing.T) { - _, _, n, hub, _, server := CreateBackendServerForTest(t) + for _, backend := range eventBackendsForTest { + t.Run(backend, func(t *testing.T) { + RunTestBackendServer_RoomDelete(t) + }) + } +} + +func RunTestBackendServer_RoomDelete(t *testing.T) { + _, _, events, hub, _, server := CreateBackendServerForTest(t) u, err := url.Parse(server.URL) if err != nil { @@ -713,18 +732,14 @@ func TestBackendServer_RoomDelete(t *testing.T) { } userid := "test-userid" - - natsChan := make(chan *nats.Msg, 1) - subject := GetSubjectForUserId(userid, backend) - sub, err := n.Subscribe(subject, natsChan) - if err != nil { + eventsChan := make(chan *AsyncMessage, 1) + listener := &channelEventListener{ + ch: eventsChan, + } + if err := events.RegisterUserListener(userid, backend, listener); err != nil { t.Fatal(err) } - defer func() { - if err := sub.Unsubscribe(); err != nil { - t.Error(err) - } - }() + defer events.UnregisterUserListener(userid, backend, listener) msg := &BackendServerRoomRequest{ Type: "delete", @@ -753,7 +768,7 @@ func TestBackendServer_RoomDelete(t *testing.T) { } // A deleted room is signalled as a "disinvite" event. - event, err := expectRoomlistEvent(n, natsChan, subject, "disinvite") + event, err := expectRoomlistEvent(eventsChan, "disinvite") if err != nil { t.Error(err) } else if event.Disinvite == nil { @@ -766,7 +781,7 @@ func TestBackendServer_RoomDelete(t *testing.T) { t.Errorf("Reason should be deleted, got %s", event.Disinvite.Reason) } - // TODO: Use event to wait for NATS messages. + // TODO: Use event to wait for asynchronous messages. time.Sleep(10 * time.Millisecond) room := hub.getRoom(roomId) @@ -880,7 +895,7 @@ func TestBackendServer_ParticipantsUpdatePermissions(t *testing.T) { t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) } - // TODO: Use event to wait for NATS messages. + // TODO: Use event to wait for asynchronous messages. time.Sleep(10 * time.Millisecond) assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA) @@ -967,7 +982,7 @@ func TestBackendServer_ParticipantsUpdateEmptyPermissions(t *testing.T) { t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) } - // TODO: Use event to wait for NATS messages. + // TODO: Use event to wait for asynchronous messages. time.Sleep(10 * time.Millisecond) assertSessionHasNotPermission(t, session, PERMISSION_MAY_PUBLISH_MEDIA) diff --git a/clientsession.go b/clientsession.go index 5dc1ec7..3d447ef 100644 --- a/clientsession.go +++ b/clientsession.go @@ -33,7 +33,6 @@ import ( "time" "unsafe" - "github.com/nats-io/nats.go" "github.com/pion/sdp" ) @@ -50,8 +49,8 @@ var ( type ClientSession struct { roomJoinTime int64 - running int32 hub *Hub + events AsyncEvents privateId string publicId string data *SessionIdData @@ -68,10 +67,7 @@ type ClientSession struct { backendUrl string parsedBackendUrl *url.URL - natsReceiver chan *nats.Msg - stopRun chan bool - runStopped chan bool - expires time.Time + expires time.Time mu sync.Mutex @@ -79,10 +75,6 @@ type ClientSession struct { room unsafe.Pointer roomSessionId string - userSubscription NatsSubscription - sessionSubscription NatsSubscription - roomSubscription NatsSubscription - publishers map[string]McuPublisher subscribers map[string]McuSubscriber @@ -96,6 +88,7 @@ type ClientSession struct { func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { s := &ClientSession{ hub: hub, + events: hub.events, privateId: privateId, publicId: publicId, data: data, @@ -106,10 +99,6 @@ func NewClientSession(hub *Hub, privateId string, publicId string, data *Session userData: auth.User, backend: backend, - - natsReceiver: make(chan *nats.Msg, 64), - stopRun: make(chan bool, 1), - runStopped: make(chan bool, 1), } if s.clientType == HelloClientTypeInternal { s.backendUrl = hello.Auth.internalParams.Backend @@ -137,11 +126,9 @@ func NewClientSession(hub *Hub, privateId string, publicId string, data *Session s.parsedBackendUrl = u } - if err := s.SubscribeNats(hub.nats); err != nil { + if err := s.SubscribeEvents(); err != nil { return nil, err } - atomic.StoreInt32(&s.running, 1) - go s.run() return s, nil } @@ -298,19 +285,6 @@ func (s *ClientSession) UserData() *json.RawMessage { return s.userData } -func (s *ClientSession) run() { -loop: - for { - select { - case msg := <-s.natsReceiver: - s.processClientMessage(msg) - case <-s.stopRun: - break loop - } - } - s.runStopped <- true -} - func (s *ClientSession) StartExpire() { // The hub mutex must be held when calling this method. s.expires = time.Now().Add(sessionExpireDuration) @@ -378,18 +352,10 @@ func (s *ClientSession) closeAndWait(wait bool) { s.mu.Lock() defer s.mu.Unlock() - if s.userSubscription != nil { - if err := s.userSubscription.Unsubscribe(); err != nil { - log.Printf("Error closing user subscription in session %s: %s", s.PublicId(), err) - } - s.userSubscription = nil - } - if s.sessionSubscription != nil { - if err := s.sessionSubscription.Unsubscribe(); err != nil { - log.Printf("Error closing session subscription in session %s: %s", s.PublicId(), err) - } - s.sessionSubscription = nil + if s.userId != "" { + s.events.UnregisterUserListener(s.userId, s.backend, s) } + s.events.UnregisterSessionListener(s.publicId, s.backend, s) go func(virtualSessions map[*VirtualSession]bool) { for session := range virtualSessions { session.Close() @@ -399,56 +365,32 @@ func (s *ClientSession) closeAndWait(wait bool) { s.releaseMcuObjects() s.clearClientLocked(nil) s.backend.RemoveSession(s) - if atomic.CompareAndSwapInt32(&s.running, 1, 0) { - s.stopRun <- true - // Only wait if called from outside the Session goroutine. - if wait { - s.mu.Unlock() - // Wait for Session goroutine to stop - <-s.runStopped - s.mu.Lock() - } - } } -func GetSubjectForUserId(userId string, backend *Backend) string { - if backend == nil || backend.IsCompat() { - return GetEncodedSubject("user", userId) - } - - return GetEncodedSubject("user", userId+"|"+backend.Id()) -} - -func (s *ClientSession) SubscribeNats(n NatsClient) error { +func (s *ClientSession) SubscribeEvents() error { s.mu.Lock() defer s.mu.Unlock() - var err error if s.userId != "" { - if s.userSubscription, err = n.Subscribe(GetSubjectForUserId(s.userId, s.backend), s.natsReceiver); err != nil { + if err := s.events.RegisterUserListener(s.userId, s.backend, s); err != nil { return err } } - if s.sessionSubscription, err = n.Subscribe("session."+s.publicId, s.natsReceiver); err != nil { - return err - } - - return nil + return s.events.RegisterSessionListener(s.publicId, s.backend, s) } -func (s *ClientSession) SubscribeRoomNats(n NatsClient, roomid string, roomSessionId string) error { +func (s *ClientSession) SubscribeRoomEvents(roomid string, roomSessionId string) error { s.mu.Lock() defer s.mu.Unlock() - var err error - if s.roomSubscription, err = n.Subscribe(GetSubjectForRoomId(roomid, s.Backend()), s.natsReceiver); err != nil { + if err := s.events.RegisterRoomListener(roomid, s.backend, s); err != nil { return err } if roomSessionId != "" { - if err = s.hub.roomSessions.SetRoomSession(s, roomSessionId); err != nil { - s.doUnsubscribeRoomNats(true) + if err := s.hub.roomSessions.SetRoomSession(s, roomSessionId); err != nil { + s.doUnsubscribeRoomEvents(true) return err } } @@ -479,29 +421,26 @@ func (s *ClientSession) LeaveRoom(notify bool) *Room { return nil } - s.doUnsubscribeRoomNats(notify) + s.doUnsubscribeRoomEvents(notify) s.SetRoom(nil) s.releaseMcuObjects() room.RemoveSession(s) return room } -func (s *ClientSession) UnsubscribeRoomNats() { +func (s *ClientSession) UnsubscribeRoomEvents() { s.mu.Lock() defer s.mu.Unlock() - s.doUnsubscribeRoomNats(true) + s.doUnsubscribeRoomEvents(true) } -func (s *ClientSession) doUnsubscribeRoomNats(notify bool) { - if s.roomSubscription != nil { - if err := s.roomSubscription.Unsubscribe(); err != nil { - log.Printf("Error closing room subscription in session %s: %s", s.PublicId(), err) - } - s.roomSubscription = nil +func (s *ClientSession) doUnsubscribeRoomEvents(notify bool) { + room := s.GetRoom() + if room != nil { + s.events.UnregisterRoomListener(room.Id(), s.Backend(), s) } s.hub.roomSessions.DeleteRoomSession(s) - room := s.GetRoom() if notify && room != nil && s.roomSessionId != "" { // Notify go func(sid string) { @@ -967,13 +906,19 @@ func (s *ClientSession) GetSubscriber(id string, streamType string) McuSubscribe return s.subscribers[id+"|"+streamType] } -func (s *ClientSession) processClientMessage(msg *nats.Msg) { - var message NatsMessage - if err := s.hub.nats.Decode(msg, &message); err != nil { - log.Printf("Could not decode NATS message %+v for session %s: %s", *msg, s.PublicId(), err) - return - } +func (s *ClientSession) ProcessAsyncRoomMessage(message *AsyncMessage) { + s.processAsyncMessage(message) +} +func (s *ClientSession) ProcessAsyncUserMessage(message *AsyncMessage) { + s.processAsyncMessage(message) +} + +func (s *ClientSession) ProcessAsyncSessionMessage(message *AsyncMessage) { + s.processAsyncMessage(message) +} + +func (s *ClientSession) processAsyncMessage(message *AsyncMessage) { switch message.Type { case "permissions": s.SetPermissions(message.Permissions) @@ -1017,7 +962,7 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) { } } - serverMessage := s.processNatsMessage(&message) + serverMessage := s.filterAsyncMessage(message) if serverMessage == nil { return } @@ -1147,11 +1092,11 @@ func (s *ClientSession) filterMessage(message *ServerMessage) *ServerMessage { return message } -func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { +func (s *ClientSession) filterAsyncMessage(msg *AsyncMessage) *ServerMessage { switch msg.Type { case "message": if msg.Message == nil { - log.Printf("Received NATS message without payload: %+v", msg) + log.Printf("Received asynchronous message without payload: %+v", msg) return nil } @@ -1172,7 +1117,7 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { } case "event": if msg.Message.Event.Target == "room" { - // Can happen mostly during tests where an older room NATS message + // Can happen mostly during tests where an older room async message // could be received by a subscriber that joined after it was sent. if joined := s.getRoomJoinTime(); joined.IsZero() || msg.SendTime.Before(joined) { log.Printf("Message %+v was sent before room was joined, ignoring", msg.Message) @@ -1183,7 +1128,7 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { return msg.Message default: - log.Printf("Received NATS message with unsupported type %s: %+v", msg.Type, msg) + log.Printf("Received async message with unsupported type %s: %+v", msg.Type, msg) return nil } } diff --git a/hub.go b/hub.go index c7e131c..9675dae 100644 --- a/hub.go +++ b/hub.go @@ -103,7 +103,7 @@ type Hub struct { // 64-bit members that are accessed atomically must be 64-bit aligned. sid uint64 - nats NatsClient + events AsyncEvents upgrader websocket.Upgrader cookie *securecookie.SecureCookie info *HelloServerMessageServer @@ -150,7 +150,7 @@ type Hub struct { geoipUpdating int32 } -func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version string) (*Hub, error) { +func NewHub(config *goconf.ConfigFile, events AsyncEvents, r *mux.Router, version string) (*Hub, error) { hashKey, _ := config.GetString("sessions", "hashkey") switch len(hashKey) { case 32: @@ -292,7 +292,7 @@ func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version s } hub := &Hub{ - nats: nats, + events: events, upgrader: websocket.Upgrader{ ReadBufferSize: websocketReadBufferSize, WriteBufferSize: websocketWriteBufferSize, @@ -999,7 +999,7 @@ func (h *Hub) processHelloInternal(client *Client, message *ClientMessage) { h.processRegister(client, message, backend, auth) } -func (h *Hub) disconnectByRoomSessionId(roomSessionId string) { +func (h *Hub) disconnectByRoomSessionId(roomSessionId string, backend *Backend) { sessionId, err := h.roomSessions.GetSessionId(roomSessionId) if err == ErrNoSuchRoomSession { return @@ -1011,13 +1011,16 @@ func (h *Hub) disconnectByRoomSessionId(roomSessionId string) { session := h.GetSessionByPublicId(sessionId) if session == nil { // Session is located on a different server. - msg := &ServerMessage{ - Type: "bye", - Bye: &ByeServerMessage{ - Reason: "room_session_reconnected", + msg := &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Type: "bye", + Bye: &ByeServerMessage{ + Reason: "room_session_reconnected", + }, }, } - if err := h.nats.PublishMessage("session."+sessionId, msg); err != nil { + if err := h.events.PublishSessionMessage(sessionId, backend, msg); err != nil { log.Printf("Could not send reconnect bye to session %s: %s", sessionId, err) } return @@ -1111,7 +1114,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) { if message.Room.SessionId != "" { // There can only be one connection per Nextcloud Talk session, // disconnect any other connections without sending a "leave" event. - h.disconnectByRoomSessionId(message.Room.SessionId) + h.disconnectByRoomSessionId(message.Room.SessionId, session.Backend()) } } @@ -1139,7 +1142,7 @@ func (h *Hub) removeRoom(room *Room) { func (h *Hub) createRoom(id string, properties *json.RawMessage, backend *Backend) (*Room, error) { // Note the write lock must be held. - room, err := NewRoom(id, properties, h, h.nats, backend) + room, err := NewRoom(id, properties, h, h.events, backend) if err != nil { return nil, err } @@ -1163,7 +1166,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro roomId := room.Room.RoomId internalRoomId := getRoomIdForBackend(roomId, session.Backend()) - if err := session.SubscribeRoomNats(h.nats, roomId, message.Room.SessionId); err != nil { + if err := session.SubscribeRoomEvents(roomId, message.Room.SessionId); err != nil { session.SendMessage(message.NewWrappedErrorServerMessage(err)) // The client (implicitly) left the room due to an error. h.sendRoom(session, nil, nil) @@ -1178,7 +1181,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro h.ru.Unlock() session.SendMessage(message.NewWrappedErrorServerMessage(err)) // The client (implicitly) left the room due to an error. - session.UnsubscribeRoomNats() + session.UnsubscribeRoomEvents() h.sendRoom(session, nil, nil) return } @@ -1223,7 +1226,7 @@ func (h *Hub) notifyUserJoinedRoom(room *Room, session *ClientSession, sessionDa }, } - // No need to send through NATS, the session is connected locally. + // No need to send through asynchronous events, the session is connected locally. session.SendMessage(msg) // Notify about initial flags of virtual sessions. @@ -1251,7 +1254,7 @@ func (h *Hub) notifyUserJoinedRoom(room *Room, session *ClientSession, sessionDa }, } - // No need to send through NATS, the session is connected locally. + // No need to send through asynchronous events, the session is connected locally. session.SendMessage(msg) } } @@ -1269,6 +1272,8 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { var subject string var clientData *MessageClientMessageData var serverRecipient *MessageClientMessageRecipient + var recipientSessionId string + var room *Room switch msg.Recipient.Type { case RecipientTypeSession: data := h.decodeSessionId(msg.Recipient.SessionId, publicSessionName) @@ -1310,6 +1315,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { } subject = "session." + msg.Recipient.SessionId + recipientSessionId = msg.Recipient.SessionId h.mu.RLock() sess, found := h.sessions[data.Sid] if found { @@ -1322,6 +1328,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { virtualSession := sess.(*VirtualSession) clientSession := virtualSession.Session() subject = "session." + clientSession.PublicId() + recipientSessionId = clientSession.PublicId() recipient = clientSession // The client should see his session id as recipient. serverRecipient = &MessageClientMessageRecipient{ @@ -1345,7 +1352,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { } case RecipientTypeRoom: if session != nil { - if room := session.GetRoom(); room != nil { + if room = session.GetRoom(); room != nil { subject = GetSubjectForRoomId(room.Id(), room.Backend()) if h.mcu != nil { @@ -1398,7 +1405,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { }, } if recipient != nil { - // The recipient is connected to this instance, no need to go through NATS. + // The recipient is connected to this instance, no need to go through asynchronous events. if clientData != nil && clientData.Type == "sendoffer" { if err := session.IsAllowedToSend(clientData); err != nil { log.Printf("Session %s is not allowed to send offer for %s, ignoring (%s)", session.PublicId(), clientData.RoomType, err) @@ -1420,7 +1427,24 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { log.Printf("Sending offers to remote clients is not supported yet (client %s)", session.PublicId()) return } - if err := h.nats.PublishMessage(subject, response); err != nil { + + async := &AsyncMessage{ + Type: "message", + Message: response, + } + var err error + switch msg.Recipient.Type { + case RecipientTypeSession: + err = h.events.PublishSessionMessage(recipientSessionId, session.Backend(), async) + case RecipientTypeUser: + err = h.events.PublishUserMessage(msg.Recipient.UserId, session.Backend(), async) + case RecipientTypeRoom: + err = h.events.PublishRoomMessage(room.Id(), session.Backend(), async) + default: + err = fmt.Errorf("unsupported recipient type: %s", msg.Recipient.Type) + } + + if err != nil { log.Printf("Error publishing message to remote session: %s", err) } } @@ -1454,6 +1478,8 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) { var recipient *Client var subject string var serverRecipient *MessageClientMessageRecipient + var recipientSessionId string + var room *Room switch msg.Recipient.Type { case RecipientTypeSession: data := h.decodeSessionId(msg.Recipient.SessionId, publicSessionName) @@ -1464,6 +1490,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) { } subject = "session." + msg.Recipient.SessionId + recipientSessionId = msg.Recipient.SessionId h.mu.RLock() recipient = h.clients[data.Sid] if recipient == nil { @@ -1473,6 +1500,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) { virtualSession := sess.(*VirtualSession) clientSession := virtualSession.Session() subject = "session." + clientSession.PublicId() + recipientSessionId = clientSession.PublicId() recipient = clientSession.GetClient() // The client should see his session id as recipient. serverRecipient = &MessageClientMessageRecipient{ @@ -1496,7 +1524,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) { } case RecipientTypeRoom: if session != nil { - if room := session.GetRoom(); room != nil { + if room = session.GetRoom(); room != nil { subject = GetSubjectForRoomId(room.Id(), room.Backend()) } } @@ -1521,7 +1549,22 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) { if recipient != nil { recipient.SendMessage(response) } else { - if err := h.nats.PublishMessage(subject, response); err != nil { + async := &AsyncMessage{ + Type: "message", + Message: response, + } + var err error + switch msg.Recipient.Type { + case RecipientTypeSession: + err = h.events.PublishSessionMessage(recipientSessionId, session.Backend(), async) + case RecipientTypeUser: + err = h.events.PublishUserMessage(msg.Recipient.UserId, session.Backend(), async) + case RecipientTypeRoom: + err = h.events.PublishRoomMessage(room.Id(), room.Backend(), async) + default: + err = fmt.Errorf("unsupported recipient type: %s", msg.Recipient.Type) + } + if err != nil { log.Printf("Error publishing message to remote session: %s", err) } } diff --git a/hub_test.go b/hub_test.go index 24e47f7..39f9a50 100644 --- a/hub_test.go +++ b/hub_test.go @@ -97,20 +97,21 @@ func getTestConfigWithMultipleBackends(server *httptest.Server) (*goconf.ConfigF return config, nil } -func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, NatsClient, *mux.Router, *httptest.Server) { +func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) { r := mux.NewRouter() registerBackendHandler(t, r) server := httptest.NewServer(r) - nats, err := NewLoopbackNatsClient() - if err != nil { - t.Fatal(err) - } + t.Cleanup(func() { + server.Close() + }) + + events := getAsyncEventsForTest(t) config, err := getConfigFunc(server) if err != nil { t.Fatal(err) } - h, err := NewHub(config, nats, r, "no-version") + h, err := NewHub(config, events, r, "no-version") if err != nil { t.Fatal(err) } @@ -129,23 +130,20 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve defer cancel() WaitForHub(ctx, t, h) - (nats).(*LoopbackNatsClient).waitForSubscriptionsEmpty(ctx, t) - nats.Close() - server.Close() }) - return h, nats, r, server + return h, events, r, server } -func CreateHubForTest(t *testing.T) (*Hub, NatsClient, *mux.Router, *httptest.Server) { +func CreateHubForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) { return CreateHubForTestWithConfig(t, getTestConfig) } -func CreateHubWithMultipleBackendsForTest(t *testing.T) (*Hub, NatsClient, *mux.Router, *httptest.Server) { - h, nats, r, server := CreateHubForTestWithConfig(t, getTestConfigWithMultipleBackends) +func CreateHubWithMultipleBackendsForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) { + h, events, r, server := CreateHubForTestWithConfig(t, getTestConfigWithMultipleBackends) registerBackendHandlerUrl(t, r, "/one") registerBackendHandlerUrl(t, r, "/two") - return h, nats, r, server + return h, events, r, server } func WaitForHub(ctx context.Context, t *testing.T, h *Hub) { @@ -178,7 +176,6 @@ func WaitForHub(ctx context.Context, t *testing.T, h *Hub) { time.Sleep(time.Millisecond) } } - } func validateBackendChecksum(t *testing.T, f func(http.ResponseWriter, *http.Request, *BackendClientRequest) *BackendClientResponse) func(http.ResponseWriter, *http.Request) { @@ -1531,7 +1528,7 @@ func TestClientMessageToUserIdMultipleSessions(t *testing.T) { func WaitForUsersJoined(ctx context.Context, t *testing.T, client1 *TestClient, hello1 *ServerMessage, client2 *TestClient, hello2 *ServerMessage) { // We will receive "joined" events for all clients. The ordering is not - // defined as messages are processed and sent by asynchronous NATS handlers. + // defined as messages are processed and sent by asynchronous event handlers. msg1_1, err := client1.RunUntilMessage(ctx) if err != nil { t.Error(err) @@ -2244,7 +2241,7 @@ func TestRoomParticipantsListUpdateWhileDisconnected(t *testing.T) { room.PublishUsersInCallChanged(users, users) - // Give NATS message some time to be processed. + // Give asynchronous events some time to be processed. time.Sleep(100 * time.Millisecond) recipient2 := MessageClientMessageRecipient{ @@ -2305,6 +2302,14 @@ func TestRoomParticipantsListUpdateWhileDisconnected(t *testing.T) { } func TestClientTakeoverRoomSession(t *testing.T) { + for _, backend := range eventBackendsForTest { + t.Run(backend, func(t *testing.T) { + RunTestClientTakeoverRoomSession(t) + }) + } +} + +func RunTestClientTakeoverRoomSession(t *testing.T) { hub, _, _, server := CreateHubForTest(t) client1 := NewTestClient(t, server, hub) diff --git a/natsclient.go b/natsclient.go index c82c00b..5cc17ba 100644 --- a/natsclient.go +++ b/natsclient.go @@ -35,22 +35,10 @@ import ( const ( initialConnectInterval = time.Second maxConnectInterval = 8 * time.Second + + NatsLoopbackUrl = "nats://loopback" ) -type NatsMessage struct { - SendTime time.Time `json:"sendtime"` - - Type string `json:"type"` - - Message *ServerMessage `json:"message,omitempty"` - - Room *BackendServerRoomRequest `json:"room,omitempty"` - - Permissions []Permission `json:"permissions,omitempty"` - - Id string `json:"id"` -} - type NatsSubscription interface { Unsubscribe() error } @@ -59,11 +47,7 @@ type NatsClient interface { Close() Subscribe(subject string, ch chan *nats.Msg) (NatsSubscription, error) - Publish(subject string, message interface{}) error - PublishNats(subject string, message *NatsMessage) error - PublishMessage(subject string, message *ServerMessage) error - PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error Decode(msg *nats.Msg, v interface{}) error } @@ -82,7 +66,11 @@ type natsClient struct { func NewNatsClient(url string) (NatsClient, error) { if url == ":loopback:" { - log.Println("No NATS url configured, using internal loopback client") + log.Printf("WARNING: events url %s is deprecated, please use %s instead", url, NatsLoopbackUrl) + url = NatsLoopbackUrl + } + if url == NatsLoopbackUrl { + log.Println("Using internal NATS loopback client") return NewLoopbackNatsClient() } @@ -148,28 +136,6 @@ func (c *natsClient) Publish(subject string, message interface{}) error { return c.conn.Publish(subject, message) } -func (c *natsClient) PublishNats(subject string, message *NatsMessage) error { - return c.Publish(subject, message) -} - -func (c *natsClient) PublishMessage(subject string, message *ServerMessage) error { - msg := &NatsMessage{ - SendTime: time.Now(), - Type: "message", - Message: message, - } - return c.PublishNats(subject, msg) -} - -func (c *natsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error { - msg := &NatsMessage{ - SendTime: time.Now(), - Type: "room", - Room: message, - } - return c.PublishNats(subject, msg) -} - func (c *natsClient) Decode(msg *nats.Msg, v interface{}) error { return c.conn.Enc.Decode(msg.Subject, msg.Data, v) } diff --git a/natsclient_loopback.go b/natsclient_loopback.go index aaa1699..8c95991 100644 --- a/natsclient_loopback.go +++ b/natsclient_loopback.go @@ -27,7 +27,6 @@ import ( "log" "strings" "sync" - "time" "github.com/nats-io/nats.go" ) @@ -170,28 +169,6 @@ func (c *LoopbackNatsClient) Publish(subject string, message interface{}) error return nil } -func (c *LoopbackNatsClient) PublishNats(subject string, message *NatsMessage) error { - return c.Publish(subject, message) -} - -func (c *LoopbackNatsClient) PublishMessage(subject string, message *ServerMessage) error { - msg := &NatsMessage{ - SendTime: time.Now(), - Type: "message", - Message: message, - } - return c.PublishNats(subject, msg) -} - -func (c *LoopbackNatsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error { - msg := &NatsMessage{ - SendTime: time.Now(), - Type: "room", - Room: message, - } - return c.PublishNats(subject, msg) -} - func (c *LoopbackNatsClient) Decode(msg *nats.Msg, v interface{}) error { return json.Unmarshal(msg.Data, v) } diff --git a/room.go b/room.go index 1649123..0cd5b22 100644 --- a/room.go +++ b/room.go @@ -32,7 +32,6 @@ import ( "sync" "time" - "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" ) @@ -56,7 +55,7 @@ func init() { type Room struct { id string hub *Hub - nats NatsClient + events AsyncEvents backend *Backend properties *json.RawMessage @@ -72,34 +71,15 @@ type Room struct { statsRoomSessionsCurrent *prometheus.GaugeVec - natsReceiver chan *nats.Msg - backendSubscription NatsSubscription - // Users currently in the room users []map[string]interface{} - // Timestamps of last NATS backend requests for the different types. - lastNatsRoomRequests map[string]int64 + // Timestamps of last backend requests for the different types. + lastRoomRequests map[string]int64 transientData *TransientData } -func GetSubjectForRoomId(roomId string, backend *Backend) string { - if backend == nil || backend.IsCompat() { - return GetEncodedSubject("room", roomId) - } - - return GetEncodedSubject("room", roomId+"|"+backend.Id()) -} - -func GetSubjectForBackendRoomId(roomId string, backend *Backend) string { - if backend == nil || backend.IsCompat() { - return GetEncodedSubject("backend.room", roomId) - } - - return GetEncodedSubject("backend.room", roomId+"|"+backend.Id()) -} - func getRoomIdForBackend(id string, backend *Backend) string { if id == "" { return "" @@ -108,18 +88,11 @@ func getRoomIdForBackend(id string, backend *Backend) string { return backend.Id() + "|" + id } -func NewRoom(roomId string, properties *json.RawMessage, hub *Hub, n NatsClient, backend *Backend) (*Room, error) { - natsReceiver := make(chan *nats.Msg, 64) - backendSubscription, err := n.Subscribe(GetSubjectForBackendRoomId(roomId, backend), natsReceiver) - if err != nil { - close(natsReceiver) - return nil, err - } - +func NewRoom(roomId string, properties *json.RawMessage, hub *Hub, events AsyncEvents, backend *Backend) (*Room, error) { room := &Room{ id: roomId, hub: hub, - nats: n, + events: events, backend: backend, properties: properties, @@ -138,13 +111,15 @@ func NewRoom(roomId string, properties *json.RawMessage, hub *Hub, n NatsClient, "room": roomId, }), - natsReceiver: natsReceiver, - backendSubscription: backendSubscription, - - lastNatsRoomRequests: make(map[string]int64), + lastRoomRequests: make(map[string]int64), transientData: NewTransientData(), } + + if err := events.RegisterBackendRoomListener(roomId, backend, room); err != nil { + return nil, err + } + go room.run() return room, nil @@ -193,10 +168,6 @@ loop: select { case <-r.closeChan: break loop - case msg := <-r.natsReceiver: - if msg != nil { - r.processNatsMessage(msg) - } case <-ticker.C: r.publishActiveSessions() } @@ -211,16 +182,7 @@ func (r *Room) doClose() { } func (r *Room) unsubscribeBackend() { - if r.backendSubscription == nil { - return - } - - go func(subscription NatsSubscription) { - if err := subscription.Unsubscribe(); err != nil { - log.Printf("Error closing backend subscription for room %s: %s", r.Id(), err) - } - }(r.backendSubscription) - r.backendSubscription = nil + r.events.UnregisterBackendRoomListener(r.id, r.backend, r) } func (r *Room) Close() []Session { @@ -240,33 +202,18 @@ func (r *Room) Close() []Session { return result } -func (r *Room) processNatsMessage(message *nats.Msg) { - var msg NatsMessage - if err := r.nats.Decode(message, &msg); err != nil { - log.Printf("Could not decode nats message %+v, %s", message, err) - return - } - - switch msg.Type { - case "room": - r.processBackendRoomRequest(msg.Room) - default: - log.Printf("Unsupported NATS room request with type %s: %+v", msg.Type, msg) - } -} - -func (r *Room) processBackendRoomRequest(message *BackendServerRoomRequest) { +func (r *Room) ProcessBackendRoomRequest(message *BackendServerRoomRequest) { received := message.ReceivedTime - if last, found := r.lastNatsRoomRequests[message.Type]; found && last > received { + if last, found := r.lastRoomRequests[message.Type]; found && last > received { if msg, err := json.Marshal(message); err == nil { - log.Printf("Ignore old NATS backend room request for %s: %s", r.Id(), string(msg)) + log.Printf("Ignore old backend room request for %s: %s", r.Id(), string(msg)) } else { - log.Printf("Ignore old NATS backend room request for %s: %+v", r.Id(), message) + log.Printf("Ignore old backend room request for %s: %+v", r.Id(), message) } return } - r.lastNatsRoomRequests[message.Type] = received + r.lastRoomRequests[message.Type] = received message.room = r switch message.Type { case "update": @@ -281,7 +228,7 @@ func (r *Room) processBackendRoomRequest(message *BackendServerRoomRequest) { case "message": r.publishRoomMessage(message.Message) default: - log.Printf("Unsupported NATS backend room request with type %s in %s: %+v", message.Type, r.Id(), message) + log.Printf("Unsupported backend room request with type %s in %s: %+v", message.Type, r.Id(), message) } } @@ -394,7 +341,10 @@ func (r *Room) RemoveSession(session Session) bool { } func (r *Room) publish(message *ServerMessage) error { - return r.nats.PublishMessage(GetSubjectForRoomId(r.id, r.backend), message) + return r.events.PublishRoomMessage(r.id, r.backend, &AsyncMessage{ + Type: "message", + Message: message, + }) } func (r *Room) UpdateProperties(properties *json.RawMessage) { diff --git a/room_test.go b/room_test.go index ab7caf4..8b239a7 100644 --- a/room_test.go +++ b/room_test.go @@ -145,7 +145,7 @@ func TestRoom_Update(t *testing.T) { } // The client receives a roomlist update and a changed room event. The - // ordering is not defined because messages are sent by asynchronous NATS + // ordering is not defined because messages are sent by asynchronous event // handlers. message1, err := client.RunUntilMessage(ctx) if err != nil { @@ -178,7 +178,7 @@ func TestRoom_Update(t *testing.T) { } } - // Allow up to 100 milliseconds for NATS processing. + // Allow up to 100 milliseconds for asynchronous event processing. ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond) defer cancel2() @@ -280,7 +280,7 @@ func TestRoom_Delete(t *testing.T) { } // The client is no longer invited to the room and leaves it. The ordering - // of messages is not defined as they get published through NATS and handled + // of messages is not defined as they get published through events and handled // by asynchronous channels. message1, err := client.RunUntilMessage(ctx) if err != nil { @@ -318,7 +318,7 @@ func TestRoom_Delete(t *testing.T) { } } - // Allow up to 100 milliseconds for NATS processing. + // Allow up to 100 milliseconds for asynchronous event processing. ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond) defer cancel2() diff --git a/server.conf.in b/server.conf.in index f06b475..7542a55 100644 --- a/server.conf.in +++ b/server.conf.in @@ -109,7 +109,7 @@ connectionsperhost = 8 [nats] # Url of NATS backend to use. This can also be a list of URLs to connect to -# multiple backends. For local development, this can be set to ":loopback:" +# multiple backends. For local development, this can be set to "nats://loopback" # to process NATS messages internally instead of sending them through an # external NATS backend. #url = nats://localhost:4222 diff --git a/server/main.go b/server/main.go index 9db6dcb..46fa2b6 100644 --- a/server/main.go +++ b/server/main.go @@ -148,10 +148,11 @@ func main() { natsUrl = nats.DefaultURL } - nats, err := signaling.NewNatsClient(natsUrl) + events, err := signaling.NewAsyncEvents(natsUrl) if err != nil { - log.Fatal("Could not create NATS client: ", err) + log.Fatal("Could not create async events client: ", err) } + defer events.Close() etcdClient, err := signaling.NewEtcdClient(config, "mcu") if err != nil { @@ -164,7 +165,7 @@ func main() { }() r := mux.NewRouter() - hub, err := signaling.NewHub(config, nats, r, version) + hub, err := signaling.NewHub(config, events, r, version) if err != nil { log.Fatal("Could not create hub: ", err) }