Merge pull request #987 from strukturag/detect-close-on-remote-publishers

Close subscriber if remote publisher was closed.
This commit is contained in:
Joachim Bauch 2025-05-07 11:12:42 +02:00 committed by GitHub
commit 9ef23270b4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 133 additions and 5 deletions

View file

@ -211,6 +211,8 @@ type McuClient interface {
type McuPublisher interface {
McuClient
PublisherId() string
HasMedia(MediaType) bool
SetMedia(MediaType)

View file

@ -56,6 +56,10 @@ type mcuJanusPublisher struct {
answerSdp atomic.Pointer[sdp.SessionDescription]
}
func (p *mcuJanusPublisher) PublisherId() string {
return p.id
}
func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) {
if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" {
ctx := context.TODO()

View file

@ -153,6 +153,10 @@ func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitra
}
}
func (p *mcuProxyPublisher) PublisherId() string {
return p.id
}
func (p *mcuProxyPublisher) HasMedia(mt MediaType) bool {
return (p.settings.MediaTypes & mt) == mt
}

View file

@ -184,6 +184,10 @@ type TestMCUPublisher struct {
sdp string
}
func (p *TestMCUPublisher) PublisherId() string {
return p.id
}
func (p *TestMCUPublisher) HasMedia(mt MediaType) bool {
return (p.settings.MediaTypes & mt) == mt
}

View file

