From 550e40f322e95d07f686691ed436ce81e340e7a9 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 11 Dec 2025 11:00:59 +0100 Subject: [PATCH] Wait for events to be processed in tests before sending between sessions. Fixes flaky "TestClientControlToSessionId" under load which could send to a session before the events subscription was processed completely. --- async_events_test.go | 18 ++++++++++++++++++ hub_test.go | 8 ++++++++ 2 files changed, 26 insertions(+) diff --git a/async_events_test.go b/async_events_test.go index 9cf2c5f..107a41d 100644 --- a/async_events_test.go +++ b/async_events_test.go @@ -83,3 +83,21 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents { }) return events } + +func waitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events AsyncEvents) { + t.Helper() + + nats, ok := (events.(*asyncEventsNats)) + if !ok { + // Only can wait for NATS events. + return + } + + client, ok := nats.client.(*natsClient) + if !ok { + // The loopback NATS clients is executing all events synchronously. + return + } + + assert.NoError(t, client.conn.FlushWithContext(ctx)) +} diff --git a/hub_test.go b/hub_test.go index 43fa737..e8ec61a 100644 --- a/hub_test.go +++ b/hub_test.go @@ -1985,6 +1985,10 @@ func TestClientMessageToSessionId(t *testing.T) { client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2") require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId) + // Make sure the session subscription events are processed. + waitForAsyncEventsFlushed(ctx, t, hub1.events) + waitForAsyncEventsFlushed(ctx, t, hub2.events) + recipient1 := MessageClientMessageRecipient{ Type: "session", SessionId: hello1.Hello.SessionId, @@ -2042,6 +2046,10 @@ func TestClientControlToSessionId(t *testing.T) { client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2") require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId) + // Make sure the session subscription events are processed. + waitForAsyncEventsFlushed(ctx, t, hub1.events) + waitForAsyncEventsFlushed(ctx, t, hub2.events) + recipient1 := MessageClientMessageRecipient{ Type: "session", SessionId: hello1.Hello.SessionId,