diff --git a/hub.go b/hub.go index 427a862..893f673 100644 --- a/hub.go +++ b/hub.go @@ -141,8 +141,8 @@ type Hub struct { allowSubscribeAnyStream bool expiredSessions map[Session]bool + anonymousSessions map[*ClientSession]time.Time expectHelloClients map[*Client]time.Time - anonymousClients map[*Client]time.Time backendTimeout time.Duration backend *BackendClient @@ -329,7 +329,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer allowSubscribeAnyStream: allowSubscribeAnyStream, expiredSessions: make(map[Session]bool), - anonymousClients: make(map[*Client]time.Time), + anonymousSessions: make(map[*ClientSession]time.Time), expectHelloClients: make(map[*Client]time.Time), backendTimeout: backendTimeout, @@ -588,35 +588,35 @@ func (h *Hub) checkExpiredSessions(now time.Time) { } } -func (h *Hub) checkExpireClients(now time.Time, clients map[*Client]time.Time, reason string) { - for client, timeout := range clients { +func (h *Hub) checkAnonymousSessions(now time.Time) { + for session, timeout := range h.anonymousSessions { if now.After(timeout) { // This will close the client connection. h.mu.Unlock() - client.SendByeResponseWithReason(nil, reason) - if reason == "room_join_timeout" { - session := client.GetSession() - if session != nil { - session.Close() - } + if client := session.GetClient(); client != nil { + client.SendByeResponseWithReason(nil, "room_join_timeout") } + session.Close() h.mu.Lock() } } } -func (h *Hub) checkAnonymousClients(now time.Time) { - h.checkExpireClients(now, h.anonymousClients, "room_join_timeout") -} - func (h *Hub) checkInitialHello(now time.Time) { - h.checkExpireClients(now, h.expectHelloClients, "hello_timeout") + for client, timeout := range h.expectHelloClients { + if now.After(timeout) { + // This will close the client connection. + h.mu.Unlock() + client.SendByeResponseWithReason(nil, "hello_timeout") + h.mu.Lock() + } + } } func (h *Hub) performHousekeeping(now time.Time) { h.mu.Lock() h.checkExpiredSessions(now) - h.checkAnonymousClients(now) + h.checkAnonymousSessions(now) h.checkInitialHello(now) h.mu.Unlock() } @@ -636,30 +636,30 @@ func (h *Hub) removeSession(session Session) (removed bool) { } } delete(h.expiredSessions, session) + if session, ok := session.(*ClientSession); ok { + delete(h.anonymousSessions, session) + } h.mu.Unlock() return } -func (h *Hub) startWaitAnonymousClientRoom(client *Client) { +func (h *Hub) startWaitAnonymousSessionRoom(session *ClientSession) { h.mu.Lock() defer h.mu.Unlock() - h.startWaitAnonymousClientRoomLocked(client) + h.startWaitAnonymousSessionRoomLocked(session) } -func (h *Hub) startWaitAnonymousClientRoomLocked(client *Client) { - if !client.IsConnected() { - return - } - if session := client.GetSession(); session != nil && session.ClientType() == HelloClientTypeInternal { +func (h *Hub) startWaitAnonymousSessionRoomLocked(session *ClientSession) { + if session.ClientType() == HelloClientTypeInternal { // Internal clients don't need to join a room. return } - // Anonymous clients must join a public room within a given time, + // Anonymous sessions must join a public room within a given time, // otherwise they get disconnected to avoid blocking resources forever. now := time.Now() - h.anonymousClients[client] = now.Add(anonmyousJoinRoomTimeout) + h.anonymousSessions[session] = now.Add(anonmyousJoinRoomTimeout) } func (h *Hub) startExpectHello(client *Client) { @@ -768,7 +768,7 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B h.clients[sessionIdData.Sid] = client delete(h.expectHelloClients, client) if userId == "" && auth.Type != HelloClientTypeInternal { - h.startWaitAnonymousClientRoomLocked(client) + h.startWaitAnonymousSessionRoomLocked(session) } h.mu.Unlock() @@ -787,7 +787,6 @@ func (h *Hub) processUnregister(client *Client) *ClientSession { session := client.GetSession() h.mu.Lock() - delete(h.anonymousClients, client) delete(h.expectHelloClients, client) if session != nil { delete(h.clients, session.Data().Sid) @@ -1001,8 +1000,8 @@ func (h *Hub) processHelloInternal(client *Client, message *ClientMessage) { h.processRegister(client, message, backend, auth) } -func (h *Hub) disconnectByRoomSessionId(roomSessionId string, backend *Backend) { - sessionId, err := h.roomSessions.GetSessionId(roomSessionId) +func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId string, backend *Backend) { + sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId) if err == ErrNoSuchRoomSession { return } else if err != nil { @@ -1071,9 +1070,9 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) { if session.LeaveRoom(true) != nil { // User was in a room before, so need to notify about leaving it. h.sendRoom(session, message, nil) - } - if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal { - h.startWaitAnonymousClientRoom(client) + if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal { + h.startWaitAnonymousSessionRoom(session) + } } return } @@ -1116,7 +1115,10 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) { if message.Room.SessionId != "" { // There can only be one connection per Nextcloud Talk session, // disconnect any other connections without sending a "leave" event. - h.disconnectByRoomSessionId(message.Room.SessionId, session.Backend()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + h.disconnectByRoomSessionId(ctx, message.Room.SessionId, session.Backend()) } } @@ -1170,7 +1172,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro internalRoomId := getRoomIdForBackend(roomId, session.Backend()) if err := session.SubscribeRoomEvents(roomId, message.Room.SessionId); err != nil { session.SendMessage(message.NewWrappedErrorServerMessage(err)) - // The client (implicitly) left the room due to an error. + // The session (implicitly) left the room due to an error. h.sendRoom(session, nil, nil) return } @@ -1182,7 +1184,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro if r, err = h.createRoom(roomId, room.Room.Properties, session.Backend()); err != nil { h.ru.Unlock() session.SendMessage(message.NewWrappedErrorServerMessage(err)) - // The client (implicitly) left the room due to an error. + // The session (implicitly) left the room due to an error. session.UnsubscribeRoomEvents() h.sendRoom(session, nil, nil) return @@ -1191,10 +1193,8 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro h.ru.Unlock() h.mu.Lock() - if client := session.GetClient(); client != nil { - // The client now joined a room, don't expire him if he is anonymous. - delete(h.anonymousClients, client) - } + // The session now joined a room, don't expire if it is anonymous. + delete(h.anonymousSessions, session) h.mu.Unlock() session.SetRoom(r) if room.Room.Permissions != nil { diff --git a/hub_test.go b/hub_test.go index de133e5..ad25262 100644 --- a/hub_test.go +++ b/hub_test.go @@ -387,7 +387,8 @@ func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, re RoomId: request.Room.RoomId, }, } - if request.Room.RoomId == "test-room-with-sessiondata" { + switch request.Room.RoomId { + case "test-room-with-sessiondata": data := map[string]string{ "userid": "userid-from-sessiondata", } @@ -396,6 +397,9 @@ func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, re t.Fatalf("Could not marshal %+v: %s", data, err) } response.Room.Session = (*json.RawMessage)(&tmp) + case "test-room-initial-permissions": + permissions := []Permission{PERMISSION_MAY_PUBLISH_AUDIO} + response.Room.Permissions = &permissions } return response } @@ -2201,6 +2205,92 @@ func TestExpectAnonymousJoinRoom(t *testing.T) { } } +func TestExpectAnonymousJoinRoomAfterLeave(t *testing.T) { + hub, _, _, server := CreateHubForTest(t) + + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + + if err := client.SendHello(authAnonymousUserId); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello, err := client.RunUntilHello(ctx) + if err != nil { + t.Error(err) + } else { + if hello.Hello.UserId != "" { + t.Errorf("Expected an anonymous user, got %+v", hello.Hello) + } + if hello.Hello.SessionId == "" { + t.Errorf("Expected session id, got %+v", hello.Hello) + } + if hello.Hello.ResumeId == "" { + t.Errorf("Expected resume id, got %+v", hello.Hello) + } + } + + // Join room by id. + roomId := "test-room" + if room, err := client.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client.RunUntilJoined(ctx, hello.Hello); err != nil { + t.Error(err) + } + + // Perform housekeeping in the future, this will keep the connection as the + // session joined a room. + performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second)) + + // No message about the closing is sent to the new connection. + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() + + if message, err := client.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { + t.Error(err) + } else if message != nil { + t.Errorf("Expected no message, got %+v", message) + } + + // Leave room + if room, err := client.JoinRoom(ctx, ""); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != "" { + t.Fatalf("Expected room %s, got %s", "", room.Room.RoomId) + } + + // Perform housekeeping in the future, this will cause the connection to + // be terminated because the anonymous client didn't join a room. + performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second)) + + message, err := client.RunUntilMessage(ctx) + if err != nil { + t.Error(err) + } + + if err := checkMessageType(message, "bye"); err != nil { + t.Error(err) + } else if message.Bye.Reason != "room_join_timeout" { + t.Errorf("Expected \"room_join_timeout\" reason, got %+v", message.Bye) + } + + // Both the client and the session get removed from the hub. + if err := client.WaitForClientRemoved(ctx); err != nil { + t.Error(err) + } + if err := client.WaitForSessionRemoved(ctx, hello.Hello.SessionId); err != nil { + t.Error(err) + } +} + func TestJoinRoomChange(t *testing.T) { hub, _, _, server := CreateHubForTest(t) @@ -2331,7 +2421,7 @@ func TestJoinMultiple(t *testing.T) { } } -func TestJoinMultipleDisplaynamesPermission(t *testing.T) { +func TestJoinDisplaynamesPermission(t *testing.T) { hub, _, _, server := CreateHubForTest(t) client1 := NewTestClient(t, server, hub) @@ -2413,6 +2503,49 @@ func TestJoinMultipleDisplaynamesPermission(t *testing.T) { } } +func TestInitialRoomPermissions(t *testing.T) { + hub, _, _, server := CreateHubForTest(t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + + if err := client.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + + hello, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room-initial-permissions" + if room, err := client.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + if err := client.RunUntilJoined(ctx, hello.Hello); err != nil { + t.Error(err) + } + + session := hub.GetSessionByPublicId(hello.Hello.SessionId).(*ClientSession) + if session == nil { + t.Fatalf("Session %s does not exist", hello.Hello.SessionId) + } + + if !session.HasPermission(PERMISSION_MAY_PUBLISH_AUDIO) { + t.Errorf("Session %s should have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_AUDIO, session.permissions) + } + if session.HasPermission(PERMISSION_MAY_PUBLISH_VIDEO) { + t.Errorf("Session %s should not have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_VIDEO, session.permissions) + } +} + func TestJoinRoomSwitchClient(t *testing.T) { hub, _, _, server := CreateHubForTest(t) @@ -2762,17 +2895,28 @@ func TestRoomParticipantsListUpdateWhileDisconnected(t *testing.T) { } func TestClientTakeoverRoomSession(t *testing.T) { - for _, backend := range eventBackendsForTest { - t.Run(backend, func(t *testing.T) { + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { RunTestClientTakeoverRoomSession(t) }) } } func RunTestClientTakeoverRoomSession(t *testing.T) { - hub, _, _, server := CreateHubForTest(t) + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) - client1 := NewTestClient(t, server, hub) + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + client1 := NewTestClient(t, server1, hub1) defer client1.CloseWithBye() if err := client1.SendHello(testDefaultUserId + "1"); err != nil { @@ -2796,15 +2940,15 @@ func RunTestClientTakeoverRoomSession(t *testing.T) { t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) } - if hubRoom := hub.getRoom(roomId); hubRoom == nil { + if hubRoom := hub1.getRoom(roomId); hubRoom == nil { t.Fatalf("Room %s does not exist", roomId) } - if session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId); session1 == nil { + if session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId); session1 == nil { t.Fatalf("There should be a session %s", hello1.Hello.SessionId) } - client3 := NewTestClient(t, server, hub) + client3 := NewTestClient(t, server2, hub2) defer client3.CloseWithBye() if err := client3.SendHello(testDefaultUserId + "3"); err != nil { @@ -2825,7 +2969,7 @@ func RunTestClientTakeoverRoomSession(t *testing.T) { // Wait until both users have joined. WaitForUsersJoined(ctx, t, client1, hello1, client3, hello3) - client2 := NewTestClient(t, server, hub) + client2 := NewTestClient(t, server2, hub2) defer client2.CloseWithBye() if err := client2.SendHello(testDefaultUserId + "2"); err != nil { @@ -2862,7 +3006,7 @@ func RunTestClientTakeoverRoomSession(t *testing.T) { } // The first session has been closed - if session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId); session1 != nil { + if session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId); session1 != nil { t.Errorf("The session %s should have been removed", hello1.Hello.SessionId) } @@ -2872,26 +3016,43 @@ func RunTestClientTakeoverRoomSession(t *testing.T) { t.Error(err) } - // No message about the closing is sent to the new connection. - ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel2() + if isLocalTest(t) { + // No message about the closing is sent to the new connection. + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() - if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { - t.Error(err) - } else if message != nil { - t.Errorf("Expected no message, got %+v", message) - } + if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { + t.Error(err) + } else if message != nil { + t.Errorf("Expected no message, got %+v", message) + } - // The permanently connected client will receive a "left" event from the - // overridden session and a "joined" for the new session. - if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil { - t.Error(err) - } - if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil { - t.Error(err) - } + // The permanently connected client will receive a "left" event from the + // overridden session and a "joined" for the new session. In that order as + // both were on the same server. + if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil { + t.Error(err) + } + if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil { + t.Error(err) + } + } else { + // In the clustered case, the new connection will receive a "leave" event + // due to the asynchronous events. + if err := client2.RunUntilLeft(ctx, hello1.Hello); err != nil { + t.Error(err) + } - time.Sleep(time.Second) + // The permanently connected client will first a "joined" event from the new + // session (on the same server) and a "left" from the session on the remote + // server (asynchronously). + if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil { + t.Error(err) + } + if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil { + t.Error(err) + } + } } func TestClientSendOfferPermissions(t *testing.T) { diff --git a/room.go b/room.go index 16f7dcb..fbcaf7f 100644 --- a/room.go +++ b/room.go @@ -452,6 +452,8 @@ func (r *Room) RemoveSession(session Session) bool { return true } + // Still need to publish an event so sessions on other servers get notified. + r.PublishSessionLeft(session) r.hub.removeRoom(r) r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": HelloClientTypeClient}) r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": HelloClientTypeInternal}) diff --git a/testclient_test.go b/testclient_test.go index beac45a..c54eb6d 100644 --- a/testclient_test.go +++ b/testclient_test.go @@ -315,6 +315,7 @@ func (c *TestClient) WaitForClientRemoved(ctx context.Context) error { c.hub.mu.Unlock() select { case <-ctx.Done(): + c.hub.mu.Lock() return ctx.Err() default: time.Sleep(time.Millisecond) @@ -341,6 +342,7 @@ func (c *TestClient) WaitForSessionRemoved(ctx context.Context, sessionId string c.hub.mu.Unlock() select { case <-ctx.Done(): + c.hub.mu.Lock() return ctx.Err() default: time.Sleep(time.Millisecond)