diff --git a/mcu_janus.go b/mcu_janus.go index be998dc..81ae35f 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -606,13 +606,103 @@ func (c *mcuJanusClient) selectStream(ctx context.Context, substream int, tempor callback(nil, nil) } +type publisherStatsCounter struct { + mu sync.Mutex + + streamTypes map[string]bool + subscribers map[string]bool +} + +func (c *publisherStatsCounter) Reset() { + c.mu.Lock() + defer c.mu.Unlock() + + count := len(c.subscribers) + for streamType := range c.streamTypes { + statsMcuPublisherStreamTypesCurrent.WithLabelValues(streamType).Dec() + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Sub(float64(count)) + } + c.streamTypes = nil + c.subscribers = nil +} + +func (c *publisherStatsCounter) EnableStream(streamType string, enable bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if enable == c.streamTypes[streamType] { + return + } + + if enable { + if c.streamTypes == nil { + c.streamTypes = make(map[string]bool) + } + c.streamTypes[streamType] = true + statsMcuPublisherStreamTypesCurrent.WithLabelValues(streamType).Inc() + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Add(float64(len(c.subscribers))) + } else { + delete(c.streamTypes, streamType) + statsMcuPublisherStreamTypesCurrent.WithLabelValues(streamType).Dec() + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Sub(float64(len(c.subscribers))) + } +} + +func (c *publisherStatsCounter) AddSubscriber(id string) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.subscribers[id] { + return + } + + if c.subscribers == nil { + c.subscribers = make(map[string]bool) + } + c.subscribers[id] = true + for streamType := range c.streamTypes { + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Inc() + } +} + +func (c *publisherStatsCounter) RemoveSubscriber(id string) { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.subscribers[id] { + return + } + + delete(c.subscribers, id) + for streamType := range c.streamTypes { + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(streamType).Dec() + } +} + type mcuJanusPublisher struct { mcuJanusClient - id string - bitrate int - aciveStreamsLock sync.Mutex - activeStreams map[string]bool + id string + bitrate int + stats publisherStatsCounter +} + +func (m *mcuJanus) SubscriberConnected(id string, publisher string, streamType string) { + m.mu.Lock() + defer m.mu.Unlock() + + if p, found := m.publishers[publisher+"|"+streamType]; found { + p.stats.AddSubscriber(id) + } +} + +func (m *mcuJanus) SubscriberDisconnected(id string, publisher string, streamType string) { + m.mu.Lock() + defer m.mu.Unlock() + + if p, found := m.publishers[publisher+"|"+streamType]; found { + p.stats.RemoveSubscriber(id) + } } func min(a, b int) int { @@ -716,9 +806,8 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st closeChan: make(chan bool, 1), deferred: make(chan func(), 64), }, - id: id, - bitrate: bitrate, - activeStreams: make(map[string]bool), + id: id, + bitrate: bitrate, } client.mcuJanusClient.handleEvent = client.handleEvent client.mcuJanusClient.handleHangup = client.handleHangup @@ -786,22 +875,7 @@ func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) { mediaType = p.streamType } - delta := 0.0 - p.aciveStreamsLock.Lock() - prev := p.activeStreams[mediaType] - if event.Receiving != prev { - if event.Receiving { - delta = 1 - p.activeStreams[mediaType] = true - } else { - delta = -1 - delete(p.activeStreams, mediaType) - } - } - p.aciveStreamsLock.Unlock() - if delta != 0 { - statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Add(delta) - } + p.stats.EnableStream(mediaType, event.Receiving) } func (p *mcuJanusPublisher) NotifyReconnected() { @@ -843,18 +917,14 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) { p.closeClient(ctx) p.mu.Unlock() - p.aciveStreamsLock.Lock() - for mediaType := range p.activeStreams { - statsMcuPublisherStreamTypesCurrent.WithLabelValues(mediaType).Dec() - } - p.activeStreams = make(map[string]bool) - p.aciveStreamsLock.Unlock() + p.stats.Reset() if notify { statsPublishersCurrent.WithLabelValues(p.streamType).Dec() p.mcu.unregisterClient(p) p.listener.PublisherClosed(p) } + p.mcuJanusClient.Close(ctx) } func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { @@ -1010,6 +1080,7 @@ func (p *mcuJanusSubscriber) handleDetached(event *janus.DetachedMsg) { func (p *mcuJanusSubscriber) handleConnected(event *janus.WebRTCUpMsg) { log.Printf("Subscriber %d received connected", p.handleId) + p.mcu.SubscriberConnected(p.Id(), p.publisher, p.streamType) } func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) { @@ -1047,10 +1118,12 @@ func (p *mcuJanusSubscriber) Close(ctx context.Context) { p.mu.Unlock() if closed { + p.mcu.SubscriberDisconnected(p.Id(), p.publisher, p.streamType) statsSubscribersCurrent.WithLabelValues(p.streamType).Dec() } p.mcu.unregisterClient(p) p.listener.SubscriberClosed(p) + p.mcuJanusClient.Close(ctx) } func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, callback func(error, map[string]interface{})) { diff --git a/mcu_janus_test.go b/mcu_janus_test.go new file mode 100644 index 0000000..1d5321e --- /dev/null +++ b/mcu_janus_test.go @@ -0,0 +1,109 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "testing" +) + +func TestPublisherStatsCounter(t *testing.T) { + RegisterJanusMcuStats() + + var c publisherStatsCounter + + c.Reset() + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0) + c.EnableStream("audio", false) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0) + c.EnableStream("audio", true) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + c.EnableStream("audio", true) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + c.EnableStream("video", true) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + c.EnableStream("audio", false) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + c.EnableStream("audio", false) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + + c.AddSubscriber("1") + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1) + c.EnableStream("audio", true) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1) + c.AddSubscriber("1") + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1) + + c.AddSubscriber("2") + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2) + + c.RemoveSubscriber("3") + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2) + + c.RemoveSubscriber("1") + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 1) + + c.AddSubscriber("1") + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2) + + c.EnableStream("audio", false) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2) + + c.EnableStream("audio", true) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 1) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 1) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 2) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 2) + + c.EnableStream("audio", false) + c.EnableStream("video", false) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuPublisherStreamTypesCurrent.WithLabelValues("video"), 0) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("audio"), 0) + checkStatsValue(t, statsMcuSubscriberStreamTypesCurrent.WithLabelValues("video"), 0) +} diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index f6a3a45..0d0e9ca 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -62,6 +62,12 @@ var ( Name: "messages_total", Help: "The total number of MCU messages", }, []string{"type"}) + statsMcuSubscriberStreamTypesCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "subscriber_streams", + Help: "The current number of subscribed media streams", + }, []string{"type"}) statsMcuPublisherStreamTypesCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "signaling", Subsystem: "mcu", @@ -76,6 +82,7 @@ var ( statsSubscribersTotal, statsWaitingForPublisherTotal, statsMcuMessagesTotal, + statsMcuSubscriberStreamTypesCurrent, statsMcuPublisherStreamTypesCurrent, } diff --git a/natsclient.go b/natsclient.go index a9e07ae..c82c00b 100644 --- a/natsclient.go +++ b/natsclient.go @@ -47,6 +47,8 @@ type NatsMessage struct { Room *BackendServerRoomRequest `json:"room,omitempty"` Permissions []Permission `json:"permissions,omitempty"` + + Id string `json:"id"` } type NatsSubscription interface { diff --git a/stats_prometheus_test.go b/stats_prometheus_test.go new file mode 100644 index 0000000..f573629 --- /dev/null +++ b/stats_prometheus_test.go @@ -0,0 +1,39 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" +) + +func checkStatsValue(t *testing.T, collector prometheus.Collector, value float64) { + ch := make(chan *prometheus.Desc, 1) + collector.Describe(ch) + desc := <-ch + v := testutil.ToFloat64(collector) + if v != value { + t.Errorf("Expected value %f for %s, got %f", value, desc, v) + } +}