diff --git a/hub.go b/hub.go index 47f154c..cba65b1 100644 --- a/hub.go +++ b/hub.go @@ -1367,7 +1367,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { if clientData != nil && clientData.Type == "sendoffer" { if !isAllowedToSend(session, clientData) { log.Printf("Session %s is not allowed to send offer for %s, ignoring", session.PublicId(), clientData.RoomType) - sendNotAllowed(session, message) + sendNotAllowed(session, message, "Not allowed to send offer") return } @@ -1652,8 +1652,8 @@ func isAllowedToSend(session *ClientSession, data *MessageClientMessageData) boo return session.HasPermission(permission) } -func sendNotAllowed(session *ClientSession, message *ClientMessage) { - response := message.NewErrorServerMessage(NewError("not_allowed", "Not allowed to publish.")) +func sendNotAllowed(session *ClientSession, message *ClientMessage, reason string) { + response := message.NewErrorServerMessage(NewError("not_allowed", reason)) session.SendMessage(response) } @@ -1667,6 +1667,28 @@ func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) { session.SendMessage(response) } +func (h *Hub) isInSameCall(senderSession *ClientSession, recipientSessionId string) bool { + senderRoom := senderSession.GetRoom() + if senderRoom == nil || !senderRoom.IsSessionInCall(senderSession) { + // Sender is not in a room or not in the call. + return false + } + + recipientSession := h.GetSessionByPublicId(recipientSessionId) + if recipientSession == nil { + // Recipient session does not exist. + return false + } + + recipientRoom := recipientSession.GetRoom() + if recipientRoom == nil || !senderRoom.IsEqual(recipientRoom) || !recipientRoom.IsSessionInCall(recipientSession) { + // Recipient is not in a room, a different room or not in the call. + return false + } + + return true +} + func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout) defer cancel() @@ -1681,6 +1703,14 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes return } + // A user is only allowed to subscribe a stream if she is in the same room + // as the other user and both have their "inCall" flag set. + if !h.isInSameCall(senderSession, message.Recipient.SessionId) { + log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId) + sendNotAllowed(senderSession, client_message, "Not allowed to request offer.") + return + } + clientType = "subscriber" mc, err = session.GetOrCreateSubscriber(ctx, h.mcu, message.Recipient.SessionId, data.RoomType) case "sendoffer": @@ -1690,7 +1720,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes case "offer": if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to offer %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderSession, client_message) + sendNotAllowed(senderSession, client_message, "Not allowed to publish.") return } @@ -1708,7 +1738,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes if session.PublicId() == message.Recipient.SessionId { if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to send candidate for %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderSession, client_message) + sendNotAllowed(senderSession, client_message, "Not allowed to send candidate.") return } diff --git a/hub_test.go b/hub_test.go index 52349c2..71c771f 100644 --- a/hub_test.go +++ b/hub_test.go @@ -2449,6 +2449,191 @@ func TestClientSendOfferPermissions(t *testing.T) { } } +func TestClientRequestOfferNotInRoom(t *testing.T) { + hub, _, _, server, shutdown := CreateHubForTest(t) + defer shutdown() + + mcu, err := NewTestMCU() + if err != nil { + t.Fatal(err) + } else if err := mcu.Start(); err != nil { + t.Fatal(err) + } + defer mcu.Stop() + + hub.SetMcu(mcu) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + client1 := NewTestClient(t, server, hub) + defer client1.CloseWithBye() + + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + client2 := NewTestClient(t, server, hub) + defer client2.CloseWithBye() + + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil { + t.Error(err) + } + + // Client 2 may not request an offer (he is not in the room yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } + + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil { + t.Error(err) + } + if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } + + // Client 2 may not request an offer (he is not in the call yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } + + // Simulate request from the backend that somebody joined the call. + users1 := []map[string]interface{}{ + { + "sessionId": hello2.Hello.SessionId, + "inCall": 1, + }, + } + room := hub.getRoom(roomId) + if room == nil { + t.Fatalf("Could not find room %s", roomId) + } + room.PublishUsersInCallChanged(users1, users1) + if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { + t.Error(err) + } + if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { + t.Error(err) + } + + // Client 2 may not request an offer (recipient is not in the call yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } + + // Simulate request from the backend that somebody joined the call. + users2 := []map[string]interface{}{ + { + "sessionId": hello1.Hello.SessionId, + "inCall": 1, + }, + } + room.PublishUsersInCallChanged(users2, users2) + if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { + t.Error(err) + } + if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { + t.Error(err) + } + + // Client 2 may request an offer now (both are in the same room and call). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + // We check for "client_not_found" as the testing MCU doesn't support publishing/subscribing. + if err := checkMessageError(msg, "client_not_found"); err != nil { + t.Fatal(err) + } + } + +} + func TestNoSendBetweenSessionsOnDifferentBackends(t *testing.T) { // Clients can't send messages to sessions connected from other backends. hub, _, _, server, shutdown := CreateHubWithMultipleBackendsForTest(t) @@ -2585,15 +2770,19 @@ func TestNoSameRoomOnDifferentBackends(t *testing.T) { } hub.ru.RLock() - roomCount := 0 + var rooms []*Room for _, room := range hub.rooms { defer room.Close() - roomCount++ + rooms = append(rooms, room) } hub.ru.RUnlock() - if roomCount != 2 { - t.Errorf("Expected 2 rooms, got %d", roomCount) + if len(rooms) != 2 { + t.Errorf("Expected 2 rooms, got %+v", rooms) + } + + if rooms[0].IsEqual(rooms[1]) { + t.Errorf("Rooms should be different: %+v", rooms) } recipient := MessageClientMessageRecipient{ diff --git a/room.go b/room.go index 857e7ed..731ae90 100644 --- a/room.go +++ b/room.go @@ -159,6 +159,28 @@ func (r *Room) Backend() *Backend { return r.backend } +func (r *Room) IsEqual(other *Room) bool { + if r == other { + return true + } else if other == nil { + return false + } else if r.Id() != other.Id() { + return false + } + + b1 := r.Backend() + b2 := other.Backend() + if b1 == b2 { + return true + } else if b1 == nil && b2 != nil { + return false + } else if b1 != nil && b2 == nil { + return false + } + + return b1.Id() == b2.Id() +} + func (r *Room) run() { ticker := time.NewTicker(updateActiveSessionsInterval) loop: @@ -320,6 +342,13 @@ func (r *Room) HasSession(session Session) bool { return result } +func (r *Room) IsSessionInCall(session Session) bool { + r.mu.RLock() + _, result := r.inCallSessions[session] + r.mu.RUnlock() + return result +} + // Returns "true" if there are still clients in the room. func (r *Room) RemoveSession(session Session) bool { r.mu.Lock()