diff --git a/api_signaling.go b/api_signaling.go index a20eed3..f528d86 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -647,4 +647,5 @@ type AnswerOfferMessage struct { Type string `json:"type"` RoomType string `json:"roomType"` Payload map[string]interface{} `json:"payload"` + Update bool `json:"update,omitempty"` } diff --git a/clientsession.go b/clientsession.go index 0e24176..7907ef9 100644 --- a/clientsession.go +++ b/clientsession.go @@ -563,6 +563,34 @@ func (s *ClientSession) SetClient(client *Client) *Client { return prev } +func (s *ClientSession) sendOffer(client McuClient, sender string, streamType string, offer map[string]interface{}) { + offer_message := &AnswerOfferMessage{ + To: s.PublicId(), + From: sender, + Type: "offer", + RoomType: streamType, + Payload: offer, + Update: true, + } + offer_data, err := json.Marshal(offer_message) + if err != nil { + log.Println("Could not serialize offer", offer_message, err) + return + } + response_message := &ServerMessage{ + Type: "message", + Message: &MessageServerMessage{ + Sender: &MessageServerMessageSender{ + Type: "session", + SessionId: sender, + }, + Data: (*json.RawMessage)(&offer_data), + }, + } + + s.sendMessageUnlocked(response_message) +} + func (s *ClientSession) sendCandidate(client McuClient, sender string, streamType string, candidate interface{}) { candidate_message := &AnswerOfferMessage{ To: s.PublicId(), @@ -628,6 +656,18 @@ func (s *ClientSession) SendMessages(messages []*ServerMessage) bool { return true } +func (s *ClientSession) OnUpdateOffer(client McuClient, offer map[string]interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, sub := range s.subscribers { + if sub.Id() == client.Id() { + s.sendOffer(client, sub.Publisher(), client.StreamType(), offer) + return + } + } +} + func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { s.mu.Lock() defer s.mu.Unlock() diff --git a/mcu_common.go b/mcu_common.go index a5bf7a9..b1d99bc 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -50,6 +50,8 @@ const ( type McuListener interface { PublicId() string + OnUpdateOffer(client McuClient, offer map[string]interface{}) + OnIceCandidate(client McuClient, candidate interface{}) OnIceCompleted(client McuClient) diff --git a/mcu_janus.go b/mcu_janus.go index 3e147ec..5b70156 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -1063,7 +1063,12 @@ func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) { log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId) go p.Close(ctx) case "event": - // Ignore events like selected substream / temporal layer. + // Handle renegotiations, but ignore other events like selected + // substream / temporal layer. + if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" && + event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil { + p.listener.OnUpdateOffer(p, event.Jsep) + } case "slow_link": // Ignore, processed through "handleSlowLink" in the general events. default: diff --git a/mcu_proxy.go b/mcu_proxy.go index 63edca6..016c5e0 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -107,6 +107,8 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) { switch msg.Type { + case "offer": + c.listener.OnUpdateOffer(client, msg.Payload["offer"].(map[string]interface{})) case "candidate": c.listener.OnIceCandidate(client, msg.Payload["candidate"]) default: diff --git a/proxy/proxy_session.go b/proxy/proxy_session.go index 7f3f84c..560703f 100644 --- a/proxy/proxy_session.go +++ b/proxy/proxy_session.go @@ -122,6 +122,26 @@ func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient { return prev } +func (s *ProxySession) OnUpdateOffer(client signaling.McuClient, offer map[string]interface{}) { + id := s.proxy.GetClientId(client) + if id == "" { + log.Printf("Received offer %+v from unknown %s client %s (%+v)", offer, client.StreamType(), client.Id(), client) + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "payload", + Payload: &signaling.PayloadProxyServerMessage{ + Type: "offer", + ClientId: id, + Payload: map[string]interface{}{ + "offer": offer, + }, + }, + } + s.sendMessage(msg) +} + func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) { id := s.proxy.GetClientId(client) if id == "" {