Implement "sendoffer" for remote sessions.

This commit is contained in:
Joachim Bauch 2022-06-28 15:59:58 +02:00
parent 36710c8aa9
commit dcb5be956c
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
3 changed files with 178 additions and 64 deletions

View file

@ -36,6 +36,8 @@ type AsyncMessage struct {
AsyncRoom *AsyncRoomMessage `json:"asyncroom,omitempty"`
SendOffer *SendOfferMessage `json:"sendoffer,omitempty"`
Id string `json:"id"`
}
@ -44,3 +46,9 @@ type AsyncRoomMessage struct {
SessionId string `json:"sessionid,omitempty"`
}
type SendOfferMessage struct {
MessageId string `json:"messageid,omitempty"`
SessionId string `json:"sessionid"`
Data *MessageClientMessageData `json:"data"`
}

View file

@ -1017,6 +1017,68 @@ func (s *ClientSession) processAsyncMessage(message *AsyncMessage) {
s.LeaveRoom(false)
defer s.closeAndWait(false)
}
case "sendoffer":
// Process asynchronously to not block other messages received.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), s.hub.mcuTimeout)
defer cancel()
mc, err := s.GetOrCreateSubscriber(ctx, s.hub.mcu, message.SendOffer.SessionId, message.SendOffer.Data.RoomType)
if err != nil {
log.Printf("Could not create MCU subscriber for session %s to process sendoffer in %s: %s", message.SendOffer.SessionId, s.PublicId(), err)
if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Id: message.SendOffer.MessageId,
Type: "error",
Error: NewError("client_not_found", "No MCU client found to send message to."),
},
}); err != nil {
log.Printf("Error sending sendoffer error response to %s: %s", message.SendOffer.SessionId, err)
}
return
} else if mc == nil {
log.Printf("No MCU subscriber found for session %s to process sendoffer in %s", message.SendOffer.SessionId, s.PublicId())
if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Id: message.SendOffer.MessageId,
Type: "error",
Error: NewError("client_not_found", "No MCU client found to send message to."),
},
}); err != nil {
log.Printf("Error sending sendoffer error response to %s: %s", message.SendOffer.SessionId, err)
}
return
}
mc.SendMessage(context.TODO(), nil, message.SendOffer.Data, func(err error, response map[string]interface{}) {
if err != nil {
log.Printf("Could not send MCU message %+v for session %s to %s: %s", message.SendOffer.Data, message.SendOffer.SessionId, s.PublicId(), err)
if err := s.events.PublishSessionMessage(message.SendOffer.SessionId, s.backend, &AsyncMessage{
Type: "message",
Message: &ServerMessage{
Id: message.SendOffer.MessageId,
Type: "error",
Error: NewError("processing_failed", "Processing of the message failed, please check server logs."),
},
}); err != nil {
log.Printf("Error sending sendoffer error response to %s: %s", message.SendOffer.SessionId, err)
}
return
} else if response == nil {
// No response received
return
}
s.hub.sendMcuMessageResponse(s, mc, &MessageClientMessage{
Recipient: MessageClientMessageRecipient{
SessionId: message.SendOffer.SessionId,
},
}, message.SendOffer.Data, response)
})
}()
return
}
serverMessage := s.filterAsyncMessage(message)

172
hub.go
View file

