From a256789f20d4fbfc9f06bd09b404b0908b94c7cd Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 24 Jul 2024 10:23:10 +0200 Subject: [PATCH] Support reconnecting the internal federated connection. --- api_signaling.go | 5 + api_signaling_easyjson.go | 15 ++ clientsession.go | 2 +- docs/standalone-signaling-api-v1.md | 38 ++++ federation.go | 318 +++++++++++++++++++++------- federation_test.go | 248 ++++++++++++++++++++++ 6 files changed, 546 insertions(+), 80 deletions(-) diff --git a/api_signaling.go b/api_signaling.go index 2c211e3..5434e50 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -48,6 +48,10 @@ var ( ErrInvalidSdp = NewError("invalid_sdp", "Payload does not contain a valid SDP.") ) +func makePtr[T any](v T) *T { + return &v +} + // ClientMessage is a message that is sent from a client to the server. type ClientMessage struct { json.Marshaler @@ -1024,6 +1028,7 @@ type EventServerMessage struct { Leave []string `json:"leave,omitempty"` Change []*EventServerMessageSessionEntry `json:"change,omitempty"` SwitchTo *EventServerMessageSwitchTo `json:"switchto,omitempty"` + Resumed *bool `json:"resumed,omitempty"` // Used for target "roomlist" / "participants" Invite *RoomEventServerMessage `json:"invite,omitempty"` diff --git a/api_signaling_easyjson.go b/api_signaling_easyjson.go index 62f83d3..b8264bb 100644 --- a/api_signaling_easyjson.go +++ b/api_signaling_easyjson.go @@ -4129,6 +4129,16 @@ func easyjson29f189fbDecodeGithubComStrukturagNextcloudSpreedSignaling36(in *jle } (*out.SwitchTo).UnmarshalEasyJSON(in) } + case "resumed": + if in.IsNull() { + in.Skip() + out.Resumed = nil + } else { + if out.Resumed == nil { + out.Resumed = new(bool) + } + *out.Resumed = bool(in.Bool()) + } case "invite": if in.IsNull() { in.Skip() @@ -4258,6 +4268,11 @@ func easyjson29f189fbEncodeGithubComStrukturagNextcloudSpreedSignaling36(out *jw out.RawString(prefix) (*in.SwitchTo).MarshalEasyJSON(out) } + if in.Resumed != nil { + const prefix string = ",\"resumed\":" + out.RawString(prefix) + out.Bool(bool(*in.Resumed)) + } if in.Invite != nil { const prefix string = ",\"invite\":" out.RawString(prefix) diff --git a/clientsession.go b/clientsession.go index b7a9095..5e2fe4f 100644 --- a/clientsession.go +++ b/clientsession.go @@ -348,7 +348,7 @@ func (s *ClientSession) SetFederationClient(federation *FederationClient) { s.doLeaveRoom(true) s.onRoomSet(federation != nil) - if prev := s.federation.Swap(federation); prev != nil { + if prev := s.federation.Swap(federation); prev != nil && prev != federation { prev.Close() } } diff --git a/docs/standalone-signaling-api-v1.md b/docs/standalone-signaling-api-v1.md index 62e190f..2fe9b49 100644 --- a/docs/standalone-signaling-api-v1.md +++ b/docs/standalone-signaling-api-v1.md @@ -576,6 +576,44 @@ Message format (Server -> Client): Also the error codes from joining a regular room could be returned. +### Events + +The signaling server tries to resume the internal proxy session if the +connection to the remote server gets interrupted. To notify clients about these +interruptions, two additional events may be sent from the server to the client: + +Connection was interrupted (Server -> Client): + + { + "type": "event", + "event": { + "target": "room", + "type": "federation_interrupted" + } + } + + +Connection was resumed (Server -> Client): + + { + "type": "event", + "event": { + "target": "room", + "type": "federation_resumed", + "resumed": true + } + } + +The `resumed` flag will be `true` if the existing internal session could be +resumed (i.e. the client stayed in the remote room), or `false` if a new +internal session was created. + +If a new internal session was created, the client will receive another `room` +event for the joined room and `join` events for the different participants in +the room. This should be handled the same as if the direct session could not +be resumed on reconnect. + + ## Leave room To leave a room, a [join room](#join-room) message must be sent with an empty diff --git a/federation.go b/federation.go index dff30db..3625dfd 100644 --- a/federation.go +++ b/federation.go @@ -39,11 +39,17 @@ import ( easyjson "github.com/mailru/easyjson" ) +const ( + initialFederationReconnectInterval = 100 * time.Millisecond + maxFederationReconnectInterval = 8 * time.Second +) + var ( ErrFederationNotSupported = NewError("federation_unsupported", "The target server does not support federation.") ) type FederationClient struct { + hub *Hub session *ClientSession message atomic.Pointer[ClientMessage] @@ -53,31 +59,39 @@ type FederationClient struct { roomSessionId string federation *RoomFederationMessage - mu sync.Mutex - conn *websocket.Conn - closer *Closer + mu sync.Mutex + dialer *websocket.Dialer + url string + conn *websocket.Conn + closer *Closer + reconnectDelay time.Duration + reconnecting bool + reconnectFunc *time.Timer helloMu sync.Mutex helloMsgId string helloAuth *FederationAuthParams + resumeId string hello atomic.Pointer[HelloServerMessage] + pendingMessages []*ClientMessage + closeOnLeave atomic.Bool } func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession, message *ClientMessage) (*FederationClient, error) { - if message.Type != "room" || message.Room == nil { - return nil, fmt.Errorf("expected room message, got %+v", message) + if message.Type != "room" || message.Room == nil || message.Room.Federation == nil { + return nil, fmt.Errorf("expected federation room message, got %+v", message) } var dialer websocket.Dialer - - room := message.Room if hub.skipFederationVerify { dialer.TLSClientConfig = &tls.Config{ InsecureSkipVerify: true, } } + + room := message.Room u := *room.Federation.parsedSignalingUrl switch u.Scheme { case "http": @@ -85,27 +99,7 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession, case "https": u.Scheme = "wss" } - conn, response, err := dialer.DialContext(ctx, u.String()+"spreed", nil) - if err != nil { - return nil, err - } - - features := strings.Split(response.Header.Get("X-Spreed-Signaling-Features"), ",") - supportsFederation := false - for _, f := range features { - f = strings.TrimSpace(f) - if f == ServerFeatureFederation { - supportsFederation = true - break - } - } - if !supportsFederation { - if err := conn.Close(); err != nil { - log.Printf("Error closing federation connection to %s: %s", room.Federation.parsedSignalingUrl.String(), err) - } - - return nil, ErrFederationNotSupported - } + url := u.String() + "spreed" remoteRoomId := room.Federation.RoomId if remoteRoomId == "" { @@ -113,6 +107,7 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession, } result := &FederationClient{ + hub: hub, session: session, roomId: room.RoomId, @@ -121,18 +116,17 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession, roomSessionId: room.SessionId, federation: room.Federation, - conn: conn, + reconnectDelay: initialFederationReconnectInterval, + + dialer: &dialer, + url: url, closer: NewCloser(), } result.message.Store(message) - log.Printf("Creating federation connection to %s for %s", result.URL(), result.session.PublicId()) - go func() { - hub.readPumpActive.Add(1) - defer hub.readPumpActive.Add(-1) - - result.readPump() - }() + if err := result.connect(ctx); err != nil { + return nil, err + } go func() { hub.writePumpActive.Add(1) @@ -148,6 +142,52 @@ func (c *FederationClient) URL() string { return c.federation.parsedSignalingUrl.String() } +func (c *FederationClient) connect(ctx context.Context) error { + log.Printf("Creating federation connection to %s for %s", c.URL(), c.session.PublicId()) + conn, response, err := c.dialer.DialContext(ctx, c.url, nil) + if err != nil { + return err + } + + features := strings.Split(response.Header.Get("X-Spreed-Signaling-Features"), ",") + supportsFederation := false + for _, f := range features { + f = strings.TrimSpace(f) + if f == ServerFeatureFederation { + supportsFederation = true + break + } + } + if !supportsFederation { + if err := conn.Close(); err != nil { + log.Printf("Error closing federation connection to %s: %s", c.URL(), err) + } + + return ErrFederationNotSupported + } + + log.Printf("Federation connection established to %s for %s", c.URL(), c.session.PublicId()) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.reconnectFunc != nil { + c.reconnectFunc.Stop() + c.reconnectFunc = nil + } + + c.conn = conn + + go func() { + c.hub.readPumpActive.Add(1) + defer c.hub.readPumpActive.Add(-1) + + c.readPump(conn) + }() + + return nil +} + func (c *FederationClient) Leave(message *ClientMessage) error { c.mu.Lock() defer c.mu.Unlock() @@ -171,16 +211,24 @@ func (c *FederationClient) Leave(message *ClientMessage) error { func (c *FederationClient) Close() { c.closer.Close() + c.mu.Lock() defer c.mu.Unlock() + + c.closeConnection(true) +} + +func (c *FederationClient) closeConnection(withBye bool) { if c.conn == nil { return } - if err := c.sendMessageLocked(&ClientMessage{ - Type: "bye", - }); err != nil && !errors.Is(err, websocket.ErrCloseSent) { - log.Printf("Error sending bye on federation connection to %s: %s", c.URL(), err) + if withBye { + if err := c.sendMessageLocked(&ClientMessage{ + Type: "bye", + }); err != nil && !errors.Is(err, websocket.ErrCloseSent) { + log.Printf("Error sending bye on federation connection to %s: %s", c.URL(), err) + } } closeMessage := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") @@ -196,19 +244,58 @@ func (c *FederationClient) Close() { c.conn = nil } -func (c *FederationClient) readPump() { - defer func() { - c.Close() - }() - +func (c *FederationClient) resetReconnect() { c.mu.Lock() - conn := c.conn - c.mu.Unlock() - if conn == nil { - log.Printf("Connection to %s closed while starting readPump", c.URL()) + defer c.mu.Unlock() + c.reconnectDelay = initialFederationReconnectInterval +} + +func (c *FederationClient) scheduleReconnect() { + c.mu.Lock() + defer c.mu.Unlock() + + c.scheduleReconnectLocked() +} + +func (c *FederationClient) scheduleReconnectLocked() { + c.reconnecting = true + if c.hello.Swap(nil) != nil { + c.session.SendMessage(&ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "room", + Type: "federation_interrupted", + }, + }) + } + c.closeConnection(false) + + if c.reconnectFunc != nil { + c.reconnectFunc.Stop() + } + c.reconnectFunc = time.AfterFunc(c.reconnectDelay, c.reconnect) + c.reconnectDelay *= 2 + if c.reconnectDelay > maxFederationReconnectInterval { + c.reconnectDelay = maxFederationReconnectInterval + } +} + +func (c *FederationClient) reconnect() { + if c.closer.IsClosed() { return } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.hub.federationTimeout)) + defer cancel() + + if err := c.connect(ctx); err != nil { + log.Printf("Error connecting to federation server %s for %s: %s", c.URL(), c.session.PublicId(), err) + c.scheduleReconnect() + return + } +} + +func (c *FederationClient) readPump(conn *websocket.Conn) { conn.SetReadLimit(maxMessageSize) conn.SetPongHandler(func(msg string) error { now := time.Now() @@ -222,12 +309,15 @@ func (c *FederationClient) readPump() { if err != nil { // Gorilla websocket hides the original net.Error, so also compare error messages if c.closer.IsClosed() && (errors.Is(err, net.ErrClosed) || errors.Is(err, websocket.ErrCloseSent) || strings.Contains(err.Error(), net.ErrClosed.Error())) { - break - } else if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { + // Connection closed locally, no need to reconnect. break } - log.Printf("Error reading: %s", err) + if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { + log.Printf("Error reading from %s for %s: %s", c.URL(), c.session.PublicId(), err) + } + + c.scheduleReconnect() break } @@ -255,22 +345,20 @@ func (c *FederationClient) readPump() { } } -func (c *FederationClient) sendPing() bool { +func (c *FederationClient) sendPing() { c.mu.Lock() defer c.mu.Unlock() if c.conn == nil { - return false + return } now := time.Now().UnixNano() msg := strconv.FormatInt(now, 10) c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil { - log.Printf("Could not send ping to federated client %s: %v", c.session.PublicId(), err) - return false + log.Printf("Could not send ping to federated client %s for %s: %v", c.URL(), c.session.PublicId(), err) + c.scheduleReconnectLocked() } - - return true } func (c *FederationClient) writePump() { @@ -280,9 +368,7 @@ func (c *FederationClient) writePump() { for { select { case <-ticker.C: - if !c.sendPing() { - return - } + c.sendPing() case <-c.closer.C: return } @@ -324,18 +410,23 @@ func (c *FederationClient) sendHelloLocked(auth *FederationAuthParams) error { } c.helloAuth = auth - return c.SendMessage(&ClientMessage{ + msg := &ClientMessage{ Id: c.helloMsgId, Type: "hello", Hello: &HelloClientMessage{ Version: HelloVersionV2, - Auth: &HelloClientMessageAuth{ - Type: HelloClientTypeFederation, - Url: c.federation.NextcloudUrl, - Params: authData, - }, }, - }) + } + if resumeId := c.resumeId; resumeId != "" { + msg.Hello.ResumeId = resumeId + } else { + msg.Hello.Auth = &HelloClientMessageAuth{ + Type: HelloClientTypeFederation, + Url: c.federation.NextcloudUrl, + Params: authData, + } + } + return c.SendMessage(msg) } func (c *FederationClient) processWelcome(msg *ServerMessage) { @@ -354,6 +445,8 @@ func (c *FederationClient) processWelcome(msg *ServerMessage) { } func (c *FederationClient) processHello(msg *ServerMessage) { + c.resetReconnect() + c.helloMu.Lock() defer c.helloMu.Unlock() @@ -367,10 +460,22 @@ func (c *FederationClient) processHello(msg *ServerMessage) { c.helloMsgId = "" if msg.Type == "error" { - c.closeWithError(msg.Error) + switch msg.Error.Code { + case "no_such_session": + // Resume failed (e.g. remote has restarted), try to connect new session + // which may fail if the auth token has expired in the meantime. + c.resumeId = "" + c.pendingMessages = nil + if err := c.sendHelloLocked(c.helloAuth); err != nil { + c.closeWithError(err) + } + default: + log.Printf("Received hello error from federated client for %s to %s: %+v", c.session.PublicId(), c.URL(), msg) + c.closeWithError(msg.Error) + } return } else if msg.Type != "hello" { - log.Printf("Received unknown hello response %+v", msg) + log.Printf("Received unknown hello response from federated client for %s to %s: %+v", c.session.PublicId(), c.URL(), msg) if err := c.sendHelloLocked(c.helloAuth); err != nil { c.closeWithError(err) } @@ -378,8 +483,53 @@ func (c *FederationClient) processHello(msg *ServerMessage) { } c.hello.Store(msg.Hello) - if err := c.joinRoom(); err != nil { - c.closeWithError(err) + if c.resumeId == "" { + c.resumeId = msg.Hello.ResumeId + if c.reconnecting { + c.session.SendMessage(&ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "room", + Type: "federation_resumed", + Resumed: makePtr(false), + }, + }) + // Setting the federation client will reset any information on previously + // received "join" events. + c.session.SetFederationClient(c) + } + + if err := c.joinRoom(); err != nil { + c.closeWithError(err) + } + } else { + c.session.SendMessage(&ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "room", + Type: "federation_resumed", + Resumed: makePtr(true), + }, + }) + + if count := len(c.pendingMessages); count > 0 { + messages := c.pendingMessages + c.pendingMessages = nil + + log.Printf("Sending %d pending messages to %s for %s", count, c.URL(), c.session.PublicId()) + + c.helloMu.Unlock() + defer c.helloMu.Lock() + + c.mu.Lock() + defer c.mu.Unlock() + for _, msg := range messages { + if err := c.sendMessageLocked(msg); err != nil { + log.Printf("Error sending pending message %+v on federation connection to %s: %s", msg, c.URL(), err) + break + } + } + } } } @@ -562,9 +712,23 @@ func (c *FederationClient) SendMessage(message *ClientMessage) error { return c.sendMessageLocked(message) } +func (c *FederationClient) deferMessage(message *ClientMessage) { + c.helloMu.Lock() + defer c.helloMu.Unlock() + if c.resumeId == "" { + return + } + + c.pendingMessages = append(c.pendingMessages, message) + if len(c.pendingMessages) >= warnPendingMessagesCount { + log.Printf("Session %s has %d pending federated messages", c.session.PublicId(), len(c.pendingMessages)) + } +} + func (c *FederationClient) sendMessageLocked(message *ClientMessage) error { if c.conn == nil { - return ErrNotConnected + c.deferMessage(message) + return nil } c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint @@ -586,12 +750,8 @@ func (c *FederationClient) sendMessageLocked(message *ClientMessage) error { } log.Printf("Could not send message %+v for %s to federated client %s: %v", message, c.session.PublicId(), c.URL(), err) - closeData := websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "") - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint - if err := c.conn.WriteMessage(websocket.CloseMessage, closeData); err != nil { - log.Printf("Could not send close message for %s to federated client %s: %v", c.session.PublicId(), c.URL(), err) - } - return err + c.deferMessage(message) + c.scheduleReconnectLocked() } return nil diff --git a/federation_test.go b/federation_test.go index c5465b2..ba00fda 100644 --- a/federation_test.go +++ b/federation_test.go @@ -472,3 +472,251 @@ func Test_FederationMedia(t *testing.T) { UserId: hello2.Hello.UserId, })) } + +func Test_FederationResume(t *testing.T) { + CatchLogForTest(t) + + assert := assert.New(t) + require := require.New(t) + + hub1, hub2, server1, server2 := CreateClusteredHubsForTest(t) + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + require.NoError(client1.SendHelloV2(testDefaultUserId + "1")) + + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + require.NoError(client2.SendHelloV2(testDefaultUserId + "2")) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + require.NoError(err) + + hello2, err := client2.RunUntilHello(ctx) + require.NoError(err) + + roomId := "test-room" + federatedRoomId := roomId + "@federated" + room1, err := client1.JoinRoom(ctx, roomId) + require.NoError(err) + require.Equal(roomId, room1.Room.RoomId) + + assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello)) + + now := time.Now() + token, err := client1.CreateHelloV2Token(testDefaultUserId+"2", now, now.Add(time.Minute)) + require.NoError(err) + + msg := &ClientMessage{ + Id: "join-room-fed", + Type: "room", + Room: &RoomClientMessage{ + RoomId: federatedRoomId, + SessionId: federatedRoomId + "-" + hello2.Hello.SessionId, + Federation: &RoomFederationMessage{ + SignalingUrl: server1.URL, + NextcloudUrl: server1.URL, + RoomId: roomId, + Token: token, + }, + }, + } + require.NoError(client2.WriteJSON(msg)) + + if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.Equal(msg.Id, message.Id) + require.Equal("room", message.Type) + require.Equal(federatedRoomId, message.Room.RoomId) + } + + // The client1 will see the remote session id for client2. + var remoteSessionId string + if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + assert.NoError(client1.checkSingleMessageJoined(message)) + evt := message.Event.Join[0] + remoteSessionId = evt.SessionId + assert.NotEqual(hello2.Hello.SessionId, remoteSessionId) + assert.Equal(testDefaultUserId+"2", evt.UserId) + } + + // The client2 will see its own session id, not the one from the remote server. + assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello)) + + session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession) + fed2 := session2.GetFederationClient() + require.NotNil(fed2) + fed2.mu.Lock() + err = fed2.conn.Close() + + data2 := "from-2-to-1" + assert.NoError(client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, data2)) + fed2.mu.Unlock() + assert.NoError(err) + + if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.Equal("event", message.Type) + assert.Equal("room", message.Event.Target) + assert.Equal("federation_interrupted", message.Event.Type) + } + + if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.Equal("event", message.Type) + assert.Equal("room", message.Event.Target) + assert.Equal("federation_resumed", message.Event.Type) + assert.NotNil(message.Event.Resumed) + assert.True(*message.Event.Resumed) + } + + ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel1() + + var payload string + if assert.NoError(checkReceiveClientMessage(ctx, client1, "session", &HelloServerMessage{ + SessionId: remoteSessionId, + UserId: testDefaultUserId + "2", + }, &payload)) { + assert.Equal(data2, payload) + } + + if message, err := client1.RunUntilMessage(ctx1); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { + assert.NoError(err) + } else { + assert.Nil(message) + } + + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() + + if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { + assert.NoError(err) + } else { + assert.Nil(message) + } +} + +func Test_FederationResumeNewSession(t *testing.T) { + CatchLogForTest(t) + + assert := assert.New(t) + require := require.New(t) + + hub1, hub2, server1, server2 := CreateClusteredHubsForTest(t) + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + require.NoError(client1.SendHelloV2(testDefaultUserId + "1")) + + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + require.NoError(client2.SendHelloV2(testDefaultUserId + "2")) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + require.NoError(err) + + hello2, err := client2.RunUntilHello(ctx) + require.NoError(err) + + roomId := "test-room" + federatedRoomId := roomId + "@federated" + room1, err := client1.JoinRoom(ctx, roomId) + require.NoError(err) + require.Equal(roomId, room1.Room.RoomId) + + assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello)) + + now := time.Now() + token, err := client1.CreateHelloV2Token(testDefaultUserId+"2", now, now.Add(time.Minute)) + require.NoError(err) + + msg := &ClientMessage{ + Id: "join-room-fed", + Type: "room", + Room: &RoomClientMessage{ + RoomId: federatedRoomId, + SessionId: federatedRoomId + "-" + hello2.Hello.SessionId, + Federation: &RoomFederationMessage{ + SignalingUrl: server1.URL, + NextcloudUrl: server1.URL, + RoomId: roomId, + Token: token, + }, + }, + } + require.NoError(client2.WriteJSON(msg)) + + if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.Equal(msg.Id, message.Id) + require.Equal("room", message.Type) + require.Equal(federatedRoomId, message.Room.RoomId) + } + + // The client1 will see the remote session id for client2. + var remoteSessionId string + if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + assert.NoError(client1.checkSingleMessageJoined(message)) + evt := message.Event.Join[0] + remoteSessionId = evt.SessionId + assert.NotEqual(hello2.Hello.SessionId, remoteSessionId) + assert.Equal(hello2.Hello.UserId, evt.UserId) + } + + // The client2 will see its own session id, not the one from the remote server. + assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello)) + + remoteSession2 := hub1.GetSessionByPublicId(remoteSessionId).(*ClientSession) + // Simulate disconnected federated client with an expired session. + if client := remoteSession2.GetClient(); client != nil { + remoteSession2.ClearClient(client) + client.Close() + } + remoteSession2.Close() + + if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.Equal("event", message.Type) + assert.Equal("room", message.Event.Target) + assert.Equal("federation_interrupted", message.Event.Type) + } + + if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.Equal("event", message.Type) + assert.Equal("room", message.Event.Target) + assert.Equal("federation_resumed", message.Event.Type) + assert.NotNil(message.Event.Resumed) + assert.False(*message.Event.Resumed) + } + + // Client1 will get a "leave" for the expired session and a "join" with the + // new remote session id. + assert.NoError(client1.RunUntilLeft(ctx, &HelloServerMessage{ + SessionId: remoteSessionId, + UserId: hello2.Hello.UserId, + })) + if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + assert.NoError(client1.checkSingleMessageJoined(message)) + evt := message.Event.Join[0] + assert.NotEqual(remoteSessionId, evt.SessionId) + assert.NotEqual(hello2.Hello.SessionId, remoteSessionId) + remoteSessionId = evt.SessionId + assert.NotEqual(hello2.Hello.SessionId, remoteSessionId) + assert.Equal(hello2.Hello.UserId, evt.UserId) + } + + // client2 will join the room again after the reconnect with the new + // session and get "joined" events for all sessions in the room (including + // its own). + if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.Equal("", message.Id) + require.Equal("room", message.Type) + require.Equal(federatedRoomId, message.Room.RoomId) + } + assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello)) +}