Add more media-related metrics.

This commit is contained in:
Joachim Bauch 2025-11-13 16:51:12 +01:00
commit 2d5379b61d
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
6 changed files with 276 additions and 9 deletions

View file

@ -154,7 +154,7 @@ websocket events handler (`janus.eventhandler.wsevh.jcfg`), the module must be
enabled by setting `enabled` to `true`, the `backend` must be set to the
websocket url of the signaling server (or signaling proxy) and `subprotocol`
must be set to `janus-events`.
At least events of type `media` and `webrtc` must be subscribed.
At least events of type `handles`, `media` and `webrtc` must be subscribed.
Edit the `server.conf` and enter the URL to the websocket endpoint of Janus in
the section `[mcu]` and key `url`. During startup, the signaling server will

View file

@ -67,3 +67,8 @@ The following metrics are available:
| `signaling_mcu_slow_link_total` | Counter | 2.0.5 | Total number of slow link events | `media`, `direction` |
| `signaling_mcu_media_rtt` | Histogram | 2.0.5 | The roundtrip time of WebRTC media in milliseconds | `media` |
| `signaling_mcu_media_jitter` | Histogram | 2.0.5 | The jitter of WebRTC media in milliseconds | `media`, `origin` |
| `signaling_mcu_media_codecs_total` | Counter | 2.0.5 | The total number of times a new or changed codec was reported for a media type of a handle | `media`, `codec` |
| `signaling_mcu_media_nacks_total` | Counter | 2.0.5 | The total number of NACKs | `media`, `direction` |
| `signaling_mcu_media_retransmissions_total` | Counter | 2.0.5 | The total number of received retransmissions | `media` |
| `signaling_mcu_media_bytes_total` | Counter | 2.0.5 | The total number of media bytes sent / received | `media`, `direction` |
| `signaling_mcu_media_lost_total` | Counter | 2.0.5 | The total number of lost media packets | `media`, `origin` |

View file