@ -1242,39 +1242,41 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
var room *Room
switch msg.Recipient.Type {
case RecipientTypeSession:
data := h.decodeSessionId(msg.Recipient.SessionId, publicSessionName)
if data != nil {
if data.BackendId != session.Backend().Id() {
if h.mcu != nil {
// Maybe this is a message to be processed by the MCU.
var data MessageClientMessageData
if err := json.Unmarshal(*msg.Data, &data); err == nil {
clientData = &data
switch clientData.Type {
case "requestoffer":
// Process asynchronously to avoid blocking regular
// message processing for this client.
go h.processMcuMessage(session, message, msg, clientData)
return
case "offer":
fallthrough
case "answer":
fallthrough
case "endOfCandidates":
fallthrough
case "selectStream":
fallthrough
case "candidate":
h.processMcuMessage(session, message, msg, clientData)
return
}
}
}
sess := h.GetSessionByPublicId(msg.Recipient.SessionId)
if sess != nil {
// Recipient is also connected to this instance.
if sess.Backend().Id() != session.Backend().Id() {
// Clients are only allowed to send to sessions from the same backend.
return
}
if h.mcu != nil {
// Maybe this is a message to be processed by the MCU.
var data MessageClientMessageData
if err := json.Unmarshal(*msg.Data, &data); err == nil {
clientData = &data
switch data.Type {
case "requestoffer":
// Process asynchronously to avoid blocking regular
// message processing for this client.
go h.processMcuMessage(session, session, message, msg, &data)
return
case "offer":
fallthrough
case "answer":
fallthrough
case "endOfCandidates":
fallthrough
case "selectStream":
fallthrough
case "candidate":
h.processMcuMessage(session, session, message, msg, &data)
return
}
}
}
if msg.Recipient.SessionId == session.PublicId() {
// Don't loop messages to the sender.
return
@ -1282,28 +1284,26 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
subject = "session." + msg.Recipient.SessionId
recipientSessionId = msg.Recipient.SessionId
h.mu.RLock()
sess, found := h.sessions[data.Sid]
if found && sess.PublicId() == msg.Recipient.SessionId {
if sess, ok := sess.(*ClientSession); ok {
recipient = sess
}
if sess, ok := sess.(*ClientSession); ok {
recipient = sess
}
// Send to client connection for virtual sessions.
if sess.ClientType() == HelloClientTypeVirtual {
virtualSession := sess.(*VirtualSession)
clientSession := virtualSession.Session()
subject = "session." + clientSession.PublicId()
recipientSessionId = clientSession.PublicId()
recipient = clientSession
// The client should see his session id as recipient.
serverRecipient = &MessageClientMessageRecipient{
Type: "session",
SessionId: virtualSession.SessionId(),
}
// Send to client connection for virtual sessions.
if sess.ClientType() == HelloClientTypeVirtual {
virtualSession := sess.(*VirtualSession)
clientSession := virtualSession.Session()
subject = "session." + clientSession.PublicId()
recipientSessionId = clientSession.PublicId()
recipient = clientSession
// The client should see his session id as recipient.
serverRecipient = &MessageClientMessageRecipient{
Type: "session",
SessionId: virtualSession.SessionId(),
}
}
h.mu.RUnlock()
} else {
subject = "session." + msg.Recipient.SessionId
recipientSessionId = msg.Recipient.SessionId
}
case RecipientTypeUser:
if msg.Recipient.UserId != "" {
@ -1379,18 +1379,63 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
return
}
msg.Recipient.SessionId = session.PublicId()
// It may take some time for the publisher (which is the current
// client) to start his stream, so we must not block the active
// goroutine.
go h.processMcuMessage(session, recipient, message, msg, clientData)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout)
defer cancel()
mc, err := recipient.GetOrCreateSubscriber(ctx, h.mcu, session.PublicId(), clientData.RoomType)
if err != nil {
log.Printf("Could not create MCU subscriber for session %s to send %+v to %s: %s", session.PublicId(), clientData, recipient.PublicId(), err)
sendMcuClientNotFound(session, message)
return
} else if mc == nil {
log.Printf("No MCU subscriber found for session %s to send %+v to %s", session.PublicId(), clientData, recipient.PublicId())
sendMcuClientNotFound(session, message)
return
}
mc.SendMessage(context.TODO(), msg, clientData, func(err error, response map[string]interface{}) {
if err != nil {
log.Printf("Could not send MCU message %+v for session %s to %s: %s", clientData, session.PublicId(), recipient.PublicId(), err)
sendMcuProcessingFailed(session, message)
return
} else if response == nil {
// No response received
return
}
// The response (i.e. the "offer") must be sent to the recipient but
// should be coming from the sender.
msg.Recipient.SessionId = session.PublicId()
h.sendMcuMessageResponse(recipient, mc, msg, clientData, response)
})
}()
return
}
recipient.SendMessage(response)
} else {
if clientData != nil && clientData.Type == "sendoffer" {
// TODO(jojo): Implement this.
log.Printf("Sending offers to remote clients is not supported yet (client %s)", session.PublicId())
if err := session.IsAllowedToSend(clientData); err != nil {
log.Printf("Session %s is not allowed to send offer for %s, ignoring (%s)", session.PublicId(), clientData.RoomType, err)
sendNotAllowed(session, message, "Not allowed to send offer")
return
}
async := &AsyncMessage{
Type: "sendoffer",
SendOffer: &SendOfferMessage{
MessageId: message.Id,
SessionId: session.PublicId(),
Data: clientData,
},
}
if err := h.events.PublishSessionMessage(recipientSessionId, session.Backend(), async); err != nil {
log.Printf("Error publishing message to remote session: %s", err)
}
return
}
@ -1815,7 +1860,7 @@ func (h *Hub) isInSameCall(ctx context.Context, senderSession *ClientSession, re
return true
}
func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) {
func (h *Hub) processMcuMessage(session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) {
ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout)
defer cancel()
@ -1831,29 +1876,28 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
// A user is only allowed to subscribe a stream if she is in the same room
// as the other user and both have their "inCall" flag set.
if !h.allowSubscribeAnyStream && !h.isInSameCall(ctx, senderSession, message.Recipient.SessionId) {
if !h.allowSubscribeAnyStream && !h.isInSameCall(ctx, session, message.Recipient.SessionId) {
log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId)
sendNotAllowed(senderSession, client_message, "Not allowed to request offer.")
sendNotAllowed(session, client_message, "Not allowed to request offer.")
return
}
clientType = "subscriber"
mc, err = session.GetOrCreateSubscriber(ctx, h.mcu, message.Recipient.SessionId, data.RoomType)
case "sendoffer":
// Permissions have already been checked in "processMessageMsg".
clientType = "subscriber"
mc, err = session.GetOrCreateSubscriber(ctx, h.mcu, message.Recipient.SessionId, data.RoomType)
// Will be sent directly.
return
case "offer":
clientType = "publisher"
mc, err = session.GetOrCreatePublisher(ctx, h.mcu, data.RoomType, data)
if err, ok := err.(*PermissionError); ok {
log.Printf("Session %s is not allowed to offer %s, ignoring (%s)", session.PublicId(), data.RoomType, err)
sendNotAllowed(senderSession, client_message, "Not allowed to publish.")
sendNotAllowed(session, client_message, "Not allowed to publish.")
return
}
if err, ok := err.(*SdpError); ok {
log.Printf("Session %s sent unsupported offer %s, ignoring (%s)", session.PublicId(), data.RoomType, err)
sendNotAllowed(senderSession, client_message, "Not allowed to publish.")
sendNotAllowed(session, client_message, "Not allowed to publish.")
return
}
case "selectStream":
@ -1868,7 +1912,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
if session.PublicId() == message.Recipient.SessionId {
if err := session.IsAllowedToSend(data); err != nil {
log.Printf("Session %s is not allowed to send candidate for %s, ignoring (%s)", session.PublicId(), data.RoomType, err)
sendNotAllowed(senderSession, client_message, "Not allowed to send candidate.")
sendNotAllowed(session, client_message, "Not allowed to send candidate.")
return
}
@ -1881,18 +1925,18 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
}
if err != nil {
log.Printf("Could not create MCU %s for session %s to send %+v to %s: %s", clientType, session.PublicId(), data, message.Recipient.SessionId, err)
sendMcuClientNotFound(senderSession, client_message)
sendMcuClientNotFound(session, client_message)
return
} else if mc == nil {
log.Printf("No MCU %s found for session %s to send %+v to %s", clientType, session.PublicId(), data, message.Recipient.SessionId)
sendMcuClientNotFound(senderSession, client_message)
sendMcuClientNotFound(session, client_message)
return
}
mc.SendMessage(context.TODO(), message, data, func(err error, response map[string]interface{}) {
if err != nil {
log.Printf("Could not send MCU message %+v for session %s to %s: %s", data, session.PublicId(), message.Recipient.SessionId, err)
sendMcuProcessingFailed(senderSession, client_message)
sendMcuProcessingFailed(session, client_message)
return
} else if response == nil {
// No response received