Support passing codec parameters when creating publishers.

This commit is contained in:
Joachim Bauch 2024-10-31 16:57:33 +01:00
commit a487afc909
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
13 changed files with 371 additions and 113 deletions

View file

@ -193,6 +193,16 @@ type ByeProxyServerMessage struct {
// Type "command"
type NewPublisherSettings struct {
Bitrate int `json:"bitrate,omitempty"`
MediaTypes MediaType `json:"mediatypes,omitempty"`
AudioCodec string `json:"audiocodec,omitempty"`
VideoCodec string `json:"videocodec,omitempty"`
VP9Profile string `json:"vp9_profile,omitempty"`
H264Profile string `json:"h264_profile,omitempty"`
}
type CommandProxyClientMessage struct {
Type string `json:"type"`
@ -200,8 +210,13 @@ type CommandProxyClientMessage struct {
StreamType StreamType `json:"streamType,omitempty"`
PublisherId string `json:"publisherId,omitempty"`
ClientId string `json:"clientId,omitempty"`
Bitrate int `json:"bitrate,omitempty"`
MediaTypes MediaType `json:"mediatypes,omitempty"`
// Deprecated: use PublisherSettings instead.
Bitrate int `json:"bitrate,omitempty"`
// Deprecated: use PublisherSettings instead.
MediaTypes MediaType `json:"mediatypes,omitempty"`
PublisherSettings *NewPublisherSettings `json:"publisherSettings,omitempty"`
RemoteUrl string `json:"remoteUrl,omitempty"`
remoteUrl *url.URL

View file

@ -700,9 +700,15 @@ type MessageClientMessageData struct {
Type string `json:"type"`
Sid string `json:"sid"`
RoomType string `json:"roomType"`
Bitrate int `json:"bitrate,omitempty"`
Payload map[string]interface{} `json:"payload"`
// Only supported if Type == "offer"
Bitrate int `json:"bitrate,omitempty"`
AudioCodec string `json:"audiocodec,omitempty"`
VideoCodec string `json:"videocodec,omitempty"`
VP9Profile string `json:"vp9profile,omitempty"`
H264Profile string `json:"h264profile,omitempty"`
offerSdp *sdp.SessionDescription // Only set if Type == "offer"
answerSdp *sdp.SessionDescription // Only set if Type == "answer"
}

View file

@ -903,7 +903,15 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
s.mu.Unlock()
defer s.mu.Lock()
bitrate := data.Bitrate
settings := NewPublisherSettings{
Bitrate: data.Bitrate,
MediaTypes: mediaTypes,
AudioCodec: data.AudioCodec,
VideoCodec: data.VideoCodec,
VP9Profile: data.VP9Profile,
H264Profile: data.H264Profile,
}
if backend := s.Backend(); backend != nil {
var maxBitrate int
if streamType == StreamTypeScreen {
@ -911,14 +919,14 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
} else {
maxBitrate = backend.maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else if maxBitrate > 0 && bitrate > maxBitrate {
bitrate = maxBitrate
if settings.Bitrate <= 0 {
settings.Bitrate = maxBitrate
} else if maxBitrate > 0 && settings.Bitrate > maxBitrate {
settings.Bitrate = maxBitrate
}
}
var err error
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, bitrate, mediaTypes, client)
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, settings, client)
if err != nil {
return nil, err
}

View file

@ -180,7 +180,7 @@ func TestBandwidth_Client(t *testing.T) {
pub := mcu.GetPublisher(hello.Hello.SessionId)
require.NotNil(pub)
assert.Equal(bitrate, pub.bitrate)
assert.Equal(bitrate, pub.settings.Bitrate)
}
func TestBandwidth_Backend(t *testing.T) {
@ -261,7 +261,7 @@ func TestBandwidth_Backend(t *testing.T) {
} else {
expectBitrate = backend.maxScreenBitrate
}
assert.Equal(expectBitrate, pub.bitrate)
assert.Equal(expectBitrate, pub.settings.Bitrate)
})
}
}

View file

@ -129,7 +129,7 @@ type Mcu interface {
GetStats() interface{}
NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error)
NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error)
}

View file

