Merge pull request #1183 from strukturag/sessions-in-call-metrics

Add more metrics about sessions in calls.
This commit is contained in:
Joachim Bauch 2026-01-29 10:13:10 +01:00 committed by GitHub
commit 66e2d73ee5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 279 additions and 24 deletions

View file

@ -74,3 +74,6 @@ The following metrics are available:
| `signaling_mcu_media_lost_total` | Counter | 2.0.5 | The total number of lost media packets | `media`, `origin` |
| `signaling_client_bytes_total` | Counter | 2.0.5 | The total number of bytes sent to or received by clients | `direction` |
| `signaling_client_messages_total` | Counter | 2.0.5 | The total number of messages sent to or received by clients | `direction` |
| `signaling_call_sessions` | Gauge | 2.0.5 | The current number of sessions in a call | `backend`, `room`, `clienttype` |
| `signaling_call_sessions_total` | Counter | 2.0.5 | The total number of sessions in a call | `backend`, `clienttype` |
| `signaling_call_rooms_total` | Counter | 2.0.5 | The total number of rooms with an active call | `backend` |

View file

@ -94,6 +94,12 @@ type Room struct {
// +checklocks:mu
statsRoomSessionsCurrent *prometheus.GaugeVec
// +checklocks:mu
statsCallSessionsCurrent *prometheus.GaugeVec
// +checklocks:mu
statsCallSessionsTotal *prometheus.CounterVec
// +checklocks:mu
statsCallRoomsTotal prometheus.Counter
// Users currently in the room
users []api.StringMap
@ -136,6 +142,14 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, asyncEvents ev
"backend": backend.Id(),
"room": roomId,
}),
statsCallSessionsCurrent: statsCallSessionsCurrent.MustCurryWith(prometheus.Labels{
"backend": backend.Id(),
"room": roomId,
}),
statsCallSessionsTotal: statsCallSessionsTotal.MustCurryWith(prometheus.Labels{
"backend": backend.Id(),
}),
statsCallRoomsTotal: statsCallRoomsTotal.WithLabelValues(backend.Id()),
lastRoomRequests: make(map[string]int64),
@ -223,6 +237,7 @@ func (r *Room) Close() []Session {
r.hub.removeRoom(r)
r.doClose()
r.mu.Lock()
defer r.mu.Unlock()
r.unsubscribeBackend()
result := make([]Session, 0, len(r.sessions))
for _, s := range r.sessions {
@ -230,9 +245,10 @@ func (r *Room) Close() []Session {
}
r.sessions = nil
r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeClient)})
r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeFederation)})
r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeInternal)})
r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeVirtual)})
r.mu.Unlock()
r.clearInCallStats()
return result
}
@ -480,16 +496,69 @@ func (r *Room) notifySessionJoined(sessionId api.PublicSessionId) {
func (r *Room) HasSession(session Session) bool {
r.mu.RLock()
defer r.mu.RUnlock()
_, result := r.sessions[session.PublicId()]
r.mu.RUnlock()
return result
}
func (r *Room) IsSessionInCall(session Session) bool {
r.mu.RLock()
_, result := r.inCallSessions[session]
r.mu.RUnlock()
return result
defer r.mu.RUnlock()
return r.inCallSessions[session]
}
// +checklocks:r.mu
func (r *Room) clearInCallStats() {
r.statsCallSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeClient)})
r.statsCallSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeFederation)})
r.statsCallSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeInternal)})
r.statsCallSessionsCurrent.Delete(prometheus.Labels{"clienttype": string(api.HelloClientTypeVirtual)})
}
func (r *Room) addSessionToCall(session Session) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.addSessionToCallLocked(session)
}
// +checklocks:r.mu
func (r *Room) addSessionToCallLocked(session Session) bool {
if r.inCallSessions[session] {
return false
}
if len(r.inCallSessions) == 0 {
r.statsCallRoomsTotal.Inc()
}
r.inCallSessions[session] = true
r.statsCallSessionsCurrent.WithLabelValues(string(session.ClientType())).Inc()
r.statsCallSessionsTotal.WithLabelValues(string(session.ClientType())).Inc()
return true
}
func (r *Room) removeSessionFromCall(session Session) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.removeSessionFromCallLocked(session)
}
// +checklocks:r.mu
func (r *Room) removeSessionFromCallLocked(session Session) bool {
if !r.inCallSessions[session] {
return false
}
delete(r.inCallSessions, session)
if len(r.inCallSessions) == 0 {
r.clearInCallStats()
} else {
r.statsCallSessionsCurrent.WithLabelValues(string(session.ClientType())).Dec()
}
return true
}
// Returns "true" if there are still clients in the room.
@ -520,7 +589,7 @@ func (r *Room) RemoveSession(session Session) bool {
delete(r.internalSessions, clientSession)
r.transientData.RemoveListener(clientSession)
}
delete(r.inCallSessions, session)
r.removeSessionFromCallLocked(session)
delete(r.roomSessionData, sid)
if len(r.sessions) > 0 {
r.mu.Unlock()
@ -829,16 +898,11 @@ func (r *Room) PublishUsersInCallChanged(changed []api.StringMap, users []api.St
}
if inCall {
r.mu.Lock()
if !r.inCallSessions[session] {
r.inCallSessions[session] = true
if r.addSessionToCall(session) {
r.logger.Printf("Session %s joined call %s", session.PublicId(), r.id)
}
r.mu.Unlock()
} else {
r.mu.Lock()
delete(r.inCallSessions, session)
r.mu.Unlock()
r.removeSessionFromCall(session)
if clientSession, ok := session.(*ClientSession); ok {
clientSession.LeaveCall()
}
@ -884,8 +948,7 @@ func (r *Room) PublishUsersInCallChangedAll(inCall int) {
continue
}
if !r.inCallSessions[session] {
r.inCallSessions[session] = true
if r.addSessionToCallLocked(session) {
joined = append(joined, session.PublicId())
}
notify = append(notify, clientSession)
@ -925,7 +988,8 @@ func (r *Room) PublishUsersInCallChangedAll(inCall int) {
}
}
close(ch)
r.inCallSessions = make(map[Session]bool)
clear(r.inCallSessions)
r.clearInCallStats()
} else {
// All sessions already left the call, no need to notify.
return
@ -1021,16 +1085,11 @@ func (r *Room) NotifySessionChanged(session Session, flags SessionChangeFlag) {
if joinLeave != 0 {
switch joinLeave {
case 1:
r.mu.Lock()
if !r.inCallSessions[session] {
r.inCallSessions[session] = true
if r.addSessionToCall(session) {
r.logger.Printf("Session %s joined call %s", session.PublicId(), r.id)
}
r.mu.Unlock()
case 2:
r.mu.Lock()
delete(r.inCallSessions, session)
r.mu.Unlock()
r.removeSessionFromCall(session)
if clientSession, ok := session.(*ClientSession); ok {
clientSession.LeaveCall()
}

View file

@ -34,9 +34,30 @@ var (
Name: "sessions",
Help: "The current number of sessions in a room",
}, []string{"backend", "room", "clienttype"})
statsCallSessionsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "call",
Name: "sessions",
Help: "The current number of sessions in a call",
}, []string{"backend", "room", "clienttype"})
statsCallSessionsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "call",
Name: "sessions_total",
Help: "The total number of sessions in a call",
}, []string{"backend", "clienttype"})
statsCallRoomsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "call",
Name: "rooms_total",
Help: "The total number of rooms with an active call",
}, []string{"backend"})
roomStats = []prometheus.Collector{
statsRoomSessionsCurrent,
statsCallSessionsCurrent,
statsCallSessionsTotal,
statsCallRoomsTotal,
}
)

