From 71fda2f258024ea79d5b4d94f6d77eab76f6ba06 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 6 Nov 2025 16:35:54 +0100 Subject: [PATCH 1/8] Add metric for RTT of WebSocket ping messages. --- client.go | 1 + client_stats_prometheus.go | 8 ++++++++ docs/prometheus-metrics.md | 1 + 3 files changed, 10 insertions(+) diff --git a/client.go b/client.go index 53f8967..b7d7d38 100644 --- a/client.go +++ b/client.go @@ -353,6 +353,7 @@ func (c *Client) ReadPump() { log.Printf("Client from %s has RTT of %d ms (%s)", addr, rtt_ms, rtt) } } + statsClientRTT.Observe(float64(rtt.Milliseconds())) c.getHandler().OnRTTReceived(c, rtt) } return nil diff --git a/client_stats_prometheus.go b/client_stats_prometheus.go index e20447e..6a6d140 100644 --- a/client_stats_prometheus.go +++ b/client_stats_prometheus.go @@ -32,9 +32,17 @@ var ( Name: "countries_total", Help: "The total number of connections by country", }, []string{"country"}) + statsClientRTT = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "signaling", + Subsystem: "client", + Name: "rtt", + Help: "The roundtrip time of WebSocket ping messages in milliseconds", + Buckets: prometheus.ExponentialBucketsRange(1, 30000, 50), + }) clientStats = []prometheus.Collector{ statsClientCountries, + statsClientRTT, } ) diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md index 1f1c155..013c601 100644 --- a/docs/prometheus-metrics.md +++ b/docs/prometheus-metrics.md @@ -59,3 +59,4 @@ The following metrics are available: | `signaling_mcu_backend_usage` | Gauge | 2.0.5 | The current usage of signaling proxy backends in percent | `url`, `direction` | | `signaling_mcu_backend_bandwidth` | Gauge | 2.0.5 | The current bandwidth of signaling proxy backends in bytes per second | `url`, `direction` | | `signaling_proxy_load` | Gauge | 2.0.5 | The current load of the signaling proxy | | +| `signaling_client_rtt` | Histogram | 2.0.5 | The roundtrip time of WebSocket ping messages in milliseconds | | From fa900132b43faef966f11a99483dd0565e774846 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 6 Nov 2025 20:41:43 +0100 Subject: [PATCH 2/8] Add metrics for candidates and ICE, DTLS and PeerConnection states. --- docs/prometheus-metrics.md | 5 +++++ mcu_janus_events_handler.go | 12 +++++++++++- mcu_stats_prometheus.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md index 013c601..3ff405d 100644 --- a/docs/prometheus-metrics.md +++ b/docs/prometheus-metrics.md @@ -60,3 +60,8 @@ The following metrics are available: | `signaling_mcu_backend_bandwidth` | Gauge | 2.0.5 | The current bandwidth of signaling proxy backends in bytes per second | `url`, `direction` | | `signaling_proxy_load` | Gauge | 2.0.5 | The current load of the signaling proxy | | | `signaling_client_rtt` | Histogram | 2.0.5 | The roundtrip time of WebSocket ping messages in milliseconds | | +| `signaling_mcu_selected_local_candidate_total` | Counter | 2.0.5 | Total number of selected local candidates | `type`, `transport`, `family` | +| `signaling_mcu_selected_remote_candidate_total` | Counter | 2.0.5 | Total number of selected local candidates | `type`, `transport`, `family` | +| `signaling_mcu_peerconnection_state_total` | Counter | 2.0.5 | Total number PeerConnection states | `state`, `reason` | +| `signaling_mcu_ice_state_total` | Counter | 2.0.5 | Total number of ICE connection states | `state` | +| `signaling_mcu_dtls_state_total` | Counter | 2.0.5 | Total number of DTLS connection states | `state` | diff --git a/mcu_janus_events_handler.go b/mcu_janus_events_handler.go index 0bb97c5..e9d6706 100644 --- a/mcu_janus_events_handler.go +++ b/mcu_janus_events_handler.go @@ -302,7 +302,8 @@ func (e JanusEventWebRTCDTLS) String() string { // type=16, subtype=6 type JanusEventWebRTCPeerConnection struct { - Connection string `json:"connection"` // "webrtcup" + Connection string `json:"connection"` // "webrtcup", "hangup" + Reason string `json:"reason,omitempty"` // Only if "connection" == "hangup" } func (e JanusEventWebRTCPeerConnection) String() string { @@ -737,6 +738,15 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { } switch evt := evt.(type) { + case *JanusEventWebRTCICE: + statsJanusICEStateTotal.WithLabelValues(evt.ICE).Inc() + case *JanusEventWebRTCDTLS: + statsJanusDTLSStateTotal.WithLabelValues(evt.DTLS).Inc() + case *JanusEventWebRTCPeerConnection: + statsJanusPeerConnectionStateTotal.WithLabelValues(evt.Connection, evt.Reason).Inc() + case *JanusEventWebRTCSelectedPair: + statsJanusSelectedLocalCandidateTotal.WithLabelValues(evt.Candidates.Local.Type, evt.Candidates.Local.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Local.Family)).Inc() + statsJanusSelectedRemoteCandidateTotal.WithLabelValues(evt.Candidates.Remote.Type, evt.Candidates.Remote.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Remote.Family)).Inc() case *JanusEventMediaStats: h.mcu.UpdateBandwidth(event.HandleId, evt.Media, api.BandwidthFromBytes(uint64(evt.BytesSentLastSec)), api.BandwidthFromBytes(uint64(evt.BytesReceivedLastSec))) } diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index cd063ea..a821a89 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -92,9 +92,44 @@ var ( Name: "bandwidth", Help: "The current bandwidth in bytes per second", }, []string{"direction"}) + statsJanusSelectedLocalCandidateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "selected_local_candidate_total", + Help: "Total number of selected local candidates", + }, []string{"type", "transport", "family"}) + statsJanusSelectedRemoteCandidateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "selected_remote_candidate_total", + Help: "Total number of selected remote candidates", + }, []string{"type", "transport", "family"}) + statsJanusPeerConnectionStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "peerconnection_state_total", + Help: "Total number of PeerConnections states", + }, []string{"state", "reason"}) + statsJanusICEStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "ice_state_total", + Help: "Total number of ICE connection states", + }, []string{"state"}) + statsJanusDTLSStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "dtls_state_total", + Help: "Total number of DTLS connection states", + }, []string{"state"}) janusMcuStats = []prometheus.Collector{ statsJanusBandwidthCurrent, + statsJanusSelectedLocalCandidateTotal, + statsJanusSelectedRemoteCandidateTotal, + statsJanusPeerConnectionStateTotal, + statsJanusICEStateTotal, + statsJanusDTLSStateTotal, } statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ From a5c7ad272fb163dc9be37b0c32fb9074d25a84f3 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 6 Nov 2025 20:44:48 +0100 Subject: [PATCH 3/8] Need events of type "webrtc". --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2c23600..7cc8b83 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,7 @@ websocket events handler (`janus.eventhandler.wsevh.jcfg`), the module must be enabled by setting `enabled` to `true`, the `backend` must be set to the websocket url of the signaling server (or signaling proxy) and `subprotocol` must be set to `janus-events`. -At least events of type `media` must be subscribed. +At least events of type `media` and `webrtc` must be subscribed. Edit the `server.conf` and enter the URL to the websocket endpoint of Janus in the section `[mcu]` and key `url`. During startup, the signaling server will From 1f0ed8005aa72e0a82a1977c8a53e7af9eb704a0 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 13 Nov 2025 14:20:28 +0100 Subject: [PATCH 4/8] Add metrics for Janus slow link events. --- docs/prometheus-metrics.md | 1 + mcu_janus_events_handler.go | 9 +++++++++ mcu_stats_prometheus.go | 7 +++++++ 3 files changed, 17 insertions(+) diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md index 3ff405d..6784592 100644 --- a/docs/prometheus-metrics.md +++ b/docs/prometheus-metrics.md @@ -65,3 +65,4 @@ The following metrics are available: | `signaling_mcu_peerconnection_state_total` | Counter | 2.0.5 | Total number PeerConnection states | `state`, `reason` | | `signaling_mcu_ice_state_total` | Counter | 2.0.5 | Total number of ICE connection states | `state` | | `signaling_mcu_dtls_state_total` | Counter | 2.0.5 | Total number of DTLS connection states | `state` | +| `signaling_mcu_slow_link_total` | Counter | 2.0.5 | Total number of slow link events | `media`, `direction` | diff --git a/mcu_janus_events_handler.go b/mcu_janus_events_handler.go index e9d6706..b1bae25 100644 --- a/mcu_janus_events_handler.go +++ b/mcu_janus_events_handler.go @@ -747,6 +747,15 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { case *JanusEventWebRTCSelectedPair: statsJanusSelectedLocalCandidateTotal.WithLabelValues(evt.Candidates.Local.Type, evt.Candidates.Local.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Local.Family)).Inc() statsJanusSelectedRemoteCandidateTotal.WithLabelValues(evt.Candidates.Remote.Type, evt.Candidates.Remote.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Remote.Family)).Inc() + case *JanusEventMediaSlowLink: + var direction string + // "uplink" is Janus -> client, "downlink" is client -> Janus. + if evt.SlowLink == "uplink" { + direction = "outgoing" + } else { + direction = "incoming" + } + statsJanusSlowLinkTotal.WithLabelValues(evt.Media, direction).Inc() case *JanusEventMediaStats: h.mcu.UpdateBandwidth(event.HandleId, evt.Media, api.BandwidthFromBytes(uint64(evt.BytesSentLastSec)), api.BandwidthFromBytes(uint64(evt.BytesReceivedLastSec))) } diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index a821a89..239249a 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -122,6 +122,12 @@ var ( Name: "dtls_state_total", Help: "Total number of DTLS connection states", }, []string{"state"}) + statsJanusSlowLinkTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "slow_link_total", + Help: "Total number of slow link events", + }, []string{"media", "direction"}) janusMcuStats = []prometheus.Collector{ statsJanusBandwidthCurrent, @@ -130,6 +136,7 @@ var ( statsJanusPeerConnectionStateTotal, statsJanusICEStateTotal, statsJanusDTLSStateTotal, + statsJanusSlowLinkTotal, } statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ From 1ad460cee61a2a5724f20d8ca834107f4bf4f6f3 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 13 Nov 2025 14:39:57 +0100 Subject: [PATCH 5/8] Add metrics for media RTT / jitter. --- docs/prometheus-metrics.md | 2 ++ mcu_janus_events_handler.go | 9 +++++++++ mcu_stats_prometheus.go | 16 ++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md index 6784592..ac4a3c2 100644 --- a/docs/prometheus-metrics.md +++ b/docs/prometheus-metrics.md @@ -66,3 +66,5 @@ The following metrics are available: | `signaling_mcu_ice_state_total` | Counter | 2.0.5 | Total number of ICE connection states | `state` | | `signaling_mcu_dtls_state_total` | Counter | 2.0.5 | Total number of DTLS connection states | `state` | | `signaling_mcu_slow_link_total` | Counter | 2.0.5 | Total number of slow link events | `media`, `direction` | +| `signaling_mcu_media_rtt` | Histogram | 2.0.5 | The roundtrip time of WebRTC media in milliseconds | `media` | +| `signaling_mcu_media_jitter` | Histogram | 2.0.5 | The jitter of WebRTC media in milliseconds | `media`, `origin` | diff --git a/mcu_janus_events_handler.go b/mcu_janus_events_handler.go index b1bae25..3467102 100644 --- a/mcu_janus_events_handler.go +++ b/mcu_janus_events_handler.go @@ -757,6 +757,15 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { } statsJanusSlowLinkTotal.WithLabelValues(evt.Media, direction).Inc() case *JanusEventMediaStats: + if rtt := evt.RTT; rtt > 0 { + statsJanusMediaRTT.WithLabelValues(evt.Media).Observe(float64(rtt)) + } + if jitter := evt.JitterLocal; jitter > 0 { + statsJanusMediaJitter.WithLabelValues(evt.Media, "local").Observe(float64(jitter)) + } + if jitter := evt.JitterRemote; jitter > 0 { + statsJanusMediaJitter.WithLabelValues(evt.Media, "remote").Observe(float64(jitter)) + } h.mcu.UpdateBandwidth(event.HandleId, evt.Media, api.BandwidthFromBytes(uint64(evt.BytesSentLastSec)), api.BandwidthFromBytes(uint64(evt.BytesReceivedLastSec))) } } diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index 239249a..a13ef1b 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -128,6 +128,20 @@ var ( Name: "slow_link_total", Help: "Total number of slow link events", }, []string{"media", "direction"}) + statsJanusMediaRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_rtt", + Help: "The roundtrip time of WebRTC media in milliseconds", + Buckets: prometheus.ExponentialBucketsRange(1, 10000, 25), + }, []string{"media"}) + statsJanusMediaJitter = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_jitter", + Help: "The jitter of WebRTC media in milliseconds", + Buckets: prometheus.ExponentialBucketsRange(1, 2000, 20), + }, []string{"media", "origin"}) janusMcuStats = []prometheus.Collector{ statsJanusBandwidthCurrent, @@ -137,6 +151,8 @@ var ( statsJanusICEStateTotal, statsJanusDTLSStateTotal, statsJanusSlowLinkTotal, + statsJanusMediaRTT, + statsJanusMediaJitter, } statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ From 826d6244f3f808973b6e1574cde1f9807717ba58 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 13 Nov 2025 14:43:15 +0100 Subject: [PATCH 6/8] Use single metrics for selected candidates. --- docs/prometheus-metrics.md | 3 +-- mcu_janus_events_handler.go | 4 ++-- mcu_stats_prometheus.go | 17 +++++------------ 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md index ac4a3c2..c8443df 100644 --- a/docs/prometheus-metrics.md +++ b/docs/prometheus-metrics.md @@ -60,8 +60,7 @@ The following metrics are available: | `signaling_mcu_backend_bandwidth` | Gauge | 2.0.5 | The current bandwidth of signaling proxy backends in bytes per second | `url`, `direction` | | `signaling_proxy_load` | Gauge | 2.0.5 | The current load of the signaling proxy | | | `signaling_client_rtt` | Histogram | 2.0.5 | The roundtrip time of WebSocket ping messages in milliseconds | | -| `signaling_mcu_selected_local_candidate_total` | Counter | 2.0.5 | Total number of selected local candidates | `type`, `transport`, `family` | -| `signaling_mcu_selected_remote_candidate_total` | Counter | 2.0.5 | Total number of selected local candidates | `type`, `transport`, `family` | +| `signaling_mcu_selected_candidate_total` | Counter | 2.0.5 | Total number of selected candidates | `origin`, `type`, `transport`, `family` | | `signaling_mcu_peerconnection_state_total` | Counter | 2.0.5 | Total number PeerConnection states | `state`, `reason` | | `signaling_mcu_ice_state_total` | Counter | 2.0.5 | Total number of ICE connection states | `state` | | `signaling_mcu_dtls_state_total` | Counter | 2.0.5 | Total number of DTLS connection states | `state` | diff --git a/mcu_janus_events_handler.go b/mcu_janus_events_handler.go index 3467102..b488548 100644 --- a/mcu_janus_events_handler.go +++ b/mcu_janus_events_handler.go @@ -745,8 +745,8 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { case *JanusEventWebRTCPeerConnection: statsJanusPeerConnectionStateTotal.WithLabelValues(evt.Connection, evt.Reason).Inc() case *JanusEventWebRTCSelectedPair: - statsJanusSelectedLocalCandidateTotal.WithLabelValues(evt.Candidates.Local.Type, evt.Candidates.Local.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Local.Family)).Inc() - statsJanusSelectedRemoteCandidateTotal.WithLabelValues(evt.Candidates.Remote.Type, evt.Candidates.Remote.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Remote.Family)).Inc() + statsJanusSelectedCandidateTotal.WithLabelValues("local", evt.Candidates.Local.Type, evt.Candidates.Local.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Local.Family)).Inc() + statsJanusSelectedCandidateTotal.WithLabelValues("remote", evt.Candidates.Remote.Type, evt.Candidates.Remote.Transport, fmt.Sprintf("ipv%d", evt.Candidates.Remote.Family)).Inc() case *JanusEventMediaSlowLink: var direction string // "uplink" is Janus -> client, "downlink" is client -> Janus. diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index a13ef1b..2e08092 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -92,18 +92,12 @@ var ( Name: "bandwidth", Help: "The current bandwidth in bytes per second", }, []string{"direction"}) - statsJanusSelectedLocalCandidateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + statsJanusSelectedCandidateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "signaling", Subsystem: "mcu", - Name: "selected_local_candidate_total", - Help: "Total number of selected local candidates", - }, []string{"type", "transport", "family"}) - statsJanusSelectedRemoteCandidateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "signaling", - Subsystem: "mcu", - Name: "selected_remote_candidate_total", - Help: "Total number of selected remote candidates", - }, []string{"type", "transport", "family"}) + Name: "selected_candidate_total", + Help: "Total number of selected candidates", + }, []string{"origin", "type", "transport", "family"}) statsJanusPeerConnectionStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "signaling", Subsystem: "mcu", @@ -145,8 +139,7 @@ var ( janusMcuStats = []prometheus.Collector{ statsJanusBandwidthCurrent, - statsJanusSelectedLocalCandidateTotal, - statsJanusSelectedRemoteCandidateTotal, + statsJanusSelectedCandidateTotal, statsJanusPeerConnectionStateTotal, statsJanusICEStateTotal, statsJanusDTLSStateTotal, From 2d5379b61d9267974da13acd323da37f44eb4f24 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 13 Nov 2025 16:51:12 +0100 Subject: [PATCH 7/8] Add more media-related metrics. --- README.md | 2 +- docs/prometheus-metrics.md | 5 + mcu_janus_events_handler.go | 166 ++++++++++++++++++++++++++++++- mcu_janus_events_handler_test.go | 61 +++++++++++- mcu_stats_prometheus.go | 35 +++++++ stats_prometheus_test.go | 16 +++ 6 files changed, 276 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 7cc8b83..7398c48 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,7 @@ websocket events handler (`janus.eventhandler.wsevh.jcfg`), the module must be enabled by setting `enabled` to `true`, the `backend` must be set to the websocket url of the signaling server (or signaling proxy) and `subprotocol` must be set to `janus-events`. -At least events of type `media` and `webrtc` must be subscribed. +At least events of type `handles`, `media` and `webrtc` must be subscribed. Edit the `server.conf` and enter the URL to the websocket endpoint of Janus in the section `[mcu]` and key `url`. During startup, the signaling server will diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md index c8443df..3016d6a 100644 --- a/docs/prometheus-metrics.md +++ b/docs/prometheus-metrics.md @@ -67,3 +67,8 @@ The following metrics are available: | `signaling_mcu_slow_link_total` | Counter | 2.0.5 | Total number of slow link events | `media`, `direction` | | `signaling_mcu_media_rtt` | Histogram | 2.0.5 | The roundtrip time of WebRTC media in milliseconds | `media` | | `signaling_mcu_media_jitter` | Histogram | 2.0.5 | The jitter of WebRTC media in milliseconds | `media`, `origin` | +| `signaling_mcu_media_codecs_total` | Counter | 2.0.5 | The total number of codecs | `media`, `codec` | +| `signaling_mcu_media_nacks_total` | Counter | 2.0.5 | The total number of NACKs | `media`, `direction` | +| `signaling_mcu_media_retransmissions_total` | Counter | 2.0.5 | The total number of received retransmissions | `media` | +| `signaling_mcu_media_bytes_total` | Counter | 2.0.5 | The total number of media bytes sent / received | `media`, `direction` | +| `signaling_mcu_media_lost_total` | Counter | 2.0.5 | The total number of lost media packets | `media`, `origin` | diff --git a/mcu_janus_events_handler.go b/mcu_janus_events_handler.go index b488548..7d0ddc1 100644 --- a/mcu_janus_events_handler.go +++ b/mcu_janus_events_handler.go @@ -27,6 +27,7 @@ import ( "errors" "fmt" "log" + "math" "net" "strconv" "strings" @@ -374,10 +375,10 @@ type JanusEventMediaStats struct { RTTValues *JanusMediaStatsRTTValues `json:"rtt-values,omitempty"` // For all media on all layers - PacketsReceived int32 `json:"packets-received"` - PacketsSent int32 `json:"packets-sent"` - BytesReceived int64 `json:"bytes-received"` - BytesSent int64 `json:"bytes-sent"` + PacketsReceived uint32 `json:"packets-received"` + PacketsSent uint32 `json:"packets-sent"` + BytesReceived uint64 `json:"bytes-received"` + BytesSent uint64 `json:"bytes-sent"` // For layer 0 if REMB is enabled REMBBitrate uint32 `json:"remb-bitrate"` @@ -532,6 +533,91 @@ type McuEventHandler interface { UpdateBandwidth(handle uint64, media string, sent api.Bandwidth, received api.Bandwidth) } +type ValueCounter struct { + values map[string]uint64 +} + +func (c *ValueCounter) Update(key string, value uint64) uint64 { + if c.values == nil { + c.values = make(map[string]uint64) + } + + var delta uint64 + prev := c.values[key] + if value == prev { + return 0 + } else if value < prev { + // Wrap around + c.values[key] = 0 + delta = math.MaxUint64 - prev + value + } else { + delta = value - prev + } + + c.values[key] += delta + return delta +} + +type handleStats struct { + codecs map[string]string + + bytesReceived ValueCounter + bytesSent ValueCounter + + nacksReceived ValueCounter + nacksSent ValueCounter + + lostLocal ValueCounter + lostRemote ValueCounter + + retransmissionsReceived ValueCounter +} + +func (h *handleStats) Codec(media string, codec string) { + if h.codecs == nil { + h.codecs = make(map[string]string) + } + if h.codecs[media] != codec { + statsJanusMediaCodecsTotal.WithLabelValues(media, codec).Inc() + h.codecs[media] = codec + } +} + +func (h *handleStats) BytesReceived(media string, bytes uint64) { + delta := h.bytesReceived.Update(media, bytes) + statsJanusMediaBytesTotal.WithLabelValues(media, "incoming").Add(float64(delta)) +} + +func (h *handleStats) BytesSent(media string, bytes uint64) { + delta := h.bytesSent.Update(media, bytes) + statsJanusMediaBytesTotal.WithLabelValues(media, "outgoing").Add(float64(delta)) +} + +func (h *handleStats) NacksReceived(media string, nacks uint64) { + delta := h.nacksReceived.Update(media, nacks) + statsJanusMediaNACKTotal.WithLabelValues(media, "incoming").Add(float64(delta)) +} + +func (h *handleStats) NacksSent(media string, nacks uint64) { + delta := h.nacksSent.Update(media, nacks) + statsJanusMediaNACKTotal.WithLabelValues(media, "outgoing").Add(float64(delta)) +} + +func (h *handleStats) RetransmissionsReceived(media string, retransmissions uint64) { + delta := h.retransmissionsReceived.Update(media, retransmissions) + statsJanusMediaRetransmissionsTotal.WithLabelValues(media).Add(float64(delta)) +} + +func (h *handleStats) LostLocal(media string, lost uint64) { + delta := h.lostLocal.Update(media, lost) + statsJanusMediaLostTotal.WithLabelValues(media, "local").Add(float64(delta)) +} + +func (h *handleStats) LostRemote(media string, lost uint64) { + delta := h.lostRemote.Update(media, lost) + statsJanusMediaLostTotal.WithLabelValues(media, "remote").Add(float64(delta)) +} + type JanusEventsHandler struct { mu sync.Mutex @@ -542,6 +628,9 @@ type JanusEventsHandler struct { addr string agent string + supportsHandles bool + handleStats map[uint64]*handleStats + events chan JanusEvent } @@ -730,6 +819,32 @@ func (h *JanusEventsHandler) processEvents() { } } +func (h *JanusEventsHandler) deleteHandleStats(event JanusEvent) { + if event.HandleId != 0 { + delete(h.handleStats, event.HandleId) + } +} + +func (h *JanusEventsHandler) getHandleStats(event JanusEvent) *handleStats { + if !h.supportsHandles { + // Only create per-handle stats if enabled in Janus. Otherwise the + // handleStats map will never be cleaned up. + return nil + } else if event.HandleId == 0 { + return nil + } + + if h.handleStats == nil { + h.handleStats = make(map[uint64]*handleStats) + } + stats, found := h.handleStats[event.HandleId] + if !found { + stats = &handleStats{} + h.handleStats[event.HandleId] = stats + } + return stats +} + func (h *JanusEventsHandler) processEvent(event JanusEvent) { evt, err := event.Decode() if err != nil { @@ -738,6 +853,13 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { } switch evt := evt.(type) { + case *JanusEventHandle: + switch evt.Name { + case "attached": + h.supportsHandles = true + case "detached": + h.deleteHandleStats(event) + } case *JanusEventWebRTCICE: statsJanusICEStateTotal.WithLabelValues(evt.ICE).Inc() case *JanusEventWebRTCDTLS: @@ -766,6 +888,42 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { if jitter := evt.JitterRemote; jitter > 0 { statsJanusMediaJitter.WithLabelValues(evt.Media, "remote").Observe(float64(jitter)) } + if codec := evt.Codec; codec != "" { + if stats := h.getHandleStats(event); stats != nil { + stats.Codec(evt.Media, codec) + } + } + if stats := h.getHandleStats(event); stats != nil { + stats.BytesReceived(evt.Media, evt.BytesReceived) + } + if stats := h.getHandleStats(event); stats != nil { + stats.BytesSent(evt.Media, evt.BytesSent) + } + if nacks := evt.NacksReceived; nacks > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.NacksReceived(evt.Media, uint64(nacks)) + } + } + if nacks := evt.NacksSent; nacks > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.NacksSent(evt.Media, uint64(nacks)) + } + } + if retransmissions := evt.RetransmissionsReceived; retransmissions > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.RetransmissionsReceived(evt.Media, uint64(retransmissions)) + } + } + if lost := evt.Lost; lost > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.LostLocal(evt.Media, uint64(lost)) + } + } + if lost := evt.LostByRemote; lost > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.LostRemote(evt.Media, uint64(lost)) + } + } h.mcu.UpdateBandwidth(event.HandleId, evt.Media, api.BandwidthFromBytes(uint64(evt.BytesSentLastSec)), api.BandwidthFromBytes(uint64(evt.BytesReceivedLastSec))) } } diff --git a/mcu_janus_events_handler_test.go b/mcu_janus_events_handler_test.go index ace254f..b04d604 100644 --- a/mcu_janus_events_handler_test.go +++ b/mcu_janus_events_handler_test.go @@ -25,6 +25,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net" "net/http" "net/http/httptest" @@ -248,6 +249,7 @@ func (s *janusEventSender) SendSingle(t *testing.T, conn *websocket.Conn) { require.Len(s.events, 1) require.NoError(conn.WriteJSON(s.events[0])) + s.events = nil } func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) { @@ -255,6 +257,7 @@ func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) { require := require.New(t) require.NoError(conn.WriteJSON(s.events)) + s.events = nil } func (s *janusEventSender) AddEvent(t *testing.T, eventType int, eventSubtype int, handleId uint64, event any) { @@ -493,7 +496,6 @@ func TestJanusEventsHandlerDifferentTypes(t *testing.T) { } func TestJanusEventsHandlerNotGrouped(t *testing.T) { - t.Parallel() require := require.New(t) assert := assert.New(t) @@ -517,16 +519,51 @@ func TestJanusEventsHandlerNotGrouped(t *testing.T) { assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol")) + assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "incoming"), 20) + assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "outgoing"), 30) + assertCollectorChangeBy(t, statsJanusMediaRetransmissionsTotal.WithLabelValues("audio"), 40) + assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "local"), 50) + assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "remote"), 60) + var sender janusEventSender + sender.AddEvent( + t, + JanusEventTypeHandle, + 0, + 1, + JanusEventHandle{ + Name: "attached", + }, + ) + sender.SendSingle(t, conn) sender.AddEvent( t, JanusEventTypeMedia, JanusEventSubTypeMediaStats, 1, JanusEventMediaStats{ - Media: "audio", - BytesSentLastSec: 100, - BytesReceivedLastSec: 200, + Media: "audio", + BytesSentLastSec: 100, + BytesReceivedLastSec: 200, + Codec: "opus", + RTT: 10, + JitterLocal: 11, + JitterRemote: 12, + NacksReceived: 20, + NacksSent: 30, + RetransmissionsReceived: 40, + Lost: 50, + LostByRemote: 60, + }, + ) + sender.SendSingle(t, conn) + sender.AddEvent( + t, + JanusEventTypeHandle, + 0, + 1, + JanusEventHandle{ + Name: "detached", }, ) sender.SendSingle(t, conn) @@ -585,3 +622,19 @@ func TestJanusEventsHandlerGrouped(t *testing.T) { assert.NoError(mcu.WaitForUpdates(ctx, 2)) } + +func TestValueCounter(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + + var c ValueCounter + assert.EqualValues(0, c.Update("foo", 0)) + assert.EqualValues(10, c.Update("foo", 10)) + assert.EqualValues(0, c.Update("foo", 10)) + assert.EqualValues(1, c.Update("foo", 11)) + assert.EqualValues(10, c.Update("bar", 10)) + assert.EqualValues(1, c.Update("bar", 11)) + assert.EqualValues(uint64(math.MaxUint64-10), c.Update("baz", math.MaxUint64-10)) + assert.EqualValues(20, c.Update("baz", 10)) +} diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index 2e08092..cb5ecf3 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -136,6 +136,36 @@ var ( Help: "The jitter of WebRTC media in milliseconds", Buckets: prometheus.ExponentialBucketsRange(1, 2000, 20), }, []string{"media", "origin"}) + statsJanusMediaCodecsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_codecs_total", + Help: "The total number of codecs", + }, []string{"media", "codec"}) + statsJanusMediaNACKTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_nacks_total", + Help: "The total number of NACKs", + }, []string{"media", "direction"}) + statsJanusMediaRetransmissionsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_retransmissions_total", + Help: "The total number of received retransmissions", + }, []string{"media"}) + statsJanusMediaBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_bytes_total", + Help: "The total number of media bytes sent / received", + }, []string{"media", "direction"}) + statsJanusMediaLostTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_lost_total", + Help: "The total number of lost media packets", + }, []string{"media", "origin"}) janusMcuStats = []prometheus.Collector{ statsJanusBandwidthCurrent, @@ -146,6 +176,11 @@ var ( statsJanusSlowLinkTotal, statsJanusMediaRTT, statsJanusMediaJitter, + statsJanusMediaCodecsTotal, + statsJanusMediaNACKTotal, + statsJanusMediaRetransmissionsTotal, + statsJanusMediaBytesTotal, + statsJanusMediaLostTotal, } statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/stats_prometheus_test.go b/stats_prometheus_test.go index 7783d26..0516de9 100644 --- a/stats_prometheus_test.go +++ b/stats_prometheus_test.go @@ -42,6 +42,22 @@ func ResetStatsValue[T prometheus.Gauge](t *testing.T, collector T) { }) } +func assertCollectorChangeBy(t *testing.T, collector prometheus.Collector, delta float64) { + t.Helper() + + ch := make(chan *prometheus.Desc, 1) + collector.Describe(ch) + desc := <-ch + + before := testutil.ToFloat64(collector) + t.Cleanup(func() { + t.Helper() + + after := testutil.ToFloat64(collector) + assert.EqualValues(t, delta, after-before, "failed for %s", desc) + }) +} + func checkStatsValue(t *testing.T, collector prometheus.Collector, value float64) { // Make sure test is not executed with "t.Parallel()" t.Setenv("PARALLEL_CHECK", "1") From 10e55ff2410deb16ba58108a9a25f7662cf16551 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 13 Nov 2025 23:17:46 +0100 Subject: [PATCH 8/8] Lint Janus metrics. --- mcu_janus_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mcu_janus_test.go b/mcu_janus_test.go index fb3e3ef..a818f35 100644 --- a/mcu_janus_test.go +++ b/mcu_janus_test.go @@ -39,6 +39,10 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" ) +func TestMcuJanusStats(t *testing.T) { + collectAndLint(t, janusMcuStats...) +} + type TestJanusHandle struct { id uint64