Add methods to unpublish remotely.

This commit is contained in:
Joachim Bauch 2024-04-23 15:54:02 +02:00
parent 343c4634a1
commit 7eae192256
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
4 changed files with 58 additions and 2 deletions

View file

@ -160,6 +160,7 @@ type McuPublisher interface {
GetStreams(ctx context.Context) ([]PublisherStream, error)
PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error
UnpublishRemote(ctx context.Context, hostname string) error
}
type McuSubscriber interface {

View file

@ -393,12 +393,16 @@ func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream,
return streams, nil
}
func getPublisherRemoteId(id string, hostname string) string {
return fmt.Sprintf("%s@%s", id, hostname)
}
func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error {
msg := map[string]interface{}{
"request": "publish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": p.id,
"remote_id": getPublisherRemoteId(p.id, hostname),
"host": hostname,
"port": port,
"rtcp_port": rtcpPort,
@ -411,12 +415,55 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string,
errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error")
errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code")
if errorMessage != "" || errorCode != 0 {
if errorCode == 0 {
errorCode = 500
}
if errorMessage == "" {
errorMessage = "unknown error"
}
return fmt.Errorf("%s (%d)", errorMessage, errorCode)
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: int(errorCode),
Reason: errorMessage,
},
}
}
log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort)
return nil
}
func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, hostname string) error {
msg := map[string]interface{}{
"request": "unpublish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, hostname),
}
response, err := p.handle.Request(ctx, msg)
if err != nil {
return err
}
errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error")
errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code")
if errorMessage != "" || errorCode != 0 {
if errorCode == 0 {
errorCode = 500
}
if errorMessage == "" {
errorMessage = "unknown error"
}
return &janus.ErrorMsg{
Err: janus.ErrorData{
Code: int(errorCode),
Reason: errorMessage,
},
}
}
log.Printf("Unpublished %s to %s", p.id, hostname)
return nil
}

View file

@ -225,6 +225,10 @@ func (p *mcuProxyPublisher) PublishRemote(ctx context.Context, hostname string,
return errors.New("remote publishing not supported for proxy publishers")
}
func (p *mcuProxyPublisher) UnpublishRemote(ctx context.Context, hostname string) error {
return errors.New("remote publishing not supported for proxy publishers")
}
type mcuProxySubscriber struct {
mcuProxyPubSubCommon

View file

@ -231,6 +231,10 @@ func (p *TestMCUPublisher) PublishRemote(ctx context.Context, hostname string, p
return errors.New("remote publishing not supported")
}
func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, hostname string) error {
return errors.New("remote publishing not supported")
}
type TestMCUSubscriber struct {
TestMCUClient