@ -478,7 +478,7 @@ func (m *mcuJanus) SubscriberDisconnected(id string, publisher string, streamTyp
}
}
func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle, id string, streamType StreamType, bitrate int) (uint64, int, error) {
func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle, id string, streamType StreamType, settings NewPublisherSettings) (uint64, int, error) {
create_msg := map[string]interface{}{
"request": "create",
"description": getStreamId(id, streamType),
@ -488,12 +488,25 @@ func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle,
// orientation changes in Firefox.
"videoorient_ext": false,
}
if codec := settings.AudioCodec; codec != "" {
create_msg["audiocodec"] = codec
}
if codec := settings.VideoCodec; codec != "" {
create_msg["videocodec"] = codec
}
if profile := settings.VP9Profile; profile != "" {
create_msg["vp9_profile"] = profile
}
if profile := settings.H264Profile; profile != "" {
create_msg["h264_profile"] = profile
}
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = int(m.settings.MaxScreenBitrate())
} else {
maxBitrate = int(m.settings.MaxStreamBitrate())
}
bitrate := settings.Bitrate
if bitrate <= 0 {
bitrate = maxBitrate
} else {
@ -520,7 +533,7 @@ func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle,
return roomId, bitrate, nil
}
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, bitrate int) (*JanusHandle, uint64, uint64, int, error) {
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, settings NewPublisherSettings) (*JanusHandle, uint64, uint64, int, error) {
session := m.session
if session == nil {
return nil, 0, 0, 0, ErrNotConnected
@ -532,7 +545,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id)
roomId, bitrate, err := m.createPublisherRoom(ctx, handle, id, streamType, bitrate)
roomId, bitrate, err := m.createPublisherRoom(ctx, handle, id, streamType, settings)
if err != nil {
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
@ -558,12 +571,12 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
return handle, response.Session, roomId, bitrate, nil
}
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) {
if _, found := streamTypeUserIds[streamType]; !found {
return nil, fmt.Errorf("Unsupported stream type %s", streamType)
}
handle, session, roomId, maxBitrate, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate)
handle, session, roomId, maxBitrate, err := m.getOrCreatePublisherHandle(ctx, id, streamType, settings)
if err != nil {
return nil, err
}
@ -585,10 +598,9 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
closeChan: make(chan struct{}, 1),
deferred: make(chan func(), 64),
},
sdpReady: NewCloser(),
id: id,
bitrate: bitrate,
mediaTypes: mediaTypes,
sdpReady: NewCloser(),
id: id,
settings: settings,
}
client.mcuJanusClient.handleEvent = client.handleEvent
client.mcuJanusClient.handleHangup = client.handleHangup
@ -698,7 +710,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
return client, nil
}
func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller RemotePublisherController, streamType StreamType, bitrate int) (*mcuJanusRemotePublisher, error) {
func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller RemotePublisherController, streamType StreamType, settings NewPublisherSettings) (*mcuJanusRemotePublisher, error) {
m.mu.Lock()
defer m.mu.Unlock()
pub, found := m.remotePublishers[getStreamId(controller.PublisherId(), streamType)]
@ -725,7 +737,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
return nil, err
}
roomId, bitrate, err := m.createPublisherRoom(ctx, handle, controller.PublisherId(), streamType, bitrate)
roomId, maxBitrate, err := m.createPublisherRoom(ctx, handle, controller.PublisherId(), streamType, settings)
if err != nil {
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
@ -760,7 +772,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
roomId: roomId,
sid: strconv.FormatUint(handle.Id, 10),
streamType: streamType,
maxBitrate: bitrate,
maxBitrate: maxBitrate,
handle: handle,
handleId: handle.Id,
@ -770,6 +782,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
sdpReady: NewCloser(),
id: controller.PublisherId(),
settings: settings,
},
port: int(port),
@ -801,7 +814,7 @@ func (m *mcuJanus) NewRemotePublisher(ctx context.Context, listener McuListener,
return nil, ErrRemoteStreamsNotSupported
}
pub, err := m.getOrCreateRemotePublisher(ctx, controller, streamType, 0)
pub, err := m.getOrCreateRemotePublisher(ctx, controller, streamType, NewPublisherSettings{})
if err != nil {
return nil, err
}

View file

