Merge pull request #195 from danxuliu/send-updated-offers-to-subscribers-after-publisher-renegotiations

Send updated offers to subscribers after publisher renegotiations
This commit is contained in:
Joachim Bauch 2022-02-25 09:16:10 +01:00 committed by GitHub
commit 8fd9c688b6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 76 additions and 1 deletions

View file

@ -335,6 +335,7 @@ const (
// Features for all clients.
ServerFeatureMcu = "mcu"
ServerFeatureSimulcast = "simulcast"
ServerFeatureUpdateSdp = "update-sdp"
ServerFeatureAudioVideoPermissions = "audio-video-permissions"
ServerFeatureTransientData = "transient-data"
@ -648,6 +649,7 @@ type AnswerOfferMessage struct {
Type string `json:"type"`
RoomType string `json:"roomType"`
Payload map[string]interface{} `json:"payload"`
Update bool `json:"update,omitempty"`
}
// Type "transient"

View file

@ -564,6 +564,34 @@ func (s *ClientSession) SetClient(client *Client) *Client {
return prev
}
func (s *ClientSession) sendOffer(client McuClient, sender string, streamType string, offer map[string]interface{}) {
offer_message := &AnswerOfferMessage{
To: s.PublicId(),
From: sender,
Type: "offer",
RoomType: streamType,
Payload: offer,
Update: true,
}
offer_data, err := json.Marshal(offer_message)
if err != nil {
log.Println("Could not serialize offer", offer_message, err)
return
}
response_message := &ServerMessage{
Type: "message",
Message: &MessageServerMessage{
Sender: &MessageServerMessageSender{
Type: "session",
SessionId: sender,
},
Data: (*json.RawMessage)(&offer_data),
},
}
s.sendMessageUnlocked(response_message)
}
func (s *ClientSession) sendCandidate(client McuClient, sender string, streamType string, candidate interface{}) {
candidate_message := &AnswerOfferMessage{
To: s.PublicId(),
@ -629,6 +657,18 @@ func (s *ClientSession) SendMessages(messages []*ServerMessage) bool {
return true
}
func (s *ClientSession) OnUpdateOffer(client McuClient, offer map[string]interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
for _, sub := range s.subscribers {
if sub.Id() == client.Id() {
s.sendOffer(client, sub.Publisher(), client.StreamType(), offer)
return
}
}
}
func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) {
s.mu.Lock()
defer s.mu.Unlock()

4
hub.go
View file

@ -371,14 +371,18 @@ func (h *Hub) SetMcu(mcu Mcu) {
if mcu == nil {
removeFeature(h.info, ServerFeatureMcu)
removeFeature(h.info, ServerFeatureSimulcast)
removeFeature(h.info, ServerFeatureUpdateSdp)
removeFeature(h.infoInternal, ServerFeatureMcu)
removeFeature(h.infoInternal, ServerFeatureSimulcast)
removeFeature(h.infoInternal, ServerFeatureUpdateSdp)
} else {
log.Printf("Using a timeout of %s for MCU requests", h.mcuTimeout)
addFeature(h.info, ServerFeatureMcu)
addFeature(h.info, ServerFeatureSimulcast)
addFeature(h.info, ServerFeatureUpdateSdp)
addFeature(h.infoInternal, ServerFeatureMcu)
addFeature(h.infoInternal, ServerFeatureSimulcast)
addFeature(h.infoInternal, ServerFeatureUpdateSdp)
}
}

View file

@ -50,6 +50,8 @@ const (
type McuListener interface {
PublicId() string
OnUpdateOffer(client McuClient, offer map[string]interface{})
OnIceCandidate(client McuClient, candidate interface{})
OnIceCompleted(client McuClient)

View file

@ -1062,7 +1062,12 @@ func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) {
log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId)
go p.Close(ctx)
case "event":
// Ignore events like selected substream / temporal layer.
// Handle renegotiations, but ignore other events like selected
// substream / temporal layer.
if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" &&
event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil {
p.listener.OnUpdateOffer(p, event.Jsep)
}
case "slow_link":
// Ignore, processed through "handleSlowLink" in the general events.
default:

View file

@ -107,6 +107,8 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie
func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) {
switch msg.Type {
case "offer":
c.listener.OnUpdateOffer(client, msg.Payload["offer"].(map[string]interface{}))
case "candidate":
c.listener.OnIceCandidate(client, msg.Payload["candidate"])
default:

View file

@ -122,6 +122,26 @@ func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
return prev
}
func (s *ProxySession) OnUpdateOffer(client signaling.McuClient, offer map[string]interface{}) {
id := s.proxy.GetClientId(client)
if id == "" {
log.Printf("Received offer %+v from unknown %s client %s (%+v)", offer, client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: "offer",
ClientId: id,
Payload: map[string]interface{}{
"offer": offer,
},
},
}
s.sendMessage(msg)
}
func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) {
id := s.proxy.GetClientId(client)
if id == "" {