mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-06-15 20:25:12 +02:00
Merge pull request #111 from strukturag/async-client-processing
Make client message processing asynchronous.
This commit is contained in:
commit
9a921a6572
35
client.go
35
client.go
|
@ -101,6 +101,8 @@ type Client struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
closeChan chan bool
|
closeChan chan bool
|
||||||
|
messagesDone sync.WaitGroup
|
||||||
|
messageChan chan *bytes.Buffer
|
||||||
|
|
||||||
OnLookupCountry func(*Client) string
|
OnLookupCountry func(*Client) string
|
||||||
OnClosed func(*Client)
|
OnClosed func(*Client)
|
||||||
|
@ -121,7 +123,9 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien
|
||||||
conn: conn,
|
conn: conn,
|
||||||
addr: remoteAddress,
|
addr: remoteAddress,
|
||||||
agent: agent,
|
agent: agent,
|
||||||
|
|
||||||
closeChan: make(chan bool, 1),
|
closeChan: make(chan bool, 1),
|
||||||
|
messageChan: make(chan *bytes.Buffer, 16),
|
||||||
|
|
||||||
OnLookupCountry: func(client *Client) string { return unknownCountry },
|
OnLookupCountry: func(client *Client) string { return unknownCountry },
|
||||||
OnClosed: func(client *Client) {},
|
OnClosed: func(client *Client) {},
|
||||||
|
@ -135,6 +139,7 @@ func (c *Client) SetConn(conn *websocket.Conn, remoteAddress string) {
|
||||||
c.conn = conn
|
c.conn = conn
|
||||||
c.addr = remoteAddress
|
c.addr = remoteAddress
|
||||||
c.closeChan = make(chan bool, 1)
|
c.closeChan = make(chan bool, 1)
|
||||||
|
c.messageChan = make(chan *bytes.Buffer, 16)
|
||||||
c.OnLookupCountry = func(client *Client) string { return unknownCountry }
|
c.OnLookupCountry = func(client *Client) string { return unknownCountry }
|
||||||
c.OnClosed = func(client *Client) {}
|
c.OnClosed = func(client *Client) {}
|
||||||
c.OnMessageReceived = func(client *Client, data []byte) {}
|
c.OnMessageReceived = func(client *Client, data []byte) {}
|
||||||
|
@ -179,6 +184,8 @@ func (c *Client) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
c.closeChan <- true
|
c.closeChan <- true
|
||||||
|
c.messagesDone.Wait()
|
||||||
|
close(c.messageChan)
|
||||||
|
|
||||||
c.OnClosed(c)
|
c.OnClosed(c)
|
||||||
c.SetSession(nil)
|
c.SetSession(nil)
|
||||||
|
@ -255,8 +262,8 @@ func (c *Client) ReadPump() {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
decodeBuffer := bufferPool.Get().(*bytes.Buffer)
|
go c.processMessages()
|
||||||
defer bufferPool.Put(decodeBuffer)
|
|
||||||
for {
|
for {
|
||||||
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
|
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
|
||||||
messageType, reader, err := conn.NextReader()
|
messageType, reader, err := conn.NextReader()
|
||||||
|
@ -284,8 +291,10 @@ func (c *Client) ReadPump() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
decodeBuffer := bufferPool.Get().(*bytes.Buffer)
|
||||||
decodeBuffer.Reset()
|
decodeBuffer.Reset()
|
||||||
if _, err := decodeBuffer.ReadFrom(reader); err != nil {
|
if _, err := decodeBuffer.ReadFrom(reader); err != nil {
|
||||||
|
bufferPool.Put(decodeBuffer)
|
||||||
if session := c.GetSession(); session != nil {
|
if session := c.GetSession(); session != nil {
|
||||||
log.Printf("Error reading message from client %s: %v", session.PublicId(), err)
|
log.Printf("Error reading message from client %s: %v", session.PublicId(), err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -294,7 +303,27 @@ func (c *Client) ReadPump() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
c.OnMessageReceived(c, decodeBuffer.Bytes())
|
// Stop processing if the client was closed.
|
||||||
|
if atomic.LoadUint32(&c.closed) == 1 {
|
||||||
|
bufferPool.Put(decodeBuffer)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
c.messagesDone.Add(1)
|
||||||
|
c.messageChan <- decodeBuffer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) processMessages() {
|
||||||
|
for {
|
||||||
|
buffer := <-c.messageChan
|
||||||
|
if buffer == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
c.OnMessageReceived(c, buffer.Bytes())
|
||||||
|
c.messagesDone.Done()
|
||||||
|
bufferPool.Put(buffer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -559,6 +559,14 @@ func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ClientSession) SendError(e *Error) bool {
|
||||||
|
message := &ServerMessage{
|
||||||
|
Type: "error",
|
||||||
|
Error: e,
|
||||||
|
}
|
||||||
|
return s.SendMessage(message)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ClientSession) SendMessage(message *ServerMessage) bool {
|
func (s *ClientSession) SendMessage(message *ServerMessage) bool {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
@ -822,7 +830,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) {
|
||||||
if len(s.pendingClientMessages) == 0 {
|
if len(s.pendingClientMessages) == 0 {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
if room := s.GetRoom(); room != nil {
|
if room := s.GetRoom(); room != nil {
|
||||||
room.NotifySessionResumed(client)
|
room.NotifySessionResumed(s)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -841,7 +849,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) {
|
||||||
if !hasPendingParticipantsUpdate {
|
if !hasPendingParticipantsUpdate {
|
||||||
// Only need to send initial participants list update if none was part of the pending messages.
|
// Only need to send initial participants list update if none was part of the pending messages.
|
||||||
if room := s.GetRoom(); room != nil {
|
if room := s.GetRoom(); room != nil {
|
||||||
room.NotifySessionResumed(client)
|
room.NotifySessionResumed(s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
100
hub.go
100
hub.go
|
@ -740,7 +740,7 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B
|
||||||
|
|
||||||
h.setDecodedSessionId(privateSessionId, privateSessionName, sessionIdData)
|
h.setDecodedSessionId(privateSessionId, privateSessionName, sessionIdData)
|
||||||
h.setDecodedSessionId(publicSessionId, publicSessionName, sessionIdData)
|
h.setDecodedSessionId(publicSessionId, publicSessionName, sessionIdData)
|
||||||
h.sendHelloResponse(client, message, session)
|
h.sendHelloResponse(session, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) processUnregister(client *Client) *ClientSession {
|
func (h *Hub) processUnregister(client *Client) *ClientSession {
|
||||||
|
@ -768,20 +768,22 @@ func (h *Hub) processMessage(client *Client, data []byte) {
|
||||||
if err := message.UnmarshalJSON(data); err != nil {
|
if err := message.UnmarshalJSON(data); err != nil {
|
||||||
if session := client.GetSession(); session != nil {
|
if session := client.GetSession(); session != nil {
|
||||||
log.Printf("Error decoding message from client %s: %v", session.PublicId(), err)
|
log.Printf("Error decoding message from client %s: %v", session.PublicId(), err)
|
||||||
|
session.SendError(InvalidFormat)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("Error decoding message from %s: %v", client.RemoteAddr(), err)
|
log.Printf("Error decoding message from %s: %v", client.RemoteAddr(), err)
|
||||||
}
|
|
||||||
client.SendError(InvalidFormat)
|
client.SendError(InvalidFormat)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := message.CheckValid(); err != nil {
|
if err := message.CheckValid(); err != nil {
|
||||||
if session := client.GetSession(); session != nil {
|
if session := client.GetSession(); session != nil {
|
||||||
log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err)
|
log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err)
|
||||||
|
session.SendMessage(message.NewErrorServerMessage(InvalidFormat))
|
||||||
} else {
|
} else {
|
||||||
log.Printf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err)
|
log.Printf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err)
|
||||||
}
|
|
||||||
client.SendMessage(message.NewErrorServerMessage(InvalidFormat))
|
client.SendMessage(message.NewErrorServerMessage(InvalidFormat))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -814,7 +816,7 @@ func (h *Hub) processMessage(client *Client, data []byte) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) sendHelloResponse(client *Client, message *ClientMessage, session *ClientSession) bool {
|
func (h *Hub) sendHelloResponse(session *ClientSession, message *ClientMessage) bool {
|
||||||
response := &ServerMessage{
|
response := &ServerMessage{
|
||||||
Id: message.Id,
|
Id: message.Id,
|
||||||
Type: "hello",
|
Type: "hello",
|
||||||
|
@ -826,7 +828,7 @@ func (h *Hub) sendHelloResponse(client *Client, message *ClientMessage, session
|
||||||
Server: h.GetServerInfo(session),
|
Server: h.GetServerInfo(session),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return client.SendMessage(response)
|
return session.SendMessage(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) processHello(client *Client, message *ClientMessage) {
|
func (h *Hub) processHello(client *Client, message *ClientMessage) {
|
||||||
|
@ -873,7 +875,7 @@ func (h *Hub) processHello(client *Client, message *ClientMessage) {
|
||||||
|
|
||||||
log.Printf("Resume session from %s in %s (%s) %s (private=%s)", client.RemoteAddr(), client.Country(), client.UserAgent(), session.PublicId(), session.PrivateId())
|
log.Printf("Resume session from %s in %s (%s) %s (private=%s)", client.RemoteAddr(), client.Country(), client.UserAgent(), session.PublicId(), session.PrivateId())
|
||||||
|
|
||||||
h.sendHelloResponse(client, message, clientSession)
|
h.sendHelloResponse(clientSession, message)
|
||||||
clientSession.NotifySessionResumed(client)
|
clientSession.NotifySessionResumed(client)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -986,7 +988,7 @@ func (h *Hub) disconnectByRoomSessionId(roomSessionId string) {
|
||||||
session.Close()
|
session.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) sendRoom(client *Client, message *ClientMessage, room *Room) bool {
|
func (h *Hub) sendRoom(session *ClientSession, message *ClientMessage, room *Room) bool {
|
||||||
response := &ServerMessage{
|
response := &ServerMessage{
|
||||||
Type: "room",
|
Type: "room",
|
||||||
}
|
}
|
||||||
|
@ -1003,7 +1005,7 @@ func (h *Hub) sendRoom(client *Client, message *ClientMessage, room *Room) bool
|
||||||
Properties: room.properties,
|
Properties: room.properties,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return client.SendMessage(response)
|
return session.SendMessage(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) processRoom(client *Client, message *ClientMessage) {
|
func (h *Hub) processRoom(client *Client, message *ClientMessage) {
|
||||||
|
@ -1017,7 +1019,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
|
||||||
// We can handle leaving a room directly.
|
// We can handle leaving a room directly.
|
||||||
if session.LeaveRoom(true) != nil {
|
if session.LeaveRoom(true) != nil {
|
||||||
// User was in a room before, so need to notify about leaving it.
|
// User was in a room before, so need to notify about leaving it.
|
||||||
h.sendRoom(client, message, nil)
|
h.sendRoom(session, message, nil)
|
||||||
}
|
}
|
||||||
if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal {
|
if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal {
|
||||||
h.startWaitAnonymousClientRoom(client)
|
h.startWaitAnonymousClientRoom(client)
|
||||||
|
@ -1054,7 +1056,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
|
||||||
}
|
}
|
||||||
request := NewBackendClientRoomRequest(roomId, session.UserId(), sessionId)
|
request := NewBackendClientRoomRequest(roomId, session.UserId(), sessionId)
|
||||||
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &room); err != nil {
|
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &room); err != nil {
|
||||||
client.SendMessage(message.NewWrappedErrorServerMessage(err))
|
session.SendMessage(message.NewWrappedErrorServerMessage(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1067,7 +1069,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
h.processJoinRoom(client, message, &room)
|
h.processJoinRoom(session, message, &room)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room {
|
func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room {
|
||||||
|
@ -1097,18 +1099,12 @@ func (h *Hub) createRoom(id string, properties *json.RawMessage, backend *Backen
|
||||||
return room, nil
|
return room, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *BackendClientResponse) {
|
func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, room *BackendClientResponse) {
|
||||||
session := client.GetSession()
|
|
||||||
if session == nil {
|
|
||||||
// Client disconnected while waiting for join room response.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if room.Type == "error" {
|
if room.Type == "error" {
|
||||||
client.SendMessage(message.NewErrorServerMessage(room.Error))
|
session.SendMessage(message.NewErrorServerMessage(room.Error))
|
||||||
return
|
return
|
||||||
} else if room.Type != "room" {
|
} else if room.Type != "room" {
|
||||||
client.SendMessage(message.NewErrorServerMessage(RoomJoinFailed))
|
session.SendMessage(message.NewErrorServerMessage(RoomJoinFailed))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1117,9 +1113,9 @@ func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *Back
|
||||||
roomId := room.Room.RoomId
|
roomId := room.Room.RoomId
|
||||||
internalRoomId := getRoomIdForBackend(roomId, session.Backend())
|
internalRoomId := getRoomIdForBackend(roomId, session.Backend())
|
||||||
if err := session.SubscribeRoomNats(h.nats, roomId, message.Room.SessionId); err != nil {
|
if err := session.SubscribeRoomNats(h.nats, roomId, message.Room.SessionId); err != nil {
|
||||||
client.SendMessage(message.NewWrappedErrorServerMessage(err))
|
session.SendMessage(message.NewWrappedErrorServerMessage(err))
|
||||||
// The client (implicitly) left the room due to an error.
|
// The client (implicitly) left the room due to an error.
|
||||||
h.sendRoom(client, nil, nil)
|
h.sendRoom(session, nil, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1129,28 +1125,30 @@ func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *Back
|
||||||
var err error
|
var err error
|
||||||
if r, err = h.createRoom(roomId, room.Room.Properties, session.Backend()); err != nil {
|
if r, err = h.createRoom(roomId, room.Room.Properties, session.Backend()); err != nil {
|
||||||
h.ru.Unlock()
|
h.ru.Unlock()
|
||||||
client.SendMessage(message.NewWrappedErrorServerMessage(err))
|
session.SendMessage(message.NewWrappedErrorServerMessage(err))
|
||||||
// The client (implicitly) left the room due to an error.
|
// The client (implicitly) left the room due to an error.
|
||||||
session.UnsubscribeRoomNats()
|
session.UnsubscribeRoomNats()
|
||||||
h.sendRoom(client, nil, nil)
|
h.sendRoom(session, nil, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
h.ru.Unlock()
|
h.ru.Unlock()
|
||||||
|
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
|
if client := session.GetClient(); client != nil {
|
||||||
// The client now joined a room, don't expire him if he is anonymous.
|
// The client now joined a room, don't expire him if he is anonymous.
|
||||||
delete(h.anonymousClients, client)
|
delete(h.anonymousClients, client)
|
||||||
|
}
|
||||||
h.mu.Unlock()
|
h.mu.Unlock()
|
||||||
session.SetRoom(r)
|
session.SetRoom(r)
|
||||||
if room.Room.Permissions != nil {
|
if room.Room.Permissions != nil {
|
||||||
session.SetPermissions(*room.Room.Permissions)
|
session.SetPermissions(*room.Room.Permissions)
|
||||||
}
|
}
|
||||||
h.sendRoom(client, message, r)
|
h.sendRoom(session, message, r)
|
||||||
h.notifyUserJoinedRoom(r, client, session, room.Room.Session)
|
h.notifyUserJoinedRoom(r, session, room.Room.Session)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) notifyUserJoinedRoom(room *Room, client *Client, session Session, sessionData *json.RawMessage) {
|
func (h *Hub) notifyUserJoinedRoom(room *Room, session *ClientSession, sessionData *json.RawMessage) {
|
||||||
// Register session with the room
|
// Register session with the room
|
||||||
if sessions := room.AddSession(session, sessionData); len(sessions) > 0 {
|
if sessions := room.AddSession(session, sessionData); len(sessions) > 0 {
|
||||||
events := make([]*EventServerMessageSessionEntry, 0, len(sessions))
|
events := make([]*EventServerMessageSessionEntry, 0, len(sessions))
|
||||||
|
@ -1171,7 +1169,7 @@ func (h *Hub) notifyUserJoinedRoom(room *Room, client *Client, session Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
// No need to send through NATS, the session is connected locally.
|
// No need to send through NATS, the session is connected locally.
|
||||||
client.SendMessage(msg)
|
session.SendMessage(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1205,7 +1203,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
||||||
case "requestoffer":
|
case "requestoffer":
|
||||||
// Process asynchronously to avoid blocking regular
|
// Process asynchronously to avoid blocking regular
|
||||||
// message processing for this client.
|
// message processing for this client.
|
||||||
go h.processMcuMessage(client, client, session, message, msg, &data)
|
go h.processMcuMessage(session, session, message, msg, &data)
|
||||||
return
|
return
|
||||||
case "offer":
|
case "offer":
|
||||||
fallthrough
|
fallthrough
|
||||||
|
@ -1214,7 +1212,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
||||||
case "endOfCandidates":
|
case "endOfCandidates":
|
||||||
fallthrough
|
fallthrough
|
||||||
case "candidate":
|
case "candidate":
|
||||||
h.processMcuMessage(client, client, session, message, msg, &data)
|
h.processMcuMessage(session, session, message, msg, &data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1315,7 +1313,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
||||||
if clientData != nil && clientData.Type == "sendoffer" {
|
if clientData != nil && clientData.Type == "sendoffer" {
|
||||||
if !isAllowedToSend(session, clientData) {
|
if !isAllowedToSend(session, clientData) {
|
||||||
log.Printf("Session %s is not allowed to send offer for %s, ignoring", session.PublicId(), clientData.RoomType)
|
log.Printf("Session %s is not allowed to send offer for %s, ignoring", session.PublicId(), clientData.RoomType)
|
||||||
sendNotAllowed(client, message)
|
sendNotAllowed(session, message)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1324,7 +1322,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
||||||
// It may take some time for the publisher (which is the current
|
// It may take some time for the publisher (which is the current
|
||||||
// client) to start his stream, so we must not block the active
|
// client) to start his stream, so we must not block the active
|
||||||
// goroutine.
|
// goroutine.
|
||||||
go h.processMcuMessage(client, recipient, recipientSession, message, msg, clientData)
|
go h.processMcuMessage(session, recipientSession, message, msg, clientData)
|
||||||
} else { // nolint
|
} else { // nolint
|
||||||
// Client is not connected yet.
|
// Client is not connected yet.
|
||||||
}
|
}
|
||||||
|
@ -1491,14 +1489,14 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
|
||||||
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
|
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
|
||||||
log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
|
log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
|
||||||
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session."))
|
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session."))
|
||||||
client.SendMessage(reply)
|
session.SendMessage(reply)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.Type == "error" {
|
if response.Type == "error" {
|
||||||
log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error)
|
log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error)
|
||||||
reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error()))
|
reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error()))
|
||||||
client.SendMessage(reply)
|
session.SendMessage(reply)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1507,7 +1505,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
|
||||||
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
|
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
|
||||||
log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
|
log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
|
||||||
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session."))
|
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session."))
|
||||||
client.SendMessage(reply)
|
session.SendMessage(reply)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1577,7 +1575,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
|
||||||
log.Printf("Session %s removed virtual session %s", session.PublicId(), sess.PublicId())
|
log.Printf("Session %s removed virtual session %s", session.PublicId(), sess.PublicId())
|
||||||
if vsess, ok := sess.(*VirtualSession); ok {
|
if vsess, ok := sess.(*VirtualSession); ok {
|
||||||
// We should always have a VirtualSession here.
|
// We should always have a VirtualSession here.
|
||||||
vsess.CloseWithFeedback(client, message)
|
vsess.CloseWithFeedback(session, message)
|
||||||
} else {
|
} else {
|
||||||
sess.Close()
|
sess.Close()
|
||||||
}
|
}
|
||||||
|
@ -1598,22 +1596,22 @@ func isAllowedToSend(session *ClientSession, data *MessageClientMessageData) boo
|
||||||
return session.HasPermission(permission)
|
return session.HasPermission(permission)
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendNotAllowed(client *Client, message *ClientMessage) {
|
func sendNotAllowed(session *ClientSession, message *ClientMessage) {
|
||||||
response := message.NewErrorServerMessage(NewError("not_allowed", "Not allowed to publish."))
|
response := message.NewErrorServerMessage(NewError("not_allowed", "Not allowed to publish."))
|
||||||
client.SendMessage(response)
|
session.SendMessage(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendMcuClientNotFound(client *Client, message *ClientMessage) {
|
func sendMcuClientNotFound(session *ClientSession, message *ClientMessage) {
|
||||||
response := message.NewErrorServerMessage(NewError("client_not_found", "No MCU client found to send message to."))
|
response := message.NewErrorServerMessage(NewError("client_not_found", "No MCU client found to send message to."))
|
||||||
client.SendMessage(response)
|
session.SendMessage(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendMcuProcessingFailed(client *Client, message *ClientMessage) {
|
func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) {
|
||||||
response := message.NewErrorServerMessage(NewError("processing_failed", "Processing of the message failed, please check server logs."))
|
response := message.NewErrorServerMessage(NewError("processing_failed", "Processing of the message failed, please check server logs."))
|
||||||
client.SendMessage(response)
|
session.SendMessage(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) {
|
func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -1636,7 +1634,7 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C
|
||||||
case "offer":
|
case "offer":
|
||||||
if !isAllowedToSend(session, data) {
|
if !isAllowedToSend(session, data) {
|
||||||
log.Printf("Session %s is not allowed to offer %s, ignoring", session.PublicId(), data.RoomType)
|
log.Printf("Session %s is not allowed to offer %s, ignoring", session.PublicId(), data.RoomType)
|
||||||
sendNotAllowed(senderClient, client_message)
|
sendNotAllowed(senderSession, client_message)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1646,7 +1644,7 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C
|
||||||
if session.PublicId() == message.Recipient.SessionId {
|
if session.PublicId() == message.Recipient.SessionId {
|
||||||
if !isAllowedToSend(session, data) {
|
if !isAllowedToSend(session, data) {
|
||||||
log.Printf("Session %s is not allowed to send candidate for %s, ignoring", session.PublicId(), data.RoomType)
|
log.Printf("Session %s is not allowed to send candidate for %s, ignoring", session.PublicId(), data.RoomType)
|
||||||
sendNotAllowed(senderClient, client_message)
|
sendNotAllowed(senderSession, client_message)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1659,29 +1657,29 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Could not create MCU %s for session %s to send %+v to %s: %s", clientType, session.PublicId(), data, message.Recipient.SessionId, err)
|
log.Printf("Could not create MCU %s for session %s to send %+v to %s: %s", clientType, session.PublicId(), data, message.Recipient.SessionId, err)
|
||||||
sendMcuClientNotFound(senderClient, client_message)
|
sendMcuClientNotFound(senderSession, client_message)
|
||||||
return
|
return
|
||||||
} else if mc == nil {
|
} else if mc == nil {
|
||||||
log.Printf("No MCU %s found for session %s to send %+v to %s", clientType, session.PublicId(), data, message.Recipient.SessionId)
|
log.Printf("No MCU %s found for session %s to send %+v to %s", clientType, session.PublicId(), data, message.Recipient.SessionId)
|
||||||
sendMcuClientNotFound(senderClient, client_message)
|
sendMcuClientNotFound(senderSession, client_message)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
mc.SendMessage(context.TODO(), message, data, func(err error, response map[string]interface{}) {
|
mc.SendMessage(context.TODO(), message, data, func(err error, response map[string]interface{}) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Could not send MCU message %+v for session %s to %s: %s", data, session.PublicId(), message.Recipient.SessionId, err)
|
log.Printf("Could not send MCU message %+v for session %s to %s: %s", data, session.PublicId(), message.Recipient.SessionId, err)
|
||||||
sendMcuProcessingFailed(senderClient, client_message)
|
sendMcuProcessingFailed(senderSession, client_message)
|
||||||
return
|
return
|
||||||
} else if response == nil {
|
} else if response == nil {
|
||||||
// No response received
|
// No response received
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
h.sendMcuMessageResponse(client, session, message, data, response)
|
h.sendMcuMessageResponse(session, message, data, response)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Hub) sendMcuMessageResponse(client *Client, session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) {
|
func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) {
|
||||||
var response_message *ServerMessage
|
var response_message *ServerMessage
|
||||||
switch response["type"] {
|
switch response["type"] {
|
||||||
case "answer":
|
case "answer":
|
||||||
|
@ -1763,7 +1761,7 @@ func (h *Hub) processRoomDeleted(message *BackendServerRoomRequest) {
|
||||||
switch sess := session.(type) {
|
switch sess := session.(type) {
|
||||||
case *ClientSession:
|
case *ClientSession:
|
||||||
if client := sess.GetClient(); client != nil {
|
if client := sess.GetClient(); client != nil {
|
||||||
h.sendRoom(client, nil, nil)
|
h.sendRoom(sess, nil, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
97
hub_test.go
97
hub_test.go
|
@ -254,7 +254,10 @@ func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, re
|
||||||
t.Fatalf("Expected an room backend request, got %+v", request)
|
t.Fatalf("Expected an room backend request, got %+v", request)
|
||||||
}
|
}
|
||||||
|
|
||||||
if request.Room.RoomId == "test-room-takeover-room-session" {
|
switch request.Room.RoomId {
|
||||||
|
case "test-room-slow":
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
case "test-room-takeover-room-session":
|
||||||
// Additional checks for testcase "TestClientTakeoverRoomSession"
|
// Additional checks for testcase "TestClientTakeoverRoomSession"
|
||||||
if request.Room.Action == "leave" && request.Room.UserId == "test-userid1" {
|
if request.Room.Action == "leave" && request.Room.UserId == "test-userid1" {
|
||||||
t.Errorf("Should not receive \"leave\" event for first user, received %+v", request.Room)
|
t.Errorf("Should not receive \"leave\" event for first user, received %+v", request.Room)
|
||||||
|
@ -1754,6 +1757,95 @@ func TestJoinMultiple(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestJoinRoomSwitchClient(t *testing.T) {
|
||||||
|
hub, _, _, server, shutdown := CreateHubForTest(t)
|
||||||
|
defer shutdown()
|
||||||
|
|
||||||
|
client := NewTestClient(t, server, hub)
|
||||||
|
defer client.CloseWithBye()
|
||||||
|
|
||||||
|
if err := client.SendHello(testDefaultUserId); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
hello, err := client.RunUntilHello(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Join room by id.
|
||||||
|
roomId := "test-room-slow"
|
||||||
|
msg := &ClientMessage{
|
||||||
|
Id: "ABCD",
|
||||||
|
Type: "room",
|
||||||
|
Room: &RoomClientMessage{
|
||||||
|
RoomId: roomId,
|
||||||
|
SessionId: roomId + "-" + hello.Hello.SessionId,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := client.WriteJSON(msg); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// Wait a bit to make sure request is sent before closing client.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
client.Close()
|
||||||
|
if err := client.WaitForClientRemoved(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The client needs some time to reconnect.
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
client2 := NewTestClient(t, server, hub)
|
||||||
|
defer client2.CloseWithBye()
|
||||||
|
if err := client2.SendHelloResume(hello.Hello.ResumeId); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
hello2, err := client2.RunUntilHello(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else {
|
||||||
|
if hello2.Hello.UserId != testDefaultUserId {
|
||||||
|
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello2.Hello)
|
||||||
|
}
|
||||||
|
if hello2.Hello.SessionId != hello.Hello.SessionId {
|
||||||
|
t.Errorf("Expected session id %s, got %+v", hello.Hello.SessionId, hello2.Hello)
|
||||||
|
}
|
||||||
|
if hello2.Hello.ResumeId != hello.Hello.ResumeId {
|
||||||
|
t.Errorf("Expected resume id %s, got %+v", hello.Hello.ResumeId, hello2.Hello)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
room, err := client2.RunUntilMessage(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := checkUnexpectedClose(err); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := checkMessageType(room, "room"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if room.Room.RoomId != roomId {
|
||||||
|
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We will receive a "joined" event.
|
||||||
|
if err := client2.RunUntilJoined(ctx, hello.Hello); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Leave room.
|
||||||
|
if room, err := client2.JoinRoom(ctx, ""); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if room.Room.RoomId != "" {
|
||||||
|
t.Fatalf("Expected empty room, got %s", room.Room.RoomId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetRealUserIP(t *testing.T) {
|
func TestGetRealUserIP(t *testing.T) {
|
||||||
REMOTE_ATTR := "192.168.1.2"
|
REMOTE_ATTR := "192.168.1.2"
|
||||||
|
|
||||||
|
@ -1841,6 +1933,9 @@ func TestClientMessageToSessionIdWhileDisconnected(t *testing.T) {
|
||||||
client1.SendMessage(recipient2, data1) // nolint
|
client1.SendMessage(recipient2, data1) // nolint
|
||||||
client1.SendMessage(recipient2, data1) // nolint
|
client1.SendMessage(recipient2, data1) // nolint
|
||||||
|
|
||||||
|
// Simulate some time until client resumes the session.
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
client2 = NewTestClient(t, server, hub)
|
client2 = NewTestClient(t, server, hub)
|
||||||
defer client2.CloseWithBye()
|
defer client2.CloseWithBye()
|
||||||
if err := client2.SendHelloResume(hello2.Hello.ResumeId); err != nil {
|
if err := client2.SendHelloResume(hello2.Hello.ResumeId); err != nil {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
@ -95,6 +96,9 @@ func (s *loopbackNatsSubscription) run() {
|
||||||
msg := s.incoming[0]
|
msg := s.incoming[0]
|
||||||
s.incoming = s.incoming[1:]
|
s.incoming = s.incoming[1:]
|
||||||
s.cond.L.Unlock()
|
s.cond.L.Unlock()
|
||||||
|
// A "real" NATS server would take some time to process the request,
|
||||||
|
// simulate this by sleeping a tiny bit.
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
s.ch <- msg
|
s.ch <- msg
|
||||||
s.cond.L.Lock()
|
s.cond.L.Lock()
|
||||||
}
|
}
|
||||||
|
|
4
room.go
4
room.go
|
@ -590,13 +590,13 @@ func (r *Room) getParticipantsUpdateMessage(users []map[string]interface{}) *Ser
|
||||||
return message
|
return message
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Room) NotifySessionResumed(client *Client) {
|
func (r *Room) NotifySessionResumed(session *ClientSession) {
|
||||||
message := r.getParticipantsUpdateMessage(r.users)
|
message := r.getParticipantsUpdateMessage(r.users)
|
||||||
if len(message.Event.Update.Users) == 0 {
|
if len(message.Event.Update.Users) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
client.SendMessage(message)
|
session.SendMessage(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Room) NotifySessionChanged(session Session) {
|
func (r *Room) NotifySessionChanged(session Session) {
|
||||||
|
|
|
@ -135,16 +135,16 @@ func (s *VirtualSession) Close() {
|
||||||
s.CloseWithFeedback(nil, nil)
|
s.CloseWithFeedback(nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VirtualSession) CloseWithFeedback(client *Client, message *ClientMessage) {
|
func (s *VirtualSession) CloseWithFeedback(session *ClientSession, message *ClientMessage) {
|
||||||
room := s.GetRoom()
|
room := s.GetRoom()
|
||||||
s.session.RemoveVirtualSession(s)
|
s.session.RemoveVirtualSession(s)
|
||||||
removed := s.session.hub.removeSession(s)
|
removed := s.session.hub.removeSession(s)
|
||||||
if removed && room != nil {
|
if removed && room != nil {
|
||||||
go s.notifyBackendRemoved(room, client, message)
|
go s.notifyBackendRemoved(room, session, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, message *ClientMessage) {
|
func (s *VirtualSession) notifyBackendRemoved(room *Room, session *ClientSession, message *ClientMessage) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), s.hub.backendTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), s.hub.backendTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -160,9 +160,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag
|
||||||
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil {
|
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil {
|
||||||
virtualSessionId := GetVirtualSessionId(s.session, s.PublicId())
|
virtualSessionId := GetVirtualSessionId(s.session, s.PublicId())
|
||||||
log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, s.BackendUrl(), err)
|
log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, s.BackendUrl(), err)
|
||||||
if client != nil && message != nil {
|
if session != nil && message != nil {
|
||||||
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
|
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
|
||||||
client.SendMessage(reply)
|
session.SendMessage(reply)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -170,9 +170,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag
|
||||||
if response.Type == "error" {
|
if response.Type == "error" {
|
||||||
virtualSessionId := GetVirtualSessionId(s.session, s.PublicId())
|
virtualSessionId := GetVirtualSessionId(s.session, s.PublicId())
|
||||||
log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, s.BackendUrl(), response.Error)
|
log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, s.BackendUrl(), response.Error)
|
||||||
if client != nil && message != nil {
|
if session != nil && message != nil {
|
||||||
reply := message.NewErrorServerMessage(NewError("remove_failed", response.Error.Error()))
|
reply := message.NewErrorServerMessage(NewError("remove_failed", response.Error.Error()))
|
||||||
client.SendMessage(reply)
|
session.SendMessage(reply)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -182,9 +182,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag
|
||||||
err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response)
|
err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Could not remove virtual session %s from backend %s: %s", s.PublicId(), s.BackendUrl(), err)
|
log.Printf("Could not remove virtual session %s from backend %s: %s", s.PublicId(), s.BackendUrl(), err)
|
||||||
if client != nil && message != nil {
|
if session != nil && message != nil {
|
||||||
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
|
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
|
||||||
client.SendMessage(reply)
|
session.SendMessage(reply)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue