diff --git a/clientsession.go b/clientsession.go index d8b3a45..a10e262 100644 --- a/clientsession.go +++ b/clientsession.go @@ -76,8 +76,7 @@ type ClientSession struct { room unsafe.Pointer roomSessionId string - publisherWaitersId uint64 - publisherWaiters map[uint64]chan bool + publisherWaiters ChannelWaiters publishers map[string]McuPublisher subscribers map[string]McuSubscriber @@ -832,26 +831,6 @@ func (s *ClientSession) checkOfferTypeLocked(streamType string, data *MessageCli return 0, nil } -func (s *ClientSession) wakeupPublisherWaiters() { - for _, ch := range s.publisherWaiters { - ch <- true - } -} - -func (s *ClientSession) addPublisherWaiter(ch chan bool) uint64 { - if s.publisherWaiters == nil { - s.publisherWaiters = make(map[uint64]chan bool) - } - id := s.publisherWaitersId + 1 - s.publisherWaitersId = id - s.publisherWaiters[id] = ch - return id -} - -func (s *ClientSession) removePublisherWaiter(id uint64) { - delete(s.publisherWaiters, id) -} - func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, streamType string, data *MessageClientMessageData) (McuPublisher, error) { s.mu.Lock() defer s.mu.Unlock() @@ -900,7 +879,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea s.publishers[streamType] = publisher } log.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId()) - s.wakeupPublisherWaiters() + s.publisherWaiters.Wakeup() } else { publisher.SetMedia(mediaTypes) } @@ -928,9 +907,9 @@ func (s *ClientSession) GetOrWaitForPublisher(ctx context.Context, streamType st return publisher } - ch := make(chan bool, 1) - id := s.addPublisherWaiter(ch) - defer s.removePublisherWaiter(id) + ch := make(chan struct{}, 1) + id := s.publisherWaiters.Add(ch) + defer s.publisherWaiters.Remove(id) for { s.mu.Unlock() diff --git a/mcu_proxy.go b/mcu_proxy.go index c3e22b7..3e1e9cd 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -1124,8 +1124,7 @@ type mcuProxy struct { mu sync.RWMutex publishers map[string]*mcuProxyConnection - publisherWaitersId uint64 - publisherWaiters map[uint64]chan bool + publisherWaiters ChannelWaiters continentsMap atomic.Value @@ -1193,8 +1192,6 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients * publishers: make(map[string]*mcuProxyConnection), - publisherWaiters: make(map[uint64]chan bool), - rpcClients: rpcClients, } @@ -1861,25 +1858,6 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) { delete(m.publishers, publisher.id+"|"+publisher.StreamType()) } -func (m *mcuProxy) wakeupWaiters() { - m.mu.RLock() - defer m.mu.RUnlock() - for _, ch := range m.publisherWaiters { - ch <- true - } -} - -func (m *mcuProxy) addWaiter(ch chan bool) uint64 { - id := m.publisherWaitersId + 1 - m.publisherWaitersId = id - m.publisherWaiters[id] = ch - return id -} - -func (m *mcuProxy) removeWaiter(id uint64) { - delete(m.publisherWaiters, id) -} - func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { connections := m.getSortedConnections(initiator) for _, conn := range connections { @@ -1910,7 +1888,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st m.mu.Lock() m.publishers[id+"|"+streamType] = conn m.mu.Unlock() - m.wakeupWaiters() + m.publisherWaiters.Wakeup() return publisher, nil } @@ -1935,9 +1913,9 @@ func (m *mcuProxy) waitForPublisherConnection(ctx context.Context, publisher str return conn } - ch := make(chan bool, 1) - id := m.addWaiter(ch) - defer m.removeWaiter(id) + ch := make(chan struct{}, 1) + id := m.publisherWaiters.Add(ch) + defer m.publisherWaiters.Remove(id) statsWaitingForPublisherTotal.WithLabelValues(streamType).Inc() for {