Send ping requests to local instance for federated sessions.

This commit is contained in:
Joachim Bauch 2024-08-28 11:46:57 +02:00
commit bfefcfea47
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
3 changed files with 113 additions and 2 deletions

View file

@ -444,12 +444,16 @@ func (s *ClientSession) UpdateRoomSessionId(roomSessionId string) error {
if roomSessionId != "" {
if room := s.GetRoom(); room != nil {
log.Printf("Session %s updated room session id to %s in room %s", s.PublicId(), roomSessionId, room.Id())
} else if client := s.GetFederationClient(); client != nil {
log.Printf("Session %s updated room session id to %s in federated room %s", s.PublicId(), roomSessionId, client.RemoteRoomId())
} else {
log.Printf("Session %s updated room session id to %s in unknown room", s.PublicId(), roomSessionId)
}
} else {
if room := s.GetRoom(); room != nil {
log.Printf("Session %s cleared room session id in room %s", s.PublicId(), room.Id())
} else if client := s.GetFederationClient(); client != nil {
log.Printf("Session %s cleared room session id in federated room %s", s.PublicId(), client.RemoteRoomId())
} else {
log.Printf("Session %s cleared room session id in unknown room", s.PublicId())
}

View file

@ -158,6 +158,14 @@ func (c *FederationClient) URL() string {
return c.federation.Load().parsedSignalingUrl.String()
}
func (c *FederationClient) RoomId() string {
return c.roomId.Load().(string)
}
func (c *FederationClient) RemoteRoomId() string {
return c.remoteRoomId.Load().(string)
}
func (c *FederationClient) CanReuse(federation *RoomFederationMessage) bool {
fed := c.federation.Load()
return fed.NextcloudUrl == federation.NextcloudUrl &&
@ -643,8 +651,8 @@ func (c *FederationClient) processMessage(msg *ServerMessage) {
remoteSessionId = hello.SessionId
}
remoteRoomId := c.remoteRoomId.Load().(string)
roomId := c.roomId.Load().(string)
remoteRoomId := c.RemoteRoomId()
roomId := c.RoomId()
var doClose bool
switch msg.Type {

99
hub.go
View file

@ -169,6 +169,7 @@ type Hub struct {
expectHelloClients map[HandlerClient]time.Time
dialoutSessions map[*ClientSession]bool
remoteSessions map[*RemoteSession]bool
federatedSessions map[*ClientSession]bool
backendTimeout time.Duration
backend *BackendClient
@ -356,6 +357,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
expectHelloClients: make(map[HandlerClient]time.Time),
dialoutSessions: make(map[*ClientSession]bool),
remoteSessions: make(map[*RemoteSession]bool),
federatedSessions: make(map[*ClientSession]bool),
backendTimeout: backendTimeout,
backend: backend,
@ -465,6 +467,7 @@ func (h *Hub) Run() {
defer h.backend.Close()
housekeeping := time.NewTicker(housekeepingInterval)
federationPing := time.NewTicker(updateActiveSessionsInterval)
geoipUpdater := time.NewTicker(24 * time.Hour)
loop:
@ -484,6 +487,8 @@ loop:
h.performHousekeeping(now)
case <-geoipUpdater.C:
go h.updateGeoDatabase()
case <-federationPing.C:
go h.publishFederatedSessions()
case <-h.closer.C:
break loop
}
@ -1626,6 +1631,21 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) {
}
session.SetFederationClient(client)
roomSessionId := message.Room.SessionId
if roomSessionId == "" {
// TODO(jojo): Better make the session id required in the request.
log.Printf("User did not send a room session id, assuming session %s", session.PublicId())
roomSessionId = session.PublicId()
}
if err := session.UpdateRoomSessionId(roomSessionId); err != nil {
log.Printf("Error updating room session id for session %s: %s", session.PublicId(), err)
}
h.mu.Lock()
h.federatedSessions[session] = true
h.mu.Unlock()
return
}
@ -1693,6 +1713,82 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) {
h.processJoinRoom(session, message, &room)
}
func (h *Hub) publishFederatedSessions() (int, *sync.WaitGroup) {
h.mu.RLock()
defer h.mu.RUnlock()
if len(h.federatedSessions) == 0 {
return 0, nil
}
rooms := make(map[string]map[string][]BackendPingEntry)
urls := make(map[string]*url.URL)
for session := range h.federatedSessions {
u := session.BackendUrl()
if u == "" {
continue
}
federation := session.GetFederationClient()
if federation == nil {
continue
}
var sid string
var uid string
// Use Nextcloud session id and user id
sid = session.RoomSessionId()
uid = session.AuthUserId()
if sid == "" {
continue
}
roomId := federation.RoomId()
entries, found := rooms[roomId]
if !found {
entries = make(map[string][]BackendPingEntry)
rooms[roomId] = entries
}
e, found := entries[u]
if !found {
p := session.ParsedBackendUrl()
if p == nil {
// Should not happen, invalid URLs should get rejected earlier.
continue
}
urls[u] = p
}
entries[u] = append(e, BackendPingEntry{
SessionId: sid,
UserId: uid,
})
}
var wg sync.WaitGroup
if len(urls) == 0 {
return 0, &wg
}
count := 0
for roomId, entries := range rooms {
count += len(entries)
for u, e := range entries {
wg.Add(1)
go func(roomId string, url *url.URL, entries []BackendPingEntry) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), h.backendTimeout)
defer cancel()
if err := h.roomPing.SendPings(ctx, roomId, url, entries); err != nil {
log.Printf("Error pinging room %s for active entries %+v: %s", roomId, entries, err)
}
}(roomId, urls[u], e)
}
}
return count, &wg
}
func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room {
internalRoomId := getRoomIdForBackend(id, backend)
@ -1735,6 +1831,9 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
}
session.LeaveRoom(true)
h.mu.Lock()
delete(h.federatedSessions, session)
h.mu.Unlock()
roomId := room.Room.RoomId
internalRoomId := getRoomIdForBackend(roomId, session.Backend())