Fix sending messages to virtual sessions on different servers.

This commit is contained in:
Joachim Bauch 2022-07-12 11:54:41 +02:00
parent 6173a350a1
commit e101e74672
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 58 additions and 3 deletions

17
hub.go
View file

@ -1306,6 +1306,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
} else { } else {
subject = "session." + msg.Recipient.SessionId subject = "session." + msg.Recipient.SessionId
recipientSessionId = msg.Recipient.SessionId recipientSessionId = msg.Recipient.SessionId
serverRecipient = &msg.Recipient
} }
case RecipientTypeUser: case RecipientTypeUser:
if msg.Recipient.UserId != "" { if msg.Recipient.UserId != "" {
@ -1501,8 +1502,12 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) {
SessionId: virtualSession.SessionId(), SessionId: virtualSession.SessionId(),
} }
} }
} else {
serverRecipient = &msg.Recipient
} }
h.mu.RUnlock() h.mu.RUnlock()
} else {
serverRecipient = &msg.Recipient
} }
case RecipientTypeUser: case RecipientTypeUser:
if msg.Recipient.UserId != "" { if msg.Recipient.UserId != "" {
@ -1600,6 +1605,14 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
virtualSessionId := GetVirtualSessionId(session, msg.SessionId) virtualSessionId := GetVirtualSessionId(session, msg.SessionId)
sess, err := NewVirtualSession(session, privateSessionId, publicSessionId, sessionIdData, msg)
if err != nil {
log.Printf("Could not create virtual session %s: %s", virtualSessionId, err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not create virtual session."))
session.SendMessage(reply)
return
}
if msg.Options != nil { if msg.Options != nil {
request := NewBackendClientRoomRequest(room.Id(), msg.UserId, publicSessionId) request := NewBackendClientRoomRequest(room.Id(), msg.UserId, publicSessionId)
request.Room.ActorId = msg.Options.ActorId request.Room.ActorId = msg.Options.ActorId
@ -1608,6 +1621,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
var response BackendClientResponse var response BackendClientResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
sess.Close()
log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session.")) reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session."))
session.SendMessage(reply) session.SendMessage(reply)
@ -1615,6 +1629,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
} }
if response.Type == "error" { if response.Type == "error" {
sess.Close()
log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error) log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error)
reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error())) reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error()))
session.SendMessage(reply) session.SendMessage(reply)
@ -1624,6 +1639,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg) request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg)
var response BackendClientSessionResponse var response BackendClientSessionResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
sess.Close()
log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session.")) reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session."))
session.SendMessage(reply) session.SendMessage(reply)
@ -1631,7 +1647,6 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
} }
} }
sess := NewVirtualSession(session, privateSessionId, publicSessionId, sessionIdData, msg)
h.mu.Lock() h.mu.Lock()
h.sessions[sessionIdData.Sid] = sess h.sessions[sessionIdData.Sid] = sess
h.virtualSessions[virtualSessionId] = sessionIdData.Sid h.virtualSessions[virtualSessionId] = sessionIdData.Sid

View file

@ -56,8 +56,8 @@ func GetVirtualSessionId(session *ClientSession, sessionId string) string {
return session.PublicId() + "|" + sessionId return session.PublicId() + "|" + sessionId
} }
func NewVirtualSession(session *ClientSession, privateId string, publicId string, data *SessionIdData, msg *AddSessionInternalClientMessage) *VirtualSession { func NewVirtualSession(session *ClientSession, privateId string, publicId string, data *SessionIdData, msg *AddSessionInternalClientMessage) (*VirtualSession, error) {
return &VirtualSession{ result := &VirtualSession{
hub: session.hub, hub: session.hub,
session: session, session: session,
privateId: privateId, privateId: privateId,
@ -70,6 +70,12 @@ func NewVirtualSession(session *ClientSession, privateId string, publicId string
flags: msg.Flags, flags: msg.Flags,
options: msg.Options, options: msg.Options,
} }
if err := session.events.RegisterSessionListener(publicId, session.Backend(), result); err != nil {
return nil, err
}
return result, nil
} }
func (s *VirtualSession) PrivateId() string { func (s *VirtualSession) PrivateId() string {
@ -142,6 +148,7 @@ func (s *VirtualSession) CloseWithFeedback(session *ClientSession, message *Clie
if removed && room != nil { if removed && room != nil {
go s.notifyBackendRemoved(room, session, message) go s.notifyBackendRemoved(room, session, message)
} }
s.session.events.UnregisterSessionListener(s.PublicId(), s.session.Backend(), s)
} }
func (s *VirtualSession) notifyBackendRemoved(room *Room, session *ClientSession, message *ClientMessage) { func (s *VirtualSession) notifyBackendRemoved(room *Room, session *ClientSession, message *ClientMessage) {
@ -255,3 +262,36 @@ func (s *VirtualSession) Flags() uint32 {
func (s *VirtualSession) Options() *AddSessionOptions { func (s *VirtualSession) Options() *AddSessionOptions {
return s.options return s.options
} }
func (s *VirtualSession) ProcessAsyncSessionMessage(message *AsyncMessage) {
if message.Type == "message" && message.Message != nil {
switch message.Message.Type {
case "message":
if message.Message.Message != nil &&
message.Message.Message.Recipient != nil &&
message.Message.Message.Recipient.Type == "session" &&
message.Message.Message.Recipient.SessionId == s.PublicId() {
// The client should see his session id as recipient.
message.Message.Message.Recipient = &MessageClientMessageRecipient{
Type: "session",
SessionId: s.SessionId(),
UserId: s.UserId(),
}
s.session.ProcessAsyncSessionMessage(message)
}
case "control":
if message.Message.Control != nil &&
message.Message.Control.Recipient != nil &&
message.Message.Control.Recipient.Type == "session" &&
message.Message.Control.Recipient.SessionId == s.PublicId() {
// The client should see his session id as recipient.
message.Message.Control.Recipient = &MessageClientMessageRecipient{
Type: "session",
SessionId: s.SessionId(),
UserId: s.UserId(),
}
s.session.ProcessAsyncSessionMessage(message)
}
}
}
}