diff --git a/api_signaling.go b/api_signaling.go index caae520..5976c33 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -335,6 +335,7 @@ const ( // Features for all clients. ServerFeatureMcu = "mcu" ServerFeatureSimulcast = "simulcast" + ServerFeatureUpdateSdp = "update-sdp" ServerFeatureAudioVideoPermissions = "audio-video-permissions" ServerFeatureTransientData = "transient-data" @@ -648,6 +649,7 @@ type AnswerOfferMessage struct { Type string `json:"type"` RoomType string `json:"roomType"` Payload map[string]interface{} `json:"payload"` + Update bool `json:"update,omitempty"` } // Type "transient" diff --git a/clientsession.go b/clientsession.go index 410c0ac..2d1b476 100644 --- a/clientsession.go +++ b/clientsession.go @@ -564,6 +564,34 @@ func (s *ClientSession) SetClient(client *Client) *Client { return prev } +func (s *ClientSession) sendOffer(client McuClient, sender string, streamType string, offer map[string]interface{}) { + offer_message := &AnswerOfferMessage{ + To: s.PublicId(), + From: sender, + Type: "offer", + RoomType: streamType, + Payload: offer, + Update: true, + } + offer_data, err := json.Marshal(offer_message) + if err != nil { + log.Println("Could not serialize offer", offer_message, err) + return + } + response_message := &ServerMessage{ + Type: "message", + Message: &MessageServerMessage{ + Sender: &MessageServerMessageSender{ + Type: "session", + SessionId: sender, + }, + Data: (*json.RawMessage)(&offer_data), + }, + } + + s.sendMessageUnlocked(response_message) +} + func (s *ClientSession) sendCandidate(client McuClient, sender string, streamType string, candidate interface{}) { candidate_message := &AnswerOfferMessage{ To: s.PublicId(), @@ -629,6 +657,18 @@ func (s *ClientSession) SendMessages(messages []*ServerMessage) bool { return true } +func (s *ClientSession) OnUpdateOffer(client McuClient, offer map[string]interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, sub := range s.subscribers { + if sub.Id() == client.Id() { + s.sendOffer(client, sub.Publisher(), client.StreamType(), offer) + return + } + } +} + func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { s.mu.Lock() defer s.mu.Unlock() diff --git a/hub.go b/hub.go index 80e4b95..b1e4acc 100644 --- a/hub.go +++ b/hub.go @@ -371,14 +371,18 @@ func (h *Hub) SetMcu(mcu Mcu) { if mcu == nil { removeFeature(h.info, ServerFeatureMcu) removeFeature(h.info, ServerFeatureSimulcast) + removeFeature(h.info, ServerFeatureUpdateSdp) removeFeature(h.infoInternal, ServerFeatureMcu) removeFeature(h.infoInternal, ServerFeatureSimulcast) + removeFeature(h.infoInternal, ServerFeatureUpdateSdp) } else { log.Printf("Using a timeout of %s for MCU requests", h.mcuTimeout) addFeature(h.info, ServerFeatureMcu) addFeature(h.info, ServerFeatureSimulcast) + addFeature(h.info, ServerFeatureUpdateSdp) addFeature(h.infoInternal, ServerFeatureMcu) addFeature(h.infoInternal, ServerFeatureSimulcast) + addFeature(h.infoInternal, ServerFeatureUpdateSdp) } } diff --git a/mcu_common.go b/mcu_common.go index a5bf7a9..b1d99bc 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -50,6 +50,8 @@ const ( type McuListener interface { PublicId() string + OnUpdateOffer(client McuClient, offer map[string]interface{}) + OnIceCandidate(client McuClient, candidate interface{}) OnIceCompleted(client McuClient) diff --git a/mcu_janus.go b/mcu_janus.go index 0c43b49..f4b6f45 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -1062,7 +1062,12 @@ func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) { log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId) go p.Close(ctx) case "event": - // Ignore events like selected substream / temporal layer. + // Handle renegotiations, but ignore other events like selected + // substream / temporal layer. + if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" && + event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil { + p.listener.OnUpdateOffer(p, event.Jsep) + } case "slow_link": // Ignore, processed through "handleSlowLink" in the general events. default: diff --git a/mcu_proxy.go b/mcu_proxy.go index 63edca6..016c5e0 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -107,6 +107,8 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) { switch msg.Type { + case "offer": + c.listener.OnUpdateOffer(client, msg.Payload["offer"].(map[string]interface{})) case "candidate": c.listener.OnIceCandidate(client, msg.Payload["candidate"]) default: diff --git a/proxy/proxy_session.go b/proxy/proxy_session.go index 7f3f84c..560703f 100644 --- a/proxy/proxy_session.go +++ b/proxy/proxy_session.go @@ -122,6 +122,26 @@ func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient { return prev } +func (s *ProxySession) OnUpdateOffer(client signaling.McuClient, offer map[string]interface{}) { + id := s.proxy.GetClientId(client) + if id == "" { + log.Printf("Received offer %+v from unknown %s client %s (%+v)", offer, client.StreamType(), client.Id(), client) + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "payload", + Payload: &signaling.PayloadProxyServerMessage{ + Type: "offer", + ClientId: id, + Payload: map[string]interface{}{ + "offer": offer, + }, + }, + } + s.sendMessage(msg) +} + func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) { id := s.proxy.GetClientId(client) if id == "" {