Remove remaining virtual sessions if client session is closed.

Previously any virtual sessions that have not been explicitly closed were
not removed from the backend if the client session was closed / expired.
This commit is contained in:
Joachim Bauch 2021-04-21 14:05:45 +02:00
parent d772e10fec
commit 0dd2e8a086
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 69 additions and 46 deletions

57
hub.go
View file

@ -591,7 +591,7 @@ func (h *Hub) performHousekeeping(now time.Time) {
h.mu.Unlock()
}
func (h *Hub) removeSession(session Session) {
func (h *Hub) removeSession(session Session) (removed bool) {
session.LeaveRoom(true)
h.invalidateSessionId(session.PrivateId(), privateSessionName)
h.invalidateSessionId(session.PublicId(), publicSessionName)
@ -599,10 +599,14 @@ func (h *Hub) removeSession(session Session) {
h.mu.Lock()
if data := session.Data(); data != nil && data.Sid > 0 {
delete(h.clients, data.Sid)
delete(h.sessions, data.Sid)
if _, found := h.sessions[data.Sid]; found {
delete(h.sessions, data.Sid)
removed = true
}
}
delete(h.expiredSessions, session)
h.mu.Unlock()
return
}
func (h *Hub) startWaitAnonymousClientRoom(client *Client) {
@ -1576,49 +1580,12 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
h.mu.Unlock()
if sess != nil {
log.Printf("Session %s removed virtual session %s", session.PublicId(), sess.PublicId())
sess.Close()
go func() {
ctx, cancel := context.WithTimeout(context.Background(), h.backendTimeout)
defer cancel()
var options *AddSessionOptions
if vsess, ok := sess.(*VirtualSession); ok {
options = vsess.Options()
}
if options != nil {
request := NewBackendClientRoomRequest(room.Id(), sess.UserId(), sess.PublicId())
request.Room.Action = "leave"
if options != nil {
request.Room.ActorId = options.ActorId
request.Room.ActorType = options.ActorType
}
var response BackendClientResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session."))
client.SendMessage(reply)
return
}
if response.Type == "error" {
log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error)
reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error()))
client.SendMessage(reply)
return
}
} else {
request := NewBackendClientSessionRequest(room.Id(), "remove", sess.PublicId(), nil)
var response BackendClientSessionResponse
err := h.backend.PerformJSONRequest(ctx, sess.ParsedBackendUrl(), request, &response)
if err != nil {
log.Printf("Could not remove virtual session %s from backend %s: %s", sess.PublicId(), sess.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
client.SendMessage(reply)
}
}
}()
if vsess, ok := sess.(*VirtualSession); ok {
// We should always have a VirtualSession here.
vsess.CloseWithFeedback(client, message)
} else {
sess.Close()
}
}
default:
log.Printf("Ignore unsupported internal message %+v from %s", msg, session.PublicId())

View file

@ -22,6 +22,7 @@
package signaling
import (
"context"
"encoding/json"
"log"
"net/url"
@ -57,6 +58,7 @@ func GetVirtualSessionId(session *ClientSession, sessionId string) string {
func NewVirtualSession(session *ClientSession, privateId string, publicId string, data *SessionIdData, msg *AddSessionInternalClientMessage) *VirtualSession {
return &VirtualSession{
hub: session.hub,
session: session,
privateId: privateId,
publicId: publicId,
@ -130,8 +132,62 @@ func (s *VirtualSession) IsExpired(now time.Time) bool {
}
func (s *VirtualSession) Close() {
s.CloseWithFeedback(nil, nil)
}
func (s *VirtualSession) CloseWithFeedback(client *Client, message *ClientMessage) {
room := s.GetRoom()
s.session.RemoveVirtualSession(s)
s.session.hub.removeSession(s)
removed := s.session.hub.removeSession(s)
if removed && room != nil {
go s.notifyBackendRemoved(room, client, message)
}
}
func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, message *ClientMessage) {
ctx, cancel := context.WithTimeout(context.Background(), s.hub.backendTimeout)
defer cancel()
if options := s.Options(); options != nil {
request := NewBackendClientRoomRequest(room.Id(), s.UserId(), s.PublicId())
request.Room.Action = "leave"
if options != nil {
request.Room.ActorId = options.ActorId
request.Room.ActorType = options.ActorType
}
var response BackendClientResponse
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil {
virtualSessionId := GetVirtualSessionId(s.session, s.PublicId())
log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, s.BackendUrl(), err)
if client != nil && message != nil {
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
client.SendMessage(reply)
}
return
}
if response.Type == "error" {
virtualSessionId := GetVirtualSessionId(s.session, s.PublicId())
log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, s.BackendUrl(), response.Error)
if client != nil && message != nil {
reply := message.NewErrorServerMessage(NewError("remove_failed", response.Error.Error()))
client.SendMessage(reply)
}
return
}
} else {
request := NewBackendClientSessionRequest(room.Id(), "remove", s.PublicId(), nil)
var response BackendClientSessionResponse
err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response)
if err != nil {
log.Printf("Could not remove virtual session %s from backend %s: %s", s.PublicId(), s.BackendUrl(), err)
if client != nil && message != nil {
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
client.SendMessage(reply)
}
}
}
}
func (s *VirtualSession) HasPermission(permission Permission) bool {