Migrate to channel waiter helper class.

This commit is contained in:
Joachim Bauch 2023-01-19 15:35:11 +01:00
parent 5e7dec014a
commit 20228b176f
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 10 additions and 53 deletions

View file

@ -76,8 +76,7 @@ type ClientSession struct {
room unsafe.Pointer room unsafe.Pointer
roomSessionId string roomSessionId string
publisherWaitersId uint64 publisherWaiters ChannelWaiters
publisherWaiters map[uint64]chan bool
publishers map[string]McuPublisher publishers map[string]McuPublisher
subscribers map[string]McuSubscriber subscribers map[string]McuSubscriber
@ -832,26 +831,6 @@ func (s *ClientSession) checkOfferTypeLocked(streamType string, data *MessageCli
return 0, nil return 0, nil
} }
func (s *ClientSession) wakeupPublisherWaiters() {
for _, ch := range s.publisherWaiters {
ch <- true
}
}
func (s *ClientSession) addPublisherWaiter(ch chan bool) uint64 {
if s.publisherWaiters == nil {
s.publisherWaiters = make(map[uint64]chan bool)
}
id := s.publisherWaitersId + 1
s.publisherWaitersId = id
s.publisherWaiters[id] = ch
return id
}
func (s *ClientSession) removePublisherWaiter(id uint64) {
delete(s.publisherWaiters, id)
}
func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, streamType string, data *MessageClientMessageData) (McuPublisher, error) { func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, streamType string, data *MessageClientMessageData) (McuPublisher, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -900,7 +879,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
s.publishers[streamType] = publisher s.publishers[streamType] = publisher
} }
log.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId()) log.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId())
s.wakeupPublisherWaiters() s.publisherWaiters.Wakeup()
} else { } else {
publisher.SetMedia(mediaTypes) publisher.SetMedia(mediaTypes)
} }
@ -928,9 +907,9 @@ func (s *ClientSession) GetOrWaitForPublisher(ctx context.Context, streamType st
return publisher return publisher
} }
ch := make(chan bool, 1) ch := make(chan struct{}, 1)
id := s.addPublisherWaiter(ch) id := s.publisherWaiters.Add(ch)
defer s.removePublisherWaiter(id) defer s.publisherWaiters.Remove(id)
for { for {
s.mu.Unlock() s.mu.Unlock()

View file

@ -1124,8 +1124,7 @@ type mcuProxy struct {
mu sync.RWMutex mu sync.RWMutex
publishers map[string]*mcuProxyConnection publishers map[string]*mcuProxyConnection
publisherWaitersId uint64 publisherWaiters ChannelWaiters
publisherWaiters map[uint64]chan bool
continentsMap atomic.Value continentsMap atomic.Value
@ -1193,8 +1192,6 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
publishers: make(map[string]*mcuProxyConnection), publishers: make(map[string]*mcuProxyConnection),
publisherWaiters: make(map[uint64]chan bool),
rpcClients: rpcClients, rpcClients: rpcClients,
} }
@ -1861,25 +1858,6 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
delete(m.publishers, publisher.id+"|"+publisher.StreamType()) delete(m.publishers, publisher.id+"|"+publisher.StreamType())
} }
func (m *mcuProxy) wakeupWaiters() {
m.mu.RLock()
defer m.mu.RUnlock()
for _, ch := range m.publisherWaiters {
ch <- true
}
}
func (m *mcuProxy) addWaiter(ch chan bool) uint64 {
id := m.publisherWaitersId + 1
m.publisherWaitersId = id
m.publisherWaiters[id] = ch
return id
}
func (m *mcuProxy) removeWaiter(id uint64) {
delete(m.publisherWaiters, id)
}
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator) connections := m.getSortedConnections(initiator)
for _, conn := range connections { for _, conn := range connections {
@ -1910,7 +1888,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
m.mu.Lock() m.mu.Lock()
m.publishers[id+"|"+streamType] = conn m.publishers[id+"|"+streamType] = conn
m.mu.Unlock() m.mu.Unlock()
m.wakeupWaiters() m.publisherWaiters.Wakeup()
return publisher, nil return publisher, nil
} }
@ -1935,9 +1913,9 @@ func (m *mcuProxy) waitForPublisherConnection(ctx context.Context, publisher str
return conn return conn
} }
ch := make(chan bool, 1) ch := make(chan struct{}, 1)
id := m.addWaiter(ch) id := m.publisherWaiters.Add(ch)
defer m.removeWaiter(id) defer m.publisherWaiters.Remove(id)
statsWaitingForPublisherTotal.WithLabelValues(streamType).Inc() statsWaitingForPublisherTotal.WithLabelValues(streamType).Inc()
for { for {