diff --git a/src/signaling/mcu_common.go b/src/signaling/mcu_common.go index 7bfd250..3429e9d 100644 --- a/src/signaling/mcu_common.go +++ b/src/signaling/mcu_common.go @@ -22,6 +22,8 @@ package signaling import ( + "fmt" + "golang.org/x/net/context" ) @@ -31,8 +33,12 @@ const ( McuTypeDefault = McuTypeJanus ) +var ( + ErrNotConnected = fmt.Errorf("Not connected") +) + type McuListener interface { - Session + PublicId() string OnIceCandidate(client McuClient, candidate interface{}) OnIceCompleted(client McuClient) @@ -45,6 +51,9 @@ type Mcu interface { Start() error Stop() + SetOnConnected(func()) + SetOnDisconnected(func()) + GetStats() interface{} NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 0acd282..84bd18d 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -28,6 +28,7 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "github.com/dlintw/goconf" @@ -64,8 +65,6 @@ var ( videoPublisherUserId: streamTypeVideo, screenPublisherUserId: streamTypeScreen, } - - ErrNotConnected = fmt.Errorf("Not connected") ) func getPluginValue(data janus.PluginData, pluginName string, key string) interface{} { @@ -161,8 +160,13 @@ type mcuJanus struct { reconnectInterval time.Duration connectedSince time.Time + onConnected atomic.Value + onDisconnected atomic.Value } +func emptyOnConnected() {} +func emptyOnDisconnected() {} + func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) { maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate") if maxStreamBitrate <= 0 { @@ -190,6 +194,9 @@ func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, e reconnectInterval: initialReconnectInterval, } + mcu.onConnected.Store(emptyOnConnected) + mcu.onDisconnected.Store(emptyOnDisconnected) + mcu.reconnectTimer = time.AfterFunc(mcu.reconnectInterval, mcu.doReconnect) mcu.reconnectTimer.Stop() if err := mcu.reconnect(); err != nil { @@ -269,6 +276,7 @@ func (m *mcuJanus) scheduleReconnect(err error) { func (m *mcuJanus) ConnectionInterrupted() { m.scheduleReconnect(nil) + m.notifyOnDisconnected() } func (m *mcuJanus) Start() error { @@ -314,6 +322,8 @@ func (m *mcuJanus) Start() error { log.Println("Created Janus handle", m.handle.Id) go m.run() + + m.notifyOnConnected() return nil } @@ -349,6 +359,32 @@ func (m *mcuJanus) Stop() { m.reconnectTimer.Stop() } +func (m *mcuJanus) SetOnConnected(f func()) { + if f == nil { + f = emptyOnConnected + } + + m.onConnected.Store(f) +} + +func (m *mcuJanus) notifyOnConnected() { + f := m.onConnected.Load().(func()) + f() +} + +func (m *mcuJanus) SetOnDisconnected(f func()) { + if f == nil { + f = emptyOnDisconnected + } + + m.onDisconnected.Store(f) +} + +func (m *mcuJanus) notifyOnDisconnected() { + f := m.onDisconnected.Load().(func()) + f() +} + type mcuJanusConnectionStats struct { Url string `json:"url"` Connected bool `json:"connected"` diff --git a/src/signaling/mcu_test.go b/src/signaling/mcu_test.go index 8bdaad3..7062a49 100644 --- a/src/signaling/mcu_test.go +++ b/src/signaling/mcu_test.go @@ -41,6 +41,12 @@ func (m *TestMCU) Start() error { func (m *TestMCU) Stop() { } +func (m *TestMCU) SetOnConnected(f func()) { +} + +func (m *TestMCU) SetOnDisconnected(f func()) { +} + func (m *TestMCU) GetStats() interface{} { return nil }