@ -47,14 +47,13 @@ const (
type mcuJanusPublisher struct {
mcuJanusClient
id string
bitrate int
mediaTypes MediaType
stats publisherStatsCounter
sdpFlags Flags
sdpReady *Closer
offerSdp atomic.Pointer[sdp.SessionDescription]
answerSdp atomic.Pointer[sdp.SessionDescription]
id string
settings NewPublisherSettings
stats publisherStatsCounter
sdpFlags Flags
sdpReady *Closer
offerSdp atomic.Pointer[sdp.SessionDescription]
answerSdp atomic.Pointer[sdp.SessionDescription]
}
func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) {
@ -108,16 +107,16 @@ func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) {
}
func (p *mcuJanusPublisher) HasMedia(mt MediaType) bool {
return (p.mediaTypes & mt) == mt
return (p.settings.MediaTypes & mt) == mt
}
func (p *mcuJanusPublisher) SetMedia(mt MediaType) {
p.mediaTypes = mt
p.settings.MediaTypes = mt
}
func (p *mcuJanusPublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.settings)
if err != nil {
log.Printf("Could not reconnect publisher %s: %s", p.id, err)
// TODO(jojo): Retry

View file

@ -96,7 +96,7 @@ func (p *mcuJanusRemotePublisher) handleSlowLink(event *janus.SlowLinkMsg) {
func (p *mcuJanusRemotePublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.settings)
if err != nil {
log.Printf("Could not reconnect remote publisher %s: %s", p.id, err)
// TODO(jojo): Retry

View file

@ -134,11 +134,11 @@ func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadPr
type mcuProxyPublisher struct {
mcuProxyPubSubCommon
id string
mediaTypes MediaType
id string
settings NewPublisherSettings
}
func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitrate int, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitrate int, settings NewPublisherSettings, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
return &mcuProxyPublisher{
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
sid: sid,
@ -148,18 +148,18 @@ func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitra
conn: conn,
listener: listener,
},
id: id,
mediaTypes: mediaTypes,
id: id,
settings: settings,
}
}
func (p *mcuProxyPublisher) HasMedia(mt MediaType) bool {
return (p.mediaTypes & mt) == mt
return (p.settings.MediaTypes & mt) == mt
}
func (p *mcuProxyPublisher) SetMedia(mt MediaType) {
// TODO: Also update mediaTypes on proxy.
p.mediaTypes = mt
p.settings.MediaTypes = mt
}
func (p *mcuProxyPublisher) NotifyClosed() {
@ -1140,15 +1140,17 @@ func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyC
}
}
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType) (McuPublisher, error) {
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings) (McuPublisher, error) {
msg := &ProxyClientMessage{
Type: "command",
Command: &CommandProxyClientMessage{
Type: "create-publisher",
Sid: sid,
StreamType: streamType,
Bitrate: bitrate,
MediaTypes: mediaTypes,
Type: "create-publisher",
Sid: sid,
StreamType: streamType,
PublisherSettings: &settings,
// Include for older version of the signaling proxy.
Bitrate: settings.Bitrate,
MediaTypes: settings.MediaTypes,
},
}
@ -1162,7 +1164,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe
proxyId := response.Command.Id
log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id)
publisher := newMcuProxyPublisher(id, sid, streamType, response.Command.Bitrate, mediaTypes, proxyId, c, listener)
publisher := newMcuProxyPublisher(id, sid, streamType, response.Command.Bitrate, settings, proxyId, c, listener)
c.publishersLock.Lock()
c.publishers[proxyId] = publisher
c.publisherIds[getStreamId(id, streamType)] = proxyId
@ -1770,17 +1772,19 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
delete(m.publishers, getStreamId(publisher.id, publisher.StreamType()))
}
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = int(m.settings.MaxScreenBitrate())
} else {
maxBitrate = int(m.settings.MaxStreamBitrate())
}
if bitrate <= 0 {
bitrate = maxBitrate
publisherSettings := settings
if publisherSettings.Bitrate <= 0 {
publisherSettings.Bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
publisherSettings.Bitrate = min(publisherSettings.Bitrate, maxBitrate)
}
for _, conn := range connections {
@ -1791,7 +1795,7 @@ func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id
subctx, cancel := context.WithTimeout(ctx, m.settings.Timeout())
defer cancel()
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes)
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, publisherSettings)
if err != nil {
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err)
continue
@ -1807,9 +1811,9 @@ func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id
return nil
}
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator)
publisher := m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections, func(c *mcuProxyConnection) bool {
publisher := m.createPublisher(ctx, listener, id, sid, streamType, settings, initiator, connections, func(c *mcuProxyConnection) bool {
bw := c.Bandwidth()
return bw == nil || bw.AllowIncoming()
})
@ -1845,7 +1849,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
}
return 0
})
publisher = m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections2, func(c *mcuProxyConnection) bool {
publisher = m.createPublisher(ctx, listener, id, sid, streamType, settings, initiator, connections2, func(c *mcuProxyConnection) bool {
return true
})
}

