Support reconnecting the internal federated connection.

This commit is contained in:
Joachim Bauch 2024-07-24 10:23:10 +02:00
commit a256789f20
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
6 changed files with 545 additions and 79 deletions

View file

@ -48,6 +48,10 @@ var (
ErrInvalidSdp = NewError("invalid_sdp", "Payload does not contain a valid SDP.")
)
func makePtr[T any](v T) *T {
return &v
}
// ClientMessage is a message that is sent from a client to the server.
type ClientMessage struct {
json.Marshaler
@ -1024,6 +1028,7 @@ type EventServerMessage struct {
Leave []string `json:"leave,omitempty"`
Change []*EventServerMessageSessionEntry `json:"change,omitempty"`
SwitchTo *EventServerMessageSwitchTo `json:"switchto,omitempty"`
Resumed *bool `json:"resumed,omitempty"`
// Used for target "roomlist" / "participants"
Invite *RoomEventServerMessage `json:"invite,omitempty"`

View file

@ -4129,6 +4129,16 @@ func easyjson29f189fbDecodeGithubComStrukturagNextcloudSpreedSignaling36(in *jle
}
(*out.SwitchTo).UnmarshalEasyJSON(in)
}
case "resumed":
if in.IsNull() {
in.Skip()
out.Resumed = nil
} else {
if out.Resumed == nil {
out.Resumed = new(bool)
}
*out.Resumed = bool(in.Bool())
}
case "invite":
if in.IsNull() {
in.Skip()
@ -4258,6 +4268,11 @@ func easyjson29f189fbEncodeGithubComStrukturagNextcloudSpreedSignaling36(out *jw
out.RawString(prefix)
(*in.SwitchTo).MarshalEasyJSON(out)
}
if in.Resumed != nil {
const prefix string = ",\"resumed\":"
out.RawString(prefix)
out.Bool(bool(*in.Resumed))
}
if in.Invite != nil {
const prefix string = ",\"invite\":"
out.RawString(prefix)

View file

@ -348,7 +348,7 @@ func (s *ClientSession) SetFederationClient(federation *FederationClient) {
s.doLeaveRoom(true)
s.onRoomSet(federation != nil)
if prev := s.federation.Swap(federation); prev != nil {
if prev := s.federation.Swap(federation); prev != nil && prev != federation {
prev.Close()
}
}

View file

@ -576,6 +576,44 @@ Message format (Server -> Client):
Also the error codes from joining a regular room could be returned.
### Events
The signaling server tries to resume the internal proxy session if the
connection to the remote server gets interrupted. To notify clients about these
interruptions, two additional events may be sent from the server to the client:
Connection was interrupted (Server -> Client):
{
"type": "event",
"event": {
"target": "room",
"type": "federation_interrupted"
}
}
Connection was resumed (Server -> Client):
{
"type": "event",
"event": {
"target": "room",
"type": "federation_resumed",
"resumed": true
}
}
The `resumed` flag will be `true` if the existing internal session could be
resumed (i.e. the client stayed in the remote room), or `false` if a new
internal session was created.
If a new internal session was created, the client will receive another `room`
event for the joined room and `join` events for the different participants in
the room. This should be handled the same as if the direct session could not
be resumed on reconnect.
## Leave room
To leave a room, a [join room](#join-room) message must be sent with an empty

View file

@ -39,11 +39,17 @@ import (
easyjson "github.com/mailru/easyjson"
)
const (
initialFederationReconnectInterval = 100 * time.Millisecond
maxFederationReconnectInterval = 8 * time.Second
)
var (
ErrFederationNotSupported = NewError("federation_unsupported", "The target server does not support federation.")
)
type FederationClient struct {
hub *Hub
session *ClientSession
message atomic.Pointer[ClientMessage]
@ -53,31 +59,39 @@ type FederationClient struct {
roomSessionId string
federation *RoomFederationMessage
mu sync.Mutex
conn *websocket.Conn
closer *Closer
mu sync.Mutex
dialer *websocket.Dialer
url string
conn *websocket.Conn
closer *Closer
reconnectDelay time.Duration
reconnecting bool
reconnectFunc *time.Timer
helloMu sync.Mutex
helloMsgId string
helloAuth *FederationAuthParams
resumeId string
hello atomic.Pointer[HelloServerMessage]
pendingMessages []*ClientMessage
closeOnLeave atomic.Bool
}
func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession, message *ClientMessage) (*FederationClient, error) {
if message.Type != "room" || message.Room == nil {
return nil, fmt.Errorf("expected room message, got %+v", message)
if message.Type != "room" || message.Room == nil || message.Room.Federation == nil {
return nil, fmt.Errorf("expected federation room message, got %+v", message)
}
var dialer websocket.Dialer
room := message.Room
if hub.skipFederationVerify {
dialer.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
room := message.Room
u := *room.Federation.parsedSignalingUrl
switch u.Scheme {
case "http":
@ -85,27 +99,7 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession,
case "https":
u.Scheme = "wss"
}
conn, response, err := dialer.DialContext(ctx, u.String()+"spreed", nil)
if err != nil {
return nil, err
}
features := strings.Split(response.Header.Get("X-Spreed-Signaling-Features"), ",")
supportsFederation := false
for _, f := range features {
f = strings.TrimSpace(f)
if f == ServerFeatureFederation {
supportsFederation = true
break
}
}
if !supportsFederation {
if err := conn.Close(); err != nil {
log.Printf("Error closing federation connection to %s: %s", room.Federation.parsedSignalingUrl.String(), err)
}
return nil, ErrFederationNotSupported
}
url := u.String() + "spreed"
remoteRoomId := room.Federation.RoomId
if remoteRoomId == "" {
@ -113,6 +107,7 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession,
}
result := &FederationClient{
hub: hub,
session: session,
roomId: room.RoomId,
@ -121,18 +116,17 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession,
roomSessionId: room.SessionId,
federation: room.Federation,
conn: conn,
reconnectDelay: initialFederationReconnectInterval,
dialer: &dialer,
url: url,
closer: NewCloser(),
}
result.message.Store(message)
log.Printf("Creating federation connection to %s for %s", result.URL(), result.session.PublicId())
go func() {
hub.readPumpActive.Add(1)
defer hub.readPumpActive.Add(-1)
result.readPump()
}()
if err := result.connect(ctx); err != nil {
return nil, err
}
go func() {
hub.writePumpActive.Add(1)
@ -148,6 +142,52 @@ func (c *FederationClient) URL() string {
return c.federation.parsedSignalingUrl.String()
}
func (c *FederationClient) connect(ctx context.Context) error {
log.Printf("Creating federation connection to %s for %s", c.URL(), c.session.PublicId())
conn, response, err := c.dialer.DialContext(ctx, c.url, nil)
if err != nil {
return err
}
features := strings.Split(response.Header.Get("X-Spreed-Signaling-Features"), ",")
supportsFederation := false
for _, f := range features {
f = strings.TrimSpace(f)
if f == ServerFeatureFederation {
supportsFederation = true
break
}
}
if !supportsFederation {
if err := conn.Close(); err != nil {
log.Printf("Error closing federation connection to %s: %s", c.URL(), err)
}
return ErrFederationNotSupported
}
log.Printf("Federation connection established to %s for %s", c.URL(), c.session.PublicId())
c.mu.Lock()
defer c.mu.Unlock()
if c.reconnectFunc != nil {
c.reconnectFunc.Stop()
c.reconnectFunc = nil
}
c.conn = conn
go func() {
c.hub.readPumpActive.Add(1)
defer c.hub.readPumpActive.Add(-1)
c.readPump(conn)
}()
return nil
}
func (c *FederationClient) Leave(message *ClientMessage) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -171,16 +211,24 @@ func (c *FederationClient) Leave(message *ClientMessage) error {
func (c *FederationClient) Close() {
c.closer.Close()
c.mu.Lock()
defer c.mu.Unlock()
c.closeConnection(true)
}
func (c *FederationClient) closeConnection(withBye bool) {
if c.conn == nil {
return
}
if err := c.sendMessageLocked(&ClientMessage{
Type: "bye",
}); err != nil && !errors.Is(err, websocket.ErrCloseSent) {
log.Printf("Error sending bye on federation connection to %s: %s", c.URL(), err)
if withBye {
if err := c.sendMessageLocked(&ClientMessage{
Type: "bye",
}); err != nil && !errors.Is(err, websocket.ErrCloseSent) {
log.Printf("Error sending bye on federation connection to %s: %s", c.URL(), err)
}
}
closeMessage := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
@ -196,19 +244,58 @@ func (c *FederationClient) Close() {
c.conn = nil
}
func (c *FederationClient) readPump() {
defer func() {
c.Close()
}()
func (c *FederationClient) resetReconnect() {
c.mu.Lock()
conn := c.conn
c.mu.Unlock()
if conn == nil {
log.Printf("Connection to %s closed while starting readPump", c.URL())
defer c.mu.Unlock()
c.reconnectDelay = initialFederationReconnectInterval
}
func (c *FederationClient) scheduleReconnect() {
c.mu.Lock()
defer c.mu.Unlock()
c.scheduleReconnectLocked()
}
func (c *FederationClient) scheduleReconnectLocked() {
c.reconnecting = true
if c.hello.Swap(nil) != nil {
c.session.SendMessage(&ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "federation_interrupted",
},
})
}
c.closeConnection(false)
if c.reconnectFunc != nil {
c.reconnectFunc.Stop()
}
c.reconnectFunc = time.AfterFunc(c.reconnectDelay, c.reconnect)
c.reconnectDelay *= 2
if c.reconnectDelay > maxFederationReconnectInterval {
c.reconnectDelay = maxFederationReconnectInterval
}
}
func (c *FederationClient) reconnect() {
if c.closer.IsClosed() {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.hub.federationTimeout))
defer cancel()
if err := c.connect(ctx); err != nil {
log.Printf("Error connecting to federation server %s for %s: %s", c.URL(), c.session.PublicId(), err)
c.scheduleReconnect()
return
}
}
func (c *FederationClient) readPump(conn *websocket.Conn) {
conn.SetReadLimit(maxMessageSize)
conn.SetPongHandler(func(msg string) error {
now := time.Now()
@ -222,12 +309,15 @@ func (c *FederationClient) readPump() {
if err != nil {
// Gorilla websocket hides the original net.Error, so also compare error messages
if c.closer.IsClosed() && (errors.Is(err, net.ErrClosed) || errors.Is(err, websocket.ErrCloseSent) || strings.Contains(err.Error(), net.ErrClosed.Error())) {
break
} else if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
// Connection closed locally, no need to reconnect.
break
}
log.Printf("Error reading: %s", err)
if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
log.Printf("Error reading from %s for %s: %s", c.URL(), c.session.PublicId(), err)
}
c.scheduleReconnect()
break
}
@ -255,22 +345,20 @@ func (c *FederationClient) readPump() {
}
}
func (c *FederationClient) sendPing() bool {
func (c *FederationClient) sendPing() {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return false
return
}
now := time.Now().UnixNano()
msg := strconv.FormatInt(now, 10)
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil {
log.Printf("Could not send ping to federated client %s: %v", c.session.PublicId(), err)
return false
log.Printf("Could not send ping to federated client %s for %s: %v", c.URL(), c.session.PublicId(), err)
c.scheduleReconnectLocked()
}
return true
}
func (c *FederationClient) writePump() {
@ -280,9 +368,7 @@ func (c *FederationClient) writePump() {
for {
select {
case <-ticker.C:
if !c.sendPing() {
return
}
c.sendPing()
case <-c.closer.C:
return
}
@ -324,18 +410,23 @@ func (c *FederationClient) sendHelloLocked(auth *FederationAuthParams) error {
}
c.helloAuth = auth
return c.SendMessage(&ClientMessage{
msg := &ClientMessage{
Id: c.helloMsgId,
Type: "hello",
Hello: &HelloClientMessage{
Version: HelloVersionV2,
Auth: &HelloClientMessageAuth{
Type: HelloClientTypeFederation,
Url: c.federation.NextcloudUrl,
Params: authData,
},
},
})
}
if resumeId := c.resumeId; resumeId != "" {
msg.Hello.ResumeId = resumeId
} else {
msg.Hello.Auth = &HelloClientMessageAuth{
Type: HelloClientTypeFederation,
Url: c.federation.NextcloudUrl,
Params: authData,
}
}
return c.SendMessage(msg)
}
func (c *FederationClient) processWelcome(msg *ServerMessage) {
@ -354,6 +445,8 @@ func (c *FederationClient) processWelcome(msg *ServerMessage) {
}
func (c *FederationClient) processHello(msg *ServerMessage) {
c.resetReconnect()
c.helloMu.Lock()
defer c.helloMu.Unlock()
@ -367,10 +460,22 @@ func (c *FederationClient) processHello(msg *ServerMessage) {
c.helloMsgId = ""
if msg.Type == "error" {
c.closeWithError(msg.Error)
switch msg.Error.Code {
case "no_such_session":
// Resume failed (e.g. remote has restarted), try to connect new session
// which may fail if the auth token has expired in the meantime.
c.resumeId = ""
c.pendingMessages = nil
if err := c.sendHelloLocked(c.helloAuth); err != nil {
c.closeWithError(err)
}
default:
log.Printf("Received hello error from federated client for %s to %s: %+v", c.session.PublicId(), c.URL(), msg)
c.closeWithError(msg.Error)
}
return
} else if msg.Type != "hello" {
log.Printf("Received unknown hello response %+v", msg)
log.Printf("Received unknown hello response from federated client for %s to %s: %+v", c.session.PublicId(), c.URL(), msg)
if err := c.sendHelloLocked(c.helloAuth); err != nil {
c.closeWithError(err)
}
@ -378,8 +483,53 @@ func (c *FederationClient) processHello(msg *ServerMessage) {
}
c.hello.Store(msg.Hello)
if err := c.joinRoom(); err != nil {
c.closeWithError(err)
if c.resumeId == "" {
c.resumeId = msg.Hello.ResumeId
if c.reconnecting {
c.session.SendMessage(&ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "federation_resumed",
Resumed: makePtr(false),
},
})
// Setting the federation client will reset any information on previously
// received "join" events.
c.session.SetFederationClient(c)
}
if err := c.joinRoom(); err != nil {
c.closeWithError(err)
}
} else {
c.session.SendMessage(&ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "federation_resumed",
Resumed: makePtr(true),
},
})
if count := len(c.pendingMessages); count > 0 {
messages := c.pendingMessages
c.pendingMessages = nil
log.Printf("Sending %d pending messages to %s for %s", count, c.URL(), c.session.PublicId())
c.helloMu.Unlock()
defer c.helloMu.Lock()
c.mu.Lock()
defer c.mu.Unlock()
for _, msg := range messages {
if err := c.sendMessageLocked(msg); err != nil {
log.Printf("Error sending pending message %+v on federation connection to %s: %s", msg, c.URL(), err)
break
}
}
}
}
}
@ -562,9 +712,23 @@ func (c *FederationClient) SendMessage(message *ClientMessage) error {
return c.sendMessageLocked(message)
}
func (c *FederationClient) deferMessage(message *ClientMessage) {
c.helloMu.Lock()
defer c.helloMu.Unlock()
if c.resumeId == "" {
return
}
c.pendingMessages = append(c.pendingMessages, message)
if len(c.pendingMessages) >= warnPendingMessagesCount {
log.Printf("Session %s has %d pending federated messages", c.session.PublicId(), len(c.pendingMessages))
}
}
func (c *FederationClient) sendMessageLocked(message *ClientMessage) error {
if c.conn == nil {
return ErrNotConnected
c.deferMessage(message)
return nil
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
@ -586,12 +750,8 @@ func (c *FederationClient) sendMessageLocked(message *ClientMessage) error {
}
log.Printf("Could not send message %+v for %s to federated client %s: %v", message, c.session.PublicId(), c.URL(), err)
closeData := websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "")
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
if err := c.conn.WriteMessage(websocket.CloseMessage, closeData); err != nil {
log.Printf("Could not send close message for %s to federated client %s: %v", c.session.PublicId(), c.URL(), err)
}
return err
c.deferMessage(message)
c.scheduleReconnectLocked()
}
return nil

View file

@ -472,3 +472,251 @@ func Test_FederationMedia(t *testing.T) {
UserId: hello2.Hello.UserId,
}))
}
func Test_FederationResume(t *testing.T) {
CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
hub1, hub2, server1, server2 := CreateClusteredHubsForTest(t)
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
require.NoError(client1.SendHelloV2(testDefaultUserId + "1"))
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
require.NoError(client2.SendHelloV2(testDefaultUserId + "2"))
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
require.NoError(err)
hello2, err := client2.RunUntilHello(ctx)
require.NoError(err)
roomId := "test-room"
federatedRoomId := roomId + "@federated"
room1, err := client1.JoinRoom(ctx, roomId)
require.NoError(err)
require.Equal(roomId, room1.Room.RoomId)
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
now := time.Now()
token, err := client1.CreateHelloV2Token(testDefaultUserId+"2", now, now.Add(time.Minute))
require.NoError(err)
msg := &ClientMessage{
Id: "join-room-fed",
Type: "room",
Room: &RoomClientMessage{
RoomId: federatedRoomId,
SessionId: federatedRoomId + "-" + hello2.Hello.SessionId,
Federation: &RoomFederationMessage{
SignalingUrl: server1.URL,
NextcloudUrl: server1.URL,
RoomId: roomId,
Token: token,
},
},
}
require.NoError(client2.WriteJSON(msg))
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal(msg.Id, message.Id)
require.Equal("room", message.Type)
require.Equal(federatedRoomId, message.Room.RoomId)
}
// The client1 will see the remote session id for client2.
var remoteSessionId string
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.NoError(client1.checkSingleMessageJoined(message))
evt := message.Event.Join[0]
remoteSessionId = evt.SessionId
assert.NotEqual(hello2.Hello.SessionId, remoteSessionId)
assert.Equal(testDefaultUserId+"2", evt.UserId)
}
// The client2 will see its own session id, not the one from the remote server.
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
fed2 := session2.GetFederationClient()
require.NotNil(fed2)
fed2.mu.Lock()
err = fed2.conn.Close()
data2 := "from-2-to-1"
assert.NoError(client2.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, data2))
fed2.mu.Unlock()
assert.NoError(err)
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("event", message.Type)
assert.Equal("room", message.Event.Target)
assert.Equal("federation_interrupted", message.Event.Type)
}
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("event", message.Type)
assert.Equal("room", message.Event.Target)
assert.Equal("federation_resumed", message.Event.Type)
assert.NotNil(message.Event.Resumed)
assert.True(*message.Event.Resumed)
}
ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel1()
var payload string
if assert.NoError(checkReceiveClientMessage(ctx, client1, "session", &HelloServerMessage{
SessionId: remoteSessionId,
UserId: testDefaultUserId + "2",
}, &payload)) {
assert.Equal(data2, payload)
}
if message, err := client1.RunUntilMessage(ctx1); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
assert.NoError(err)
} else {
assert.Nil(message)
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
assert.NoError(err)
} else {
assert.Nil(message)
}
}
func Test_FederationResumeNewSession(t *testing.T) {
CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
hub1, hub2, server1, server2 := CreateClusteredHubsForTest(t)
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
require.NoError(client1.SendHelloV2(testDefaultUserId + "1"))
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
require.NoError(client2.SendHelloV2(testDefaultUserId + "2"))
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
require.NoError(err)
hello2, err := client2.RunUntilHello(ctx)
require.NoError(err)
roomId := "test-room"
federatedRoomId := roomId + "@federated"
room1, err := client1.JoinRoom(ctx, roomId)
require.NoError(err)
require.Equal(roomId, room1.Room.RoomId)
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
now := time.Now()
token, err := client1.CreateHelloV2Token(testDefaultUserId+"2", now, now.Add(time.Minute))
require.NoError(err)
msg := &ClientMessage{
Id: "join-room-fed",
Type: "room",
Room: &RoomClientMessage{
RoomId: federatedRoomId,
SessionId: federatedRoomId + "-" + hello2.Hello.SessionId,
Federation: &RoomFederationMessage{
SignalingUrl: server1.URL,
NextcloudUrl: server1.URL,
RoomId: roomId,
Token: token,
},
},
}
require.NoError(client2.WriteJSON(msg))
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal(msg.Id, message.Id)
require.Equal("room", message.Type)
require.Equal(federatedRoomId, message.Room.RoomId)
}
// The client1 will see the remote session id for client2.
var remoteSessionId string
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.NoError(client1.checkSingleMessageJoined(message))
evt := message.Event.Join[0]
remoteSessionId = evt.SessionId
assert.NotEqual(hello2.Hello.SessionId, remoteSessionId)
assert.Equal(hello2.Hello.UserId, evt.UserId)
}
// The client2 will see its own session id, not the one from the remote server.
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
remoteSession2 := hub1.GetSessionByPublicId(remoteSessionId).(*ClientSession)
// Simulate disconnected federated client with an expired session.
if client := remoteSession2.GetClient(); client != nil {
remoteSession2.ClearClient(client)
client.Close()
}
remoteSession2.Close()
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("event", message.Type)
assert.Equal("room", message.Event.Target)
assert.Equal("federation_interrupted", message.Event.Type)
}
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("event", message.Type)
assert.Equal("room", message.Event.Target)
assert.Equal("federation_resumed", message.Event.Type)
assert.NotNil(message.Event.Resumed)
assert.False(*message.Event.Resumed)
}
// Client1 will get a "leave" for the expired session and a "join" with the
// new remote session id.
assert.NoError(client1.RunUntilLeft(ctx, &HelloServerMessage{
SessionId: remoteSessionId,
UserId: hello2.Hello.UserId,
}))
if message, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.NoError(client1.checkSingleMessageJoined(message))
evt := message.Event.Join[0]
assert.NotEqual(remoteSessionId, evt.SessionId)
assert.NotEqual(hello2.Hello.SessionId, remoteSessionId)
remoteSessionId = evt.SessionId
assert.NotEqual(hello2.Hello.SessionId, remoteSessionId)
assert.Equal(hello2.Hello.UserId, evt.UserId)
}
// client2 will join the room again after the reconnect with the new
// session and get "joined" events for all sessions in the room (including
// its own).
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("", message.Id)
require.Equal("room", message.Type)
require.Equal(federatedRoomId, message.Room.RoomId)
}
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))
}