Use publisher/subscriber ids that stay the same across reconnections.

Otherwise it could happen that objects were kept in maps (e.g. on the
proxy server) if they reconnected to the WebRTC gateway during their
lifetime. This resulted in incorrect load calculations.
This commit is contained in:
Joachim Bauch 2020-10-26 09:51:33 +01:00
parent 7681e26fbb
commit 2119993606
Failed to extract signature

View file

@ -150,6 +150,7 @@ type mcuJanus struct {
closeChan chan bool closeChan chan bool
clientId uint64
muClients sync.Mutex muClients sync.Mutex
clients map[clientInterface]bool clients map[clientInterface]bool
@ -430,6 +431,7 @@ type mcuJanusClient struct {
listener McuListener listener McuListener
mu sync.Mutex mu sync.Mutex
id uint64
session uint64 session uint64
roomId uint64 roomId uint64
streamType string streamType string
@ -447,7 +449,7 @@ type mcuJanusClient struct {
} }
func (c *mcuJanusClient) Id() string { func (c *mcuJanusClient) Id() string {
return strconv.FormatUint(c.handleId, 10) return strconv.FormatUint(c.id, 10)
} }
func (c *mcuJanusClient) StreamType() string { func (c *mcuJanusClient) StreamType() string {
@ -652,6 +654,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
mcu: m, mcu: m,
listener: listener, listener: listener,
id: atomic.AddUint64(&m.clientId, 1),
session: session, session: session,
roomId: roomId, roomId: roomId,
streamType: streamType, streamType: streamType,
@ -676,6 +679,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
if err := client.publishNats("created"); err != nil { if err := client.publishNats("created"); err != nil {
log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err) log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err)
} }
log.Printf("Publisher %s is using handle %d", client.id, client.handleId)
go client.run(handle, client.closeChan) go client.run(handle, client.closeChan)
return client, nil return client, nil
} }
@ -743,7 +747,7 @@ func (p *mcuJanusPublisher) NotifyReconnected() {
p.mcu.mu.Lock() p.mcu.mu.Lock()
p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId
p.mcu.mu.Unlock() p.mcu.mu.Unlock()
log.Printf("Publisher %s reconnected\n", p.id) log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId)
} }
func (p *mcuJanusPublisher) Close(ctx context.Context) { func (p *mcuJanusPublisher) Close(ctx context.Context) {
@ -945,6 +949,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
mcu: m, mcu: m,
listener: listener, listener: listener,
id: atomic.AddUint64(&m.clientId, 1),
roomId: roomId, roomId: roomId,
streamType: streamType, streamType: streamType,
@ -1024,7 +1029,7 @@ func (p *mcuJanusSubscriber) NotifyReconnected() {
p.handle = handle p.handle = handle
p.handleId = handle.Id p.handleId = handle.Id
p.roomId = roomId p.roomId = roomId
log.Printf("Reconnected subscriber for publisher %s\n", p.publisher) log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
} }
func (p *mcuJanusSubscriber) Close(ctx context.Context) { func (p *mcuJanusSubscriber) Close(ctx context.Context) {
@ -1093,7 +1098,7 @@ retry:
p.roomId = roomId p.roomId = roomId
p.closeChan = make(chan bool, 1) p.closeChan = make(chan bool, 1)
go p.run(p.handle, p.closeChan) go p.run(p.handle, p.closeChan)
log.Printf("Already connected as subscriber for %s, leaving and re-joining", p.streamType) log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
goto retry goto retry
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM: case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM:
fallthrough fallthrough