From b3985914479db3958a898e25a79132c917bcea3b Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 7 Jul 2021 10:53:30 +0200 Subject: [PATCH 1/2] Only allow subscribing if both users are in the same room and call. Previously it was possible to subscribe any stream if the session id of the publisher was known. --- hub.go | 40 +++++++++-- hub_test.go | 197 ++++++++++++++++++++++++++++++++++++++++++++++++++-- room.go | 29 ++++++++ 3 files changed, 257 insertions(+), 9 deletions(-) diff --git a/hub.go b/hub.go index 47f154c..cba65b1 100644 --- a/hub.go +++ b/hub.go @@ -1367,7 +1367,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { if clientData != nil && clientData.Type == "sendoffer" { if !isAllowedToSend(session, clientData) { log.Printf("Session %s is not allowed to send offer for %s, ignoring", session.PublicId(), clientData.RoomType) - sendNotAllowed(session, message) + sendNotAllowed(session, message, "Not allowed to send offer") return } @@ -1652,8 +1652,8 @@ func isAllowedToSend(session *ClientSession, data *MessageClientMessageData) boo return session.HasPermission(permission) } -func sendNotAllowed(session *ClientSession, message *ClientMessage) { - response := message.NewErrorServerMessage(NewError("not_allowed", "Not allowed to publish.")) +func sendNotAllowed(session *ClientSession, message *ClientMessage, reason string) { + response := message.NewErrorServerMessage(NewError("not_allowed", reason)) session.SendMessage(response) } @@ -1667,6 +1667,28 @@ func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) { session.SendMessage(response) } +func (h *Hub) isInSameCall(senderSession *ClientSession, recipientSessionId string) bool { + senderRoom := senderSession.GetRoom() + if senderRoom == nil || !senderRoom.IsSessionInCall(senderSession) { + // Sender is not in a room or not in the call. + return false + } + + recipientSession := h.GetSessionByPublicId(recipientSessionId) + if recipientSession == nil { + // Recipient session does not exist. + return false + } + + recipientRoom := recipientSession.GetRoom() + if recipientRoom == nil || !senderRoom.IsEqual(recipientRoom) || !recipientRoom.IsSessionInCall(recipientSession) { + // Recipient is not in a room, a different room or not in the call. + return false + } + + return true +} + func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) { ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout) defer cancel() @@ -1681,6 +1703,14 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes return } + // A user is only allowed to subscribe a stream if she is in the same room + // as the other user and both have their "inCall" flag set. + if !h.isInSameCall(senderSession, message.Recipient.SessionId) { + log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId) + sendNotAllowed(senderSession, client_message, "Not allowed to request offer.") + return + } + clientType = "subscriber" mc, err = session.GetOrCreateSubscriber(ctx, h.mcu, message.Recipient.SessionId, data.RoomType) case "sendoffer": @@ -1690,7 +1720,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes case "offer": if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to offer %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderSession, client_message) + sendNotAllowed(senderSession, client_message, "Not allowed to publish.") return } @@ -1708,7 +1738,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes if session.PublicId() == message.Recipient.SessionId { if !isAllowedToSend(session, data) { log.Printf("Session %s is not allowed to send candidate for %s, ignoring", session.PublicId(), data.RoomType) - sendNotAllowed(senderSession, client_message) + sendNotAllowed(senderSession, client_message, "Not allowed to send candidate.") return } diff --git a/hub_test.go b/hub_test.go index 52349c2..71c771f 100644 --- a/hub_test.go +++ b/hub_test.go @@ -2449,6 +2449,191 @@ func TestClientSendOfferPermissions(t *testing.T) { } } +func TestClientRequestOfferNotInRoom(t *testing.T) { + hub, _, _, server, shutdown := CreateHubForTest(t) + defer shutdown() + + mcu, err := NewTestMCU() + if err != nil { + t.Fatal(err) + } else if err := mcu.Start(); err != nil { + t.Fatal(err) + } + defer mcu.Stop() + + hub.SetMcu(mcu) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + client1 := NewTestClient(t, server, hub) + defer client1.CloseWithBye() + + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + client2 := NewTestClient(t, server, hub) + defer client2.CloseWithBye() + + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil { + t.Error(err) + } + + // Client 2 may not request an offer (he is not in the room yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } + + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil { + t.Error(err) + } + if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } + + // Client 2 may not request an offer (he is not in the call yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } + + // Simulate request from the backend that somebody joined the call. + users1 := []map[string]interface{}{ + { + "sessionId": hello2.Hello.SessionId, + "inCall": 1, + }, + } + room := hub.getRoom(roomId) + if room == nil { + t.Fatalf("Could not find room %s", roomId) + } + room.PublishUsersInCallChanged(users1, users1) + if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { + t.Error(err) + } + if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { + t.Error(err) + } + + // Client 2 may not request an offer (recipient is not in the call yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } + + // Simulate request from the backend that somebody joined the call. + users2 := []map[string]interface{}{ + { + "sessionId": hello1.Hello.SessionId, + "inCall": 1, + }, + } + room.PublishUsersInCallChanged(users2, users2) + if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { + t.Error(err) + } + if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { + t.Error(err) + } + + // Client 2 may request an offer now (both are in the same room and call). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + // We check for "client_not_found" as the testing MCU doesn't support publishing/subscribing. + if err := checkMessageError(msg, "client_not_found"); err != nil { + t.Fatal(err) + } + } + +} + func TestNoSendBetweenSessionsOnDifferentBackends(t *testing.T) { // Clients can't send messages to sessions connected from other backends. hub, _, _, server, shutdown := CreateHubWithMultipleBackendsForTest(t) @@ -2585,15 +2770,19 @@ func TestNoSameRoomOnDifferentBackends(t *testing.T) { } hub.ru.RLock() - roomCount := 0 + var rooms []*Room for _, room := range hub.rooms { defer room.Close() - roomCount++ + rooms = append(rooms, room) } hub.ru.RUnlock() - if roomCount != 2 { - t.Errorf("Expected 2 rooms, got %d", roomCount) + if len(rooms) != 2 { + t.Errorf("Expected 2 rooms, got %+v", rooms) + } + + if rooms[0].IsEqual(rooms[1]) { + t.Errorf("Rooms should be different: %+v", rooms) } recipient := MessageClientMessageRecipient{ diff --git a/room.go b/room.go index 857e7ed..731ae90 100644 --- a/room.go +++ b/room.go @@ -159,6 +159,28 @@ func (r *Room) Backend() *Backend { return r.backend } +func (r *Room) IsEqual(other *Room) bool { + if r == other { + return true + } else if other == nil { + return false + } else if r.Id() != other.Id() { + return false + } + + b1 := r.Backend() + b2 := other.Backend() + if b1 == b2 { + return true + } else if b1 == nil && b2 != nil { + return false + } else if b1 != nil && b2 == nil { + return false + } + + return b1.Id() == b2.Id() +} + func (r *Room) run() { ticker := time.NewTicker(updateActiveSessionsInterval) loop: @@ -320,6 +342,13 @@ func (r *Room) HasSession(session Session) bool { return result } +func (r *Room) IsSessionInCall(session Session) bool { + r.mu.RLock() + _, result := r.inCallSessions[session] + r.mu.RUnlock() + return result +} + // Returns "true" if there are still clients in the room. func (r *Room) RemoveSession(session Session) bool { r.mu.Lock() From a663dd43f90b0876630250012bb716136920fcd3 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 7 Jul 2021 11:24:53 +0200 Subject: [PATCH 2/2] Add option to allow subscribing of any streams (disabled by default). --- hub.go | 11 ++++++++++- server.conf.in | 5 +++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/hub.go b/hub.go index cba65b1..8b70897 100644 --- a/hub.go +++ b/hub.go @@ -135,6 +135,8 @@ type Hub struct { mcuTimeout time.Duration internalClientsSecret []byte + allowSubscribeAnyStream bool + expiredSessions map[Session]bool expectHelloClients map[*Client]time.Time anonymousClients map[*Client]time.Time @@ -197,6 +199,11 @@ func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version s } mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second + allowSubscribeAnyStream, _ := config.GetBool("app", "allowsubscribeany") + if allowSubscribeAnyStream { + log.Printf("WARNING: Allow subscribing any streams, this is insecure and should only be enabled for testing") + } + decodeCaches := make([]*LruCache, 0, numDecodeCaches) for i := 0; i < numDecodeCaches; i++ { decodeCaches = append(decodeCaches, NewLruCache(decodeCacheSize)) @@ -313,6 +320,8 @@ func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version s mcuTimeout: mcuTimeout, internalClientsSecret: []byte(internalClientsSecret), + allowSubscribeAnyStream: allowSubscribeAnyStream, + expiredSessions: make(map[Session]bool), anonymousClients: make(map[*Client]time.Time), expectHelloClients: make(map[*Client]time.Time), @@ -1705,7 +1714,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes // A user is only allowed to subscribe a stream if she is in the same room // as the other user and both have their "inCall" flag set. - if !h.isInSameCall(senderSession, message.Recipient.SessionId) { + if !h.allowSubscribeAnyStream && !h.isInSameCall(senderSession, message.Recipient.SessionId) { log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId) sendNotAllowed(senderSession, client_message, "Not allowed to request offer.") return diff --git a/server.conf.in b/server.conf.in index ab6f889..bd2d50d 100644 --- a/server.conf.in +++ b/server.conf.in @@ -29,6 +29,11 @@ key = /etc/nginx/ssl/server.key # See "https://golang.org/pkg/net/http/pprof/" for further information. debug = false +# Set to "true" to allow subscribing any streams. This is insecure and should +# only be enabled for testing. By default only streams of users in the same +# room and call can be subscribed. +#allowsubscribeany = false + [sessions] # Secret value used to generate checksums of sessions. This should be a random # string of 32 or 64 bytes.