Merge pull request #1109 from strukturag/webrtc-metrics

Add more WebRTC-related metrics
This commit is contained in:
Joachim Bauch 2025-11-13 23:27:36 +01:00 committed by GitHub
commit 75ea5e710c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 377 additions and 10 deletions

View file

@ -154,7 +154,7 @@ websocket events handler (`janus.eventhandler.wsevh.jcfg`), the module must be
enabled by setting `enabled` to `true`, the `backend` must be set to the
websocket url of the signaling server (or signaling proxy) and `subprotocol`
must be set to `janus-events`.
At least events of type `media` must be subscribed.
At least events of type `handles`, `media` and `webrtc` must be subscribed.
Edit the `server.conf` and enter the URL to the websocket endpoint of Janus in
the section `[mcu]` and key `url`. During startup, the signaling server will

View file

@ -353,6 +353,7 @@ func (c *Client) ReadPump() {
log.Printf("Client from %s has RTT of %d ms (%s)", addr, rtt_ms, rtt)
}
}
statsClientRTT.Observe(float64(rtt.Milliseconds()))
c.getHandler().OnRTTReceived(c, rtt)
}
return nil

View file

@ -32,9 +32,17 @@ var (
Name: "countries_total",
Help: "The total number of connections by country",
}, []string{"country"})
statsClientRTT = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "signaling",
Subsystem: "client",
Name: "rtt",
Help: "The roundtrip time of WebSocket ping messages in milliseconds",
Buckets: prometheus.ExponentialBucketsRange(1, 30000, 50),
})
clientStats = []prometheus.Collector{
statsClientCountries,
statsClientRTT,
}
)

View file

@ -59,3 +59,16 @@ The following metrics are available:
| `signaling_mcu_backend_usage` | Gauge | 2.0.5 | The current usage of signaling proxy backends in percent | `url`, `direction` |
| `signaling_mcu_backend_bandwidth` | Gauge | 2.0.5 | The current bandwidth of signaling proxy backends in bytes per second | `url`, `direction` |
| `signaling_proxy_load` | Gauge | 2.0.5 | The current load of the signaling proxy | |
| `signaling_client_rtt` | Histogram | 2.0.5 | The roundtrip time of WebSocket ping messages in milliseconds | |
| `signaling_mcu_selected_candidate_total` | Counter | 2.0.5 | Total number of selected candidates | `origin`, `type`, `transport`, `family` |
| `signaling_mcu_peerconnection_state_total` | Counter | 2.0.5 | Total number PeerConnection states | `state`, `reason` |
| `signaling_mcu_ice_state_total` | Counter | 2.0.5 | Total number of ICE connection states | `state` |
| `signaling_mcu_dtls_state_total` | Counter | 2.0.5 | Total number of DTLS connection states | `state` |
| `signaling_mcu_slow_link_total` | Counter | 2.0.5 | Total number of slow link events | `media`, `direction` |
| `signaling_mcu_media_rtt` | Histogram | 2.0.5 | The roundtrip time of WebRTC media in milliseconds | `media` |
| `signaling_mcu_media_jitter` | Histogram | 2.0.5 | The jitter of WebRTC media in milliseconds | `media`, `origin` |
| `signaling_mcu_media_codecs_total` | Counter | 2.0.5 | The total number of codecs | `media`, `codec` |
| `signaling_mcu_media_nacks_total` | Counter | 2.0.5 | The total number of NACKs | `media`, `direction` |
| `signaling_mcu_media_retransmissions_total` | Counter | 2.0.5 | The total number of received retransmissions | `media` |
| `signaling_mcu_media_bytes_total` | Counter | 2.0.5 | The total number of media bytes sent / received | `media`, `direction` |
| `signaling_mcu_media_lost_total` | Counter | 2.0.5 | The total number of lost media packets | `media`, `origin` |

View file

