Disconnect sessions with the same room session id synchronously.

Previously there was a difference in the events being received by other
sessions for the clustered case compared to the single-server case.
Now the events / ordering are the same for both cases.
This commit is contained in:
Joachim Bauch 2022-07-11 10:39:06 +02:00
parent 3de149c7ae
commit 95d5e50705
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
8 changed files with 41 additions and 42 deletions

View File

@ -392,7 +392,7 @@ func (b *BackendServer) lookupByRoomSessionId(ctx context.Context, roomSessionId
}
}
sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId)
sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId, "")
if err == ErrNoSuchRoomSession {
return "", nil
} else if err != nil {

View File

@ -186,12 +186,13 @@ func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) {
return response.GetServerId(), nil
}
func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) {
func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error) {
statsGrpcClientCalls.WithLabelValues("LookupSessionId").Inc()
// TODO: Remove debug logging
log.Printf("Lookup room session %s on %s", roomSessionId, c.Target())
response, err := c.impl.LookupSessionId(ctx, &LookupSessionIdRequest{
RoomSessionId: roomSessionId,
RoomSessionId: roomSessionId,
DisconnectReason: disconnectReason,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return "", ErrNoSuchRoomSession

View File

@ -115,6 +115,19 @@ func (s *GrpcServer) LookupSessionId(ctx context.Context, request *LookupSession
return nil, err
}
if sid != "" && request.DisconnectReason != "" {
if session := s.hub.GetSessionByPublicId(sid); session != nil {
log.Printf("Closing session %s because same room session %s connected", session.PublicId(), request.RoomSessionId)
session.LeaveRoom(false)
switch sess := session.(type) {
case *ClientSession:
if client := sess.GetClient(); client != nil {
client.SendByeResponseWithReason(nil, "room_session_reconnected")
}
}
session.Close()
}
}
return &LookupSessionIdReply{
SessionId: sid,
}, nil

View File

@ -32,6 +32,8 @@ service RpcSessions {
message LookupSessionIdRequest {
string roomSessionId = 1;
// Optional: set if the session should be disconnected with a given reason.
string disconnectReason = 2;
}
message LookupSessionIdReply {

5
hub.go
View File

@ -1001,7 +1001,7 @@ func (h *Hub) processHelloInternal(client *Client, message *ClientMessage) {
}
func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId string, backend *Backend) {
sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId)
sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId, "room_session_reconnected")
if err == ErrNoSuchRoomSession {
return
} else if err != nil {
@ -1011,7 +1011,8 @@ func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId strin
session := h.GetSessionByPublicId(sessionId)
if session == nil {
// Session is located on a different server.
// Session is located on a different server. Should already have been closed
// but send "bye" again as additional safeguard.
msg := &AsyncMessage{
Type: "message",
Message: &ServerMessage{

View File

@ -3016,42 +3016,24 @@ func RunTestClientTakeoverRoomSession(t *testing.T) {
t.Error(err)
}
if isLocalTest(t) {
// No message about the closing is sent to the new connection.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
// No message about the closing is sent to the new connection.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
// The permanently connected client will receive a "left" event from the
// overridden session and a "joined" for the new session. In that order as
// both were on the same server.
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
} else {
// In the clustered case, the new connection will receive a "leave" event
// due to the asynchronous events.
if err := client2.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
// The permanently connected client will first a "joined" event from the new
// session (on the same server) and a "left" from the session on the remote
// server (asynchronously).
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
// The permanently connected client will receive a "left" event from the
// overridden session and a "joined" for the new session. In that order as
// both were on the same server.
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
}

View File

@ -35,5 +35,5 @@ type RoomSessions interface {
DeleteRoomSession(session Session)
GetSessionId(roomSessionId string) (string, error)
LookupSessionId(ctx context.Context, roomSessionId string) (string, error)
LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error)
}

View File

@ -87,7 +87,7 @@ func (r *BuiltinRoomSessions) GetSessionId(roomSessionId string) (string, error)
return sid, nil
}
func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) {
func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string, disconnectReason string) (string, error) {
sid, err := r.GetSessionId(roomSessionId)
if err == nil {
return sid, nil
@ -112,7 +112,7 @@ func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId
go func(client *GrpcClient) {
defer wg.Done()
sid, err := client.LookupSessionId(lookupctx, roomSessionId)
sid, err := client.LookupSessionId(lookupctx, roomSessionId, disconnectReason)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {