diff --git a/hub.go b/hub.go index 2e13719..55824fa 100644 --- a/hub.go +++ b/hub.go @@ -1211,6 +1211,8 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { fallthrough case "endOfCandidates": fallthrough + case "selectStream": + fallthrough case "candidate": h.processMcuMessage(session, session, message, msg, &data) return @@ -1640,6 +1642,14 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes clientType = "publisher" mc, err = session.GetOrCreatePublisher(ctx, h.mcu, data.RoomType) + case "selectStream": + if session.PublicId() == message.Recipient.SessionId { + log.Printf("Not selecting substream for own %s stream in session %s", data.RoomType, session.PublicId()) + return + } + + clientType = "subscriber" + mc = session.GetSubscriber(message.Recipient.SessionId, data.RoomType) default: if session.PublicId() == message.Recipient.SessionId { if !isAllowedToSend(session, data) { diff --git a/mcu_janus.go b/mcu_janus.go index f02510b..1f2c1a9 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -572,6 +572,36 @@ func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) { } } +func (c *mcuJanusClient) selectStream(ctx context.Context, substream int, temporal int, callback func(error, map[string]interface{})) { + handle := c.handle + if handle == nil { + callback(ErrNotConnected, nil) + return + } + + if substream < 0 && temporal < 0 { + callback(nil, nil) + return + } + + configure_msg := map[string]interface{}{ + "request": "configure", + } + if substream >= 0 { + configure_msg["substream"] = substream + } + if temporal >= 0 { + configure_msg["temporal"] = temporal + } + _, err := handle.Message(ctx, configure_msg, nil) + if err != nil { + callback(err, nil) + return + } + + callback(nil, nil) +} + type mcuJanusPublisher struct { mcuJanusClient @@ -1079,6 +1109,47 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl } case "endOfCandidates": // Ignore + case "selectStream": + substream := -1 + if s, found := jsep_msg["substream"]; found { + switch s := s.(type) { + case int: + substream = s + case float32: + substream = int(s) + case float64: + substream = int(s) + default: + go callback(fmt.Errorf("Unsupported substream value: %v", s), nil) + return + } + } + temporal := -1 + if s, found := jsep_msg["temporal"]; found { + switch s := s.(type) { + case int: + temporal = s + case float32: + temporal = int(s) + case float64: + temporal = int(s) + default: + go callback(fmt.Errorf("Unsupported temporal value: %v", s), nil) + return + } + } + if substream == -1 && temporal == -1 { + // Nothing to do + go callback(nil, nil) + return + } + + p.deferred <- func() { + msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + + p.selectStream(msgctx, substream, temporal, callback) + } default: // Return error asynchronously go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil) diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index bbea72b..58e2030 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -756,6 +756,8 @@ func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, s fallthrough case "answer": fallthrough + case "selectStream": + fallthrough case "candidate": mcuData = &signaling.MessageClientMessageData{ Type: payload.Type,