diff --git a/mcu_janus.go b/mcu_janus.go index 6278855..be998dc 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -449,6 +449,7 @@ type mcuJanusClient struct { handleDetached func(event *janus.DetachedMsg) handleConnected func(event *janus.WebRTCUpMsg) handleSlowLink func(event *janus.SlowLinkMsg) + handleMedia func(event *janus.MediaMsg) } func (c *mcuJanusClient) Id() string { @@ -493,7 +494,7 @@ loop: case *janus.DetachedMsg: c.handleDetached(t) case *janus.MediaMsg: - // Ignore + c.handleMedia(t) case *janus.WebRTCUpMsg: c.handleConnected(t) case *janus.SlowLinkMsg: @@ -608,8 +609,10 @@ func (c *mcuJanusClient) selectStream(ctx context.Context, substream int, tempor type mcuJanusPublisher struct { mcuJanusClient - id string - bitrate int + id string + bitrate int + aciveStreamsLock sync.Mutex + activeStreams map[string]bool } func min(a, b int) int { @@ -713,14 +716,16 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st closeChan: make(chan bool, 1), deferred: make(chan func(), 64), }, - id: id, - bitrate: bitrate, + id: id, + bitrate: bitrate, + activeStreams: make(map[string]bool), } client.mcuJanusClient.handleEvent = client.handleEvent client.mcuJanusClient.handleHangup = client.handleHangup client.mcuJanusClient.handleDetached = client.handleDetached client.mcuJanusClient.handleConnected = client.handleConnected client.mcuJanusClient.handleSlowLink = client.handleSlowLink + client.mcuJanusClient.handleMedia = client.handleMedia m.registerClient(client) log.Printf("Publisher %s is using handle %d", client.id, client.handleId) @@ -774,6 +779,31 @@ func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) { } } +func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) { + mediaType := event.Type + if mediaType == "video" && p.streamType == "screen" { + // We want to differentiate between audio, video and screensharing + mediaType = p.streamType + } + + delta := 0.0 + p.aciveStreamsLock.Lock() + prev := p.activeStreams[mediaType] + if event.Receiving != prev { + if event.Receiving { + delta = 1 + p.activeStreams[mediaType] = true + } else { + delta = -1 + delete(p.activeStreams, mediaType) + } + } + p.aciveStreamsLock.Unlock() + if delta != 0 { + statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Add(delta) + } +} + func (p *mcuJanusPublisher) NotifyReconnected() { ctx := context.TODO() handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) @@ -813,6 +843,13 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) { p.closeClient(ctx) p.mu.Unlock() + p.aciveStreamsLock.Lock() + for mediaType := range p.activeStreams { + statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Dec() + } + p.activeStreams = make(map[string]bool) + p.aciveStreamsLock.Unlock() + if notify { statsPublishersCurrent.WithLabelValues(p.streamType).Dec() p.mcu.unregisterClient(p) @@ -930,6 +967,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ client.mcuJanusClient.handleDetached = client.handleDetached client.mcuJanusClient.handleConnected = client.handleConnected client.mcuJanusClient.handleSlowLink = client.handleSlowLink + client.mcuJanusClient.handleMedia = client.handleMedia m.registerClient(client) go client.run(handle, client.closeChan) statsSubscribersCurrent.WithLabelValues(streamType).Inc() @@ -982,6 +1020,10 @@ func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) { } } +func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) { + // Only triggered for publishers +} + func (p *mcuJanusSubscriber) NotifyReconnected() { ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index fc76229..f6a3a45 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -62,6 +62,12 @@ var ( Name: "messages_total", Help: "The total number of MCU messages", }, []string{"type"}) + statsMcuPublisherStreamTypesCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "publisher_streams", + Help: "The current number of published media streams", + }, []string{"type"}) commonMcuStats = []prometheus.Collector{ statsPublishersCurrent, @@ -70,6 +76,7 @@ var ( statsSubscribersTotal, statsWaitingForPublisherTotal, statsMcuMessagesTotal, + statsMcuPublisherStreamTypesCurrent, } statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{