diff --git a/hub.go b/hub.go index d672886..2f34d82 100644 --- a/hub.go +++ b/hub.go @@ -1306,6 +1306,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { } else { subject = "session." + msg.Recipient.SessionId recipientSessionId = msg.Recipient.SessionId + serverRecipient = &msg.Recipient } case RecipientTypeUser: if msg.Recipient.UserId != "" { @@ -1501,8 +1502,12 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) { SessionId: virtualSession.SessionId(), } } + } else { + serverRecipient = &msg.Recipient } h.mu.RUnlock() + } else { + serverRecipient = &msg.Recipient } case RecipientTypeUser: if msg.Recipient.UserId != "" { @@ -1600,6 +1605,14 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { virtualSessionId := GetVirtualSessionId(session, msg.SessionId) + sess, err := NewVirtualSession(session, privateSessionId, publicSessionId, sessionIdData, msg) + if err != nil { + log.Printf("Could not create virtual session %s: %s", virtualSessionId, err) + reply := message.NewErrorServerMessage(NewError("add_failed", "Could not create virtual session.")) + session.SendMessage(reply) + return + } + if msg.Options != nil { request := NewBackendClientRoomRequest(room.Id(), msg.UserId, publicSessionId) request.Room.ActorId = msg.Options.ActorId @@ -1608,6 +1621,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { var response BackendClientResponse if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { + sess.Close() log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session.")) session.SendMessage(reply) @@ -1615,6 +1629,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { } if response.Type == "error" { + sess.Close() log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error) reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error())) session.SendMessage(reply) @@ -1624,6 +1639,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg) var response BackendClientSessionResponse if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { + sess.Close() log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session.")) session.SendMessage(reply) @@ -1631,7 +1647,6 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { } } - sess := NewVirtualSession(session, privateSessionId, publicSessionId, sessionIdData, msg) h.mu.Lock() h.sessions[sessionIdData.Sid] = sess h.virtualSessions[virtualSessionId] = sessionIdData.Sid diff --git a/virtualsession.go b/virtualsession.go index 0fb7d39..45ec832 100644 --- a/virtualsession.go +++ b/virtualsession.go @@ -56,8 +56,8 @@ func GetVirtualSessionId(session *ClientSession, sessionId string) string { return session.PublicId() + "|" + sessionId } -func NewVirtualSession(session *ClientSession, privateId string, publicId string, data *SessionIdData, msg *AddSessionInternalClientMessage) *VirtualSession { - return &VirtualSession{ +func NewVirtualSession(session *ClientSession, privateId string, publicId string, data *SessionIdData, msg *AddSessionInternalClientMessage) (*VirtualSession, error) { + result := &VirtualSession{ hub: session.hub, session: session, privateId: privateId, @@ -70,6 +70,12 @@ func NewVirtualSession(session *ClientSession, privateId string, publicId string flags: msg.Flags, options: msg.Options, } + + if err := session.events.RegisterSessionListener(publicId, session.Backend(), result); err != nil { + return nil, err + } + + return result, nil } func (s *VirtualSession) PrivateId() string { @@ -142,6 +148,7 @@ func (s *VirtualSession) CloseWithFeedback(session *ClientSession, message *Clie if removed && room != nil { go s.notifyBackendRemoved(room, session, message) } + s.session.events.UnregisterSessionListener(s.PublicId(), s.session.Backend(), s) } func (s *VirtualSession) notifyBackendRemoved(room *Room, session *ClientSession, message *ClientMessage) { @@ -255,3 +262,36 @@ func (s *VirtualSession) Flags() uint32 { func (s *VirtualSession) Options() *AddSessionOptions { return s.options } + +func (s *VirtualSession) ProcessAsyncSessionMessage(message *AsyncMessage) { + if message.Type == "message" && message.Message != nil { + switch message.Message.Type { + case "message": + if message.Message.Message != nil && + message.Message.Message.Recipient != nil && + message.Message.Message.Recipient.Type == "session" && + message.Message.Message.Recipient.SessionId == s.PublicId() { + // The client should see his session id as recipient. + message.Message.Message.Recipient = &MessageClientMessageRecipient{ + Type: "session", + SessionId: s.SessionId(), + UserId: s.UserId(), + } + s.session.ProcessAsyncSessionMessage(message) + } + case "control": + if message.Message.Control != nil && + message.Message.Control.Recipient != nil && + message.Message.Control.Recipient.Type == "session" && + message.Message.Control.Recipient.SessionId == s.PublicId() { + // The client should see his session id as recipient. + message.Message.Control.Recipient = &MessageClientMessageRecipient{ + Type: "session", + SessionId: s.SessionId(), + UserId: s.UserId(), + } + s.session.ProcessAsyncSessionMessage(message) + } + } + } +}