@ -27,6 +27,7 @@ import (
"errors"
"fmt"
"log"
"math"
"net"
"strconv"
"strings"
@ -302,7 +303,8 @@ func (e JanusEventWebRTCDTLS) String() string {
// type=16, subtype=6
type JanusEventWebRTCPeerConnection struct {
Connection string `json:"connection"` // "webrtcup"
Connection string `json:"connection"` // "webrtcup", "hangup"
Reason string `json:"reason,omitempty"` // Only if "connection" == "hangup"
}
func (e JanusEventWebRTCPeerConnection) String() string {
@ -373,10 +375,10 @@ type JanusEventMediaStats struct {
RTTValues *JanusMediaStatsRTTValues `json:"rtt-values,omitempty"`
// For all media on all layers
PacketsReceived int32 `json:"packets-received"`
PacketsSent int32 `json:"packets-sent"`
BytesReceived int64 `json:"bytes-received"`
BytesSent int64 `json:"bytes-sent"`
PacketsReceived uint32 `json:"packets-received"`
PacketsSent uint32 `json:"packets-sent"`
BytesReceived uint64 `json:"bytes-received"`
BytesSent uint64 `json:"bytes-sent"`
// For layer 0 if REMB is enabled
REMBBitrate uint32 `json:"remb-bitrate"`
@ -531,6 +533,91 @@ type McuEventHandler interface {
UpdateBandwidth(handle uint64, media string, sent api.Bandwidth, received api.Bandwidth)
}
type ValueCounter struct {
values map[string]uint64
}
func (c *ValueCounter) Update(key string, value uint64) uint64 {
if c.values == nil {
c.values = make(map[string]uint64)
}
var delta uint64
prev := c.values[key]
if value == prev {
return 0
} else if value < prev {
// Wrap around
c.values[key] = 0
delta = math.MaxUint64 - prev + value
} else {
delta = value - prev
}
c.values[key] += delta
return delta
}
type handleStats struct {
codecs map[string]string
bytesReceived ValueCounter
bytesSent ValueCounter
nacksReceived ValueCounter
nacksSent ValueCounter
lostLocal ValueCounter
lostRemote ValueCounter
retransmissionsReceived ValueCounter
}
func (h *handleStats) Codec(media string, codec string) {
if h.codecs == nil {
h.codecs = make(map[string]string)
}
if h.codecs[media] != codec {
statsJanusMediaCodecsTotal.WithLabelValues(media, codec).Inc()
h.codecs[media] = codec
}
}
func (h *handleStats) BytesReceived(media string, bytes uint64) {
delta := h.bytesReceived.Update(media, bytes)
statsJanusMediaBytesTotal.WithLabelValues(media, "incoming").Add(float64(delta))
}
func (h *handleStats) BytesSent(media string, bytes uint64) {
delta := h.bytesSent.Update(media, bytes)
statsJanusMediaBytesTotal.WithLabelValues(media, "outgoing").Add(float64(delta))
}
func (h *handleStats) NacksReceived(media string, nacks uint64) {
delta := h.nacksReceived.Update(media, nacks)
statsJanusMediaNACKTotal.WithLabelValues(media, "incoming").Add(float64(delta))
}
func (h *handleStats) NacksSent(media string, nacks uint64) {
delta := h.nacksSent.Update(media, nacks)
statsJanusMediaNACKTotal.WithLabelValues(media, "outgoing").Add(float64(delta))
}
func (h *handleStats) RetransmissionsReceived(media string, retransmissions uint64) {
delta := h.retransmissionsReceived.Update(media, retransmissions)
statsJanusMediaRetransmissionsTotal.WithLabelValues(media).Add(float64(delta))
}
func (h *handleStats) LostLocal(media string, lost uint64) {
delta := h.lostLocal.Update(media, lost)
statsJanusMediaLostTotal.WithLabelValues(media, "local").Add(float64(delta))
}
func (h *handleStats) LostRemote(media string, lost uint64) {
delta := h.lostRemote.Update(media, lost)
statsJanusMediaLostTotal.WithLabelValues(media, "remote").Add(float64(delta))
}
type JanusEventsHandler struct {
mu sync.Mutex
@ -541,6 +628,9 @@ type JanusEventsHandler struct {
addr string
agent string
supportsHandles bool
handleStats map[uint64]*handleStats
events chan JanusEvent
}
@ -729,6 +819,32 @@ func (h *JanusEventsHandler) processEvents() {
}
}
func (h *JanusEventsHandler) deleteHandleStats(event JanusEvent) {
if event.HandleId != 0 {
delete(h.handleStats, event.HandleId)
}
}
func (h *JanusEventsHandler) getHandleStats(event JanusEvent) *handleStats {
if !h.supportsHandles {
// Only create per-handle stats if enabled in Janus. Otherwise the
// handleStats map will never be cleaned up.
return nil
} else if event.HandleId == 0 {
return nil
}
if h.handleStats == nil {
h.handleStats = make(map[uint64]*handleStats)
}
stats, found := h.handleStats[event.HandleId]
if !found {
stats = &handleStats{}
h.handleStats[event.HandleId] = stats
}
return stats
}
func (h *JanusEventsHandler) processEvent(event JanusEvent) {
evt, err := event.Decode()
if err != nil {
@ -737,7 +853,77 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) {
}
switch evt := evt.(type) {
case *JanusEventHandle:
switch evt.Name {
case "attached":
h.supportsHandles = true
case "detached":
h.deleteHandleStats(event)
}
case *JanusEventWebRTCICE:
statsJanusICEStateTotal.WithLabelValues(evt.ICE).Inc()
case *JanusEventWebRTCDTLS:
statsJanusDTLSStateTotal.WithLabelValues(evt.DTLS).Inc()
case *JanusEventWebRTCPeerConnection:
statsJanusPeerConnectionStateTotal.WithLabelValues(evt.Connection, evt.Reason).Inc()
case *JanusEventWebRTCSelectedPair:
statsJanusSelectedCandidateTotal.WithLabelValues("local", evt.Candidates.Local.Type, evt.Candidates.Local.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Local.Family)).Inc()
statsJanusSelectedCandidateTotal.WithLabelValues("remote", evt.Candidates.Remote.Type, evt.Candidates.Remote.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Remote.Family)).Inc()
case *JanusEventMediaSlowLink:
var direction string
// "uplink" is Janus -> client, "downlink" is client -> Janus.
if evt.SlowLink == "uplink" {
direction = "outgoing"
} else {
direction = "incoming"
}
statsJanusSlowLinkTotal.WithLabelValues(evt.Media, direction).Inc()
case *JanusEventMediaStats:
if rtt := evt.RTT; rtt > 0 {
statsJanusMediaRTT.WithLabelValues(evt.Media).Observe(float64(rtt))
}
if jitter := evt.JitterLocal; jitter > 0 {
statsJanusMediaJitter.WithLabelValues(evt.Media, "local").Observe(float64(jitter))
}
if jitter := evt.JitterRemote; jitter > 0 {
statsJanusMediaJitter.WithLabelValues(evt.Media, "remote").Observe(float64(jitter))
}
if codec := evt.Codec; codec != "" {
if stats := h.getHandleStats(event); stats != nil {
stats.Codec(evt.Media, codec)
}
}
if stats := h.getHandleStats(event); stats != nil {
stats.BytesReceived(evt.Media, evt.BytesReceived)
}
if stats := h.getHandleStats(event); stats != nil {
stats.BytesSent(evt.Media, evt.BytesSent)
}
if nacks := evt.NacksReceived; nacks > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.NacksReceived(evt.Media, uint64(nacks))
}
}
if nacks := evt.NacksSent; nacks > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.NacksSent(evt.Media, uint64(nacks))
}
}
if retransmissions := evt.RetransmissionsReceived; retransmissions > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.RetransmissionsReceived(evt.Media, uint64(retransmissions))
}
}
if lost := evt.Lost; lost > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.LostLocal(evt.Media, uint64(lost))
}
}
if lost := evt.LostByRemote; lost > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.LostRemote(evt.Media, uint64(lost))
}
}
h.mcu.UpdateBandwidth(event.HandleId, evt.Media, api.BandwidthFromBytes(uint64(evt.BytesSentLastSec)), api.BandwidthFromBytes(uint64(evt.BytesReceivedLastSec)))
}
}

