From 343c4634a1c3af8306f2fca98da4ceff2d882519 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 18 Apr 2024 16:48:10 +0200 Subject: [PATCH] Get list of remote streams from offer/answer SDP. --- api_proxy.go | 2 + mcu_common.go | 31 +++++ mcu_janus.go | 35 +++--- mcu_janus_publisher.go | 219 +++++++++++++++++++++++++++++++++++- mcu_janus_publisher_test.go | 92 +++++++++++++++ mcu_proxy.go | 4 + mcu_test.go | 4 + proxy/proxy_server.go | 50 +++++++- 8 files changed, 413 insertions(+), 24 deletions(-) create mode 100644 mcu_janus_publisher_test.go diff --git a/api_proxy.go b/api_proxy.go index c809e31..255c3c2 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -253,6 +253,8 @@ type CommandProxyServerMessage struct { Sid string `json:"sid,omitempty"` Bitrate int `json:"bitrate,omitempty"` + + Streams []PublisherStream `json:"streams,omitempty"` } // Type "payload" diff --git a/mcu_common.go b/mcu_common.go index 0df9cb0..017f703 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -79,10 +79,40 @@ type Mcu interface { NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error) } +// PublisherStream contains the available properties when creating a +// remote publisher in Janus. +type PublisherStream struct { + Mid string `json:"mid"` + Mindex int `json:"mindex"` + Type string `json:"type"` + + Description string `json:"description,omitempty"` + Disabled bool `json:"disabled,omitempty"` + + // For types "audio" and "video" + Codec string `json:"codec,omitempty"` + + // For type "audio" + Stereo bool `json:"stereo,omitempty"` + Fec bool `json:"fec,omitempty"` + Dtx bool `json:"dtx,omitempty"` + + // For type "video" + Simulcast bool `json:"simulcast,omitempty"` + Svc bool `json:"svc,omitempty"` + + ProfileH264 string `json:"h264_profile,omitempty"` + ProfileVP9 string `json:"vp9_profile,omitempty"` + + ExtIdVideoOrientation int `json:"videoorient_ext_id,omitempty"` + ExtIdPlayoutDelay int `json:"playoutdelay_ext_id,omitempty"` +} + type RemotePublisherController interface { PublisherId() string StartPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error + GetStreams(ctx context.Context) ([]PublisherStream, error) } type RemoteMcu interface { @@ -128,6 +158,7 @@ type McuPublisher interface { HasMedia(MediaType) bool SetMedia(MediaType) + GetStreams(ctx context.Context) ([]PublisherStream, error) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error } diff --git a/mcu_janus.go b/mcu_janus.go index 8af5cb4..c9895cb 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -576,6 +576,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st closeChan: make(chan struct{}, 1), deferred: make(chan func(), 64), }, + sdpReady: NewCloser(), id: id, bitrate: bitrate, mediaTypes: mediaTypes, @@ -696,6 +697,15 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re return pub, nil } + streams, err := controller.GetStreams(ctx) + if err != nil { + return nil, err + } + + if len(streams) == 0 { + return nil, errors.New("remote publisher has no streams") + } + session := m.session if session == nil { return nil, ErrNotConnected @@ -718,27 +728,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re "request": "add_remote_publisher", "room": roomId, "id": streamTypeUserIds[streamType], - "streams": []map[string]interface{}{ - { - "mid": "0", - "mindex": 0, - "type": "audio", - "codec": "opus", - "fec": true, - }, - { - "mid": "1", - "mindex": 1, - "type": "video", - "codec": "vp8", - "simulcast": true, - }, - { - "mid": "2", - "mindex": 2, - "type": "data", - }, - }, + "streams": streams, }) if err != nil { if _, err2 := handle.Detach(ctx); err2 != nil { @@ -769,7 +759,8 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re deferred: make(chan func(), 64), }, - id: controller.PublisherId(), + sdpReady: NewCloser(), + id: controller.PublisherId(), }, port: int(port), diff --git a/mcu_janus_publisher.go b/mcu_janus_publisher.go index ffb5b35..d897703 100644 --- a/mcu_janus_publisher.go +++ b/mcu_janus_publisher.go @@ -23,10 +23,25 @@ package signaling import ( "context" + "errors" "fmt" "log" + "strconv" + "strings" + "sync/atomic" "github.com/notedit/janus-go" + "github.com/pion/sdp/v3" +) + +const ( + ExtensionUrlPlayoutDelay = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay" + ExtensionUrlVideoOrientation = "urn:3gpp:video-orientation" +) + +const ( + sdpHasOffer = 1 + sdpHasAnswer = 2 ) type mcuJanusPublisher struct { @@ -36,6 +51,10 @@ type mcuJanusPublisher struct { bitrate int mediaTypes MediaType stats publisherStatsCounter + sdpFlags Flags + sdpReady *Closer + offerSdp atomic.Pointer[sdp.SessionDescription] + answerSdp atomic.Pointer[sdp.SessionDescription] } func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) { @@ -154,9 +173,63 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() + sdpData, found := jsep_msg["sdp"] + if !found { + go callback(errors.New("No sdp found in offer"), nil) + return + } + + sdpString, ok := sdpData.(string) + if !ok { + go callback(errors.New("Invalid sdp found in offer"), nil) + return + } + + var offerSdp sdp.SessionDescription + if err := offerSdp.UnmarshalString(sdpString); err != nil { + log.Printf("Error parsing offer sdp %+v: %s", sdpString, err) + p.offerSdp.Store(nil) + p.sdpFlags.Remove(sdpHasOffer) + } else { + p.offerSdp.Store(&offerSdp) + p.sdpFlags.Add(sdpHasOffer) + if p.sdpFlags.Get() == sdpHasAnswer|sdpHasOffer { + p.sdpReady.Close() + } + } // TODO Tear down previous publisher and get a new one if sid does // not match? - p.sendOffer(msgctx, jsep_msg, callback) + p.sendOffer(msgctx, jsep_msg, func(err error, jsep map[string]interface{}) { + if err != nil { + callback(err, jsep) + return + } + + sdpData, found := jsep["sdp"] + if !found { + log.Printf("No sdp found in answer %+v", jsep) + } else { + sdpString, ok := sdpData.(string) + if !ok { + log.Printf("Invalid sdp found in answer %+v", jsep) + } else { + var answerSdp sdp.SessionDescription + if err := answerSdp.UnmarshalString(sdpString); err != nil { + log.Printf("Error parsing answer sdp %+v: %s", sdpString, err) + p.answerSdp.Store(nil) + p.sdpFlags.Remove(sdpHasAnswer) + } else { + p.answerSdp.Store(&answerSdp) + p.sdpFlags.Add(sdpHasAnswer) + if p.sdpFlags.Get() == sdpHasAnswer|sdpHasOffer { + p.sdpReady.Close() + } + } + } + } + + callback(nil, jsep) + }) } case "candidate": p.deferred <- func() { @@ -176,6 +249,150 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli } } +func getFmtpValue(fmtp string, key string) (string, bool) { + parts := strings.Split(fmtp, ";") + for _, part := range parts { + kv := strings.SplitN(part, "=", 2) + if len(kv) != 2 { + continue + } + + if strings.EqualFold(strings.TrimSpace(kv[0]), key) { + return strings.TrimSpace(kv[1]), true + } + + } + return "", false +} + +func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream, error) { + offerSdp := p.offerSdp.Load() + answerSdp := p.answerSdp.Load() + if offerSdp == nil || answerSdp == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-p.sdpReady.C: + offerSdp = p.offerSdp.Load() + answerSdp = p.answerSdp.Load() + if offerSdp == nil || answerSdp == nil { + // Only can happen on invalid SDPs. + return nil, errors.New("no offer and/or answer processed yet") + } + } + } + + var streams []PublisherStream + for idx, m := range answerSdp.MediaDescriptions { + mid, found := m.Attribute(sdp.AttrKeyMID) + if !found { + continue + } + + s := PublisherStream{ + Mid: mid, + Mindex: idx, + Type: m.MediaName.Media, + } + + if len(m.MediaName.Formats) == 0 { + continue + } + + if strings.EqualFold(s.Type, "application") && strings.EqualFold(m.MediaName.Formats[0], "webrtc-datachannel") { + s.Type = "data" + streams = append(streams, s) + continue + } + + pt, err := strconv.ParseInt(m.MediaName.Formats[0], 10, 8) + if err != nil { + continue + } + + answerCodec, err := answerSdp.GetCodecForPayloadType(uint8(pt)) + if err != nil { + continue + } + + if strings.EqualFold(s.Type, "audio") { + s.Codec = answerCodec.Name + if value, found := getFmtpValue(answerCodec.Fmtp, "useinbandfec"); found && value == "1" { + s.Fec = true + } + if value, found := getFmtpValue(answerCodec.Fmtp, "usedtx"); found && value == "1" { + s.Dtx = true + } + if value, found := getFmtpValue(answerCodec.Fmtp, "stereo"); found && value == "1" { + s.Stereo = true + } + } else if strings.EqualFold(s.Type, "video") { + s.Codec = answerCodec.Name + // TODO: Determine if SVC is used. + s.Svc = false + + if strings.EqualFold(answerCodec.Name, "vp9") { + // Parse VP9 profile from "profile-id=XXX" + // Exampe: "a=fmtp:98 profile-id=0" + if profile, found := getFmtpValue(answerCodec.Fmtp, "profile-id"); found { + s.ProfileVP9 = profile + } + } else if strings.EqualFold(answerCodec.Name, "h264") { + // Parse H.264 profile from "profile-level-id=XXX" + // Example: "a=fmtp:104 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f" + if profile, found := getFmtpValue(answerCodec.Fmtp, "profile-level-id"); found { + s.ProfileH264 = profile + } + } + + var extmap sdp.ExtMap + for _, a := range m.Attributes { + switch a.Key { + case sdp.AttrKeyExtMap: + if err := extmap.Unmarshal(extmap.Name() + ":" + a.Value); err != nil { + log.Printf("Error parsing extmap %s: %s", a.Value, err) + continue + } + + switch extmap.URI.String() { + case ExtensionUrlPlayoutDelay: + s.ExtIdPlayoutDelay = extmap.Value + case ExtensionUrlVideoOrientation: + s.ExtIdVideoOrientation = extmap.Value + } + case "simulcast": + s.Simulcast = true + case sdp.AttrKeySSRCGroup: + if strings.HasPrefix(a.Value, "SIM ") { + s.Simulcast = true + } + } + } + + for _, a := range offerSdp.MediaDescriptions[idx].Attributes { + switch a.Key { + case "simulcast": + s.Simulcast = true + case sdp.AttrKeySSRCGroup: + if strings.HasPrefix(a.Value, "SIM ") { + s.Simulcast = true + } + } + } + + } else if strings.EqualFold(s.Type, "data") { // nolint + // Already handled above. + } else { + log.Printf("Skip type %s", s.Type) + continue + } + + streams = append(streams, s) + } + + return streams, nil +} + func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { msg := map[string]interface{}{ "request": "publish_remotely", diff --git a/mcu_janus_publisher_test.go b/mcu_janus_publisher_test.go new file mode 100644 index 0000000..dd81e79 --- /dev/null +++ b/mcu_janus_publisher_test.go @@ -0,0 +1,92 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2024 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "testing" +) + +func TestGetFmtpValueH264(t *testing.T) { + testcases := []struct { + fmtp string + profile string + }{ + { + "", + "", + }, + { + "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", + "42001f", + }, + { + "level-asymmetry-allowed=1;packetization-mode=0", + "", + }, + { + "level-asymmetry-allowed=1; packetization-mode=0; profile-level-id = 42001f", + "42001f", + }, + } + + for _, tc := range testcases { + value, found := getFmtpValue(tc.fmtp, "profile-level-id") + if !found && tc.profile != "" { + t.Errorf("did not find profile \"%s\" in \"%s\"", tc.profile, tc.fmtp) + } else if found && tc.profile == "" { + t.Errorf("did not expect profile in \"%s\" but got \"%s\"", tc.fmtp, value) + } else if found && tc.profile != value { + t.Errorf("expected profile \"%s\" in \"%s\" but got \"%s\"", tc.profile, tc.fmtp, value) + } + } +} + +func TestGetFmtpValueVP9(t *testing.T) { + testcases := []struct { + fmtp string + profile string + }{ + { + "", + "", + }, + { + "profile-id=0", + "0", + }, + { + "profile-id = 0", + "0", + }, + } + + for _, tc := range testcases { + value, found := getFmtpValue(tc.fmtp, "profile-id") + if !found && tc.profile != "" { + t.Errorf("did not find profile \"%s\" in \"%s\"", tc.profile, tc.fmtp) + } else if found && tc.profile == "" { + t.Errorf("did not expect profile in \"%s\" but got \"%s\"", tc.fmtp, value) + } else if found && tc.profile != value { + t.Errorf("expected profile \"%s\" in \"%s\" but got \"%s\"", tc.profile, tc.fmtp, value) + } + } +} diff --git a/mcu_proxy.go b/mcu_proxy.go index 4b5ba8b..edf911d 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -217,6 +217,10 @@ func (p *mcuProxyPublisher) ProcessEvent(msg *EventProxyServerMessage) { } } +func (p *mcuProxyPublisher) GetStreams(ctx context.Context) ([]PublisherStream, error) { + return nil, errors.New("not implemented") +} + func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { return errors.New("remote publishing not supported for proxy publishers") } diff --git a/mcu_test.go b/mcu_test.go index bf6e43b..4ab7fea 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -223,6 +223,10 @@ func (p *TestMCUPublisher) SendMessage(ctx context.Context, message *MessageClie }() } +func (p *TestMCUPublisher) GetStreams(ctx context.Context) ([]PublisherStream, error) { + return nil, errors.New("not implemented") +} + func (p *TestMCUPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { return errors.New("remote publishing not supported") } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 2e8d061..9e76bed 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -724,7 +724,6 @@ func (p *proxyRemotePublisher) PublisherId() string { } func (p *proxyRemotePublisher) StartPublishing(ctx context.Context, publisher signaling.McuRemotePublisherProperties) error { - var conn *RemoteConnection conn, err := p.proxy.getRemoteConnection(ctx, p.remoteUrl) if err != nil { return err @@ -746,6 +745,26 @@ func (p *proxyRemotePublisher) StartPublishing(ctx context.Context, publisher si return nil } +func (p *proxyRemotePublisher) GetStreams(ctx context.Context) ([]signaling.PublisherStream, error) { + conn, err := p.proxy.getRemoteConnection(ctx, p.remoteUrl) + if err != nil { + return nil, err + } + + response, err := conn.RequestMessage(ctx, &signaling.ProxyClientMessage{ + Type: "command", + Command: &signaling.CommandProxyClientMessage{ + Type: "get-publisher-streams", + ClientId: p.publisherId, + }, + }) + if err != nil { + return nil, err + } + + return response.Command.Streams, nil +} + func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) { cmd := message.Command @@ -967,6 +986,35 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s }, } session.sendMessage(response) + case "get-publisher-streams": + client := s.GetClient(cmd.ClientId) + if client == nil { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + publisher, ok := client.(signaling.McuPublisher) + if !ok { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + streams, err := publisher.GetStreams(ctx) + if err != nil { + log.Printf("Could not get streams of publisher %s: %s", publisher.Id(), err) + session.sendMessage(message.NewWrappedErrorServerMessage(err)) + return + } + + response := &signaling.ProxyServerMessage{ + Id: message.Id, + Type: "command", + Command: &signaling.CommandProxyServerMessage{ + Id: cmd.ClientId, + Streams: streams, + }, + } + session.sendMessage(response) default: log.Printf("Unsupported command %+v", message.Command) session.sendMessage(message.NewErrorServerMessage(UnsupportedCommand))