Expose publisher streams by type through metrics.

This commit is contained in:
Joachim Bauch 2021-04-21 14:50:41 +02:00
parent c01caa94e6
commit f758f8a5e7
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
2 changed files with 54 additions and 5 deletions

View File

@ -449,6 +449,7 @@ type mcuJanusClient struct {
handleDetached func(event *janus.DetachedMsg)
handleConnected func(event *janus.WebRTCUpMsg)
handleSlowLink func(event *janus.SlowLinkMsg)
handleMedia func(event *janus.MediaMsg)
}
func (c *mcuJanusClient) Id() string {
@ -493,7 +494,7 @@ loop:
case *janus.DetachedMsg:
c.handleDetached(t)
case *janus.MediaMsg:
// Ignore
c.handleMedia(t)
case *janus.WebRTCUpMsg:
c.handleConnected(t)
case *janus.SlowLinkMsg:
@ -608,8 +609,10 @@ func (c *mcuJanusClient) selectStream(ctx context.Context, substream int, tempor
type mcuJanusPublisher struct {
mcuJanusClient
id string
bitrate int
id string
bitrate int
aciveStreamsLock sync.Mutex
activeStreams map[string]bool
}
func min(a, b int) int {
@ -713,14 +716,16 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
closeChan: make(chan bool, 1),
deferred: make(chan func(), 64),
},
id: id,
bitrate: bitrate,
id: id,
bitrate: bitrate,
activeStreams: make(map[string]bool),
}
client.mcuJanusClient.handleEvent = client.handleEvent
client.mcuJanusClient.handleHangup = client.handleHangup
client.mcuJanusClient.handleDetached = client.handleDetached
client.mcuJanusClient.handleConnected = client.handleConnected
client.mcuJanusClient.handleSlowLink = client.handleSlowLink
client.mcuJanusClient.handleMedia = client.handleMedia
m.registerClient(client)
log.Printf("Publisher %s is using handle %d", client.id, client.handleId)
@ -774,6 +779,31 @@ func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) {
}
}
func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) {
mediaType := event.Type
if mediaType == "video" && p.streamType == "screen" {
// We want to differentiate between audio, video and screensharing
mediaType = p.streamType
}
delta := 0.0
p.aciveStreamsLock.Lock()
prev := p.activeStreams[mediaType]
if event.Receiving != prev {
if event.Receiving {
delta = 1
p.activeStreams[mediaType] = true
} else {
delta = -1
delete(p.activeStreams, mediaType)
}
}
p.aciveStreamsLock.Unlock()
if delta != 0 {
statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Add(delta)
}
}
func (p *mcuJanusPublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
@ -813,6 +843,13 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) {
p.closeClient(ctx)
p.mu.Unlock()
p.aciveStreamsLock.Lock()
for mediaType := range p.activeStreams {
statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Dec()
}
p.activeStreams = make(map[string]bool)
p.aciveStreamsLock.Unlock()
if notify {
statsPublishersCurrent.WithLabelValues(p.streamType).Dec()
p.mcu.unregisterClient(p)
@ -930,6 +967,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
client.mcuJanusClient.handleDetached = client.handleDetached
client.mcuJanusClient.handleConnected = client.handleConnected
client.mcuJanusClient.handleSlowLink = client.handleSlowLink
client.mcuJanusClient.handleMedia = client.handleMedia
m.registerClient(client)
go client.run(handle, client.closeChan)
statsSubscribersCurrent.WithLabelValues(streamType).Inc()
@ -982,6 +1020,10 @@ func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) {
}
}
func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) {
// Only triggered for publishers
}
func (p *mcuJanusSubscriber) NotifyReconnected() {
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()

View File

@ -62,6 +62,12 @@ var (
Name: "messages_total",
Help: "The total number of MCU messages",
}, []string{"type"})
statsMcuPublisherStreamTypesCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "publisher_streams",
Help: "The current number of published media streams",
}, []string{"type"})
commonMcuStats = []prometheus.Collector{
statsPublishersCurrent,
@ -70,6 +76,7 @@ var (
statsSubscribersTotal,
statsWaitingForPublisherTotal,
statsMcuMessagesTotal,
statsMcuPublisherStreamTypesCurrent,
}
statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{