diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 91e9cdb..a56a002 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -934,8 +934,11 @@ func (s *ProxyServer) DeleteSession(id uint64) { } func (s *ProxyServer) deleteSessionLocked(id uint64) { - delete(s.sessions, id) - statsSessionsCurrent.Dec() + if session, found := s.sessions[id]; found { + delete(s.sessions, id) + session.Close() + statsSessionsCurrent.Dec() + } } func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) { diff --git a/proxy/proxy_session.go b/proxy/proxy_session.go index 98f1f19..7f3f84c 100644 --- a/proxy/proxy_session.go +++ b/proxy/proxy_session.go @@ -95,6 +95,11 @@ func (s *ProxySession) MarkUsed() { atomic.StoreInt64(&s.lastUsed, now.UnixNano()) } +func (s *ProxySession) Close() { + s.clearPublishers() + s.clearSubscribers() +} + func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient { s.clientLock.Lock() prev := s.client @@ -250,7 +255,10 @@ func (s *ProxySession) clearPublishers() { defer s.publishersLock.Unlock() go func(publishers map[string]signaling.McuPublisher) { - for _, publisher := range publishers { + for id, publisher := range publishers { + if s.proxy.DeleteClient(id, publisher) { + statsPublishersCurrent.WithLabelValues(publisher.StreamType()).Dec() + } publisher.Close(context.Background()) } }(s.publishers) @@ -263,7 +271,10 @@ func (s *ProxySession) clearSubscribers() { defer s.publishersLock.Unlock() go func(subscribers map[string]signaling.McuSubscriber) { - for _, subscriber := range subscribers { + for id, subscriber := range subscribers { + if s.proxy.DeleteClient(id, subscriber) { + statsSubscribersCurrent.WithLabelValues(subscriber.StreamType()).Dec() + } subscriber.Close(context.Background()) } }(s.subscribers)