View file

@ -25,6 +25,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net"
"net/http"
"net/http/httptest"
@ -248,6 +249,7 @@ func (s *janusEventSender) SendSingle(t *testing.T, conn *websocket.Conn) {
require.Len(s.events, 1)
require.NoError(conn.WriteJSON(s.events[0]))
s.events = nil
}
func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) {
@ -255,6 +257,7 @@ func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) {
require := require.New(t)
require.NoError(conn.WriteJSON(s.events))
s.events = nil
}
func (s *janusEventSender) AddEvent(t *testing.T, eventType int, eventSubtype int, handleId uint64, event any) {
@ -493,7 +496,6 @@ func TestJanusEventsHandlerDifferentTypes(t *testing.T) {
}
func TestJanusEventsHandlerNotGrouped(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
@ -517,16 +519,51 @@ func TestJanusEventsHandlerNotGrouped(t *testing.T) {
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "incoming"), 20)
assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "outgoing"), 30)
assertCollectorChangeBy(t, statsJanusMediaRetransmissionsTotal.WithLabelValues("audio"), 40)
assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "local"), 50)
assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "remote"), 60)
var sender janusEventSender
sender.AddEvent(
t,
JanusEventTypeHandle,
0,
1,
JanusEventHandle{
Name: "attached",
},
)
sender.SendSingle(t, conn)
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
Codec: "opus",
RTT: 10,
JitterLocal: 11,
JitterRemote: 12,
NacksReceived: 20,
NacksSent: 30,
RetransmissionsReceived: 40,
Lost: 50,
LostByRemote: 60,
},
)
sender.SendSingle(t, conn)
sender.AddEvent(
t,
JanusEventTypeHandle,
0,
1,
JanusEventHandle{
Name: "detached",
},
)
sender.SendSingle(t, conn)
@ -585,3 +622,19 @@ func TestJanusEventsHandlerGrouped(t *testing.T) {
assert.NoError(mcu.WaitForUpdates(ctx, 2))
}
func TestValueCounter(t *testing.T) {
t.Parallel()
assert := assert.New(t)
var c ValueCounter
assert.EqualValues(0, c.Update("foo", 0))
assert.EqualValues(10, c.Update("foo", 10))
assert.EqualValues(0, c.Update("foo", 10))
assert.EqualValues(1, c.Update("foo", 11))
assert.EqualValues(10, c.Update("bar", 10))
assert.EqualValues(1, c.Update("bar", 11))
assert.EqualValues(uint64(math.MaxUint64-10), c.Update("baz", math.MaxUint64-10))
assert.EqualValues(20, c.Update("baz", 10))
}

