2020-05-12 09:46:20 +02:00
|
|
|
/**
|
|
|
|
* Standalone signaling server for the Nextcloud Spreed app.
|
|
|
|
* Copyright (C) 2017 struktur AG
|
|
|
|
*
|
|
|
|
* @author Joachim Bauch <bauch@struktur.de>
|
|
|
|
*
|
|
|
|
* @license GNU AGPL version 3 or any later version
|
|
|
|
*
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
|
* (at your option) any later version.
|
|
|
|
*
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
* GNU Affero General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
package signaling
|
|
|
|
|
|
|
|
import (
|
2020-08-31 13:58:28 +02:00
|
|
|
"context"
|
2020-05-12 09:46:20 +02:00
|
|
|
"encoding/json"
|
2021-11-08 12:06:59 +01:00
|
|
|
"fmt"
|
2020-05-12 09:46:20 +02:00
|
|
|
"log"
|
|
|
|
"net/url"
|
2020-10-21 16:26:57 +02:00
|
|
|
"strings"
|
2020-05-12 09:46:20 +02:00
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
"unsafe"
|
|
|
|
|
2021-04-13 16:47:28 +02:00
|
|
|
"github.com/nats-io/nats.go"
|
2021-11-08 12:06:59 +01:00
|
|
|
"github.com/pion/sdp"
|
2020-05-12 09:46:20 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
// Sessions expire 30 seconds after the connection closed.
|
|
|
|
sessionExpireDuration = 30 * time.Second
|
|
|
|
|
|
|
|
// Warn if a session has 32 or more pending messages.
|
|
|
|
warnPendingMessagesCount = 32
|
2020-10-21 16:26:57 +02:00
|
|
|
|
|
|
|
PathToOcsSignalingBackend = "ocs/v2.php/apps/spreed/api/v1/signaling/backend"
|
2020-05-12 09:46:20 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
type ClientSession struct {
|
2021-06-08 09:20:18 +02:00
|
|
|
roomJoinTime int64
|
|
|
|
|
2020-05-12 09:46:20 +02:00
|
|
|
running int32
|
|
|
|
hub *Hub
|
|
|
|
privateId string
|
|
|
|
publicId string
|
|
|
|
data *SessionIdData
|
|
|
|
|
|
|
|
clientType string
|
|
|
|
features []string
|
|
|
|
userId string
|
|
|
|
userData *json.RawMessage
|
|
|
|
|
|
|
|
supportsPermissions bool
|
|
|
|
permissions map[Permission]bool
|
|
|
|
|
2020-07-07 09:44:02 +02:00
|
|
|
backend *Backend
|
2020-05-12 09:46:20 +02:00
|
|
|
backendUrl string
|
|
|
|
parsedBackendUrl *url.URL
|
|
|
|
|
|
|
|
natsReceiver chan *nats.Msg
|
|
|
|
stopRun chan bool
|
|
|
|
runStopped chan bool
|
|
|
|
expires time.Time
|
|
|
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
|
|
|
client *Client
|
|
|
|
room unsafe.Pointer
|
|
|
|
roomSessionId string
|
|
|
|
|
|
|
|
userSubscription NatsSubscription
|
|
|
|
sessionSubscription NatsSubscription
|
|
|
|
roomSubscription NatsSubscription
|
|
|
|
|
|
|
|
publishers map[string]McuPublisher
|
|
|
|
subscribers map[string]McuSubscriber
|
|
|
|
|
2020-10-21 14:34:53 +02:00
|
|
|
pendingClientMessages []*ServerMessage
|
|
|
|
hasPendingChat bool
|
|
|
|
hasPendingParticipantsUpdate bool
|
2020-10-20 14:29:58 +02:00
|
|
|
|
|
|
|
virtualSessions map[*VirtualSession]bool
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
|
2020-07-07 09:44:02 +02:00
|
|
|
func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
|
2020-05-12 09:46:20 +02:00
|
|
|
s := &ClientSession{
|
|
|
|
hub: hub,
|
|
|
|
privateId: privateId,
|
|
|
|
publicId: publicId,
|
|
|
|
data: data,
|
|
|
|
|
|
|
|
clientType: hello.Auth.Type,
|
|
|
|
features: hello.Features,
|
|
|
|
userId: auth.UserId,
|
|
|
|
userData: auth.User,
|
|
|
|
|
2020-10-20 14:29:58 +02:00
|
|
|
backend: backend,
|
2020-05-12 09:46:20 +02:00
|
|
|
|
|
|
|
natsReceiver: make(chan *nats.Msg, 64),
|
|
|
|
stopRun: make(chan bool, 1),
|
|
|
|
runStopped: make(chan bool, 1),
|
|
|
|
}
|
2020-10-20 14:29:58 +02:00
|
|
|
if s.clientType == HelloClientTypeInternal {
|
|
|
|
s.backendUrl = hello.Auth.internalParams.Backend
|
|
|
|
s.parsedBackendUrl = hello.Auth.internalParams.parsedBackend
|
|
|
|
} else {
|
|
|
|
s.backendUrl = hello.Auth.Url
|
|
|
|
s.parsedBackendUrl = hello.Auth.parsedUrl
|
|
|
|
}
|
2020-10-21 16:26:57 +02:00
|
|
|
if !strings.Contains(s.backendUrl, "/ocs/v2.php/") {
|
|
|
|
backendUrl := s.backendUrl
|
|
|
|
if !strings.HasSuffix(backendUrl, "/") {
|
|
|
|
backendUrl += "/"
|
|
|
|
}
|
|
|
|
backendUrl += PathToOcsSignalingBackend
|
|
|
|
if u, err := url.Parse(backendUrl); err != nil {
|
|
|
|
return nil, err
|
|
|
|
} else {
|
2020-11-30 11:42:18 +01:00
|
|
|
if strings.Contains(u.Host, ":") && hasStandardPort(u) {
|
|
|
|
u.Host = u.Hostname()
|
|
|
|
}
|
|
|
|
|
2020-10-21 16:26:57 +02:00
|
|
|
s.backendUrl = backendUrl
|
|
|
|
s.parsedBackendUrl = u
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-05-12 09:46:20 +02:00
|
|
|
if err := s.SubscribeNats(hub.nats); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
atomic.StoreInt32(&s.running, 1)
|
|
|
|
go s.run()
|
|
|
|
return s, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) PrivateId() string {
|
|
|
|
return s.privateId
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) PublicId() string {
|
|
|
|
return s.publicId
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) RoomSessionId() string {
|
|
|
|
return s.roomSessionId
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) Data() *SessionIdData {
|
|
|
|
return s.data
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) ClientType() string {
|
|
|
|
return s.clientType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) GetFeatures() []string {
|
|
|
|
return s.features
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) HasFeature(feature string) bool {
|
|
|
|
for _, f := range s.features {
|
|
|
|
if f == feature {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2021-09-28 10:46:24 +02:00
|
|
|
// HasPermission checks if the session has the passed permissions.
|
2020-05-12 09:46:20 +02:00
|
|
|
func (s *ClientSession) HasPermission(permission Permission) bool {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
2021-08-05 12:15:26 +02:00
|
|
|
|
|
|
|
return s.hasPermissionLocked(permission)
|
|
|
|
}
|
|
|
|
|
2021-09-28 10:46:24 +02:00
|
|
|
// HasAnyPermission checks if the session has one of the passed permissions.
|
|
|
|
func (s *ClientSession) HasAnyPermission(permission ...Permission) bool {
|
|
|
|
if len(permission) == 0 {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
2021-11-08 12:06:59 +01:00
|
|
|
return s.hasAnyPermissionLocked(permission...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) hasAnyPermissionLocked(permission ...Permission) bool {
|
|
|
|
if len(permission) == 0 {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2021-09-28 10:46:24 +02:00
|
|
|
for _, p := range permission {
|
|
|
|
if s.hasPermissionLocked(p) {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2021-08-05 12:15:26 +02:00
|
|
|
func (s *ClientSession) hasPermissionLocked(permission Permission) bool {
|
2020-05-12 09:46:20 +02:00
|
|
|
if !s.supportsPermissions {
|
|
|
|
// Old-style session that doesn't receive permissions from Nextcloud.
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
if val, found := s.permissions[permission]; found {
|
|
|
|
return val
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func permissionsEqual(a, b map[Permission]bool) bool {
|
|
|
|
if a == nil && b == nil {
|
|
|
|
return true
|
|
|
|
} else if a != nil && b == nil {
|
|
|
|
return false
|
|
|
|
} else if a == nil && b != nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if len(a) != len(b) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
for k, v1 := range a {
|
|
|
|
if v2, found := b[k]; !found || v1 != v2 {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) SetPermissions(permissions []Permission) {
|
|
|
|
var p map[Permission]bool
|
|
|
|
for _, permission := range permissions {
|
|
|
|
if p == nil {
|
|
|
|
p = make(map[Permission]bool)
|
|
|
|
}
|
|
|
|
p[permission] = true
|
|
|
|
}
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
if s.supportsPermissions && permissionsEqual(s.permissions, p) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
s.permissions = p
|
|
|
|
s.supportsPermissions = true
|
|
|
|
log.Printf("Permissions of session %s changed: %s", s.PublicId(), permissions)
|
|
|
|
}
|
|
|
|
|
2020-07-07 09:44:02 +02:00
|
|
|
func (s *ClientSession) Backend() *Backend {
|
|
|
|
return s.backend
|
|
|
|
}
|
|
|
|
|
2020-05-12 09:46:20 +02:00
|
|
|
func (s *ClientSession) BackendUrl() string {
|
|
|
|
return s.backendUrl
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) ParsedBackendUrl() *url.URL {
|
|
|
|
return s.parsedBackendUrl
|
|
|
|
}
|
|
|
|
|
2021-05-19 16:22:15 +02:00
|
|
|
func (s *ClientSession) AuthUserId() string {
|
|
|
|
return s.userId
|
|
|
|
}
|
|
|
|
|
2020-05-12 09:46:20 +02:00
|
|
|
func (s *ClientSession) UserId() string {
|
2021-04-22 14:18:35 +02:00
|
|
|
userId := s.userId
|
|
|
|
if userId == "" {
|
|
|
|
if room := s.GetRoom(); room != nil {
|
|
|
|
if data := room.GetRoomSessionData(s); data != nil {
|
|
|
|
userId = data.UserId
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return userId
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) UserData() *json.RawMessage {
|
|
|
|
return s.userData
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) run() {
|
|
|
|
loop:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case msg := <-s.natsReceiver:
|
|
|
|
s.processClientMessage(msg)
|
|
|
|
case <-s.stopRun:
|
|
|
|
break loop
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.runStopped <- true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) StartExpire() {
|
|
|
|
// The hub mutex must be held when calling this method.
|
|
|
|
s.expires = time.Now().Add(sessionExpireDuration)
|
|
|
|
s.hub.expiredSessions[s] = true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) StopExpire() {
|
|
|
|
// The hub mutex must be held when calling this method.
|
|
|
|
delete(s.hub.expiredSessions, s)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) IsExpired(now time.Time) bool {
|
|
|
|
return now.After(s.expires)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) SetRoom(room *Room) {
|
|
|
|
atomic.StorePointer(&s.room, unsafe.Pointer(room))
|
2021-06-08 09:20:18 +02:00
|
|
|
if room != nil {
|
|
|
|
atomic.StoreInt64(&s.roomJoinTime, time.Now().UnixNano())
|
|
|
|
} else {
|
|
|
|
atomic.StoreInt64(&s.roomJoinTime, 0)
|
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) GetRoom() *Room {
|
|
|
|
return (*Room)(atomic.LoadPointer(&s.room))
|
|
|
|
}
|
|
|
|
|
2021-06-08 09:20:18 +02:00
|
|
|
func (s *ClientSession) getRoomJoinTime() time.Time {
|
|
|
|
return time.Unix(0, atomic.LoadInt64(&s.roomJoinTime))
|
|
|
|
}
|
|
|
|
|
2020-05-12 09:46:20 +02:00
|
|
|
func (s *ClientSession) releaseMcuObjects() {
|
|
|
|
if len(s.publishers) > 0 {
|
|
|
|
go func(publishers map[string]McuPublisher) {
|
|
|
|
ctx := context.TODO()
|
|
|
|
for _, publisher := range publishers {
|
|
|
|
publisher.Close(ctx)
|
|
|
|
}
|
|
|
|
}(s.publishers)
|
|
|
|
s.publishers = nil
|
|
|
|
}
|
|
|
|
if len(s.subscribers) > 0 {
|
|
|
|
go func(subscribers map[string]McuSubscriber) {
|
|
|
|
ctx := context.TODO()
|
|
|
|
for _, subscriber := range subscribers {
|
|
|
|
subscriber.Close(ctx)
|
|
|
|
}
|
|
|
|
}(s.subscribers)
|
|
|
|
s.subscribers = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) Close() {
|
|
|
|
s.closeAndWait(true)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) closeAndWait(wait bool) {
|
|
|
|
s.hub.removeSession(s)
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
if s.userSubscription != nil {
|
2021-04-26 17:19:39 +02:00
|
|
|
if err := s.userSubscription.Unsubscribe(); err != nil {
|
|
|
|
log.Printf("Error closing user subscription in session %s: %s", s.PublicId(), err)
|
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
s.userSubscription = nil
|
|
|
|
}
|
|
|
|
if s.sessionSubscription != nil {
|
2021-04-26 17:19:39 +02:00
|
|
|
if err := s.sessionSubscription.Unsubscribe(); err != nil {
|
|
|
|
log.Printf("Error closing session subscription in session %s: %s", s.PublicId(), err)
|
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
s.sessionSubscription = nil
|
|
|
|
}
|
2020-10-20 14:29:58 +02:00
|
|
|
go func(virtualSessions map[*VirtualSession]bool) {
|
2021-04-21 14:12:11 +02:00
|
|
|
for session := range virtualSessions {
|
2020-10-20 14:29:58 +02:00
|
|
|
session.Close()
|
|
|
|
}
|
|
|
|
}(s.virtualSessions)
|
|
|
|
s.virtualSessions = nil
|
2020-05-12 09:46:20 +02:00
|
|
|
s.releaseMcuObjects()
|
|
|
|
s.clearClientLocked(nil)
|
2020-12-17 14:56:45 +01:00
|
|
|
s.backend.RemoveSession(s)
|
2020-05-12 09:46:20 +02:00
|
|
|
if atomic.CompareAndSwapInt32(&s.running, 1, 0) {
|
|
|
|
s.stopRun <- true
|
|
|
|
// Only wait if called from outside the Session goroutine.
|
|
|
|
if wait {
|
|
|
|
s.mu.Unlock()
|
|
|
|
// Wait for Session goroutine to stop
|
|
|
|
<-s.runStopped
|
|
|
|
s.mu.Lock()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-07-07 09:44:02 +02:00
|
|
|
func GetSubjectForUserId(userId string, backend *Backend) string {
|
|
|
|
if backend == nil || backend.IsCompat() {
|
|
|
|
return GetEncodedSubject("user", userId)
|
|
|
|
} else {
|
|
|
|
return GetEncodedSubject("user", userId+"|"+backend.Id())
|
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) SubscribeNats(n NatsClient) error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
var err error
|
|
|
|
if s.userId != "" {
|
2020-07-07 09:44:02 +02:00
|
|
|
if s.userSubscription, err = n.Subscribe(GetSubjectForUserId(s.userId, s.backend), s.natsReceiver); err != nil {
|
2020-05-12 09:46:20 +02:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if s.sessionSubscription, err = n.Subscribe("session."+s.publicId, s.natsReceiver); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) SubscribeRoomNats(n NatsClient, roomid string, roomSessionId string) error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
var err error
|
2020-07-07 09:44:02 +02:00
|
|
|
if s.roomSubscription, err = n.Subscribe(GetSubjectForRoomId(roomid, s.Backend()), s.natsReceiver); err != nil {
|
2020-05-12 09:46:20 +02:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if roomSessionId != "" {
|
|
|
|
if err = s.hub.roomSessions.SetRoomSession(s, roomSessionId); err != nil {
|
|
|
|
s.doUnsubscribeRoomNats(true)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2021-06-04 16:42:17 +02:00
|
|
|
log.Printf("Session %s joined room %s with room session id %s", s.PublicId(), roomid, roomSessionId)
|
2020-05-12 09:46:20 +02:00
|
|
|
s.roomSessionId = roomSessionId
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) LeaveCall() {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
room := s.GetRoom()
|
|
|
|
if room == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-06-04 16:42:17 +02:00
|
|
|
log.Printf("Session %s left call %s", s.PublicId(), room.Id())
|
2020-05-12 09:46:20 +02:00
|
|
|
s.releaseMcuObjects()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) LeaveRoom(notify bool) *Room {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
room := s.GetRoom()
|
|
|
|
if room == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
s.doUnsubscribeRoomNats(notify)
|
|
|
|
s.SetRoom(nil)
|
|
|
|
s.releaseMcuObjects()
|
|
|
|
room.RemoveSession(s)
|
|
|
|
return room
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) UnsubscribeRoomNats() {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
s.doUnsubscribeRoomNats(true)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) doUnsubscribeRoomNats(notify bool) {
|
|
|
|
if s.roomSubscription != nil {
|
2021-04-26 17:19:39 +02:00
|
|
|
if err := s.roomSubscription.Unsubscribe(); err != nil {
|
|
|
|
log.Printf("Error closing room subscription in session %s: %s", s.PublicId(), err)
|
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
s.roomSubscription = nil
|
|
|
|
}
|
|
|
|
s.hub.roomSessions.DeleteRoomSession(s)
|
|
|
|
room := s.GetRoom()
|
|
|
|
if notify && room != nil && s.roomSessionId != "" {
|
|
|
|
// Notify
|
|
|
|
go func(sid string) {
|
|
|
|
ctx := context.Background()
|
2021-05-19 16:22:15 +02:00
|
|
|
request := NewBackendClientRoomRequest(room.Id(), s.userId, sid)
|
2020-05-12 09:46:20 +02:00
|
|
|
request.Room.Action = "leave"
|
|
|
|
var response map[string]interface{}
|
|
|
|
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil {
|
|
|
|
log.Printf("Could not notify about room session %s left room %s: %s", sid, room.Id(), err)
|
|
|
|
} else {
|
|
|
|
log.Printf("Removed room session %s: %+v", sid, response)
|
|
|
|
}
|
|
|
|
}(s.roomSessionId)
|
|
|
|
}
|
|
|
|
s.roomSessionId = ""
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) ClearClient(client *Client) {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
s.clearClientLocked(client)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) clearClientLocked(client *Client) {
|
|
|
|
if s.client == nil {
|
|
|
|
return
|
|
|
|
} else if client != nil && s.client != client {
|
|
|
|
log.Printf("Trying to clear other client in session %s", s.PublicId())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
prevClient := s.client
|
|
|
|
s.client = nil
|
|
|
|
prevClient.SetSession(nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) GetClient() *Client {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
2020-08-12 15:42:11 +02:00
|
|
|
return s.getClientUnlocked()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) getClientUnlocked() *Client {
|
2020-05-12 09:46:20 +02:00
|
|
|
return s.client
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) SetClient(client *Client) *Client {
|
|
|
|
if client == nil {
|
|
|
|
panic("Use ClearClient to set the client to nil")
|
|
|
|
}
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
if client == s.client {
|
|
|
|
// No change
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
client.SetSession(s)
|
|
|
|
prev := s.client
|
|
|
|
if prev != nil {
|
|
|
|
s.clearClientLocked(prev)
|
|
|
|
}
|
|
|
|
s.client = client
|
|
|
|
return prev
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) sendCandidate(client McuClient, sender string, streamType string, candidate interface{}) {
|
|
|
|
candidate_message := &AnswerOfferMessage{
|
|
|
|
To: s.PublicId(),
|
|
|
|
From: sender,
|
|
|
|
Type: "candidate",
|
|
|
|
RoomType: streamType,
|
|
|
|
Payload: map[string]interface{}{
|
|
|
|
"candidate": candidate,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
candidate_data, err := json.Marshal(candidate_message)
|
|
|
|
if err != nil {
|
|
|
|
log.Println("Could not serialize candidate", candidate_message, err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
response_message := &ServerMessage{
|
|
|
|
Type: "message",
|
|
|
|
Message: &MessageServerMessage{
|
|
|
|
Sender: &MessageServerMessageSender{
|
|
|
|
Type: "session",
|
|
|
|
SessionId: sender,
|
|
|
|
},
|
|
|
|
Data: (*json.RawMessage)(&candidate_data),
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2020-10-21 14:16:30 +02:00
|
|
|
s.sendMessageUnlocked(response_message)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool {
|
|
|
|
if c := s.getClientUnlocked(); c != nil {
|
|
|
|
if c.SendMessage(message) {
|
|
|
|
return true
|
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
2020-10-21 14:16:30 +02:00
|
|
|
|
|
|
|
s.storePendingMessage(message)
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2021-05-31 16:40:21 +02:00
|
|
|
func (s *ClientSession) SendError(e *Error) bool {
|
|
|
|
message := &ServerMessage{
|
|
|
|
Type: "error",
|
|
|
|
Error: e,
|
|
|
|
}
|
|
|
|
return s.SendMessage(message)
|
|
|
|
}
|
|
|
|
|
2020-10-21 14:16:30 +02:00
|
|
|
func (s *ClientSession) SendMessage(message *ServerMessage) bool {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
return s.sendMessageUnlocked(message)
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
|
2020-10-21 14:34:53 +02:00
|
|
|
func (s *ClientSession) SendMessages(messages []*ServerMessage) bool {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
for _, message := range messages {
|
|
|
|
s.sendMessageUnlocked(message)
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2020-05-12 09:46:20 +02:00
|
|
|
func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
for _, sub := range s.subscribers {
|
|
|
|
if sub.Id() == client.Id() {
|
|
|
|
s.sendCandidate(client, sub.Publisher(), client.StreamType(), candidate)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, pub := range s.publishers {
|
|
|
|
if pub.Id() == client.Id() {
|
|
|
|
s.sendCandidate(client, s.PublicId(), client.StreamType(), candidate)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("Session %s received candidate %+v for unknown client %s", s.PublicId(), candidate, client.Id())
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) OnIceCompleted(client McuClient) {
|
|
|
|
// TODO(jojo): This causes a JavaScript error when creating a candidate from "null".
|
|
|
|
// Figure out a better way to signal this.
|
|
|
|
|
|
|
|
// An empty candidate signals the end of candidates.
|
|
|
|
// s.OnIceCandidate(client, nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) PublisherClosed(publisher McuPublisher) {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
for id, p := range s.publishers {
|
|
|
|
if p == publisher {
|
|
|
|
delete(s.publishers, id)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) SubscriberClosed(subscriber McuSubscriber) {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
for id, sub := range s.subscribers {
|
|
|
|
if sub == subscriber {
|
|
|
|
delete(s.subscribers, id)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-08 12:06:59 +01:00
|
|
|
type SdpError struct {
|
|
|
|
message string
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *SdpError) Error() string {
|
|
|
|
return e.message
|
|
|
|
}
|
|
|
|
|
|
|
|
type WrappedSdpError struct {
|
|
|
|
SdpError
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *WrappedSdpError) Unwrap() error {
|
|
|
|
return e.err
|
|
|
|
}
|
|
|
|
|
|
|
|
type PermissionError struct {
|
|
|
|
permission Permission
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *PermissionError) Permission() Permission {
|
|
|
|
return e.permission
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *PermissionError) Error() string {
|
|
|
|
return fmt.Sprintf("permission \"%s\" not found", e.permission)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) isSdpAllowedToSendLocked(payload map[string]interface{}) (MediaType, error) {
|
|
|
|
sdpValue, found := payload["sdp"]
|
|
|
|
if !found {
|
|
|
|
return 0, &SdpError{"payload does not contain a sdp"}
|
|
|
|
}
|
|
|
|
sdpText, ok := sdpValue.(string)
|
|
|
|
if !ok {
|
|
|
|
return 0, &SdpError{"payload does not contain a valid sdp"}
|
|
|
|
}
|
|
|
|
var sdp sdp.SessionDescription
|
|
|
|
if err := sdp.Unmarshal(sdpText); err != nil {
|
|
|
|
return 0, &WrappedSdpError{
|
|
|
|
SdpError: SdpError{
|
|
|
|
message: fmt.Sprintf("could not parse sdp: %s", err),
|
|
|
|
},
|
|
|
|
err: err,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var mediaTypes MediaType
|
|
|
|
mayPublishMedia := s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_MEDIA)
|
|
|
|
for _, md := range sdp.MediaDescriptions {
|
|
|
|
switch md.MediaName.Media {
|
|
|
|
case "audio":
|
|
|
|
if !mayPublishMedia && !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_AUDIO) {
|
|
|
|
return 0, &PermissionError{PERMISSION_MAY_PUBLISH_AUDIO}
|
|
|
|
}
|
|
|
|
|
|
|
|
mediaTypes |= MediaTypeAudio
|
|
|
|
case "video":
|
|
|
|
if !mayPublishMedia && !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_VIDEO) {
|
|
|
|
return 0, &PermissionError{PERMISSION_MAY_PUBLISH_VIDEO}
|
|
|
|
}
|
|
|
|
|
|
|
|
mediaTypes |= MediaTypeVideo
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return mediaTypes, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) IsAllowedToSend(data *MessageClientMessageData) error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
if data != nil && data.RoomType == "screen" {
|
|
|
|
if s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_SCREEN) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return &PermissionError{PERMISSION_MAY_PUBLISH_SCREEN}
|
|
|
|
} else if s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_MEDIA) {
|
|
|
|
// Client is allowed to publish any media (audio / video).
|
|
|
|
return nil
|
|
|
|
} else if data != nil && data.Type == "offer" {
|
|
|
|
// Parse SDP to check what user is trying to publish and check permissions accordingly.
|
|
|
|
if _, err := s.isSdpAllowedToSendLocked(data.Payload); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
} else {
|
|
|
|
// Candidate or unknown event, check if client is allowed to publish any media.
|
|
|
|
if s.hasAnyPermissionLocked(PERMISSION_MAY_PUBLISH_AUDIO, PERMISSION_MAY_PUBLISH_VIDEO) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return fmt.Errorf("permission check failed")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) CheckOfferType(streamType string, data *MessageClientMessageData) (MediaType, error) {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
return s.checkOfferTypeLocked(streamType, data)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) checkOfferTypeLocked(streamType string, data *MessageClientMessageData) (MediaType, error) {
|
|
|
|
if streamType == streamTypeScreen {
|
|
|
|
if !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_SCREEN) {
|
|
|
|
return 0, &PermissionError{PERMISSION_MAY_PUBLISH_SCREEN}
|
|
|
|
}
|
|
|
|
|
|
|
|
return MediaTypeScreen, nil
|
|
|
|
} else if data != nil && data.Type == "offer" {
|
|
|
|
mediaTypes, err := s.isSdpAllowedToSendLocked(data.Payload)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return mediaTypes, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, streamType string, data *MessageClientMessageData) (McuPublisher, error) {
|
2020-05-12 09:46:20 +02:00
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
2021-11-08 12:06:59 +01:00
|
|
|
mediaTypes, err := s.checkOfferTypeLocked(streamType, data)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-05-12 09:46:20 +02:00
|
|
|
publisher, found := s.publishers[streamType]
|
|
|
|
if !found {
|
2020-08-12 15:42:11 +02:00
|
|
|
client := s.getClientUnlocked()
|
2020-05-12 09:46:20 +02:00
|
|
|
s.mu.Unlock()
|
2021-01-21 14:39:33 +01:00
|
|
|
|
|
|
|
var bitrate int
|
|
|
|
if backend := s.Backend(); backend != nil {
|
|
|
|
if streamType == streamTypeScreen {
|
|
|
|
bitrate = backend.maxScreenBitrate
|
|
|
|
} else {
|
|
|
|
bitrate = backend.maxStreamBitrate
|
|
|
|
}
|
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
var err error
|
2021-11-08 12:06:59 +01:00
|
|
|
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, bitrate, mediaTypes, client)
|
2020-05-12 09:46:20 +02:00
|
|
|
s.mu.Lock()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if s.publishers == nil {
|
|
|
|
s.publishers = make(map[string]McuPublisher)
|
|
|
|
}
|
|
|
|
if prev, found := s.publishers[streamType]; found {
|
|
|
|
// Another thread created the publisher while we were waiting.
|
|
|
|
go func(pub McuPublisher) {
|
|
|
|
closeCtx := context.TODO()
|
|
|
|
pub.Close(closeCtx)
|
|
|
|
}(publisher)
|
|
|
|
publisher = prev
|
|
|
|
} else {
|
|
|
|
s.publishers[streamType] = publisher
|
|
|
|
}
|
|
|
|
log.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId())
|
|
|
|
}
|
|
|
|
|
|
|
|
return publisher, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) GetPublisher(streamType string) McuPublisher {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
return s.publishers[streamType]
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) GetOrCreateSubscriber(ctx context.Context, mcu Mcu, id string, streamType string) (McuSubscriber, error) {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
// TODO(jojo): Add method to remove subscribers.
|
|
|
|
|
|
|
|
subscriber, found := s.subscribers[id+"|"+streamType]
|
|
|
|
if !found {
|
|
|
|
s.mu.Unlock()
|
|
|
|
var err error
|
|
|
|
subscriber, err = mcu.NewSubscriber(ctx, s, id, streamType)
|
|
|
|
s.mu.Lock()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if s.subscribers == nil {
|
|
|
|
s.subscribers = make(map[string]McuSubscriber)
|
|
|
|
}
|
|
|
|
if prev, found := s.subscribers[id+"|"+streamType]; found {
|
|
|
|
// Another thread created the subscriber while we were waiting.
|
|
|
|
go func(sub McuSubscriber) {
|
|
|
|
closeCtx := context.TODO()
|
|
|
|
sub.Close(closeCtx)
|
|
|
|
}(subscriber)
|
|
|
|
subscriber = prev
|
|
|
|
} else {
|
|
|
|
s.subscribers[id+"|"+streamType] = subscriber
|
|
|
|
}
|
|
|
|
log.Printf("Subscribing %s from %s as %s in session %s", streamType, id, subscriber.Id(), s.PublicId())
|
|
|
|
}
|
|
|
|
|
|
|
|
return subscriber, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) GetSubscriber(id string, streamType string) McuSubscriber {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
return s.subscribers[id+"|"+streamType]
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) processClientMessage(msg *nats.Msg) {
|
|
|
|
var message NatsMessage
|
|
|
|
if err := s.hub.nats.Decode(msg, &message); err != nil {
|
|
|
|
log.Printf("Could not decode NATS message %+v for session %s: %s", *msg, s.PublicId(), err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
switch message.Type {
|
|
|
|
case "permissions":
|
|
|
|
s.SetPermissions(message.Permissions)
|
2021-08-05 12:15:26 +02:00
|
|
|
go func() {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
if !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_MEDIA) {
|
|
|
|
if publisher, found := s.publishers[streamTypeVideo]; found {
|
2021-11-08 12:06:59 +01:00
|
|
|
if (publisher.HasMedia(MediaTypeAudio) && !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_AUDIO)) ||
|
|
|
|
(publisher.HasMedia(MediaTypeVideo) && !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_VIDEO)) {
|
|
|
|
delete(s.publishers, streamTypeVideo)
|
|
|
|
log.Printf("Session %s is no longer allowed to publish media, closing publisher %s", s.PublicId(), publisher.Id())
|
|
|
|
go func() {
|
|
|
|
publisher.Close(context.Background())
|
|
|
|
}()
|
|
|
|
return
|
|
|
|
}
|
2021-08-05 12:15:26 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if !s.hasPermissionLocked(PERMISSION_MAY_PUBLISH_SCREEN) {
|
|
|
|
if publisher, found := s.publishers[streamTypeScreen]; found {
|
|
|
|
delete(s.publishers, streamTypeScreen)
|
|
|
|
log.Printf("Session %s is no longer allowed to publish screen, closing publisher %s", s.PublicId(), publisher.Id())
|
|
|
|
go func() {
|
|
|
|
publisher.Close(context.Background())
|
|
|
|
}()
|
2021-11-08 12:06:59 +01:00
|
|
|
return
|
2021-08-05 12:15:26 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
2020-05-12 09:46:20 +02:00
|
|
|
return
|
|
|
|
case "message":
|
|
|
|
if message.Message.Type == "bye" && message.Message.Bye.Reason == "room_session_reconnected" {
|
|
|
|
s.mu.Lock()
|
|
|
|
roomSessionId := s.RoomSessionId()
|
|
|
|
s.mu.Unlock()
|
|
|
|
log.Printf("Closing session %s because same room session %s connected", s.PublicId(), roomSessionId)
|
|
|
|
s.LeaveRoom(false)
|
|
|
|
defer s.closeAndWait(false)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-10-21 14:16:30 +02:00
|
|
|
serverMessage := s.processNatsMessage(&message)
|
|
|
|
if serverMessage == nil {
|
2020-05-12 09:46:20 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-10-21 14:16:30 +02:00
|
|
|
s.SendMessage(serverMessage)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) storePendingMessage(message *ServerMessage) {
|
2020-10-21 14:34:53 +02:00
|
|
|
if message.IsChatRefresh() {
|
|
|
|
if s.hasPendingChat {
|
|
|
|
// Only send a single "chat-refresh" message on resume.
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
s.hasPendingChat = true
|
|
|
|
}
|
|
|
|
if !s.hasPendingParticipantsUpdate && message.IsParticipantsUpdate() {
|
|
|
|
s.hasPendingParticipantsUpdate = true
|
|
|
|
}
|
2020-10-21 14:16:30 +02:00
|
|
|
s.pendingClientMessages = append(s.pendingClientMessages, message)
|
|
|
|
if len(s.pendingClientMessages) >= warnPendingMessagesCount {
|
|
|
|
log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages))
|
|
|
|
}
|
2020-08-03 14:28:00 +02:00
|
|
|
}
|
|
|
|
|
2020-10-21 14:16:30 +02:00
|
|
|
func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage {
|
2020-08-03 14:28:00 +02:00
|
|
|
switch msg.Type {
|
|
|
|
case "message":
|
|
|
|
if msg.Message == nil {
|
2021-06-04 16:42:17 +02:00
|
|
|
log.Printf("Received NATS message without payload: %+v", msg)
|
2020-10-21 14:16:30 +02:00
|
|
|
return nil
|
2020-08-03 14:28:00 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
switch msg.Message.Type {
|
|
|
|
case "message":
|
|
|
|
if msg.Message.Message != nil &&
|
|
|
|
msg.Message.Message.Sender != nil &&
|
|
|
|
msg.Message.Message.Sender.SessionId == s.PublicId() {
|
|
|
|
// Don't send message back to sender (can happen if sent to user or room)
|
2020-10-21 14:16:30 +02:00
|
|
|
return nil
|
2020-08-03 14:28:00 +02:00
|
|
|
}
|
|
|
|
case "control":
|
|
|
|
if msg.Message.Control != nil &&
|
|
|
|
msg.Message.Control.Sender != nil &&
|
|
|
|
msg.Message.Control.Sender.SessionId == s.PublicId() {
|
|
|
|
// Don't send message back to sender (can happen if sent to user or room)
|
2020-10-21 14:16:30 +02:00
|
|
|
return nil
|
2020-08-03 14:28:00 +02:00
|
|
|
}
|
|
|
|
case "event":
|
|
|
|
if msg.Message.Event.Target == "participants" &&
|
|
|
|
msg.Message.Event.Type == "update" {
|
|
|
|
m := msg.Message.Event.Update
|
|
|
|
users := make(map[string]bool)
|
|
|
|
for _, entry := range m.Users {
|
|
|
|
users[entry["sessionId"].(string)] = true
|
|
|
|
}
|
|
|
|
for _, entry := range m.Changed {
|
|
|
|
if users[entry["sessionId"].(string)] {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
m.Users = append(m.Users, entry)
|
|
|
|
}
|
|
|
|
// TODO(jojo): Only send all users if current session id has
|
|
|
|
// changed its "inCall" flag to true.
|
|
|
|
m.Changed = nil
|
2021-06-08 09:20:18 +02:00
|
|
|
} else if msg.Message.Event.Target == "room" {
|
|
|
|
// Can happen mostly during tests where an older room NATS message
|
|
|
|
// could be received by a subscriber that joined after it was sent.
|
|
|
|
if msg.SendTime.Before(s.getRoomJoinTime()) {
|
|
|
|
log.Printf("Message %+v was sent before room was joined, ignoring", msg.Message)
|
|
|
|
return nil
|
|
|
|
}
|
2020-08-03 14:28:00 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-10-21 14:16:30 +02:00
|
|
|
return msg.Message
|
2020-08-03 14:28:00 +02:00
|
|
|
default:
|
2021-06-04 16:42:17 +02:00
|
|
|
log.Printf("Received NATS message with unsupported type %s: %+v", msg.Type, msg)
|
2020-10-21 14:16:30 +02:00
|
|
|
return nil
|
2020-08-03 14:28:00 +02:00
|
|
|
}
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) NotifySessionResumed(client *Client) {
|
|
|
|
s.mu.Lock()
|
|
|
|
if len(s.pendingClientMessages) == 0 {
|
|
|
|
s.mu.Unlock()
|
|
|
|
if room := s.GetRoom(); room != nil {
|
2021-05-31 16:40:21 +02:00
|
|
|
room.NotifySessionResumed(s)
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-10-21 14:34:53 +02:00
|
|
|
messages := s.pendingClientMessages
|
|
|
|
hasPendingParticipantsUpdate := s.hasPendingParticipantsUpdate
|
2020-05-12 09:46:20 +02:00
|
|
|
s.pendingClientMessages = nil
|
2020-10-21 14:34:53 +02:00
|
|
|
s.hasPendingChat = false
|
|
|
|
s.hasPendingParticipantsUpdate = false
|
2020-05-12 09:46:20 +02:00
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
|
|
log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId())
|
2020-10-21 14:34:53 +02:00
|
|
|
// Send through session to handle connection interruptions.
|
|
|
|
s.SendMessages(messages)
|
2020-05-12 09:46:20 +02:00
|
|
|
|
2020-10-21 14:34:53 +02:00
|
|
|
if !hasPendingParticipantsUpdate {
|
2020-05-12 09:46:20 +02:00
|
|
|
// Only need to send initial participants list update if none was part of the pending messages.
|
|
|
|
if room := s.GetRoom(); room != nil {
|
2021-05-31 16:40:21 +02:00
|
|
|
room.NotifySessionResumed(s)
|
2020-05-12 09:46:20 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-10-20 14:29:58 +02:00
|
|
|
|
|
|
|
func (s *ClientSession) AddVirtualSession(session *VirtualSession) {
|
|
|
|
s.mu.Lock()
|
|
|
|
if s.virtualSessions == nil {
|
|
|
|
s.virtualSessions = make(map[*VirtualSession]bool)
|
|
|
|
}
|
|
|
|
s.virtualSessions[session] = true
|
|
|
|
s.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientSession) RemoveVirtualSession(session *VirtualSession) {
|
|
|
|
s.mu.Lock()
|
|
|
|
delete(s.virtualSessions, session)
|
|
|
|
s.mu.Unlock()
|
|
|
|
}
|