Ignore room NATS messages that were sent before the room was joined.

Can happen mostly during tests (e.g. TestClientTakeoverRoomSession)
where the new client could receive the "leave" event that was sent
before it joined the room (but got delivered after joining).
pull/115/head
Joachim Bauch 1 year ago
parent 2628735431
commit 7618bc683c
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
  1. 18
      clientsession.go
  2. 12
      natsclient.go
  3. 11
      natsclient_loopback.go

@ -46,6 +46,8 @@ var (
)
type ClientSession struct {
roomJoinTime int64
running int32
hub *Hub
privateId string
@ -289,12 +291,21 @@ func (s *ClientSession) IsExpired(now time.Time) bool {
func (s *ClientSession) SetRoom(room *Room) {
atomic.StorePointer(&s.room, unsafe.Pointer(room))
if room != nil {
atomic.StoreInt64(&s.roomJoinTime, time.Now().UnixNano())
} else {
atomic.StoreInt64(&s.roomJoinTime, 0)
}
}
func (s *ClientSession) GetRoom() *Room {
return (*Room)(atomic.LoadPointer(&s.room))
}
func (s *ClientSession) getRoomJoinTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&s.roomJoinTime))
}
func (s *ClientSession) releaseMcuObjects() {
if len(s.publishers) > 0 {
go func(publishers map[string]McuPublisher) {
@ -815,6 +826,13 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage {
// TODO(jojo): Only send all users if current session id has
// changed its "inCall" flag to true.
m.Changed = nil
} else if msg.Message.Event.Target == "room" {
// Can happen mostly during tests where an older room NATS message
// could be received by a subscriber that joined after it was sent.
if msg.SendTime.Before(s.getRoomJoinTime()) {
log.Printf("Message %+v was sent before room was joined, ignoring", msg.Message)
return nil
}
}
}

@ -38,6 +38,8 @@ const (
)
type NatsMessage struct {
SendTime time.Time `json:"sendtime"`
Type string `json:"type"`
Message *ServerMessage `json:"message,omitempty"`
@ -150,16 +152,18 @@ func (c *natsClient) PublishNats(subject string, message *NatsMessage) error {
func (c *natsClient) PublishMessage(subject string, message *ServerMessage) error {
msg := &NatsMessage{
Type: "message",
Message: message,
SendTime: time.Now(),
Type: "message",
Message: message,
}
return c.PublishNats(subject, msg)
}
func (c *natsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error {
msg := &NatsMessage{
Type: "room",
Room: message,
SendTime: time.Now(),
Type: "room",
Room: message,
}
return c.PublishNats(subject, msg)
}

@ -27,6 +27,7 @@ import (
"log"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go"
)
@ -175,16 +176,18 @@ func (c *LoopbackNatsClient) PublishNats(subject string, message *NatsMessage) e
func (c *LoopbackNatsClient) PublishMessage(subject string, message *ServerMessage) error {
msg := &NatsMessage{
Type: "message",
Message: message,
SendTime: time.Now(),
Type: "message",
Message: message,
}
return c.PublishNats(subject, msg)
}
func (c *LoopbackNatsClient) PublishBackendServerRoomRequest(subject string, message *BackendServerRoomRequest) error {
msg := &NatsMessage{
Type: "room",
Room: message,
SendTime: time.Now(),
Type: "room",
Room: message,
}
return c.PublishNats(subject, msg)
}

Loading…
Cancel
Save