Only send single "incall" message with "all: true" in clustered setup.

Previously each instance would send one message to all users in the cluster.
This commit is contained in:
Joachim Bauch 2022-07-04 15:24:33 +02:00
parent b2da4002a4
commit ad1dea2780
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
2 changed files with 268 additions and 3 deletions

View File

@ -1332,6 +1332,256 @@ func TestBackendServer_ParticipantsUpdateTimeout(t *testing.T) {
}
}
func TestBackendServer_InCallAll(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
_, _, _, hub1, _, server1 = CreateBackendServerForTest(t)
hub2 = hub1
server2 = server1
} else {
_, _, hub1, hub2, server1, server2 = CreateBackendServerWithClusteringForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId)
if session1 == nil {
t.Fatalf("Could not find session %s", hello1.Hello.SessionId)
}
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId)
if session2 == nil {
t.Fatalf("Could not find session %s", hello2.Hello.SessionId)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// Give message processing some time.
time.Sleep(10 * time.Millisecond)
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
room1 := hub1.getRoom(roomId)
if room1 == nil {
t.Fatalf("Could not find room %s in hub1", roomId)
}
room2 := hub2.getRoom(roomId)
if room2 == nil {
t.Fatalf("Could not find room %s in hub2", roomId)
}
if room1.IsSessionInCall(session1) {
t.Errorf("Session %s should not be in room %s", session1.PublicId(), room1.Id())
}
if room2.IsSessionInCall(session2) {
t.Errorf("Session %s should not be in room %s", session2.PublicId(), room2.Id())
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
msg := &BackendServerRoomRequest{
Type: "incall",
InCall: &BackendRoomInCallRequest{
InCall: json.RawMessage("7"),
All: true,
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Error(err)
return
}
res, err := performBackendRequest(server1.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Error(err)
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
}()
wg.Wait()
if t.Failed() {
return
}
if msg1_a, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if in_call_1, err := checkMessageParticipantsInCall(msg1_a); err != nil {
t.Error(err)
} else if !in_call_1.All {
t.Errorf("All flag not set in message %+v", in_call_1)
} else if !bytes.Equal(*in_call_1.InCall, []byte("7")) {
t.Errorf("Expected inCall flag 7, got %s", string(*in_call_1.InCall))
}
if msg2_a, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if in_call_1, err := checkMessageParticipantsInCall(msg2_a); err != nil {
t.Error(err)
} else if !in_call_1.All {
t.Errorf("All flag not set in message %+v", in_call_1)
} else if !bytes.Equal(*in_call_1.InCall, []byte("7")) {
t.Errorf("Expected inCall flag 7, got %s", string(*in_call_1.InCall))
}
if !room1.IsSessionInCall(session1) {
t.Errorf("Session %s should be in room %s", session1.PublicId(), room1.Id())
}
if !room2.IsSessionInCall(session2) {
t.Errorf("Session %s should be in room %s", session2.PublicId(), room2.Id())
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
if message, err := client1.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
ctx3, cancel3 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel3()
if message, err := client2.RunUntilMessage(ctx3); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
wg.Add(1)
go func() {
defer wg.Done()
msg := &BackendServerRoomRequest{
Type: "incall",
InCall: &BackendRoomInCallRequest{
InCall: json.RawMessage("0"),
All: true,
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Error(err)
return
}
res, err := performBackendRequest(server1.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Error(err)
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
}()
wg.Wait()
if t.Failed() {
return
}
if msg1_a, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if in_call_1, err := checkMessageParticipantsInCall(msg1_a); err != nil {
t.Error(err)
} else if !in_call_1.All {
t.Errorf("All flag not set in message %+v", in_call_1)
} else if !bytes.Equal(*in_call_1.InCall, []byte("0")) {
t.Errorf("Expected inCall flag 0, got %s", string(*in_call_1.InCall))
}
if msg2_a, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if in_call_1, err := checkMessageParticipantsInCall(msg2_a); err != nil {
t.Error(err)
} else if !in_call_1.All {
t.Errorf("All flag not set in message %+v", in_call_1)
} else if !bytes.Equal(*in_call_1.InCall, []byte("0")) {
t.Errorf("Expected inCall flag 0, got %s", string(*in_call_1.InCall))
}
if room1.IsSessionInCall(session1) {
t.Errorf("Session %s should not be in room %s", session1.PublicId(), room1.Id())
}
if room2.IsSessionInCall(session2) {
t.Errorf("Session %s should not be in room %s", session2.PublicId(), room2.Id())
}
ctx4, cancel4 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel4()
if message, err := client1.RunUntilMessage(ctx4); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
ctx5, cancel5 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel5()
if message, err := client2.RunUntilMessage(ctx5); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
})
}
}
func TestBackendServer_RoomMessage(t *testing.T) {
_, _, _, hub, _, server := CreateBackendServerForTest(t)

21
room.go
View File

@ -692,11 +692,13 @@ func (r *Room) PublishUsersInCallChangedAll(inCall int) {
r.mu.Lock()
defer r.mu.Unlock()
var notify []*ClientSession
if inCall&FlagInCall != 0 {
// All connected sessions join the call.
var joined []string
for _, session := range r.sessions {
if _, ok := session.(*ClientSession); !ok {
clientSession, ok := session.(*ClientSession)
if !ok {
continue
}
@ -708,6 +710,7 @@ func (r *Room) PublishUsersInCallChangedAll(inCall int) {
r.inCallSessions[session] = true
joined = append(joined, session.PublicId())
}
notify = append(notify, clientSession)
}
if len(joined) == 0 {
@ -729,6 +732,15 @@ func (r *Room) PublishUsersInCallChangedAll(inCall int) {
}
}()
for _, session := range r.sessions {
clientSession, ok := session.(*ClientSession)
if !ok {
continue
}
notify = append(notify, clientSession)
}
for session := range r.inCallSessions {
if clientSession, ok := session.(*ClientSession); ok {
ch <- clientSession
@ -755,8 +767,11 @@ func (r *Room) PublishUsersInCallChangedAll(inCall int) {
},
},
}
if err := r.publish(message); err != nil {
log.Printf("Could not publish incall message in room %s: %s", r.Id(), err)
for _, session := range notify {
if !session.SendMessage(message) {
log.Printf("Could not send incall message from room %s to %s", r.Id(), session.PublicId())
}
}
}