diff --git a/src/proxy/proxy_session.go b/src/proxy/proxy_session.go index ea91949..2315389 100644 --- a/src/proxy/proxy_session.go +++ b/src/proxy/proxy_session.go @@ -48,11 +48,11 @@ type ProxySession struct { publishersLock sync.Mutex publishers map[string]signaling.McuPublisher - publisherIds map[string]string + publisherIds map[signaling.McuPublisher]string subscribersLock sync.Mutex subscribers map[string]signaling.McuSubscriber - subscriberIds map[string]string + subscriberIds map[signaling.McuSubscriber]string } func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession { @@ -63,10 +63,10 @@ func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession { lastUsed: time.Now().UnixNano(), publishers: make(map[string]signaling.McuPublisher), - publisherIds: make(map[string]string), + publisherIds: make(map[signaling.McuPublisher]string), subscribers: make(map[string]signaling.McuSubscriber), - subscriberIds: make(map[string]string), + subscriberIds: make(map[signaling.McuSubscriber]string), } } @@ -200,20 +200,20 @@ func (s *ProxySession) StorePublisher(ctx context.Context, id string, publisher defer s.publishersLock.Unlock() s.publishers[id] = publisher - s.publisherIds[publisher.Id()] = id + s.publisherIds[publisher] = id } func (s *ProxySession) DeletePublisher(publisher signaling.McuPublisher) string { s.publishersLock.Lock() defer s.publishersLock.Unlock() - id, found := s.publisherIds[publisher.Id()] + id, found := s.publisherIds[publisher] if !found { return "" } delete(s.publishers, id) - delete(s.publisherIds, publisher.Id()) + delete(s.publisherIds, publisher) return id } @@ -222,20 +222,20 @@ func (s *ProxySession) StoreSubscriber(ctx context.Context, id string, subscribe defer s.subscribersLock.Unlock() s.subscribers[id] = subscriber - s.subscriberIds[subscriber.Id()] = id + s.subscriberIds[subscriber] = id } func (s *ProxySession) DeleteSubscriber(subscriber signaling.McuSubscriber) string { s.subscribersLock.Lock() defer s.subscribersLock.Unlock() - id, found := s.subscriberIds[subscriber.Id()] + id, found := s.subscriberIds[subscriber] if !found { return "" } delete(s.subscribers, id) - delete(s.subscriberIds, subscriber.Id()) + delete(s.subscriberIds, subscriber) return id } @@ -249,7 +249,7 @@ func (s *ProxySession) clearPublishers() { } }(s.publishers) s.publishers = make(map[string]signaling.McuPublisher) - s.publisherIds = make(map[string]string) + s.publisherIds = make(map[signaling.McuPublisher]string) } func (s *ProxySession) clearSubscribers() { @@ -262,7 +262,7 @@ func (s *ProxySession) clearSubscribers() { } }(s.subscribers) s.subscribers = make(map[string]signaling.McuSubscriber) - s.subscriberIds = make(map[string]string) + s.subscriberIds = make(map[signaling.McuSubscriber]string) } func (s *ProxySession) NotifyDisconnected() { diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 50d53b7..19dd96e 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -150,6 +150,7 @@ type mcuJanus struct { closeChan chan bool + clientId uint64 muClients sync.Mutex clients map[clientInterface]bool @@ -430,6 +431,7 @@ type mcuJanusClient struct { listener McuListener mu sync.Mutex + id uint64 session uint64 roomId uint64 streamType string @@ -447,7 +449,7 @@ type mcuJanusClient struct { } func (c *mcuJanusClient) Id() string { - return strconv.FormatUint(c.handleId, 10) + return strconv.FormatUint(c.id, 10) } func (c *mcuJanusClient) StreamType() string { @@ -652,6 +654,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st mcu: m, listener: listener, + id: atomic.AddUint64(&m.clientId, 1), session: session, roomId: roomId, streamType: streamType, @@ -676,6 +679,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st if err := client.publishNats("created"); err != nil { log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err) } + log.Printf("Publisher %s is using handle %d", client.id, client.handleId) go client.run(handle, client.closeChan) return client, nil } @@ -743,7 +747,7 @@ func (p *mcuJanusPublisher) NotifyReconnected() { p.mcu.mu.Lock() p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId p.mcu.mu.Unlock() - log.Printf("Publisher %s reconnected\n", p.id) + log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId) } func (p *mcuJanusPublisher) Close(ctx context.Context) { @@ -945,6 +949,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ mcu: m, listener: listener, + id: atomic.AddUint64(&m.clientId, 1), roomId: roomId, streamType: streamType, @@ -1024,7 +1029,7 @@ func (p *mcuJanusSubscriber) NotifyReconnected() { p.handle = handle p.handleId = handle.Id p.roomId = roomId - log.Printf("Reconnected subscriber for publisher %s\n", p.publisher) + log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) } func (p *mcuJanusSubscriber) Close(ctx context.Context) { @@ -1093,7 +1098,7 @@ retry: p.roomId = roomId p.closeChan = make(chan bool, 1) go p.run(p.handle, p.closeChan) - log.Printf("Already connected as subscriber for %s, leaving and re-joining", p.streamType) + log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) goto retry case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM: fallthrough