@ -27,6 +27,7 @@ import (
"errors"
"fmt"
"log"
"math"
"net"
"strconv"
"strings"
@ -374,10 +375,10 @@ type JanusEventMediaStats struct {
RTTValues *JanusMediaStatsRTTValues `json:"rtt-values,omitempty"`
// For all media on all layers
PacketsReceived int32 `json:"packets-received"`
PacketsSent int32 `json:"packets-sent"`
BytesReceived int64 `json:"bytes-received"`
BytesSent int64 `json:"bytes-sent"`
PacketsReceived uint32 `json:"packets-received"`
PacketsSent uint32 `json:"packets-sent"`
BytesReceived uint64 `json:"bytes-received"`
BytesSent uint64 `json:"bytes-sent"`
// For layer 0 if REMB is enabled
REMBBitrate uint32 `json:"remb-bitrate"`
@ -532,6 +533,91 @@ type McuEventHandler interface {
UpdateBandwidth(handle uint64, media string, sent api.Bandwidth, received api.Bandwidth)
}
// ValueCounter converts absolute, monotonically growing readings into
// increments. It remembers the last reading per key and reports by how much
// each new reading differs from the previous one.
//
// The zero value is ready to use. The type performs no locking of its own.
type ValueCounter struct {
	// values holds the most recently reported absolute reading per key.
	values map[string]uint64
}

// Update records value as the latest absolute reading for key and returns the
// increment since the previous reading of that key. A key that has not been
// seen before is treated as having a previous reading of 0.
//
// If value is smaller than the previous reading, the counter is assumed to
// have wrapped around and the increment is computed across the uint64 limit.
func (c *ValueCounter) Update(key string, value uint64) uint64 {
	if c.values == nil {
		c.values = make(map[string]uint64)
	}

	prev := c.values[key]
	if value == prev {
		return 0
	}

	// Always remember the reported reading itself (not the computed delta), so
	// the next call is compared against the correct baseline. Storing the
	// delta after a wrap-around would make the following reading look like
	// another wrap-around and produce a huge bogus increment.
	c.values[key] = value

	if value < prev {
		// Wrap around
		return math.MaxUint64 - prev + value
	}
	return value - prev
}
// handleStats keeps the per-handle state needed to turn the absolute values
// reported in media statistics into increments for the metrics counters.
// All per-media values are keyed by the media name.
type handleStats struct {
	// codecs remembers the codec last reported for each media.
	codecs map[string]string

	// Absolute byte counts last seen per media.
	bytesReceived, bytesSent ValueCounter

	// Absolute NACK counts last seen per media.
	nacksReceived, nacksSent ValueCounter

	// Absolute lost counts last seen per media, by origin.
	lostLocal, lostRemote ValueCounter

	// Absolute count of received retransmissions last seen per media.
	retransmissionsReceived ValueCounter
}
// Codec remembers the codec reported for media and increments the codecs
// counter whenever it differs from the codec previously remembered for that
// media (including the first time a non-empty codec is seen).
func (h *handleStats) Codec(media string, codec string) {
	if h.codecs == nil {
		h.codecs = make(map[string]string)
	}

	if previous := h.codecs[media]; previous == codec {
		// Nothing changed, don't count the codec again.
		return
	}

	statsJanusMediaCodecsTotal.WithLabelValues(media, codec).Inc()
	h.codecs[media] = codec
}
// BytesReceived takes the absolute number of bytes received for media and
// adds the increment since the last call to the "incoming" bytes counter.
func (h *handleStats) BytesReceived(media string, bytes uint64) {
	added := h.bytesReceived.Update(media, bytes)
	counter := statsJanusMediaBytesTotal.WithLabelValues(media, "incoming")
	counter.Add(float64(added))
}
// BytesSent takes the absolute number of bytes sent for media and adds the
// increment since the last call to the "outgoing" bytes counter.
func (h *handleStats) BytesSent(media string, bytes uint64) {
	added := h.bytesSent.Update(media, bytes)
	counter := statsJanusMediaBytesTotal.WithLabelValues(media, "outgoing")
	counter.Add(float64(added))
}
// NacksReceived takes the absolute number of NACKs received for media and
// adds the increment since the last call to the "incoming" NACK counter.
func (h *handleStats) NacksReceived(media string, nacks uint64) {
	added := h.nacksReceived.Update(media, nacks)
	counter := statsJanusMediaNACKTotal.WithLabelValues(media, "incoming")
	counter.Add(float64(added))
}
// NacksSent takes the absolute number of NACKs sent for media and adds the
// increment since the last call to the "outgoing" NACK counter.
func (h *handleStats) NacksSent(media string, nacks uint64) {
	added := h.nacksSent.Update(media, nacks)
	counter := statsJanusMediaNACKTotal.WithLabelValues(media, "outgoing")
	counter.Add(float64(added))
}
// RetransmissionsReceived takes the absolute number of retransmissions
// received for media and adds the increment since the last call to the
// retransmissions counter.
func (h *handleStats) RetransmissionsReceived(media string, retransmissions uint64) {
	added := h.retransmissionsReceived.Update(media, retransmissions)
	counter := statsJanusMediaRetransmissionsTotal.WithLabelValues(media)
	counter.Add(float64(added))
}
// LostLocal takes the absolute lost count for media and adds the increment
// since the last call to the lost counter with origin "local".
func (h *handleStats) LostLocal(media string, lost uint64) {
	added := h.lostLocal.Update(media, lost)
	counter := statsJanusMediaLostTotal.WithLabelValues(media, "local")
	counter.Add(float64(added))
}
// LostRemote takes the absolute lost count for media and adds the increment
// since the last call to the lost counter with origin "remote".
func (h *handleStats) LostRemote(media string, lost uint64) {
	added := h.lostRemote.Update(media, lost)
	counter := statsJanusMediaLostTotal.WithLabelValues(media, "remote")
	counter.Add(float64(added))
}
type JanusEventsHandler struct {
mu sync.Mutex
@ -542,6 +628,9 @@ type JanusEventsHandler struct {
addr string
agent string
supportsHandles bool
handleStats map[uint64]*handleStats
events chan JanusEvent
}
@ -730,6 +819,32 @@ func (h *JanusEventsHandler) processEvents() {
}
}
// deleteHandleStats drops the statistics state kept for the handle of the
// given event. Events without a handle id are ignored.
func (h *JanusEventsHandler) deleteHandleStats(event JanusEvent) {
	if event.HandleId == 0 {
		return
	}

	delete(h.handleStats, event.HandleId)
}
// getHandleStats returns the statistics state for the handle of the given
// event, creating it on first use. It returns nil if the event has no handle
// id or if no handle events have been seen from Janus yet.
func (h *JanusEventsHandler) getHandleStats(event JanusEvent) *handleStats {
	// Only create per-handle stats if enabled in Janus. Otherwise the
	// handleStats map will never be cleaned up.
	if !h.supportsHandles || event.HandleId == 0 {
		return nil
	}

	// Looking up in a nil map is fine and simply reports "not found".
	if existing, ok := h.handleStats[event.HandleId]; ok {
		return existing
	}

	if h.handleStats == nil {
		h.handleStats = make(map[uint64]*handleStats)
	}
	created := &handleStats{}
	h.handleStats[event.HandleId] = created
	return created
}
func (h *JanusEventsHandler) processEvent(event JanusEvent) {
evt, err := event.Decode()
if err != nil {
@ -738,6 +853,13 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) {
}
switch evt := evt.(type) {
case *JanusEventHandle:
switch evt.Name {
case "attached":
h.supportsHandles = true
case "detached":
h.deleteHandleStats(event)
}
case *JanusEventWebRTCICE:
statsJanusICEStateTotal.WithLabelValues(evt.ICE).Inc()
case *JanusEventWebRTCDTLS:
@ -766,6 +888,42 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) {
if jitter := evt.JitterRemote; jitter > 0 {
statsJanusMediaJitter.WithLabelValues(evt.Media, "remote").Observe(float64(jitter))
}
if codec := evt.Codec; codec != "" {
if stats := h.getHandleStats(event); stats != nil {
stats.Codec(evt.Media, codec)
}
}
if stats := h.getHandleStats(event); stats != nil {
stats.BytesReceived(evt.Media, evt.BytesReceived)
}
if stats := h.getHandleStats(event); stats != nil {
stats.BytesSent(evt.Media, evt.BytesSent)
}
if nacks := evt.NacksReceived; nacks > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.NacksReceived(evt.Media, uint64(nacks))
}
}
if nacks := evt.NacksSent; nacks > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.NacksSent(evt.Media, uint64(nacks))
}
}
if retransmissions := evt.RetransmissionsReceived; retransmissions > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.RetransmissionsReceived(evt.Media, uint64(retransmissions))
}
}
if lost := evt.Lost; lost > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.LostLocal(evt.Media, uint64(lost))
}
}
if lost := evt.LostByRemote; lost > 0 {
if stats := h.getHandleStats(event); stats != nil {
stats.LostRemote(evt.Media, uint64(lost))
}
}
h.mcu.UpdateBandwidth(event.HandleId, evt.Media, api.BandwidthFromBytes(uint64(evt.BytesSentLastSec)), api.BandwidthFromBytes(uint64(evt.BytesReceivedLastSec)))
}
}