View file

@ -39,6 +39,10 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
)
func TestMcuJanusStats(t *testing.T) {
collectAndLint(t, janusMcuStats...)
}
type TestJanusHandle struct {
id uint64

View file

@ -92,9 +92,95 @@ var (
Name: "bandwidth",
Help: "The current bandwidth in bytes per second",
}, []string{"direction"})
statsJanusSelectedCandidateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "selected_candidate_total",
Help: "Total number of selected candidates",
}, []string{"origin", "type", "transport", "family"})
statsJanusPeerConnectionStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "peerconnection_state_total",
Help: "Total number of PeerConnections states",
}, []string{"state", "reason"})
statsJanusICEStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "ice_state_total",
Help: "Total number of ICE connection states",
}, []string{"state"})
statsJanusDTLSStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "dtls_state_total",
Help: "Total number of DTLS connection states",
}, []string{"state"})
statsJanusSlowLinkTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "slow_link_total",
Help: "Total number of slow link events",
}, []string{"media", "direction"})
statsJanusMediaRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_rtt",
Help: "The roundtrip time of WebRTC media in milliseconds",
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 25),
}, []string{"media"})
statsJanusMediaJitter = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_jitter",
Help: "The jitter of WebRTC media in milliseconds",
Buckets: prometheus.ExponentialBucketsRange(1, 2000, 20),
}, []string{"media", "origin"})
statsJanusMediaCodecsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_codecs_total",
Help: "The total number of codecs",
}, []string{"media", "codec"})
statsJanusMediaNACKTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_nacks_total",
Help: "The total number of NACKs",
}, []string{"media", "direction"})
statsJanusMediaRetransmissionsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_retransmissions_total",
Help: "The total number of received retransmissions",
}, []string{"media"})
statsJanusMediaBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_bytes_total",
Help: "The total number of media bytes sent / received",
}, []string{"media", "direction"})
statsJanusMediaLostTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_lost_total",
Help: "The total number of lost media packets",
}, []string{"media", "origin"})
janusMcuStats = []prometheus.Collector{
statsJanusBandwidthCurrent,
statsJanusSelectedCandidateTotal,
statsJanusPeerConnectionStateTotal,
statsJanusICEStateTotal,
statsJanusDTLSStateTotal,
statsJanusSlowLinkTotal,
statsJanusMediaRTT,
statsJanusMediaJitter,
statsJanusMediaCodecsTotal,
statsJanusMediaNACKTotal,
statsJanusMediaRetransmissionsTotal,
statsJanusMediaBytesTotal,
statsJanusMediaLostTotal,
}
statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{

View file

@ -42,6 +42,22 @@ func ResetStatsValue[T prometheus.Gauge](t *testing.T, collector T) {
})
}
func assertCollectorChangeBy(t *testing.T, collector prometheus.Collector, delta float64) {
t.Helper()
ch := make(chan *prometheus.Desc, 1)
collector.Describe(ch)
desc := <-ch
before := testutil.ToFloat64(collector)
t.Cleanup(func() {
t.Helper()
after := testutil.ToFloat64(collector)
assert.EqualValues(t, delta, after-before, "failed for %s", desc)
})
}
func checkStatsValue(t *testing.T, collector prometheus.Collector, value float64) {
// Make sure test is not executed with "t.Parallel()"
t.Setenv("PARALLEL_CHECK", "1")