From 78d74ea3ee5fd748a215397d02b23fbf06402130 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 11 Dec 2025 09:58:36 +0100 Subject: [PATCH 1/3] Start timer for anonymous sessions to join room before sending response. Fixes flaky "TestExpectAnonymousJoinRoomAfterLeave" under load. --- hub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hub.go b/hub.go index a122de9..5a33bde 100644 --- a/hub.go +++ b/hub.go @@ -1719,11 +1719,11 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) { if roomId == "" { // We can handle leaving a room directly. if session.LeaveRoomWithMessage(true, message) != nil { - // User was in a room before, so need to notify about leaving it. - h.sendRoom(session, message, nil) if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal { h.startWaitAnonymousSessionRoom(session) } + // User was in a room before, so need to notify about leaving it. + h.sendRoom(session, message, nil) } return From 18e41f243a83c88cbcdb702b16dbc70578ae1a86 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 11 Dec 2025 09:59:27 +0100 Subject: [PATCH 2/3] Also ignore "participants" events sent before room was joined. Fix flaky "TestVirtualSessionCustomInCall" under load. --- clientsession.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientsession.go b/clientsession.go index 5c5d2e9..fa414ec 100644 --- a/clientsession.go +++ b/clientsession.go @@ -1424,7 +1424,7 @@ func (s *ClientSession) filterAsyncMessage(msg *AsyncMessage) *ServerMessage { } } case "event": - if msg.Message.Event.Target == "room" { + if msg.Message.Event.Target == "room" || msg.Message.Event.Target == "participants" { // Can happen mostly during tests where an older room async message // could be received by a subscriber that joined after it was sent. if joined := s.getRoomJoinTime(); joined.IsZero() || msg.SendTime.Before(joined) { From 550e40f322e95d07f686691ed436ce81e340e7a9 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 11 Dec 2025 11:00:59 +0100 Subject: [PATCH 3/3] Wait for events to be processed in tests before sending between sessions. Fixes flaky "TestClientControlToSessionId" under load which could send to a session before the events subscription was processed completely. --- async_events_test.go | 18 ++++++++++++++++++ hub_test.go | 8 ++++++++ 2 files changed, 26 insertions(+) diff --git a/async_events_test.go b/async_events_test.go index 9cf2c5f..107a41d 100644 --- a/async_events_test.go +++ b/async_events_test.go @@ -83,3 +83,21 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { }) return events } + +func waitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events AsyncEvents) { + t.Helper() + + nats, ok := (events.(*asyncEventsNats)) + if !ok { + // Only can wait for NATS events. + return + } + + client, ok := nats.client.(*natsClient) + if !ok { + // The loopback NATS clients is executing all events synchronously. + return + } + + assert.NoError(t, client.conn.FlushWithContext(ctx)) +} diff --git a/hub_test.go b/hub_test.go index 43fa737..e8ec61a 100644 --- a/hub_test.go +++ b/hub_test.go @@ -1985,6 +1985,10 @@ func TestClientMessageToSessionId(t *testing.T) { client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2") require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId) + // Make sure the session subscription events are processed. + waitForAsyncEventsFlushed(ctx, t, hub1.events) + waitForAsyncEventsFlushed(ctx, t, hub2.events) + recipient1 := MessageClientMessageRecipient{ Type: "session", SessionId: hello1.Hello.SessionId, @@ -2042,6 +2046,10 @@ func TestClientControlToSessionId(t *testing.T) { client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2") require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId) + // Make sure the session subscription events are processed. + waitForAsyncEventsFlushed(ctx, t, hub1.events) + waitForAsyncEventsFlushed(ctx, t, hub2.events) + recipient1 := MessageClientMessageRecipient{ Type: "session", SessionId: hello1.Hello.SessionId,