diff --git a/README.md b/README.md index 7cc8b83..7398c48 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,7 @@ websocket events handler (`janus.eventhandler.wsevh.jcfg`), the module must be enabled by setting `enabled` to `true`, the `backend` must be set to the websocket url of the signaling server (or signaling proxy) and `subprotocol` must be set to `janus-events`. -At least events of type `media` and `webrtc` must be subscribed. +At least events of type `handles`, `media` and `webrtc` must be subscribed. Edit the `server.conf` and enter the URL to the websocket endpoint of Janus in the section `[mcu]` and key `url`. During startup, the signaling server will diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md index c8443df..3016d6a 100644 --- a/docs/prometheus-metrics.md +++ b/docs/prometheus-metrics.md @@ -67,3 +67,8 @@ The following metrics are available: | `signaling_mcu_slow_link_total` | Counter | 2.0.5 | Total number of slow link events | `media`, `direction` | | `signaling_mcu_media_rtt` | Histogram | 2.0.5 | The roundtrip time of WebRTC media in milliseconds | `media` | | `signaling_mcu_media_jitter` | Histogram | 2.0.5 | The jitter of WebRTC media in milliseconds | `media`, `origin` | +| `signaling_mcu_media_codecs_total` | Counter | 2.0.5 | The total number of codecs | `media`, `codec` | +| `signaling_mcu_media_nacks_total` | Counter | 2.0.5 | The total number of NACKs | `media`, `direction` | +| `signaling_mcu_media_retransmissions_total` | Counter | 2.0.5 | The total number of received retransmissions | `media` | +| `signaling_mcu_media_bytes_total` | Counter | 2.0.5 | The total number of media bytes sent / received | `media`, `direction` | +| `signaling_mcu_media_lost_total` | Counter | 2.0.5 | The total number of lost media packets | `media`, `origin` | diff --git a/mcu_janus_events_handler.go b/mcu_janus_events_handler.go index b488548..7d0ddc1 100644 --- a/mcu_janus_events_handler.go +++ b/mcu_janus_events_handler.go @@ 
// ValueCounter converts absolute counter readings into increments.
//
// It remembers the most recent reading per key so that Update can report how
// far a reading advanced since the previous call for the same key. The zero
// value is ready to use. The type does no locking of its own.
type ValueCounter struct {
	// values holds the last reading recorded for each key.
	values map[string]uint64
}