View file

@ -25,6 +25,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net"
"net/http"
"net/http/httptest"
@ -248,6 +249,7 @@ func (s *janusEventSender) SendSingle(t *testing.T, conn *websocket.Conn) {
require.Len(s.events, 1)
require.NoError(conn.WriteJSON(s.events[0]))
s.events = nil
}
func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) {
@ -255,6 +257,7 @@ func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) {
require := require.New(t)
require.NoError(conn.WriteJSON(s.events))
s.events = nil
}
func (s *janusEventSender) AddEvent(t *testing.T, eventType int, eventSubtype int, handleId uint64, event any) {
@ -493,7 +496,6 @@ func TestJanusEventsHandlerDifferentTypes(t *testing.T) {
}
func TestJanusEventsHandlerNotGrouped(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
@ -517,16 +519,51 @@ func TestJanusEventsHandlerNotGrouped(t *testing.T) {
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "incoming"), 20)
assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "outgoing"), 30)
assertCollectorChangeBy(t, statsJanusMediaRetransmissionsTotal.WithLabelValues("audio"), 40)
assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "local"), 50)
assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "remote"), 60)
var sender janusEventSender
sender.AddEvent(
t,
JanusEventTypeHandle,
0,
1,
JanusEventHandle{
Name: "attached",
},
)
sender.SendSingle(t, conn)
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
Codec: "opus",
RTT: 10,
JitterLocal: 11,
JitterRemote: 12,
NacksReceived: 20,
NacksSent: 30,
RetransmissionsReceived: 40,
Lost: 50,
LostByRemote: 60,
},
)
sender.SendSingle(t, conn)
sender.AddEvent(
t,
JanusEventTypeHandle,
0,
1,
JanusEventHandle{
Name: "detached",
},
)
sender.SendSingle(t, conn)
@ -585,3 +622,19 @@ func TestJanusEventsHandlerGrouped(t *testing.T) {
assert.NoError(mcu.WaitForUpdates(ctx, 2))
}
// TestValueCounter feeds a sequence of absolute readings into a ValueCounter
// and checks the increment returned for each of them, including independent
// keys and a reading that is smaller than the previous one.
func TestValueCounter(t *testing.T) {
	t.Parallel()

	steps := []struct {
		key      string
		value    uint64
		expected uint64
	}{
		{"foo", 0, 0},
		{"foo", 10, 10},
		{"foo", 10, 0},
		{"foo", 11, 1},
		{"bar", 10, 10},
		{"bar", 11, 1},
		{"baz", math.MaxUint64 - 10, math.MaxUint64 - 10},
		{"baz", 10, 20},
	}

	// The steps share one counter and must run in order.
	var counter ValueCounter
	for idx, step := range steps {
		delta := counter.Update(step.key, step.value)
		assert.EqualValues(t, step.expected, delta, "step %d for key %s", idx, step.key)
	}
}

