diff --git a/mcu_janus.go b/mcu_janus.go index e056f24..487bf42 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -1033,69 +1033,6 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, return nil } -type mcuJanusRemotePublisher struct { - mcuJanusClient - - ref atomic.Int64 - publisher string - port int - rtcpPort int -} - -func (p *mcuJanusRemotePublisher) addRef() int64 { - return p.ref.Add(1) -} - -func (p *mcuJanusRemotePublisher) release() bool { - return p.ref.Add(-1) == 0 -} - -func (p *mcuJanusRemotePublisher) Port() int { - return p.port -} - -func (p *mcuJanusRemotePublisher) RtcpPort() int { - return p.rtcpPort -} - -func (p *mcuJanusRemotePublisher) Close(ctx context.Context) { - if !p.release() { - return - } - - p.mu.Lock() - if handle := p.handle; handle != nil { - response, err := p.handle.Request(ctx, map[string]interface{}{ - "request": "remove_remote_publisher", - "room": p.roomId, - "id": streamTypeUserIds[p.streamType], - }) - if err != nil { - log.Printf("Error removing remote publisher %d in room %d: %s", p.id, p.roomId, err) - } else { - log.Printf("Removed remote publisher: %+v", response) - } - if p.roomId != 0 { - destroy_msg := map[string]interface{}{ - "request": "destroy", - "room": p.roomId, - } - if _, err := handle.Request(ctx, destroy_msg); err != nil { - log.Printf("Error destroying room %d: %s", p.roomId, err) - } else { - log.Printf("Room %d destroyed", p.roomId) - } - p.mcu.mu.Lock() - delete(p.mcu.remotePublishers, getStreamId(p.publisher, p.streamType)) - p.mcu.mu.Unlock() - p.roomId = 0 - } - } - - p.closeClient(ctx) - p.mu.Unlock() -} - type mcuJanusSubscriber struct { mcuJanusClient @@ -1191,20 +1128,6 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ return client, nil } -type mcuJanusRemoteSubscriber struct { - mcuJanusSubscriber - - remote atomic.Pointer[mcuJanusRemotePublisher] -} - -func (s *mcuJanusRemoteSubscriber) Close(ctx context.Context) { - s.mcuJanusSubscriber.Close(ctx) - - if remote := s.remote.Swap(nil); remote != nil { - remote.Close(context.Background()) - } -} - func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller RemotePublisherController, streamType StreamType, bitrate int) (*mcuJanusRemotePublisher, error) { m.mu.Lock() defer m.mu.Unlock() @@ -1269,26 +1192,35 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re rtcp_port := getPluginIntValue(response.PluginData, pluginVideoRoom, "rtcp_port") pub = &mcuJanusRemotePublisher{ - mcuJanusClient: mcuJanusClient{ - mcu: m, + mcuJanusPublisher: mcuJanusPublisher{ + mcuJanusClient: mcuJanusClient{ + mcu: m, - id: id, - session: response.Session, - roomId: roomId, - sid: strconv.FormatUint(handle.Id, 10), - streamType: streamType, - maxBitrate: bitrate, + id: id, + session: response.Session, + roomId: roomId, + sid: strconv.FormatUint(handle.Id, 10), + streamType: streamType, + maxBitrate: bitrate, - handle: handle, - handleId: handle.Id, - closeChan: make(chan struct{}, 1), - deferred: make(chan func(), 64), + handle: handle, + handleId: handle.Id, + closeChan: make(chan struct{}, 1), + deferred: make(chan func(), 64), + }, + + id: controller.PublisherId(), }, - publisher: controller.PublisherId(), - port: int(port), - rtcpPort: int(rtcp_port), + port: int(port), + rtcpPort: int(rtcp_port), } + pub.mcuJanusClient.handleEvent = pub.handleEvent + pub.mcuJanusClient.handleHangup = pub.handleHangup + pub.mcuJanusClient.handleDetached = pub.handleDetached + pub.mcuJanusClient.handleConnected = pub.handleConnected + pub.mcuJanusClient.handleSlowLink = pub.handleSlowLink + pub.mcuJanusClient.handleMedia = pub.handleMedia if err := controller.StartPublishing(ctx, pub); err != nil { go pub.Close(context.Background()) @@ -1334,7 +1266,7 @@ func (m *mcuJanus) NewRemoteSubscriber(ctx context.Context, listener McuListener return nil, err } - log.Printf("Attached subscriber to room %d of publisher %s in plugin %s in session %d as %d", pub.roomId, pub.publisher, pluginVideoRoom, session.Id, handle.Id) + log.Printf("Attached subscriber to room %d of publisher %s in plugin %s in session %d as %d", pub.roomId, pub.id, pluginVideoRoom, session.Id, handle.Id) client := &mcuJanusRemoteSubscriber{ mcuJanusSubscriber: mcuJanusSubscriber{ @@ -1353,7 +1285,7 @@ func (m *mcuJanus) NewRemoteSubscriber(ctx context.Context, listener McuListener closeChan: make(chan struct{}, 1), deferred: make(chan func(), 64), }, - publisher: pub.publisher, + publisher: pub.id, }, } client.remote.Store(pub) diff --git a/mcu_janus_remote_publisher.go b/mcu_janus_remote_publisher.go new file mode 100644 index 0000000..47593b0 --- /dev/null +++ b/mcu_janus_remote_publisher.go @@ -0,0 +1,150 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2024 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "log" + "sync/atomic" + + "github.com/notedit/janus-go" +) + +type mcuJanusRemotePublisher struct { + mcuJanusPublisher + + ref atomic.Int64 + + port int + rtcpPort int +} + +func (p *mcuJanusRemotePublisher) addRef() int64 { + return p.ref.Add(1) +} + +func (p *mcuJanusRemotePublisher) release() bool { + return p.ref.Add(-1) == 0 +} + +func (p *mcuJanusRemotePublisher) Port() int { + return p.port +} + +func (p *mcuJanusRemotePublisher) RtcpPort() int { + return p.rtcpPort +} + +func (p *mcuJanusRemotePublisher) handleEvent(event *janus.EventMsg) { + if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" { + ctx := context.TODO() + switch videoroom { + case "destroyed": + log.Printf("Remote publisher %d: associated room has been destroyed, closing", p.handleId) + go p.Close(ctx) + case "slow_link": + // Ignore, processed through "handleSlowLink" in the general events. + default: + log.Printf("Unsupported videoroom remote publisher event in %d: %+v", p.handleId, event) + } + } else { + log.Printf("Unsupported remote publisher event in %d: %+v", p.handleId, event) + } +} + +func (p *mcuJanusRemotePublisher) handleHangup(event *janus.HangupMsg) { + log.Printf("Remote publisher %d received hangup (%s), closing", p.handleId, event.Reason) + go p.Close(context.Background()) +} + +func (p *mcuJanusRemotePublisher) handleDetached(event *janus.DetachedMsg) { + log.Printf("Remote publisher %d received detached, closing", p.handleId) + go p.Close(context.Background()) +} + +func (p *mcuJanusRemotePublisher) handleConnected(event *janus.WebRTCUpMsg) { + log.Printf("Remote publisher %d received connected", p.handleId) + p.mcu.publisherConnected.Notify(getStreamId(p.id, p.streamType)) +} + +func (p *mcuJanusRemotePublisher) handleSlowLink(event *janus.SlowLinkMsg) { + if event.Uplink { + log.Printf("Remote publisher %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost) + } else { + log.Printf("Remote publisher %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost) + } +} + +func (p *mcuJanusRemotePublisher) NotifyReconnected() { + ctx := context.TODO() + handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) + if err != nil { + log.Printf("Could not reconnect remote publisher %s: %s", p.id, err) + // TODO(jojo): Retry + return + } + + p.handle = handle + p.handleId = handle.Id + p.session = session + p.roomId = roomId + + log.Printf("Remote publisher %s reconnected on handle %d", p.id, p.handleId) +} + +func (p *mcuJanusRemotePublisher) Close(ctx context.Context) { + if !p.release() { + return + } + + p.mu.Lock() + if handle := p.handle; handle != nil { + response, err := p.handle.Request(ctx, map[string]interface{}{ + "request": "remove_remote_publisher", + "room": p.roomId, + "id": streamTypeUserIds[p.streamType], + }) + if err != nil { + log.Printf("Error removing remote publisher %s in room %d: %s", p.id, p.roomId, err) + } else { + log.Printf("Removed remote publisher: %+v", response) + } + if p.roomId != 0 { + destroy_msg := map[string]interface{}{ + "request": "destroy", + "room": p.roomId, + } + if _, err := handle.Request(ctx, destroy_msg); err != nil { + log.Printf("Error destroying room %d: %s", p.roomId, err) + } else { + log.Printf("Room %d destroyed", p.roomId) + } + p.mcu.mu.Lock() + delete(p.mcu.remotePublishers, getStreamId(p.id, p.streamType)) + p.mcu.mu.Unlock() + p.roomId = 0 + } + } + + p.closeClient(ctx) + p.mu.Unlock() +} diff --git a/mcu_janus_remote_subscriber.go b/mcu_janus_remote_subscriber.go new file mode 100644 index 0000000..0900416 --- /dev/null +++ b/mcu_janus_remote_subscriber.go @@ -0,0 +1,115 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2024 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "log" + "strconv" + "sync/atomic" + + "github.com/notedit/janus-go" +) + +type mcuJanusRemoteSubscriber struct { + mcuJanusSubscriber + + remote atomic.Pointer[mcuJanusRemotePublisher] +} + +func (p *mcuJanusRemoteSubscriber) handleEvent(event *janus.EventMsg) { + if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" { + ctx := context.TODO() + switch videoroom { + case "destroyed": + log.Printf("Remote subscriber %d: associated room has been destroyed, closing", p.handleId) + go p.Close(ctx) + case "event": + // Handle renegotiations, but ignore other events like selected + // substream / temporal layer. + if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" && + event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil { + p.listener.OnUpdateOffer(p, event.Jsep) + } + case "slow_link": + // Ignore, processed through "handleSlowLink" in the general events. + default: + log.Printf("Unsupported videoroom event %s for remote subscriber %d: %+v", videoroom, p.handleId, event) + } + } else { + log.Printf("Unsupported event for remote subscriber %d: %+v", p.handleId, event) + } +} + +func (p *mcuJanusRemoteSubscriber) handleHangup(event *janus.HangupMsg) { + log.Printf("Remote subscriber %d received hangup (%s), closing", p.handleId, event.Reason) + go p.Close(context.Background()) +} + +func (p *mcuJanusRemoteSubscriber) handleDetached(event *janus.DetachedMsg) { + log.Printf("Remote subscriber %d received detached, closing", p.handleId) + go p.Close(context.Background()) +} + +func (p *mcuJanusRemoteSubscriber) handleConnected(event *janus.WebRTCUpMsg) { + log.Printf("Remote subscriber %d received connected", p.handleId) + p.mcu.SubscriberConnected(p.Id(), p.publisher, p.streamType) +} + +func (p *mcuJanusRemoteSubscriber) handleSlowLink(event *janus.SlowLinkMsg) { + if event.Uplink { + log.Printf("Remote subscriber %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost) + } else { + log.Printf("Remote subscriber %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost) + } +} + +func (p *mcuJanusRemoteSubscriber) handleMedia(event *janus.MediaMsg) { + // Only triggered for publishers +} + +func (p *mcuJanusRemoteSubscriber) NotifyReconnected() { + ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) + if err != nil { + // TODO(jojo): Retry? + log.Printf("Could not reconnect remote subscriber for publisher %s: %s", p.publisher, err) + p.Close(context.Background()) + return + } + + p.handle = handle + p.handleId = handle.Id + p.roomId = pub.roomId + p.sid = strconv.FormatUint(handle.Id, 10) + p.listener.SubscriberSidUpdated(p) + log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) +} + +func (p *mcuJanusRemoteSubscriber) Close(ctx context.Context) { + p.mcuJanusSubscriber.Close(ctx) + + if remote := p.remote.Swap(nil); remote != nil { + remote.Close(context.Background()) + } +}