Re-create publisher with new endpoint if it already exists.

This commit is contained in:
Joachim Bauch 2024-04-23 15:56:24 +02:00
parent 7123f07d30
commit 92e9bed597
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
5 changed files with 34 additions and 18 deletions

View file

@ -159,8 +159,8 @@ type McuPublisher interface {
SetMedia(MediaType) SetMedia(MediaType)
GetStreams(ctx context.Context) ([]PublisherStream, error) GetStreams(ctx context.Context) ([]PublisherStream, error)
PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error
UnpublishRemote(ctx context.Context, hostname string) error UnpublishRemote(ctx context.Context, remoteId string) error
} }
type McuSubscriber interface { type McuSubscriber interface {

View file

@ -393,16 +393,16 @@ func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream,
return streams, nil return streams, nil
} }
func getPublisherRemoteId(id string, hostname string) string { func getPublisherRemoteId(id string, remoteId string) string {
return fmt.Sprintf("%s@%s", id, hostname) return fmt.Sprintf("%s@%s", id, remoteId)
} }
func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
msg := map[string]interface{}{ msg := map[string]interface{}{
"request": "publish_remotely", "request": "publish_remotely",
"room": p.roomId, "room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType], "publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, hostname), "remote_id": getPublisherRemoteId(p.id, remoteId),
"host": hostname, "host": hostname,
"port": port, "port": port,
"rtcp_port": rtcpPort, "rtcp_port": rtcpPort,
@ -430,16 +430,16 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string,
} }
} }
log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort) log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d) for %s", p.id, hostname, port, rtcpPort, remoteId)
return nil return nil
} }
func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string) error { func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
msg := map[string]interface{}{ msg := map[string]interface{}{
"request": "unpublish_remotely", "request": "unpublish_remotely",
"room": p.roomId, "room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType], "publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, hostname), "remote_id": getPublisherRemoteId(p.id, remoteId),
} }
response, err := p.handle.Request(ctx, msg) response, err := p.handle.Request(ctx, msg)
if err != nil { if err != nil {
@ -464,6 +464,6 @@ func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string
} }
} }
log.Printf("Unpublished %s to %s", p.id, hostname) log.Printf("Unpublished remote %s for %s", p.id, remoteId)
return nil return nil
} }

View file

@ -221,11 +221,11 @@ func (p *mcuProxyPublisher) GetStreams(ctx context.Context) ([]PublisherStream,
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }
func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("remote publishing not supported for proxy publishers") return errors.New("remote publishing not supported for proxy publishers")
} }
func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, hostname string) error { func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
return errors.New("remote publishing not supported for proxy publishers") return errors.New("remote publishing not supported for proxy publishers")
} }

View file

@ -227,11 +227,11 @@ func (p *TestMCUPublisher) GetStreams(ctx context.Context) ([]PublisherStream, e
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }
func (p *TestMCUPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { func (p *TestMCUPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("remote publishing not supported") return errors.New("remote publishing not supported")
} }
func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, hostname string) error { func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
return errors.New("remote publishing not supported") return errors.New("remote publishing not supported")
} }

View file

@ -48,6 +48,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/notedit/janus-go"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
signaling "github.com/strukturag/nextcloud-spreed-signaling" signaling "github.com/strukturag/nextcloud-spreed-signaling"
@ -972,10 +973,25 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
return return
} }
if err := publisher.PublishRemote(ctx, cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil { if err := publisher.PublishRemote(ctx, session.PublicId(), cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil {
log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err) var je *janus.ErrorMsg
session.sendMessage(message.NewWrappedErrorServerMessage(err)) if !errors.As(err, &je) || je.Err.Code != signaling.JANUS_VIDEOROOM_ERROR_ID_EXISTS {
return log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
if err := publisher.UnpublishRemote(ctx, session.PublicId()); err != nil {
log.Printf("Error unpublishing old %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
if err := publisher.PublishRemote(ctx, session.PublicId(), cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil {
log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
} }
response := &signaling.ProxyServerMessage{ response := &signaling.ProxyServerMessage{