diff --git a/clientsession.go b/clientsession.go index 6451aff..f98f160 100644 --- a/clientsession.go +++ b/clientsession.go @@ -559,6 +559,14 @@ func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool { return true } +func (s *ClientSession) SendError(e *Error) bool { + message := &ServerMessage{ + Type: "error", + Error: e, + } + return s.SendMessage(message) +} + func (s *ClientSession) SendMessage(message *ServerMessage) bool { s.mu.Lock() defer s.mu.Unlock() @@ -822,7 +830,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { if len(s.pendingClientMessages) == 0 { s.mu.Unlock() if room := s.GetRoom(); room != nil { - room.NotifySessionResumed(client) + room.NotifySessionResumed(s) } return } @@ -841,7 +849,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { if !hasPendingParticipantsUpdate { // Only need to send initial participants list update if none was part of the pending messages. if room := s.GetRoom(); room != nil { - room.NotifySessionResumed(client) + room.NotifySessionResumed(s) } } } diff --git a/hub.go b/hub.go index 7a52774..78ed3f1 100644 --- a/hub.go +++ b/hub.go @@ -768,20 +768,22 @@ func (h *Hub) processMessage(client *Client, data []byte) { if err := message.UnmarshalJSON(data); err != nil { if session := client.GetSession(); session != nil { log.Printf("Error decoding message from client %s: %v", session.PublicId(), err) + session.SendError(InvalidFormat) } else { log.Printf("Error decoding message from %s: %v", client.RemoteAddr(), err) + client.SendError(InvalidFormat) } - client.SendError(InvalidFormat) return } if err := message.CheckValid(); err != nil { if session := client.GetSession(); session != nil { log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err) + session.SendMessage(message.NewErrorServerMessage(InvalidFormat)) } else { log.Printf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err) + client.SendMessage(message.NewErrorServerMessage(InvalidFormat)) } - client.SendMessage(message.NewErrorServerMessage(InvalidFormat)) return } @@ -1201,7 +1203,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { case "requestoffer": // Process asynchronously to avoid blocking regular // message processing for this client. - go h.processMcuMessage(client, client, session, message, msg, &data) + go h.processMcuMessage(session, session, message, msg, &data) return case "offer": fallthrough @@ -1210,7 +1212,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { case "endOfCandidates": fallthrough case "candidate": - h.processMcuMessage(client, client, session, message, msg, &data) + h.processMcuMessage(session, session, message, msg, &data) return } } @@ -1311,7 +1313,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { if clientData != nil && clientData.Type == "sendoffer" { if !isAllowedToSend(session, clientData) { log.Printf("Session %s is not allowed to send offer for %s, ignoring", session.PublicId(), clientData.RoomType) - sendNotAllowed(client, message) + sendNotAllowed(session, message) return } @@ -1320,7 +1322,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { // It may take some time for the publisher (which is the current // client) to start his stream, so we must not block the active // goroutine. - go h.processMcuMessage(client, recipient, recipientSession, message, msg, clientData) + go h.processMcuMessage(session, recipientSession, message, msg, clientData) } else { // nolint // Client is not connected yet. } @@ -1573,7 +1575,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { log.Printf("Session %s removed virtual session %s", session.PublicId(), sess.PublicId()) if vsess, ok := sess.(*VirtualSession); ok { // We should always have a VirtualSession here. - vsess.CloseWithFeedback(client, message) + vsess.CloseWithFeedback(session, message) } else { sess.Close() } @@ -1594,22 +1596,22 @@ func isAllowedToSend(session *ClientSession, data *MessageClientMessageData) boo return session.HasPermission(permission) } -func sendNotAllowed(client *Client, message *ClientMessage) { +func sendNotAllowed(session *ClientSession, message *ClientMessage) { response := message.NewErrorServerMessage(NewError("not_allowed", "Not allowed to publish.")) - client.SendMessage(response) + session.SendMessage(response) } -func sendMcuClientNotFound(client *Client, message *ClientMessage) { +func sendMcuClientNotFound(session *ClientSession, message *ClientMessage) { response := message.NewErrorServerMessage(NewError("client_not_found", "No MCU client found to send message to.")) - client.SendMessage(response) + session.SendMessage(response) } -func sendMcuProcessingFailed(client *Client, message *ClientMessage) { +func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) { response := message.NewErrorServerMessage(NewError("processing_failed", "Processing of the message failed, please check server logs.")) - client.SendMessage(response) + session.SendMessage(response) } -func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { +func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout) defer cancel() @@ -1632,7 +1634,7 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C case "offer": if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to offer %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderClient, client_message) + sendNotAllowed(senderSession, client_message) return } @@ -1642,7 +1644,7 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C if session.PublicId() == message.Recipient.SessionId { if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to send candidate for %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderClient, client_message) + sendNotAllowed(senderSession, client_message) return } @@ -1655,29 +1657,29 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C } if err != nil { log.Printf("Could not create MCU %s for session %s to send %+v to %s: %s", clientType, session.PublicId(), data, message.Recipient.SessionId, err) - sendMcuClientNotFound(senderClient, client_message) + sendMcuClientNotFound(senderSession, client_message) return } else if mc == nil { log.Printf("No MCU %s found for session %s to send %+v to %s", clientType, session.PublicId(), data, message.Recipient.SessionId) - sendMcuClientNotFound(senderClient, client_message) + sendMcuClientNotFound(senderSession, client_message) return } mc.SendMessage(context.TODO(), message, data, func(err error, response map[string]interface{}) { if err != nil { log.Printf("Could not send MCU message %+v for session %s to %s: %s", data, session.PublicId(), message.Recipient.SessionId, err) - sendMcuProcessingFailed(senderClient, client_message) + sendMcuProcessingFailed(senderSession, client_message) return } else if response == nil { // No response received return } - h.sendMcuMessageResponse(client, session, message, data, response) + h.sendMcuMessageResponse(session, message, data, response) }) } -func (h *Hub) sendMcuMessageResponse(client *Client, session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { +func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { var response_message *ServerMessage switch response["type"] { case "answer": diff --git a/room.go b/room.go index e750e68..941f799 100644 --- a/room.go +++ b/room.go @@ -590,13 +590,13 @@ func (r *Room) getParticipantsUpdateMessage(users []map[string]interface{}) *Ser return message } -func (r *Room) NotifySessionResumed(client *Client) { +func (r *Room) NotifySessionResumed(session *ClientSession) { message := r.getParticipantsUpdateMessage(r.users) if len(message.Event.Update.Users) == 0 { return } - client.SendMessage(message) + session.SendMessage(message) } func (r *Room) NotifySessionChanged(session Session) { diff --git a/virtualsession.go b/virtualsession.go index 10d506a..87943da 100644 --- a/virtualsession.go +++ b/virtualsession.go @@ -135,16 +135,16 @@ func (s *VirtualSession) Close() { s.CloseWithFeedback(nil, nil) } -func (s *VirtualSession) CloseWithFeedback(client *Client, message *ClientMessage) { +func (s *VirtualSession) CloseWithFeedback(session *ClientSession, message *ClientMessage) { room := s.GetRoom() s.session.RemoveVirtualSession(s) removed := s.session.hub.removeSession(s) if removed && room != nil { - go s.notifyBackendRemoved(room, client, message) + go s.notifyBackendRemoved(room, session, message) } } -func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, message *ClientMessage) { +func (s *VirtualSession) notifyBackendRemoved(room *Room, session *ClientSession, message *ClientMessage) { ctx, cancel := context.WithTimeout(context.Background(), s.hub.backendTimeout) defer cancel() @@ -160,9 +160,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil { virtualSessionId := GetVirtualSessionId(s.session, s.PublicId()) log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, s.BackendUrl(), err) - if client != nil && message != nil { + if session != nil && message != nil { reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend.")) - client.SendMessage(reply) + session.SendMessage(reply) } return } @@ -170,9 +170,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag if response.Type == "error" { virtualSessionId := GetVirtualSessionId(s.session, s.PublicId()) log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, s.BackendUrl(), response.Error) - if client != nil && message != nil { + if session != nil && message != nil { reply := message.NewErrorServerMessage(NewError("remove_failed", response.Error.Error())) - client.SendMessage(reply) + session.SendMessage(reply) } return } @@ -182,9 +182,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response) if err != nil { log.Printf("Could not remove virtual session %s from backend %s: %s", s.PublicId(), s.BackendUrl(), err) - if client != nil && message != nil { + if session != nil && message != nil { reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend.")) - client.SendMessage(reply) + session.SendMessage(reply) } } }