Run Janus bandwidth tests in parallel.

This commit is contained in:
Joachim Bauch 2025-12-09 09:51:38 +01:00
commit 415a49e04b
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 47 additions and 20 deletions

View file

@ -333,7 +333,22 @@ func (m *mcuJanus) Bandwidth() (result *McuClientBandwidthInfo) {
return
}
func (m *mcuJanus) updateBandwidthStats() {
type janusBandwidthStats interface {
SetBandwidth(incoming uint64, outgoing uint64)
}
type prometheusJanusBandwidthStats struct{}
func (s *prometheusJanusBandwidthStats) SetBandwidth(incoming uint64, outgoing uint64) {
statsJanusBandwidthCurrent.WithLabelValues("incoming").Set(float64(incoming))
statsJanusBandwidthCurrent.WithLabelValues("outgoing").Set(float64(outgoing))
}
var (
defaultJanusBandwidthStats = &prometheusJanusBandwidthStats{}
)
func (m *mcuJanus) updateBandwidthStats(stats janusBandwidthStats) {
if info := m.info.Load(); info != nil {
if !info.EventHandlers {
// Event handlers are disabled, no stats will be available.
@ -346,12 +361,14 @@ func (m *mcuJanus) updateBandwidthStats() {
}
}
if stats == nil {
stats = defaultJanusBandwidthStats
}
if bandwidth := m.Bandwidth(); bandwidth != nil {
statsJanusBandwidthCurrent.WithLabelValues("incoming").Set(float64(bandwidth.Received.Bytes()))
statsJanusBandwidthCurrent.WithLabelValues("outgoing").Set(float64(bandwidth.Sent.Bytes()))
stats.SetBandwidth(bandwidth.Received.Bytes(), bandwidth.Sent.Bytes())
} else {
statsJanusBandwidthCurrent.WithLabelValues("incoming").Set(0)
statsJanusBandwidthCurrent.WithLabelValues("outgoing").Set(0)
stats.SetBandwidth(0, 0)
}
}
@ -524,7 +541,7 @@ loop:
case <-ticker.C:
m.sendKeepalive(context.Background())
case <-bandwidthTicker.C:
m.updateBandwidthStats()
m.updateBandwidthStats(nil)
case <-m.closeChan:
break loop
}

View file

@ -1081,10 +1081,20 @@ func Test_JanusPublisherGetStreamsAudioVideo(t *testing.T) {
}
}
func Test_JanusPublisherSubscriber(t *testing.T) { // nolint:paralleltest
ResetStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"))
ResetStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"))
type mockBandwidthStats struct {
incoming uint64
outgoing uint64
}
func (s *mockBandwidthStats) SetBandwidth(incoming uint64, outgoing uint64) {
s.incoming = incoming
s.outgoing = outgoing
}
func Test_JanusPublisherSubscriber(t *testing.T) {
t.Parallel()
stats := &mockBandwidthStats{}
require := require.New(t)
assert := assert.New(t)
@ -1096,9 +1106,9 @@ func Test_JanusPublisherSubscriber(t *testing.T) { // nolint:paralleltest
// Bandwidth for unknown handles is ignored.
mcu.UpdateBandwidth(1234, "video", api.BandwidthFromBytes(100), api.BandwidthFromBytes(200))
mcu.updateBandwidthStats()
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 0)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 0)
mcu.updateBandwidthStats(stats)
assert.EqualValues(0, stats.incoming)
assert.EqualValues(0, stats.outgoing)
pubId := PublicSessionId("publisher-id")
listener1 := &TestMcuListener{
@ -1128,9 +1138,9 @@ func Test_JanusPublisherSubscriber(t *testing.T) { // nolint:paralleltest
assert.Equal(api.BandwidthFromBytes(1000), bw.Sent)
assert.Equal(api.BandwidthFromBytes(2000), bw.Received)
}
mcu.updateBandwidthStats()
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 2000)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 1000)
mcu.updateBandwidthStats(stats)
assert.EqualValues(2000, stats.incoming)
assert.EqualValues(1000, stats.outgoing)
listener2 := &TestMcuListener{
id: pubId,
@ -1156,11 +1166,11 @@ func Test_JanusPublisherSubscriber(t *testing.T) { // nolint:paralleltest
assert.Equal(api.BandwidthFromBytes(4000), bw.Sent)
assert.Equal(api.BandwidthFromBytes(6000), bw.Received)
}
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 2000)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 1000)
mcu.updateBandwidthStats()
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 6000)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 4000)
assert.EqualValues(2000, stats.incoming)
assert.EqualValues(1000, stats.outgoing)
mcu.updateBandwidthStats(stats)
assert.EqualValues(6000, stats.incoming)
assert.EqualValues(4000, stats.outgoing)
}
func Test_JanusSubscriberPublisher(t *testing.T) {