From dcb5be956cb228743faf93213d14823957e8beeb Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 28 Jun 2022 15:59:58 +0200 Subject: [PATCH] Implement "sendoffer" for remote sessions. --- api_async.go | 8 +++ clientsession.go | 62 +++++++++++++++++ hub.go | 172 +++++++++++++++++++++++++++++------------------ 3 files changed, 178 insertions(+), 64 deletions(-) diff --git a/api_async.go b/api_async.go index f831341..bc49dc7 100644 --- a/api_async.go +++ b/api_async.go @@ -36,6 +36,8 @@ type AsyncMessage struct { AsyncRoom *AsyncRoomMessage `json:"asyncroom,omitempty"` + SendOffer *SendOfferMessage `json:"sendoffer,omitempty"` + Id string `json:"id"` } @@ -44,3 +46,9 @@ type AsyncRoomMessage struct { SessionId string `json:"sessionid,omitempty"` } + +type SendOfferMessage struct { + MessageId string `json:"messageid,omitempty"` + SessionId string `json:"sessionid"` + Data *MessageClientMessageData `json:"data"` +} diff --git a/clientsession.go b/clientsession.go index c958937..68d6186 100644 --- a/clientsession.go +++ b/clientsession.go @@ -1017,6 +1017,68 @@ func (s *ClientSession) processAsyncMessage(message *AsyncMessage) { s.LeaveRoom(false) defer s.closeAndWait(false) } + case "sendoffer": + // Process asynchronously to not block other messages received. + go func() { + ctx, cancel := context.WithTimeout(context.Background(), s.hub.mcuTimeout) + defer cancel() + + mc, err := s.GetOrCreateSubscriber(ctx, s.hub.mcu, message.SendOffer.SessionId, message.SendOffer.Data.RoomType) + if err != nil { + log.Printf("Could not create MCU subscriber for session %s to process sendoffer in %s: %s", message.SendOffer.SessionId, s.PublicId(), err) + if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Id: message.SendOffer.MessageId, + Type: "error", + Error: NewError("client_not_found", "No MCU client found to send message to."), + }, + }); err != nil { + log.Printf("Error sending sendoffer error response to %s: %s", message.SendOffer.SessionId, err) + } + return + } else if mc == nil { + log.Printf("No MCU subscriber found for session %s to process sendoffer in %s", message.SendOffer.SessionId, s.PublicId()) + if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Id: message.SendOffer.MessageId, + Type: "error", + Error: NewError("client_not_found", "No MCU client found to send message to."), + }, + }); err != nil { + log.Printf("Error sending sendoffer error response to %s: %s", message.SendOffer.SessionId, err) + } + return + } + + mc.SendMessage(context.TODO(), nil, message.SendOffer.Data, func(err error, response map[string]interface{}) { + if err != nil { + log.Printf("Could not send MCU message %+v for session %s to %s: %s", message.SendOffer.Data, message.SendOffer.SessionId, s.PublicId(), err) + if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{ + Type: "message", + Message: &ServerMessage{ + Id: message.SendOffer.MessageId, + Type: "error", + Error: NewError("processing_failed", "Processing of the message failed, please check server logs."), + }, + }); err != nil { + log.Printf("Error sending sendoffer error response to %s: %s", message.SendOffer.SessionId, err) + } + return + } else if response == nil { + // No response received + return + } + + s.hub.sendMcuMessageResponse(s, mc, &MessageClientMessage{ + Recipient: MessageClientMessageRecipient{ + SessionId: message.SendOffer.SessionId, + }, + }, message.SendOffer.Data, response) + }) + }() + return } serverMessage := s.filterAsyncMessage(message) diff --git a/hub.go b/hub.go index dca5571..4dd62ee 100644 --- a/hub.go +++ b/hub.go @@ -1242,39 +1242,41 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { var room *Room switch msg.Recipient.Type { case RecipientTypeSession: - data := h.decodeSessionId(msg.Recipient.SessionId, publicSessionName) - if data != nil { - if data.BackendId != session.Backend().Id() { + if h.mcu != nil { + // Maybe this is a message to be processed by the MCU. + var data MessageClientMessageData + if err := json.Unmarshal(*msg.Data, &data); err == nil { + clientData = &data + + switch clientData.Type { + case "requestoffer": + // Process asynchronously to avoid blocking regular + // message processing for this client. + go h.processMcuMessage(session, message, msg, clientData) + return + case "offer": + fallthrough + case "answer": + fallthrough + case "endOfCandidates": + fallthrough + case "selectStream": + fallthrough + case "candidate": + h.processMcuMessage(session, message, msg, clientData) + return + } + } + } + + sess := h.GetSessionByPublicId(msg.Recipient.SessionId) + if sess != nil { + // Recipient is also connected to this instance. + if sess.Backend().Id() != session.Backend().Id() { // Clients are only allowed to send to sessions from the same backend. return } - if h.mcu != nil { - // Maybe this is a message to be processed by the MCU. - var data MessageClientMessageData - if err := json.Unmarshal(*msg.Data, &data); err == nil { - clientData = &data - switch data.Type { - case "requestoffer": - // Process asynchronously to avoid blocking regular - // message processing for this client. - go h.processMcuMessage(session, session, message, msg, &data) - return - case "offer": - fallthrough - case "answer": - fallthrough - case "endOfCandidates": - fallthrough - case "selectStream": - fallthrough - case "candidate": - h.processMcuMessage(session, session, message, msg, &data) - return - } - } - } - if msg.Recipient.SessionId == session.PublicId() { // Don't loop messages to the sender. return @@ -1282,28 +1284,26 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { subject = "session." + msg.Recipient.SessionId recipientSessionId = msg.Recipient.SessionId - h.mu.RLock() - sess, found := h.sessions[data.Sid] - if found && sess.PublicId() == msg.Recipient.SessionId { - if sess, ok := sess.(*ClientSession); ok { - recipient = sess - } + if sess, ok := sess.(*ClientSession); ok { + recipient = sess + } - // Send to client connection for virtual sessions. - if sess.ClientType() == HelloClientTypeVirtual { - virtualSession := sess.(*VirtualSession) - clientSession := virtualSession.Session() - subject = "session." + clientSession.PublicId() - recipientSessionId = clientSession.PublicId() - recipient = clientSession - // The client should see his session id as recipient. - serverRecipient = &MessageClientMessageRecipient{ - Type: "session", - SessionId: virtualSession.SessionId(), - } + // Send to client connection for virtual sessions. + if sess.ClientType() == HelloClientTypeVirtual { + virtualSession := sess.(*VirtualSession) + clientSession := virtualSession.Session() + subject = "session." + clientSession.PublicId() + recipientSessionId = clientSession.PublicId() + recipient = clientSession + // The client should see his session id as recipient. + serverRecipient = &MessageClientMessageRecipient{ + Type: "session", + SessionId: virtualSession.SessionId(), } } - h.mu.RUnlock() + } else { + subject = "session." + msg.Recipient.SessionId + recipientSessionId = msg.Recipient.SessionId } case RecipientTypeUser: if msg.Recipient.UserId != "" { @@ -1379,18 +1379,63 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { return } - msg.Recipient.SessionId = session.PublicId() // It may take some time for the publisher (which is the current // client) to start his stream, so we must not block the active // goroutine. - go h.processMcuMessage(session, recipient, message, msg, clientData) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout) + defer cancel() + + mc, err := recipient.GetOrCreateSubscriber(ctx, h.mcu, session.PublicId(), clientData.RoomType) + if err != nil { + log.Printf("Could not create MCU subscriber for session %s to send %+v to %s: %s", session.PublicId(), clientData, recipient.PublicId(), err) + sendMcuClientNotFound(session, message) + return + } else if mc == nil { + log.Printf("No MCU subscriber found for session %s to send %+v to %s", session.PublicId(), clientData, recipient.PublicId()) + sendMcuClientNotFound(session, message) + return + } + + mc.SendMessage(context.TODO(), msg, clientData, func(err error, response map[string]interface{}) { + if err != nil { + log.Printf("Could not send MCU message %+v for session %s to %s: %s", clientData, session.PublicId(), recipient.PublicId(), err) + sendMcuProcessingFailed(session, message) + return + } else if response == nil { + // No response received + return + } + + // The response (i.e. the "offer") must be sent to the recipient but + // should be coming from the sender. + msg.Recipient.SessionId = session.PublicId() + h.sendMcuMessageResponse(recipient, mc, msg, clientData, response) + }) + }() return } + recipient.SendMessage(response) } else { if clientData != nil && clientData.Type == "sendoffer" { - // TODO(jojo): Implement this. - log.Printf("Sending offers to remote clients is not supported yet (client %s)", session.PublicId()) + if err := session.IsAllowedToSend(clientData); err != nil { + log.Printf("Session %s is not allowed to send offer for %s, ignoring (%s)", session.PublicId(), clientData.RoomType, err) + sendNotAllowed(session, message, "Not allowed to send offer") + return + } + + async := &AsyncMessage{ + Type: "sendoffer", + SendOffer: &SendOfferMessage{ + MessageId: message.Id, + SessionId: session.PublicId(), + Data: clientData, + }, + } + if err := h.events.PublishSessionMessage(recipientSessionId, session.Backend(), async); err != nil { + log.Printf("Error publishing message to remote session: %s", err) + } return } @@ -1815,7 +1860,7 @@ func (h *Hub) isInSameCall(ctx context.Context, senderSession *ClientSession, re return true } -func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { +func (h *Hub) processMcuMessage(session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout) defer cancel() @@ -1831,29 +1876,28 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes // A user is only allowed to subscribe a stream if she is in the same room // as the other user and both have their "inCall" flag set. - if !h.allowSubscribeAnyStream && !h.isInSameCall(ctx, senderSession, message.Recipient.SessionId) { + if !h.allowSubscribeAnyStream && !h.isInSameCall(ctx, session, message.Recipient.SessionId) { log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId) - sendNotAllowed(senderSession, client_message, "Not allowed to request offer.") + sendNotAllowed(session, client_message, "Not allowed to request offer.") return } clientType = "subscriber" mc, err = session.GetOrCreateSubscriber(ctx, h.mcu, message.Recipient.SessionId, data.RoomType) case "sendoffer": - // Permissions have already been checked in "processMessageMsg". - clientType = "subscriber" - mc, err = session.GetOrCreateSubscriber(ctx, h.mcu, message.Recipient.SessionId, data.RoomType) + // Will be sent directly. + return case "offer": clientType = "publisher" mc, err = session.GetOrCreatePublisher(ctx, h.mcu, data.RoomType, data) if err, ok := err.(*PermissionError); ok { log.Printf("Session %s is not allowed to offer %s, ignoring (%s)", session.PublicId(), data.RoomType, err) - sendNotAllowed(senderSession, client_message, "Not allowed to publish.") + sendNotAllowed(session, client_message, "Not allowed to publish.") return } if err, ok := err.(*SdpError); ok { log.Printf("Session %s sent unsupported offer %s, ignoring (%s)", session.PublicId(), data.RoomType, err) - sendNotAllowed(senderSession, client_message, "Not allowed to publish.") + sendNotAllowed(session, client_message, "Not allowed to publish.") return } case "selectStream": @@ -1868,7 +1912,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes if session.PublicId() == message.Recipient.SessionId { if err := session.IsAllowedToSend(data); err != nil { log.Printf("Session %s is not allowed to send candidate for %s, ignoring (%s)", session.PublicId(), data.RoomType, err) - sendNotAllowed(senderSession, client_message, "Not allowed to send candidate.") + sendNotAllowed(session, client_message, "Not allowed to send candidate.") return } @@ -1881,18 +1925,18 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes } if err != nil { log.Printf("Could not create MCU %s for session %s to send %+v to %s: %s", clientType, session.PublicId(), data, message.Recipient.SessionId, err) - sendMcuClientNotFound(senderSession, client_message) + sendMcuClientNotFound(session, client_message) return } else if mc == nil { log.Printf("No MCU %s found for session %s to send %+v to %s", clientType, session.PublicId(), data, message.Recipient.SessionId) - sendMcuClientNotFound(senderSession, client_message) + sendMcuClientNotFound(session, client_message) return } mc.SendMessage(context.TODO(), message, data, func(err error, response map[string]interface{}) { if err != nil { log.Printf("Could not send MCU message %+v for session %s to %s: %s", data, session.PublicId(), message.Recipient.SessionId, err) - sendMcuProcessingFailed(senderSession, client_message) + sendMcuProcessingFailed(session, client_message) return } else if response == nil { // No response received