Also track subscriber streams by type.

This commit is contained in:
Joachim Bauch 2021-04-23 14:21:01 +02:00
parent f758f8a5e7
commit 70f9f7ea91
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
5 changed files with 259 additions and 29 deletions

View File

@ -606,13 +606,103 @@ func (c *mcuJanusClient) selectStream(ctx context.Context, substream int, tempor
callback(nil, nil)
}
type publisherStatsCounter struct {
mu sync.Mutex
streamTypes map[string]bool
subscribers map[string]bool
}
func (c *publisherStatsCounter) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
count := len(c.subscribers)
for streamType := range c.streamTypes {
statsMcuPublisherStreamTypesCurrent.WithLabelValues(streamType).Dec()
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Sub(float64(count))
}
c.streamTypes = nil
c.subscribers = nil
}
func (c *publisherStatsCounter) EnableStream(streamType string, enable bool) {
c.mu.Lock()
defer c.mu.Unlock()
if enable == c.streamTypes[streamType] {
return
}
if enable {
if c.streamTypes == nil {
c.streamTypes = make(map[string]bool)
}
c.streamTypes[streamType] = true
statsMcuPublisherStreamTypesCurrent.WithLabelValues(streamType).Inc()
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Add(float64(len(c.subscribers)))
} else {
delete(c.streamTypes, streamType)
statsMcuPublisherStreamTypesCurrent.WithLabelValues(streamType).Dec()
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Sub(float64(len(c.subscribers)))
}
}
func (c *publisherStatsCounter) AddSubscriber(id string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.subscribers[id] {
return
}
if c.subscribers == nil {
c.subscribers = make(map[string]bool)
}
c.subscribers[id] = true
for streamType := range c.streamTypes {
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Inc()
}
}
func (c *publisherStatsCounter) RemoveSubscriber(id string) {
c.mu.Lock()
defer c.mu.Unlock()
if !c.subscribers[id] {
return
}
delete(c.subscribers, id)
for streamType := range c.streamTypes {
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Dec()
}
}
type mcuJanusPublisher struct {
mcuJanusClient
id string
bitrate int
aciveStreamsLock sync.Mutex
activeStreams map[string]bool
stats publisherStatsCounter
}
func (m *mcuJanus) SubscriberConnected(id string, publisher string, streamType string) {
m.mu.Lock()
defer m.mu.Unlock()
if p, found := m.publishers[publisher+"|"+streamType]; found {
p.stats.AddSubscriber(id)
}
}
func (m *mcuJanus) SubscriberDisconnected(id string, publisher string, streamType string) {
m.mu.Lock()
defer m.mu.Unlock()
if p, found := m.publishers[publisher+"|"+streamType]; found {
p.stats.RemoveSubscriber(id)
}
}
func min(a, b int) int {
@ -718,7 +808,6 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
},
id: id,
bitrate: bitrate,
activeStreams: make(map[string]bool),
}
client.mcuJanusClient.handleEvent = client.handleEvent
client.mcuJanusClient.handleHangup = client.handleHangup
@ -786,22 +875,7 @@ func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) {
mediaType = p.streamType
}
delta := 0.0
p.aciveStreamsLock.Lock()
prev := p.activeStreams[mediaType]
if event.Receiving != prev {
if event.Receiving {
delta = 1
p.activeStreams[mediaType] = true
} else {
delta = -1
delete(p.activeStreams, mediaType)
}
}
p.aciveStreamsLock.Unlock()
if delta != 0 {
statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Add(delta)
}
p.stats.EnableStream(mediaType, event.Receiving)
}
func (p *mcuJanusPublisher) NotifyReconnected() {
@ -843,18 +917,14 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) {
p.closeClient(ctx)
p.mu.Unlock()
p.aciveStreamsLock.Lock()
for mediaType := range p.activeStreams {
statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Dec()
}
p.activeStreams = make(map[string]bool)
p.aciveStreamsLock.Unlock()
p.stats.Reset()
if notify {
statsPublishersCurrent.WithLabelValues(p.streamType).Dec()
p.mcu.unregisterClient(p)
p.listener.PublisherClosed(p)
}
p.mcuJanusClient.Close(ctx)
}
func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
@ -1010,6 +1080,7 @@ func (p *mcuJanusSubscriber) handleDetached(event *janus.DetachedMsg) {
func (p *mcuJanusSubscriber) handleConnected(event *janus.WebRTCUpMsg) {
log.Printf("Subscriber %d received connected", p.handleId)
p.mcu.SubscriberConnected(p.Id(), p.publisher, p.streamType)
}
func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) {
@ -1047,10 +1118,12 @@ func (p *mcuJanusSubscriber) Close(ctx context.Context) {
p.mu.Unlock()
if closed {
p.mcu.SubscriberDisconnected(p.Id(), p.publisher, p.streamType)
statsSubscribersCurrent.WithLabelValues(p.streamType).Dec()
}
p.mcu.unregisterClient(p)
p.listener.SubscriberClosed(p)
p.mcuJanusClient.Close(ctx)
}
func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, callback func(error, map[string]interface{})) {

109
mcu_janus_test.go Normal file
View File

@ -0,0 +1,109 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"testing"
)
func TestPublisherStatsCounter(t *testing.T) {
RegisterJanusMcuStats()
var c publisherStatsCounter
c.Reset()
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0)
c.EnableStream("audio", false)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0)
c.EnableStream("audio", true)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
c.EnableStream("audio", true)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
c.EnableStream("video", true)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
c.EnableStream("audio", false)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
c.EnableStream("audio", false)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
c.AddSubscriber("1")
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1)
c.EnableStream("audio", true)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1)
c.AddSubscriber("1")
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1)
c.AddSubscriber("2")
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2)
c.RemoveSubscriber("3")
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2)
c.RemoveSubscriber("1")
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1)
c.AddSubscriber("1")
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2)
c.EnableStream("audio", false)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2)
c.EnableStream("audio", true)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2)
c.EnableStream("audio", false)
c.EnableStream("video", false)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 0)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 0)
checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 0)
}

View File

@ -62,6 +62,12 @@ var (
Name: "messages_total",
Help: "The total number of MCU messages",
}, []string{"type"})
statsMcuSubscriberStreamTypesCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "subscriber_streams",
Help: "The current number of subscribed media streams",
}, []string{"type"})
statsMcuPublisherStreamTypesCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "mcu",
@ -76,6 +82,7 @@ var (
statsSubscribersTotal,
statsWaitingForPublisherTotal,
statsMcuMessagesTotal,
statsMcuSubscriberStreamTypesCurrent,
statsMcuPublisherStreamTypesCurrent,
}

View File

@ -47,6 +47,8 @@ type NatsMessage struct {
Room *BackendServerRoomRequest `json:"room,omitempty"`
Permissions []Permission `json:"permissions,omitempty"`
Id string `json:"id"`
}
type NatsSubscription interface {

39
stats_prometheus_test.go Normal file
View File

@ -0,0 +1,39 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
)
func checkStatsValue(t *testing.T, collector prometheus.Collector, value float64) {
ch := make(chan *prometheus.Desc, 1)
collector.Describe(ch)
desc := <-ch
v := testutil.ToFloat64(collector)
if v != value {
t.Errorf("Expected value %f for %s, got %f", value, desc, v)
}
}