@ -28,6 +28,7 @@ import (
"encoding/json"
"errors"
"log"
"net"
"net/http"
"net/url"
"strconv"
@ -61,6 +62,7 @@ var (
type RemoteConnection struct {
mu sync.Mutex
p *ProxyServer
url *url.URL
conn *websocket.Conn
closer *signaling.Closer
@ -82,13 +84,14 @@ type RemoteConnection struct {
messageCallbacks map[string]chan *signaling.ProxyServerMessage
}
func NewRemoteConnection(proxyUrl string, tokenId string, tokenKey *rsa.PrivateKey, tlsConfig *tls.Config) (*RemoteConnection, error) {
func NewRemoteConnection(p *ProxyServer, proxyUrl string, tokenId string, tokenKey *rsa.PrivateKey, tlsConfig *tls.Config) (*RemoteConnection, error) {
u, err := url.Parse(proxyUrl)
if err != nil {
return nil, err
}
result := &RemoteConnection{
p: p,
url: u,
closer: signaling.NewCloser(),
@ -203,6 +206,10 @@ func (c *RemoteConnection) sendClose() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.sendCloseLocked()
}
func (c *RemoteConnection) sendCloseLocked() error {
if c.conn == nil {
return ErrNotConnected
}
@ -229,7 +236,12 @@ func (c *RemoteConnection) Close() error {
return nil
}
c.sendClose()
if !c.closed.CompareAndSwap(false, true) {
// Already closed
return nil
}
c.closer.Close()
err1 := c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Time{})
err2 := c.conn.Close()
c.conn = nil
@ -315,7 +327,9 @@ func (c *RemoteConnection) readPump(conn *websocket.Conn) {
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
log.Printf("Error reading from %s: %v", c, err)
if !errors.Is(err, net.ErrClosed) || !c.closed.Load() {
log.Printf("Error reading from %s: %v", c, err)
}
}
break
}
@ -448,6 +462,13 @@ func (c *RemoteConnection) processMessage(msg *signaling.ProxyServerMessage) {
switch msg.Type {
case "event":
c.processEvent(msg)
case "bye":
log.Printf("Connection to %s was closed: %s", c, msg.Bye.Reason)
if msg.Bye.Reason == "session_expired" {
// Don't try to resume expired session.
c.sessionId = ""
}
c.scheduleReconnect()
default:
log.Printf("Received unsupported message %+v from %s", msg, c)
}
@ -456,6 +477,10 @@ func (c *RemoteConnection) processMessage(msg *signaling.ProxyServerMessage) {
func (c *RemoteConnection) processEvent(msg *signaling.ProxyServerMessage) {
switch msg.Event.Type {
case "update-load":
// Ignore
case "publisher-closed":
log.Printf("Remote publisher %s was closed on %s", msg.Event.ClientId, c)
c.p.RemotePublisherDeleted(msg.Event.ClientId)
default:
log.Printf("Received unsupported event %+v from %s", msg, c)
}

View file

@ -145,6 +145,7 @@ type ProxyServer struct {
remoteHostname string
remoteConnections map[string]*RemoteConnection
remoteConnectionsLock sync.Mutex
remotePublishers map[string]map[*proxyRemotePublisher]bool
}
func IsPublicIP(IP net.IP) bool {
@ -364,6 +365,7 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
remoteTlsConfig: remoteTlsConfig,
remoteHostname: remoteHostname,
remoteConnections: make(map[string]*RemoteConnection),
remotePublishers: make(map[string]map[*proxyRemotePublisher]bool),
}
result.maxIncoming.Store(int64(maxIncoming) * 1024 * 1024)
@ -858,6 +860,8 @@ func (p *proxyRemotePublisher) StartPublishing(ctx context.Context, publisher si
}
func (p *proxyRemotePublisher) StopPublishing(ctx context.Context, publisher signaling.McuRemotePublisherProperties) error {
defer p.proxy.removeRemotePublisher(p)
conn, err := p.proxy.getRemoteConnection(p.remoteUrl)
if err != nil {
return err
@ -899,6 +903,46 @@ func (p *proxyRemotePublisher) GetStreams(ctx context.Context) ([]signaling.Publ
return response.Command.Streams, nil
}
func (s *ProxyServer) addRemotePublisher(publisher *proxyRemotePublisher) {
s.remoteConnectionsLock.Lock()
defer s.remoteConnectionsLock.Unlock()
publishers, found := s.remotePublishers[publisher.remoteUrl]
if !found {
publishers = make(map[*proxyRemotePublisher]bool)
s.remotePublishers[publisher.remoteUrl] = publishers
}
publishers[publisher] = true
log.Printf("Add remote publisher to %s", publisher.remoteUrl)
}
func (s *ProxyServer) removeRemotePublisher(publisher *proxyRemotePublisher) {
s.remoteConnectionsLock.Lock()
defer s.remoteConnectionsLock.Unlock()
log.Printf("Removing remote publisher to %s", publisher.remoteUrl)
publishers, found := s.remotePublishers[publisher.remoteUrl]
if !found {
return
}
delete(publishers, publisher)
if len(publishers) > 0 {
return
}
delete(s.remotePublishers, publisher.remoteUrl)
if conn, found := s.remoteConnections[publisher.remoteUrl]; found {
delete(s.remoteConnections, publisher.remoteUrl)
if err := conn.Close(); err != nil {
log.Printf("Error closing remote connection to %s: %s", publisher.remoteUrl, err)
} else {
log.Printf("Remote connection to %s closed", publisher.remoteUrl)
}
}
}
func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) {
cmd := message.Command
@ -1017,6 +1061,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
go publisher.Close(context.Background())
}()
s.addRemotePublisher(controller)
subscriber, err = remoteMcu.NewRemoteSubscriber(subCtx, session, publisher)
if err != nil {
handleCreateError(err)
@ -1595,7 +1641,7 @@ func (s *ProxyServer) getRemoteConnection(url string) (*RemoteConnection, error)
return conn, nil
}
conn, err := NewRemoteConnection(url, s.tokenId, s.tokenKey, s.remoteTlsConfig)
conn, err := NewRemoteConnection(s, url, s.tokenId, s.tokenKey, s.remoteTlsConfig)
if err != nil {
return nil, err
}
@ -1612,3 +1658,12 @@ func (s *ProxyServer) PublisherDeleted(publisher signaling.McuPublisher) {
session.OnPublisherDeleted(publisher)
}
}
func (s *ProxyServer) RemotePublisherDeleted(publisherId string) {
s.sessionsLock.RLock()
defer s.sessionsLock.RUnlock()
for _, session := range s.sessions {
session.OnRemotePublisherDeleted(publisherId)
}
}

View file

@ -401,6 +401,10 @@ func (p *TestMCUPublisher) Id() string {
return p.id
}
func (p *TestMCUPublisher) PublisherId() string {
return p.id
}
func (p *TestMCUPublisher) Sid() string {
return p.sid
}

View file

@ -38,6 +38,7 @@ const (
)
type remotePublisherData struct {
id string
hostname string
port int
rtcpPort int
@ -400,6 +401,7 @@ func (s *ProxySession) AddRemotePublisher(publisher signaling.McuPublisher, host
}
data := &remotePublisherData{
id: publisher.PublisherId(),
hostname: hostname,
port: port,
rtcpPort: rtcpPort,
@ -431,5 +433,33 @@ func (s *ProxySession) OnPublisherDeleted(publisher signaling.McuPublisher) {
s.remotePublishersLock.Lock()
defer s.remotePublishersLock.Unlock()
delete(s.remotePublishers, publisher)
if entries, found := s.remotePublishers[publisher]; found {
delete(s.remotePublishers, publisher)
for _, entry := range entries {
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "publisher-closed",
ClientId: entry.id,
},
}
s.sendMessage(msg)
}
}
}
func (s *ProxySession) OnRemotePublisherDeleted(publisherId string) {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
for id, sub := range s.subscribers {
if sub.Publisher() == publisherId {
delete(s.subscribers, id)
delete(s.subscriberIds, sub)
log.Printf("Remote subscriber %s was closed, closing %s subscriber %s", publisherId, sub.StreamType(), sub.Id())
go sub.Close(context.Background())
}
}
}