diff --git a/client.go b/client.go index 3b17d39..8358d59 100644 --- a/client.go +++ b/client.go @@ -100,7 +100,9 @@ type Client struct { mu sync.Mutex - closeChan chan bool + closeChan chan bool + messagesDone sync.WaitGroup + messageChan chan *bytes.Buffer OnLookupCountry func(*Client) string OnClosed func(*Client) @@ -118,10 +120,12 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien agent = "unknown user agent" } client := &Client{ - conn: conn, - addr: remoteAddress, - agent: agent, - closeChan: make(chan bool, 1), + conn: conn, + addr: remoteAddress, + agent: agent, + + closeChan: make(chan bool, 1), + messageChan: make(chan *bytes.Buffer, 16), OnLookupCountry: func(client *Client) string { return unknownCountry }, OnClosed: func(client *Client) {}, @@ -135,6 +139,7 @@ func (c *Client) SetConn(conn *websocket.Conn, remoteAddress string) { c.conn = conn c.addr = remoteAddress c.closeChan = make(chan bool, 1) + c.messageChan = make(chan *bytes.Buffer, 16) c.OnLookupCountry = func(client *Client) string { return unknownCountry } c.OnClosed = func(client *Client) {} c.OnMessageReceived = func(client *Client, data []byte) {} @@ -179,6 +184,8 @@ func (c *Client) Close() { } c.closeChan <- true + c.messagesDone.Wait() + close(c.messageChan) c.OnClosed(c) c.SetSession(nil) @@ -255,8 +262,8 @@ func (c *Client) ReadPump() { return nil }) - decodeBuffer := bufferPool.Get().(*bytes.Buffer) - defer bufferPool.Put(decodeBuffer) + go c.processMessages() + for { conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint messageType, reader, err := conn.NextReader() @@ -284,8 +291,10 @@ func (c *Client) ReadPump() { continue } + decodeBuffer := bufferPool.Get().(*bytes.Buffer) decodeBuffer.Reset() if _, err := decodeBuffer.ReadFrom(reader); err != nil { + bufferPool.Put(decodeBuffer) if session := c.GetSession(); session != nil { log.Printf("Error reading message from client %s: %v", session.PublicId(), err) } else { @@ -294,7 +303,27 @@ func (c *Client) ReadPump() { break } - c.OnMessageReceived(c, decodeBuffer.Bytes()) + // Stop processing if the client was closed. + if atomic.LoadUint32(&c.closed) == 1 { + bufferPool.Put(decodeBuffer) + break + } + + c.messagesDone.Add(1) + c.messageChan <- decodeBuffer + } +} + +func (c *Client) processMessages() { + for { + buffer := <-c.messageChan + if buffer == nil { + break + } + + c.OnMessageReceived(c, buffer.Bytes()) + c.messagesDone.Done() + bufferPool.Put(buffer) } } diff --git a/clientsession.go b/clientsession.go index 6451aff..f98f160 100644 --- a/clientsession.go +++ b/clientsession.go @@ -559,6 +559,14 @@ func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool { return true } +func (s *ClientSession) SendError(e *Error) bool { + message := &ServerMessage{ + Type: "error", + Error: e, + } + return s.SendMessage(message) +} + func (s *ClientSession) SendMessage(message *ServerMessage) bool { s.mu.Lock() defer s.mu.Unlock() @@ -822,7 +830,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { if len(s.pendingClientMessages) == 0 { s.mu.Unlock() if room := s.GetRoom(); room != nil { - room.NotifySessionResumed(client) + room.NotifySessionResumed(s) } return } @@ -841,7 +849,7 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { if !hasPendingParticipantsUpdate { // Only need to send initial participants list update if none was part of the pending messages. if room := s.GetRoom(); room != nil { - room.NotifySessionResumed(client) + room.NotifySessionResumed(s) } } } diff --git a/hub.go b/hub.go index 57a7f9a..dbeaa7a 100644 --- a/hub.go +++ b/hub.go @@ -740,7 +740,7 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B h.setDecodedSessionId(privateSessionId, privateSessionName, sessionIdData) h.setDecodedSessionId(publicSessionId, publicSessionName, sessionIdData) - h.sendHelloResponse(client, message, session) + h.sendHelloResponse(session, message) } func (h *Hub) processUnregister(client *Client) *ClientSession { @@ -768,20 +768,22 @@ func (h *Hub) processMessage(client *Client, data []byte) { if err := message.UnmarshalJSON(data); err != nil { if session := client.GetSession(); session != nil { log.Printf("Error decoding message from client %s: %v", session.PublicId(), err) + session.SendError(InvalidFormat) } else { log.Printf("Error decoding message from %s: %v", client.RemoteAddr(), err) + client.SendError(InvalidFormat) } - client.SendError(InvalidFormat) return } if err := message.CheckValid(); err != nil { if session := client.GetSession(); session != nil { log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err) + session.SendMessage(message.NewErrorServerMessage(InvalidFormat)) } else { log.Printf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err) + client.SendMessage(message.NewErrorServerMessage(InvalidFormat)) } - client.SendMessage(message.NewErrorServerMessage(InvalidFormat)) return } @@ -814,7 +816,7 @@ func (h *Hub) processMessage(client *Client, data []byte) { } } -func (h *Hub) sendHelloResponse(client *Client, message *ClientMessage, session *ClientSession) bool { +func (h *Hub) sendHelloResponse(session *ClientSession, message *ClientMessage) bool { response := &ServerMessage{ Id: message.Id, Type: "hello", @@ -826,7 +828,7 @@ func (h *Hub) sendHelloResponse(client *Client, message *ClientMessage, session Server: h.GetServerInfo(session), }, } - return client.SendMessage(response) + return session.SendMessage(response) } func (h *Hub) processHello(client *Client, message *ClientMessage) { @@ -873,7 +875,7 @@ func (h *Hub) processHello(client *Client, message *ClientMessage) { log.Printf("Resume session from %s in %s (%s) %s (private=%s)", client.RemoteAddr(), client.Country(), client.UserAgent(), session.PublicId(), session.PrivateId()) - h.sendHelloResponse(client, message, clientSession) + h.sendHelloResponse(clientSession, message) clientSession.NotifySessionResumed(client) return } @@ -986,7 +988,7 @@ func (h *Hub) disconnectByRoomSessionId(roomSessionId string) { session.Close() } -func (h *Hub) sendRoom(client *Client, message *ClientMessage, room *Room) bool { +func (h *Hub) sendRoom(session *ClientSession, message *ClientMessage, room *Room) bool { response := &ServerMessage{ Type: "room", } @@ -1003,7 +1005,7 @@ func (h *Hub) sendRoom(client *Client, message *ClientMessage, room *Room) bool Properties: room.properties, } } - return client.SendMessage(response) + return session.SendMessage(response) } func (h *Hub) processRoom(client *Client, message *ClientMessage) { @@ -1017,7 +1019,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) { // We can handle leaving a room directly. if session.LeaveRoom(true) != nil { // User was in a room before, so need to notify about leaving it. - h.sendRoom(client, message, nil) + h.sendRoom(session, message, nil) } if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal { h.startWaitAnonymousClientRoom(client) @@ -1054,7 +1056,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) { } request := NewBackendClientRoomRequest(roomId, session.UserId(), sessionId) if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &room); err != nil { - client.SendMessage(message.NewWrappedErrorServerMessage(err)) + session.SendMessage(message.NewWrappedErrorServerMessage(err)) return } @@ -1067,7 +1069,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) { } } - h.processJoinRoom(client, message, &room) + h.processJoinRoom(session, message, &room) } func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room { @@ -1097,18 +1099,12 @@ func (h *Hub) createRoom(id string, properties *json.RawMessage, backend *Backen return room, nil } -func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *BackendClientResponse) { - session := client.GetSession() - if session == nil { - // Client disconnected while waiting for join room response. - return - } - +func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, room *BackendClientResponse) { if room.Type == "error" { - client.SendMessage(message.NewErrorServerMessage(room.Error)) + session.SendMessage(message.NewErrorServerMessage(room.Error)) return } else if room.Type != "room" { - client.SendMessage(message.NewErrorServerMessage(RoomJoinFailed)) + session.SendMessage(message.NewErrorServerMessage(RoomJoinFailed)) return } @@ -1117,9 +1113,9 @@ func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *Back roomId := room.Room.RoomId internalRoomId := getRoomIdForBackend(roomId, session.Backend()) if err := session.SubscribeRoomNats(h.nats, roomId, message.Room.SessionId); err != nil { - client.SendMessage(message.NewWrappedErrorServerMessage(err)) + session.SendMessage(message.NewWrappedErrorServerMessage(err)) // The client (implicitly) left the room due to an error. - h.sendRoom(client, nil, nil) + h.sendRoom(session, nil, nil) return } @@ -1129,28 +1125,30 @@ func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *Back var err error if r, err = h.createRoom(roomId, room.Room.Properties, session.Backend()); err != nil { h.ru.Unlock() - client.SendMessage(message.NewWrappedErrorServerMessage(err)) + session.SendMessage(message.NewWrappedErrorServerMessage(err)) // The client (implicitly) left the room due to an error. session.UnsubscribeRoomNats() - h.sendRoom(client, nil, nil) + h.sendRoom(session, nil, nil) return } } h.ru.Unlock() h.mu.Lock() - // The client now joined a room, don't expire him if he is anonymous. - delete(h.anonymousClients, client) + if client := session.GetClient(); client != nil { + // The client now joined a room, don't expire him if he is anonymous. + delete(h.anonymousClients, client) + } h.mu.Unlock() session.SetRoom(r) if room.Room.Permissions != nil { session.SetPermissions(*room.Room.Permissions) } - h.sendRoom(client, message, r) - h.notifyUserJoinedRoom(r, client, session, room.Room.Session) + h.sendRoom(session, message, r) + h.notifyUserJoinedRoom(r, session, room.Room.Session) } -func (h *Hub) notifyUserJoinedRoom(room *Room, client *Client, session Session, sessionData *json.RawMessage) { +func (h *Hub) notifyUserJoinedRoom(room *Room, session *ClientSession, sessionData *json.RawMessage) { // Register session with the room if sessions := room.AddSession(session, sessionData); len(sessions) > 0 { events := make([]*EventServerMessageSessionEntry, 0, len(sessions)) @@ -1171,7 +1169,7 @@ func (h *Hub) notifyUserJoinedRoom(room *Room, client *Client, session Session, } // No need to send through NATS, the session is connected locally. - client.SendMessage(msg) + session.SendMessage(msg) } } @@ -1205,7 +1203,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { case "requestoffer": // Process asynchronously to avoid blocking regular // message processing for this client. - go h.processMcuMessage(client, client, session, message, msg, &data) + go h.processMcuMessage(session, session, message, msg, &data) return case "offer": fallthrough @@ -1214,7 +1212,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { case "endOfCandidates": fallthrough case "candidate": - h.processMcuMessage(client, client, session, message, msg, &data) + h.processMcuMessage(session, session, message, msg, &data) return } } @@ -1315,7 +1313,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { if clientData != nil && clientData.Type == "sendoffer" { if !isAllowedToSend(session, clientData) { log.Printf("Session %s is not allowed to send offer for %s, ignoring", session.PublicId(), clientData.RoomType) - sendNotAllowed(client, message) + sendNotAllowed(session, message) return } @@ -1324,7 +1322,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { // It may take some time for the publisher (which is the current // client) to start his stream, so we must not block the active // goroutine. - go h.processMcuMessage(client, recipient, recipientSession, message, msg, clientData) + go h.processMcuMessage(session, recipientSession, message, msg, clientData) } else { // nolint // Client is not connected yet. } @@ -1491,14 +1489,14 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session.")) - client.SendMessage(reply) + session.SendMessage(reply) return } if response.Type == "error" { log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error) reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error())) - client.SendMessage(reply) + session.SendMessage(reply) return } } else { @@ -1507,7 +1505,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session.")) - client.SendMessage(reply) + session.SendMessage(reply) return } } @@ -1577,7 +1575,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { log.Printf("Session %s removed virtual session %s", session.PublicId(), sess.PublicId()) if vsess, ok := sess.(*VirtualSession); ok { // We should always have a VirtualSession here. - vsess.CloseWithFeedback(client, message) + vsess.CloseWithFeedback(session, message) } else { sess.Close() } @@ -1598,22 +1596,22 @@ func isAllowedToSend(session *ClientSession, data *MessageClientMessageData) boo return session.HasPermission(permission) } -func sendNotAllowed(client *Client, message *ClientMessage) { +func sendNotAllowed(session *ClientSession, message *ClientMessage) { response := message.NewErrorServerMessage(NewError("not_allowed", "Not allowed to publish.")) - client.SendMessage(response) + session.SendMessage(response) } -func sendMcuClientNotFound(client *Client, message *ClientMessage) { +func sendMcuClientNotFound(session *ClientSession, message *ClientMessage) { response := message.NewErrorServerMessage(NewError("client_not_found", "No MCU client found to send message to.")) - client.SendMessage(response) + session.SendMessage(response) } -func sendMcuProcessingFailed(client *Client, message *ClientMessage) { +func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) { response := message.NewErrorServerMessage(NewError("processing_failed", "Processing of the message failed, please check server logs.")) - client.SendMessage(response) + session.SendMessage(response) } -func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { +func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout) defer cancel() @@ -1636,7 +1634,7 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C case "offer": if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to offer %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderClient, client_message) + sendNotAllowed(senderSession, client_message) return } @@ -1646,7 +1644,7 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C if session.PublicId() == message.Recipient.SessionId { if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to send candidate for %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderClient, client_message) + sendNotAllowed(senderSession, client_message) return } @@ -1659,29 +1657,29 @@ func (h *Hub) processMcuMessage(senderClient *Client, client *Client, session *C } if err != nil { log.Printf("Could not create MCU %s for session %s to send %+v to %s: %s", clientType, session.PublicId(), data, message.Recipient.SessionId, err) - sendMcuClientNotFound(senderClient, client_message) + sendMcuClientNotFound(senderSession, client_message) return } else if mc == nil { log.Printf("No MCU %s found for session %s to send %+v to %s", clientType, session.PublicId(), data, message.Recipient.SessionId) - sendMcuClientNotFound(senderClient, client_message) + sendMcuClientNotFound(senderSession, client_message) return } mc.SendMessage(context.TODO(), message, data, func(err error, response map[string]interface{}) { if err != nil { log.Printf("Could not send MCU message %+v for session %s to %s: %s", data, session.PublicId(), message.Recipient.SessionId, err) - sendMcuProcessingFailed(senderClient, client_message) + sendMcuProcessingFailed(senderSession, client_message) return } else if response == nil { // No response received return } - h.sendMcuMessageResponse(client, session, message, data, response) + h.sendMcuMessageResponse(session, message, data, response) }) } -func (h *Hub) sendMcuMessageResponse(client *Client, session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { +func (h *Hub) sendMcuMessageResponse(session *ClientSession, message *MessageClientMessage, data *MessageClientMessageData, response map[string]interface{}) { var response_message *ServerMessage switch response["type"] { case "answer": @@ -1763,7 +1761,7 @@ func (h *Hub) processRoomDeleted(message *BackendServerRoomRequest) { switch sess := session.(type) { case *ClientSession: if client := sess.GetClient(); client != nil { - h.sendRoom(client, nil, nil) + h.sendRoom(sess, nil, nil) } } } diff --git a/hub_test.go b/hub_test.go index 9b241e7..ff44955 100644 --- a/hub_test.go +++ b/hub_test.go @@ -254,7 +254,10 @@ func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, re t.Fatalf("Expected an room backend request, got %+v", request) } - if request.Room.RoomId == "test-room-takeover-room-session" { + switch request.Room.RoomId { + case "test-room-slow": + time.Sleep(100 * time.Millisecond) + case "test-room-takeover-room-session": // Additional checks for testcase "TestClientTakeoverRoomSession" if request.Room.Action == "leave" && request.Room.UserId == "test-userid1" { t.Errorf("Should not receive \"leave\" event for first user, received %+v", request.Room) @@ -1754,6 +1757,95 @@ func TestJoinMultiple(t *testing.T) { } } +func TestJoinRoomSwitchClient(t *testing.T) { + hub, _, _, server, shutdown := CreateHubForTest(t) + defer shutdown() + + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + + if err := client.SendHello(testDefaultUserId); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room-slow" + msg := &ClientMessage{ + Id: "ABCD", + Type: "room", + Room: &RoomClientMessage{ + RoomId: roomId, + SessionId: roomId + "-" + hello.Hello.SessionId, + }, + } + if err := client.WriteJSON(msg); err != nil { + t.Fatal(err) + } + // Wait a bit to make sure request is sent before closing client. + time.Sleep(1 * time.Millisecond) + client.Close() + if err := client.WaitForClientRemoved(ctx); err != nil { + t.Fatal(err) + } + + // The client needs some time to reconnect. + time.Sleep(200 * time.Millisecond) + + client2 := NewTestClient(t, server, hub) + defer client2.CloseWithBye() + if err := client2.SendHelloResume(hello.Hello.ResumeId); err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Error(err) + } else { + if hello2.Hello.UserId != testDefaultUserId { + t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello2.Hello) + } + if hello2.Hello.SessionId != hello.Hello.SessionId { + t.Errorf("Expected session id %s, got %+v", hello.Hello.SessionId, hello2.Hello) + } + if hello2.Hello.ResumeId != hello.Hello.ResumeId { + t.Errorf("Expected resume id %s, got %+v", hello.Hello.ResumeId, hello2.Hello) + } + } + + room, err := client2.RunUntilMessage(ctx) + if err != nil { + t.Fatal(err) + } + if err := checkUnexpectedClose(err); err != nil { + t.Fatal(err) + } + if err := checkMessageType(room, "room"); err != nil { + t.Fatal(err) + } + if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client2.RunUntilJoined(ctx, hello.Hello); err != nil { + t.Error(err) + } + + // Leave room. + if room, err := client2.JoinRoom(ctx, ""); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != "" { + t.Fatalf("Expected empty room, got %s", room.Room.RoomId) + } +} + func TestGetRealUserIP(t *testing.T) { REMOTE_ATTR := "192.168.1.2" @@ -1841,6 +1933,9 @@ func TestClientMessageToSessionIdWhileDisconnected(t *testing.T) { client1.SendMessage(recipient2, data1) // nolint client1.SendMessage(recipient2, data1) // nolint + // Simulate some time until client resumes the session. + time.Sleep(10 * time.Millisecond) + client2 = NewTestClient(t, server, hub) defer client2.CloseWithBye() if err := client2.SendHelloResume(hello2.Hello.ResumeId); err != nil { diff --git a/natsclient_loopback.go b/natsclient_loopback.go index 12fb8db..e9c33d7 100644 --- a/natsclient_loopback.go +++ b/natsclient_loopback.go @@ -25,6 +25,7 @@ import ( "encoding/json" "strings" "sync" + "time" "github.com/nats-io/nats.go" ) @@ -95,6 +96,9 @@ func (s *loopbackNatsSubscription) run() { msg := s.incoming[0] s.incoming = s.incoming[1:] s.cond.L.Unlock() + // A "real" NATS server would take some time to process the request, + // simulate this by sleeping a tiny bit. + time.Sleep(time.Millisecond) s.ch <- msg s.cond.L.Lock() } diff --git a/room.go b/room.go index e750e68..941f799 100644 --- a/room.go +++ b/room.go @@ -590,13 +590,13 @@ func (r *Room) getParticipantsUpdateMessage(users []map[string]interface{}) *Ser return message } -func (r *Room) NotifySessionResumed(client *Client) { +func (r *Room) NotifySessionResumed(session *ClientSession) { message := r.getParticipantsUpdateMessage(r.users) if len(message.Event.Update.Users) == 0 { return } - client.SendMessage(message) + session.SendMessage(message) } func (r *Room) NotifySessionChanged(session Session) { diff --git a/virtualsession.go b/virtualsession.go index 10d506a..87943da 100644 --- a/virtualsession.go +++ b/virtualsession.go @@ -135,16 +135,16 @@ func (s *VirtualSession) Close() { s.CloseWithFeedback(nil, nil) } -func (s *VirtualSession) CloseWithFeedback(client *Client, message *ClientMessage) { +func (s *VirtualSession) CloseWithFeedback(session *ClientSession, message *ClientMessage) { room := s.GetRoom() s.session.RemoveVirtualSession(s) removed := s.session.hub.removeSession(s) if removed && room != nil { - go s.notifyBackendRemoved(room, client, message) + go s.notifyBackendRemoved(room, session, message) } } -func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, message *ClientMessage) { +func (s *VirtualSession) notifyBackendRemoved(room *Room, session *ClientSession, message *ClientMessage) { ctx, cancel := context.WithTimeout(context.Background(), s.hub.backendTimeout) defer cancel() @@ -160,9 +160,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil { virtualSessionId := GetVirtualSessionId(s.session, s.PublicId()) log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, s.BackendUrl(), err) - if client != nil && message != nil { + if session != nil && message != nil { reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend.")) - client.SendMessage(reply) + session.SendMessage(reply) } return } @@ -170,9 +170,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag if response.Type == "error" { virtualSessionId := GetVirtualSessionId(s.session, s.PublicId()) log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, s.BackendUrl(), response.Error) - if client != nil && message != nil { + if session != nil && message != nil { reply := message.NewErrorServerMessage(NewError("remove_failed", response.Error.Error())) - client.SendMessage(reply) + session.SendMessage(reply) } return } @@ -182,9 +182,9 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, client *Client, messag err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response) if err != nil { log.Printf("Could not remove virtual session %s from backend %s: %s", s.PublicId(), s.BackendUrl(), err) - if client != nil && message != nil { + if session != nil && message != nil { reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend.")) - client.SendMessage(reply) + session.SendMessage(reply) } } }