View file

@ -136,6 +136,36 @@ var (
Help: "The jitter of WebRTC media in milliseconds",
Buckets: prometheus.ExponentialBucketsRange(1, 2000, 20),
}, []string{"media", "origin"})
statsJanusMediaCodecsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_codecs_total",
Help: "The total number of codecs",
}, []string{"media", "codec"})
statsJanusMediaNACKTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_nacks_total",
Help: "The total number of NACKs",
}, []string{"media", "direction"})
statsJanusMediaRetransmissionsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_retransmissions_total",
Help: "The total number of received retransmissions",
}, []string{"media"})
statsJanusMediaBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_bytes_total",
Help: "The total number of media bytes sent / received",
}, []string{"media", "direction"})
statsJanusMediaLostTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "media_lost_total",
Help: "The total number of lost media packets",
}, []string{"media", "origin"})
janusMcuStats = []prometheus.Collector{
statsJanusBandwidthCurrent,
@ -146,6 +176,11 @@ var (
statsJanusSlowLinkTotal,
statsJanusMediaRTT,
statsJanusMediaJitter,
statsJanusMediaCodecsTotal,
statsJanusMediaNACKTotal,
statsJanusMediaRetransmissionsTotal,
statsJanusMediaBytesTotal,
statsJanusMediaLostTotal,
}
statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{

View file

@ -42,6 +42,22 @@ func ResetStatsValue[T prometheus.Gauge](t *testing.T, collector T) {
})
}
// assertCollectorChangeBy captures the current value of collector and
// registers a cleanup function that asserts the value has changed by exactly
// delta when the test finishes.
func assertCollectorChangeBy(t *testing.T, collector prometheus.Collector, delta float64) {
	t.Helper()

	// Fetch the descriptor up front so a failure can name the metric.
	descriptions := make(chan *prometheus.Desc, 1)
	collector.Describe(descriptions)
	desc := <-descriptions

	initial := testutil.ToFloat64(collector)
	verify := func() {
		t.Helper()
		current := testutil.ToFloat64(collector)
		assert.EqualValues(t, delta, current-initial, "failed for %s", desc)
	}
	t.Cleanup(verify)
}
func checkStatsValue(t *testing.T, collector prometheus.Collector, value float64) {
// Make sure test is not executed with "t.Parallel()"
t.Setenv("PARALLEL_CHECK", "1")