// Update records value as the latest reading for key and returns the
// increment relative to the previously recorded reading.
//
// A key that has not been seen before is treated as having a previous reading
// of 0. An unchanged reading yields 0. If value is lower than the previous
// reading, the counter is assumed to have wrapped around and the increment is
// computed as math.MaxUint64 - prev + value.
func (c *ValueCounter) Update(key string, value uint64) uint64 {
	if c.values == nil {
		c.values = make(map[string]uint64)
	}

	prev := c.values[key]
	if value == prev {
		return 0
	}

	// Always remember the reading itself. Keeping the computed delta as the
	// new baseline after a wrap around would make the next call compare
	// against a value that was never reported and return a bogus increment.
	c.values[key] = value

	if value < prev {
		// Wrap around
		return math.MaxUint64 - prev + value
	}
	return value - prev
}
*handleStats) BytesSent(media string, bytes uint64) { + delta := h.bytesSent.Update(media, bytes) + statsJanusMediaBytesTotal.WithLabelValues(media, "outgoing").Add(float64(delta)) +} + +func (h *handleStats) NacksReceived(media string, nacks uint64) { + delta := h.nacksReceived.Update(media, nacks) + statsJanusMediaNACKTotal.WithLabelValues(media, "incoming").Add(float64(delta)) +} + +func (h *handleStats) NacksSent(media string, nacks uint64) { + delta := h.nacksSent.Update(media, nacks) + statsJanusMediaNACKTotal.WithLabelValues(media, "outgoing").Add(float64(delta)) +} + +func (h *handleStats) RetransmissionsReceived(media string, retransmissions uint64) { + delta := h.retransmissionsReceived.Update(media, retransmissions) + statsJanusMediaRetransmissionsTotal.WithLabelValues(media).Add(float64(delta)) +} + +func (h *handleStats) LostLocal(media string, lost uint64) { + delta := h.lostLocal.Update(media, lost) + statsJanusMediaLostTotal.WithLabelValues(media, "local").Add(float64(delta)) +} + +func (h *handleStats) LostRemote(media string, lost uint64) { + delta := h.lostRemote.Update(media, lost) + statsJanusMediaLostTotal.WithLabelValues(media, "remote").Add(float64(delta)) +} + type JanusEventsHandler struct { mu sync.Mutex @@ -542,6 +628,9 @@ type JanusEventsHandler struct { addr string agent string + supportsHandles bool + handleStats map[uint64]*handleStats + events chan JanusEvent } @@ -730,6 +819,32 @@ func (h *JanusEventsHandler) processEvents() { } } +func (h *JanusEventsHandler) deleteHandleStats(event JanusEvent) { + if event.HandleId != 0 { + delete(h.handleStats, event.HandleId) + } +} + +func (h *JanusEventsHandler) getHandleStats(event JanusEvent) *handleStats { + if !h.supportsHandles { + // Only create per-handle stats if enabled in Janus. Otherwise the + // handleStats map will never be cleaned up. 
+ return nil + } else if event.HandleId == 0 { + return nil + } + + if h.handleStats == nil { + h.handleStats = make(map[uint64]*handleStats) + } + stats, found := h.handleStats[event.HandleId] + if !found { + stats = &handleStats{} + h.handleStats[event.HandleId] = stats + } + return stats +} + func (h *JanusEventsHandler) processEvent(event JanusEvent) { evt, err := event.Decode() if err != nil { @@ -738,6 +853,13 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { } switch evt := evt.(type) { + case *JanusEventHandle: + switch evt.Name { + case "attached": + h.supportsHandles = true + case "detached": + h.deleteHandleStats(event) + } case *JanusEventWebRTCICE: statsJanusICEStateTotal.WithLabelValues(evt.ICE).Inc() case *JanusEventWebRTCDTLS: @@ -766,6 +888,42 @@ func (h *JanusEventsHandler) processEvent(event JanusEvent) { if jitter := evt.JitterRemote; jitter > 0 { statsJanusMediaJitter.WithLabelValues(evt.Media, "remote").Observe(float64(jitter)) } + if codec := evt.Codec; codec != "" { + if stats := h.getHandleStats(event); stats != nil { + stats.Codec(evt.Media, codec) + } + } + if stats := h.getHandleStats(event); stats != nil { + stats.BytesReceived(evt.Media, evt.BytesReceived) + } + if stats := h.getHandleStats(event); stats != nil { + stats.BytesSent(evt.Media, evt.BytesSent) + } + if nacks := evt.NacksReceived; nacks > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.NacksReceived(evt.Media, uint64(nacks)) + } + } + if nacks := evt.NacksSent; nacks > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.NacksSent(evt.Media, uint64(nacks)) + } + } + if retransmissions := evt.RetransmissionsReceived; retransmissions > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.RetransmissionsReceived(evt.Media, uint64(retransmissions)) + } + } + if lost := evt.Lost; lost > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.LostLocal(evt.Media, uint64(lost)) + } + } + if lost := 
evt.LostByRemote; lost > 0 { + if stats := h.getHandleStats(event); stats != nil { + stats.LostRemote(evt.Media, uint64(lost)) + } + } h.mcu.UpdateBandwidth(event.HandleId, evt.Media, api.BandwidthFromBytes(uint64(evt.BytesSentLastSec)), api.BandwidthFromBytes(uint64(evt.BytesReceivedLastSec))) } } diff --git a/mcu_janus_events_handler_test.go b/mcu_janus_events_handler_test.go index ace254f..b04d604 100644 --- a/mcu_janus_events_handler_test.go +++ b/mcu_janus_events_handler_test.go @@ -25,6 +25,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net" "net/http" "net/http/httptest" @@ -248,6 +249,7 @@ func (s *janusEventSender) SendSingle(t *testing.T, conn *websocket.Conn) { require.Len(s.events, 1) require.NoError(conn.WriteJSON(s.events[0])) + s.events = nil } func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) { @@ -255,6 +257,7 @@ func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) { require := require.New(t) require.NoError(conn.WriteJSON(s.events)) + s.events = nil } func (s *janusEventSender) AddEvent(t *testing.T, eventType int, eventSubtype int, handleId uint64, event any) { @@ -493,7 +496,6 @@ func TestJanusEventsHandlerDifferentTypes(t *testing.T) { } func TestJanusEventsHandlerNotGrouped(t *testing.T) { - t.Parallel() require := require.New(t) assert := assert.New(t) @@ -517,16 +519,51 @@ func TestJanusEventsHandlerNotGrouped(t *testing.T) { assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol")) + assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "incoming"), 20) + assertCollectorChangeBy(t, statsJanusMediaNACKTotal.WithLabelValues("audio", "outgoing"), 30) + assertCollectorChangeBy(t, statsJanusMediaRetransmissionsTotal.WithLabelValues("audio"), 40) + assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "local"), 50) + assertCollectorChangeBy(t, statsJanusMediaLostTotal.WithLabelValues("audio", "remote"), 60) + var sender 
janusEventSender + sender.AddEvent( + t, + JanusEventTypeHandle, + 0, + 1, + JanusEventHandle{ + Name: "attached", + }, + ) + sender.SendSingle(t, conn) sender.AddEvent( t, JanusEventTypeMedia, JanusEventSubTypeMediaStats, 1, JanusEventMediaStats{ - Media: "audio", - BytesSentLastSec: 100, - BytesReceivedLastSec: 200, + Media: "audio", + BytesSentLastSec: 100, + BytesReceivedLastSec: 200, + Codec: "opus", + RTT: 10, + JitterLocal: 11, + JitterRemote: 12, + NacksReceived: 20, + NacksSent: 30, + RetransmissionsReceived: 40, + Lost: 50, + LostByRemote: 60, + }, + ) + sender.SendSingle(t, conn) + sender.AddEvent( + t, + JanusEventTypeHandle, + 0, + 1, + JanusEventHandle{ + Name: "detached", }, ) sender.SendSingle(t, conn) @@ -585,3 +622,19 @@ func TestJanusEventsHandlerGrouped(t *testing.T) { assert.NoError(mcu.WaitForUpdates(ctx, 2)) } + +func TestValueCounter(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + + var c ValueCounter + assert.EqualValues(0, c.Update("foo", 0)) + assert.EqualValues(10, c.Update("foo", 10)) + assert.EqualValues(0, c.Update("foo", 10)) + assert.EqualValues(1, c.Update("foo", 11)) + assert.EqualValues(10, c.Update("bar", 10)) + assert.EqualValues(1, c.Update("bar", 11)) + assert.EqualValues(uint64(math.MaxUint64-10), c.Update("baz", math.MaxUint64-10)) + assert.EqualValues(20, c.Update("baz", 10)) +} diff --git a/mcu_stats_prometheus.go b/mcu_stats_prometheus.go index 2e08092..cb5ecf3 100644 --- a/mcu_stats_prometheus.go +++ b/mcu_stats_prometheus.go @@ -136,6 +136,36 @@ var ( Help: "The jitter of WebRTC media in milliseconds", Buckets: prometheus.ExponentialBucketsRange(1, 2000, 20), }, []string{"media", "origin"}) + statsJanusMediaCodecsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_codecs_total", + Help: "The total number of codecs", + }, []string{"media", "codec"}) + statsJanusMediaNACKTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: 
"signaling", + Subsystem: "mcu", + Name: "media_nacks_total", + Help: "The total number of NACKs", + }, []string{"media", "direction"}) + statsJanusMediaRetransmissionsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_retransmissions_total", + Help: "The total number of received retransmissions", + }, []string{"media"}) + statsJanusMediaBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_bytes_total", + Help: "The total number of media bytes sent / received", + }, []string{"media", "direction"}) + statsJanusMediaLostTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "signaling", + Subsystem: "mcu", + Name: "media_lost_total", + Help: "The total number of lost media packets", + }, []string{"media", "origin"}) janusMcuStats = []prometheus.Collector{ statsJanusBandwidthCurrent, @@ -146,6 +176,11 @@ var ( statsJanusSlowLinkTotal, statsJanusMediaRTT, statsJanusMediaJitter, + statsJanusMediaCodecsTotal, + statsJanusMediaNACKTotal, + statsJanusMediaRetransmissionsTotal, + statsJanusMediaBytesTotal, + statsJanusMediaLostTotal, } statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/stats_prometheus_test.go b/stats_prometheus_test.go index 7783d26..0516de9 100644 --- a/stats_prometheus_test.go +++ b/stats_prometheus_test.go @@ -42,6 +42,22 @@ func ResetStatsValue[T prometheus.Gauge](t *testing.T, collector T) { }) } +func assertCollectorChangeBy(t *testing.T, collector prometheus.Collector, delta float64) { + t.Helper() + + ch := make(chan *prometheus.Desc, 1) + collector.Describe(ch) + desc := <-ch + + before := testutil.ToFloat64(collector) + t.Cleanup(func() { + t.Helper() + + after := testutil.ToFloat64(collector) + assert.EqualValues(t, delta, after-before, "failed for %s", desc) + }) +} + func checkStatsValue(t *testing.T, collector prometheus.Collector, value float64) { 
// Make sure test is not executed with "t.Parallel()" t.Setenv("PARALLEL_CHECK", "1")