Track anonymous sessions not in rooms instead of clients.

Otherwise it would be possible for clients to reconnect to reset their
timer to join a room.
This commit is contained in:
Joachim Bauch 2022-07-08 15:52:32 +02:00
parent ab26dfe90d
commit 608415c3ff
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

67
hub.go
View file

@ -141,8 +141,8 @@ type Hub struct {
allowSubscribeAnyStream bool allowSubscribeAnyStream bool
expiredSessions map[Session]bool expiredSessions map[Session]bool
anonymousSessions map[*ClientSession]time.Time
expectHelloClients map[*Client]time.Time expectHelloClients map[*Client]time.Time
anonymousClients map[*Client]time.Time
backendTimeout time.Duration backendTimeout time.Duration
backend *BackendClient backend *BackendClient
@ -329,7 +329,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
allowSubscribeAnyStream: allowSubscribeAnyStream, allowSubscribeAnyStream: allowSubscribeAnyStream,
expiredSessions: make(map[Session]bool), expiredSessions: make(map[Session]bool),
anonymousClients: make(map[*Client]time.Time), anonymousSessions: make(map[*ClientSession]time.Time),
expectHelloClients: make(map[*Client]time.Time), expectHelloClients: make(map[*Client]time.Time),
backendTimeout: backendTimeout, backendTimeout: backendTimeout,
@ -588,35 +588,35 @@ func (h *Hub) checkExpiredSessions(now time.Time) {
} }
} }
func (h *Hub) checkExpireClients(now time.Time, clients map[*Client]time.Time, reason string) { func (h *Hub) checkAnonymousSessions(now time.Time) {
for client, timeout := range clients { for session, timeout := range h.anonymousSessions {
if now.After(timeout) { if now.After(timeout) {
// This will close the client connection. // This will close the client connection.
h.mu.Unlock() h.mu.Unlock()
client.SendByeResponseWithReason(nil, reason) if client := session.GetClient(); client != nil {
if reason == "room_join_timeout" { client.SendByeResponseWithReason(nil, "room_join_timeout")
session := client.GetSession() }
if session != nil {
session.Close() session.Close()
}
}
h.mu.Lock() h.mu.Lock()
} }
} }
} }
func (h *Hub) checkAnonymousClients(now time.Time) {
h.checkExpireClients(now, h.anonymousClients, "room_join_timeout")
}
func (h *Hub) checkInitialHello(now time.Time) { func (h *Hub) checkInitialHello(now time.Time) {
h.checkExpireClients(now, h.expectHelloClients, "hello_timeout") for client, timeout := range h.expectHelloClients {
if now.After(timeout) {
// This will close the client connection.
h.mu.Unlock()
client.SendByeResponseWithReason(nil, "hello_timeout")
h.mu.Lock()
}
}
} }
func (h *Hub) performHousekeeping(now time.Time) { func (h *Hub) performHousekeeping(now time.Time) {
h.mu.Lock() h.mu.Lock()
h.checkExpiredSessions(now) h.checkExpiredSessions(now)
h.checkAnonymousClients(now) h.checkAnonymousSessions(now)
h.checkInitialHello(now) h.checkInitialHello(now)
h.mu.Unlock() h.mu.Unlock()
} }
@ -636,30 +636,30 @@ func (h *Hub) removeSession(session Session) (removed bool) {
} }
} }
delete(h.expiredSessions, session) delete(h.expiredSessions, session)
if session, ok := session.(*ClientSession); ok {
delete(h.anonymousSessions, session)
}
h.mu.Unlock() h.mu.Unlock()
return return
} }
func (h *Hub) startWaitAnonymousClientRoom(client *Client) { func (h *Hub) startWaitAnonymousSessionRoom(session *ClientSession) {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock() defer h.mu.Unlock()
h.startWaitAnonymousClientRoomLocked(client) h.startWaitAnonymousSessionRoomLocked(session)
} }
func (h *Hub) startWaitAnonymousClientRoomLocked(client *Client) { func (h *Hub) startWaitAnonymousSessionRoomLocked(session *ClientSession) {
if !client.IsConnected() { if session.ClientType() == HelloClientTypeInternal {
return
}
if session := client.GetSession(); session != nil && session.ClientType() == HelloClientTypeInternal {
// Internal clients don't need to join a room. // Internal clients don't need to join a room.
return return
} }
// Anonymous clients must join a public room within a given time, // Anonymous sessions must join a public room within a given time,
// otherwise they get disconnected to avoid blocking resources forever. // otherwise they get disconnected to avoid blocking resources forever.
now := time.Now() now := time.Now()
h.anonymousClients[client] = now.Add(anonmyousJoinRoomTimeout) h.anonymousSessions[session] = now.Add(anonmyousJoinRoomTimeout)
} }
func (h *Hub) startExpectHello(client *Client) { func (h *Hub) startExpectHello(client *Client) {
@ -768,7 +768,7 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B
h.clients[sessionIdData.Sid] = client h.clients[sessionIdData.Sid] = client
delete(h.expectHelloClients, client) delete(h.expectHelloClients, client)
if userId == "" && auth.Type != HelloClientTypeInternal { if userId == "" && auth.Type != HelloClientTypeInternal {
h.startWaitAnonymousClientRoomLocked(client) h.startWaitAnonymousSessionRoomLocked(session)
} }
h.mu.Unlock() h.mu.Unlock()
@ -787,7 +787,6 @@ func (h *Hub) processUnregister(client *Client) *ClientSession {
session := client.GetSession() session := client.GetSession()
h.mu.Lock() h.mu.Lock()
delete(h.anonymousClients, client)
delete(h.expectHelloClients, client) delete(h.expectHelloClients, client)
if session != nil { if session != nil {
delete(h.clients, session.Data().Sid) delete(h.clients, session.Data().Sid)
@ -1071,9 +1070,9 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
if session.LeaveRoom(true) != nil { if session.LeaveRoom(true) != nil {
// User was in a room before, so need to notify about leaving it. // User was in a room before, so need to notify about leaving it.
h.sendRoom(session, message, nil) h.sendRoom(session, message, nil)
}
if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal { if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal {
h.startWaitAnonymousClientRoom(client) h.startWaitAnonymousSessionRoom(session)
}
} }
return return
} }
@ -1173,7 +1172,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
internalRoomId := getRoomIdForBackend(roomId, session.Backend()) internalRoomId := getRoomIdForBackend(roomId, session.Backend())
if err := session.SubscribeRoomEvents(roomId, message.Room.SessionId); err != nil { if err := session.SubscribeRoomEvents(roomId, message.Room.SessionId); err != nil {
session.SendMessage(message.NewWrappedErrorServerMessage(err)) session.SendMessage(message.NewWrappedErrorServerMessage(err))
// The client (implicitly) left the room due to an error. // The session (implicitly) left the room due to an error.
h.sendRoom(session, nil, nil) h.sendRoom(session, nil, nil)
return return
} }
@ -1185,7 +1184,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
if r, err = h.createRoom(roomId, room.Room.Properties, session.Backend()); err != nil { if r, err = h.createRoom(roomId, room.Room.Properties, session.Backend()); err != nil {
h.ru.Unlock() h.ru.Unlock()
session.SendMessage(message.NewWrappedErrorServerMessage(err)) session.SendMessage(message.NewWrappedErrorServerMessage(err))
// The client (implicitly) left the room due to an error. // The session (implicitly) left the room due to an error.
session.UnsubscribeRoomEvents() session.UnsubscribeRoomEvents()
h.sendRoom(session, nil, nil) h.sendRoom(session, nil, nil)
return return
@ -1194,10 +1193,8 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
h.ru.Unlock() h.ru.Unlock()
h.mu.Lock() h.mu.Lock()
if client := session.GetClient(); client != nil { // The session now joined a room, don't expire if it is anonymous.
// The client now joined a room, don't expire him if he is anonymous. delete(h.anonymousSessions, session)
delete(h.anonymousClients, client)
}
h.mu.Unlock() h.mu.Unlock()
session.SetRoom(r) session.SetRoom(r)
if room.Room.Permissions != nil { if room.Room.Permissions != nil {