Prepare internal APIs for multiple backend urls.

This commit is contained in:
Joachim Bauch 2024-07-03 10:18:13 +02:00
commit 5da0a5d4b0
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
16 changed files with 187 additions and 133 deletions

View file

@ -378,11 +378,17 @@ type ClientTypeInternalAuthParams struct {
func (p *ClientTypeInternalAuthParams) CheckValid() error {
if p.Backend == "" {
return fmt.Errorf("backend missing")
} else if u, err := url.Parse(p.Backend); err != nil {
}
if p.Backend[len(p.Backend)-1] != '/' {
p.Backend += "/"
}
if u, err := url.Parse(p.Backend); err != nil {
return err
} else {
if strings.Contains(u.Host, ":") && hasStandardPort(u) {
u.Host = u.Hostname()
p.Backend = u.String()
}
p.parsedBackend = u
@ -483,11 +489,17 @@ func (m *HelloClientMessage) CheckValid() error {
case HelloClientTypeFederation:
if m.Auth.Url == "" {
return fmt.Errorf("url missing")
} else if u, err := url.ParseRequestURI(m.Auth.Url); err != nil {
}
if m.Auth.Url[len(m.Auth.Url)-1] != '/' {
m.Auth.Url += "/"
}
if u, err := url.ParseRequestURI(m.Auth.Url); err != nil {
return err
} else {
if strings.Contains(u.Host, ":") && hasStandardPort(u) {
u.Host = u.Hostname()
m.Auth.Url = u.String()
}
m.Auth.parsedUrl = u

View file

@ -42,11 +42,9 @@ var (
)
type Backend struct {
id string
url string
parsedUrl *url.URL
secret []byte
compat bool
id string
urls []string
secret []byte
allowHttp bool
@ -67,7 +65,7 @@ func (b *Backend) Secret() []byte {
}
func (b *Backend) IsCompat() bool {
return b.compat
return len(b.urls) == 0
}
func (b *Backend) IsUrlAllowed(u *url.URL) bool {
@ -81,12 +79,23 @@ func (b *Backend) IsUrlAllowed(u *url.URL) bool {
}
}
func (b *Backend) Url() string {
return b.url
func (b *Backend) HasUrl(url string) bool {
if b.IsCompat() {
// Old-style configuration, only hosts are configured.
return true
}
for _, u := range b.urls {
if strings.HasPrefix(url, u) {
return true
}
}
return false
}
func (b *Backend) ParsedUrl() *url.URL {
return b.parsedUrl
func (b *Backend) Urls() []string {
return b.urls
}
func (b *Backend) Limit() int {
@ -173,10 +182,7 @@ func (s *backendStorageCommon) getBackendLocked(u *url.URL) *Backend {
continue
}
if entry.url == "" {
// Old-style configuration, only hosts are configured.
return entry
} else if strings.HasPrefix(url, entry.url) {
if entry.HasUrl(url) {
return entry
}
}

View file

@ -491,7 +491,7 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
require.NoError(storage.WaitForInitialized(ctx))
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) &&
assert.Equal(url1, backends[0].url) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(initialSecret1, string(backends[0].secret)) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
@ -502,7 +502,7 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
SetEtcdValue(etcd, "/backends/1_one", []byte("{\"url\":\""+url1+"\",\"secret\":\""+secret1+"\"}"))
<-ch
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) &&
assert.Equal(url1, backends[0].url) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(secret1, string(backends[0].secret)) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
@ -516,9 +516,9 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
SetEtcdValue(etcd, "/backends/2_two", []byte("{\"url\":\""+url2+"\",\"secret\":\""+secret2+"\"}"))
<-ch
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 2) &&
assert.Equal(url1, backends[0].url) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(secret1, string(backends[0].secret)) &&
assert.Equal(url2, backends[1].url) &&
assert.Equal([]string{url2}, backends[1].urls) &&
assert.Equal(secret2, string(backends[1].secret)) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
@ -534,11 +534,11 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
SetEtcdValue(etcd, "/backends/3_three", []byte("{\"url\":\""+url3+"\",\"secret\":\""+secret3+"\"}"))
<-ch
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 3) &&
assert.Equal(url1, backends[0].url) &&
assert.Equal([]string{url1}, backends[0].urls) &&
assert.Equal(secret1, string(backends[0].secret)) &&
assert.Equal(url2, backends[1].url) &&
assert.Equal([]string{url2}, backends[1].urls) &&
assert.Equal(secret2, string(backends[1].secret)) &&
assert.Equal(url3, backends[2].url) &&
assert.Equal([]string{url3}, backends[2].urls) &&
assert.Equal(secret3, string(backends[2].secret)) {
if backend := cfg.GetBackend(mustParse(url1)); assert.NotNil(backend) {
assert.Equal(backends[0], backend)
@ -553,9 +553,9 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
DeleteEtcdValue(etcd, "/backends/1_one")
<-ch
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 2) {
assert.Equal(url2, backends[0].url)
assert.Equal([]string{url2}, backends[0].urls)
assert.Equal(secret2, string(backends[0].secret))
assert.Equal(url3, backends[1].url)
assert.Equal([]string{url3}, backends[1].urls)
assert.Equal(secret3, string(backends[1].secret))
}
@ -563,7 +563,7 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
DeleteEtcdValue(etcd, "/backends/2_two")
<-ch
if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) {
assert.Equal(url3, backends[0].url)
assert.Equal([]string{url3}, backends[0].urls)
assert.Equal(secret3, string(backends[0].secret))
}

View file

@ -701,12 +701,22 @@ func isNumeric(s string) bool {
}
func (b *BackendServer) startDialoutInSession(ctx context.Context, session *ClientSession, roomid string, backend *Backend, backendUrl string, request *BackendServerRoomRequest) (any, error) {
url := backend.Url()
if url == "" {
// Old-style compat backend, use client-provided URL.
url = backendUrl
if url != "" && url[len(url)-1] != '/' {
url += "/"
url := backendUrl
if url != "" && url[len(url)-1] != '/' {
url += "/"
}
if urls := backend.Urls(); len(urls) > 0 {
// Check if client-provided URL is registered for backend and use that.
found := false
for _, u := range urls {
if strings.HasPrefix(url, u) {
found = true
break
}
}
if !found {
url = urls[0]
}
}
id := newRandomString(32)

View file

@ -179,10 +179,9 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data
}
backend := &Backend{
id: key,
url: info.Url,
parsedUrl: info.parsedUrl,
secret: []byte(info.Secret),
id: key,
urls: []string{info.Url},
secret: []byte(info.Secret),
allowHttp: info.parsedUrl.Scheme == "http",

View file

@ -55,7 +55,6 @@ func NewBackendStorageStatic(config *goconf.ConfigFile) (BackendStorage, error)
compatBackend = &Backend{
id: "compat",
secret: []byte(commonSecret),
compat: true,
allowHttp: allowHttp,
@ -70,7 +69,7 @@ func NewBackendStorageStatic(config *goconf.ConfigFile) (BackendStorage, error)
for host, configuredBackends := range getConfiguredHosts(backendIds, config, commonSecret) {
backends[host] = append(backends[host], configuredBackends...)
for _, be := range configuredBackends {
log.Printf("Backend %s added for %s", be.id, be.url)
log.Printf("Backend %s added for %s", be.id, strings.Join(be.urls, ", "))
updateBackendStats(be)
}
numBackends += len(configuredBackends)
@ -95,7 +94,6 @@ func NewBackendStorageStatic(config *goconf.ConfigFile) (BackendStorage, error)
compatBackend = &Backend{
id: "compat",
secret: []byte(commonSecret),
compat: true,
allowHttp: allowHttp,
@ -140,7 +138,7 @@ func (s *backendStorageStatic) Close() {
func (s *backendStorageStatic) RemoveBackendsForHost(host string) {
if oldBackends := s.backends[host]; len(oldBackends) > 0 {
for _, backend := range oldBackends {
log.Printf("Backend %s removed for %s", backend.id, backend.url)
log.Printf("Backend %s removed for %s", backend.id, strings.Join(backend.urls, ", "))
deleteBackendStats(backend)
}
statsBackendsCurrent.Sub(float64(len(oldBackends)))
@ -161,7 +159,7 @@ func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend) {
found = true
s.backends[host][existingIndex] = newBackend
backends = append(backends[:index], backends[index+1:]...)
log.Printf("Backend %s updated for %s", newBackend.id, newBackend.url)
log.Printf("Backend %s updated for %s", newBackend.id, strings.Join(newBackend.urls, ", "))
updateBackendStats(newBackend)
break
}
@ -169,7 +167,7 @@ func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend) {
}
if !found {
removed := s.backends[host][existingIndex]
log.Printf("Backend %s removed for %s", removed.id, removed.url)
log.Printf("Backend %s removed for %s", removed.id, strings.Join(removed.urls, ", "))
s.backends[host] = append(s.backends[host][:existingIndex], s.backends[host][existingIndex+1:]...)
deleteBackendStats(removed)
statsBackendsCurrent.Dec()
@ -178,7 +176,7 @@ func (s *backendStorageStatic) UpsertHost(host string, backends []*Backend) {
s.backends[host] = append(s.backends[host], backends...)
for _, added := range backends {
log.Printf("Backend %s added for %s", added.id, added.url)
log.Printf("Backend %s added for %s", added.id, strings.Join(added.urls, ", "))
updateBackendStats(added)
}
statsBackendsCurrent.Add(float64(len(backends)))
@ -254,10 +252,9 @@ func getConfiguredHosts(backendIds string, config *goconf.ConfigFile, commonSecr
}
hosts[parsed.Host] = append(hosts[parsed.Host], &Backend{
id: id,
url: u,
parsedUrl: parsed,
secret: []byte(secret),
id: id,
urls: []string{u},
secret: []byte(secret),
allowHttp: parsed.Scheme == "http",

View file

@ -39,6 +39,8 @@ var (
// Warn if a session has 32 or more pending messages.
warnPendingMessagesCount = 32
// The "/api/v1/signaling/" URL will be changed to use "v3" as the "signaling-v3"
// feature is returned by the capabilities endpoint.
PathToOcsSignalingBackend = "ocs/v2.php/apps/spreed/api/v1/signaling/backend"
)
@ -130,24 +132,6 @@ func NewClientSession(hub *Hub, privateId string, publicId string, data *Session
s.backendUrl = hello.Auth.Url
s.parsedBackendUrl = hello.Auth.parsedUrl
}
if !strings.Contains(s.backendUrl, "/ocs/v2.php/") {
backendUrl := s.backendUrl
if !strings.HasSuffix(backendUrl, "/") {
backendUrl += "/"
}
backendUrl += PathToOcsSignalingBackend
u, err := url.Parse(backendUrl)
if err != nil {
return nil, err
}
if strings.Contains(u.Host, ":") && hasStandardPort(u) {
u.Host = u.Hostname()
}
s.backendUrl = backendUrl
s.parsedBackendUrl = u
}
if err := s.SubscribeEvents(); err != nil {
return nil, err
@ -307,6 +291,10 @@ func (s *ClientSession) ParsedBackendUrl() *url.URL {
return s.parsedBackendUrl
}
func (s *ClientSession) ParsedBackendOcsUrl() *url.URL {
return s.parsedBackendUrl.JoinPath(PathToOcsSignalingBackend)
}
func (s *ClientSession) AuthUserId() string {
return s.userId
}
@ -563,7 +551,7 @@ func (s *ClientSession) doUnsubscribeRoomEvents(notify bool) {
request.Room.UpdateFromSession(s)
request.Room.Action = "leave"
var response map[string]interface{}
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil {
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendOcsUrl(), request, &response); err != nil {
log.Printf("Could not notify about room session %s left room %s: %s", sid, room.Id(), err)
} else {
log.Printf("Removed room session %s: %+v", sid, response)

View file

@ -65,6 +65,8 @@ func getCloudUrl(s string) string {
}
if pos := strings.Index(s, "/ocs/v"); pos != -1 {
s = s[:pos]
} else {
s = strings.TrimSuffix(s, "/")
}
return s
}
@ -606,6 +608,7 @@ func (c *FederationClient) updateEventUsers(users []map[string]interface{}, loca
localCloudUrlLen := len(localCloudUrl)
remoteCloudUrl := "@" + getCloudUrl(c.federation.Load().NextcloudUrl)
checkSessionId := true
log.Printf("XXX local=%s remote=%s", localCloudUrl, remoteCloudUrl)
for _, u := range users {
if actorType, found := getStringMapEntry[string](u, "actorType"); found {
if actorId, found := getStringMapEntry[string](u, "actorId"); found {

View file

@ -29,7 +29,6 @@ import (
"io"
"log"
"net"
"net/url"
"strings"
"sync"
"sync/atomic"
@ -240,14 +239,14 @@ func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string,
return sessionId, nil
}
func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room *Room) (bool, error) {
func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room *Room, backendUrl string) (bool, error) {
statsGrpcClientCalls.WithLabelValues("IsSessionInCall").Inc()
// TODO: Remove debug logging
log.Printf("Check if session %s is in call %s on %s", sessionId, room.Id(), c.Target())
response, err := c.impl.IsSessionInCall(ctx, &IsSessionInCallRequest{
SessionId: sessionId,
RoomId: room.Id(),
BackendUrl: room.Backend().url,
BackendUrl: backendUrl,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return false, nil
@ -258,13 +257,18 @@ func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room
return response.GetInCall(), nil
}
func (c *GrpcClient) GetInternalSessions(ctx context.Context, roomId string, backend *Backend) (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData, err error) {
func (c *GrpcClient) GetInternalSessions(ctx context.Context, roomId string, backendUrls []string) (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData, err error) {
statsGrpcClientCalls.WithLabelValues("GetInternalSessions").Inc()
// TODO: Remove debug logging
log.Printf("Get internal sessions for %s@%s on %s", roomId, backend.Id(), c.Target())
log.Printf("Get internal sessions for %s on %s", roomId, c.Target())
var backendUrl string
if len(backendUrls) > 0 {
backendUrl = backendUrls[0]
}
response, err := c.impl.GetInternalSessions(ctx, &GetInternalSessionsRequest{
RoomId: roomId,
BackendUrl: backend.Url(),
RoomId: roomId,
BackendUrl: backendUrl,
BackendUrls: backendUrls,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return nil, nil, nil
@ -305,12 +309,12 @@ func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, strea
return response.GetPublisherId(), response.GetProxyUrl(), net.ParseIP(response.GetIp()), nil
}
func (c *GrpcClient) GetSessionCount(ctx context.Context, u *url.URL) (uint32, error) {
func (c *GrpcClient) GetSessionCount(ctx context.Context, url string) (uint32, error) {
statsGrpcClientCalls.WithLabelValues("GetSessionCount").Inc()
// TODO: Remove debug logging
log.Printf("Get session count for %s on %s", u, c.Target())
log.Printf("Get session count for %s on %s", url, c.Target())
response, err := c.impl.GetSessionCount(ctx, &GetSessionCountRequest{
Url: u.String(),
Url: url,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return 0, nil

View file

@ -178,7 +178,7 @@ func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCa
result := &IsSessionInCallReply{}
room := session.GetRoom()
if room == nil || room.Id() != request.GetRoomId() || room.Backend().url != request.GetBackendUrl() ||
if room == nil || room.Id() != request.GetRoomId() || !room.Backend().HasUrl(request.GetBackendUrl()) ||
(session.ClientType() != HelloClientTypeInternal && !room.IsSessionInCall(session)) {
// Recipient is not in a room, a different room or not in the call.
result.InCall = false
@ -191,44 +191,63 @@ func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCa
func (s *GrpcServer) GetInternalSessions(ctx context.Context, request *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) {
statsGrpcServerCalls.WithLabelValues("GetInternalSessions").Inc()
// TODO: Remove debug logging
log.Printf("Get internal sessions from %s on %s", request.RoomId, request.BackendUrl)
log.Printf("Get internal sessions from %s on %v (fallback %s)", request.RoomId, request.BackendUrls, request.BackendUrl)
var u *url.URL
if request.BackendUrl != "" {
var err error
u, err = url.Parse(request.BackendUrl)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "invalid url")
}
}
backend := s.hub.GetBackend(u)
if backend == nil {
return nil, status.Error(codes.NotFound, "no such backend")
}
room := s.hub.GetRoomForBackend(request.RoomId, backend)
if room == nil {
return nil, status.Error(codes.NotFound, "no such room")
var backendUrls []string
if len(request.BackendUrls) > 0 {
backendUrls = request.BackendUrls
} else if request.BackendUrl != "" {
backendUrls = append(backendUrls, request.BackendUrl)
} else {
// Only compat backend.
backendUrls = []string{""}
}
result := &GetInternalSessionsReply{}
room.mu.RLock()
defer room.mu.RUnlock()
processed := make(map[string]bool)
for _, bu := range backendUrls {
var parsed *url.URL
if bu != "" {
var err error
parsed, err = url.Parse(bu)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "invalid url")
}
}
for session := range room.internalSessions {
result.InternalSessions = append(result.InternalSessions, &InternalSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
Features: session.GetFeatures(),
})
}
backend := s.hub.GetBackend(parsed)
if backend == nil {
return nil, status.Error(codes.NotFound, "no such backend")
}
for session := range room.virtualSessions {
result.VirtualSessions = append(result.VirtualSessions, &VirtualSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
})
// Only process each backend once.
if processed[backend.Id()] {
continue
}
processed[backend.Id()] = true
room := s.hub.GetRoomForBackend(request.RoomId, backend)
if room == nil {
return nil, status.Error(codes.NotFound, "no such room")
}
room.mu.RLock()
defer room.mu.RUnlock()
for session := range room.internalSessions {
result.InternalSessions = append(result.InternalSessions, &InternalSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
Features: session.GetFeatures(),
})
}
for session := range room.virtualSessions {
result.VirtualSessions = append(result.VirtualSessions, &VirtualSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
})
}
}
return result, nil

View file

@ -63,7 +63,8 @@ message IsSessionInCallReply {
message GetInternalSessionsRequest {
string roomId = 1;
string backendUrl = 2;
string backendUrl = 2 [deprecated = true];
repeated string backendUrls = 3;
}
message InternalSessionData {

20
hub.go
View file

@ -700,12 +700,10 @@ func (h *Hub) GetSessionIdByRoomSessionId(roomSessionId string) (string, error)
}
func (h *Hub) GetDialoutSessions(roomId string, backend *Backend) (result []*ClientSession) {
url := backend.Url()
h.mu.RLock()
defer h.mu.RUnlock()
for session := range h.dialoutSessions {
if session.backend.Url() != url {
if !backend.HasUrl(session.BackendUrl()) {
continue
}
@ -957,14 +955,14 @@ func (h *Hub) processRegister(c HandlerClient, message *ClientMessage, backend *
go func(c *GrpcClient) {
defer wg.Done()
count, err := c.GetSessionCount(ctx, backend.ParsedUrl())
count, err := c.GetSessionCount(ctx, session.BackendUrl())
if err != nil {
log.Printf("Received error while getting session count for %s from %s: %s", backend.Url(), c.Target(), err)
log.Printf("Received error while getting session count for %s from %s: %s", session.BackendUrl(), c.Target(), err)
return
}
if count > 0 {
log.Printf("%d sessions connected for %s on %s", count, backend.Url(), c.Target())
log.Printf("%d sessions connected for %s on %s", count, session.BackendUrl(), c.Target())
totalCount.Add(count)
}
}(client)
@ -1308,6 +1306,8 @@ func (h *Hub) processHelloV1(ctx context.Context, client HandlerClient, message
return nil, nil, InvalidBackendUrl
}
url = url.JoinPath(PathToOcsSignalingBackend)
// Run in timeout context to prevent blocking too long.
ctx, cancel := context.WithTimeout(ctx, h.backendTimeout)
defer cancel()
@ -1767,7 +1767,7 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) {
}
request := NewBackendClientRoomRequest(roomId, session.UserId(), sessionId)
request.Room.UpdateFromSession(session)
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &room); err != nil {
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendOcsUrl(), request, &room); err != nil {
session.SendMessage(message.NewWrappedErrorServerMessage(err))
return
}
@ -2395,7 +2395,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) {
request.Room.InCall = sess.GetInCall()
var response BackendClientResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendOcsUrl(), request, &response); err != nil {
sess.Close()
log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session."))
@ -2413,7 +2413,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) {
} else {
request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg)
var response BackendClientSessionResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendOcsUrl(), request, &response); err != nil {
sess.Close()
log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session."))
@ -2620,7 +2620,7 @@ func (h *Hub) isInSameCallRemote(ctx context.Context, senderSession *ClientSessi
go func(client *GrpcClient) {
defer wg.Done()
inCall, err := client.IsSessionInCall(rpcCtx, recipientSessionId, senderRoom)
inCall, err := client.IsSessionInCall(rpcCtx, recipientSessionId, senderRoom, senderSession.BackendUrl())
if errors.Is(err, context.Canceled) {
return
} else if err != nil {

View file

@ -2019,7 +2019,7 @@ func TestClientHelloClient_V3Api(t *testing.T) {
}
// The "/api/v1/signaling/" URL will be changed to use "v3" as the "signaling-v3"
// feature is returned by the capabilities endpoint.
require.NoError(client.SendHelloParams(server.URL+"/ocs/v2.php/apps/spreed/api/v1/signaling/backend", HelloVersionV1, "client", nil, params))
require.NoError(client.SendHelloParams(server.URL, HelloVersionV1, "client", nil, params))
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

22
room.go
View file

@ -29,6 +29,7 @@ import (
"log"
"net/url"
"strconv"
"strings"
"sync"
"time"
@ -603,7 +604,7 @@ func (r *Room) getClusteredInternalSessionsRLocked() (internal map[string]*Inter
go func(c *GrpcClient) {
defer wg.Done()
clientInternal, clientVirtual, err := c.GetInternalSessions(ctx, r.Id(), r.Backend())
clientInternal, clientVirtual, err := c.GetInternalSessions(ctx, r.Id(), r.Backend().Urls())
if err != nil {
log.Printf("Received error while getting internal sessions for %s@%s from %s: %s", r.Id(), r.Backend().Id(), c.Target(), err)
return
@ -1043,6 +1044,20 @@ func (r *Room) publishActiveSessions() (int, *sync.WaitGroup) {
continue
}
u += PathToOcsSignalingBackend
parsed, err := url.Parse(u)
if err != nil {
log.Printf("Could not parse backend url %s: %s", u, err)
continue
}
if strings.Contains(parsed.Host, ":") && hasStandardPort(parsed) {
parsed.Host = parsed.Hostname()
}
u = parsed.String()
parsedBackendUrl := parsed
var sid string
var uid string
switch sess := session.(type) {
@ -1062,12 +1077,11 @@ func (r *Room) publishActiveSessions() (int, *sync.WaitGroup) {
}
e, found := entries[u]
if !found {
p := session.ParsedBackendUrl()
if p == nil {
if parsedBackendUrl == nil {
// Should not happen, invalid URLs should get rejected earlier.
continue
}
urls[u] = p
urls[u] = parsedBackendUrl
}
entries[u] = append(e, BackendPingEntry{

View file

@ -132,6 +132,10 @@ func (s *VirtualSession) ParsedBackendUrl() *url.URL {
return s.session.ParsedBackendUrl()
}
func (s *VirtualSession) ParsedBackendOcsUrl() *url.URL {
return s.session.ParsedBackendOcsUrl()
}
func (s *VirtualSession) UserId() string {
return s.userId
}
@ -197,7 +201,7 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, session Session, messa
}
var response BackendClientResponse
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response); err != nil {
if err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendOcsUrl(), request, &response); err != nil {
virtualSessionId := GetVirtualSessionId(s.session, s.PublicId())
log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, s.BackendUrl(), err)
if session != nil && message != nil {
@ -222,7 +226,7 @@ func (s *VirtualSession) notifyBackendRemoved(room *Room, session Session, messa
User: s.userData,
})
var response BackendClientSessionResponse
err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendUrl(), request, &response)
err := s.hub.backend.PerformJSONRequest(ctx, s.ParsedBackendOcsUrl(), request, &response)
if err != nil {
log.Printf("Could not remove virtual session %s from backend %s: %s", s.PublicId(), s.BackendUrl(), err)
if session != nil && message != nil {

View file

@ -42,8 +42,7 @@ func TestVirtualSession(t *testing.T) {
roomId := "the-room-id"
emptyProperties := json.RawMessage("{}")
backend := &Backend{
id: "compat",
compat: true,
id: "compat",
}
room, err := hub.createRoom(roomId, emptyProperties, backend)
require.NoError(err)
@ -270,8 +269,7 @@ func TestVirtualSessionCustomInCall(t *testing.T) {
roomId := "the-room-id"
emptyProperties := json.RawMessage("{}")
backend := &Backend{
id: "compat",
compat: true,
id: "compat",
}
room, err := hub.createRoom(roomId, emptyProperties, backend)
require.NoError(err)
@ -425,8 +423,7 @@ func TestVirtualSessionCleanup(t *testing.T) {
roomId := "the-room-id"
emptyProperties := json.RawMessage("{}")
backend := &Backend{
id: "compat",
compat: true,
id: "compat",
}
room, err := hub.createRoom(roomId, emptyProperties, backend)
require.NoError(err)