2020-05-12 09:46:20 +02:00
/ * *
* Standalone signaling server for the Nextcloud Spreed app .
* Copyright ( C ) 2017 struktur AG
*
* @ author Joachim Bauch < bauch @ struktur . de >
*
* @ license GNU AGPL version 3 or any later version
*
* This program is free software : you can redistribute it and / or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation , either version 3 of the License , or
* ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU Affero General Public License for more details .
*
* You should have received a copy of the GNU Affero General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
* /
package signaling
import (
2020-08-31 13:58:28 +02:00
"context"
2020-05-12 09:46:20 +02:00
"encoding/json"
"fmt"
"log"
"reflect"
"strconv"
"sync"
2020-08-07 10:23:47 +02:00
"sync/atomic"
2020-05-12 09:46:20 +02:00
"time"
"github.com/dlintw/goconf"
"github.com/notedit/janus-go"
)
const (
pluginVideoRoom = "janus.plugin.videoroom"
keepaliveInterval = 30 * time . Second
videoPublisherUserId = 1
screenPublisherUserId = 2
initialReconnectInterval = 1 * time . Second
maxReconnectInterval = 32 * time . Second
defaultMaxStreamBitrate = 1024 * 1024
defaultMaxScreenBitrate = 2048 * 1024
streamTypeVideo = "video"
streamTypeScreen = "screen"
)
var (
streamTypeUserIds = map [ string ] uint64 {
streamTypeVideo : videoPublisherUserId ,
streamTypeScreen : screenPublisherUserId ,
}
)
func getPluginValue ( data janus . PluginData , pluginName string , key string ) interface { } {
if data . Plugin != pluginName {
return nil
}
return data . Data [ key ]
}
func convertIntValue ( value interface { } ) ( uint64 , error ) {
switch t := value . ( type ) {
case float64 :
if t < 0 {
2021-06-04 16:42:17 +02:00
return 0 , fmt . Errorf ( "Unsupported float64 number: %+v" , t )
2020-05-12 09:46:20 +02:00
}
return uint64 ( t ) , nil
case uint64 :
return t , nil
case int64 :
if t < 0 {
2021-06-04 16:42:17 +02:00
return 0 , fmt . Errorf ( "Unsupported int64 number: %+v" , t )
2020-05-12 09:46:20 +02:00
}
return uint64 ( t ) , nil
case json . Number :
r , err := t . Int64 ( )
if err != nil {
return 0 , err
} else if r < 0 {
2021-06-04 16:42:17 +02:00
return 0 , fmt . Errorf ( "Unsupported JSON number: %+v" , t )
2020-05-12 09:46:20 +02:00
}
return uint64 ( r ) , nil
default :
2021-06-04 16:42:17 +02:00
return 0 , fmt . Errorf ( "Unknown number type: %+v" , t )
2020-05-12 09:46:20 +02:00
}
}
func getPluginIntValue ( data janus . PluginData , pluginName string , key string ) uint64 {
val := getPluginValue ( data , pluginName , key )
if val == nil {
return 0
}
result , err := convertIntValue ( val )
if err != nil {
2021-06-04 16:42:17 +02:00
log . Printf ( "Invalid value %+v for %s: %s" , val , key , err )
2020-05-12 09:46:20 +02:00
result = 0
}
return result
}
func getPluginStringValue ( data janus . PluginData , pluginName string , key string ) string {
val := getPluginValue ( data , pluginName , key )
if val == nil {
return ""
}
strVal , ok := val . ( string )
if ! ok {
return ""
}
return strVal
}
// TODO(jojo): Lots of error handling still missing.
type clientInterface interface {
NotifyReconnected ( )
}
type mcuJanus struct {
2021-01-05 08:37:09 +01:00
// 64-bit members that are accessed atomically must be 64-bit aligned.
clientId uint64
2021-06-04 14:52:23 +02:00
url string
mu sync . Mutex
2020-05-12 09:46:20 +02:00
maxStreamBitrate int
maxScreenBitrate int
mcuTimeout time . Duration
gw * JanusGateway
session * JanusSession
handle * JanusHandle
closeChan chan bool
muClients sync . Mutex
clients map [ clientInterface ] bool
2021-06-04 14:52:23 +02:00
publishers map [ string ] * mcuJanusPublisher
publisherCreated Notifier
publisherConnected Notifier
2020-05-12 09:46:20 +02:00
reconnectTimer * time . Timer
reconnectInterval time . Duration
2020-05-28 16:02:04 +02:00
connectedSince time . Time
2020-08-07 10:23:47 +02:00
onConnected atomic . Value
onDisconnected atomic . Value
2020-05-12 09:46:20 +02:00
}
2020-08-07 10:23:47 +02:00
func emptyOnConnected ( ) { }
func emptyOnDisconnected ( ) { }
2021-06-04 14:52:23 +02:00
func NewMcuJanus ( url string , config * goconf . ConfigFile ) ( Mcu , error ) {
2020-05-12 09:46:20 +02:00
maxStreamBitrate , _ := config . GetInt ( "mcu" , "maxstreambitrate" )
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
maxScreenBitrate , _ := config . GetInt ( "mcu" , "maxscreenbitrate" )
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
mcuTimeoutSeconds , _ := config . GetInt ( "mcu" , "timeout" )
if mcuTimeoutSeconds <= 0 {
mcuTimeoutSeconds = defaultMcuTimeoutSeconds
}
mcuTimeout := time . Duration ( mcuTimeoutSeconds ) * time . Second
mcu := & mcuJanus {
url : url ,
maxStreamBitrate : maxStreamBitrate ,
maxScreenBitrate : maxScreenBitrate ,
mcuTimeout : mcuTimeout ,
closeChan : make ( chan bool , 1 ) ,
clients : make ( map [ clientInterface ] bool ) ,
2021-06-04 14:52:23 +02:00
publishers : make ( map [ string ] * mcuJanusPublisher ) ,
2020-05-12 09:46:20 +02:00
reconnectInterval : initialReconnectInterval ,
}
2020-08-07 10:23:47 +02:00
mcu . onConnected . Store ( emptyOnConnected )
mcu . onDisconnected . Store ( emptyOnDisconnected )
2020-05-12 09:46:20 +02:00
mcu . reconnectTimer = time . AfterFunc ( mcu . reconnectInterval , mcu . doReconnect )
mcu . reconnectTimer . Stop ( )
if err := mcu . reconnect ( ) ; err != nil {
return nil , err
}
return mcu , nil
}
func ( m * mcuJanus ) disconnect ( ) {
if m . handle != nil {
2021-04-26 17:19:39 +02:00
if _ , err := m . handle . Detach ( context . TODO ( ) ) ; err != nil {
log . Printf ( "Error detaching handle %d: %s" , m . handle . Id , err )
}
2020-05-12 09:46:20 +02:00
m . handle = nil
}
if m . session != nil {
m . closeChan <- true
2021-04-26 17:19:39 +02:00
if _ , err := m . session . Destroy ( context . TODO ( ) ) ; err != nil {
log . Printf ( "Error destroying session %d: %s" , m . session . Id , err )
}
2020-05-12 09:46:20 +02:00
m . session = nil
}
if m . gw != nil {
if err := m . gw . Close ( ) ; err != nil {
log . Println ( "Error while closing connection to MCU" , err )
}
m . gw = nil
}
}
func ( m * mcuJanus ) reconnect ( ) error {
m . disconnect ( )
gw , err := NewJanusGateway ( m . url , m )
if err != nil {
return err
}
m . gw = gw
m . reconnectTimer . Stop ( )
return nil
}
func ( m * mcuJanus ) doReconnect ( ) {
if err := m . reconnect ( ) ; err != nil {
m . scheduleReconnect ( err )
return
}
if err := m . Start ( ) ; err != nil {
m . scheduleReconnect ( err )
return
}
log . Println ( "Reconnection to Janus gateway successful" )
m . mu . Lock ( )
2021-06-04 14:52:23 +02:00
m . publishers = make ( map [ string ] * mcuJanusPublisher )
m . publisherCreated . Reset ( )
m . publisherConnected . Reset ( )
2020-05-12 09:46:20 +02:00
m . reconnectInterval = initialReconnectInterval
m . mu . Unlock ( )
m . muClients . Lock ( )
2020-07-31 14:57:04 +02:00
for client := range m . clients {
2020-05-12 09:46:20 +02:00
go client . NotifyReconnected ( )
}
m . muClients . Unlock ( )
}
func ( m * mcuJanus ) scheduleReconnect ( err error ) {
m . mu . Lock ( )
defer m . mu . Unlock ( )
m . reconnectTimer . Reset ( m . reconnectInterval )
if err == nil {
2021-06-04 16:42:17 +02:00
log . Printf ( "Connection to Janus gateway was interrupted, reconnecting in %s" , m . reconnectInterval )
2020-05-12 09:46:20 +02:00
} else {
2021-06-04 16:42:17 +02:00
log . Printf ( "Reconnect to Janus gateway failed (%s), reconnecting in %s" , err , m . reconnectInterval )
2020-05-12 09:46:20 +02:00
}
m . reconnectInterval = m . reconnectInterval * 2
if m . reconnectInterval > maxReconnectInterval {
m . reconnectInterval = maxReconnectInterval
}
}
func ( m * mcuJanus ) ConnectionInterrupted ( ) {
m . scheduleReconnect ( nil )
2020-08-07 10:23:47 +02:00
m . notifyOnDisconnected ( )
2020-05-12 09:46:20 +02:00
}
func ( m * mcuJanus ) Start ( ) error {
ctx := context . TODO ( )
info , err := m . gw . Info ( ctx )
if err != nil {
return err
}
2021-06-04 16:42:17 +02:00
log . Printf ( "Connected to %s %s by %s" , info . Name , info . VersionString , info . Author )
2020-05-12 09:46:20 +02:00
if plugin , found := info . Plugins [ pluginVideoRoom ] ; ! found {
return fmt . Errorf ( "Plugin %s is not supported" , pluginVideoRoom )
} else {
2021-06-04 16:42:17 +02:00
log . Printf ( "Found %s %s by %s" , plugin . Name , plugin . VersionString , plugin . Author )
2020-05-12 09:46:20 +02:00
}
if ! info . DataChannels {
return fmt . Errorf ( "Data channels are not supported" )
} else {
log . Println ( "Data channels are supported" )
}
if ! info . FullTrickle {
log . Println ( "WARNING: Full-Trickle is NOT enabled in Janus!" )
} else {
log . Println ( "Full-Trickle is enabled" )
}
log . Printf ( "Maximum bandwidth %d bits/sec per publishing stream" , m . maxStreamBitrate )
log . Printf ( "Maximum bandwidth %d bits/sec per screensharing stream" , m . maxScreenBitrate )
if m . session , err = m . gw . Create ( ctx ) ; err != nil {
m . disconnect ( )
return err
}
log . Println ( "Created Janus session" , m . session . Id )
2020-05-28 16:02:04 +02:00
m . connectedSince = time . Now ( )
2020-05-12 09:46:20 +02:00
if m . handle , err = m . session . Attach ( ctx , pluginVideoRoom ) ; err != nil {
m . disconnect ( )
return err
}
log . Println ( "Created Janus handle" , m . handle . Id )
go m . run ( )
2020-08-07 10:23:47 +02:00
m . notifyOnConnected ( )
2020-05-12 09:46:20 +02:00
return nil
}
func ( m * mcuJanus ) registerClient ( client clientInterface ) {
m . muClients . Lock ( )
m . clients [ client ] = true
m . muClients . Unlock ( )
}
func ( m * mcuJanus ) unregisterClient ( client clientInterface ) {
m . muClients . Lock ( )
delete ( m . clients , client )
m . muClients . Unlock ( )
}
func ( m * mcuJanus ) run ( ) {
ticker := time . NewTicker ( keepaliveInterval )
defer ticker . Stop ( )
loop :
for {
select {
case <- ticker . C :
m . sendKeepalive ( )
case <- m . closeChan :
break loop
}
}
}
func ( m * mcuJanus ) Stop ( ) {
m . disconnect ( )
m . reconnectTimer . Stop ( )
}
2020-08-31 13:07:03 +02:00
func ( m * mcuJanus ) Reload ( config * goconf . ConfigFile ) {
}
2020-08-07 10:23:47 +02:00
func ( m * mcuJanus ) SetOnConnected ( f func ( ) ) {
if f == nil {
f = emptyOnConnected
}
m . onConnected . Store ( f )
}
func ( m * mcuJanus ) notifyOnConnected ( ) {
f := m . onConnected . Load ( ) . ( func ( ) )
f ( )
}
func ( m * mcuJanus ) SetOnDisconnected ( f func ( ) ) {
if f == nil {
f = emptyOnDisconnected
}
m . onDisconnected . Store ( f )
}
func ( m * mcuJanus ) notifyOnDisconnected ( ) {
f := m . onDisconnected . Load ( ) . ( func ( ) )
f ( )
}
2020-05-28 16:02:04 +02:00
type mcuJanusConnectionStats struct {
Url string ` json:"url" `
Connected bool ` json:"connected" `
Publishers int64 ` json:"publishers" `
Clients int64 ` json:"clients" `
Uptime * time . Time ` json:"uptime,omitempty" `
}
func ( m * mcuJanus ) GetStats ( ) interface { } {
result := mcuJanusConnectionStats {
Url : m . url ,
}
if m . session != nil {
result . Connected = true
result . Uptime = & m . connectedSince
}
m . mu . Lock ( )
2021-06-04 14:52:23 +02:00
result . Publishers = int64 ( len ( m . publishers ) )
2020-05-28 16:02:04 +02:00
m . mu . Unlock ( )
m . muClients . Lock ( )
result . Clients = int64 ( len ( m . clients ) )
m . muClients . Unlock ( )
return result
}
2020-05-12 09:46:20 +02:00
func ( m * mcuJanus ) sendKeepalive ( ) {
ctx := context . TODO ( )
if _ , err := m . session . KeepAlive ( ctx ) ; err != nil {
log . Println ( "Could not send keepalive request" , err )
if e , ok := err . ( * janus . ErrorMsg ) ; ok {
switch e . Err . Code {
case JANUS_ERROR_SESSION_NOT_FOUND :
m . scheduleReconnect ( err )
}
}
}
}
type mcuJanusClient struct {
mcu * mcuJanus
listener McuListener
2021-04-26 17:19:39 +02:00
mu sync . Mutex // nolint
2020-05-12 09:46:20 +02:00
2020-10-26 09:51:33 +01:00
id uint64
2020-05-12 09:46:20 +02:00
session uint64
roomId uint64
streamType string
handle * JanusHandle
handleId uint64
closeChan chan bool
2020-08-13 14:40:06 +02:00
deferred chan func ( )
2020-05-12 09:46:20 +02:00
handleEvent func ( event * janus . EventMsg )
handleHangup func ( event * janus . HangupMsg )
handleDetached func ( event * janus . DetachedMsg )
handleConnected func ( event * janus . WebRTCUpMsg )
handleSlowLink func ( event * janus . SlowLinkMsg )
2021-04-21 14:50:41 +02:00
handleMedia func ( event * janus . MediaMsg )
2020-05-12 09:46:20 +02:00
}
func ( c * mcuJanusClient ) Id ( ) string {
2020-10-26 09:51:33 +01:00
return strconv . FormatUint ( c . id , 10 )
2020-05-12 09:46:20 +02:00
}
func ( c * mcuJanusClient ) StreamType ( ) string {
return c . streamType
}
func ( c * mcuJanusClient ) Close ( ctx context . Context ) {
}
func ( c * mcuJanusClient ) SendMessage ( ctx context . Context , message * MessageClientMessage , data * MessageClientMessageData , callback func ( error , map [ string ] interface { } ) ) {
}
2021-04-20 17:12:28 +02:00
func ( c * mcuJanusClient ) closeClient ( ctx context . Context ) bool {
2020-05-12 09:46:20 +02:00
if handle := c . handle ; handle != nil {
c . handle = nil
c . closeChan <- true
if _ , err := handle . Detach ( ctx ) ; err != nil {
if e , ok := err . ( * janus . ErrorMsg ) ; ! ok || e . Err . Code != JANUS_ERROR_HANDLE_NOT_FOUND {
log . Println ( "Could not detach client" , handle . Id , err )
}
}
2021-04-20 17:12:28 +02:00
return true
2020-05-12 09:46:20 +02:00
}
2021-04-20 17:12:28 +02:00
return false
2020-05-12 09:46:20 +02:00
}
func ( c * mcuJanusClient ) run ( handle * JanusHandle , closeChan chan bool ) {
loop :
for {
select {
case msg := <- handle . Events :
switch t := msg . ( type ) {
case * janus . EventMsg :
c . handleEvent ( t )
case * janus . HangupMsg :
c . handleHangup ( t )
case * janus . DetachedMsg :
c . handleDetached ( t )
case * janus . MediaMsg :
2021-04-21 14:50:41 +02:00
c . handleMedia ( t )
2020-05-12 09:46:20 +02:00
case * janus . WebRTCUpMsg :
c . handleConnected ( t )
case * janus . SlowLinkMsg :
c . handleSlowLink ( t )
case * TrickleMsg :
c . handleTrickle ( t )
default :
log . Println ( "Received unsupported event type" , msg , reflect . TypeOf ( msg ) )
}
2020-08-13 14:40:06 +02:00
case f := <- c . deferred :
f ( )
2020-05-12 09:46:20 +02:00
case <- closeChan :
break loop
}
}
}
func ( c * mcuJanusClient ) sendOffer ( ctx context . Context , offer map [ string ] interface { } , callback func ( error , map [ string ] interface { } ) ) {
handle := c . handle
if handle == nil {
callback ( ErrNotConnected , nil )
return
}
configure_msg := map [ string ] interface { } {
"request" : "configure" ,
"audio" : true ,
"video" : true ,
"data" : true ,
}
answer_msg , err := handle . Message ( ctx , configure_msg , offer )
if err != nil {
callback ( err , nil )
return
}
callback ( nil , answer_msg . Jsep )
}
func ( c * mcuJanusClient ) sendAnswer ( ctx context . Context , answer map [ string ] interface { } , callback func ( error , map [ string ] interface { } ) ) {
handle := c . handle
if handle == nil {
callback ( ErrNotConnected , nil )
return
}
start_msg := map [ string ] interface { } {
"request" : "start" ,
"room" : c . roomId ,
}
start_response , err := handle . Message ( ctx , start_msg , answer )
if err != nil {
callback ( err , nil )
return
}
log . Println ( "Started listener" , start_response )
callback ( nil , nil )
}
func ( c * mcuJanusClient ) sendCandidate ( ctx context . Context , candidate interface { } , callback func ( error , map [ string ] interface { } ) ) {
handle := c . handle
if handle == nil {
callback ( ErrNotConnected , nil )
return
}
if _ , err := handle . Trickle ( ctx , candidate ) ; err != nil {
callback ( err , nil )
return
}
callback ( nil , nil )
}
func ( c * mcuJanusClient ) handleTrickle ( event * TrickleMsg ) {
if event . Candidate . Completed {
c . listener . OnIceCompleted ( c )
} else {
c . listener . OnIceCandidate ( c , event . Candidate )
}
}
2021-04-29 08:53:22 +02:00
func ( c * mcuJanusClient ) selectStream ( ctx context . Context , substream int , temporal int , callback func ( error , map [ string ] interface { } ) ) {
handle := c . handle
if handle == nil {
callback ( ErrNotConnected , nil )
return
}
if substream < 0 && temporal < 0 {
callback ( nil , nil )
return
}
configure_msg := map [ string ] interface { } {
"request" : "configure" ,
}
if substream >= 0 {
configure_msg [ "substream" ] = substream
}
if temporal >= 0 {
configure_msg [ "temporal" ] = temporal
}
_ , err := handle . Message ( ctx , configure_msg , nil )
if err != nil {
callback ( err , nil )
return
}
callback ( nil , nil )
}
2021-04-23 14:21:01 +02:00
type publisherStatsCounter struct {
mu sync . Mutex
streamTypes map [ string ] bool
subscribers map [ string ] bool
}
func ( c * publisherStatsCounter ) Reset ( ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
count := len ( c . subscribers )
for streamType := range c . streamTypes {
statsMcuPublisherStreamTypesCurrent . WithLabelValues ( streamType ) . Dec ( )
statsMcuSubscriberStreamTypesCurrent . WithLabelValues ( streamType ) . Sub ( float64 ( count ) )
}
c . streamTypes = nil
c . subscribers = nil
}
func ( c * publisherStatsCounter ) EnableStream ( streamType string , enable bool ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
if enable == c . streamTypes [ streamType ] {
return
}
if enable {
if c . streamTypes == nil {
c . streamTypes = make ( map [ string ] bool )
}
c . streamTypes [ streamType ] = true
statsMcuPublisherStreamTypesCurrent . WithLabelValues ( streamType ) . Inc ( )
statsMcuSubscriberStreamTypesCurrent . WithLabelValues ( streamType ) . Add ( float64 ( len ( c . subscribers ) ) )
} else {
delete ( c . streamTypes , streamType )
statsMcuPublisherStreamTypesCurrent . WithLabelValues ( streamType ) . Dec ( )
statsMcuSubscriberStreamTypesCurrent . WithLabelValues ( streamType ) . Sub ( float64 ( len ( c . subscribers ) ) )
}
}
func ( c * publisherStatsCounter ) AddSubscriber ( id string ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
if c . subscribers [ id ] {
return
}
if c . subscribers == nil {
c . subscribers = make ( map [ string ] bool )
}
c . subscribers [ id ] = true
for streamType := range c . streamTypes {
statsMcuSubscriberStreamTypesCurrent . WithLabelValues ( streamType ) . Inc ( )
}
}
func ( c * publisherStatsCounter ) RemoveSubscriber ( id string ) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
if ! c . subscribers [ id ] {
return
}
delete ( c . subscribers , id )
for streamType := range c . streamTypes {
statsMcuSubscriberStreamTypesCurrent . WithLabelValues ( streamType ) . Dec ( )
}
}
2020-05-12 09:46:20 +02:00
type mcuJanusPublisher struct {
mcuJanusClient
2021-11-08 12:06:59 +01:00
id string
bitrate int
mediaTypes MediaType
stats publisherStatsCounter
2021-04-23 14:21:01 +02:00
}
func ( m * mcuJanus ) SubscriberConnected ( id string , publisher string , streamType string ) {
m . mu . Lock ( )
defer m . mu . Unlock ( )
if p , found := m . publishers [ publisher + "|" + streamType ] ; found {
p . stats . AddSubscriber ( id )
}
}
func ( m * mcuJanus ) SubscriberDisconnected ( id string , publisher string , streamType string ) {
m . mu . Lock ( )
defer m . mu . Unlock ( )
if p , found := m . publishers [ publisher + "|" + streamType ] ; found {
p . stats . RemoveSubscriber ( id )
}
2020-05-12 09:46:20 +02:00
}
2021-01-21 14:39:33 +01:00
func min ( a , b int ) int {
if a <= b {
return a
} else {
return b
}
}
func ( m * mcuJanus ) getOrCreatePublisherHandle ( ctx context . Context , id string , streamType string , bitrate int ) ( * JanusHandle , uint64 , uint64 , error ) {
2020-05-12 09:46:20 +02:00
session := m . session
if session == nil {
return nil , 0 , 0 , ErrNotConnected
}
handle , err := session . Attach ( ctx , pluginVideoRoom )
if err != nil {
return nil , 0 , 0 , err
}
log . Printf ( "Attached %s as publisher %d to plugin %s in session %d" , streamType , handle . Id , pluginVideoRoom , session . Id )
2021-06-04 14:52:23 +02:00
create_msg := map [ string ] interface { } {
"request" : "create" ,
"description" : id + "|" + streamType ,
// We publish every stream in its own Janus room.
"publishers" : 1 ,
// Do not use the video-orientation RTP extension as it breaks video
// orientation changes in Firefox.
"videoorient_ext" : false ,
}
var maxBitrate int
if streamType == streamTypeScreen {
maxBitrate = m . maxScreenBitrate
} else {
maxBitrate = m . maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else {
bitrate = min ( bitrate , maxBitrate )
}
create_msg [ "bitrate" ] = bitrate
create_response , err := handle . Request ( ctx , create_msg )
2020-05-12 09:46:20 +02:00
if err != nil {
2021-06-04 14:52:23 +02:00
if _ , err2 := handle . Detach ( ctx ) ; err2 != nil {
log . Printf ( "Error detaching handle %d: %s" , handle . Id , err2 )
}
return nil , 0 , 0 , err
2020-05-12 09:46:20 +02:00
}
2021-06-04 14:52:23 +02:00
roomId := getPluginIntValue ( create_response . PluginData , pluginVideoRoom , "room" )
2020-05-12 09:46:20 +02:00
if roomId == 0 {
2021-06-04 14:52:23 +02:00
if _ , err := handle . Detach ( ctx ) ; err != nil {
log . Printf ( "Error detaching handle %d: %s" , handle . Id , err )
2020-05-12 09:46:20 +02:00
}
2021-06-04 14:52:23 +02:00
return nil , 0 , 0 , fmt . Errorf ( "No room id received: %+v" , create_response )
2020-05-12 09:46:20 +02:00
}
2021-06-04 14:52:23 +02:00
log . Println ( "Created room" , roomId , create_response . PluginData )
2020-05-12 09:46:20 +02:00
msg := map [ string ] interface { } {
"request" : "join" ,
"ptype" : "publisher" ,
"room" : roomId ,
"id" : streamTypeUserIds [ streamType ] ,
}
response , err := handle . Message ( ctx , msg , nil )
if err != nil {
2021-04-26 17:19:39 +02:00
if _ , err2 := handle . Detach ( ctx ) ; err2 != nil {
log . Printf ( "Error detaching handle %d: %s" , handle . Id , err2 )
}
2020-05-12 09:46:20 +02:00
return nil , 0 , 0 , err
}
return handle , response . Session , roomId , nil
}
2021-11-08 12:06:59 +01:00
func ( m * mcuJanus ) NewPublisher ( ctx context . Context , listener McuListener , id string , streamType string , bitrate int , mediaTypes MediaType , initiator McuInitiator ) ( McuPublisher , error ) {
2020-05-12 09:46:20 +02:00
if _ , found := streamTypeUserIds [ streamType ] ; ! found {
return nil , fmt . Errorf ( "Unsupported stream type %s" , streamType )
}
2021-01-21 14:39:33 +01:00
handle , session , roomId , err := m . getOrCreatePublisherHandle ( ctx , id , streamType , bitrate )
2020-05-12 09:46:20 +02:00
if err != nil {
return nil , err
}
client := & mcuJanusPublisher {
mcuJanusClient : mcuJanusClient {
mcu : m ,
listener : listener ,
2020-10-26 09:51:33 +01:00
id : atomic . AddUint64 ( & m . clientId , 1 ) ,
2020-05-12 09:46:20 +02:00
session : session ,
roomId : roomId ,
streamType : streamType ,
handle : handle ,
handleId : handle . Id ,
closeChan : make ( chan bool , 1 ) ,
2020-08-13 14:40:06 +02:00
deferred : make ( chan func ( ) , 64 ) ,
2020-05-12 09:46:20 +02:00
} ,
2021-11-08 12:06:59 +01:00
id : id ,
bitrate : bitrate ,
mediaTypes : mediaTypes ,
2020-05-12 09:46:20 +02:00
}
client . mcuJanusClient . handleEvent = client . handleEvent
client . mcuJanusClient . handleHangup = client . handleHangup
client . mcuJanusClient . handleDetached = client . handleDetached
client . mcuJanusClient . handleConnected = client . handleConnected
client . mcuJanusClient . handleSlowLink = client . handleSlowLink
2021-04-21 14:50:41 +02:00
client . mcuJanusClient . handleMedia = client . handleMedia
2020-05-12 09:46:20 +02:00
m . registerClient ( client )
2020-10-26 09:51:33 +01:00
log . Printf ( "Publisher %s is using handle %d" , client . id , client . handleId )
2020-05-12 09:46:20 +02:00
go client . run ( handle , client . closeChan )
2021-06-04 14:52:23 +02:00
m . mu . Lock ( )
m . publishers [ id + "|" + streamType ] = client
m . publisherCreated . Notify ( id + "|" + streamType )
m . mu . Unlock ( )
2021-04-20 17:12:28 +02:00
statsPublishersCurrent . WithLabelValues ( streamType ) . Inc ( )
statsPublishersTotal . WithLabelValues ( streamType ) . Inc ( )
2020-05-12 09:46:20 +02:00
return client , nil
}
func ( p * mcuJanusPublisher ) handleEvent ( event * janus . EventMsg ) {
if videoroom := getPluginStringValue ( event . Plugindata , pluginVideoRoom , "videoroom" ) ; videoroom != "" {
ctx := context . TODO ( )
switch videoroom {
case "destroyed" :
log . Printf ( "Publisher %d: associated room has been destroyed, closing" , p . handleId )
go p . Close ( ctx )
case "slow_link" :
// Ignore, processed through "handleSlowLink" in the general events.
default :
log . Printf ( "Unsupported videoroom publisher event in %d: %+v" , p . handleId , event )
}
} else {
log . Printf ( "Unsupported publisher event in %d: %+v" , p . handleId , event )
}
}
func ( p * mcuJanusPublisher ) handleHangup ( event * janus . HangupMsg ) {
log . Printf ( "Publisher %d received hangup (%s), closing" , p . handleId , event . Reason )
go p . Close ( context . Background ( ) )
}
func ( p * mcuJanusPublisher ) handleDetached ( event * janus . DetachedMsg ) {
log . Printf ( "Publisher %d received detached, closing" , p . handleId )
go p . Close ( context . Background ( ) )
}
func ( p * mcuJanusPublisher ) handleConnected ( event * janus . WebRTCUpMsg ) {
log . Printf ( "Publisher %d received connected" , p . handleId )
2021-06-04 14:52:23 +02:00
p . mcu . publisherConnected . Notify ( p . id + "|" + p . streamType )
2020-05-12 09:46:20 +02:00
}
func ( p * mcuJanusPublisher ) handleSlowLink ( event * janus . SlowLinkMsg ) {
if event . Uplink {
2020-08-12 16:43:17 +02:00
log . Printf ( "Publisher %s (%d) is reporting %d lost packets on the uplink (Janus -> client)" , p . listener . PublicId ( ) , p . handleId , event . Lost )
2020-05-12 09:46:20 +02:00
} else {
2020-08-12 16:43:17 +02:00
log . Printf ( "Publisher %s (%d) is reporting %d lost packets on the downlink (client -> Janus)" , p . listener . PublicId ( ) , p . handleId , event . Lost )
2020-05-12 09:46:20 +02:00
}
}
2021-04-21 14:50:41 +02:00
func ( p * mcuJanusPublisher ) handleMedia ( event * janus . MediaMsg ) {
mediaType := event . Type
if mediaType == "video" && p . streamType == "screen" {
// We want to differentiate between audio, video and screensharing
mediaType = p . streamType
}
2021-04-23 14:21:01 +02:00
p . stats . EnableStream ( mediaType , event . Receiving )
2021-04-21 14:50:41 +02:00
}
2021-11-08 12:06:59 +01:00
func ( p * mcuJanusPublisher ) HasMedia ( mt MediaType ) bool {
return ( p . mediaTypes & mt ) == mt
}
2020-05-12 09:46:20 +02:00
func ( p * mcuJanusPublisher ) NotifyReconnected ( ) {
ctx := context . TODO ( )
2021-01-21 14:39:33 +01:00
handle , session , roomId , err := p . mcu . getOrCreatePublisherHandle ( ctx , p . id , p . streamType , p . bitrate )
2020-05-12 09:46:20 +02:00
if err != nil {
2021-06-04 16:42:17 +02:00
log . Printf ( "Could not reconnect publisher %s: %s" , p . id , err )
2020-05-12 09:46:20 +02:00
// TODO(jojo): Retry
return
}
p . handle = handle
p . handleId = handle . Id
p . session = session
p . roomId = roomId
2020-10-26 09:51:33 +01:00
log . Printf ( "Publisher %s reconnected on handle %d" , p . id , p . handleId )
2020-05-12 09:46:20 +02:00
}
func ( p * mcuJanusPublisher ) Close ( ctx context . Context ) {
notify := false
p . mu . Lock ( )
if handle := p . handle ; handle != nil && p . roomId != 0 {
destroy_msg := map [ string ] interface { } {
"request" : "destroy" ,
"room" : p . roomId ,
}
if _ , err := handle . Request ( ctx , destroy_msg ) ; err != nil {
log . Printf ( "Error destroying room %d: %s" , p . roomId , err )
} else {
log . Printf ( "Room %d destroyed" , p . roomId )
}
p . mcu . mu . Lock ( )
2021-06-04 14:52:23 +02:00
delete ( p . mcu . publishers , p . id + "|" + p . streamType )
2020-05-12 09:46:20 +02:00
p . mcu . mu . Unlock ( )
p . roomId = 0
notify = true
}
p . closeClient ( ctx )
p . mu . Unlock ( )
2021-04-23 14:21:01 +02:00
p . stats . Reset ( )
2021-04-21 14:50:41 +02:00
2020-05-12 09:46:20 +02:00
if notify {
2021-04-20 17:12:28 +02:00
statsPublishersCurrent . WithLabelValues ( p . streamType ) . Dec ( )
2020-05-12 09:46:20 +02:00
p . mcu . unregisterClient ( p )
p . listener . PublisherClosed ( p )
}
2021-04-23 14:21:01 +02:00
p . mcuJanusClient . Close ( ctx )
2020-05-12 09:46:20 +02:00
}
func ( p * mcuJanusPublisher ) SendMessage ( ctx context . Context , message * MessageClientMessage , data * MessageClientMessageData , callback func ( error , map [ string ] interface { } ) ) {
2021-04-20 17:12:28 +02:00
statsMcuMessagesTotal . WithLabelValues ( data . Type ) . Inc ( )
2020-05-12 09:46:20 +02:00
jsep_msg := data . Payload
switch data . Type {
case "offer" :
2020-08-13 14:40:06 +02:00
p . deferred <- func ( ) {
2020-05-12 09:46:20 +02:00
msgctx , cancel := context . WithTimeout ( context . Background ( ) , p . mcu . mcuTimeout )
defer cancel ( )
p . sendOffer ( msgctx , jsep_msg , callback )
2020-08-13 14:40:06 +02:00
}
2020-05-12 09:46:20 +02:00
case "candidate" :
2020-08-13 14:40:06 +02:00
p . deferred <- func ( ) {
2020-05-12 09:46:20 +02:00
msgctx , cancel := context . WithTimeout ( context . Background ( ) , p . mcu . mcuTimeout )
defer cancel ( )
p . sendCandidate ( msgctx , jsep_msg [ "candidate" ] , callback )
2020-08-13 14:40:06 +02:00
}
2020-05-12 09:46:20 +02:00
case "endOfCandidates" :
// Ignore
default :
go callback ( fmt . Errorf ( "Unsupported message type: %s" , data . Type ) , nil )
}
}
type mcuJanusSubscriber struct {
mcuJanusClient
publisher string
}
2021-06-04 14:52:23 +02:00
func ( m * mcuJanus ) getPublisher ( ctx context . Context , publisher string , streamType string ) ( * mcuJanusPublisher , error ) {
2020-05-12 09:46:20 +02:00
// Do the direct check immediately as this should be the normal case.
2021-06-04 14:52:23 +02:00
key := publisher + "|" + streamType
2020-05-12 09:46:20 +02:00
m . mu . Lock ( )
2021-06-04 14:52:23 +02:00
if result , found := m . publishers [ key ] ; found {
2020-05-12 09:46:20 +02:00
m . mu . Unlock ( )
2021-06-04 14:52:23 +02:00
return result , nil
2020-05-12 09:46:20 +02:00
}
2021-06-04 14:52:23 +02:00
waiter := m . publisherCreated . NewWaiter ( key )
2020-05-12 09:46:20 +02:00
m . mu . Unlock ( )
2021-06-04 14:52:23 +02:00
defer m . publisherCreated . Release ( waiter )
for {
m . mu . Lock ( )
result := m . publishers [ key ]
m . mu . Unlock ( )
if result != nil {
return result , nil
2020-05-12 09:46:20 +02:00
}
2021-06-04 14:52:23 +02:00
if err := waiter . Wait ( ctx ) ; err != nil {
return nil , err
2020-05-12 09:46:20 +02:00
}
}
}
2021-06-04 14:52:23 +02:00
func ( m * mcuJanus ) getOrCreateSubscriberHandle ( ctx context . Context , publisher string , streamType string ) ( * JanusHandle , * mcuJanusPublisher , error ) {
var pub * mcuJanusPublisher
2020-05-12 09:46:20 +02:00
var err error
2021-06-04 14:52:23 +02:00
if pub , err = m . getPublisher ( ctx , publisher , streamType ) ; err != nil {
return nil , nil , err
2020-05-12 09:46:20 +02:00
}
session := m . session
if session == nil {
2021-06-04 14:52:23 +02:00
return nil , nil , ErrNotConnected
2020-05-12 09:46:20 +02:00
}
handle , err := session . Attach ( ctx , pluginVideoRoom )
if err != nil {
2021-06-04 14:52:23 +02:00
return nil , nil , err
2020-05-12 09:46:20 +02:00
}
2021-06-04 14:52:23 +02:00
log . Printf ( "Attached subscriber to room %d of publisher %s in plugin %s in session %d as %d" , pub . roomId , publisher , pluginVideoRoom , session . Id , handle . Id )
return handle , pub , nil
2020-05-12 09:46:20 +02:00
}
func ( m * mcuJanus ) NewSubscriber ( ctx context . Context , listener McuListener , publisher string , streamType string ) ( McuSubscriber , error ) {
if _ , found := streamTypeUserIds [ streamType ] ; ! found {
return nil , fmt . Errorf ( "Unsupported stream type %s" , streamType )
}
2021-06-04 14:52:23 +02:00
handle , pub , err := m . getOrCreateSubscriberHandle ( ctx , publisher , streamType )
2020-05-12 09:46:20 +02:00
if err != nil {
return nil , err
}
client := & mcuJanusSubscriber {
mcuJanusClient : mcuJanusClient {
mcu : m ,
listener : listener ,
2020-10-26 09:51:33 +01:00
id : atomic . AddUint64 ( & m . clientId , 1 ) ,
2021-06-04 14:52:23 +02:00
roomId : pub . roomId ,
2020-05-12 09:46:20 +02:00
streamType : streamType ,
handle : handle ,
handleId : handle . Id ,
closeChan : make ( chan bool , 1 ) ,
2020-08-13 14:40:06 +02:00
deferred : make ( chan func ( ) , 64 ) ,
2020-05-12 09:46:20 +02:00
} ,
publisher : publisher ,
}
client . mcuJanusClient . handleEvent = client . handleEvent
client . mcuJanusClient . handleHangup = client . handleHangup
client . mcuJanusClient . handleDetached = client . handleDetached
client . mcuJanusClient . handleConnected = client . handleConnected
client . mcuJanusClient . handleSlowLink = client . handleSlowLink
2021-04-21 14:50:41 +02:00
client . mcuJanusClient . handleMedia = client . handleMedia
2020-05-12 09:46:20 +02:00
m . registerClient ( client )
go client . run ( handle , client . closeChan )
2021-04-20 17:12:28 +02:00
statsSubscribersCurrent . WithLabelValues ( streamType ) . Inc ( )
statsSubscribersTotal . WithLabelValues ( streamType ) . Inc ( )
2020-05-12 09:46:20 +02:00
return client , nil
}
func ( p * mcuJanusSubscriber ) Publisher ( ) string {
return p . publisher
}
func ( p * mcuJanusSubscriber ) handleEvent ( event * janus . EventMsg ) {
if videoroom := getPluginStringValue ( event . Plugindata , pluginVideoRoom , "videoroom" ) ; videoroom != "" {
ctx := context . TODO ( )
switch videoroom {
case "destroyed" :
log . Printf ( "Subscriber %d: associated room has been destroyed, closing" , p . handleId )
go p . Close ( ctx )
case "event" :
// Ignore events like selected substream / temporal layer.
case "slow_link" :
// Ignore, processed through "handleSlowLink" in the general events.
default :
log . Printf ( "Unsupported videoroom event %s for subscriber %d: %+v" , videoroom , p . handleId , event )
}
} else {
log . Printf ( "Unsupported event for subscriber %d: %+v" , p . handleId , event )
}
}
func ( p * mcuJanusSubscriber ) handleHangup ( event * janus . HangupMsg ) {
log . Printf ( "Subscriber %d received hangup (%s), closing" , p . handleId , event . Reason )
go p . Close ( context . Background ( ) )
}
func ( p * mcuJanusSubscriber ) handleDetached ( event * janus . DetachedMsg ) {
log . Printf ( "Subscriber %d received detached, closing" , p . handleId )
go p . Close ( context . Background ( ) )
}
func ( p * mcuJanusSubscriber ) handleConnected ( event * janus . WebRTCUpMsg ) {
log . Printf ( "Subscriber %d received connected" , p . handleId )
2021-04-23 14:21:01 +02:00
p . mcu . SubscriberConnected ( p . Id ( ) , p . publisher , p . streamType )
2020-05-12 09:46:20 +02:00
}
func ( p * mcuJanusSubscriber ) handleSlowLink ( event * janus . SlowLinkMsg ) {
if event . Uplink {
2020-08-12 16:43:17 +02:00
log . Printf ( "Subscriber %s (%d) is reporting %d lost packets on the uplink (Janus -> client)" , p . listener . PublicId ( ) , p . handleId , event . Lost )
2020-05-12 09:46:20 +02:00
} else {
2020-08-12 16:43:17 +02:00
log . Printf ( "Subscriber %s (%d) is reporting %d lost packets on the downlink (client -> Janus)" , p . listener . PublicId ( ) , p . handleId , event . Lost )
2020-05-12 09:46:20 +02:00
}
}
2021-04-21 14:50:41 +02:00
func ( p * mcuJanusSubscriber ) handleMedia ( event * janus . MediaMsg ) {
// Only triggered for publishers
}
2020-05-12 09:46:20 +02:00
func ( p * mcuJanusSubscriber ) NotifyReconnected ( ) {
ctx , cancel := context . WithTimeout ( context . Background ( ) , p . mcu . mcuTimeout )
defer cancel ( )
2021-06-04 14:52:23 +02:00
handle , pub , err := p . mcu . getOrCreateSubscriberHandle ( ctx , p . publisher , p . streamType )
2020-05-12 09:46:20 +02:00
if err != nil {
// TODO(jojo): Retry?
2021-06-04 16:42:17 +02:00
log . Printf ( "Could not reconnect subscriber for publisher %s: %s" , p . publisher , err )
2020-05-12 09:46:20 +02:00
p . Close ( context . Background ( ) )
return
}
p . handle = handle
p . handleId = handle . Id
2021-06-04 14:52:23 +02:00
p . roomId = pub . roomId
2020-10-26 09:51:33 +01:00
log . Printf ( "Subscriber %d for publisher %s reconnected on handle %d" , p . id , p . publisher , p . handleId )
2020-05-12 09:46:20 +02:00
}
func ( p * mcuJanusSubscriber ) Close ( ctx context . Context ) {
p . mu . Lock ( )
2021-04-20 17:12:28 +02:00
closed := p . closeClient ( ctx )
2020-05-12 09:46:20 +02:00
p . mu . Unlock ( )
2021-04-20 17:12:28 +02:00
if closed {
2021-04-23 14:21:01 +02:00
p . mcu . SubscriberDisconnected ( p . Id ( ) , p . publisher , p . streamType )
2021-04-20 17:12:28 +02:00
statsSubscribersCurrent . WithLabelValues ( p . streamType ) . Dec ( )
}
2020-05-12 09:46:20 +02:00
p . mcu . unregisterClient ( p )
p . listener . SubscriberClosed ( p )
2021-04-23 14:21:01 +02:00
p . mcuJanusClient . Close ( ctx )
2020-05-12 09:46:20 +02:00
}
func ( p * mcuJanusSubscriber ) joinRoom ( ctx context . Context , callback func ( error , map [ string ] interface { } ) ) {
handle := p . handle
if handle == nil {
callback ( ErrNotConnected , nil )
return
}
2021-06-04 14:52:23 +02:00
waiter := p . mcu . publisherConnected . NewWaiter ( p . publisher + "|" + p . streamType )
defer p . mcu . publisherConnected . Release ( waiter )
2020-05-12 09:46:20 +02:00
2021-04-20 17:12:28 +02:00
loggedNotPublishingYet := false
2020-05-12 09:46:20 +02:00
retry :
join_msg := map [ string ] interface { } {
"request" : "join" ,
2021-02-24 08:49:10 +01:00
"ptype" : "subscriber" ,
2020-05-12 09:46:20 +02:00
"room" : p . roomId ,
"feed" : streamTypeUserIds [ p . streamType ] ,
}
join_response , err := handle . Message ( ctx , join_msg , nil )
if err != nil {
callback ( err , nil )
return
}
if error_code := getPluginIntValue ( join_response . Plugindata , pluginVideoRoom , "error_code" ) ; error_code > 0 {
switch error_code {
case JANUS_VIDEOROOM_ERROR_ALREADY_JOINED :
// The subscriber is already connected to the room. This can happen
// if a client leaves a call but keeps the subscriber objects active.
// On joining the call again, the subscriber tries to join on the
// MCU which will fail because he is still connected.
// To get a new Offer SDP, we have to tear down the session on the
// MCU and join again.
p . mu . Lock ( )
p . closeClient ( ctx )
p . mu . Unlock ( )
2021-06-04 14:52:23 +02:00
var pub * mcuJanusPublisher
handle , pub , err = p . mcu . getOrCreateSubscriberHandle ( ctx , p . publisher , p . streamType )
2020-05-12 09:46:20 +02:00
if err != nil {
2020-08-13 14:51:07 +02:00
// Reconnection didn't work, need to unregister/remove subscriber
// so a new object will be created if the request is retried.
p . mcu . unregisterClient ( p )
p . listener . SubscriberClosed ( p )
2020-05-12 09:46:20 +02:00
callback ( fmt . Errorf ( "Already connected as subscriber for %s, error during re-joining: %s" , p . streamType , err ) , nil )
return
}
p . handle = handle
p . handleId = handle . Id
2021-06-04 14:52:23 +02:00
p . roomId = pub . roomId
2020-05-12 09:46:20 +02:00
p . closeChan = make ( chan bool , 1 )
go p . run ( p . handle , p . closeChan )
2020-10-26 09:51:33 +01:00
log . Printf ( "Already connected subscriber %d for %s, leaving and re-joining on handle %d" , p . id , p . streamType , p . handleId )
2020-05-12 09:46:20 +02:00
goto retry
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM :
fallthrough
case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED :
switch error_code {
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM :
log . Printf ( "Publisher %s not created yet for %s, wait and retry to join room %d as subscriber" , p . publisher , p . streamType , p . roomId )
case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED :
log . Printf ( "Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber" , p . publisher , p . streamType , p . roomId )
}
2021-06-04 14:52:23 +02:00
2021-04-20 17:12:28 +02:00
if ! loggedNotPublishingYet {
loggedNotPublishingYet = true
statsWaitingForPublisherTotal . WithLabelValues ( p . streamType ) . Inc ( )
}
2021-06-04 14:52:23 +02:00
if err := waiter . Wait ( ctx ) ; err != nil {
callback ( err , nil )
2020-05-12 09:46:20 +02:00
return
}
2021-06-04 14:52:23 +02:00
log . Printf ( "Retry subscribing %s from %s" , p . streamType , p . publisher )
2020-05-12 09:46:20 +02:00
goto retry
default :
// TODO(jojo): Should we handle other errors, too?
callback ( fmt . Errorf ( "Error joining room as subscriber: %+v" , join_response ) , nil )
return
}
}
//log.Println("Joined as listener", join_response)
p . session = join_response . Session
callback ( nil , join_response . Jsep )
}
func ( p * mcuJanusSubscriber ) SendMessage ( ctx context . Context , message * MessageClientMessage , data * MessageClientMessageData , callback func ( error , map [ string ] interface { } ) ) {
2021-04-20 17:12:28 +02:00
statsMcuMessagesTotal . WithLabelValues ( data . Type ) . Inc ( )
2020-05-12 09:46:20 +02:00
jsep_msg := data . Payload
switch data . Type {
case "requestoffer" :
fallthrough
case "sendoffer" :
2020-08-13 14:40:06 +02:00
p . deferred <- func ( ) {
2020-05-12 09:46:20 +02:00
msgctx , cancel := context . WithTimeout ( context . Background ( ) , p . mcu . mcuTimeout )
defer cancel ( )
p . joinRoom ( msgctx , callback )
2020-08-13 14:40:06 +02:00
}
2020-05-12 09:46:20 +02:00
case "answer" :
2020-08-13 14:40:06 +02:00
p . deferred <- func ( ) {
2020-05-12 09:46:20 +02:00
msgctx , cancel := context . WithTimeout ( context . Background ( ) , p . mcu . mcuTimeout )
defer cancel ( )
p . sendAnswer ( msgctx , jsep_msg , callback )
2020-08-13 14:40:06 +02:00
}
2020-05-12 09:46:20 +02:00
case "candidate" :
2020-08-13 14:40:06 +02:00
p . deferred <- func ( ) {
2020-05-12 09:46:20 +02:00
msgctx , cancel := context . WithTimeout ( context . Background ( ) , p . mcu . mcuTimeout )
defer cancel ( )
p . sendCandidate ( msgctx , jsep_msg [ "candidate" ] , callback )
2020-08-13 14:40:06 +02:00
}
2020-05-12 09:46:20 +02:00
case "endOfCandidates" :
// Ignore
2021-04-29 08:53:22 +02:00
case "selectStream" :
substream := - 1
if s , found := jsep_msg [ "substream" ] ; found {
switch s := s . ( type ) {
case int :
substream = s
case float32 :
substream = int ( s )
case float64 :
substream = int ( s )
default :
go callback ( fmt . Errorf ( "Unsupported substream value: %v" , s ) , nil )
return
}
}
temporal := - 1
if s , found := jsep_msg [ "temporal" ] ; found {
switch s := s . ( type ) {
case int :
temporal = s
case float32 :
temporal = int ( s )
case float64 :
temporal = int ( s )
default :
go callback ( fmt . Errorf ( "Unsupported temporal value: %v" , s ) , nil )
return
}
}
if substream == - 1 && temporal == - 1 {
// Nothing to do
go callback ( nil , nil )
return
}
p . deferred <- func ( ) {
msgctx , cancel := context . WithTimeout ( context . Background ( ) , p . mcu . mcuTimeout )
defer cancel ( )
p . selectStream ( msgctx , substream , temporal , callback )
}
2020-05-12 09:46:20 +02:00
default :
// Return error asynchronously
go callback ( fmt . Errorf ( "Unsupported message type: %s" , data . Type ) , nil )
}
}