Support receiving and forwarding multiple chat messages from Talk.

This commit is contained in:
Joachim Bauch 2026-01-29 11:31:08 +01:00
commit eafa39a1c5
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
3 changed files with 259 additions and 40 deletions

View file

@ -1215,6 +1215,14 @@ type RoomEventMessageDataChat struct {
// Comment will be included if the client supports the "chat-relay" feature.
Comment json.RawMessage `json:"comment,omitempty"`
// Comments will be included if the client supports the "chat-relay" feature.
Comments []json.RawMessage `json:"comments,omitempty"`
}
func (m *RoomEventMessageDataChat) HasComment() bool {
return len(m.Comment) > 0 || slices.ContainsFunc(m.Comments, func(comment json.RawMessage) bool {
return len(comment) > 0
})
}
type RoomEventMessageData struct {

View file

@ -721,15 +721,14 @@ func (s *ClientSession) sendCandidate(client sfu.Client, sender api.PublicSessio
}
// +checklocks:s.mu
func (s *ClientSession) sendMessageUnlocked(message *api.ServerMessage) bool {
func (s *ClientSession) sendMessageUnlocked(message *api.ServerMessage) {
if c := s.getClientUnlocked(); c != nil {
if c.SendMessage(message) {
return true
return
}
}
s.storePendingMessage(message)
return true
}
func (s *ClientSession) SendError(e *api.Error) bool {
@ -741,15 +740,21 @@ func (s *ClientSession) SendError(e *api.Error) bool {
}
func (s *ClientSession) SendMessage(message *api.ServerMessage) bool {
message = s.filterMessage(message)
if message == nil {
message, messages := s.filterMessage(message)
if message == nil && len(messages) == 0 {
return true
}
s.mu.Lock()
defer s.mu.Unlock()
return s.sendMessageUnlocked(message)
if message != nil {
s.sendMessageUnlocked(message)
}
for _, msg := range messages {
s.sendMessageUnlocked(msg)
}
return true
}
func (s *ClientSession) SendMessages(messages []*api.ServerMessage) bool {
@ -1333,7 +1338,7 @@ func (s *ClientSession) filterDuplicateFlags(message *api.RoomFlagsServerMessage
return false
}
func (s *ClientSession) filterMessage(message *api.ServerMessage) *api.ServerMessage {
func (s *ClientSession) filterMessage(message *api.ServerMessage) (*api.ServerMessage, []*api.ServerMessage) {
switch message.Type {
case "event":
switch message.Event.Target {
@ -1356,7 +1361,7 @@ func (s *ClientSession) filterMessage(message *api.ServerMessage) *api.ServerMes
m.Changed = nil
case "flags":
if s.filterDuplicateFlags(message.Event.Flags) {
return nil
return nil, nil
}
}
case "room":
@ -1364,7 +1369,7 @@ func (s *ClientSession) filterMessage(message *api.ServerMessage) *api.ServerMes
case "join":
join := s.filterDuplicateJoin(message.Event.Join)
if len(join) == 0 {
return nil
return nil, nil
}
copied := false
if len(join) != len(message.Event.Join) {
@ -1402,7 +1407,7 @@ func (s *ClientSession) filterMessage(message *api.ServerMessage) *api.ServerMes
leave := s.filterUnknownLeave(message.Event.Leave)
if len(leave) == 0 {
return nil
return nil, nil
}
for _, e := range message.Event.Leave {
@ -1423,57 +1428,104 @@ func (s *ClientSession) filterMessage(message *api.ServerMessage) *api.ServerMes
}
case "message":
if message.Event.Message == nil || len(message.Event.Message.Data) == 0 {
return message
return message, nil
}
data, err := message.Event.Message.GetData()
if data == nil || err != nil {
return message
return message, nil
}
if data.Type == "chat" && data.Chat != nil {
update := false
if data.Chat.Refresh && len(data.Chat.Comment) > 0 {
if data.Chat.Refresh && data.Chat.HasComment() {
// New-style chat event, check what the client supports.
if s.HasFeature(api.ClientFeatureChatRelay) {
data.Chat.Refresh = false
} else {
data.Chat.Comment = nil
data.Chat.Comments = nil
}
update = true
}
if len(data.Chat.Comment) > 0 && s.HasPermission(api.PERMISSION_HIDE_DISPLAYNAMES) {
var comment api.ChatComment
if err := json.Unmarshal(data.Chat.Comment, &comment); err != nil {
return message
}
if displayName, found := comment["actorDisplayName"]; found && displayName != "" {
comment["actorDisplayName"] = ""
var err error
if data.Chat.Comment, err = json.Marshal(comment); err != nil {
return message
if data.Chat.HasComment() {
data.Chat.Comments = slices.DeleteFunc(data.Chat.Comments, func(comment json.RawMessage) bool {
return len(comment) == 0
})
if len(data.Chat.Comment) > 0 {
if len(data.Chat.Comments) == 0 {
data.Chat.Comments = []json.RawMessage{data.Chat.Comment}
} else {
data.Chat.Comments = append([]json.RawMessage{data.Chat.Comment}, data.Chat.Comments...)
}
data.Chat.Comment = nil
}
if len(data.Chat.Comments) > 0 && s.HasPermission(api.PERMISSION_HIDE_DISPLAYNAMES) {
for i, commentData := range data.Chat.Comments {
var comment api.ChatComment
if err := json.Unmarshal(commentData, &comment); err != nil {
continue
}
if displayName, found := comment["actorDisplayName"]; found && displayName != "" {
comment["actorDisplayName"] = ""
var err error
if commentData, err = json.Marshal(comment); err != nil {
continue
}
data.Chat.Comments[i] = commentData
update = true
}
}
update = true
}
}
if update {
if encoded, err := json.Marshal(data); err == nil {
// Create unique copy of message for only this client.
message = &api.ServerMessage{
Id: message.Id,
Type: message.Type,
Event: &api.EventServerMessage{
Type: message.Event.Type,
Target: message.Event.Target,
Message: &api.RoomEventMessage{
RoomId: message.Event.Message.RoomId,
Data: encoded,
if update || len(data.Chat.Comments) > 0 {
if len(data.Chat.Comment) == 0 && len(data.Chat.Comments) == 0 {
if encoded, err := json.Marshal(data); err == nil {
// Create unique copy of message for only this client.
message = &api.ServerMessage{
Id: message.Id,
Type: message.Type,
Event: &api.EventServerMessage{
Type: message.Event.Type,
Target: message.Event.Target,
Message: &api.RoomEventMessage{
RoomId: message.Event.Message.RoomId,
Data: encoded,
},
},
},
}
}
} else {
// Forward different chat comments individually.
var result []*api.ServerMessage
for _, comment := range data.Chat.Comments {
commentData := api.RoomEventMessageData{
Type: data.Type,
Chat: &api.RoomEventMessageDataChat{
Refresh: data.Chat.Refresh,
Comment: comment,
},
}
if encoded, err := json.Marshal(commentData); err == nil {
// Create unique copy of message for only this client.
result = append(result, &api.ServerMessage{
Id: message.Id,
Type: message.Type,
Event: &api.EventServerMessage{
Type: message.Event.Type,
Target: message.Event.Target,
Message: &api.RoomEventMessage{
RoomId: message.Event.Message.RoomId,
Data: encoded,
},
},
})
}
}
return nil, result
}
}
}
@ -1483,16 +1535,16 @@ func (s *ClientSession) filterMessage(message *api.ServerMessage) *api.ServerMes
if message.Message != nil && len(message.Message.Data) > 0 && s.HasPermission(api.PERMISSION_HIDE_DISPLAYNAMES) {
var data api.MessageServerMessageData
if err := json.Unmarshal(message.Message.Data, &data); err != nil {
return message
return message, nil
}
if data.Type == "nickChanged" {
return nil
return nil, nil
}
}
}
return message
return message, nil
}
func (s *ClientSession) filterAsyncMessage(msg *events.AsyncMessage) *api.ServerMessage {

View file

@ -249,6 +249,72 @@ func TestFeatureChatRelay(t *testing.T) {
}
}
}
chatComment2 := api.StringMap{
"hello": "world",
}
message2 := api.StringMap{
"type": "chat",
"chat": api.StringMap{
"refresh": true,
"comments": []api.StringMap{
chatComment,
chatComment2,
},
},
}
data2, err := json.Marshal(message2)
require.NoError(err)
// Simulate request from the backend.
room.processAsyncMessage(&events.AsyncMessage{
Type: "room",
Room: &talk.BackendServerRoomRequest{
Type: "message",
Message: &talk.BackendRoomMessageRequest{
Data: data2,
},
},
})
if msg, ok := client.RunUntilRoomMessage(ctx); ok {
assert.Equal(roomId, msg.RoomId)
var data api.StringMap
if err := json.Unmarshal(msg.Data, &data); assert.NoError(err) {
assert.Equal("chat", data["type"], "invalid type entry in %+v", data)
if chat, found := api.GetStringMapEntry[map[string]any](data, "chat"); assert.True(found, "chat entry is missing in %+v", data) {
if feature {
assert.EqualValues(chatComment, chat["comment"])
_, found := chat["refresh"]
assert.False(found, "refresh should not be included")
// A second message with the second comment will be sent
if msg, ok := client.RunUntilRoomMessage(ctx); ok {
assert.Equal(roomId, msg.RoomId)
if err := json.Unmarshal(msg.Data, &data); assert.NoError(err) {
assert.Equal("chat", data["type"], "invalid type entry in %+v", data)
if chat, found := api.GetStringMapEntry[map[string]any](data, "chat"); assert.True(found, "chat entry is missing in %+v", data) {
assert.EqualValues(chatComment2, chat["comment"])
_, found := chat["refresh"]
assert.False(found, "refresh should not be included")
}
}
}
} else {
// Only a single refresh will be sent
assert.Equal(true, chat["refresh"])
_, found := chat["comment"]
assert.False(found, "the comment should not be included")
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
client.RunUntilErrorIs(ctx2, context.DeadlineExceeded)
}
}
}
}
}
}
@ -461,6 +527,99 @@ func TestFeatureChatRelayFederation(t *testing.T) {
}
}
}
chatComment2 := api.StringMap{
"hello": "world",
}
message2 := api.StringMap{
"type": "chat",
"chat": api.StringMap{
"refresh": true,
"comments": []api.StringMap{
chatComment,
chatComment2,
},
},
}
data2, err := json.Marshal(message2)
require.NoError(err)
// Simulate request from the backend.
room.processAsyncMessage(&events.AsyncMessage{
Type: "room",
Room: &talk.BackendServerRoomRequest{
Type: "message",
Message: &talk.BackendRoomMessageRequest{
Data: data2,
},
},
})
// The first client will receive the message for the local room (always including the actual message).
if msg, ok := client1.RunUntilRoomMessage(ctx); ok {
assert.Equal(roomId, msg.RoomId)
var data api.StringMap
if err := json.Unmarshal(msg.Data, &data); assert.NoError(err) {
assert.Equal("chat", data["type"], "invalid type entry in %+v", data)
if chat, found := api.GetStringMapEntry[map[string]any](data, "chat"); assert.True(found, "chat entry is missing in %+v", data) {
AssertEqualSerialized(t, chatComment, chat["comment"])
_, found := chat["refresh"]
assert.False(found, "refresh should not be included")
}
}
}
// A second message with the second comment will be sent
if msg, ok := client1.RunUntilRoomMessage(ctx); ok {
assert.Equal(roomId, msg.RoomId)
var data api.StringMap
if err := json.Unmarshal(msg.Data, &data); assert.NoError(err) {
assert.Equal("chat", data["type"], "invalid type entry in %+v", data)
if chat, found := api.GetStringMapEntry[map[string]any](data, "chat"); assert.True(found, "chat entry is missing in %+v", data) {
assert.EqualValues(chatComment2, chat["comment"])
_, found := chat["refresh"]
assert.False(found, "refresh should not be included")
}
}
}
// The second client will receive the message from the federated room (either as refresh or with the message).
if msg, ok := client2.RunUntilRoomMessage(ctx); ok {
assert.Equal(federatedRoomId, msg.RoomId)
var data api.StringMap
if err := json.Unmarshal(msg.Data, &data); assert.NoError(err) {
assert.Equal("chat", data["type"], "invalid type entry in %+v", data)
if chat, found := api.GetStringMapEntry[map[string]any](data, "chat"); assert.True(found, "chat entry is missing in %+v", data) {
if feature {
AssertEqualSerialized(t, federatedChatComment, chat["comment"])
_, found := chat["refresh"]
assert.False(found, "refresh should not be included")
// A second message with the second comment will be sent
if msg, ok := client2.RunUntilRoomMessage(ctx); ok {
assert.Equal(federatedRoomId, msg.RoomId)
if err := json.Unmarshal(msg.Data, &data); assert.NoError(err) {
assert.Equal("chat", data["type"], "invalid type entry in %+v", data)
if chat, found := api.GetStringMapEntry[map[string]any](data, "chat"); assert.True(found, "chat entry is missing in %+v", data) {
assert.EqualValues(chatComment2, chat["comment"])
_, found := chat["refresh"]
assert.False(found, "refresh should not be included")
}
}
}
} else {
// Only a single refresh will be sent
assert.Equal(true, chat["refresh"])
_, found := chat["comment"]
assert.False(found, "the comment should not be included")
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
client2.RunUntilErrorIs(ctx2, context.DeadlineExceeded)
}
}
}
}
}
}