Merge pull request #104 from strukturag/stream-selection-api

Add API to select a simulcast substream / temporal layer.
This commit is contained in:
Joachim Bauch 2021-07-01 08:54:11 +02:00 committed by GitHub
commit e40e86f32c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 89 additions and 1 deletions

View file

@ -323,7 +323,8 @@ func (m *HelloClientMessage) CheckValid() error {
const (
// Features for all clients.
ServerFeatureMcu = "mcu"
ServerFeatureMcu = "mcu"
ServerFeatureSimulcast = "simulcast"
// Features for internal clients only.
ServerFeatureInternalVirtualSessions = "virtual-sessions"

14
hub.go
View file

@ -356,11 +356,15 @@ func (h *Hub) SetMcu(mcu Mcu) {
h.mcu = mcu
if mcu == nil {
removeFeature(h.info, ServerFeatureMcu)
removeFeature(h.info, ServerFeatureSimulcast)
removeFeature(h.infoInternal, ServerFeatureMcu)
removeFeature(h.infoInternal, ServerFeatureSimulcast)
} else {
log.Printf("Using a timeout of %s for MCU requests", h.mcuTimeout)
addFeature(h.info, ServerFeatureMcu)
addFeature(h.info, ServerFeatureSimulcast)
addFeature(h.infoInternal, ServerFeatureMcu)
addFeature(h.infoInternal, ServerFeatureSimulcast)
}
}
@ -1240,6 +1244,8 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
fallthrough
case "endOfCandidates":
fallthrough
case "selectStream":
fallthrough
case "candidate":
h.processMcuMessage(session, session, message, msg, &data)
return
@ -1669,6 +1675,14 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
clientType = "publisher"
mc, err = session.GetOrCreatePublisher(ctx, h.mcu, data.RoomType)
case "selectStream":
if session.PublicId() == message.Recipient.SessionId {
log.Printf("Not selecting substream for own %s stream in session %s", data.RoomType, session.PublicId())
return
}
clientType = "subscriber"
mc = session.GetSubscriber(message.Recipient.SessionId, data.RoomType)
default:
if session.PublicId() == message.Recipient.SessionId {
if !isAllowedToSend(session, data) {

View file

@ -572,6 +572,36 @@ func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) {
}
}
func (c *mcuJanusClient) selectStream(ctx context.Context, substream int, temporal int, callback func(error, map[string]interface{})) {
handle := c.handle
if handle == nil {
callback(ErrNotConnected, nil)
return
}
if substream < 0 && temporal < 0 {
callback(nil, nil)
return
}
configure_msg := map[string]interface{}{
"request": "configure",
}
if substream >= 0 {
configure_msg["substream"] = substream
}
if temporal >= 0 {
configure_msg["temporal"] = temporal
}
_, err := handle.Message(ctx, configure_msg, nil)
if err != nil {
callback(err, nil)
return
}
callback(nil, nil)
}
type mcuJanusPublisher struct {
mcuJanusClient
@ -1079,6 +1109,47 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
case "endOfCandidates":
// Ignore
case "selectStream":
substream := -1
if s, found := jsep_msg["substream"]; found {
switch s := s.(type) {
case int:
substream = s
case float32:
substream = int(s)
case float64:
substream = int(s)
default:
go callback(fmt.Errorf("Unsupported substream value: %v", s), nil)
return
}
}
temporal := -1
if s, found := jsep_msg["temporal"]; found {
switch s := s.(type) {
case int:
temporal = s
case float32:
temporal = int(s)
case float64:
temporal = int(s)
default:
go callback(fmt.Errorf("Unsupported temporal value: %v", s), nil)
return
}
}
if substream == -1 && temporal == -1 {
// Nothing to do
go callback(nil, nil)
return
}
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
p.selectStream(msgctx, substream, temporal, callback)
}
default:
// Return error asynchronously
go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil)

View file

@ -756,6 +756,8 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s
fallthrough
case "answer":
fallthrough
case "selectStream":
fallthrough
case "candidate":
mcuData = &signaling.MessageClientMessageData{
Type: payload.Type,