Prevent duplicate virtual sessions in participant update events.

This can happen if these sessions are returned by Nextcloud and then the
local session is added.

Also fixes events in clustered scenarios where virtual sessions are available
on on server but the update events are generated on a different one.
This commit is contained in:
Joachim Bauch 2024-10-30 11:59:28 +01:00
commit 72e38b7814
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
9 changed files with 897 additions and 86 deletions

View file

@ -249,6 +249,36 @@ func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room
return response.GetInCall(), nil
}
func (c *GrpcClient) GetInternalSessions(ctx context.Context, roomId string, backend *Backend) (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData, err error) {
statsGrpcClientCalls.WithLabelValues("GetInternalSessions").Inc()
// TODO: Remove debug logging
log.Printf("Get internal sessions for %s@%s on %s", roomId, backend.Id(), c.Target())
response, err := c.impl.GetInternalSessions(ctx, &GetInternalSessionsRequest{
RoomId: roomId,
BackendUrl: backend.Url(),
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return nil, nil, nil
} else if err != nil {
return nil, nil, err
}
if len(response.InternalSessions) > 0 {
internal = make(map[string]*InternalSessionData, len(response.InternalSessions))
for _, s := range response.InternalSessions {
internal[s.SessionId] = s
}
}
if len(response.VirtualSessions) > 0 {
virtual = make(map[string]*VirtualSessionData, len(response.VirtualSessions))
for _, s := range response.VirtualSessions {
virtual[s.SessionId] = s
}
}
return
}
func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, streamType StreamType) (string, string, net.IP, error) {
statsGrpcClientCalls.WithLabelValues("GetPublisherId").Inc()
// TODO: Remove debug logging

View file

@ -59,6 +59,7 @@ type GrpcServerHub interface {
GetSessionByResumeId(resumeId string) Session
GetSessionByPublicId(sessionId string) Session
GetSessionIdByRoomSessionId(roomSessionId string) (string, error)
GetRoomForBackend(roomId string, backend *Backend) *Room
GetBackend(u *url.URL) *Backend
}
@ -185,6 +186,52 @@ func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCa
return result, nil
}
func (s *GrpcServer) GetInternalSessions(ctx context.Context, request *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) {
statsGrpcServerCalls.WithLabelValues("GetInternalSessions").Inc()
// TODO: Remove debug logging
log.Printf("Get internal sessions from %s on %s", request.RoomId, request.BackendUrl)
var u *url.URL
if request.BackendUrl != "" {
var err error
u, err = url.Parse(request.BackendUrl)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "invalid url")
}
}
backend := s.hub.GetBackend(u)
if backend == nil {
return nil, status.Error(codes.NotFound, "no such backend")
}
room := s.hub.GetRoomForBackend(request.RoomId, backend)
if room == nil {
return nil, status.Error(codes.NotFound, "no such room")
}
result := &GetInternalSessionsReply{}
room.mu.RLock()
defer room.mu.RUnlock()
for session := range room.internalSessions {
result.InternalSessions = append(result.InternalSessions, &InternalSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
Features: session.GetFeatures(),
})
}
for session := range room.virtualSessions {
result.VirtualSessions = append(result.VirtualSessions, &VirtualSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
})
}
return result, nil
}
func (s *GrpcServer) GetPublisherId(ctx context.Context, request *GetPublisherIdRequest) (*GetPublisherIdReply, error) {
statsGrpcServerCalls.WithLabelValues("GetPublisherId").Inc()
// TODO: Remove debug logging

View file

@ -333,6 +333,226 @@ func (x *IsSessionInCallReply) GetInCall() bool {
return false
}
type GetInternalSessionsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
RoomId string `protobuf:"bytes,1,opt,name=roomId,proto3" json:"roomId,omitempty"`
BackendUrl string `protobuf:"bytes,2,opt,name=backendUrl,proto3" json:"backendUrl,omitempty"`
}
func (x *GetInternalSessionsRequest) Reset() {
*x = GetInternalSessionsRequest{}
mi := &file_grpc_sessions_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetInternalSessionsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetInternalSessionsRequest) ProtoMessage() {}
func (x *GetInternalSessionsRequest) ProtoReflect() protoreflect.Message {
mi := &file_grpc_sessions_proto_msgTypes[6]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetInternalSessionsRequest.ProtoReflect.Descriptor instead.
func (*GetInternalSessionsRequest) Descriptor() ([]byte, []int) {
return file_grpc_sessions_proto_rawDescGZIP(), []int{6}
}
func (x *GetInternalSessionsRequest) GetRoomId() string {
if x != nil {
return x.RoomId
}
return ""
}
func (x *GetInternalSessionsRequest) GetBackendUrl() string {
if x != nil {
return x.BackendUrl
}
return ""
}
type InternalSessionData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
SessionId string `protobuf:"bytes,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"`
InCall uint32 `protobuf:"varint,2,opt,name=inCall,proto3" json:"inCall,omitempty"`
Features []string `protobuf:"bytes,3,rep,name=features,proto3" json:"features,omitempty"`
}
func (x *InternalSessionData) Reset() {
*x = InternalSessionData{}
mi := &file_grpc_sessions_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *InternalSessionData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*InternalSessionData) ProtoMessage() {}
func (x *InternalSessionData) ProtoReflect() protoreflect.Message {
mi := &file_grpc_sessions_proto_msgTypes[7]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use InternalSessionData.ProtoReflect.Descriptor instead.
func (*InternalSessionData) Descriptor() ([]byte, []int) {
return file_grpc_sessions_proto_rawDescGZIP(), []int{7}
}
func (x *InternalSessionData) GetSessionId() string {
if x != nil {
return x.SessionId
}
return ""
}
func (x *InternalSessionData) GetInCall() uint32 {
if x != nil {
return x.InCall
}
return 0
}
func (x *InternalSessionData) GetFeatures() []string {
if x != nil {
return x.Features
}
return nil
}
type VirtualSessionData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
SessionId string `protobuf:"bytes,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"`
InCall uint32 `protobuf:"varint,2,opt,name=inCall,proto3" json:"inCall,omitempty"`
}
func (x *VirtualSessionData) Reset() {
*x = VirtualSessionData{}
mi := &file_grpc_sessions_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *VirtualSessionData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*VirtualSessionData) ProtoMessage() {}
func (x *VirtualSessionData) ProtoReflect() protoreflect.Message {
mi := &file_grpc_sessions_proto_msgTypes[8]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use VirtualSessionData.ProtoReflect.Descriptor instead.
func (*VirtualSessionData) Descriptor() ([]byte, []int) {
return file_grpc_sessions_proto_rawDescGZIP(), []int{8}
}
func (x *VirtualSessionData) GetSessionId() string {
if x != nil {
return x.SessionId
}
return ""
}
func (x *VirtualSessionData) GetInCall() uint32 {
if x != nil {
return x.InCall
}
return 0
}
type GetInternalSessionsReply struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
InternalSessions []*InternalSessionData `protobuf:"bytes,1,rep,name=internalSessions,proto3" json:"internalSessions,omitempty"`
VirtualSessions []*VirtualSessionData `protobuf:"bytes,2,rep,name=virtualSessions,proto3" json:"virtualSessions,omitempty"`
}
func (x *GetInternalSessionsReply) Reset() {
*x = GetInternalSessionsReply{}
mi := &file_grpc_sessions_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetInternalSessionsReply) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetInternalSessionsReply) ProtoMessage() {}
func (x *GetInternalSessionsReply) ProtoReflect() protoreflect.Message {
mi := &file_grpc_sessions_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetInternalSessionsReply.ProtoReflect.Descriptor instead.
func (*GetInternalSessionsReply) Descriptor() ([]byte, []int) {
return file_grpc_sessions_proto_rawDescGZIP(), []int{9}
}
func (x *GetInternalSessionsReply) GetInternalSessions() []*InternalSessionData {
if x != nil {
return x.InternalSessions
}
return nil
}
func (x *GetInternalSessionsReply) GetVirtualSessions() []*VirtualSessionData {
if x != nil {
return x.VirtualSessions
}
return nil
}
type ClientSessionMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -343,7 +563,7 @@ type ClientSessionMessage struct {
func (x *ClientSessionMessage) Reset() {
*x = ClientSessionMessage{}
mi := &file_grpc_sessions_proto_msgTypes[6]
mi := &file_grpc_sessions_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -355,7 +575,7 @@ func (x *ClientSessionMessage) String() string {
func (*ClientSessionMessage) ProtoMessage() {}
func (x *ClientSessionMessage) ProtoReflect() protoreflect.Message {
mi := &file_grpc_sessions_proto_msgTypes[6]
mi := &file_grpc_sessions_proto_msgTypes[10]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -368,7 +588,7 @@ func (x *ClientSessionMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use ClientSessionMessage.ProtoReflect.Descriptor instead.
func (*ClientSessionMessage) Descriptor() ([]byte, []int) {
return file_grpc_sessions_proto_rawDescGZIP(), []int{6}
return file_grpc_sessions_proto_rawDescGZIP(), []int{10}
}
func (x *ClientSessionMessage) GetMessage() []byte {
@ -388,7 +608,7 @@ type ServerSessionMessage struct {
func (x *ServerSessionMessage) Reset() {
*x = ServerSessionMessage{}
mi := &file_grpc_sessions_proto_msgTypes[7]
mi := &file_grpc_sessions_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -400,7 +620,7 @@ func (x *ServerSessionMessage) String() string {
func (*ServerSessionMessage) ProtoMessage() {}
func (x *ServerSessionMessage) ProtoReflect() protoreflect.Message {
mi := &file_grpc_sessions_proto_msgTypes[7]
mi := &file_grpc_sessions_proto_msgTypes[11]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -413,7 +633,7 @@ func (x *ServerSessionMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use ServerSessionMessage.ProtoReflect.Descriptor instead.
func (*ServerSessionMessage) Descriptor() ([]byte, []int) {
return file_grpc_sessions_proto_rawDescGZIP(), []int{7}
return file_grpc_sessions_proto_rawDescGZIP(), []int{11}
}
func (x *ServerSessionMessage) GetMessage() []byte {
@ -454,41 +674,75 @@ var file_grpc_sessions_proto_rawDesc = []byte{
0x52, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, 0x72, 0x6c, 0x22, 0x2e, 0x0a, 0x14,
0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52,
0x65, 0x70, 0x6c, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x18, 0x01,
0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0x30, 0x0a, 0x14,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x30,
0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x32, 0xed, 0x02, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73,
0x12, 0x54, 0x0a, 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65,
0x49, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c,
0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67,
0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52,
0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70,
0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0x54, 0x0a, 0x1a,
0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69,
0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x6f,
0x6f, 0x6d, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x6f, 0x6f, 0x6d,
0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, 0x72, 0x6c,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55,
0x72, 0x6c, 0x22, 0x67, 0x0a, 0x13, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65,
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65,
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c,
0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x12,
0x1a, 0x0a, 0x08, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28,
0x09, 0x52, 0x08, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x4a, 0x0a, 0x12, 0x56,
0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74,
0x61, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12,
0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52,
0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0xaf, 0x01, 0x0a, 0x18, 0x47, 0x65, 0x74, 0x49,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52,
0x65, 0x70, 0x6c, 0x79, 0x12, 0x4a, 0x0a, 0x10, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e,
0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72,
0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x52, 0x10,
0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73,
0x12, 0x47, 0x0a, 0x0f, 0x76, 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69,
0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x56, 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x52, 0x0f, 0x76, 0x69, 0x72, 0x74, 0x75, 0x61,
0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x30, 0x0a, 0x14, 0x43, 0x6c, 0x69,
0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x30, 0x0a, 0x14, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xd2, 0x03,
0x0a, 0x0b, 0x52, 0x70, 0x63, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x54, 0x0a,
0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x12,
0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b,
0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f,
0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c,
0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69,
0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e,
0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, 0x73,
0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53,
0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12,
0x57, 0x0a, 0x0f, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61,
0x6c, 0x6c, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49,
0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f,
0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x12,
0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x73, 0x53, 0x65,
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49,
0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e,
0x67, 0x2e, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c,
0x6c, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x78,
0x79, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69,
0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73,
0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01,
0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73,
0x74, 0x72, 0x75, 0x6b, 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74, 0x63, 0x6c,
0x6f, 0x75, 0x64, 0x2d, 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65,
0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65,
0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e,
0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69,
0x6f, 0x6e, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x0c, 0x50, 0x72,
0x6f, 0x78, 0x79, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x73, 0x69, 0x67,
0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x73, 0x69,
0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65,
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01,
0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x73, 0x74, 0x72, 0x75, 0x6b, 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74,
0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67,
0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -503,31 +757,39 @@ func file_grpc_sessions_proto_rawDescGZIP() []byte {
return file_grpc_sessions_proto_rawDescData
}
var file_grpc_sessions_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_grpc_sessions_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_grpc_sessions_proto_goTypes = []any{
(*LookupResumeIdRequest)(nil), // 0: signaling.LookupResumeIdRequest
(*LookupResumeIdReply)(nil), // 1: signaling.LookupResumeIdReply
(*LookupSessionIdRequest)(nil), // 2: signaling.LookupSessionIdRequest
(*LookupSessionIdReply)(nil), // 3: signaling.LookupSessionIdReply
(*IsSessionInCallRequest)(nil), // 4: signaling.IsSessionInCallRequest
(*IsSessionInCallReply)(nil), // 5: signaling.IsSessionInCallReply
(*ClientSessionMessage)(nil), // 6: signaling.ClientSessionMessage
(*ServerSessionMessage)(nil), // 7: signaling.ServerSessionMessage
(*LookupResumeIdRequest)(nil), // 0: signaling.LookupResumeIdRequest
(*LookupResumeIdReply)(nil), // 1: signaling.LookupResumeIdReply
(*LookupSessionIdRequest)(nil), // 2: signaling.LookupSessionIdRequest
(*LookupSessionIdReply)(nil), // 3: signaling.LookupSessionIdReply
(*IsSessionInCallRequest)(nil), // 4: signaling.IsSessionInCallRequest
(*IsSessionInCallReply)(nil), // 5: signaling.IsSessionInCallReply
(*GetInternalSessionsRequest)(nil), // 6: signaling.GetInternalSessionsRequest
(*InternalSessionData)(nil), // 7: signaling.InternalSessionData
(*VirtualSessionData)(nil), // 8: signaling.VirtualSessionData
(*GetInternalSessionsReply)(nil), // 9: signaling.GetInternalSessionsReply
(*ClientSessionMessage)(nil), // 10: signaling.ClientSessionMessage
(*ServerSessionMessage)(nil), // 11: signaling.ServerSessionMessage
}
var file_grpc_sessions_proto_depIdxs = []int32{
0, // 0: signaling.RpcSessions.LookupResumeId:input_type -> signaling.LookupResumeIdRequest
2, // 1: signaling.RpcSessions.LookupSessionId:input_type -> signaling.LookupSessionIdRequest
4, // 2: signaling.RpcSessions.IsSessionInCall:input_type -> signaling.IsSessionInCallRequest
6, // 3: signaling.RpcSessions.ProxySession:input_type -> signaling.ClientSessionMessage
1, // 4: signaling.RpcSessions.LookupResumeId:output_type -> signaling.LookupResumeIdReply
3, // 5: signaling.RpcSessions.LookupSessionId:output_type -> signaling.LookupSessionIdReply
5, // 6: signaling.RpcSessions.IsSessionInCall:output_type -> signaling.IsSessionInCallReply
7, // 7: signaling.RpcSessions.ProxySession:output_type -> signaling.ServerSessionMessage
4, // [4:8] is the sub-list for method output_type
0, // [0:4] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
7, // 0: signaling.GetInternalSessionsReply.internalSessions:type_name -> signaling.InternalSessionData
8, // 1: signaling.GetInternalSessionsReply.virtualSessions:type_name -> signaling.VirtualSessionData
0, // 2: signaling.RpcSessions.LookupResumeId:input_type -> signaling.LookupResumeIdRequest
2, // 3: signaling.RpcSessions.LookupSessionId:input_type -> signaling.LookupSessionIdRequest
4, // 4: signaling.RpcSessions.IsSessionInCall:input_type -> signaling.IsSessionInCallRequest
6, // 5: signaling.RpcSessions.GetInternalSessions:input_type -> signaling.GetInternalSessionsRequest
10, // 6: signaling.RpcSessions.ProxySession:input_type -> signaling.ClientSessionMessage
1, // 7: signaling.RpcSessions.LookupResumeId:output_type -> signaling.LookupResumeIdReply
3, // 8: signaling.RpcSessions.LookupSessionId:output_type -> signaling.LookupSessionIdReply
5, // 9: signaling.RpcSessions.IsSessionInCall:output_type -> signaling.IsSessionInCallReply
9, // 10: signaling.RpcSessions.GetInternalSessions:output_type -> signaling.GetInternalSessionsReply
11, // 11: signaling.RpcSessions.ProxySession:output_type -> signaling.ServerSessionMessage
7, // [7:12] is the sub-list for method output_type
2, // [2:7] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_grpc_sessions_proto_init() }
@ -541,7 +803,7 @@ func file_grpc_sessions_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_grpc_sessions_proto_rawDesc,
NumEnums: 0,
NumMessages: 8,
NumMessages: 12,
NumExtensions: 0,
NumServices: 1,
},

View file

@ -29,6 +29,7 @@ service RpcSessions {
rpc LookupResumeId(LookupResumeIdRequest) returns (LookupResumeIdReply) {}
rpc LookupSessionId(LookupSessionIdRequest) returns (LookupSessionIdReply) {}
rpc IsSessionInCall(IsSessionInCallRequest) returns (IsSessionInCallReply) {}
rpc GetInternalSessions(GetInternalSessionsRequest) returns (GetInternalSessionsReply) {}
rpc ProxySession(stream ClientSessionMessage) returns (stream ServerSessionMessage) {}
}
@ -60,6 +61,27 @@ message IsSessionInCallReply {
bool inCall = 1;
}
message GetInternalSessionsRequest {
string roomId = 1;
string backendUrl = 2;
}
message InternalSessionData {
string sessionId = 1;
uint32 inCall = 2;
repeated string features = 3;
}
message VirtualSessionData {
string sessionId = 1;
uint32 inCall = 2;
}
message GetInternalSessionsReply {
repeated InternalSessionData internalSessions = 1;
repeated VirtualSessionData virtualSessions = 2;
}
message ClientSessionMessage {
bytes message = 1;
}

View file

@ -37,10 +37,11 @@ import (
const _ = grpc.SupportPackageIsVersion9
const (
RpcSessions_LookupResumeId_FullMethodName = "/signaling.RpcSessions/LookupResumeId"
RpcSessions_LookupSessionId_FullMethodName = "/signaling.RpcSessions/LookupSessionId"
RpcSessions_IsSessionInCall_FullMethodName = "/signaling.RpcSessions/IsSessionInCall"
RpcSessions_ProxySession_FullMethodName = "/signaling.RpcSessions/ProxySession"
RpcSessions_LookupResumeId_FullMethodName = "/signaling.RpcSessions/LookupResumeId"
RpcSessions_LookupSessionId_FullMethodName = "/signaling.RpcSessions/LookupSessionId"
RpcSessions_IsSessionInCall_FullMethodName = "/signaling.RpcSessions/IsSessionInCall"
RpcSessions_GetInternalSessions_FullMethodName = "/signaling.RpcSessions/GetInternalSessions"
RpcSessions_ProxySession_FullMethodName = "/signaling.RpcSessions/ProxySession"
)
// RpcSessionsClient is the client API for RpcSessions service.
@ -50,6 +51,7 @@ type RpcSessionsClient interface {
LookupResumeId(ctx context.Context, in *LookupResumeIdRequest, opts ...grpc.CallOption) (*LookupResumeIdReply, error)
LookupSessionId(ctx context.Context, in *LookupSessionIdRequest, opts ...grpc.CallOption) (*LookupSessionIdReply, error)
IsSessionInCall(ctx context.Context, in *IsSessionInCallRequest, opts ...grpc.CallOption) (*IsSessionInCallReply, error)
GetInternalSessions(ctx context.Context, in *GetInternalSessionsRequest, opts ...grpc.CallOption) (*GetInternalSessionsReply, error)
ProxySession(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ClientSessionMessage, ServerSessionMessage], error)
}
@ -91,6 +93,16 @@ func (c *rpcSessionsClient) IsSessionInCall(ctx context.Context, in *IsSessionIn
return out, nil
}
func (c *rpcSessionsClient) GetInternalSessions(ctx context.Context, in *GetInternalSessionsRequest, opts ...grpc.CallOption) (*GetInternalSessionsReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetInternalSessionsReply)
err := c.cc.Invoke(ctx, RpcSessions_GetInternalSessions_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *rpcSessionsClient) ProxySession(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ClientSessionMessage, ServerSessionMessage], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RpcSessions_ServiceDesc.Streams[0], RpcSessions_ProxySession_FullMethodName, cOpts...)
@ -111,6 +123,7 @@ type RpcSessionsServer interface {
LookupResumeId(context.Context, *LookupResumeIdRequest) (*LookupResumeIdReply, error)
LookupSessionId(context.Context, *LookupSessionIdRequest) (*LookupSessionIdReply, error)
IsSessionInCall(context.Context, *IsSessionInCallRequest) (*IsSessionInCallReply, error)
GetInternalSessions(context.Context, *GetInternalSessionsRequest) (*GetInternalSessionsReply, error)
ProxySession(grpc.BidiStreamingServer[ClientSessionMessage, ServerSessionMessage]) error
mustEmbedUnimplementedRpcSessionsServer()
}
@ -131,6 +144,9 @@ func (UnimplementedRpcSessionsServer) LookupSessionId(context.Context, *LookupSe
func (UnimplementedRpcSessionsServer) IsSessionInCall(context.Context, *IsSessionInCallRequest) (*IsSessionInCallReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method IsSessionInCall not implemented")
}
func (UnimplementedRpcSessionsServer) GetInternalSessions(context.Context, *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetInternalSessions not implemented")
}
func (UnimplementedRpcSessionsServer) ProxySession(grpc.BidiStreamingServer[ClientSessionMessage, ServerSessionMessage]) error {
return status.Errorf(codes.Unimplemented, "method ProxySession not implemented")
}
@ -209,6 +225,24 @@ func _RpcSessions_IsSessionInCall_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}
func _RpcSessions_GetInternalSessions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetInternalSessionsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RpcSessionsServer).GetInternalSessions(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: RpcSessions_GetInternalSessions_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RpcSessionsServer).GetInternalSessions(ctx, req.(*GetInternalSessionsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _RpcSessions_ProxySession_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RpcSessionsServer).ProxySession(&grpc.GenericServerStream[ClientSessionMessage, ServerSessionMessage]{ServerStream: stream})
}
@ -235,6 +269,10 @@ var RpcSessions_ServiceDesc = grpc.ServiceDesc{
MethodName: "IsSessionInCall",
Handler: _RpcSessions_IsSessionInCall_Handler,
},
{
MethodName: "GetInternalSessions",
Handler: _RpcSessions_GetInternalSessions_Handler,
},
},
Streams: []grpc.StreamDesc{
{

13
hub.go
View file

@ -648,6 +648,9 @@ func (h *Hub) GetDialoutSession(roomId string, backend *Backend) *ClientSession
}
func (h *Hub) GetBackend(u *url.URL) *Backend {
if u == nil {
return h.backend.GetCompatBackend()
}
return h.backend.GetBackend(u)
}
@ -1629,7 +1632,7 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) {
return
}
if room := h.getRoomForBackend(roomId, session.Backend()); room != nil && room.HasSession(session) {
if room := h.GetRoomForBackend(roomId, session.Backend()); room != nil && room.HasSession(session) {
// Session already is in that room, no action needed.
roomSessionId := message.Room.SessionId
if roomSessionId == "" {
@ -1770,7 +1773,7 @@ func (h *Hub) publishFederatedSessions() (int, *sync.WaitGroup) {
return count, &wg
}
func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room {
func (h *Hub) GetRoomForBackend(id string, backend *Backend) *Room {
internalRoomId := getRoomIdForBackend(id, backend)
h.ru.RLock()
@ -2256,7 +2259,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) {
switch msg.Type {
case "addsession":
msg := msg.AddSession
room := h.getRoomForBackend(msg.RoomId, session.Backend())
room := h.GetRoomForBackend(msg.RoomId, session.Backend())
if room == nil {
log.Printf("Ignore add session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId())
return
@ -2333,7 +2336,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) {
room.AddSession(sess, nil)
case "updatesession":
msg := msg.UpdateSession
room := h.getRoomForBackend(msg.RoomId, session.Backend())
room := h.GetRoomForBackend(msg.RoomId, session.Backend())
if room == nil {
log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId())
return
@ -2371,7 +2374,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) {
}
case "removesession":
msg := msg.RemoveSession
room := h.getRoomForBackend(msg.RoomId, session.Backend())
room := h.GetRoomForBackend(msg.RoomId, session.Backend())
if room == nil {
log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId())
return

View file

@ -4488,6 +4488,7 @@ func TestVirtualClientSessions(t *testing.T) {
}
virtualSession := virtualSessions[0]
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId))
}
@ -4625,6 +4626,290 @@ func TestVirtualClientSessions(t *testing.T) {
}
}
func TestDuplicateVirtualSessions(t *testing.T) {
CatchLogForTest(t)
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
require.NoError(client1.SendHello(testDefaultUserId))
hello1, err := client1.RunUntilHello(ctx)
require.NoError(err)
roomId := "test-room"
_, err = client1.JoinRoom(ctx, roomId)
require.NoError(err)
assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
require.NoError(client2.SendHelloInternal())
hello2, err := client2.RunUntilHello(ctx)
require.NoError(err)
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
_, err = client2.JoinRoom(ctx, roomId)
require.NoError(err)
assert.NoError(client1.RunUntilJoined(ctx, hello2.Hello))
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
if assert.Len(msg.Users, 1) {
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
assert.EqualValues(3, msg.Users[0]["inCall"], "%+v", msg)
}
}
}
_, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello)
assert.NoError(err)
if len(unexpected) == 0 {
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
unexpected = append(unexpected, msg)
}
}
require.Len(unexpected, 1)
if msg, err := checkMessageParticipantsInCall(unexpected[0]); assert.NoError(err) {
if assert.Len(msg.Users, 1) {
assert.Equal(true, msg.Users[0]["internal"])
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"])
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"])
}
}
calledCtx, calledCancel := context.WithTimeout(ctx, time.Second)
virtualSessionId := "virtual-session-id"
virtualUserId := "virtual-user-id"
generatedSessionId := GetVirtualSessionId(session2, virtualSessionId)
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
defer calledCancel()
assert.Equal("add", request.Action, "%+v", request)
assert.Equal(roomId, request.RoomId, "%+v", request)
assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request)
assert.Equal(virtualUserId, request.UserId, "%+v", request)
})
require.NoError(client2.SendInternalAddSession(&AddSessionInternalClientMessage{
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
SessionId: virtualSessionId,
RoomId: roomId,
},
UserId: virtualUserId,
Flags: FLAG_MUTED_SPEAKING,
}))
<-calledCtx.Done()
if err := calledCtx.Err(); err != nil {
require.ErrorIs(err, context.Canceled)
}
virtualSessions := session2.GetVirtualSessions()
for len(virtualSessions) == 0 {
time.Sleep(time.Millisecond)
virtualSessions = session2.GetVirtualSessions()
}
virtualSession := virtualSessions[0]
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId))
}
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
if assert.Len(msg.Users, 2) {
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg)
assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg)
assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg)
}
}
}
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
assert.Equal(roomId, flags.RoomId)
assert.Equal(virtualSession.PublicId(), flags.SessionId)
assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags)
}
}
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.NoError(client2.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId))
}
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
if assert.Len(msg.Users, 2) {
assert.Equal(true, msg.Users[0]["internal"], "%+v", msg)
assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg)
assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg)
assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg)
}
}
}
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) {
assert.Equal(roomId, flags.RoomId)
assert.Equal(virtualSession.PublicId(), flags.SessionId)
assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags)
}
}
msg := &BackendServerRoomRequest{
Type: "incall",
InCall: &BackendRoomInCallRequest{
InCall: []byte("0"),
Users: []map[string]interface{}{
{
"sessionId": virtualSession.PublicId(),
"participantPermissions": 246,
"participantType": 4,
"lastPing": 123456789,
},
{
// Request is coming from Nextcloud, so use its session id (which is our "room session id").
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"participantPermissions": 254,
"participantType": 1,
"lastPing": 234567890,
},
},
},
}
data, err := json.Marshal(msg)
require.NoError(err)
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId, data)
require.NoError(err)
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
assert.NoError(err)
assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body))
if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) {
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
if assert.Len(msg.Users, 3) {
assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg)
assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg)
assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg)
assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg)
assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg)
assert.Nil(msg.Users[1]["inCall"], "%+v", msg)
assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg)
assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg)
assert.Equal(true, msg.Users[2]["internal"], "%+v", msg)
assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg)
}
}
}
if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
if assert.Len(msg.Users, 3) {
assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg)
assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg)
assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg)
assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg)
assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg)
assert.Nil(msg.Users[1]["inCall"], "%+v", msg)
assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg)
assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg)
assert.Equal(true, msg.Users[2]["internal"], "%+v", msg)
assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg)
}
}
}
client1.Close()
assert.NoError(client1.WaitForClientRemoved(ctx))
client3 := NewTestClient(t, server1, hub1)
defer client3.CloseWithBye()
require.NoError(client3.SendHelloResume(hello1.Hello.ResumeId))
if hello3, err := client3.RunUntilHello(ctx); assert.NoError(err) {
assert.Equal(testDefaultUserId, hello3.Hello.UserId, "%+v", hello3.Hello)
assert.Equal(hello1.Hello.SessionId, hello3.Hello.SessionId, "%+v", hello3.Hello)
assert.Equal(hello1.Hello.ResumeId, hello3.Hello.ResumeId, "%+v", hello3.Hello)
}
if msg, err := client3.RunUntilMessage(ctx); assert.NoError(err) {
if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) {
if assert.Len(msg.Users, 3) {
assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg)
assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg)
assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg)
assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg)
assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg)
assert.Nil(msg.Users[1]["inCall"], "%+v", msg)
assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg)
assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg)
assert.Equal(true, msg.Users[2]["internal"], "%+v", msg)
assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg)
assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg)
}
}
}
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
defer calledCancel()
assert.Equal("remove", request.Action, "%+v", request)
assert.Equal(roomId, request.RoomId, "%+v", request)
assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request)
assert.Equal(virtualUserId, request.UserId, "%+v", request)
})
})
}
}
func DoTestSwitchToOne(t *testing.T, details map[string]interface{}) {
CatchLogForTest(t)
for _, subtest := range clusteredTests {

View file

@ -1346,6 +1346,10 @@ func (h *mockGrpcServerHub) GetBackend(u *url.URL) *Backend {
return nil
}
func (h *mockGrpcServerHub) GetRoomForBackend(roomId string, backend *Backend) *Room {
return nil
}
func Test_ProxyRemotePublisher(t *testing.T) {
CatchLogForTest(t)
t.Parallel()

150
room.go
View file

@ -71,7 +71,7 @@ type Room struct {
mu *sync.RWMutex
sessions map[string]Session
internalSessions map[Session]bool
internalSessions map[*ClientSession]bool
virtualSessions map[*VirtualSession]bool
inCallSessions map[Session]bool
roomSessionData map[string]*RoomSessionData
@ -108,7 +108,7 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEv
mu: &sync.RWMutex{},
sessions: make(map[string]Session),
internalSessions: make(map[Session]bool),
internalSessions: make(map[*ClientSession]bool),
virtualSessions: make(map[*VirtualSession]bool),
inCallSessions: make(map[Session]bool),
roomSessionData: make(map[string]*RoomSessionData),
@ -290,13 +290,19 @@ func (r *Room) AddSession(session Session, sessionData json.RawMessage) {
var publishUsersChanged bool
switch session.ClientType() {
case HelloClientTypeInternal:
r.internalSessions[session] = true
clientSession, ok := session.(*ClientSession)
if !ok {
delete(r.sessions, sid)
r.mu.Unlock()
panic(fmt.Sprintf("Expected a client session, got %v (%T)", session, session))
}
r.internalSessions[clientSession] = true
case HelloClientTypeVirtual:
virtualSession, ok := session.(*VirtualSession)
if !ok {
delete(r.sessions, sid)
r.mu.Unlock()
panic(fmt.Sprintf("Expected a virtual session, got %v", session))
panic(fmt.Sprintf("Expected a virtual session, got %v (%T)", session, session))
}
r.virtualSessions[virtualSession] = true
publishUsersChanged = true
@ -447,11 +453,21 @@ func (r *Room) RemoveSession(session Session) bool {
sid := session.PublicId()
r.statsRoomSessionsCurrent.With(prometheus.Labels{"clienttype": session.ClientType()}).Dec()
delete(r.sessions, sid)
delete(r.internalSessions, session)
if virtualSession, ok := session.(*VirtualSession); ok {
delete(r.virtualSessions, virtualSession)
// Handle case where virtual session was also sent by Nextcloud.
users := make([]map[string]interface{}, 0, len(r.users))
for _, u := range r.users {
if u["sessionId"] != sid {
users = append(users, u)
}
}
if len(users) != len(r.users) {
r.users = users
}
}
if clientSession, ok := session.(*ClientSession); ok {
delete(r.internalSessions, clientSession)
r.transientData.RemoveListener(clientSession)
}
delete(r.inCallSessions, session)
@ -568,10 +584,66 @@ func (r *Room) PublishSessionLeft(session Session) {
}
}
func (r *Room) getClusteredInternalSessionsRLocked() (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData) {
if r.hub.rpcClients == nil {
return nil, nil
}
r.mu.RUnlock()
defer r.mu.RLock()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var mu sync.Mutex
var wg sync.WaitGroup
for _, client := range r.hub.rpcClients.GetClients() {
wg.Add(1)
go func(c *GrpcClient) {
defer wg.Done()
clientInternal, clientVirtual, err := c.GetInternalSessions(ctx, r.Id(), r.Backend())
if err != nil {
log.Printf("Received error while getting internal sessions for %s@%s from %s: %s", r.Id(), r.Backend().Id(), c.Target(), err)
return
}
mu.Lock()
defer mu.Unlock()
if internal == nil {
internal = make(map[string]*InternalSessionData, len(clientInternal))
}
for sid, s := range clientInternal {
internal[sid] = s
}
if virtual == nil {
virtual = make(map[string]*VirtualSessionData, len(clientVirtual))
}
for sid, s := range clientVirtual {
virtual[sid] = s
}
}(client)
}
wg.Wait()
return
}
func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string]interface{} {
now := time.Now().Unix()
r.mu.Lock()
defer r.mu.Unlock()
r.mu.RLock()
defer r.mu.RUnlock()
if len(users) == 0 && len(r.internalSessions) == 0 && len(r.virtualSessions) == 0 {
return users
}
clusteredInternalSessions, clusteredVirtualSessions := r.getClusteredInternalSessionsRLocked()
// Local sessions might have changed while waiting for clustered information.
if len(users) == 0 && len(r.internalSessions) == 0 && len(r.virtualSessions) == 0 {
return users
}
skipSession := make(map[string]bool)
for _, user := range users {
sessionid, found := user["sessionId"]
if !found || sessionid == "" {
@ -581,21 +653,69 @@ func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string]
if userid, found := user["userId"]; !found || userid == "" {
if roomSessionData, found := r.roomSessionData[sessionid.(string)]; found {
user["userId"] = roomSessionData.UserId
} else if sid, ok := sessionid.(string); ok {
if entry, found := clusteredVirtualSessions[sid]; found {
user["virtual"] = true
user["inCall"] = entry.GetInCall()
skipSession[sid] = true
} else {
for session := range r.virtualSessions {
if session.PublicId() == sid {
user["virtual"] = true
user["inCall"] = session.GetInCall()
skipSession[sid] = true
break
}
}
}
}
}
}
for session := range r.internalSessions {
users = append(users, map[string]interface{}{
"inCall": session.(*ClientSession).GetInCall(),
u := map[string]interface{}{
"inCall": session.GetInCall(),
"sessionId": session.PublicId(),
"lastPing": now,
"internal": true,
})
}
if f := session.GetFeatures(); len(f) > 0 {
u["features"] = f
}
users = append(users, u)
}
for _, session := range clusteredInternalSessions {
u := map[string]interface{}{
"inCall": session.GetInCall(),
"sessionId": session.GetSessionId(),
"lastPing": now,
"internal": true,
}
if f := session.GetFeatures(); len(f) > 0 {
u["features"] = f
}
users = append(users, u)
}
for session := range r.virtualSessions {
sid := session.PublicId()
if skipSession[sid] {
continue
}
skipSession[sid] = true
users = append(users, map[string]interface{}{
"inCall": session.GetInCall(),
"sessionId": session.PublicId(),
"sessionId": sid,
"lastPing": now,
"virtual": true,
})
}
for sid, session := range clusteredVirtualSessions {
if skipSession[sid] {
continue
}
users = append(users, map[string]interface{}{
"inCall": session.GetInCall(),
"sessionId": sid,
"lastPing": now,
"virtual": true,
})
@ -974,7 +1094,7 @@ func (r *Room) publishActiveSessions() (int, *sync.WaitGroup) {
}
func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) {
if message == nil || message.Data == nil {
if message == nil || len(message.Data) == 0 {
return
}
@ -1062,10 +1182,10 @@ func (r *Room) notifyInternalRoomDeleted() {
},
}
r.mu.Lock()
defer r.mu.Unlock()
r.mu.RLock()
defer r.mu.RUnlock()
for s := range r.internalSessions {
s.(*ClientSession).SendMessage(msg)
s.SendMessage(msg)
}
}