From 7eae1922563815a15a7cc88690ea970583f484fa Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 23 Apr 2024 15:54:02 +0200 Subject: [PATCH] Add methods to unpublish remotely. --- mcu_common.go | 1 + mcu_janus_publisher.go | 51 ++++++++++++++++++++++++++++++++++++++++-- mcu_proxy.go | 4 ++++ mcu_test.go | 4 ++++ 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/mcu_common.go b/mcu_common.go index 017f703..5da6807 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -160,6 +160,7 @@ type McuPublisher interface { GetStreams(ctx context.Context) ([]PublisherStream, error) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error + UnpublishRemote(ctx context.Context, hostname string) error } type McuSubscriber interface { diff --git a/mcu_janus_publisher.go b/mcu_janus_publisher.go index d897703..da074e1 100644 --- a/mcu_janus_publisher.go +++ b/mcu_janus_publisher.go @@ -393,12 +393,16 @@ func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream, return streams, nil } +func getPublisherRemoteId(id string, hostname string) string { + return fmt.Sprintf("%s@%s", id, hostname) +} + func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { msg := map[string]interface{}{ "request": "publish_remotely", "room": p.roomId, "publisher_id": streamTypeUserIds[p.streamType], - "remote_id": p.id, + "remote_id": getPublisherRemoteId(p.id, hostname), "host": hostname, "port": port, "rtcp_port": rtcpPort, @@ -411,12 +415,55 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error") errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code") if errorMessage != "" || errorCode != 0 { + if errorCode == 0 { + errorCode = 500 + } if errorMessage == "" { errorMessage = "unknown error" } - return fmt.Errorf("%s (%d)", errorMessage, errorCode) + + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: int(errorCode), + Reason: errorMessage, + }, + } } log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort) return nil } + +func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string) error { + msg := map[string]interface{}{ + "request": "unpublish_remotely", + "room": p.roomId, + "publisher_id": streamTypeUserIds[p.streamType], + "remote_id": getPublisherRemoteId(p.id, hostname), + } + response, err := p.handle.Request(ctx, msg) + if err != nil { + return err + } + + errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error") + errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code") + if errorMessage != "" || errorCode != 0 { + if errorCode == 0 { + errorCode = 500 + } + if errorMessage == "" { + errorMessage = "unknown error" + } + + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: int(errorCode), + Reason: errorMessage, + }, + } + } + + log.Printf("Unpublished %s to %s", p.id, hostname) + return nil +} diff --git a/mcu_proxy.go b/mcu_proxy.go index edf911d..0a0cebc 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -225,6 +225,10 @@ func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, hostname string, return errors.New("remote publishing not supported for proxy publishers") } +func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, hostname string) error { + return errors.New("remote publishing not supported for proxy publishers") +} + type mcuProxySubscriber struct { mcuProxyPubSubCommon diff --git a/mcu_test.go b/mcu_test.go index 4ab7fea..91da984 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -231,6 +231,10 @@ func (p *TestMCUPublisher) PublishRemote(ctx context.Context, hostname string, p return errors.New("remote publishing not supported") } +func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, hostname string) error { + return errors.New("remote publishing not supported") +} + type TestMCUSubscriber struct { TestMCUClient