Merge pull request #292 from strukturag/more-tests

Add more tests
This commit is contained in:
Joachim Bauch 2022-07-08 16:00:12 +02:00 committed by GitHub
commit 4770fc8ad6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 232 additions and 67 deletions

78
hub.go
View file

@ -141,8 +141,8 @@ type Hub struct {
allowSubscribeAnyStream bool
expiredSessions map[Session]bool
anonymousSessions map[*ClientSession]time.Time
expectHelloClients map[*Client]time.Time
anonymousClients map[*Client]time.Time
backendTimeout time.Duration
backend *BackendClient
@ -329,7 +329,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
allowSubscribeAnyStream: allowSubscribeAnyStream,
expiredSessions: make(map[Session]bool),
anonymousClients: make(map[*Client]time.Time),
anonymousSessions: make(map[*ClientSession]time.Time),
expectHelloClients: make(map[*Client]time.Time),
backendTimeout: backendTimeout,
@ -588,35 +588,35 @@ func (h *Hub) checkExpiredSessions(now time.Time) {
}
}
func (h *Hub) checkExpireClients(now time.Time, clients map[*Client]time.Time, reason string) {
for client, timeout := range clients {
func (h *Hub) checkAnonymousSessions(now time.Time) {
for session, timeout := range h.anonymousSessions {
if now.After(timeout) {
// This will close the client connection.
h.mu.Unlock()
client.SendByeResponseWithReason(nil, reason)
if reason == "room_join_timeout" {
session := client.GetSession()
if session != nil {
session.Close()
}
if client := session.GetClient(); client != nil {
client.SendByeResponseWithReason(nil, "room_join_timeout")
}
session.Close()
h.mu.Lock()
}
}
}
func (h *Hub) checkAnonymousClients(now time.Time) {
h.checkExpireClients(now, h.anonymousClients, "room_join_timeout")
}
func (h *Hub) checkInitialHello(now time.Time) {
h.checkExpireClients(now, h.expectHelloClients, "hello_timeout")
for client, timeout := range h.expectHelloClients {
if now.After(timeout) {
// This will close the client connection.
h.mu.Unlock()
client.SendByeResponseWithReason(nil, "hello_timeout")
h.mu.Lock()
}
}
}
func (h *Hub) performHousekeeping(now time.Time) {
h.mu.Lock()
h.checkExpiredSessions(now)
h.checkAnonymousClients(now)
h.checkAnonymousSessions(now)
h.checkInitialHello(now)
h.mu.Unlock()
}
@ -636,30 +636,30 @@ func (h *Hub) removeSession(session Session) (removed bool) {
}
}
delete(h.expiredSessions, session)
if session, ok := session.(*ClientSession); ok {
delete(h.anonymousSessions, session)
}
h.mu.Unlock()
return
}
func (h *Hub) startWaitAnonymousClientRoom(client *Client) {
func (h *Hub) startWaitAnonymousSessionRoom(session *ClientSession) {
h.mu.Lock()
defer h.mu.Unlock()
h.startWaitAnonymousClientRoomLocked(client)
h.startWaitAnonymousSessionRoomLocked(session)
}
func (h *Hub) startWaitAnonymousClientRoomLocked(client *Client) {
if !client.IsConnected() {
return
}
if session := client.GetSession(); session != nil && session.ClientType() == HelloClientTypeInternal {
func (h *Hub) startWaitAnonymousSessionRoomLocked(session *ClientSession) {
if session.ClientType() == HelloClientTypeInternal {
// Internal clients don't need to join a room.
return
}
// Anonymous clients must join a public room within a given time,
// Anonymous sessions must join a public room within a given time,
// otherwise they get disconnected to avoid blocking resources forever.
now := time.Now()
h.anonymousClients[client] = now.Add(anonmyousJoinRoomTimeout)
h.anonymousSessions[session] = now.Add(anonmyousJoinRoomTimeout)
}
func (h *Hub) startExpectHello(client *Client) {
@ -768,7 +768,7 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B
h.clients[sessionIdData.Sid] = client
delete(h.expectHelloClients, client)
if userId == "" && auth.Type != HelloClientTypeInternal {
h.startWaitAnonymousClientRoomLocked(client)
h.startWaitAnonymousSessionRoomLocked(session)
}
h.mu.Unlock()
@ -787,7 +787,6 @@ func (h *Hub) processUnregister(client *Client) *ClientSession {
session := client.GetSession()
h.mu.Lock()
delete(h.anonymousClients, client)
delete(h.expectHelloClients, client)
if session != nil {
delete(h.clients, session.Data().Sid)
@ -1001,8 +1000,8 @@ func (h *Hub) processHelloInternal(client *Client, message *ClientMessage) {
h.processRegister(client, message, backend, auth)
}
func (h *Hub) disconnectByRoomSessionId(roomSessionId string, backend *Backend) {
sessionId, err := h.roomSessions.GetSessionId(roomSessionId)
func (h *Hub) disconnectByRoomSessionId(ctx context.Context, roomSessionId string, backend *Backend) {
sessionId, err := h.roomSessions.LookupSessionId(ctx, roomSessionId)
if err == ErrNoSuchRoomSession {
return
} else if err != nil {
@ -1071,9 +1070,9 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
if session.LeaveRoom(true) != nil {
// User was in a room before, so need to notify about leaving it.
h.sendRoom(session, message, nil)
}
if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal {
h.startWaitAnonymousClientRoom(client)
if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal {
h.startWaitAnonymousSessionRoom(session)
}
}
return
}
@ -1116,7 +1115,10 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
if message.Room.SessionId != "" {
// There can only be one connection per Nextcloud Talk session,
// disconnect any other connections without sending a "leave" event.
h.disconnectByRoomSessionId(message.Room.SessionId, session.Backend())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
h.disconnectByRoomSessionId(ctx, message.Room.SessionId, session.Backend())
}
}
@ -1170,7 +1172,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
internalRoomId := getRoomIdForBackend(roomId, session.Backend())
if err := session.SubscribeRoomEvents(roomId, message.Room.SessionId); err != nil {
session.SendMessage(message.NewWrappedErrorServerMessage(err))
// The client (implicitly) left the room due to an error.
// The session (implicitly) left the room due to an error.
h.sendRoom(session, nil, nil)
return
}
@ -1182,7 +1184,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
if r, err = h.createRoom(roomId, room.Room.Properties, session.Backend()); err != nil {
h.ru.Unlock()
session.SendMessage(message.NewWrappedErrorServerMessage(err))
// The client (implicitly) left the room due to an error.
// The session (implicitly) left the room due to an error.
session.UnsubscribeRoomEvents()
h.sendRoom(session, nil, nil)
return
@ -1191,10 +1193,8 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
h.ru.Unlock()
h.mu.Lock()
if client := session.GetClient(); client != nil {
// The client now joined a room, don't expire him if he is anonymous.
delete(h.anonymousClients, client)
}
// The session now joined a room, don't expire if it is anonymous.
delete(h.anonymousSessions, session)
h.mu.Unlock()
session.SetRoom(r)
if room.Room.Permissions != nil {

View file

@ -387,7 +387,8 @@ func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, re
RoomId: request.Room.RoomId,
},
}
if request.Room.RoomId == "test-room-with-sessiondata" {
switch request.Room.RoomId {
case "test-room-with-sessiondata":
data := map[string]string{
"userid": "userid-from-sessiondata",
}
@ -396,6 +397,9 @@ func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, re
t.Fatalf("Could not marshal %+v: %s", data, err)
}
response.Room.Session = (*json.RawMessage)(&tmp)
case "test-room-initial-permissions":
permissions := []Permission{PERMISSION_MAY_PUBLISH_AUDIO}
response.Room.Permissions = &permissions
}
return response
}
@ -2201,6 +2205,92 @@ func TestExpectAnonymousJoinRoom(t *testing.T) {
}
}
func TestExpectAnonymousJoinRoomAfterLeave(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(authAnonymousUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != "" {
t.Errorf("Expected an anonymous user, got %+v", hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
// Join room by id.
roomId := "test-room"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
// Perform housekeeping in the future, this will keep the connection as the
// session joined a room.
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
// No message about the closing is sent to the new connection.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
// Leave room
if room, err := client.JoinRoom(ctx, ""); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != "" {
t.Fatalf("Expected room %s, got %s", "", room.Room.RoomId)
}
// Perform housekeeping in the future, this will cause the connection to
// be terminated because the anonymous client didn't join a room.
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
message, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
}
if err := checkMessageType(message, "bye"); err != nil {
t.Error(err)
} else if message.Bye.Reason != "room_join_timeout" {
t.Errorf("Expected \"room_join_timeout\" reason, got %+v", message.Bye)
}
// Both the client and the session get removed from the hub.
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
if err := client.WaitForSessionRemoved(ctx, hello.Hello.SessionId); err != nil {
t.Error(err)
}
}
func TestJoinRoomChange(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
@ -2331,7 +2421,7 @@ func TestJoinMultiple(t *testing.T) {
}
}
func TestJoinMultipleDisplaynamesPermission(t *testing.T) {
func TestJoinDisplaynamesPermission(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
@ -2413,6 +2503,49 @@ func TestJoinMultipleDisplaynamesPermission(t *testing.T) {
}
}
func TestInitialRoomPermissions(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room-initial-permissions"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
session := hub.GetSessionByPublicId(hello.Hello.SessionId).(*ClientSession)
if session == nil {
t.Fatalf("Session %s does not exist", hello.Hello.SessionId)
}
if !session.HasPermission(PERMISSION_MAY_PUBLISH_AUDIO) {
t.Errorf("Session %s should have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_AUDIO, session.permissions)
}
if session.HasPermission(PERMISSION_MAY_PUBLISH_VIDEO) {
t.Errorf("Session %s should not have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_VIDEO, session.permissions)
}
}
func TestJoinRoomSwitchClient(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
@ -2762,17 +2895,28 @@ func TestRoomParticipantsListUpdateWhileDisconnected(t *testing.T) {
}
func TestClientTakeoverRoomSession(t *testing.T) {
for _, backend := range eventBackendsForTest {
t.Run(backend, func(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
RunTestClientTakeoverRoomSession(t)
})
}
}
func RunTestClientTakeoverRoomSession(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
@ -2796,15 +2940,15 @@ func RunTestClientTakeoverRoomSession(t *testing.T) {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if hubRoom := hub.getRoom(roomId); hubRoom == nil {
if hubRoom := hub1.getRoom(roomId); hubRoom == nil {
t.Fatalf("Room %s does not exist", roomId)
}
if session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId); session1 == nil {
if session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId); session1 == nil {
t.Fatalf("There should be a session %s", hello1.Hello.SessionId)
}
client3 := NewTestClient(t, server, hub)
client3 := NewTestClient(t, server2, hub2)
defer client3.CloseWithBye()
if err := client3.SendHello(testDefaultUserId + "3"); err != nil {
@ -2825,7 +2969,7 @@ func RunTestClientTakeoverRoomSession(t *testing.T) {
// Wait until both users have joined.
WaitForUsersJoined(ctx, t, client1, hello1, client3, hello3)
client2 := NewTestClient(t, server, hub)
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
@ -2862,7 +3006,7 @@ func RunTestClientTakeoverRoomSession(t *testing.T) {
}
// The first session has been closed
if session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId); session1 != nil {
if session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId); session1 != nil {
t.Errorf("The session %s should have been removed", hello1.Hello.SessionId)
}
@ -2872,26 +3016,43 @@ func RunTestClientTakeoverRoomSession(t *testing.T) {
t.Error(err)
}
// No message about the closing is sent to the new connection.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if isLocalTest(t) {
// No message about the closing is sent to the new connection.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
// The permanently connected client will receive a "left" event from the
// overridden session and a "joined" for the new session.
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
// The permanently connected client will receive a "left" event from the
// overridden session and a "joined" for the new session. In that order as
// both were on the same server.
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
} else {
// In the clustered case, the new connection will receive a "leave" event
// due to the asynchronous events.
if err := client2.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
time.Sleep(time.Second)
// The permanently connected client will first a "joined" event from the new
// session (on the same server) and a "left" from the session on the remote
// server (asynchronously).
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
}
}
func TestClientSendOfferPermissions(t *testing.T) {

View file

@ -452,6 +452,8 @@ func (r *Room) RemoveSession(session Session) bool {
return true
}
// Still need to publish an event so sessions on other servers get notified.
r.PublishSessionLeft(session)
r.hub.removeRoom(r)
r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": HelloClientTypeClient})
r.statsRoomSessionsCurrent.Delete(prometheus.Labels{"clienttype": HelloClientTypeInternal})

View file

@ -315,6 +315,7 @@ func (c *TestClient) WaitForClientRemoved(ctx context.Context) error {
c.hub.mu.Unlock()
select {
case <-ctx.Done():
c.hub.mu.Lock()
return ctx.Err()
default:
time.Sleep(time.Millisecond)
@ -341,6 +342,7 @@ func (c *TestClient) WaitForSessionRemoved(ctx context.Context, sessionId string
c.hub.mu.Unlock()
select {
case <-ctx.Done():
c.hub.mu.Lock()
return ctx.Err()
default:
time.Sleep(time.Millisecond)