Wait for events to be processed in tests before sending between sessions.

Fixes flaky "TestClientControlToSessionId" under load which could send to
a session before the events subscription was processed completely.
This commit is contained in:
Joachim Bauch 2025-12-11 11:00:59 +01:00
commit 550e40f322
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 26 additions and 0 deletions

View file

@ -83,3 +83,21 @@ func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
})
return events
}
func waitForAsyncEventsFlushed(ctx context.Context, t *testing.T, events AsyncEvents) {
t.Helper()
nats, ok := (events.(*asyncEventsNats))
if !ok {
// Only can wait for NATS events.
return
}
client, ok := nats.client.(*natsClient)
if !ok {
// The loopback NATS clients is executing all events synchronously.
return
}
assert.NoError(t, client.conn.FlushWithContext(ctx))
}

View file

@ -1985,6 +1985,10 @@ func TestClientMessageToSessionId(t *testing.T) {
client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2")
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
// Make sure the session subscription events are processed.
waitForAsyncEventsFlushed(ctx, t, hub1.events)
waitForAsyncEventsFlushed(ctx, t, hub2.events)
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
@ -2042,6 +2046,10 @@ func TestClientControlToSessionId(t *testing.T) {
client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2")
require.NotEqual(hello1.Hello.SessionId, hello2.Hello.SessionId)
// Make sure the session subscription events are processed.
waitForAsyncEventsFlushed(ctx, t, hub1.events)
waitForAsyncEventsFlushed(ctx, t, hub2.events)
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,