Make JanusGateway an interface to help with testing.

This commit is contained in:
Joachim Bauch 2024-11-07 16:57:12 +01:00
commit 469e97f483
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 42 additions and 10 deletions

View file

@ -219,6 +219,17 @@ type dummyGatewayListener struct {
func (l *dummyGatewayListener) ConnectionInterrupted() {
}
type JanusGatewayInterface interface {
Info(context.Context) (*InfoMsg, error)
Create(context.Context) (*JanusSession, error)
Close() error
send(map[string]interface{}, *transaction) (uint64, error)
removeTransaction(uint64)
removeSession(*JanusSession)
}
// Gateway represents a connection to an instance of the Janus Gateway.
type JanusGateway struct {
listener GatewayListener
@ -560,12 +571,18 @@ func (gateway *JanusGateway) Create(ctx context.Context) (*JanusSession, error)
// Store this session
gateway.Lock()
defer gateway.Unlock()
gateway.Sessions[session.Id] = session
gateway.Unlock()
return session, nil
}
func (gateway *JanusGateway) removeSession(session *JanusSession) {
gateway.Lock()
defer gateway.Unlock()
delete(gateway.Sessions, session.Id)
}
// Session represents a session instance on the Janus Gateway.
type JanusSession struct {
// Id is the session_id of this session
@ -578,7 +595,7 @@ type JanusSession struct {
// and Session.Unlock() methods provided by the embedded sync.Mutex.
sync.Mutex
gateway *JanusGateway
gateway JanusGatewayInterface
}
func (session *JanusSession) send(msg map[string]interface{}, t *transaction) (uint64, error) {
@ -670,9 +687,7 @@ func (session *JanusSession) Destroy(ctx context.Context) (*janus.AckMsg, error)
}
// Remove this session from the gateway
session.gateway.Lock()
delete(session.gateway.Sessions, session.Id)
session.gateway.Unlock()
session.gateway.removeSession(session)
return ack, nil
}

View file

@ -78,6 +78,11 @@ func convertIntValue(value interface{}) (uint64, error) {
return uint64(t), nil
case uint64:
return t, nil
case int:
if t < 0 {
return 0, fmt.Errorf("Unsupported int number: %+v", t)
}
return uint64(t), nil
case int64:
if t < 0 {
return 0, fmt.Errorf("Unsupported int64 number: %+v", t)
@ -92,7 +97,7 @@ func convertIntValue(value interface{}) (uint64, error) {
}
return uint64(r), nil
default:
return 0, fmt.Errorf("Unknown number type: %+v", t)
return 0, fmt.Errorf("Unknown number type: %+v (%T)", t, t)
}
}
@ -170,7 +175,9 @@ type mcuJanus struct {
settings McuSettings
gw *JanusGateway
createJanusGateway func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error)
gw JanusGatewayInterface
session *JanusSession
handle *JanusHandle
@ -213,6 +220,9 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
publishers: make(map[string]*mcuJanusPublisher),
remotePublishers: make(map[string]*mcuJanusRemotePublisher),
createJanusGateway: func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) {
return NewJanusGateway(ctx, wsURL, listener)
},
reconnectInterval: initialReconnectInterval,
}
mcu.onConnected.Store(emptyOnConnected)
@ -222,8 +232,10 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
mcu.doReconnect(context.Background())
})
mcu.reconnectTimer.Stop()
if err := mcu.reconnect(ctx); err != nil {
return nil, err
if mcu.url != "" {
if err := mcu.reconnect(ctx); err != nil {
return nil, err
}
}
return mcu, nil
}
@ -252,7 +264,7 @@ func (m *mcuJanus) disconnect() {
func (m *mcuJanus) reconnect(ctx context.Context) error {
m.disconnect()
gw, err := NewJanusGateway(ctx, m.url, m)
gw, err := m.createJanusGateway(ctx, m.url, m)
if err != nil {
return err
}
@ -317,6 +329,11 @@ func (m *mcuJanus) hasRemotePublisher() bool {
}
func (m *mcuJanus) Start(ctx context.Context) error {
if m.url == "" {
if err := m.reconnect(ctx); err != nil {
return err
}
}
info, err := m.gw.Info(ctx)
if err != nil {
return err