Merge pull request #853 from strukturag/codec-config

Support passing codec parameters when creating publishers.
This commit is contained in:
Joachim Bauch 2024-11-04 13:51:17 +01:00 committed by GitHub
commit 47abaa151c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 793 additions and 271 deletions

View file

@ -141,8 +141,6 @@ func easyjson9289e183DecodeGithubComStrukturagNextcloudSpreedSignaling1(in *jlex
out.Sid = string(in.String())
case "roomType":
out.RoomType = string(in.String())
case "bitrate":
out.Bitrate = int(in.Int())
case "payload":
if in.IsNull() {
in.Skip()
@ -165,6 +163,16 @@ func easyjson9289e183DecodeGithubComStrukturagNextcloudSpreedSignaling1(in *jlex
}
in.Delim('}')
}
case "bitrate":
out.Bitrate = int(in.Int())
case "audiocodec":
out.AudioCodec = string(in.String())
case "videocodec":
out.VideoCodec = string(in.String())
case "vp9profile":
out.VP9Profile = string(in.String())
case "h264profile":
out.H264Profile = string(in.String())
default:
in.SkipRecursive()
}
@ -194,11 +202,6 @@ func easyjson9289e183EncodeGithubComStrukturagNextcloudSpreedSignaling1(out *jwr
out.RawString(prefix)
out.String(string(in.RoomType))
}
if in.Bitrate != 0 {
const prefix string = ",\"bitrate\":"
out.RawString(prefix)
out.Int(int(in.Bitrate))
}
{
const prefix string = ",\"payload\":"
out.RawString(prefix)
@ -226,6 +229,31 @@ func easyjson9289e183EncodeGithubComStrukturagNextcloudSpreedSignaling1(out *jwr
out.RawByte('}')
}
}
if in.Bitrate != 0 {
const prefix string = ",\"bitrate\":"
out.RawString(prefix)
out.Int(int(in.Bitrate))
}
if in.AudioCodec != "" {
const prefix string = ",\"audiocodec\":"
out.RawString(prefix)
out.String(string(in.AudioCodec))
}
if in.VideoCodec != "" {
const prefix string = ",\"videocodec\":"
out.RawString(prefix)
out.String(string(in.VideoCodec))
}
if in.VP9Profile != "" {
const prefix string = ",\"vp9profile\":"
out.RawString(prefix)
out.String(string(in.VP9Profile))
}
if in.H264Profile != "" {
const prefix string = ",\"h264profile\":"
out.RawString(prefix)
out.String(string(in.H264Profile))
}
out.RawByte('}')
}
func easyjson9289e183DecodeGithubComStrukturagNextcloudSpreedSignaling2(in *jlexer.Lexer, out *AsyncRoomMessage) {

View file

@ -193,6 +193,16 @@ type ByeProxyServerMessage struct {
// Type "command"
type NewPublisherSettings struct {
Bitrate int `json:"bitrate,omitempty"`
MediaTypes MediaType `json:"mediatypes,omitempty"`
AudioCodec string `json:"audiocodec,omitempty"`
VideoCodec string `json:"videocodec,omitempty"`
VP9Profile string `json:"vp9_profile,omitempty"`
H264Profile string `json:"h264_profile,omitempty"`
}
type CommandProxyClientMessage struct {
Type string `json:"type"`
@ -200,8 +210,13 @@ type CommandProxyClientMessage struct {
StreamType StreamType `json:"streamType,omitempty"`
PublisherId string `json:"publisherId,omitempty"`
ClientId string `json:"clientId,omitempty"`
Bitrate int `json:"bitrate,omitempty"`
MediaTypes MediaType `json:"mediatypes,omitempty"`
// Deprecated: use PublisherSettings instead.
Bitrate int `json:"bitrate,omitempty"`
// Deprecated: use PublisherSettings instead.
MediaTypes MediaType `json:"mediatypes,omitempty"`
PublisherSettings *NewPublisherSettings `json:"publisherSettings,omitempty"`
RemoteUrl string `json:"remoteUrl,omitempty"`
remoteUrl *url.URL

View file

@ -874,7 +874,134 @@ func (v *PayloadProxyClientMessage) UnmarshalJSON(data []byte) error {
func (v *PayloadProxyClientMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling6(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(in *jlexer.Lexer, out *HelloProxyServerMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(in *jlexer.Lexer, out *NewPublisherSettings) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "bitrate":
out.Bitrate = int(in.Int())
case "mediatypes":
out.MediaTypes = MediaType(in.Int())
case "audiocodec":
out.AudioCodec = string(in.String())
case "videocodec":
out.VideoCodec = string(in.String())
case "vp9_profile":
out.VP9Profile = string(in.String())
case "h264_profile":
out.H264Profile = string(in.String())
default:
in.SkipRecursive()
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(out *jwriter.Writer, in NewPublisherSettings) {
out.RawByte('{')
first := true
_ = first
if in.Bitrate != 0 {
const prefix string = ",\"bitrate\":"
first = false
out.RawString(prefix[1:])
out.Int(int(in.Bitrate))
}
if in.MediaTypes != 0 {
const prefix string = ",\"mediatypes\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.Int(int(in.MediaTypes))
}
if in.AudioCodec != "" {
const prefix string = ",\"audiocodec\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.String(string(in.AudioCodec))
}
if in.VideoCodec != "" {
const prefix string = ",\"videocodec\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.String(string(in.VideoCodec))
}
if in.VP9Profile != "" {
const prefix string = ",\"vp9_profile\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.String(string(in.VP9Profile))
}
if in.H264Profile != "" {
const prefix string = ",\"h264_profile\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.String(string(in.H264Profile))
}
out.RawByte('}')
}
// MarshalJSON supports json.Marshaler interface
func (v NewPublisherSettings) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v NewPublisherSettings) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *NewPublisherSettings) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *NewPublisherSettings) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling8(in *jlexer.Lexer, out *HelloProxyServerMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -905,7 +1032,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(in *jlex
if out.Server == nil {
out.Server = new(WelcomeServerMessage)
}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling8(in, out.Server)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling9(in, out.Server)
}
default:
in.SkipRecursive()
@ -917,7 +1044,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(in *jlex
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(out *jwriter.Writer, in HelloProxyServerMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling8(out *jwriter.Writer, in HelloProxyServerMessage) {
out.RawByte('{')
first := true
_ = first
@ -934,7 +1061,7 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(out *jwr
if in.Server != nil {
const prefix string = ",\"server\":"
out.RawString(prefix)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling8(out, *in.Server)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling9(out, *in.Server)
}
out.RawByte('}')
}
@ -942,27 +1069,27 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(out *jwr
// MarshalJSON supports json.Marshaler interface
func (v HelloProxyServerMessage) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling8(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v HelloProxyServerMessage) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling7(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling8(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *HelloProxyServerMessage) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling8(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *HelloProxyServerMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling7(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling8(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling8(in *jlexer.Lexer, out *WelcomeServerMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling9(in *jlexer.Lexer, out *WelcomeServerMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1018,7 +1145,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling8(in *jlex
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling8(out *jwriter.Writer, in WelcomeServerMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling9(out *jwriter.Writer, in WelcomeServerMessage) {
out.RawByte('{')
first := true
_ = first
@ -1048,7 +1175,7 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling8(out *jwr
}
out.RawByte('}')
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling9(in *jlexer.Lexer, out *HelloProxyClientMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(in *jlexer.Lexer, out *HelloProxyClientMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1106,7 +1233,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling9(in *jlex
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling9(out *jwriter.Writer, in HelloProxyClientMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(out *jwriter.Writer, in HelloProxyClientMessage) {
out.RawByte('{')
first := true
_ = first
@ -1145,27 +1272,27 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling9(out *jwr
// MarshalJSON supports json.Marshaler interface
func (v HelloProxyClientMessage) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling9(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v HelloProxyClientMessage) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling9(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *HelloProxyClientMessage) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling9(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *HelloProxyClientMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling9(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(in *jlexer.Lexer, out *EventProxyServerMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling11(in *jlexer.Lexer, out *EventProxyServerMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1212,7 +1339,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(in *jle
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(out *jwriter.Writer, in EventProxyServerMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling11(out *jwriter.Writer, in EventProxyServerMessage) {
out.RawByte('{')
first := true
_ = first
@ -1247,27 +1374,27 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(out *jw
// MarshalJSON supports json.Marshaler interface
func (v EventProxyServerMessage) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling11(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v EventProxyServerMessage) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling11(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *EventProxyServerMessage) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling11(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *EventProxyServerMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling11(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling11(in *jlexer.Lexer, out *EventProxyServerBandwidth) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(in *jlexer.Lexer, out *EventProxyServerBandwidth) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1316,7 +1443,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling11(in *jle
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling11(out *jwriter.Writer, in EventProxyServerBandwidth) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(out *jwriter.Writer, in EventProxyServerBandwidth) {
out.RawByte('{')
first := true
_ = first
@ -1342,27 +1469,27 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling11(out *jw
// MarshalJSON supports json.Marshaler interface
func (v EventProxyServerBandwidth) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling11(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v EventProxyServerBandwidth) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling11(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *EventProxyServerBandwidth) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling11(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *EventProxyServerBandwidth) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling11(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(in *jlexer.Lexer, out *CommandProxyServerMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling13(in *jlexer.Lexer, out *CommandProxyServerMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1404,7 +1531,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(in *jle
}
for !in.IsDelim(']') {
var v11 PublisherStream
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling13(in, &v11)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling14(in, &v11)
out.Streams = append(out.Streams, v11)
in.WantComma()
}
@ -1420,7 +1547,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(in *jle
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(out *jwriter.Writer, in CommandProxyServerMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling13(out *jwriter.Writer, in CommandProxyServerMessage) {
out.RawByte('{')
first := true
_ = first
@ -1464,7 +1591,7 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(out *jw
if v12 > 0 {
out.RawByte(',')
}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling13(out, v13)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling14(out, v13)
}
out.RawByte(']')
}
@ -1475,27 +1602,27 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(out *jw
// MarshalJSON supports json.Marshaler interface
func (v CommandProxyServerMessage) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling13(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v CommandProxyServerMessage) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling12(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling13(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *CommandProxyServerMessage) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling13(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *CommandProxyServerMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling12(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling13(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling13(in *jlexer.Lexer, out *PublisherStream) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling14(in *jlexer.Lexer, out *PublisherStream) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1554,7 +1681,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling13(in *jle
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling13(out *jwriter.Writer, in PublisherStream) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling14(out *jwriter.Writer, in PublisherStream) {
out.RawByte('{')
first := true
_ = first
@ -1635,7 +1762,7 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling13(out *jw
}
out.RawByte('}')
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling14(in *jlexer.Lexer, out *CommandProxyClientMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling15(in *jlexer.Lexer, out *CommandProxyClientMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1668,6 +1795,16 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling14(in *jle
out.Bitrate = int(in.Int())
case "mediatypes":
out.MediaTypes = MediaType(in.Int())
case "publisherSettings":
if in.IsNull() {
in.Skip()
out.PublisherSettings = nil
} else {
if out.PublisherSettings == nil {
out.PublisherSettings = new(NewPublisherSettings)
}
(*out.PublisherSettings).UnmarshalEasyJSON(in)
}
case "remoteUrl":
out.RemoteUrl = string(in.String())
case "remoteToken":
@ -1688,7 +1825,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling14(in *jle
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling14(out *jwriter.Writer, in CommandProxyClientMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling15(out *jwriter.Writer, in CommandProxyClientMessage) {
out.RawByte('{')
first := true
_ = first
@ -1727,6 +1864,11 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling14(out *jw
out.RawString(prefix)
out.Int(int(in.MediaTypes))
}
if in.PublisherSettings != nil {
const prefix string = ",\"publisherSettings\":"
out.RawString(prefix)
(*in.PublisherSettings).MarshalEasyJSON(out)
}
if in.RemoteUrl != "" {
const prefix string = ",\"remoteUrl\":"
out.RawString(prefix)
@ -1758,27 +1900,27 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling14(out *jw
// MarshalJSON supports json.Marshaler interface
func (v CommandProxyClientMessage) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling14(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling15(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v CommandProxyClientMessage) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling14(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling15(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *CommandProxyClientMessage) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling14(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling15(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *CommandProxyClientMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling14(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling15(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling15(in *jlexer.Lexer, out *ByeProxyServerMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling16(in *jlexer.Lexer, out *ByeProxyServerMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1809,7 +1951,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling15(in *jle
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling15(out *jwriter.Writer, in ByeProxyServerMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling16(out *jwriter.Writer, in ByeProxyServerMessage) {
out.RawByte('{')
first := true
_ = first
@ -1824,27 +1966,27 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling15(out *jw
// MarshalJSON supports json.Marshaler interface
func (v ByeProxyServerMessage) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling15(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling16(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v ByeProxyServerMessage) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling15(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling16(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *ByeProxyServerMessage) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling15(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling16(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *ByeProxyServerMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling15(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling16(l, v)
}
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling16(in *jlexer.Lexer, out *ByeProxyClientMessage) {
func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling17(in *jlexer.Lexer, out *ByeProxyClientMessage) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
@ -1873,7 +2015,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling16(in *jle
in.Consumed()
}
}
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling16(out *jwriter.Writer, in ByeProxyClientMessage) {
func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling17(out *jwriter.Writer, in ByeProxyClientMessage) {
out.RawByte('{')
first := true
_ = first
@ -1883,23 +2025,23 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling16(out *jw
// MarshalJSON supports json.Marshaler interface
func (v ByeProxyClientMessage) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling16(&w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling17(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v ByeProxyClientMessage) MarshalEasyJSON(w *jwriter.Writer) {
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling16(w, v)
easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling17(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *ByeProxyClientMessage) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling16(&r, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling17(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *ByeProxyClientMessage) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling16(l, v)
easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling17(l, v)
}

View file

@ -700,9 +700,15 @@ type MessageClientMessageData struct {
Type string `json:"type"`
Sid string `json:"sid"`
RoomType string `json:"roomType"`
Bitrate int `json:"bitrate,omitempty"`
Payload map[string]interface{} `json:"payload"`
// Only supported if Type == "offer"
Bitrate int `json:"bitrate,omitempty"`
AudioCodec string `json:"audiocodec,omitempty"`
VideoCodec string `json:"videocodec,omitempty"`
VP9Profile string `json:"vp9profile,omitempty"`
H264Profile string `json:"h264profile,omitempty"`
offerSdp *sdp.SessionDescription // Only set if Type == "offer"
answerSdp *sdp.SessionDescription // Only set if Type == "answer"
}

View file

@ -2464,8 +2464,6 @@ func easyjson29f189fbDecodeGithubComStrukturagNextcloudSpreedSignaling21(in *jle
out.Sid = string(in.String())
case "roomType":
out.RoomType = string(in.String())
case "bitrate":
out.Bitrate = int(in.Int())
case "payload":
if in.IsNull() {
in.Skip()
@ -2488,6 +2486,16 @@ func easyjson29f189fbDecodeGithubComStrukturagNextcloudSpreedSignaling21(in *jle
}
in.Delim('}')
}
case "bitrate":
out.Bitrate = int(in.Int())
case "audiocodec":
out.AudioCodec = string(in.String())
case "videocodec":
out.VideoCodec = string(in.String())
case "vp9profile":
out.VP9Profile = string(in.String())
case "h264profile":
out.H264Profile = string(in.String())
default:
in.SkipRecursive()
}
@ -2517,11 +2525,6 @@ func easyjson29f189fbEncodeGithubComStrukturagNextcloudSpreedSignaling21(out *jw
out.RawString(prefix)
out.String(string(in.RoomType))
}
if in.Bitrate != 0 {
const prefix string = ",\"bitrate\":"
out.RawString(prefix)
out.Int(int(in.Bitrate))
}
{
const prefix string = ",\"payload\":"
out.RawString(prefix)
@ -2549,6 +2552,31 @@ func easyjson29f189fbEncodeGithubComStrukturagNextcloudSpreedSignaling21(out *jw
out.RawByte('}')
}
}
if in.Bitrate != 0 {
const prefix string = ",\"bitrate\":"
out.RawString(prefix)
out.Int(int(in.Bitrate))
}
if in.AudioCodec != "" {
const prefix string = ",\"audiocodec\":"
out.RawString(prefix)
out.String(string(in.AudioCodec))
}
if in.VideoCodec != "" {
const prefix string = ",\"videocodec\":"
out.RawString(prefix)
out.String(string(in.VideoCodec))
}
if in.VP9Profile != "" {
const prefix string = ",\"vp9profile\":"
out.RawString(prefix)
out.String(string(in.VP9Profile))
}
if in.H264Profile != "" {
const prefix string = ",\"h264profile\":"
out.RawString(prefix)
out.String(string(in.H264Profile))
}
out.RawByte('}')
}

View file

@ -903,7 +903,15 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
s.mu.Unlock()
defer s.mu.Lock()
bitrate := data.Bitrate
settings := NewPublisherSettings{
Bitrate: data.Bitrate,
MediaTypes: mediaTypes,
AudioCodec: data.AudioCodec,
VideoCodec: data.VideoCodec,
VP9Profile: data.VP9Profile,
H264Profile: data.H264Profile,
}
if backend := s.Backend(); backend != nil {
var maxBitrate int
if streamType == StreamTypeScreen {
@ -911,14 +919,14 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
} else {
maxBitrate = backend.maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else if maxBitrate > 0 && bitrate > maxBitrate {
bitrate = maxBitrate
if settings.Bitrate <= 0 {
settings.Bitrate = maxBitrate
} else if maxBitrate > 0 && settings.Bitrate > maxBitrate {
settings.Bitrate = maxBitrate
}
}
var err error
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, bitrate, mediaTypes, client)
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), data.Sid, streamType, settings, client)
if err != nil {
return nil, err
}

View file

@ -180,7 +180,7 @@ func TestBandwidth_Client(t *testing.T) {
pub := mcu.GetPublisher(hello.Hello.SessionId)
require.NotNil(pub)
assert.Equal(bitrate, pub.bitrate)
assert.Equal(bitrate, pub.settings.Bitrate)
}
func TestBandwidth_Backend(t *testing.T) {
@ -261,7 +261,7 @@ func TestBandwidth_Backend(t *testing.T) {
} else {
expectBitrate = backend.maxScreenBitrate
}
assert.Equal(expectBitrate, pub.bitrate)
assert.Equal(expectBitrate, pub.settings.Bitrate)
})
}
}

View file

@ -24,6 +24,9 @@ package signaling
import (
"context"
"fmt"
"log"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
)
@ -33,6 +36,9 @@ const (
McuTypeProxy = "proxy"
McuTypeDefault = McuTypeJanus
defaultMaxStreamBitrate = 1024 * 1024
defaultMaxScreenBitrate = 2048 * 1024
)
var (
@ -65,6 +71,54 @@ type McuInitiator interface {
Country() string
}
type McuSettings interface {
MaxStreamBitrate() int32
MaxScreenBitrate() int32
Timeout() time.Duration
Reload(config *goconf.ConfigFile)
}
type mcuCommonSettings struct {
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
timeout atomic.Int64
}
func (s *mcuCommonSettings) MaxStreamBitrate() int32 {
return s.maxStreamBitrate.Load()
}
func (s *mcuCommonSettings) MaxScreenBitrate() int32 {
return s.maxScreenBitrate.Load()
}
func (s *mcuCommonSettings) Timeout() time.Duration {
return time.Duration(s.timeout.Load())
}
func (s *mcuCommonSettings) setTimeout(timeout time.Duration) {
s.timeout.Store(int64(timeout))
}
func (s *mcuCommonSettings) load(config *goconf.ConfigFile) error {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", maxStreamBitrate)
s.maxStreamBitrate.Store(int32(maxStreamBitrate))
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", maxScreenBitrate)
s.maxScreenBitrate.Store(int32(maxScreenBitrate))
return nil
}
type Mcu interface {
Start(ctx context.Context) error
Stop()
@ -75,7 +129,7 @@ type Mcu interface {
GetStats() interface{}
NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error)
NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error)
}

View file

@ -46,9 +46,6 @@ const (
initialReconnectInterval = 1 * time.Second
maxReconnectInterval = 32 * time.Second
defaultMaxStreamBitrate = 1024 * 1024
defaultMaxScreenBitrate = 2048 * 1024
)
var (
@ -133,13 +130,45 @@ type clientInterface interface {
NotifyReconnected()
}
type mcuJanusSettings struct {
mcuCommonSettings
}
func newMcuJanusSettings(config *goconf.ConfigFile) (McuSettings, error) {
settings := &mcuJanusSettings{}
if err := settings.load(config); err != nil {
return nil, err
}
return settings, nil
}
func (s *mcuJanusSettings) load(config *goconf.ConfigFile) error {
if err := s.mcuCommonSettings.load(config); err != nil {
return err
}
mcuTimeoutSeconds, _ := config.GetInt("mcu", "timeout")
if mcuTimeoutSeconds <= 0 {
mcuTimeoutSeconds = defaultMcuTimeoutSeconds
}
mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second
log.Printf("Using a timeout of %s for MCU requests", mcuTimeout)
s.setTimeout(mcuTimeout)
return nil
}
func (s *mcuJanusSettings) Reload(config *goconf.ConfigFile) {
if err := s.load(config); err != nil {
log.Printf("Error reloading MCU settings: %s", err)
}
}
type mcuJanus struct {
url string
mu sync.Mutex
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
mcuTimeout time.Duration
settings McuSettings
gw *JanusGateway
session *JanusSession
@ -170,33 +199,22 @@ func emptyOnConnected() {}
func emptyOnDisconnected() {}
func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mcu, error) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
settings, err := newMcuJanusSettings(config)
if err != nil {
return nil, err
}
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
mcuTimeoutSeconds, _ := config.GetInt("mcu", "timeout")
if mcuTimeoutSeconds <= 0 {
mcuTimeoutSeconds = defaultMcuTimeoutSeconds
}
mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second
mcu := &mcuJanus{
url: url,
mcuTimeout: mcuTimeout,
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
url: url,
settings: settings,
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
publishers: make(map[string]*mcuJanusPublisher),
remotePublishers: make(map[string]*mcuJanusRemotePublisher),
reconnectInterval: initialReconnectInterval,
}
mcu.maxStreamBitrate.Store(int32(maxStreamBitrate))
mcu.maxScreenBitrate.Store(int32(maxScreenBitrate))
mcu.onConnected.Store(emptyOnConnected)
mcu.onDisconnected.Store(emptyOnDisconnected)
@ -323,8 +341,6 @@ func (m *mcuJanus) Start(ctx context.Context) error {
} else {
log.Println("Full-Trickle is enabled")
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
if m.session, err = m.gw.Create(ctx); err != nil {
m.disconnect()
@ -378,19 +394,7 @@ func (m *mcuJanus) Stop() {
}
func (m *mcuJanus) Reload(config *goconf.ConfigFile) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
m.maxStreamBitrate.Store(int32(maxStreamBitrate))
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
m.maxScreenBitrate.Store(int32(maxScreenBitrate))
m.settings.Reload(config)
}
func (m *mcuJanus) SetOnConnected(f func()) {
@ -474,7 +478,7 @@ func (m *mcuJanus) SubscriberDisconnected(id string, publisher string, streamTyp
}
}
func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle, id string, streamType StreamType, bitrate int) (uint64, int, error) {
func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle, id string, streamType StreamType, settings NewPublisherSettings) (uint64, int, error) {
create_msg := map[string]interface{}{
"request": "create",
"description": getStreamId(id, streamType),
@ -484,12 +488,25 @@ func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle,
// orientation changes in Firefox.
"videoorient_ext": false,
}
if codec := settings.AudioCodec; codec != "" {
create_msg["audiocodec"] = codec
}
if codec := settings.VideoCodec; codec != "" {
create_msg["videocodec"] = codec
}
if profile := settings.VP9Profile; profile != "" {
create_msg["vp9_profile"] = profile
}
if profile := settings.H264Profile; profile != "" {
create_msg["h264_profile"] = profile
}
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = int(m.maxScreenBitrate.Load())
maxBitrate = int(m.settings.MaxScreenBitrate())
} else {
maxBitrate = int(m.maxStreamBitrate.Load())
maxBitrate = int(m.settings.MaxStreamBitrate())
}
bitrate := settings.Bitrate
if bitrate <= 0 {
bitrate = maxBitrate
} else {
@ -516,7 +533,7 @@ func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle,
return roomId, bitrate, nil
}
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, bitrate int) (*JanusHandle, uint64, uint64, int, error) {
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, settings NewPublisherSettings) (*JanusHandle, uint64, uint64, int, error) {
session := m.session
if session == nil {
return nil, 0, 0, 0, ErrNotConnected
@ -528,7 +545,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id)
roomId, bitrate, err := m.createPublisherRoom(ctx, handle, id, streamType, bitrate)
roomId, bitrate, err := m.createPublisherRoom(ctx, handle, id, streamType, settings)
if err != nil {
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
@ -554,12 +571,12 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
return handle, response.Session, roomId, bitrate, nil
}
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) {
if _, found := streamTypeUserIds[streamType]; !found {
return nil, fmt.Errorf("Unsupported stream type %s", streamType)
}
handle, session, roomId, maxBitrate, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate)
handle, session, roomId, maxBitrate, err := m.getOrCreatePublisherHandle(ctx, id, streamType, settings)
if err != nil {
return nil, err
}
@ -581,10 +598,9 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
closeChan: make(chan struct{}, 1),
deferred: make(chan func(), 64),
},
sdpReady: NewCloser(),
id: id,
bitrate: bitrate,
mediaTypes: mediaTypes,
sdpReady: NewCloser(),
id: id,
settings: settings,
}
client.mcuJanusClient.handleEvent = client.handleEvent
client.mcuJanusClient.handleHangup = client.handleHangup
@ -694,7 +710,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
return client, nil
}
func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller RemotePublisherController, streamType StreamType, bitrate int) (*mcuJanusRemotePublisher, error) {
func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller RemotePublisherController, streamType StreamType, settings NewPublisherSettings) (*mcuJanusRemotePublisher, error) {
m.mu.Lock()
defer m.mu.Unlock()
pub, found := m.remotePublishers[getStreamId(controller.PublisherId(), streamType)]
@ -721,7 +737,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
return nil, err
}
roomId, bitrate, err := m.createPublisherRoom(ctx, handle, controller.PublisherId(), streamType, bitrate)
roomId, maxBitrate, err := m.createPublisherRoom(ctx, handle, controller.PublisherId(), streamType, settings)
if err != nil {
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
@ -756,7 +772,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
roomId: roomId,
sid: strconv.FormatUint(handle.Id, 10),
streamType: streamType,
maxBitrate: bitrate,
maxBitrate: maxBitrate,
handle: handle,
handleId: handle.Id,
@ -766,6 +782,7 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
sdpReady: NewCloser(),
id: controller.PublisherId(),
settings: settings,
},
port: int(port),
@ -797,7 +814,7 @@ func (m *mcuJanus) NewRemotePublisher(ctx context.Context, listener McuListener,
return nil, ErrRemoteStreamsNotSupported
}
pub, err := m.getOrCreateRemotePublisher(ctx, controller, streamType, 0)
pub, err := m.getOrCreateRemotePublisher(ctx, controller, streamType, NewPublisherSettings{})
if err != nil {
return nil, err
}

View file

@ -47,14 +47,13 @@ const (
type mcuJanusPublisher struct {
mcuJanusClient
id string
bitrate int
mediaTypes MediaType
stats publisherStatsCounter
sdpFlags Flags
sdpReady *Closer
offerSdp atomic.Pointer[sdp.SessionDescription]
answerSdp atomic.Pointer[sdp.SessionDescription]
id string
settings NewPublisherSettings
stats publisherStatsCounter
sdpFlags Flags
sdpReady *Closer
offerSdp atomic.Pointer[sdp.SessionDescription]
answerSdp atomic.Pointer[sdp.SessionDescription]
}
func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) {
@ -108,16 +107,16 @@ func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) {
}
func (p *mcuJanusPublisher) HasMedia(mt MediaType) bool {
return (p.mediaTypes & mt) == mt
return (p.settings.MediaTypes & mt) == mt
}
func (p *mcuJanusPublisher) SetMedia(mt MediaType) {
p.mediaTypes = mt
p.settings.MediaTypes = mt
}
func (p *mcuJanusPublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.settings)
if err != nil {
log.Printf("Could not reconnect publisher %s: %s", p.id, err)
// TODO(jojo): Retry
@ -184,7 +183,7 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
// TODO Tear down previous publisher and get a new one if sid does
// not match?
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
p.sendOffer(msgctx, jsep_msg, func(err error, jsep map[string]interface{}) {
@ -221,7 +220,7 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
}
case "candidate":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
if data.Sid == "" || data.Sid == p.Sid() {

View file

@ -96,7 +96,7 @@ func (p *mcuJanusRemotePublisher) handleSlowLink(event *janus.SlowLinkMsg) {
func (p *mcuJanusRemotePublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.settings)
if err != nil {
log.Printf("Could not reconnect remote publisher %s: %s", p.id, err)
// TODO(jojo): Retry

View file

@ -88,7 +88,7 @@ func (p *mcuJanusRemoteSubscriber) handleMedia(event *janus.MediaMsg) {
}
func (p *mcuJanusRemoteSubscriber) NotifyReconnected() {
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
if err != nil {

View file

@ -92,7 +92,7 @@ func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) {
}
func (p *mcuJanusSubscriber) NotifyReconnected() {
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
if err != nil {
@ -256,7 +256,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
fallthrough
case "sendoffer":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
stream, err := parseStreamSelection(jsep_msg)
@ -273,7 +273,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
case "answer":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
if data.Sid == "" || data.Sid == p.Sid() {
@ -284,7 +284,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
case "candidate":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
if data.Sid == "" || data.Sid == p.Sid() {
@ -309,7 +309,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
p.selectStream(msgctx, stream, callback)

View file

@ -134,11 +134,11 @@ func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadPr
type mcuProxyPublisher struct {
mcuProxyPubSubCommon
id string
mediaTypes MediaType
id string
settings NewPublisherSettings
}
func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitrate int, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitrate int, settings NewPublisherSettings, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
return &mcuProxyPublisher{
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
sid: sid,
@ -148,18 +148,18 @@ func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitra
conn: conn,
listener: listener,
},
id: id,
mediaTypes: mediaTypes,
id: id,
settings: settings,
}
}
func (p *mcuProxyPublisher) HasMedia(mt MediaType) bool {
return (p.mediaTypes & mt) == mt
return (p.settings.MediaTypes & mt) == mt
}
func (p *mcuProxyPublisher) SetMedia(mt MediaType) {
// TODO: Also update mediaTypes on proxy.
p.mediaTypes = mt
p.settings.MediaTypes = mt
}
func (p *mcuProxyPublisher) NotifyClosed() {
@ -1140,15 +1140,17 @@ func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyC
}
}
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType) (McuPublisher, error) {
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings) (McuPublisher, error) {
msg := &ProxyClientMessage{
Type: "command",
Command: &CommandProxyClientMessage{
Type: "create-publisher",
Sid: sid,
StreamType: streamType,
Bitrate: bitrate,
MediaTypes: mediaTypes,
Type: "create-publisher",
Sid: sid,
StreamType: streamType,
PublisherSettings: &settings,
// Include for older version of the signaling proxy.
Bitrate: settings.Bitrate,
MediaTypes: settings.MediaTypes,
},
}
@ -1162,7 +1164,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe
proxyId := response.Command.Id
log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id)
publisher := newMcuProxyPublisher(id, sid, streamType, response.Command.Bitrate, mediaTypes, proxyId, c, listener)
publisher := newMcuProxyPublisher(id, sid, streamType, response.Command.Bitrate, settings, proxyId, c, listener)
c.publishersLock.Lock()
c.publishers[proxyId] = publisher
c.publisherIds[getStreamId(id, streamType)] = proxyId
@ -1242,6 +1244,40 @@ func (c *mcuProxyConnection) newRemoteSubscriber(ctx context.Context, listener M
return subscriber, nil
}
type mcuProxySettings struct {
mcuCommonSettings
}
func newMcuProxySettings(config *goconf.ConfigFile) (McuSettings, error) {
settings := &mcuProxySettings{}
if err := settings.load(config); err != nil {
return nil, err
}
return settings, nil
}
func (s *mcuProxySettings) load(config *goconf.ConfigFile) error {
if err := s.mcuCommonSettings.load(config); err != nil {
return err
}
proxyTimeoutSeconds, _ := config.GetInt("mcu", "proxytimeout")
if proxyTimeoutSeconds <= 0 {
proxyTimeoutSeconds = defaultProxyTimeoutSeconds
}
proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second
log.Printf("Using a timeout of %s for proxy requests", proxyTimeout)
s.setTimeout(proxyTimeout)
return nil
}
func (s *mcuProxySettings) Reload(config *goconf.ConfigFile) {
if err := s.load(config); err != nil {
log.Printf("Error reloading proxy settings: %s", err)
}
}
type mcuProxy struct {
urlType string
tokenId string
@ -1252,12 +1288,10 @@ type mcuProxy struct {
connections []*mcuProxyConnection
connectionsMap map[string][]*mcuProxyConnection
connectionsMu sync.RWMutex
proxyTimeout time.Duration
connRequests atomic.Int64
nextSort atomic.Int64
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
settings McuSettings
mu sync.RWMutex
publishers map[string]*mcuProxyConnection
@ -1292,20 +1326,9 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
return nil, fmt.Errorf("Could not parse private key from %s: %s", tokenKeyFilename, err)
}
proxyTimeoutSeconds, _ := config.GetInt("mcu", "proxytimeout")
if proxyTimeoutSeconds <= 0 {
proxyTimeoutSeconds = defaultProxyTimeoutSeconds
}
proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second
log.Printf("Using a timeout of %s for proxy requests", proxyTimeout)
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
settings, err := newMcuProxySettings((config))
if err != nil {
return nil, err
}
mcu := &mcuProxy{
@ -1315,19 +1338,16 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
dialer: &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: proxyTimeout,
HandshakeTimeout: settings.Timeout(),
},
connectionsMap: make(map[string][]*mcuProxyConnection),
proxyTimeout: proxyTimeout,
settings: settings,
publishers: make(map[string]*mcuProxyConnection),
rpcClients: rpcClients,
}
mcu.maxStreamBitrate.Store(int32(maxStreamBitrate))
mcu.maxScreenBitrate.Store(int32(maxScreenBitrate))
if err := mcu.loadContinentsMap(config); err != nil {
return nil, err
}
@ -1397,9 +1417,6 @@ func (m *mcuProxy) loadContinentsMap(config *goconf.ConfigFile) error {
}
func (m *mcuProxy) Start(ctx context.Context) error {
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
return m.config.Start()
}
@ -1557,19 +1574,11 @@ func (m *mcuProxy) KeepConnection(url string, ips ...net.IP) {
}
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
m.maxStreamBitrate.Store(int32(maxStreamBitrate))
m.settings.Reload(config)
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
if m.settings.Timeout() != m.dialer.HandshakeTimeout {
m.dialer.HandshakeTimeout = m.settings.Timeout()
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
m.maxScreenBitrate.Store(int32(maxScreenBitrate))
if err := m.loadContinentsMap(config); err != nil {
log.Printf("Error loading continents map: %s", err)
@ -1763,17 +1772,19 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
delete(m.publishers, getStreamId(publisher.id, publisher.StreamType()))
}
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = int(m.maxScreenBitrate.Load())
maxBitrate = int(m.settings.MaxScreenBitrate())
} else {
maxBitrate = int(m.maxStreamBitrate.Load())
maxBitrate = int(m.settings.MaxStreamBitrate())
}
if bitrate <= 0 {
bitrate = maxBitrate
publisherSettings := settings
if publisherSettings.Bitrate <= 0 {
publisherSettings.Bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
publisherSettings.Bitrate = min(publisherSettings.Bitrate, maxBitrate)
}
for _, conn := range connections {
@ -1781,10 +1792,10 @@ func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id
continue
}
subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout)
subctx, cancel := context.WithTimeout(ctx, m.settings.Timeout())
defer cancel()
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes)
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, publisherSettings)
if err != nil {
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err)
continue
@ -1800,9 +1811,9 @@ func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id
return nil
}
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator)
publisher := m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections, func(c *mcuProxyConnection) bool {
publisher := m.createPublisher(ctx, listener, id, sid, streamType, settings, initiator, connections, func(c *mcuProxyConnection) bool {
bw := c.Bandwidth()
return bw == nil || bw.AllowIncoming()
})
@ -1838,7 +1849,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
}
return 0
})
publisher = m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections2, func(c *mcuProxyConnection) bool {
publisher = m.createPublisher(ctx, listener, id, sid, streamType, settings, initiator, connections2, func(c *mcuProxyConnection) bool {
return true
})
}

View file

@ -255,6 +255,20 @@ func (c *testProxyServerClient) processCommandMessage(msg *ProxyClientMessage) (
case "create-publisher":
pub := c.server.createPublisher()
if assert.NotNil(c.t, msg.Command.PublisherSettings) {
if assert.NotEqualValues(c.t, 0, msg.Command.PublisherSettings.Bitrate) {
assert.EqualValues(c.t, msg.Command.Bitrate, msg.Command.PublisherSettings.Bitrate)
}
assert.EqualValues(c.t, msg.Command.MediaTypes, msg.Command.PublisherSettings.MediaTypes)
if strings.Contains(c.t.Name(), "Codecs") {
assert.Equal(c.t, "opus,g722", msg.Command.PublisherSettings.AudioCodec)
assert.Equal(c.t, "vp9,vp8,av1", msg.Command.PublisherSettings.VideoCodec)
} else {
assert.Empty(c.t, msg.Command.PublisherSettings.AudioCodec)
assert.Empty(c.t, msg.Command.PublisherSettings.VideoCodec)
}
}
response = &ProxyServerMessage{
Id: msg.Id,
Type: "command",
@ -671,6 +685,7 @@ func newMcuProxyForTestWithOptions(t *testing.T, options proxyTestOptions) *mcuP
cfg := goconf.NewConfigFile()
cfg.AddOption("mcu", "urltype", "static")
cfg.AddOption("mcu", "proxytimeout", fmt.Sprintf("%d", int(testTimeout.Seconds())))
var urls []string
waitingMap := make(map[string]bool)
if len(options.servers) == 0 {
@ -765,7 +780,9 @@ func Test_ProxyPublisherSubscriber(t *testing.T) {
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -782,6 +799,33 @@ func Test_ProxyPublisherSubscriber(t *testing.T) {
defer sub.Close(context.Background())
}
func Test_ProxyPublisherCodecs(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
mcu := newMcuProxyForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
pubId := "the-publisher"
pubSid := "1234567890"
pubListener := &MockMcuListener{
publicId: pubId + "-public",
}
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
AudioCodec: "opus,g722",
VideoCodec: "vp9,vp8,av1",
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
}
func Test_ProxyWaitForPublisher(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
@ -819,7 +863,9 @@ func Test_ProxyWaitForPublisher(t *testing.T) {
// Give subscriber goroutine some time to start
time.Sleep(100 * time.Millisecond)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
select {
@ -851,7 +897,9 @@ func Test_ProxyPublisherBandwidth(t *testing.T) {
pub1Initiator := &MockMcuInitiator{
country: "DE",
}
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub1Initiator)
require.NoError(t, err)
defer pub1.Close(context.Background())
@ -888,7 +936,9 @@ func Test_ProxyPublisherBandwidth(t *testing.T) {
pub2Initiator := &MockMcuInitiator{
country: "DE",
}
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub2Initiator)
require.NoError(t, err)
defer pub2.Close(context.Background())
@ -917,7 +967,9 @@ func Test_ProxyPublisherBandwidthOverload(t *testing.T) {
pub1Initiator := &MockMcuInitiator{
country: "DE",
}
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub1Initiator)
require.NoError(t, err)
defer pub1.Close(context.Background())
@ -957,7 +1009,9 @@ func Test_ProxyPublisherBandwidthOverload(t *testing.T) {
pub2Initiator := &MockMcuInitiator{
country: "DE",
}
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub2Initiator)
require.NoError(t, err)
defer pub2.Close(context.Background())
@ -986,7 +1040,9 @@ func Test_ProxyPublisherLoad(t *testing.T) {
pub1Initiator := &MockMcuInitiator{
country: "DE",
}
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub1Initiator)
require.NoError(t, err)
defer pub1.Close(context.Background())
@ -1003,7 +1059,9 @@ func Test_ProxyPublisherLoad(t *testing.T) {
pub2Initiator := &MockMcuInitiator{
country: "DE",
}
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pub2Initiator)
require.NoError(t, err)
defer pub2.Close(context.Background())
@ -1032,7 +1090,9 @@ func Test_ProxyPublisherCountry(t *testing.T) {
pubDEInitiator := &MockMcuInitiator{
country: "DE",
}
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubDEInitiator)
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubDEInitiator)
require.NoError(t, err)
defer pubDE.Close(context.Background())
@ -1047,7 +1107,9 @@ func Test_ProxyPublisherCountry(t *testing.T) {
pubUSInitiator := &MockMcuInitiator{
country: "US",
}
pubUS, err := mcu.NewPublisher(ctx, pubUSListener, pubUSId, pubUSSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubUSInitiator)
pubUS, err := mcu.NewPublisher(ctx, pubUSListener, pubUSId, pubUSSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubUSInitiator)
require.NoError(t, err)
defer pubUS.Close(context.Background())
@ -1076,7 +1138,9 @@ func Test_ProxyPublisherContinent(t *testing.T) {
pubDEInitiator := &MockMcuInitiator{
country: "DE",
}
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubDEInitiator)
pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubDEInitiator)
require.NoError(t, err)
defer pubDE.Close(context.Background())
@ -1091,7 +1155,9 @@ func Test_ProxyPublisherContinent(t *testing.T) {
pubFRInitiator := &MockMcuInitiator{
country: "FR",
}
pubFR, err := mcu.NewPublisher(ctx, pubFRListener, pubFRId, pubFRSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubFRInitiator)
pubFR, err := mcu.NewPublisher(ctx, pubFRListener, pubFRId, pubFRSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubFRInitiator)
require.NoError(t, err)
defer pubFR.Close(context.Background())
@ -1120,7 +1186,9 @@ func Test_ProxySubscriberCountry(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1162,7 +1230,9 @@ func Test_ProxySubscriberContinent(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1204,7 +1274,9 @@ func Test_ProxySubscriberBandwidth(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1266,7 +1338,9 @@ func Test_ProxySubscriberBandwidthOverload(t *testing.T) {
pubInitiator := &MockMcuInitiator{
country: "DE",
}
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1404,7 +1478,9 @@ func Test_ProxyRemotePublisher(t *testing.T) {
hub1.addSession(session1)
defer hub1.removeSession(session1)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1501,7 +1577,9 @@ func Test_ProxyRemotePublisherWait(t *testing.T) {
// Give subscriber goroutine some time to start
time.Sleep(100 * time.Millisecond)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())
@ -1570,7 +1648,9 @@ func Test_ProxyRemotePublisherTemporary(t *testing.T) {
hub1.addSession(session1)
defer hub1.removeSession(session1)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, NewPublisherSettings{
MediaTypes: MediaTypeVideo | MediaTypeAudio,
}, pubInitiator)
require.NoError(t, err)
defer pub.Close(context.Background())

View file

@ -70,17 +70,17 @@ func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, settings NewPublisherSettings, initiator McuInitiator) (McuPublisher, error) {
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = TestMaxBitrateScreen
} else {
maxBitrate = TestMaxBitrateVideo
}
if bitrate <= 0 {
bitrate = maxBitrate
} else if bitrate > maxBitrate {
bitrate = maxBitrate
publisherSettings := settings
bitrate := publisherSettings.Bitrate
if bitrate <= 0 || bitrate > maxBitrate {
publisherSettings.Bitrate = maxBitrate
}
pub := &TestMCUPublisher{
TestMCUClient: TestMCUClient{
@ -89,8 +89,7 @@ func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id str
streamType: streamType,
},
mediaTypes: mediaTypes,
bitrate: bitrate,
settings: publisherSettings,
}
m.mu.Lock()
@ -176,18 +175,17 @@ func (c *TestMCUClient) isClosed() bool {
type TestMCUPublisher struct {
TestMCUClient
mediaTypes MediaType
bitrate int
settings NewPublisherSettings
sdp string
}
func (p *TestMCUPublisher) HasMedia(mt MediaType) bool {
return (p.mediaTypes & mt) == mt
return (p.settings.MediaTypes & mt) == mt
}
func (p *TestMCUPublisher) SetMedia(mt MediaType) {
p.mediaTypes = mt
p.settings.MediaTypes = mt
}
func (p *TestMCUPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {

View file

@ -889,7 +889,14 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
defer cancel()
id := uuid.New().String()
publisher, err := s.mcu.NewPublisher(ctx2, session, id, cmd.Sid, cmd.StreamType, cmd.Bitrate, cmd.MediaTypes, &emptyInitiator{})
settings := cmd.PublisherSettings
if settings == nil {
settings = &signaling.NewPublisherSettings{
Bitrate: cmd.Bitrate, // nolint
MediaTypes: cmd.MediaTypes, // nolint
}
}
publisher, err := s.mcu.NewPublisher(ctx2, session, id, cmd.Sid, cmd.StreamType, *settings, &emptyInitiator{})
if err == context.DeadlineExceeded {
log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId())
session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher))

View file

@ -355,8 +355,88 @@ func TestProxyCreateSession(t *testing.T) {
assert.NoError(err)
}
type TestMCU struct {
t *testing.T
}
func (m *TestMCU) Start(ctx context.Context) error {
return nil
}
func (m *TestMCU) Stop() {
}
func (m *TestMCU) Reload(config *goconf.ConfigFile) {
}
func (m *TestMCU) SetOnConnected(f func()) {
}
func (m *TestMCU) SetOnDisconnected(f func()) {
}
func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
return nil, errors.New("not implemented")
}
func (m *TestMCU) NewSubscriber(ctx context.Context, listener signaling.McuListener, publisher string, streamType signaling.StreamType, initiator signaling.McuInitiator) (signaling.McuSubscriber, error) {
return nil, errors.New("not implemented")
}
type TestMCUPublisher struct {
id string
sid string
streamType signaling.StreamType
}
func (p *TestMCUPublisher) Id() string {
return p.id
}
func (p *TestMCUPublisher) Sid() string {
return p.sid
}
func (p *TestMCUPublisher) StreamType() signaling.StreamType {
return p.streamType
}
func (p *TestMCUPublisher) MaxBitrate() int {
return 0
}
func (p *TestMCUPublisher) Close(ctx context.Context) {
}
func (p *TestMCUPublisher) SendMessage(ctx context.Context, message *signaling.MessageClientMessage, data *signaling.MessageClientMessageData, callback func(error, map[string]interface{})) {
callback(errors.New("not implemented"), nil)
}
func (p *TestMCUPublisher) HasMedia(signaling.MediaType) bool {
return false
}
func (p *TestMCUPublisher) SetMedia(mediaTypes signaling.MediaType) {
}
func (p *TestMCUPublisher) GetStreams(ctx context.Context) ([]signaling.PublisherStream, error) {
return nil, errors.New("not implemented")
}
func (p *TestMCUPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("not implemented")
}
func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
return errors.New("not implemented")
}
type HangingTestMCU struct {
t *testing.T
TestMCU
ctx context.Context
creating chan struct{}
created chan struct{}
@ -370,34 +450,16 @@ func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
})
return &HangingTestMCU{
t: t,
TestMCU: TestMCU{
t: t,
},
ctx: ctx,
creating: make(chan struct{}),
created: make(chan struct{}),
}
}
func (m *HangingTestMCU) Start(ctx context.Context) error {
return nil
}
func (m *HangingTestMCU) Stop() {
}
func (m *HangingTestMCU) Reload(config *goconf.ConfigFile) {
}
func (m *HangingTestMCU) SetOnConnected(f func()) {
}
func (m *HangingTestMCU) SetOnDisconnected(f func()) {
}
func (m *HangingTestMCU) GetStats() interface{} {
return nil
}
func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, bitrate int, mediaTypes signaling.MediaType, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()
@ -489,3 +551,70 @@ func TestProxyCancelOnClose(t *testing.T) {
<-mcu.created
assert.True(mcu.cancelled.Load())
}
type CodecsTestMCU struct {
TestMCU
}
func NewCodecsTestMCU(t *testing.T) *CodecsTestMCU {
return &CodecsTestMCU{
TestMCU: TestMCU{
t: t,
},
}
}
func (m *CodecsTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
assert.Equal(m.t, "opus,g722", settings.AudioCodec)
assert.Equal(m.t, "vp9,vp8,av1", settings.VideoCodec)
return &TestMCUPublisher{
id: id,
sid: sid,
streamType: streamType,
}, nil
}
func TestProxyCodecs(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
mcu := NewCodecsTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
PublisherSettings: &signaling.NewPublisherSettings{
AudioCodec: "opus,g722",
VideoCodec: "vp9,vp8,av1",
},
},
}))
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
assert.NotEmpty(message.Command.Id)
}
}
}