View file

@ -35,12 +35,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
logtest "github.com/strukturag/nextcloud-spreed-signaling/log/test"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
func TestRoom_InCall(t *testing.T) {
func TestRoom_InCallFlag(t *testing.T) {
t.Parallel()
type Testcase struct {
Value any
@ -354,6 +355,153 @@ func TestRoom_RoomSessionData(t *testing.T) {
wg.Wait()
}
func TestRoom_InCall(t *testing.T) {
t.Parallel()
logger := logtest.NewLoggerForTest(t)
ctx := log.NewLoggerContext(t.Context(), logger)
require := require.New(t)
assert := assert.New(t)
hub, _, router, server := CreateHubForTest(t)
config, err := getTestConfig(server)
require.NoError(err)
b, err := NewBackendServer(ctx, config, hub, "no-version")
require.NoError(err)
require.NoError(b.Start(router))
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()
client1, hello1 := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"1")
client2, hello2 := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"2")
// Join room by id.
roomId := "test-room"
roomMsg := MustSucceed2(t, client1.JoinRoom, ctx, roomId)
require.Equal(roomId, roomMsg.Room.RoomId)
client1.RunUntilJoined(ctx, hello1.Hello)
roomMsg = MustSucceed2(t, client2.JoinRoom, ctx, roomId)
require.Equal(roomId, roomMsg.Room.RoomId)
client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello)
client1.RunUntilJoined(ctx, hello2.Hello)
msg1 := &talk.BackendServerRoomRequest{
Type: "incall",
InCall: &talk.BackendRoomInCallRequest{
InCall: json.RawMessage(strconv.FormatInt(FlagInCall, 10)),
Changed: []api.StringMap{
{
"sessionId": fmt.Sprintf("%s-%s", roomId, hello1.Hello.SessionId),
"inCall": json.RawMessage(strconv.FormatInt(FlagInCall, 10)),
},
},
Users: []api.StringMap{
{
"sessionId": fmt.Sprintf("%s-%s", roomId, hello1.Hello.SessionId),
"inCall": json.RawMessage(strconv.FormatInt(FlagInCall, 10)),
},
{
"sessionId": fmt.Sprintf("%s-%s", roomId, hello2.Hello.SessionId),
"inCall": json.RawMessage(strconv.FormatInt(0, 10)),
},
},
},
}
data1, err := json.Marshal(msg1)
require.NoError(err)
res1, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data1)
require.NoError(err)
defer res1.Body.Close()
body1, err := io.ReadAll(res1.Body)
assert.NoError(err)
assert.Equal(http.StatusOK, res1.StatusCode, "Expected successful request, got %s", string(body1))
if msg, ok := client1.RunUntilMessage(ctx); ok {
if message, ok := checkMessageParticipantsInCall(t, msg); ok {
assert.Equal(roomId, message.RoomId)
if assert.Len(message.Users, 2) {
assert.EqualValues(hello1.Hello.SessionId, message.Users[0]["sessionId"])
assert.EqualValues(FlagInCall, message.Users[0]["inCall"])
assert.EqualValues(hello2.Hello.SessionId, message.Users[1]["sessionId"])
assert.EqualValues(0, message.Users[1]["inCall"])
}
}
}
if msg, ok := client2.RunUntilMessage(ctx); ok {
if message, ok := checkMessageParticipantsInCall(t, msg); ok {
assert.Equal(roomId, message.RoomId)
if assert.Len(message.Users, 2) {
assert.EqualValues(hello1.Hello.SessionId, message.Users[0]["sessionId"])
assert.EqualValues(FlagInCall, message.Users[0]["inCall"])
assert.EqualValues(hello2.Hello.SessionId, message.Users[1]["sessionId"])
assert.EqualValues(0, message.Users[1]["inCall"])
}
}
}
msg2 := &talk.BackendServerRoomRequest{
Type: "incall",
InCall: &talk.BackendRoomInCallRequest{
InCall: json.RawMessage(strconv.FormatInt(0, 10)),
Changed: []api.StringMap{
{
"sessionId": fmt.Sprintf("%s-%s", roomId, hello1.Hello.SessionId),
"inCall": json.RawMessage(strconv.FormatInt(0, 10)),
},
},
Users: []api.StringMap{
{
"sessionId": fmt.Sprintf("%s-%s", roomId, hello1.Hello.SessionId),
"inCall": json.RawMessage(strconv.FormatInt(0, 10)),
},
{
"sessionId": fmt.Sprintf("%s-%s", roomId, hello2.Hello.SessionId),
"inCall": json.RawMessage(strconv.FormatInt(0, 10)),
},
},
},
}
data2, err := json.Marshal(msg2)
require.NoError(err)
res2, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data2)
require.NoError(err)
defer res2.Body.Close()
body2, err := io.ReadAll(res2.Body)
assert.NoError(err)
assert.Equal(http.StatusOK, res2.StatusCode, "Expected successful request, got %s", string(body2))
if msg, ok := client1.RunUntilMessage(ctx); ok {
if message, ok := checkMessageParticipantsInCall(t, msg); ok {
assert.Equal(roomId, message.RoomId)
if assert.Len(message.Users, 2) {
assert.EqualValues(hello1.Hello.SessionId, message.Users[0]["sessionId"])
assert.EqualValues(0, message.Users[0]["inCall"])
assert.EqualValues(hello2.Hello.SessionId, message.Users[1]["sessionId"])
assert.EqualValues(0, message.Users[1]["inCall"])
}
}
}
if msg, ok := client2.RunUntilMessage(ctx); ok {
if message, ok := checkMessageParticipantsInCall(t, msg); ok {
assert.Equal(roomId, message.RoomId)
if assert.Len(message.Users, 2) {
assert.EqualValues(hello1.Hello.SessionId, message.Users[0]["sessionId"])
assert.EqualValues(0, message.Users[0]["inCall"])
assert.EqualValues(hello2.Hello.SessionId, message.Users[1]["sessionId"])
assert.EqualValues(0, message.Users[1]["inCall"])
}
}
}
}
func TestRoom_InCallAll(t *testing.T) {
t.Parallel()
logger := logtest.NewLoggerForTest(t)

View file

@ -589,6 +589,30 @@ func TestVirtualSessionCustomInCall(t *testing.T) {
checkHasEntryWithInCall(t, updateMsg, sessionId, "virtual", newInCall)
checkHasEntryWithInCall(t, updateMsg, helloInternal.Hello.SessionId, "internal", FlagInCall|FlagWithAudio)
}
newInCall2 := FlagDisconnected
msgInCall3 := &api.ClientMessage{
Type: "internal",
Internal: &api.InternalClientMessage{
Type: "updatesession",
UpdateSession: &api.UpdateSessionInternalClientMessage{
CommonSessionInternalClientMessage: api.CommonSessionInternalClientMessage{
SessionId: internalSessionId,
RoomId: roomId,
},
InCall: &newInCall2,
},
},
}
require.NoError(clientInternal.WriteJSON(msgInCall3))
msg6 := MustSucceed1(t, client.RunUntilMessage, ctx)
if updateMsg, ok := checkMessageParticipantsInCall(t, msg6); ok {
assert.Equal(roomId, updateMsg.RoomId)
assert.Len(updateMsg.Users, 2)
checkHasEntryWithInCall(t, updateMsg, sessionId, "virtual", newInCall2)
checkHasEntryWithInCall(t, updateMsg, helloInternal.Hello.SessionId, "internal", FlagInCall|FlagWithAudio)
}
}
func TestVirtualSessionCleanup(t *testing.T) {