From 7618bc683c514c63d62b5ed969ed1ffaf417bd8a Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 8 Jun 2021 09:20:18 +0200 Subject: [PATCH] Ignore room NATS messages that were sent before the room was joined. Can happen mostly during tests (e.g. TestClientTakeoverRoomSession) where the new client could receive the "leave" event that was sent before it joined the room (but got delivered after joining). --- clientsession.go | 18 ++++++++++++++++++ natsclient.go | 12 ++++++++---- natsclient_loopback.go | 11 +++++++---- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/clientsession.go b/clientsession.go index 8d0da87..11de3fc 100644 --- a/clientsession.go +++ b/clientsession.go @@ -46,6 +46,8 @@ var ( ) type ClientSession struct { + roomJoinTime int64 + running int32 hub *Hub privateId string @@ -289,12 +291,21 @@ func (s *ClientSession) IsExpired(now time.Time) bool { func (s *ClientSession) SetRoom(room *Room) { atomic.StorePointer(&s.room, unsafe.Pointer(room)) + if room != nil { + atomic.StoreInt64(&s.roomJoinTime, time.Now().UnixNano()) + } else { + atomic.StoreInt64(&s.roomJoinTime, 0) + } } func (s *ClientSession) GetRoom() *Room { return (*Room)(atomic.LoadPointer(&s.room)) } +func (s *ClientSession) getRoomJoinTime() time.Time { + return time.Unix(0, atomic.LoadInt64(&s.roomJoinTime)) +} + func (s *ClientSession) releaseMcuObjects() { if len(s.publishers) > 0 { go func(publishers map[string]McuPublisher) { @@ -815,6 +826,13 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { // TODO(jojo): Only send all users if current session id has // changed its "inCall" flag to true. m.Changed = nil + } else if msg.Message.Event.Target == "room" { + // Can happen mostly during tests where an older room NATS message + // could be received by a subscriber that joined after it was sent. + if msg.SendTime.Before(s.getRoomJoinTime()) { + log.Printf("Message %+v was sent before room was joined, ignoring", msg.Message) + return nil + } } } diff --git a/natsclient.go b/natsclient.go index 6925b57..a9e07ae 100644 --- a/natsclient.go +++ b/natsclient.go @@ -38,6 +38,8 @@ const ( ) type NatsMessage struct { + SendTime time.Time `json:"sendtime"` + Type string `json:"type"` Message *ServerMessage `json:"message,omitempty"` @@ -150,16 +152,18 @@ func (c *natsClient) PublishNats(subject string, message *NatsMessage) error { func (c *natsClient) PublishMessage(subject string, message *ServerMessage) error { msg := &NatsMessage{ - Type: "message", - Message: message, + SendTime: time.Now(), + Type: "message", + Message: message, } return c.PublishNats(subject, msg) } func (c *natsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error { msg := &NatsMessage{ - Type: "room", - Room: message, + SendTime: time.Now(), + Type: "room", + Room: message, } return c.PublishNats(subject, msg) } diff --git a/natsclient_loopback.go b/natsclient_loopback.go index 7a7d819..aaa1699 100644 --- a/natsclient_loopback.go +++ b/natsclient_loopback.go @@ -27,6 +27,7 @@ import ( "log" "strings" "sync" + "time" "github.com/nats-io/nats.go" ) @@ -175,16 +176,18 @@ func (c *LoopbackNatsClient) PublishNats(subject string, message *NatsMessage) e func (c *LoopbackNatsClient) PublishMessage(subject string, message *ServerMessage) error { msg := &NatsMessage{ - Type: "message", - Message: message, + SendTime: time.Now(), + Type: "message", + Message: message, } return c.PublishNats(subject, msg) } func (c *LoopbackNatsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error { msg := &NatsMessage{ - Type: "room", - Room: message, + SendTime: time.Now(), + Type: "room", + Room: message, } return c.PublishNats(subject, msg) }