From 020cbaf49d58dc3267d7352f267d6b56f9d4e5e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 10 Feb 2022 20:19:25 +0100 Subject: [PATCH] Send updated offers to subscribers after publisher renegotiations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a publisher has a connection the publisher can update the connection (for example, to add a video track to an audio only connection) by sending an updated offer to Janus. Janus detects that, adjusts the connection and then sends back an answer. Once the publisher connection is updated Janus starts a renegotiation for the subscribers and generates the offers to be sent to them. The signaling server did not handle the event, so the offers were not sent and the subscriber connections were not updated. Now the offers are sent as needed, which makes possible for the renegotiation to be completed by the clients. In this case the "offer" message will also include an "update" parameter so clients can differentiate between offers to create new connections or update the existing one. Signed-off-by: Daniel Calviño Sánchez --- api_signaling.go | 1 + clientsession.go | 40 ++++++++++++++++++++++++++++++++++++++++ mcu_common.go | 2 ++ mcu_janus.go | 7 ++++++- mcu_proxy.go | 2 ++ proxy/proxy_session.go | 20 ++++++++++++++++++++ 6 files changed, 71 insertions(+), 1 deletion(-) diff --git a/api_signaling.go b/api_signaling.go index a20eed3..f528d86 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -647,4 +647,5 @@ type AnswerOfferMessage struct { Type string `json:"type"` RoomType string `json:"roomType"` Payload map[string]interface{} `json:"payload"` + Update bool `json:"update,omitempty"` } diff --git a/clientsession.go b/clientsession.go index 0e24176..7907ef9 100644 --- a/clientsession.go +++ b/clientsession.go @@ -563,6 +563,34 @@ func (s *ClientSession) SetClient(client *Client) *Client { return prev } +func (s *ClientSession) sendOffer(client McuClient, sender string, streamType string, offer map[string]interface{}) { + offer_message := &AnswerOfferMessage{ + To: s.PublicId(), + From: sender, + Type: "offer", + RoomType: streamType, + Payload: offer, + Update: true, + } + offer_data, err := json.Marshal(offer_message) + if err != nil { + log.Println("Could not serialize offer", offer_message, err) + return + } + response_message := &ServerMessage{ + Type: "message", + Message: &MessageServerMessage{ + Sender: &MessageServerMessageSender{ + Type: "session", + SessionId: sender, + }, + Data: (*json.RawMessage)(&offer_data), + }, + } + + s.sendMessageUnlocked(response_message) +} + func (s *ClientSession) sendCandidate(client McuClient, sender string, streamType string, candidate interface{}) { candidate_message := &AnswerOfferMessage{ To: s.PublicId(), @@ -628,6 +656,18 @@ func (s *ClientSession) SendMessages(messages []*ServerMessage) bool { return true } +func (s *ClientSession) OnUpdateOffer(client McuClient, offer map[string]interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, sub := range s.subscribers { + if sub.Id() == client.Id() { + s.sendOffer(client, sub.Publisher(), client.StreamType(), offer) + return + } + } +} + func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { s.mu.Lock() defer s.mu.Unlock() diff --git a/mcu_common.go b/mcu_common.go index a5bf7a9..b1d99bc 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -50,6 +50,8 @@ const ( type McuListener interface { PublicId() string + OnUpdateOffer(client McuClient, offer map[string]interface{}) + OnIceCandidate(client McuClient, candidate interface{}) OnIceCompleted(client McuClient) diff --git a/mcu_janus.go b/mcu_janus.go index 3e147ec..5b70156 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -1063,7 +1063,12 @@ func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) { log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId) go p.Close(ctx) case "event": - // Ignore events like selected substream / temporal layer. + // Handle renegotiations, but ignore other events like selected + // substream / temporal layer. + if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" && + event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil { + p.listener.OnUpdateOffer(p, event.Jsep) + } case "slow_link": // Ignore, processed through "handleSlowLink" in the general events. default: diff --git a/mcu_proxy.go b/mcu_proxy.go index 63edca6..016c5e0 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -107,6 +107,8 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) { switch msg.Type { + case "offer": + c.listener.OnUpdateOffer(client, msg.Payload["offer"].(map[string]interface{})) case "candidate": c.listener.OnIceCandidate(client, msg.Payload["candidate"]) default: diff --git a/proxy/proxy_session.go b/proxy/proxy_session.go index 7f3f84c..560703f 100644 --- a/proxy/proxy_session.go +++ b/proxy/proxy_session.go @@ -122,6 +122,26 @@ func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient { return prev } +func (s *ProxySession) OnUpdateOffer(client signaling.McuClient, offer map[string]interface{}) { + id := s.proxy.GetClientId(client) + if id == "" { + log.Printf("Received offer %+v from unknown %s client %s (%+v)", offer, client.StreamType(), client.Id(), client) + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "payload", + Payload: &signaling.PayloadProxyServerMessage{ + Type: "offer", + ClientId: id, + Payload: map[string]interface{}{ + "offer": offer, + }, + }, + } + s.sendMessage(msg) +} + func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) { id := s.proxy.GetClientId(client) if id == "" {