From 7681e26fbb3aa921988919df583172568dfb4576 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 26 Oct 2020 09:34:11 +0100 Subject: [PATCH 1/2] Use publisher/subscriber objects as map keys. --- src/proxy/proxy_session.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/proxy/proxy_session.go b/src/proxy/proxy_session.go index ea91949..2315389 100644 --- a/src/proxy/proxy_session.go +++ b/src/proxy/proxy_session.go @@ -48,11 +48,11 @@ type ProxySession struct { publishersLock sync.Mutex publishers map[string]signaling.McuPublisher - publisherIds map[string]string + publisherIds map[signaling.McuPublisher]string subscribersLock sync.Mutex subscribers map[string]signaling.McuSubscriber - subscriberIds map[string]string + subscriberIds map[signaling.McuSubscriber]string } func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession { @@ -63,10 +63,10 @@ func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession { lastUsed: time.Now().UnixNano(), publishers: make(map[string]signaling.McuPublisher), - publisherIds: make(map[string]string), + publisherIds: make(map[signaling.McuPublisher]string), subscribers: make(map[string]signaling.McuSubscriber), - subscriberIds: make(map[string]string), + subscriberIds: make(map[signaling.McuSubscriber]string), } } @@ -200,20 +200,20 @@ func (s *ProxySession) StorePublisher(ctx context.Context, id string, publisher defer s.publishersLock.Unlock() s.publishers[id] = publisher - s.publisherIds[publisher.Id()] = id + s.publisherIds[publisher] = id } func (s *ProxySession) DeletePublisher(publisher signaling.McuPublisher) string { s.publishersLock.Lock() defer s.publishersLock.Unlock() - id, found := s.publisherIds[publisher.Id()] + id, found := s.publisherIds[publisher] if !found { return "" } delete(s.publishers, id) - delete(s.publisherIds, publisher.Id()) + delete(s.publisherIds, publisher) return id } @@ -222,20 +222,20 @@ func (s *ProxySession) StoreSubscriber(ctx context.Context, id string, subscribe defer s.subscribersLock.Unlock() s.subscribers[id] = subscriber - s.subscriberIds[subscriber.Id()] = id + s.subscriberIds[subscriber] = id } func (s *ProxySession) DeleteSubscriber(subscriber signaling.McuSubscriber) string { s.subscribersLock.Lock() defer s.subscribersLock.Unlock() - id, found := s.subscriberIds[subscriber.Id()] + id, found := s.subscriberIds[subscriber] if !found { return "" } delete(s.subscribers, id) - delete(s.subscriberIds, subscriber.Id()) + delete(s.subscriberIds, subscriber) return id } @@ -249,7 +249,7 @@ func (s *ProxySession) clearPublishers() { } }(s.publishers) s.publishers = make(map[string]signaling.McuPublisher) - s.publisherIds = make(map[string]string) + s.publisherIds = make(map[signaling.McuPublisher]string) } func (s *ProxySession) clearSubscribers() { @@ -262,7 +262,7 @@ func (s *ProxySession) clearSubscribers() { } }(s.subscribers) s.subscribers = make(map[string]signaling.McuSubscriber) - s.subscriberIds = make(map[string]string) + s.subscriberIds = make(map[signaling.McuSubscriber]string) } func (s *ProxySession) NotifyDisconnected() { From 21199936062c7e0b3884b5f969b6b5d62f628956 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 26 Oct 2020 09:51:33 +0100 Subject: [PATCH 2/2] Use publisher/subscriber ids that stay the same across reconnections. Otherwise it could happen that objects were kept in maps (e.g. on the proxy server) if they reconnected to the WebRTC gateway during their lifetime. This resulted in incorrect load calculations. --- src/signaling/mcu_janus.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 50d53b7..19dd96e 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -150,6 +150,7 @@ type mcuJanus struct { closeChan chan bool + clientId uint64 muClients sync.Mutex clients map[clientInterface]bool @@ -430,6 +431,7 @@ type mcuJanusClient struct { listener McuListener mu sync.Mutex + id uint64 session uint64 roomId uint64 streamType string @@ -447,7 +449,7 @@ type mcuJanusClient struct { } func (c *mcuJanusClient) Id() string { - return strconv.FormatUint(c.handleId, 10) + return strconv.FormatUint(c.id, 10) } func (c *mcuJanusClient) StreamType() string { @@ -652,6 +654,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st mcu: m, listener: listener, + id: atomic.AddUint64(&m.clientId, 1), session: session, roomId: roomId, streamType: streamType, @@ -676,6 +679,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st if err := client.publishNats("created"); err != nil { log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err) } + log.Printf("Publisher %s is using handle %d", client.id, client.handleId) go client.run(handle, client.closeChan) return client, nil } @@ -743,7 +747,7 @@ func (p *mcuJanusPublisher) NotifyReconnected() { p.mcu.mu.Lock() p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId p.mcu.mu.Unlock() - log.Printf("Publisher %s reconnected\n", p.id) + log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId) } func (p *mcuJanusPublisher) Close(ctx context.Context) { @@ -945,6 +949,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ mcu: m, listener: listener, + id: atomic.AddUint64(&m.clientId, 1), roomId: roomId, streamType: streamType, @@ -1024,7 +1029,7 @@ func (p *mcuJanusSubscriber) NotifyReconnected() { p.handle = handle p.handleId = handle.Id p.roomId = roomId - log.Printf("Reconnected subscriber for publisher %s\n", p.publisher) + log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) } func (p *mcuJanusSubscriber) Close(ctx context.Context) { @@ -1093,7 +1098,7 @@ retry: p.roomId = roomId p.closeChan = make(chan bool, 1) go p.run(p.handle, p.closeChan) - log.Printf("Already connected as subscriber for %s, leaving and re-joining", p.streamType) + log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) goto retry case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM: fallthrough