Close publishers/subscribers if proxy session is closed.

This commit is contained in:
Joachim Bauch 2021-08-10 09:24:59 +02:00
parent 6841d1495f
commit 4c77ae04ef
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 18 additions and 4 deletions

View file

@ -934,8 +934,11 @@ func (s *ProxyServer) DeleteSession(id uint64) {
}
func (s *ProxyServer) deleteSessionLocked(id uint64) {
delete(s.sessions, id)
statsSessionsCurrent.Dec()
if session, found := s.sessions[id]; found {
delete(s.sessions, id)
session.Close()
statsSessionsCurrent.Dec()
}
}
func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) {

View file

@ -95,6 +95,11 @@ func (s *ProxySession) MarkUsed() {
atomic.StoreInt64(&s.lastUsed, now.UnixNano())
}
func (s *ProxySession) Close() {
s.clearPublishers()
s.clearSubscribers()
}
func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
s.clientLock.Lock()
prev := s.client
@ -250,7 +255,10 @@ func (s *ProxySession) clearPublishers() {
defer s.publishersLock.Unlock()
go func(publishers map[string]signaling.McuPublisher) {
for _, publisher := range publishers {
for id, publisher := range publishers {
if s.proxy.DeleteClient(id, publisher) {
statsPublishersCurrent.WithLabelValues(publisher.StreamType()).Dec()
}
publisher.Close(context.Background())
}
}(s.publishers)
@ -263,7 +271,10 @@ func (s *ProxySession) clearSubscribers() {
defer s.publishersLock.Unlock()
go func(subscribers map[string]signaling.McuSubscriber) {
for _, subscriber := range subscribers {
for id, subscriber := range subscribers {
if s.proxy.DeleteClient(id, subscriber) {
statsSubscribersCurrent.WithLabelValues(subscriber.StreamType()).Dec()
}
subscriber.Close(context.Background())
}
}(s.subscribers)