Merge pull request #294 from strukturag/synchronous-roomsession-disconnect

Disconnect sessions with the same room session id synchronously.
This commit is contained in:
Joachim Bauch 2022-07-11 10:53:05 +02:00 committed by GitHub
commit 13d2795b00
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 41 additions and 42 deletions

View file

@ -392,7 +392,7 @@ func (b *BackendServer) lookupByRoomSessionId(ctx context.Context, roomSessionId
} }
} }
sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId) sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId, "")
if err == ErrNoSuchRoomSession { if err == ErrNoSuchRoomSession {
return "", nil return "", nil
} else if err != nil { } else if err != nil {

View file

@ -186,12 +186,13 @@ func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) {
return response.GetServerId(), nil return response.GetServerId(), nil
} }
func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) { func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error) {
statsGrpcClientCalls.WithLabelValues("LookupSessionId").Inc() statsGrpcClientCalls.WithLabelValues("LookupSessionId").Inc()
// TODO: Remove debug logging // TODO: Remove debug logging
log.Printf("Lookup room session %s on %s", roomSessionId, c.Target()) log.Printf("Lookup room session %s on %s", roomSessionId, c.Target())
response, err := c.impl.LookupSessionId(ctx, &LookupSessionIdRequest{ response, err := c.impl.LookupSessionId(ctx, &LookupSessionIdRequest{
RoomSessionId: roomSessionId, RoomSessionId: roomSessionId,
DisconnectReason: disconnectReason,
}, grpc.WaitForReady(true)) }, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return "", ErrNoSuchRoomSession return "", ErrNoSuchRoomSession

View file

@ -115,6 +115,19 @@ func (s *GrpcServer) LookupSessionId(ctx context.Context, request *LookupSession
return nil, err return nil, err
} }
if sid != "" && request.DisconnectReason != "" {
if session := s.hub.GetSessionByPublicId(sid); session != nil {
log.Printf("Closing session %s because same room session %s connected", session.PublicId(), request.RoomSessionId)
session.LeaveRoom(false)
switch sess := session.(type) {
case *ClientSession:
if client := sess.GetClient(); client != nil {
client.SendByeResponseWithReason(nil, "room_session_reconnected")
}
}
session.Close()
}
}
return &LookupSessionIdReply{ return &LookupSessionIdReply{
SessionId: sid, SessionId: sid,
}, nil }, nil

View file

@ -32,6 +32,8 @@ service RpcSessions {
message LookupSessionIdRequest { message LookupSessionIdRequest {
string roomSessionId = 1; string roomSessionId = 1;
// Optional: set if the session should be disconnected with a given reason.
string disconnectReason = 2;
} }
message LookupSessionIdReply { message LookupSessionIdReply {

5
hub.go
View file

@ -1001,7 +1001,7 @@ func (h *Hub) processHelloInternal(client *Client, message *ClientMessage) {
} }
func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId string, backend *Backend) { func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId string, backend *Backend) {
sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId) sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId, "room_session_reconnected")
if err == ErrNoSuchRoomSession { if err == ErrNoSuchRoomSession {
return return
} else if err != nil { } else if err != nil {
@ -1011,7 +1011,8 @@ func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId strin
session := h.GetSessionByPublicId(sessionId) session := h.GetSessionByPublicId(sessionId)
if session == nil { if session == nil {
// Session is located on a different server. // Session is located on a different server. Should already have been closed
// but send "bye" again as additional safeguard.
msg := &AsyncMessage{ msg := &AsyncMessage{
Type: "message", Type: "message",
Message: &ServerMessage{ Message: &ServerMessage{

View file

@ -3016,42 +3016,24 @@ func RunTestClientTakeoverRoomSession(t *testing.T) {
t.Error(err) t.Error(err)
} }
if isLocalTest(t) { // No message about the closing is sent to the new connection.
// No message about the closing is sent to the new connection. ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel2()
defer cancel2()
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err) t.Error(err)
} else if message != nil { } else if message != nil {
t.Errorf("Expected no message, got %+v", message) t.Errorf("Expected no message, got %+v", message)
} }
// The permanently connected client will receive a "left" event from the // The permanently connected client will receive a "left" event from the
// overridden session and a "joined" for the new session. In that order as // overridden session and a "joined" for the new session. In that order as
// both were on the same server. // both were on the same server.
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil { if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err) t.Error(err)
} }
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil { if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err) t.Error(err)
}
} else {
// In the clustered case, the new connection will receive a "leave" event
// due to the asynchronous events.
if err := client2.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
// The permanently connected client will first a "joined" event from the new
// session (on the same server) and a "left" from the session on the remote
// server (asynchronously).
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
} }
} }

View file

@ -35,5 +35,5 @@ type RoomSessions interface {
DeleteRoomSession(session Session) DeleteRoomSession(session Session)
GetSessionId(roomSessionId string) (string, error) GetSessionId(roomSessionId string) (string, error)
LookupSessionId(ctx context.Context, roomSessionId string) (string, error) LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error)
} }

View file

@ -87,7 +87,7 @@ func (r *BuiltinRoomSessions) GetSessionId(roomSessionId string) (string, error)
return sid, nil return sid, nil
} }
func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) { func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error) {
sid, err := r.GetSessionId(roomSessionId) sid, err := r.GetSessionId(roomSessionId)
if err == nil { if err == nil {
return sid, nil return sid, nil
@ -112,7 +112,7 @@ func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId
go func(client *GrpcClient) { go func(client *GrpcClient) {
defer wg.Done() defer wg.Done()
sid, err := client.LookupSessionId(lookupctx, roomSessionId) sid, err := client.LookupSessionId(lookupctx, roomSessionId, disconnectReason)
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
return return
} else if err != nil { } else if err != nil {