From 95d5e50705f47058e8d4b83ba4b21119b266e675 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 11 Jul 2022 10:39:06 +0200 Subject: [PATCH] Disconnect sessions with the same room session id synchronously. Previously there was a difference in the events being received by other sessions for the clustered case compared to the single-server case. Now the events / ordering are the same for both cases. --- backend_server.go | 2 +- grpc_client.go | 5 +++-- grpc_server.go | 13 +++++++++++ grpc_sessions.proto | 2 ++ hub.go | 5 +++-- hub_test.go | 50 +++++++++++++---------------------------- roomsessions.go | 2 +- roomsessions_builtin.go | 4 ++-- 8 files changed, 41 insertions(+), 42 deletions(-) diff --git a/backend_server.go b/backend_server.go index 9998c33..d5072e3 100644 --- a/backend_server.go +++ b/backend_server.go @@ -392,7 +392,7 @@ func (b *BackendServer) lookupByRoomSessionId(ctx context.Context, roomSessionId } } - sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId) + sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId, "") if err == ErrNoSuchRoomSession { return "", nil } else if err != nil { diff --git a/grpc_client.go b/grpc_client.go index f68222e..f31b7db 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -186,12 +186,13 @@ func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) { return response.GetServerId(), nil } -func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) { +func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error) { statsGrpcClientCalls.WithLabelValues("LookupSessionId").Inc() // TODO: Remove debug logging log.Printf("Lookup room session %s on %s", roomSessionId, c.Target()) response, err := c.impl.LookupSessionId(ctx, &LookupSessionIdRequest{ - RoomSessionId: roomSessionId, + RoomSessionId: roomSessionId, + DisconnectReason: disconnectReason, }, grpc.WaitForReady(true)) if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { return "", ErrNoSuchRoomSession diff --git a/grpc_server.go b/grpc_server.go index ac654f9..2631b19 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -115,6 +115,19 @@ func (s *GrpcServer) LookupSessionId(ctx context.Context, request *LookupSession return nil, err } + if sid != "" && request.DisconnectReason != "" { + if session := s.hub.GetSessionByPublicId(sid); session != nil { + log.Printf("Closing session %s because same room session %s connected", session.PublicId(), request.RoomSessionId) + session.LeaveRoom(false) + switch sess := session.(type) { + case *ClientSession: + if client := sess.GetClient(); client != nil { + client.SendByeResponseWithReason(nil, "room_session_reconnected") + } + } + session.Close() + } + } return &LookupSessionIdReply{ SessionId: sid, }, nil diff --git a/grpc_sessions.proto b/grpc_sessions.proto index 497a74f..9eef15a 100644 --- a/grpc_sessions.proto +++ b/grpc_sessions.proto @@ -32,6 +32,8 @@ service RpcSessions { message LookupSessionIdRequest { string roomSessionId = 1; + // Optional: set if the session should be disconnected with a given reason. + string disconnectReason = 2; } message LookupSessionIdReply { diff --git a/hub.go b/hub.go index d82a82d..d672886 100644 --- a/hub.go +++ b/hub.go @@ -1001,7 +1001,7 @@ func (h *Hub) processHelloInternal(client *Client, message *ClientMessage) { } func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId string, backend *Backend) { - sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId) + sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId, "room_session_reconnected") if err == ErrNoSuchRoomSession { return } else if err != nil { @@ -1011,7 +1011,8 @@ func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId strin session := h.GetSessionByPublicId(sessionId) if session == nil { - // Session is located on a different server. + // Session is located on a different server. Should already have been closed + // but send "bye" again as additional safeguard. msg := &AsyncMessage{ Type: "message", Message: &ServerMessage{ diff --git a/hub_test.go b/hub_test.go index f9f41f0..4d49759 100644 --- a/hub_test.go +++ b/hub_test.go @@ -3016,42 +3016,24 @@ func RunTestClientTakeoverRoomSession(t *testing.T) { t.Error(err) } - if isLocalTest(t) { - // No message about the closing is sent to the new connection. - ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel2() + // No message about the closing is sent to the new connection. + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() - if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { - t.Error(err) - } else if message != nil { - t.Errorf("Expected no message, got %+v", message) - } + if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { + t.Error(err) + } else if message != nil { + t.Errorf("Expected no message, got %+v", message) + } - // The permanently connected client will receive a "left" event from the - // overridden session and a "joined" for the new session. In that order as - // both were on the same server. - if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil { - t.Error(err) - } - if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil { - t.Error(err) - } - } else { - // In the clustered case, the new connection will receive a "leave" event - // due to the asynchronous events. - if err := client2.RunUntilLeft(ctx, hello1.Hello); err != nil { - t.Error(err) - } - - // The permanently connected client will first a "joined" event from the new - // session (on the same server) and a "left" from the session on the remote - // server (asynchronously). - if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil { - t.Error(err) - } - if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil { - t.Error(err) - } + // The permanently connected client will receive a "left" event from the + // overridden session and a "joined" for the new session. In that order as + // both were on the same server. + if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil { + t.Error(err) + } + if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil { + t.Error(err) } } diff --git a/roomsessions.go b/roomsessions.go index cb2c1f1..b984463 100644 --- a/roomsessions.go +++ b/roomsessions.go @@ -35,5 +35,5 @@ type RoomSessions interface { DeleteRoomSession(session Session) GetSessionId(roomSessionId string) (string, error) - LookupSessionId(ctx context.Context, roomSessionId string) (string, error) + LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error) } diff --git a/roomsessions_builtin.go b/roomsessions_builtin.go index 3c85984..f82a6b4 100644 --- a/roomsessions_builtin.go +++ b/roomsessions_builtin.go @@ -87,7 +87,7 @@ func (r *BuiltinRoomSessions) GetSessionId(roomSessionId string) (string, error) return sid, nil } -func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) { +func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error) { sid, err := r.GetSessionId(roomSessionId) if err == nil { return sid, nil @@ -112,7 +112,7 @@ func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId go func(client *GrpcClient) { defer wg.Done() - sid, err := client.LookupSessionId(lookupctx, roomSessionId) + sid, err := client.LookupSessionId(lookupctx, roomSessionId, disconnectReason) if errors.Is(err, context.Canceled) { return } else if err != nil {