Merge pull request #1153 from strukturag/fix-flaky-tests

Fix flaky tests that fail under load.
This commit is contained in:
Joachim Bauch 2025-12-11 11:21:36 +01:00 committed by GitHub
commit 20d09941b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 29 additions and 3 deletions

View file

@ -83,3 +83,21 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
})
return events
}
func waitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events AsyncEvents) {
t.Helper()
nats, ok := (events.(*asyncEventsNats))
if !ok {
// Only can wait for NATS events.
return
}
client, ok := nats.client.(*natsClient)
if !ok {
// The loopback NATS clients is executing all events synchronously.
return
}
assert.NoError(t, client.conn.FlushWithContext(ctx))
}

View file

@ -1424,7 +1424,7 @@ func (s *ClientSession) filterAsyncMessage(msg *AsyncMessage) *ServerMessage {
}
}
case "event":
if msg.Message.Event.Target == "room" {
if msg.Message.Event.Target == "room" || msg.Message.Event.Target == "participants" {
// Can happen mostly during tests where an older room async message
// could be received by a subscriber that joined after it was sent.
if joined := s.getRoomJoinTime(); joined.IsZero() || msg.SendTime.Before(joined) {

4
hub.go
View file

@ -1719,11 +1719,11 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) {
if roomId == "" {
// We can handle leaving a room directly.
if session.LeaveRoomWithMessage(true, message) != nil {
// User was in a room before, so need to notify about leaving it.
h.sendRoom(session, message, nil)
if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal {
h.startWaitAnonymousSessionRoom(session)
}
// User was in a room before, so need to notify about leaving it.
h.sendRoom(session, message, nil)
}
return

View file

@ -1985,6 +1985,10 @@ func TestClientMessageToSessionId(t *testing.T) {
client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2")
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
// Make sure the session subscription events are processed.
waitForAsyncEventsFlushed(ctx, t, hub1.events)
waitForAsyncEventsFlushed(ctx, t, hub2.events)
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
@ -2042,6 +2046,10 @@ func TestClientControlToSessionId(t *testing.T) {
client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2")
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
// Make sure the session subscription events are processed.
waitForAsyncEventsFlushed(ctx, t, hub1.events)
waitForAsyncEventsFlushed(ctx, t, hub2.events)
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,