diff --git a/clientsession.go b/clientsession.go index 8d0da87..11de3fc 100644 --- a/clientsession.go +++ b/clientsession.go @@ -46,6 +46,8 @@ var ( ) type ClientSession struct { + roomJoinTime int64 + running int32 hub *Hub privateId string @@ -289,12 +291,21 @@ func (s *ClientSession) IsExpired(now time.Time) bool { func (s *ClientSession) SetRoom(room *Room) { atomic.StorePointer(&s.room, unsafe.Pointer(room)) + if room != nil { + atomic.StoreInt64(&s.roomJoinTime, time.Now().UnixNano()) + } else { + atomic.StoreInt64(&s.roomJoinTime, 0) + } } func (s *ClientSession) GetRoom() *Room { return (*Room)(atomic.LoadPointer(&s.room)) } +func (s *ClientSession) getRoomJoinTime() time.Time { + return time.Unix(0, atomic.LoadInt64(&s.roomJoinTime)) +} + func (s *ClientSession) releaseMcuObjects() { if len(s.publishers) > 0 { go func(publishers map[string]McuPublisher) { @@ -815,6 +826,13 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { // TODO(jojo): Only send all users if current session id has // changed its "inCall" flag to true. m.Changed = nil + } else if msg.Message.Event.Target == "room" { + // Can happen mostly during tests where an older room NATS message + // could be received by a subscriber that joined after it was sent. + if msg.SendTime.Before(s.getRoomJoinTime()) { + log.Printf("Message %+v was sent before room was joined, ignoring", msg.Message) + return nil + } } } diff --git a/natsclient.go b/natsclient.go index 6925b57..a9e07ae 100644 --- a/natsclient.go +++ b/natsclient.go @@ -38,6 +38,8 @@ const ( ) type NatsMessage struct { + SendTime time.Time `json:"sendtime"` + Type string `json:"type"` Message *ServerMessage `json:"message,omitempty"` @@ -150,16 +152,18 @@ func (c *natsClient) PublishNats(subject string, message *NatsMessage) error { func (c *natsClient) PublishMessage(subject string, message *ServerMessage) error { msg := &NatsMessage{ - Type: "message", - Message: message, + SendTime: time.Now(), + Type: "message", + Message: message, } return c.PublishNats(subject, msg) } func (c *natsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error { msg := &NatsMessage{ - Type: "room", - Room: message, + SendTime: time.Now(), + Type: "room", + Room: message, } return c.PublishNats(subject, msg) } diff --git a/natsclient_loopback.go b/natsclient_loopback.go index 7a7d819..aaa1699 100644 --- a/natsclient_loopback.go +++ b/natsclient_loopback.go @@ -27,6 +27,7 @@ import ( "log" "strings" "sync" + "time" "github.com/nats-io/nats.go" ) @@ -175,16 +176,18 @@ func (c *LoopbackNatsClient) PublishNats(subject string, message *NatsMessage) e func (c *LoopbackNatsClient) PublishMessage(subject string, message *ServerMessage) error { msg := &NatsMessage{ - Type: "message", - Message: message, + SendTime: time.Now(), + Type: "message", + Message: message, } return c.PublishNats(subject, msg) } func (c *LoopbackNatsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error { msg := &NatsMessage{ - Type: "room", - Room: message, + SendTime: time.Now(), + Type: "room", + Room: message, } return c.PublishNats(subject, msg) }