diff --git a/janus_client.go b/janus_client.go index b7b33a5..5716bef 100644 --- a/janus_client.go +++ b/janus_client.go @@ -219,6 +219,17 @@ type dummyGatewayListener struct { func (l *dummyGatewayListener) ConnectionInterrupted() { } +type JanusGatewayInterface interface { + Info(context.Context) (*InfoMsg, error) + Create(context.Context) (*JanusSession, error) + Close() error + + send(map[string]interface{}, *transaction) (uint64, error) + removeTransaction(uint64) + + removeSession(*JanusSession) +} + // Gateway represents a connection to an instance of the Janus Gateway. type JanusGateway struct { listener GatewayListener @@ -560,12 +571,18 @@ func (gateway *JanusGateway) Create(ctx context.Context) (*JanusSession, error) // Store this session gateway.Lock() + defer gateway.Unlock() gateway.Sessions[session.Id] = session - gateway.Unlock() return session, nil } +func (gateway *JanusGateway) removeSession(session *JanusSession) { + gateway.Lock() + defer gateway.Unlock() + delete(gateway.Sessions, session.Id) +} + // Session represents a session instance on the Janus Gateway. type JanusSession struct { // Id is the session_id of this session @@ -578,7 +595,7 @@ type JanusSession struct { // and Session.Unlock() methods provided by the embedded sync.Mutex. sync.Mutex - gateway *JanusGateway + gateway JanusGatewayInterface } func (session *JanusSession) send(msg map[string]interface{}, t *transaction) (uint64, error) { @@ -670,9 +687,7 @@ func (session *JanusSession) Destroy(ctx context.Context) (*janus.AckMsg, error) } // Remove this session from the gateway - session.gateway.Lock() - delete(session.gateway.Sessions, session.Id) - session.gateway.Unlock() + session.gateway.removeSession(session) return ack, nil } diff --git a/mcu_janus.go b/mcu_janus.go index e25adca..1a68003 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -78,6 +78,11 @@ func convertIntValue(value interface{}) (uint64, error) { return uint64(t), nil case uint64: return t, nil + case int: + if t < 0 { + return 0, fmt.Errorf("Unsupported int number: %+v", t) + } + return uint64(t), nil case int64: if t < 0 { return 0, fmt.Errorf("Unsupported int64 number: %+v", t) @@ -92,7 +97,7 @@ func convertIntValue(value interface{}) (uint64, error) { } return uint64(r), nil default: - return 0, fmt.Errorf("Unknown number type: %+v", t) + return 0, fmt.Errorf("Unknown number type: %+v (%T)", t, t) } } @@ -170,7 +175,9 @@ type mcuJanus struct { settings McuSettings - gw *JanusGateway + createJanusGateway func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) + + gw JanusGatewayInterface session *JanusSession handle *JanusHandle @@ -213,6 +220,9 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc publishers: make(map[string]*mcuJanusPublisher), remotePublishers: make(map[string]*mcuJanusRemotePublisher), + createJanusGateway: func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) { + return NewJanusGateway(ctx, wsURL, listener) + }, reconnectInterval: initialReconnectInterval, } mcu.onConnected.Store(emptyOnConnected) @@ -222,8 +232,10 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc mcu.doReconnect(context.Background()) }) mcu.reconnectTimer.Stop() - if err := mcu.reconnect(ctx); err != nil { - return nil, err + if mcu.url != "" { + if err := mcu.reconnect(ctx); err != nil { + return nil, err + } } return mcu, nil } @@ -252,7 +264,7 @@ func (m *mcuJanus) disconnect() { func (m *mcuJanus) reconnect(ctx context.Context) error { m.disconnect() - gw, err := NewJanusGateway(ctx, m.url, m) + gw, err := m.createJanusGateway(ctx, m.url, m) if err != nil { return err } @@ -317,6 +329,11 @@ func (m *mcuJanus) hasRemotePublisher() bool { } func (m *mcuJanus) Start(ctx context.Context) error { + if m.url == "" { + if err := m.reconnect(ctx); err != nil { + return err + } + } info, err := m.gw.Info(ctx) if err != nil { return err