diff --git a/mcu_common.go b/mcu_common.go index 5da6807..8fbca2b 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -159,8 +159,8 @@ type McuPublisher interface { SetMedia(MediaType) GetStreams(ctx context.Context) ([]PublisherStream, error) - PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error - UnpublishRemote(ctx context.Context, hostname string) error + PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error + UnpublishRemote(ctx context.Context, remoteId string) error } type McuSubscriber interface { diff --git a/mcu_janus_publisher.go b/mcu_janus_publisher.go index 75c0919..b003727 100644 --- a/mcu_janus_publisher.go +++ b/mcu_janus_publisher.go @@ -381,16 +381,16 @@ func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream, return streams, nil } -func getPublisherRemoteId(id string, hostname string) string { - return fmt.Sprintf("%s@%s", id, hostname) +func getPublisherRemoteId(id string, remoteId string) string { + return fmt.Sprintf("%s@%s", id, remoteId) } -func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { +func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error { msg := map[string]interface{}{ "request": "publish_remotely", "room": p.roomId, "publisher_id": streamTypeUserIds[p.streamType], - "remote_id": getPublisherRemoteId(p.id, hostname), + "remote_id": getPublisherRemoteId(p.id, remoteId), "host": hostname, "port": port, "rtcp_port": rtcpPort, @@ -418,16 +418,16 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, } } - log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort) + log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d) for %s", p.id, hostname, port, rtcpPort, remoteId) return nil } -func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string) error { +func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, remoteId string) error { msg := map[string]interface{}{ "request": "unpublish_remotely", "room": p.roomId, "publisher_id": streamTypeUserIds[p.streamType], - "remote_id": getPublisherRemoteId(p.id, hostname), + "remote_id": getPublisherRemoteId(p.id, remoteId), } response, err := p.handle.Request(ctx, msg) if err != nil { @@ -452,6 +452,6 @@ func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string } } - log.Printf("Unpublished %s to %s", p.id, hostname) + log.Printf("Unpublished remote %s for %s", p.id, remoteId) return nil } diff --git a/mcu_proxy.go b/mcu_proxy.go index e0139a4..1ff805b 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -222,11 +222,11 @@ func (p *mcuProxyPublisher) GetStreams(ctx context.Context) ([]PublisherStream, return nil, errors.New("not implemented") } -func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { +func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error { return errors.New("remote publishing not supported for proxy publishers") } -func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, hostname string) error { +func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, remoteId string) error { return errors.New("remote publishing not supported for proxy publishers") } diff --git a/mcu_test.go b/mcu_test.go index 8fc1144..a1ee9cc 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -227,11 +227,11 @@ func (p *TestMCUPublisher) GetStreams(ctx context.Context) ([]PublisherStream, e return nil, errors.New("not implemented") } -func (p *TestMCUPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { +func (p *TestMCUPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error { return errors.New("remote publishing not supported") } -func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, hostname string) error { +func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId string) error { return errors.New("remote publishing not supported") } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 10129c4..a076f13 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -48,6 +48,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/securecookie" "github.com/gorilla/websocket" + "github.com/notedit/janus-go" "github.com/prometheus/client_golang/prometheus/promhttp" signaling "github.com/strukturag/nextcloud-spreed-signaling" @@ -984,10 +985,25 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s return } - if err := publisher.PublishRemote(ctx, cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil { - log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err) - session.sendMessage(message.NewWrappedErrorServerMessage(err)) - return + if err := publisher.PublishRemote(ctx, session.PublicId(), cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil { + var je *janus.ErrorMsg + if !errors.As(err, &je) || je.Err.Code != signaling.JANUS_VIDEOROOM_ERROR_ID_EXISTS { + log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err) + session.sendMessage(message.NewWrappedErrorServerMessage(err)) + return + } + + if err := publisher.UnpublishRemote(ctx, session.PublicId()); err != nil { + log.Printf("Error unpublishing old %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err) + session.sendMessage(message.NewWrappedErrorServerMessage(err)) + return + } + + if err := publisher.PublishRemote(ctx, session.PublicId(), cmd.Hostname, cmd.Port, cmd.RtcpPort); err != nil { + log.Printf("Error publishing %s %s to remote %s (port=%d, rtcpPort=%d): %s", publisher.StreamType(), cmd.ClientId, cmd.Hostname, cmd.Port, cmd.RtcpPort, err) + session.sendMessage(message.NewWrappedErrorServerMessage(err)) + return + } } response := &signaling.ProxyServerMessage{