From 4d991725c376e5e3f881e35c9041a7f5fa9cd126 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 8 Jun 2021 13:45:58 +0200 Subject: [PATCH 1/2] Add method to get pending messages from testclient. --- testclient_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/testclient_test.go b/testclient_test.go index 9dc275a..10818e9 100644 --- a/testclient_test.go +++ b/testclient_test.go @@ -422,6 +422,32 @@ func (c *TestClient) DrainMessages(ctx context.Context) error { return nil } +func (c *TestClient) GetPendingMessages(ctx context.Context) ([]*ServerMessage, error) { + var result []*ServerMessage + select { + case err := <-c.readErrorChan: + return nil, err + case msg := <-c.messageChan: + var m ServerMessage + if err := json.Unmarshal(msg, &m); err != nil { + return nil, err + } + result = append(result, &m) + n := len(c.messageChan) + for i := 0; i < n; i++ { + var m ServerMessage + msg = <-c.messageChan + if err := json.Unmarshal(msg, &m); err != nil { + return nil, err + } + result = append(result, &m) + } + case <-ctx.Done(): + return nil, ctx.Err() + } + return result, nil +} + func (c *TestClient) RunUntilMessage(ctx context.Context) (message *ServerMessage, err error) { select { case err = <-c.readErrorChan: From 9d2ad0f2435180c1cc02f09342836a4d7c9441e3 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 8 Jun 2021 13:46:25 +0200 Subject: [PATCH 2/2] Notify new clients about current flags of virtual sessions. --- hub.go | 29 +++++++++++++++++++++++ virtualsession_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/hub.go b/hub.go index 2e13719..e287d36 100644 --- a/hub.go +++ b/hub.go @@ -1170,6 +1170,35 @@ func (h *Hub) notifyUserJoinedRoom(room *Room, session *ClientSession, sessionDa // No need to send through NATS, the session is connected locally. session.SendMessage(msg) + + // Notify about initial flags of virtual sessions. + for _, s := range sessions { + vsess, ok := s.(*VirtualSession) + if !ok { + continue + } + + flags := vsess.Flags() + if flags == 0 { + continue + } + + msg := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "participants", + Type: "flags", + Flags: &RoomFlagsServerMessage{ + RoomId: room.Id(), + SessionId: vsess.PublicId(), + Flags: vsess.Flags(), + }, + }, + } + + // No need to send through NATS, the session is connected locally. + session.SendMessage(msg) + } } } diff --git a/virtualsession_test.go b/virtualsession_test.go index a69e4eb..e6dac71 100644 --- a/virtualsession_test.go +++ b/virtualsession_test.go @@ -197,6 +197,58 @@ func TestVirtualSession(t *testing.T) { t.Errorf("Expected flags %d, got %+v", newFlags, flagsMsg.Flags) } + // A new client will receive the initial flags of the virtual session. + client2 := NewTestClient(t, server, hub) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + if _, err := client2.RunUntilHello(ctx); err != nil { + t.Error(err) + } + + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + gotFlags := false + var receivedMessages []*ServerMessage + for !gotFlags { + messages, err := client2.GetPendingMessages(ctx) + if err != nil { + t.Error(err) + } + + receivedMessages = append(receivedMessages, messages...) + for _, msg := range messages { + if msg.Type != "event" || msg.Event.Target != "participants" || msg.Event.Type != "flags" { + continue + } + + if msg.Event.Flags.RoomId != roomId { + t.Errorf("Expected flags in room %s, got %s", roomId, msg.Event.Flags.RoomId) + } else if msg.Event.Flags.SessionId != sessionId { + t.Errorf("Expected flags for session %s, got %s", sessionId, msg.Event.Flags.SessionId) + } else if msg.Event.Flags.Flags != newFlags { + t.Errorf("Expected flags %d, got %d", newFlags, msg.Event.Flags.Flags) + } else { + gotFlags = true + break + } + } + } + if !gotFlags { + t.Errorf("Didn't receive initial flags in %+v", receivedMessages) + } + + // Ignore "join" messages from second client + if err := client.DrainMessages(ctx); err != nil { + t.Error(err) + } + // When sending to a virtual session, the message is sent to the actual // client and contains a "Recipient" block with the internal session id. recipient := MessageClientMessageRecipient{