Merge pull request #133 from strukturag/check-subscribe-call
Only allow subscribing if both users are in the same room and call.
This commit is contained in:
commit
aeb7834ff7
49
hub.go
49
hub.go
|
@ -135,6 +135,8 @@ type Hub struct {
|
|||
mcuTimeout time.Duration
|
||||
internalClientsSecret []byte
|
||||
|
||||
allowSubscribeAnyStream bool
|
||||
|
||||
expiredSessions map[Session]bool
|
||||
expectHelloClients map[*Client]time.Time
|
||||
anonymousClients map[*Client]time.Time
|
||||
|
@ -197,6 +199,11 @@ func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version s
|
|||
}
|
||||
mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second
|
||||
|
||||
allowSubscribeAnyStream, _ := config.GetBool("app", "allowsubscribeany")
|
||||
if allowSubscribeAnyStream {
|
||||
log.Printf("WARNING: Allow subscribing any streams, this is insecure and should only be enabled for testing")
|
||||
}
|
||||
|
||||
decodeCaches := make([]*LruCache, 0, numDecodeCaches)
|
||||
for i := 0; i < numDecodeCaches; i++ {
|
||||
decodeCaches = append(decodeCaches, NewLruCache(decodeCacheSize))
|
||||
|
@ -313,6 +320,8 @@ func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version s
|
|||
mcuTimeout: mcuTimeout,
|
||||
internalClientsSecret: []byte(internalClientsSecret),
|
||||
|
||||
allowSubscribeAnyStream: allowSubscribeAnyStream,
|
||||
|
||||
expiredSessions: make(map[Session]bool),
|
||||
anonymousClients: make(map[*Client]time.Time),
|
||||
expectHelloClients: make(map[*Client]time.Time),
|
||||
|
@ -1367,7 +1376,7 @@ func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {
|
|||
if clientData != nil && clientData.Type == "sendoffer" {
|
||||
if !isAllowedToSend(session, clientData) {
|
||||
log.Printf("Session %s is not allowed to send offer for %s, ignoring", session.PublicId(), clientData.RoomType)
|
||||
sendNotAllowed(session, message)
|
||||
sendNotAllowed(session, message, "Not allowed to send offer")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1652,8 +1661,8 @@ func isAllowedToSend(session *ClientSession, data *MessageClientMessageData) boo
|
|||
return session.HasPermission(permission)
|
||||
}
|
||||
|
||||
func sendNotAllowed(session *ClientSession, message *ClientMessage) {
|
||||
response := message.NewErrorServerMessage(NewError("not_allowed", "Not allowed to publish."))
|
||||
func sendNotAllowed(session *ClientSession, message *ClientMessage, reason string) {
|
||||
response := message.NewErrorServerMessage(NewError("not_allowed", reason))
|
||||
session.SendMessage(response)
|
||||
}
|
||||
|
||||
|
@ -1667,6 +1676,28 @@ func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) {
|
|||
session.SendMessage(response)
|
||||
}
|
||||
|
||||
func (h *Hub) isInSameCall(senderSession *ClientSession, recipientSessionId string) bool {
|
||||
senderRoom := senderSession.GetRoom()
|
||||
if senderRoom == nil || !senderRoom.IsSessionInCall(senderSession) {
|
||||
// Sender is not in a room or not in the call.
|
||||
return false
|
||||
}
|
||||
|
||||
recipientSession := h.GetSessionByPublicId(recipientSessionId)
|
||||
if recipientSession == nil {
|
||||
// Recipient session does not exist.
|
||||
return false
|
||||
}
|
||||
|
||||
recipientRoom := recipientSession.GetRoom()
|
||||
if recipientRoom == nil || !senderRoom.IsEqual(recipientRoom) || !recipientRoom.IsSessionInCall(recipientSession) {
|
||||
// Recipient is not in a room, a different room or not in the call.
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSession, client_message *ClientMessage, message *MessageClientMessage, data *MessageClientMessageData) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), h.mcuTimeout)
|
||||
defer cancel()
|
||||
|
@ -1681,6 +1712,14 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
|
|||
return
|
||||
}
|
||||
|
||||
// A user is only allowed to subscribe a stream if she is in the same room
|
||||
// as the other user and both have their "inCall" flag set.
|
||||
if !h.allowSubscribeAnyStream && !h.isInSameCall(senderSession, message.Recipient.SessionId) {
|
||||
log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId)
|
||||
sendNotAllowed(senderSession, client_message, "Not allowed to request offer.")
|
||||
return
|
||||
}
|
||||
|
||||
clientType = "subscriber"
|
||||
mc, err = session.GetOrCreateSubscriber(ctx, h.mcu, message.Recipient.SessionId, data.RoomType)
|
||||
case "sendoffer":
|
||||
|
@ -1690,7 +1729,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
|
|||
case "offer":
|
||||
if !isAllowedToSend(session, data) {
|
||||
log.Printf("Session %s is not allowed to offer %s, ignoring", session.PublicId(), data.RoomType)
|
||||
sendNotAllowed(senderSession, client_message)
|
||||
sendNotAllowed(senderSession, client_message, "Not allowed to publish.")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1708,7 +1747,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
|
|||
if session.PublicId() == message.Recipient.SessionId {
|
||||
if !isAllowedToSend(session, data) {
|
||||
log.Printf("Session %s is not allowed to send candidate for %s, ignoring", session.PublicId(), data.RoomType)
|
||||
sendNotAllowed(senderSession, client_message)
|
||||
sendNotAllowed(senderSession, client_message, "Not allowed to send candidate.")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
197
hub_test.go
197
hub_test.go
|
@ -2449,6 +2449,191 @@ func TestClientSendOfferPermissions(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClientRequestOfferNotInRoom(t *testing.T) {
|
||||
hub, _, _, server, shutdown := CreateHubForTest(t)
|
||||
defer shutdown()
|
||||
|
||||
mcu, err := NewTestMCU()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := mcu.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mcu.Stop()
|
||||
|
||||
hub.SetMcu(mcu)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
defer cancel()
|
||||
|
||||
client1 := NewTestClient(t, server, hub)
|
||||
defer client1.CloseWithBye()
|
||||
|
||||
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hello1, err := client1.RunUntilHello(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
client2 := NewTestClient(t, server, hub)
|
||||
defer client2.CloseWithBye()
|
||||
|
||||
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hello2, err := client2.RunUntilHello(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Join room by id.
|
||||
roomId := "test-room"
|
||||
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if room.Room.RoomId != roomId {
|
||||
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
|
||||
}
|
||||
|
||||
// We will receive a "joined" event.
|
||||
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Client 2 may not request an offer (he is not in the room yet).
|
||||
if err := client2.SendMessage(MessageClientMessageRecipient{
|
||||
Type: "session",
|
||||
SessionId: hello1.Hello.SessionId,
|
||||
}, MessageClientMessageData{
|
||||
Type: "requestoffer",
|
||||
Sid: "12345",
|
||||
RoomType: "screen",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if msg, err := client2.RunUntilMessage(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if err := checkMessageError(msg, "not_allowed"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if room.Room.RoomId != roomId {
|
||||
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
|
||||
}
|
||||
|
||||
// We will receive a "joined" event.
|
||||
if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Client 2 may not request an offer (he is not in the call yet).
|
||||
if err := client2.SendMessage(MessageClientMessageRecipient{
|
||||
Type: "session",
|
||||
SessionId: hello1.Hello.SessionId,
|
||||
}, MessageClientMessageData{
|
||||
Type: "requestoffer",
|
||||
Sid: "12345",
|
||||
RoomType: "screen",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if msg, err := client2.RunUntilMessage(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if err := checkMessageError(msg, "not_allowed"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Simulate request from the backend that somebody joined the call.
|
||||
users1 := []map[string]interface{}{
|
||||
{
|
||||
"sessionId": hello2.Hello.SessionId,
|
||||
"inCall": 1,
|
||||
},
|
||||
}
|
||||
room := hub.getRoom(roomId)
|
||||
if room == nil {
|
||||
t.Fatalf("Could not find room %s", roomId)
|
||||
}
|
||||
room.PublishUsersInCallChanged(users1, users1)
|
||||
if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Client 2 may not request an offer (recipient is not in the call yet).
|
||||
if err := client2.SendMessage(MessageClientMessageRecipient{
|
||||
Type: "session",
|
||||
SessionId: hello1.Hello.SessionId,
|
||||
}, MessageClientMessageData{
|
||||
Type: "requestoffer",
|
||||
Sid: "12345",
|
||||
RoomType: "screen",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if msg, err := client2.RunUntilMessage(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if err := checkMessageError(msg, "not_allowed"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Simulate request from the backend that somebody joined the call.
|
||||
users2 := []map[string]interface{}{
|
||||
{
|
||||
"sessionId": hello1.Hello.SessionId,
|
||||
"inCall": 1,
|
||||
},
|
||||
}
|
||||
room.PublishUsersInCallChanged(users2, users2)
|
||||
if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Client 2 may request an offer now (both are in the same room and call).
|
||||
if err := client2.SendMessage(MessageClientMessageRecipient{
|
||||
Type: "session",
|
||||
SessionId: hello1.Hello.SessionId,
|
||||
}, MessageClientMessageData{
|
||||
Type: "requestoffer",
|
||||
Sid: "12345",
|
||||
RoomType: "screen",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if msg, err := client2.RunUntilMessage(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
// We check for "client_not_found" as the testing MCU doesn't support publishing/subscribing.
|
||||
if err := checkMessageError(msg, "client_not_found"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestNoSendBetweenSessionsOnDifferentBackends(t *testing.T) {
|
||||
// Clients can't send messages to sessions connected from other backends.
|
||||
hub, _, _, server, shutdown := CreateHubWithMultipleBackendsForTest(t)
|
||||
|
@ -2585,15 +2770,19 @@ func TestNoSameRoomOnDifferentBackends(t *testing.T) {
|
|||
}
|
||||
|
||||
hub.ru.RLock()
|
||||
roomCount := 0
|
||||
var rooms []*Room
|
||||
for _, room := range hub.rooms {
|
||||
defer room.Close()
|
||||
roomCount++
|
||||
rooms = append(rooms, room)
|
||||
}
|
||||
hub.ru.RUnlock()
|
||||
|
||||
if roomCount != 2 {
|
||||
t.Errorf("Expected 2 rooms, got %d", roomCount)
|
||||
if len(rooms) != 2 {
|
||||
t.Errorf("Expected 2 rooms, got %+v", rooms)
|
||||
}
|
||||
|
||||
if rooms[0].IsEqual(rooms[1]) {
|
||||
t.Errorf("Rooms should be different: %+v", rooms)
|
||||
}
|
||||
|
||||
recipient := MessageClientMessageRecipient{
|
||||
|
|
29
room.go
29
room.go
|
@ -159,6 +159,28 @@ func (r *Room) Backend() *Backend {
|
|||
return r.backend
|
||||
}
|
||||
|
||||
func (r *Room) IsEqual(other *Room) bool {
|
||||
if r == other {
|
||||
return true
|
||||
} else if other == nil {
|
||||
return false
|
||||
} else if r.Id() != other.Id() {
|
||||
return false
|
||||
}
|
||||
|
||||
b1 := r.Backend()
|
||||
b2 := other.Backend()
|
||||
if b1 == b2 {
|
||||
return true
|
||||
} else if b1 == nil && b2 != nil {
|
||||
return false
|
||||
} else if b1 != nil && b2 == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return b1.Id() == b2.Id()
|
||||
}
|
||||
|
||||
func (r *Room) run() {
|
||||
ticker := time.NewTicker(updateActiveSessionsInterval)
|
||||
loop:
|
||||
|
@ -320,6 +342,13 @@ func (r *Room) HasSession(session Session) bool {
|
|||
return result
|
||||
}
|
||||
|
||||
func (r *Room) IsSessionInCall(session Session) bool {
|
||||
r.mu.RLock()
|
||||
_, result := r.inCallSessions[session]
|
||||
r.mu.RUnlock()
|
||||
return result
|
||||
}
|
||||
|
||||
// Returns "true" if there are still clients in the room.
|
||||
func (r *Room) RemoveSession(session Session) bool {
|
||||
r.mu.Lock()
|
||||
|
|
|
@ -29,6 +29,11 @@ key = /etc/nginx/ssl/server.key
|
|||
# See "https://golang.org/pkg/net/http/pprof/" for further information.
|
||||
debug = false
|
||||
|
||||
# Set to "true" to allow subscribing any streams. This is insecure and should
|
||||
# only be enabled for testing. By default only streams of users in the same
|
||||
# room and call can be subscribed.
|
||||
#allowsubscribeany = false
|
||||
|
||||
[sessions]
|
||||
# Secret value used to generate checksums of sessions. This should be a random
|
||||
# string of 32 or 64 bytes.
|
||||
|
|
Loading…
Reference in New Issue