View file

@ -255,6 +255,20 @@ func (c *testProxyServerClient) processCommandMessage(msg *ProxyClientMessage) (
case "create-publisher":
pub := c.server.createPublisher()
if assert.NotNil(c.t, msg.Command.PublisherSettings) {
if assert.NotEqualValues(c.t, 0, msg.Command.PublisherSettings.Bitrate) {
assert.EqualValues(c.t, msg.Command.Bitrate, msg.Command.PublisherSettings.Bitrate)
}
assert.EqualValues(c.t, msg.Command.MediaTypes, msg.Command.PublisherSettings.MediaTypes)
if strings.Contains(c.t.Name(), "Codecs") {
assert.Equal(c.t, "opus,g722", msg.Command.PublisherSettings.AudioCodec)
assert.Equal(c.t, "vp9,vp8,av1", msg.Command.PublisherSettings.VideoCodec)
} else {
assert.Empty(c.t, msg.Command.PublisherSettings.AudioCodec)
assert.Empty(c.t, msg.Command.PublisherSettings.VideoCodec)
}
}
response = &ProxyServerMessage{
Id: msg.Id,
Type: "command",
@ -766,7 +780,9 @@ func Test_ProxyPublisherSubscriber(t *testing.T) {
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -783,6 +799,33 @@ func Test_ProxyPublisherSubscriber(t *testing.T) {
defer sub.Close(context.Background())
}
func Test_ProxyPublisherCodecs(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
mcu := newMcuProxyForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
pubId := "the-publisher"
pubSid := "1234567890"
pubListener := &MockMcuListener{
publicId: pubId + "-public",
}
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
AudioCodec: "opus,g722",
VideoCodec: "vp9,vp8,av1",
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
}
func Test_ProxyWaitForPublisher(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
@ -820,7 +863,9 @@ func Test_ProxyWaitForPublisher(t *testing.T) {
// Give subscriber goroutine some time to start
time.Sleep(100 * time.Millisecond)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
select {
@ -852,7 +897,9 @@ func Test_ProxyPublisherBandwidth(t *testing.T) {
pub1Initiator := &MockMcuInitiator{
country: "DE",
}
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub1Initiator)
require.NoError(t, err)
defer pub1.Close(context.Background())
@ -889,7 +936,9 @@ func Test_ProxyPublisherBandwidth(t *testing.T) {
pub2Initiator := &MockMcuInitiator{
country: "DE",
}
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub2Initiator)
require.NoError(t, err)
defer pub2.Close(context.Background())
@ -918,7 +967,9 @@ func Test_ProxyPublisherBandwidthOverload(t *testing.T) {
pub1Initiator := &MockMcuInitiator{
country: "DE",
}
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub1Initiator)
require.NoError(t, err)
defer pub1.Close(context.Background())
@ -958,7 +1009,9 @@ func Test_ProxyPublisherBandwidthOverload(t *testing.T) {
pub2Initiator := &MockMcuInitiator{
country: "DE",
}
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub2Initiator)
require.NoError(t, err)
defer pub2.Close(context.Background())
@ -987,7 +1040,9 @@ func Test_ProxyPublisherLoad(t *testing.T) {
pub1Initiator := &MockMcuInitiator{
country: "DE",
}
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub1Initiator)
require.NoError(t, err)
defer pub1.Close(context.Background())
@ -1004,7 +1059,9 @@ func Test_ProxyPublisherLoad(t *testing.T) {
pub2Initiator := &MockMcuInitiator{
country: "DE",
}
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub2Initiator)
require.NoError(t, err)
defer pub2.Close(context.Background())
@ -1033,7 +1090,9 @@ func Test_ProxyPublisherCountry(t *testing.T) {
pubDEInitiator := &MockMcuInitiator{
country: "DE",
}
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubDEInitiator)
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubDEInitiator)
require.NoError(t, err)
defer pubDE.Close(context.Background())
@ -1048,7 +1107,9 @@ func Test_ProxyPublisherCountry(t *testing.T) {
pubUSInitiator := &MockMcuInitiator{
country: "US",
}
pubUS, err := mcu.NewPublisher(ctx, pubUSListener, pubUSId, pubUSSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubUSInitiator)
pubUS, err := mcu.NewPublisher(ctx, pubUSListener, pubUSId, pubUSSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubUSInitiator)
require.NoError(t, err)
defer pubUS.Close(context.Background())
@ -1077,7 +1138,9 @@ func Test_ProxyPublisherContinent(t *testing.T) {
pubDEInitiator := &MockMcuInitiator{
country: "DE",
}
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubDEInitiator)
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubDEInitiator)
require.NoError(t, err)
defer pubDE.Close(context.Background())
@ -1092,7 +1155,9 @@ func Test_ProxyPublisherContinent(t *testing.T) {
pubFRInitiator := &MockMcuInitiator{
country: "FR",
}
pubFR, err := mcu.NewPublisher(ctx, pubFRListener, pubFRId, pubFRSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubFRInitiator)
pubFR, err := mcu.NewPublisher(ctx, pubFRListener, pubFRId, pubFRSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubFRInitiator)
require.NoError(t, err)
defer pubFR.Close(context.Background())
@ -1121,7 +1186,9 @@ func Test_ProxySubscriberCountry(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1163,7 +1230,9 @@ func Test_ProxySubscriberContinent(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1205,7 +1274,9 @@ func Test_ProxySubscriberBandwidth(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1267,7 +1338,9 @@ func Test_ProxySubscriberBandwidthOverload(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1405,7 +1478,9 @@ func Test_ProxyRemotePublisher(t *testing.T) {
hub1.addSession(session1)
defer hub1.removeSession(session1)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1502,7 +1577,9 @@ func Test_ProxyRemotePublisherWait(t *testing.T) {
// Give subscriber goroutine some time to start
time.Sleep(100 * time.Millisecond)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1571,7 +1648,9 @@ func Test_ProxyRemotePublisherTemporary(t *testing.T) {
hub1.addSession(session1)
defer hub1.removeSession(session1)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())

View file

@ -70,17 +70,17 @@ func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) {
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = TestMaxBitrateScreen
} else {
maxBitrate = TestMaxBitrateVideo
}
if bitrate <= 0 {
bitrate = maxBitrate
} else if bitrate > maxBitrate {
bitrate = maxBitrate
publisherSettings := settings
bitrate := publisherSettings.Bitrate
if bitrate <= 0 || bitrate > maxBitrate {
publisherSettings.Bitrate = maxBitrate
}
pub := &TestMCUPublisher{
TestMCUClient: TestMCUClient{
@ -89,8 +89,7 @@ func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id str
streamType: streamType,
},
mediaTypes: mediaTypes,
bitrate: bitrate,
settings: publisherSettings,
}
m.mu.Lock()
@ -176,18 +175,17 @@ func (c *TestMCUClient) isClosed() bool {
type TestMCUPublisher struct {
TestMCUClient
mediaTypes MediaType
bitrate int
settings NewPublisherSettings
sdp string
}
func (p *TestMCUPublisher) HasMedia(mt MediaType) bool {
return (p.mediaTypes & mt) == mt
return (p.settings.MediaTypes & mt) == mt
}
func (p *TestMCUPublisher) SetMedia(mt MediaType) {
p.mediaTypes = mt
p.settings.MediaTypes = mt
}
func (p *TestMCUPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {

View file

@ -889,7 +889,14 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
defer cancel()
id := uuid.New().String()
publisher, err := s.mcu.NewPublisher(ctx2, session, id, cmd.Sid, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{})
settings := cmd.PublisherSettings
if settings == nil {
settings = &signaling.NewPublisherSettings{
Bitrate: cmd.Bitrate, // nolint
MediaTypes: cmd.MediaTypes, // nolint
}
}
publisher, err := s.mcu.NewPublisher(ctx2, session, id, cmd.Sid, cmd.StreamType, *settings, &emptyInitiator{})
if err == context.DeadlineExceeded {
log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId())
session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher))

View file

@ -355,8 +355,88 @@ func TestProxyCreateSession(t *testing.T) {
assert.NoError(err)
}
type TestMCU struct {
t *testing.T
}
func (m *TestMCU) Start(ctx context.Context) error {
return nil
}
func (m *TestMCU) Stop() {
}
func (m *TestMCU) Reload(config *goconf.ConfigFile) {
}
func (m *TestMCU) SetOnConnected(f func()) {
}
func (m *TestMCU) SetOnDisconnected(f func()) {
}
func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
return nil, errors.New("not implemented")
}
func (m *TestMCU) NewSubscriber(ctx context.Context, listener signaling.McuListener, publisher string, streamType signaling.StreamType, initiator signaling.McuInitiator) (signaling.McuSubscriber, error) {
return nil, errors.New("not implemented")
}
type TestMCUPublisher struct {
id string
sid string
streamType signaling.StreamType
}
func (p *TestMCUPublisher) Id() string {
return p.id
}
func (p *TestMCUPublisher) Sid() string {
return p.sid
}
func (p *TestMCUPublisher) StreamType() signaling.StreamType {
return p.streamType
}
func (p *TestMCUPublisher) MaxBitrate() int {
return 0
}
func (p *TestMCUPublisher) Close(ctx context.Context) {
}
func (p *TestMCUPublisher) SendMessage(ctx context.Context, message *signaling.MessageClientMessage, data *signaling.MessageClientMessageData, callback func(error, map[string]interface{})) {
callback(errors.New("not implemented"), nil)
}
func (p *TestMCUPublisher) HasMedia(signaling.MediaType) bool {
return false
}
func (p *TestMCUPublisher) SetMedia(mediaTypes signaling.MediaType) {
}
func (p *TestMCUPublisher) GetStreams(ctx context.Context) ([]signaling.PublisherStream, error) {
return nil, errors.New("not implemented")
}
func (p *TestMCUPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("not implemented")
}
func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
return errors.New("not implemented")
}
type HangingTestMCU struct {
t *testing.T
TestMCU
ctx context.Context
creating chan struct{}
created chan struct{}
@ -370,34 +450,16 @@ func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
})
return &HangingTestMCU{
t: t,
TestMCU: TestMCU{
t: t,
},
ctx: ctx,
creating: make(chan struct{}),
created: make(chan struct{}),
}
}
func (m *HangingTestMCU) Start(ctx context.Context) error {
return nil
}
func (m *HangingTestMCU) Stop() {
}
func (m *HangingTestMCU) Reload(config *goconf.ConfigFile) {
}
func (m *HangingTestMCU) SetOnConnected(f func()) {
}
func (m *HangingTestMCU) SetOnDisconnected(f func()) {
}
func (m *HangingTestMCU) GetStats() interface{} {
return nil
}
func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, bitrate int, mediaTypes signaling.MediaType, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()
@ -489,3 +551,70 @@ func TestProxyCancelOnClose(t *testing.T) {
<-mcu.created
assert.True(mcu.cancelled.Load())
}
type CodecsTestMCU struct {
TestMCU
}
func NewCodecsTestMCU(t *testing.T) *CodecsTestMCU {
return &CodecsTestMCU{
TestMCU: TestMCU{
t: t,
},
}
}
func (m *CodecsTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
assert.Equal(m.t, "opus,g722", settings.AudioCodec)
assert.Equal(m.t, "vp9,vp8,av1", settings.VideoCodec)
return &TestMCUPublisher{
id: id,
sid: sid,
streamType: streamType,
}, nil
}
func TestProxyCodecs(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewCodecsTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
PublisherSettings: &signaling.NewPublisherSettings{
AudioCodec: "opus,g722",
VideoCodec: "vp9,vp8,av1",
},
},
}))
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
assert.NotEmpty(message.Command.Id)
}
}
}