diff --git a/clientsession.go b/clientsession.go index 5e2fe4f..2ae2cf8 100644 --- a/clientsession.go +++ b/clientsession.go @@ -444,12 +444,16 @@ func (s *ClientSession) UpdateRoomSessionId(roomSessionId string) error { if roomSessionId != "" { if room := s.GetRoom(); room != nil { log.Printf("Session %s updated room session id to %s in room %s", s.PublicId(), roomSessionId, room.Id()) + } else if client := s.GetFederationClient(); client != nil { + log.Printf("Session %s updated room session id to %s in federated room %s", s.PublicId(), roomSessionId, client.RemoteRoomId()) } else { log.Printf("Session %s updated room session id to %s in unknown room", s.PublicId(), roomSessionId) } } else { if room := s.GetRoom(); room != nil { log.Printf("Session %s cleared room session id in room %s", s.PublicId(), room.Id()) + } else if client := s.GetFederationClient(); client != nil { + log.Printf("Session %s cleared room session id in federated room %s", s.PublicId(), client.RemoteRoomId()) } else { log.Printf("Session %s cleared room session id in unknown room", s.PublicId()) } diff --git a/federation.go b/federation.go index b93dacc..3aab6e5 100644 --- a/federation.go +++ b/federation.go @@ -158,6 +158,14 @@ func (c *FederationClient) URL() string { return c.federation.Load().parsedSignalingUrl.String() } +func (c *FederationClient) RoomId() string { + return c.roomId.Load().(string) +} + +func (c *FederationClient) RemoteRoomId() string { + return c.remoteRoomId.Load().(string) +} + func (c *FederationClient) CanReuse(federation *RoomFederationMessage) bool { fed := c.federation.Load() return fed.NextcloudUrl == federation.NextcloudUrl && @@ -643,8 +651,8 @@ func (c *FederationClient) processMessage(msg *ServerMessage) { remoteSessionId = hello.SessionId } - remoteRoomId := c.remoteRoomId.Load().(string) - roomId := c.roomId.Load().(string) + remoteRoomId := c.RemoteRoomId() + roomId := c.RoomId() var doClose bool switch msg.Type { diff --git a/hub.go b/hub.go index 70b6e27..d7e1872 100644 --- a/hub.go +++ b/hub.go @@ -169,6 +169,7 @@ type Hub struct { expectHelloClients map[HandlerClient]time.Time dialoutSessions map[*ClientSession]bool remoteSessions map[*RemoteSession]bool + federatedSessions map[*ClientSession]bool backendTimeout time.Duration backend *BackendClient @@ -356,6 +357,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer expectHelloClients: make(map[HandlerClient]time.Time), dialoutSessions: make(map[*ClientSession]bool), remoteSessions: make(map[*RemoteSession]bool), + federatedSessions: make(map[*ClientSession]bool), backendTimeout: backendTimeout, backend: backend, @@ -465,6 +467,7 @@ func (h *Hub) Run() { defer h.backend.Close() housekeeping := time.NewTicker(housekeepingInterval) + federationPing := time.NewTicker(updateActiveSessionsInterval) geoipUpdater := time.NewTicker(24 * time.Hour) loop: @@ -484,6 +487,8 @@ loop: h.performHousekeeping(now) case <-geoipUpdater.C: go h.updateGeoDatabase() + case <-federationPing.C: + go h.publishFederatedSessions() case <-h.closer.C: break loop } @@ -1626,6 +1631,21 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) { } session.SetFederationClient(client) + + roomSessionId := message.Room.SessionId + if roomSessionId == "" { + // TODO(jojo): Better make the session id required in the request. + log.Printf("User did not send a room session id, assuming session %s", session.PublicId()) + roomSessionId = session.PublicId() + } + + if err := session.UpdateRoomSessionId(roomSessionId); err != nil { + log.Printf("Error updating room session id for session %s: %s", session.PublicId(), err) + } + + h.mu.Lock() + h.federatedSessions[session] = true + h.mu.Unlock() return } @@ -1693,6 +1713,82 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) { h.processJoinRoom(session, message, &room) } +func (h *Hub) publishFederatedSessions() (int, *sync.WaitGroup) { + h.mu.RLock() + defer h.mu.RUnlock() + + if len(h.federatedSessions) == 0 { + return 0, nil + } + + rooms := make(map[string]map[string][]BackendPingEntry) + urls := make(map[string]*url.URL) + for session := range h.federatedSessions { + u := session.BackendUrl() + if u == "" { + continue + } + + federation := session.GetFederationClient() + if federation == nil { + continue + } + + var sid string + var uid string + // Use Nextcloud session id and user id + sid = session.RoomSessionId() + uid = session.AuthUserId() + if sid == "" { + continue + } + + roomId := federation.RoomId() + entries, found := rooms[roomId] + if !found { + entries = make(map[string][]BackendPingEntry) + rooms[roomId] = entries + } + + e, found := entries[u] + if !found { + p := session.ParsedBackendUrl() + if p == nil { + // Should not happen, invalid URLs should get rejected earlier. + continue + } + urls[u] = p + } + + entries[u] = append(e, BackendPingEntry{ + SessionId: sid, + UserId: uid, + }) + } + + var wg sync.WaitGroup + if len(urls) == 0 { + return 0, &wg + } + count := 0 + for roomId, entries := range rooms { + count += len(entries) + for u, e := range entries { + wg.Add(1) + go func(roomId string, url *url.URL, entries []BackendPingEntry) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), h.backendTimeout) + defer cancel() + + if err := h.roomPing.SendPings(ctx, roomId, url, entries); err != nil { + log.Printf("Error pinging room %s for active entries %+v: %s", roomId, entries, err) + } + }(roomId, urls[u], e) + } + } + return count, &wg +} + func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room { internalRoomId := getRoomIdForBackend(id, backend) @@ -1735,6 +1831,9 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro } session.LeaveRoom(true) + h.mu.Lock() + delete(h.federatedSessions, session) + h.mu.Unlock() roomId := room.Room.RoomId internalRoomId := getRoomIdForBackend(roomId, session.Backend())