Re-create publisher with new endpoint if it already exists.

This commit is contained in:
Joachim Bauch 2024-04-23 15:56:24 +02:00
parent 7eae192256
commit 5b04f1ff65
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
5 changed files with 34 additions and 18 deletions

View file

@ -159,8 +159,8 @@ type McuPublisher interface {
SetMedia(MediaType)
GetStreams(ctx context.Context) ([]PublisherStream, error)
PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error
UnpublishRemote(ctx context.Context, hostname string) error
PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error
UnpublishRemote(ctx context.Context, remoteId string) error
}
type McuSubscriber interface {

View file

@ -393,16 +393,16 @@ func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream,
return streams, nil
}
func getPublisherRemoteId(id string, hostname string) string {
return fmt.Sprintf("%s@%s", id, hostname)
func getPublisherRemoteId(id string, remoteId string) string {
return fmt.Sprintf("%s@%s", id, remoteId)
}
func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error {
func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
msg := map[string]interface{}{
"request": "publish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, hostname),
"remote_id": getPublisherRemoteId(p.id, remoteId),
"host": hostname,
"port": port,
"rtcp_port": rtcpPort,
@ -430,16 +430,16 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string,
}
}
log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort)
log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d) for %s", p.id, hostname, port, rtcpPort, remoteId)
return nil
}
func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string) error {
func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
msg := map[string]interface{}{
"request": "unpublish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, hostname),
"remote_id": getPublisherRemoteId(p.id, remoteId),
}
response, err := p.handle.Request(ctx, msg)
if err != nil {
@ -464,6 +464,6 @@ func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string
}
}
log.Printf("Unpublished %s to %s", p.id, hostname)
log.Printf("Unpublished remote %s for %s", p.id, remoteId)
return nil
}

View file

@ -221,11 +221,11 @@ func (p *mcuProxyPublisher) GetStreams(ctx context.Context) ([]PublisherStream,
return nil, errors.New("not implemented")
}
func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error {
func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("remote publishing not supported for proxy publishers")
}
func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, hostname string) error {
func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
return errors.New("remote publishing not supported for proxy publishers")
}

View file

@ -227,11 +227,11 @@ func (p *TestMCUPublisher) GetStreams(ctx context.Context) ([]PublisherStream, e
return nil, errors.New("not implemented")
}
func (p *TestMCUPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error {
func (p *TestMCUPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
return errors.New("remote publishing not supported")
}
func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, hostname string) error {
func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
return errors.New("remote publishing not supported")
}

View file

@ -48,6 +48,7 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/securecookie"
"github.com/gorilla/websocket"
"github.com/notedit/janus-go"
"github.com/prometheus/client_golang/prometheus/promhttp"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
@ -972,10 +973,25 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
return
}
if err := publisher.PublishRemote(ctx, cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil {
log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
if err := publisher.PublishRemote(ctx, session.PublicId(), cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil {
var je *janus.ErrorMsg
if !errors.As(err, &je) || je.Err.Code != signaling.JANUS_VIDEOROOM_ERROR_ID_EXISTS {
log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
if err := publisher.UnpublishRemote(ctx, session.PublicId()); err != nil {
log.Printf("Error unpublishing old %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
if err := publisher.PublishRemote(ctx, session.PublicId(), cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil {
log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
}
response := &signaling.ProxyServerMessage{