Add API to select a simulcast substream / temporal layer.

This commit is contained in:
Joachim Bauch 2021-04-29 08:53:22 +02:00
parent 6886bb9b53
commit 0a8ce3bda3
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
3 changed files with 83 additions and 0 deletions

10
hub.go
View File

@ -1211,6 +1211,8 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
fallthrough
case "endOfCandidates":
fallthrough
case "selectStream":
fallthrough
case "candidate":
h.processMcuMessage(session, session, message, msg, &data)
return
@ -1640,6 +1642,14 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
clientType = "publisher"
mc, err = session.GetOrCreatePublisher(ctx, h.mcu, data.RoomType)
case "selectStream":
if session.PublicId() == message.Recipient.SessionId {
log.Printf("Not selecting substream for own %s stream in session %s", data.RoomType, session.PublicId())
return
}
clientType = "subscriber"
mc = session.GetSubscriber(message.Recipient.SessionId, data.RoomType)
default:
if session.PublicId() == message.Recipient.SessionId {
if !isAllowedToSend(session, data) {

View File

@ -572,6 +572,36 @@ func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) {
}
}
func (c *mcuJanusClient) selectStream(ctx context.Context, substream int, temporal int, callback func(error, map[string]interface{})) {
handle := c.handle
if handle == nil {
callback(ErrNotConnected, nil)
return
}
if substream < 0 && temporal < 0 {
callback(nil, nil)
return
}
configure_msg := map[string]interface{}{
"request": "configure",
}
if substream >= 0 {
configure_msg["substream"] = substream
}
if temporal >= 0 {
configure_msg["temporal"] = temporal
}
_, err := handle.Message(ctx, configure_msg, nil)
if err != nil {
callback(err, nil)
return
}
callback(nil, nil)
}
type mcuJanusPublisher struct {
mcuJanusClient
@ -1079,6 +1109,47 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
case "endOfCandidates":
// Ignore
case "selectStream":
substream := -1
if s, found := jsep_msg["substream"]; found {
switch s := s.(type) {
case int:
substream = s
case float32:
substream = int(s)
case float64:
substream = int(s)
default:
go callback(fmt.Errorf("Unsupported substream value: %v", s), nil)
return
}
}
temporal := -1
if s, found := jsep_msg["temporal"]; found {
switch s := s.(type) {
case int:
temporal = s
case float32:
temporal = int(s)
case float64:
temporal = int(s)
default:
go callback(fmt.Errorf("Unsupported temporal value: %v", s), nil)
return
}
}
if substream == -1 && temporal == -1 {
// Nothing to do
go callback(nil, nil)
return
}
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.selectStream(msgctx, substream, temporal, callback)
}
default:
// Return error asynchronously
go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil)

View File

@ -756,6 +756,8 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s
fallthrough
case "answer":
fallthrough
case "selectStream":
fallthrough
case "candidate":
mcuData = &signaling.MessageClientMessageData{
Type: payload.Type,