Close connection if last remote publisher is removed.

This commit is contained in:
Joachim Bauch 2025-05-07 10:28:49 +02:00
commit 420f7eb0ba
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 60 additions and 2 deletions

View file

@ -28,6 +28,7 @@ import (
"encoding/json"
"errors"
"log"
"net"
"net/http"
"net/url"
"strconv"
@ -205,6 +206,10 @@ func (c *RemoteConnection) sendClose() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.sendCloseLocked()
}
func (c *RemoteConnection) sendCloseLocked() error {
if c.conn == nil {
return ErrNotConnected
}
@ -231,7 +236,12 @@ func (c *RemoteConnection) Close() error {
return nil
}
c.sendClose()
if !c.closed.CompareAndSwap(false, true) {
// Already closed
return nil
}
c.closer.Close()
err1 := c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Time{})
err2 := c.conn.Close()
c.conn = nil
@ -317,7 +327,9 @@ func (c *RemoteConnection) readPump(conn *websocket.Conn) {
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
log.Printf("Error reading from %s: %v", c, err)
if !errors.Is(err, net.ErrClosed) || !c.closed.Load() {
log.Printf("Error reading from %s: %v", c, err)
}
}
break
}

View file

@ -145,6 +145,7 @@ type ProxyServer struct {
remoteHostname string
remoteConnections map[string]*RemoteConnection
remoteConnectionsLock sync.Mutex
remotePublishers map[string]map[*proxyRemotePublisher]bool
}
func IsPublicIP(IP net.IP) bool {
@ -364,6 +365,7 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
remoteTlsConfig: remoteTlsConfig,
remoteHostname: remoteHostname,
remoteConnections: make(map[string]*RemoteConnection),
remotePublishers: make(map[string]map[*proxyRemotePublisher]bool),
}
result.maxIncoming.Store(int64(maxIncoming) * 1024 * 1024)
@ -858,6 +860,8 @@ func (p *proxyRemotePublisher) StartPublishing(ctx context.Context, publisher si
}
func (p *proxyRemotePublisher) StopPublishing(ctx context.Context, publisher signaling.McuRemotePublisherProperties) error {
defer p.proxy.removeRemotePublisher(p)
conn, err := p.proxy.getRemoteConnection(p.remoteUrl)
if err != nil {
return err
@ -899,6 +903,46 @@ func (p *proxyRemotePublisher) GetStreams(ctx context.Context) ([]signaling.Publ
return response.Command.Streams, nil
}
func (s *ProxyServer) addRemotePublisher(publisher *proxyRemotePublisher) {
s.remoteConnectionsLock.Lock()
defer s.remoteConnectionsLock.Unlock()
publishers, found := s.remotePublishers[publisher.remoteUrl]
if !found {
publishers = make(map[*proxyRemotePublisher]bool)
s.remotePublishers[publisher.remoteUrl] = publishers
}
publishers[publisher] = true
log.Printf("Add remote publisher to %s", publisher.remoteUrl)
}
func (s *ProxyServer) removeRemotePublisher(publisher *proxyRemotePublisher) {
s.remoteConnectionsLock.Lock()
defer s.remoteConnectionsLock.Unlock()
log.Printf("Removing remote publisher to %s", publisher.remoteUrl)
publishers, found := s.remotePublishers[publisher.remoteUrl]
if !found {
return
}
delete(publishers, publisher)
if len(publishers) > 0 {
return
}
delete(s.remotePublishers, publisher.remoteUrl)
if conn, found := s.remoteConnections[publisher.remoteUrl]; found {
delete(s.remoteConnections, publisher.remoteUrl)
if err := conn.Close(); err != nil {
log.Printf("Error closing remote connection to %s: %s", publisher.remoteUrl, err)
} else {
log.Printf("Remote connection to %s closed", publisher.remoteUrl)
}
}
}
func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) {
cmd := message.Command
@ -1017,6 +1061,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
go publisher.Close(context.Background())
}()
s.addRemotePublisher(controller)
subscriber, err = remoteMcu.NewRemoteSubscriber(subCtx, session, publisher)
if err != nil {
handleCreateError(err)