mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-06-02 22:12:12 +02:00
Move Janus classes to separate files, no functional changes.
This commit is contained in:
parent
ceb87bb728
commit
a0b438e4f8
807
mcu_janus.go
807
mcu_janus.go
|
@ -23,12 +23,10 @@ package signaling
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"reflect"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -445,272 +443,6 @@ func (m *mcuJanus) sendKeepalive() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type mcuJanusClient struct {
|
|
||||||
mcu *mcuJanus
|
|
||||||
listener McuListener
|
|
||||||
mu sync.Mutex // nolint
|
|
||||||
|
|
||||||
id uint64
|
|
||||||
session uint64
|
|
||||||
roomId uint64
|
|
||||||
sid string
|
|
||||||
streamType StreamType
|
|
||||||
maxBitrate int
|
|
||||||
|
|
||||||
handle *JanusHandle
|
|
||||||
handleId uint64
|
|
||||||
closeChan chan struct{}
|
|
||||||
deferred chan func()
|
|
||||||
|
|
||||||
handleEvent func(event *janus.EventMsg)
|
|
||||||
handleHangup func(event *janus.HangupMsg)
|
|
||||||
handleDetached func(event *janus.DetachedMsg)
|
|
||||||
handleConnected func(event *janus.WebRTCUpMsg)
|
|
||||||
handleSlowLink func(event *janus.SlowLinkMsg)
|
|
||||||
handleMedia func(event *janus.MediaMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) Id() string {
|
|
||||||
return strconv.FormatUint(c.id, 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) Sid() string {
|
|
||||||
return c.sid
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) StreamType() StreamType {
|
|
||||||
return c.streamType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) MaxBitrate() int {
|
|
||||||
return c.maxBitrate
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) Close(ctx context.Context) {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
|
|
||||||
if handle := c.handle; handle != nil {
|
|
||||||
c.handle = nil
|
|
||||||
close(c.closeChan)
|
|
||||||
if _, err := handle.Detach(ctx); err != nil {
|
|
||||||
if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND {
|
|
||||||
log.Println("Could not detach client", handle.Id, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) run(handle *JanusHandle, closeChan <-chan struct{}) {
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg := <-handle.Events:
|
|
||||||
switch t := msg.(type) {
|
|
||||||
case *janus.EventMsg:
|
|
||||||
c.handleEvent(t)
|
|
||||||
case *janus.HangupMsg:
|
|
||||||
c.handleHangup(t)
|
|
||||||
case *janus.DetachedMsg:
|
|
||||||
c.handleDetached(t)
|
|
||||||
case *janus.MediaMsg:
|
|
||||||
c.handleMedia(t)
|
|
||||||
case *janus.WebRTCUpMsg:
|
|
||||||
c.handleConnected(t)
|
|
||||||
case *janus.SlowLinkMsg:
|
|
||||||
c.handleSlowLink(t)
|
|
||||||
case *TrickleMsg:
|
|
||||||
c.handleTrickle(t)
|
|
||||||
default:
|
|
||||||
log.Println("Received unsupported event type", msg, reflect.TypeOf(msg))
|
|
||||||
}
|
|
||||||
case f := <-c.deferred:
|
|
||||||
f()
|
|
||||||
case <-closeChan:
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) sendOffer(ctx context.Context, offer map[string]interface{}, callback func(error, map[string]interface{})) {
|
|
||||||
handle := c.handle
|
|
||||||
if handle == nil {
|
|
||||||
callback(ErrNotConnected, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
configure_msg := map[string]interface{}{
|
|
||||||
"request": "configure",
|
|
||||||
"audio": true,
|
|
||||||
"video": true,
|
|
||||||
"data": true,
|
|
||||||
}
|
|
||||||
answer_msg, err := handle.Message(ctx, configure_msg, offer)
|
|
||||||
if err != nil {
|
|
||||||
callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
callback(nil, answer_msg.Jsep)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) sendAnswer(ctx context.Context, answer map[string]interface{}, callback func(error, map[string]interface{})) {
|
|
||||||
handle := c.handle
|
|
||||||
if handle == nil {
|
|
||||||
callback(ErrNotConnected, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
start_msg := map[string]interface{}{
|
|
||||||
"request": "start",
|
|
||||||
"room": c.roomId,
|
|
||||||
}
|
|
||||||
start_response, err := handle.Message(ctx, start_msg, answer)
|
|
||||||
if err != nil {
|
|
||||||
callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Println("Started listener", start_response)
|
|
||||||
callback(nil, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) sendCandidate(ctx context.Context, candidate interface{}, callback func(error, map[string]interface{})) {
|
|
||||||
handle := c.handle
|
|
||||||
if handle == nil {
|
|
||||||
callback(ErrNotConnected, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := handle.Trickle(ctx, candidate); err != nil {
|
|
||||||
callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
callback(nil, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) {
|
|
||||||
if event.Candidate.Completed {
|
|
||||||
c.listener.OnIceCompleted(c)
|
|
||||||
} else {
|
|
||||||
c.listener.OnIceCandidate(c, event.Candidate)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *mcuJanusClient) selectStream(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) {
|
|
||||||
handle := c.handle
|
|
||||||
if handle == nil {
|
|
||||||
callback(ErrNotConnected, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if stream == nil || !stream.HasValues() {
|
|
||||||
callback(nil, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
configure_msg := map[string]interface{}{
|
|
||||||
"request": "configure",
|
|
||||||
}
|
|
||||||
if stream != nil {
|
|
||||||
stream.AddToMessage(configure_msg)
|
|
||||||
}
|
|
||||||
_, err := handle.Message(ctx, configure_msg, nil)
|
|
||||||
if err != nil {
|
|
||||||
callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
callback(nil, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
type publisherStatsCounter struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
|
|
||||||
streamTypes map[StreamType]bool
|
|
||||||
subscribers map[string]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *publisherStatsCounter) Reset() {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
count := len(c.subscribers)
|
|
||||||
for streamType := range c.streamTypes {
|
|
||||||
statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec()
|
|
||||||
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(count))
|
|
||||||
}
|
|
||||||
c.streamTypes = nil
|
|
||||||
c.subscribers = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *publisherStatsCounter) EnableStream(streamType StreamType, enable bool) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
if enable == c.streamTypes[streamType] {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if enable {
|
|
||||||
if c.streamTypes == nil {
|
|
||||||
c.streamTypes = make(map[StreamType]bool)
|
|
||||||
}
|
|
||||||
c.streamTypes[streamType] = true
|
|
||||||
statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Inc()
|
|
||||||
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Add(float64(len(c.subscribers)))
|
|
||||||
} else {
|
|
||||||
delete(c.streamTypes, streamType)
|
|
||||||
statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec()
|
|
||||||
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(len(c.subscribers)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *publisherStatsCounter) AddSubscriber(id string) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
if c.subscribers[id] {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.subscribers == nil {
|
|
||||||
c.subscribers = make(map[string]bool)
|
|
||||||
}
|
|
||||||
c.subscribers[id] = true
|
|
||||||
for streamType := range c.streamTypes {
|
|
||||||
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Inc()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *publisherStatsCounter) RemoveSubscriber(id string) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
if !c.subscribers[id] {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(c.subscribers, id)
|
|
||||||
for streamType := range c.streamTypes {
|
|
||||||
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Dec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type mcuJanusPublisher struct {
|
|
||||||
mcuJanusClient
|
|
||||||
|
|
||||||
id string
|
|
||||||
bitrate int
|
|
||||||
mediaTypes MediaType
|
|
||||||
stats publisherStatsCounter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mcuJanus) SubscriberConnected(id string, publisher string, streamType StreamType) {
|
func (m *mcuJanus) SubscriberConnected(id string, publisher string, streamType StreamType) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
@ -867,178 +599,6 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) {
|
|
||||||
if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" {
|
|
||||||
ctx := context.TODO()
|
|
||||||
switch videoroom {
|
|
||||||
case "destroyed":
|
|
||||||
log.Printf("Publisher %d: associated room has been destroyed, closing", p.handleId)
|
|
||||||
go p.Close(ctx)
|
|
||||||
case "slow_link":
|
|
||||||
// Ignore, processed through "handleSlowLink" in the general events.
|
|
||||||
default:
|
|
||||||
log.Printf("Unsupported videoroom publisher event in %d: %+v", p.handleId, event)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Printf("Unsupported publisher event in %d: %+v", p.handleId, event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) handleHangup(event *janus.HangupMsg) {
|
|
||||||
log.Printf("Publisher %d received hangup (%s), closing", p.handleId, event.Reason)
|
|
||||||
go p.Close(context.Background())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) handleDetached(event *janus.DetachedMsg) {
|
|
||||||
log.Printf("Publisher %d received detached, closing", p.handleId)
|
|
||||||
go p.Close(context.Background())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) handleConnected(event *janus.WebRTCUpMsg) {
|
|
||||||
log.Printf("Publisher %d received connected", p.handleId)
|
|
||||||
p.mcu.publisherConnected.Notify(getStreamId(p.id, p.streamType))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) {
|
|
||||||
if event.Uplink {
|
|
||||||
log.Printf("Publisher %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost)
|
|
||||||
} else {
|
|
||||||
log.Printf("Publisher %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) {
|
|
||||||
mediaType := StreamType(event.Type)
|
|
||||||
if mediaType == StreamTypeVideo && p.streamType == StreamTypeScreen {
|
|
||||||
// We want to differentiate between audio, video and screensharing
|
|
||||||
mediaType = p.streamType
|
|
||||||
}
|
|
||||||
|
|
||||||
p.stats.EnableStream(mediaType, event.Receiving)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) HasMedia(mt MediaType) bool {
|
|
||||||
return (p.mediaTypes & mt) == mt
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) SetMedia(mt MediaType) {
|
|
||||||
p.mediaTypes = mt
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) NotifyReconnected() {
|
|
||||||
ctx := context.TODO()
|
|
||||||
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Could not reconnect publisher %s: %s", p.id, err)
|
|
||||||
// TODO(jojo): Retry
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p.handle = handle
|
|
||||||
p.handleId = handle.Id
|
|
||||||
p.session = session
|
|
||||||
p.roomId = roomId
|
|
||||||
|
|
||||||
log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) Close(ctx context.Context) {
|
|
||||||
notify := false
|
|
||||||
p.mu.Lock()
|
|
||||||
if handle := p.handle; handle != nil && p.roomId != 0 {
|
|
||||||
destroy_msg := map[string]interface{}{
|
|
||||||
"request": "destroy",
|
|
||||||
"room": p.roomId,
|
|
||||||
}
|
|
||||||
if _, err := handle.Request(ctx, destroy_msg); err != nil {
|
|
||||||
log.Printf("Error destroying room %d: %s", p.roomId, err)
|
|
||||||
} else {
|
|
||||||
log.Printf("Room %d destroyed", p.roomId)
|
|
||||||
}
|
|
||||||
p.mcu.mu.Lock()
|
|
||||||
delete(p.mcu.publishers, getStreamId(p.id, p.streamType))
|
|
||||||
p.mcu.mu.Unlock()
|
|
||||||
p.roomId = 0
|
|
||||||
notify = true
|
|
||||||
}
|
|
||||||
p.closeClient(ctx)
|
|
||||||
p.mu.Unlock()
|
|
||||||
|
|
||||||
p.stats.Reset()
|
|
||||||
|
|
||||||
if notify {
|
|
||||||
statsPublishersCurrent.WithLabelValues(string(p.streamType)).Dec()
|
|
||||||
p.mcu.unregisterClient(p)
|
|
||||||
p.listener.PublisherClosed(p)
|
|
||||||
}
|
|
||||||
p.mcuJanusClient.Close(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
|
||||||
statsMcuMessagesTotal.WithLabelValues(data.Type).Inc()
|
|
||||||
jsep_msg := data.Payload
|
|
||||||
switch data.Type {
|
|
||||||
case "offer":
|
|
||||||
p.deferred <- func() {
|
|
||||||
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// TODO Tear down previous publisher and get a new one if sid does
|
|
||||||
// not match?
|
|
||||||
p.sendOffer(msgctx, jsep_msg, callback)
|
|
||||||
}
|
|
||||||
case "candidate":
|
|
||||||
p.deferred <- func() {
|
|
||||||
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if data.Sid == "" || data.Sid == p.Sid() {
|
|
||||||
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
|
|
||||||
} else {
|
|
||||||
go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "endOfCandidates":
|
|
||||||
// Ignore
|
|
||||||
default:
|
|
||||||
go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error {
|
|
||||||
msg := map[string]interface{}{
|
|
||||||
"request": "publish_remotely",
|
|
||||||
"room": p.roomId,
|
|
||||||
"publisher_id": streamTypeUserIds[p.streamType],
|
|
||||||
"remote_id": p.id,
|
|
||||||
"host": hostname,
|
|
||||||
"port": port,
|
|
||||||
"rtcp_port": rtcpPort,
|
|
||||||
}
|
|
||||||
response, err := p.handle.Request(ctx, msg)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error")
|
|
||||||
errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code")
|
|
||||||
if errorMessage != "" || errorCode != 0 {
|
|
||||||
if errorMessage == "" {
|
|
||||||
errorMessage = "unknown error"
|
|
||||||
}
|
|
||||||
return fmt.Errorf("%s (%d)", errorMessage, errorCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type mcuJanusSubscriber struct {
|
|
||||||
mcuJanusClient
|
|
||||||
|
|
||||||
publisher string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mcuJanus) getPublisher(ctx context.Context, publisher string, streamType StreamType) (*mcuJanusPublisher, error) {
|
func (m *mcuJanus) getPublisher(ctx context.Context, publisher string, streamType StreamType) (*mcuJanusPublisher, error) {
|
||||||
// Do the direct check immediately as this should be the normal case.
|
// Do the direct check immediately as this should be the normal case.
|
||||||
key := getStreamId(publisher, streamType)
|
key := getStreamId(publisher, streamType)
|
||||||
|
@ -1302,370 +862,3 @@ func (m *mcuJanus) NewRemoteSubscriber(ctx context.Context, listener McuListener
|
||||||
statsSubscribersTotal.WithLabelValues(string(publisher.StreamType())).Inc()
|
statsSubscribersTotal.WithLabelValues(string(publisher.StreamType())).Inc()
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) Publisher() string {
|
|
||||||
return p.publisher
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) {
|
|
||||||
if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" {
|
|
||||||
ctx := context.TODO()
|
|
||||||
switch videoroom {
|
|
||||||
case "destroyed":
|
|
||||||
log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId)
|
|
||||||
go p.Close(ctx)
|
|
||||||
case "event":
|
|
||||||
// Handle renegotiations, but ignore other events like selected
|
|
||||||
// substream / temporal layer.
|
|
||||||
if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" &&
|
|
||||||
event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil {
|
|
||||||
p.listener.OnUpdateOffer(p, event.Jsep)
|
|
||||||
}
|
|
||||||
case "slow_link":
|
|
||||||
// Ignore, processed through "handleSlowLink" in the general events.
|
|
||||||
default:
|
|
||||||
log.Printf("Unsupported videoroom event %s for subscriber %d: %+v", videoroom, p.handleId, event)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Printf("Unsupported event for subscriber %d: %+v", p.handleId, event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) handleHangup(event *janus.HangupMsg) {
|
|
||||||
log.Printf("Subscriber %d received hangup (%s), closing", p.handleId, event.Reason)
|
|
||||||
go p.Close(context.Background())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) handleDetached(event *janus.DetachedMsg) {
|
|
||||||
log.Printf("Subscriber %d received detached, closing", p.handleId)
|
|
||||||
go p.Close(context.Background())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) handleConnected(event *janus.WebRTCUpMsg) {
|
|
||||||
log.Printf("Subscriber %d received connected", p.handleId)
|
|
||||||
p.mcu.SubscriberConnected(p.Id(), p.publisher, p.streamType)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) {
|
|
||||||
if event.Uplink {
|
|
||||||
log.Printf("Subscriber %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost)
|
|
||||||
} else {
|
|
||||||
log.Printf("Subscriber %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) {
|
|
||||||
// Only triggered for publishers
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) NotifyReconnected() {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
|
||||||
defer cancel()
|
|
||||||
handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
|
|
||||||
if err != nil {
|
|
||||||
// TODO(jojo): Retry?
|
|
||||||
log.Printf("Could not reconnect subscriber for publisher %s: %s", p.publisher, err)
|
|
||||||
p.Close(context.Background())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p.handle = handle
|
|
||||||
p.handleId = handle.Id
|
|
||||||
p.roomId = pub.roomId
|
|
||||||
p.sid = strconv.FormatUint(handle.Id, 10)
|
|
||||||
p.listener.SubscriberSidUpdated(p)
|
|
||||||
log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) Close(ctx context.Context) {
|
|
||||||
p.mu.Lock()
|
|
||||||
closed := p.closeClient(ctx)
|
|
||||||
p.mu.Unlock()
|
|
||||||
|
|
||||||
if closed {
|
|
||||||
p.mcu.SubscriberDisconnected(p.Id(), p.publisher, p.streamType)
|
|
||||||
statsSubscribersCurrent.WithLabelValues(string(p.streamType)).Dec()
|
|
||||||
}
|
|
||||||
p.mcu.unregisterClient(p)
|
|
||||||
p.listener.SubscriberClosed(p)
|
|
||||||
p.mcuJanusClient.Close(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) {
|
|
||||||
handle := p.handle
|
|
||||||
if handle == nil {
|
|
||||||
callback(ErrNotConnected, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
waiter := p.mcu.publisherConnected.NewWaiter(getStreamId(p.publisher, p.streamType))
|
|
||||||
defer p.mcu.publisherConnected.Release(waiter)
|
|
||||||
|
|
||||||
loggedNotPublishingYet := false
|
|
||||||
retry:
|
|
||||||
join_msg := map[string]interface{}{
|
|
||||||
"request": "join",
|
|
||||||
"ptype": "subscriber",
|
|
||||||
"room": p.roomId,
|
|
||||||
}
|
|
||||||
if p.mcu.isMultistream() {
|
|
||||||
join_msg["streams"] = []map[string]interface{}{
|
|
||||||
{
|
|
||||||
"feed": streamTypeUserIds[p.streamType],
|
|
||||||
},
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
join_msg["feed"] = streamTypeUserIds[p.streamType]
|
|
||||||
}
|
|
||||||
if stream != nil {
|
|
||||||
stream.AddToMessage(join_msg)
|
|
||||||
}
|
|
||||||
join_response, err := handle.Message(ctx, join_msg, nil)
|
|
||||||
if err != nil {
|
|
||||||
callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if error_code := getPluginIntValue(join_response.Plugindata, pluginVideoRoom, "error_code"); error_code > 0 {
|
|
||||||
switch error_code {
|
|
||||||
case JANUS_VIDEOROOM_ERROR_ALREADY_JOINED:
|
|
||||||
// The subscriber is already connected to the room. This can happen
|
|
||||||
// if a client leaves a call but keeps the subscriber objects active.
|
|
||||||
// On joining the call again, the subscriber tries to join on the
|
|
||||||
// MCU which will fail because he is still connected.
|
|
||||||
// To get a new Offer SDP, we have to tear down the session on the
|
|
||||||
// MCU and join again.
|
|
||||||
p.mu.Lock()
|
|
||||||
p.closeClient(ctx)
|
|
||||||
p.mu.Unlock()
|
|
||||||
|
|
||||||
var pub *mcuJanusPublisher
|
|
||||||
handle, pub, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
|
|
||||||
if err != nil {
|
|
||||||
// Reconnection didn't work, need to unregister/remove subscriber
|
|
||||||
// so a new object will be created if the request is retried.
|
|
||||||
p.mcu.unregisterClient(p)
|
|
||||||
p.listener.SubscriberClosed(p)
|
|
||||||
callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p.handle = handle
|
|
||||||
p.handleId = handle.Id
|
|
||||||
p.roomId = pub.roomId
|
|
||||||
p.sid = strconv.FormatUint(handle.Id, 10)
|
|
||||||
p.listener.SubscriberSidUpdated(p)
|
|
||||||
p.closeChan = make(chan struct{}, 1)
|
|
||||||
go p.run(p.handle, p.closeChan)
|
|
||||||
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
|
|
||||||
goto retry
|
|
||||||
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM:
|
|
||||||
fallthrough
|
|
||||||
case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED:
|
|
||||||
switch error_code {
|
|
||||||
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM:
|
|
||||||
log.Printf("Publisher %s not created yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId)
|
|
||||||
case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED:
|
|
||||||
log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !loggedNotPublishingYet {
|
|
||||||
loggedNotPublishingYet = true
|
|
||||||
statsWaitingForPublisherTotal.WithLabelValues(string(p.streamType)).Inc()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := waiter.Wait(ctx); err != nil {
|
|
||||||
callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher)
|
|
||||||
goto retry
|
|
||||||
default:
|
|
||||||
// TODO(jojo): Should we handle other errors, too?
|
|
||||||
callback(fmt.Errorf("Error joining room as subscriber: %+v", join_response), nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//log.Println("Joined as listener", join_response)
|
|
||||||
|
|
||||||
p.session = join_response.Session
|
|
||||||
callback(nil, join_response.Jsep)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) update(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) {
|
|
||||||
handle := p.handle
|
|
||||||
if handle == nil {
|
|
||||||
callback(ErrNotConnected, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
configure_msg := map[string]interface{}{
|
|
||||||
"request": "configure",
|
|
||||||
"update": true,
|
|
||||||
}
|
|
||||||
if stream != nil {
|
|
||||||
stream.AddToMessage(configure_msg)
|
|
||||||
}
|
|
||||||
configure_response, err := handle.Message(ctx, configure_msg, nil)
|
|
||||||
if err != nil {
|
|
||||||
callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
callback(nil, configure_response.Jsep)
|
|
||||||
}
|
|
||||||
|
|
||||||
type streamSelection struct {
|
|
||||||
substream sql.NullInt16
|
|
||||||
temporal sql.NullInt16
|
|
||||||
audio sql.NullBool
|
|
||||||
video sql.NullBool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *streamSelection) HasValues() bool {
|
|
||||||
return s.substream.Valid || s.temporal.Valid || s.audio.Valid || s.video.Valid
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *streamSelection) AddToMessage(message map[string]interface{}) {
|
|
||||||
if s.substream.Valid {
|
|
||||||
message["substream"] = s.substream.Int16
|
|
||||||
}
|
|
||||||
if s.temporal.Valid {
|
|
||||||
message["temporal"] = s.temporal.Int16
|
|
||||||
}
|
|
||||||
if s.audio.Valid {
|
|
||||||
message["audio"] = s.audio.Bool
|
|
||||||
}
|
|
||||||
if s.video.Valid {
|
|
||||||
message["video"] = s.video.Bool
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseStreamSelection(payload map[string]interface{}) (*streamSelection, error) {
|
|
||||||
var stream streamSelection
|
|
||||||
if value, found := payload["substream"]; found {
|
|
||||||
switch value := value.(type) {
|
|
||||||
case int:
|
|
||||||
stream.substream.Valid = true
|
|
||||||
stream.substream.Int16 = int16(value)
|
|
||||||
case float32:
|
|
||||||
stream.substream.Valid = true
|
|
||||||
stream.substream.Int16 = int16(value)
|
|
||||||
case float64:
|
|
||||||
stream.substream.Valid = true
|
|
||||||
stream.substream.Int16 = int16(value)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("Unsupported substream value: %v", value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if value, found := payload["temporal"]; found {
|
|
||||||
switch value := value.(type) {
|
|
||||||
case int:
|
|
||||||
stream.temporal.Valid = true
|
|
||||||
stream.temporal.Int16 = int16(value)
|
|
||||||
case float32:
|
|
||||||
stream.temporal.Valid = true
|
|
||||||
stream.temporal.Int16 = int16(value)
|
|
||||||
case float64:
|
|
||||||
stream.temporal.Valid = true
|
|
||||||
stream.temporal.Int16 = int16(value)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("Unsupported temporal value: %v", value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if value, found := payload["audio"]; found {
|
|
||||||
switch value := value.(type) {
|
|
||||||
case bool:
|
|
||||||
stream.audio.Valid = true
|
|
||||||
stream.audio.Bool = value
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("Unsupported audio value: %v", value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if value, found := payload["video"]; found {
|
|
||||||
switch value := value.(type) {
|
|
||||||
case bool:
|
|
||||||
stream.video.Valid = true
|
|
||||||
stream.video.Bool = value
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("Unsupported video value: %v", value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &stream, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
|
||||||
statsMcuMessagesTotal.WithLabelValues(data.Type).Inc()
|
|
||||||
jsep_msg := data.Payload
|
|
||||||
switch data.Type {
|
|
||||||
case "requestoffer":
|
|
||||||
fallthrough
|
|
||||||
case "sendoffer":
|
|
||||||
p.deferred <- func() {
|
|
||||||
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
stream, err := parseStreamSelection(jsep_msg)
|
|
||||||
if err != nil {
|
|
||||||
go callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if data.Sid == "" || data.Sid != p.Sid() {
|
|
||||||
p.joinRoom(msgctx, stream, callback)
|
|
||||||
} else {
|
|
||||||
p.update(msgctx, stream, callback)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "answer":
|
|
||||||
p.deferred <- func() {
|
|
||||||
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if data.Sid == "" || data.Sid == p.Sid() {
|
|
||||||
p.sendAnswer(msgctx, jsep_msg, callback)
|
|
||||||
} else {
|
|
||||||
go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "candidate":
|
|
||||||
p.deferred <- func() {
|
|
||||||
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if data.Sid == "" || data.Sid == p.Sid() {
|
|
||||||
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
|
|
||||||
} else {
|
|
||||||
go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "endOfCandidates":
|
|
||||||
// Ignore
|
|
||||||
case "selectStream":
|
|
||||||
stream, err := parseStreamSelection(jsep_msg)
|
|
||||||
if err != nil {
|
|
||||||
go callback(err, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if stream == nil || !stream.HasValues() {
|
|
||||||
// Nothing to do
|
|
||||||
go callback(nil, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p.deferred <- func() {
|
|
||||||
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
p.selectStream(msgctx, stream, callback)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
// Return error asynchronously
|
|
||||||
go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
216
mcu_janus_client.go
Normal file
216
mcu_janus_client.go
Normal file
|
@ -0,0 +1,216 @@
|
||||||
|
/**
|
||||||
|
* Standalone signaling server for the Nextcloud Spreed app.
|
||||||
|
* Copyright (C) 2017 struktur AG
|
||||||
|
*
|
||||||
|
* @author Joachim Bauch <bauch@struktur.de>
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
package signaling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/notedit/janus-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mcuJanusClient struct {
|
||||||
|
mcu *mcuJanus
|
||||||
|
listener McuListener
|
||||||
|
mu sync.Mutex // nolint
|
||||||
|
|
||||||
|
id uint64
|
||||||
|
session uint64
|
||||||
|
roomId uint64
|
||||||
|
sid string
|
||||||
|
streamType StreamType
|
||||||
|
maxBitrate int
|
||||||
|
|
||||||
|
handle *JanusHandle
|
||||||
|
handleId uint64
|
||||||
|
closeChan chan struct{}
|
||||||
|
deferred chan func()
|
||||||
|
|
||||||
|
handleEvent func(event *janus.EventMsg)
|
||||||
|
handleHangup func(event *janus.HangupMsg)
|
||||||
|
handleDetached func(event *janus.DetachedMsg)
|
||||||
|
handleConnected func(event *janus.WebRTCUpMsg)
|
||||||
|
handleSlowLink func(event *janus.SlowLinkMsg)
|
||||||
|
handleMedia func(event *janus.MediaMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) Id() string {
|
||||||
|
return strconv.FormatUint(c.id, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) Sid() string {
|
||||||
|
return c.sid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) StreamType() StreamType {
|
||||||
|
return c.streamType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) MaxBitrate() int {
|
||||||
|
return c.maxBitrate
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) Close(ctx context.Context) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
|
||||||
|
if handle := c.handle; handle != nil {
|
||||||
|
c.handle = nil
|
||||||
|
close(c.closeChan)
|
||||||
|
if _, err := handle.Detach(ctx); err != nil {
|
||||||
|
if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND {
|
||||||
|
log.Println("Could not detach client", handle.Id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) run(handle *JanusHandle, closeChan <-chan struct{}) {
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg := <-handle.Events:
|
||||||
|
switch t := msg.(type) {
|
||||||
|
case *janus.EventMsg:
|
||||||
|
c.handleEvent(t)
|
||||||
|
case *janus.HangupMsg:
|
||||||
|
c.handleHangup(t)
|
||||||
|
case *janus.DetachedMsg:
|
||||||
|
c.handleDetached(t)
|
||||||
|
case *janus.MediaMsg:
|
||||||
|
c.handleMedia(t)
|
||||||
|
case *janus.WebRTCUpMsg:
|
||||||
|
c.handleConnected(t)
|
||||||
|
case *janus.SlowLinkMsg:
|
||||||
|
c.handleSlowLink(t)
|
||||||
|
case *TrickleMsg:
|
||||||
|
c.handleTrickle(t)
|
||||||
|
default:
|
||||||
|
log.Println("Received unsupported event type", msg, reflect.TypeOf(msg))
|
||||||
|
}
|
||||||
|
case f := <-c.deferred:
|
||||||
|
f()
|
||||||
|
case <-closeChan:
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) sendOffer(ctx context.Context, offer map[string]interface{}, callback func(error, map[string]interface{})) {
|
||||||
|
handle := c.handle
|
||||||
|
if handle == nil {
|
||||||
|
callback(ErrNotConnected, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
configure_msg := map[string]interface{}{
|
||||||
|
"request": "configure",
|
||||||
|
"audio": true,
|
||||||
|
"video": true,
|
||||||
|
"data": true,
|
||||||
|
}
|
||||||
|
answer_msg, err := handle.Message(ctx, configure_msg, offer)
|
||||||
|
if err != nil {
|
||||||
|
callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
callback(nil, answer_msg.Jsep)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) sendAnswer(ctx context.Context, answer map[string]interface{}, callback func(error, map[string]interface{})) {
|
||||||
|
handle := c.handle
|
||||||
|
if handle == nil {
|
||||||
|
callback(ErrNotConnected, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
start_msg := map[string]interface{}{
|
||||||
|
"request": "start",
|
||||||
|
"room": c.roomId,
|
||||||
|
}
|
||||||
|
start_response, err := handle.Message(ctx, start_msg, answer)
|
||||||
|
if err != nil {
|
||||||
|
callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Println("Started listener", start_response)
|
||||||
|
callback(nil, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) sendCandidate(ctx context.Context, candidate interface{}, callback func(error, map[string]interface{})) {
|
||||||
|
handle := c.handle
|
||||||
|
if handle == nil {
|
||||||
|
callback(ErrNotConnected, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := handle.Trickle(ctx, candidate); err != nil {
|
||||||
|
callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
callback(nil, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) {
|
||||||
|
if event.Candidate.Completed {
|
||||||
|
c.listener.OnIceCompleted(c)
|
||||||
|
} else {
|
||||||
|
c.listener.OnIceCandidate(c, event.Candidate)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcuJanusClient) selectStream(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) {
|
||||||
|
handle := c.handle
|
||||||
|
if handle == nil {
|
||||||
|
callback(ErrNotConnected, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if stream == nil || !stream.HasValues() {
|
||||||
|
callback(nil, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
configure_msg := map[string]interface{}{
|
||||||
|
"request": "configure",
|
||||||
|
}
|
||||||
|
if stream != nil {
|
||||||
|
stream.AddToMessage(configure_msg)
|
||||||
|
}
|
||||||
|
_, err := handle.Message(ctx, configure_msg, nil)
|
||||||
|
if err != nil {
|
||||||
|
callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
callback(nil, nil)
|
||||||
|
}
|
205
mcu_janus_publisher.go
Normal file
205
mcu_janus_publisher.go
Normal file
|
@ -0,0 +1,205 @@
|
||||||
|
/**
|
||||||
|
* Standalone signaling server for the Nextcloud Spreed app.
|
||||||
|
* Copyright (C) 2017 struktur AG
|
||||||
|
*
|
||||||
|
* @author Joachim Bauch <bauch@struktur.de>
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
package signaling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/notedit/janus-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mcuJanusPublisher struct {
|
||||||
|
mcuJanusClient
|
||||||
|
|
||||||
|
id string
|
||||||
|
bitrate int
|
||||||
|
mediaTypes MediaType
|
||||||
|
stats publisherStatsCounter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) {
|
||||||
|
if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" {
|
||||||
|
ctx := context.TODO()
|
||||||
|
switch videoroom {
|
||||||
|
case "destroyed":
|
||||||
|
log.Printf("Publisher %d: associated room has been destroyed, closing", p.handleId)
|
||||||
|
go p.Close(ctx)
|
||||||
|
case "slow_link":
|
||||||
|
// Ignore, processed through "handleSlowLink" in the general events.
|
||||||
|
default:
|
||||||
|
log.Printf("Unsupported videoroom publisher event in %d: %+v", p.handleId, event)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Printf("Unsupported publisher event in %d: %+v", p.handleId, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) handleHangup(event *janus.HangupMsg) {
|
||||||
|
log.Printf("Publisher %d received hangup (%s), closing", p.handleId, event.Reason)
|
||||||
|
go p.Close(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) handleDetached(event *janus.DetachedMsg) {
|
||||||
|
log.Printf("Publisher %d received detached, closing", p.handleId)
|
||||||
|
go p.Close(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) handleConnected(event *janus.WebRTCUpMsg) {
|
||||||
|
log.Printf("Publisher %d received connected", p.handleId)
|
||||||
|
p.mcu.publisherConnected.Notify(getStreamId(p.id, p.streamType))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) {
|
||||||
|
if event.Uplink {
|
||||||
|
log.Printf("Publisher %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost)
|
||||||
|
} else {
|
||||||
|
log.Printf("Publisher %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) {
|
||||||
|
mediaType := StreamType(event.Type)
|
||||||
|
if mediaType == StreamTypeVideo && p.streamType == StreamTypeScreen {
|
||||||
|
// We want to differentiate between audio, video and screensharing
|
||||||
|
mediaType = p.streamType
|
||||||
|
}
|
||||||
|
|
||||||
|
p.stats.EnableStream(mediaType, event.Receiving)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) HasMedia(mt MediaType) bool {
|
||||||
|
return (p.mediaTypes & mt) == mt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) SetMedia(mt MediaType) {
|
||||||
|
p.mediaTypes = mt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) NotifyReconnected() {
|
||||||
|
ctx := context.TODO()
|
||||||
|
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Could not reconnect publisher %s: %s", p.id, err)
|
||||||
|
// TODO(jojo): Retry
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
p.handle = handle
|
||||||
|
p.handleId = handle.Id
|
||||||
|
p.session = session
|
||||||
|
p.roomId = roomId
|
||||||
|
|
||||||
|
log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) Close(ctx context.Context) {
|
||||||
|
notify := false
|
||||||
|
p.mu.Lock()
|
||||||
|
if handle := p.handle; handle != nil && p.roomId != 0 {
|
||||||
|
destroy_msg := map[string]interface{}{
|
||||||
|
"request": "destroy",
|
||||||
|
"room": p.roomId,
|
||||||
|
}
|
||||||
|
if _, err := handle.Request(ctx, destroy_msg); err != nil {
|
||||||
|
log.Printf("Error destroying room %d: %s", p.roomId, err)
|
||||||
|
} else {
|
||||||
|
log.Printf("Room %d destroyed", p.roomId)
|
||||||
|
}
|
||||||
|
p.mcu.mu.Lock()
|
||||||
|
delete(p.mcu.publishers, getStreamId(p.id, p.streamType))
|
||||||
|
p.mcu.mu.Unlock()
|
||||||
|
p.roomId = 0
|
||||||
|
notify = true
|
||||||
|
}
|
||||||
|
p.closeClient(ctx)
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
p.stats.Reset()
|
||||||
|
|
||||||
|
if notify {
|
||||||
|
statsPublishersCurrent.WithLabelValues(string(p.streamType)).Dec()
|
||||||
|
p.mcu.unregisterClient(p)
|
||||||
|
p.listener.PublisherClosed(p)
|
||||||
|
}
|
||||||
|
p.mcuJanusClient.Close(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
||||||
|
statsMcuMessagesTotal.WithLabelValues(data.Type).Inc()
|
||||||
|
jsep_msg := data.Payload
|
||||||
|
switch data.Type {
|
||||||
|
case "offer":
|
||||||
|
p.deferred <- func() {
|
||||||
|
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// TODO Tear down previous publisher and get a new one if sid does
|
||||||
|
// not match?
|
||||||
|
p.sendOffer(msgctx, jsep_msg, callback)
|
||||||
|
}
|
||||||
|
case "candidate":
|
||||||
|
p.deferred <- func() {
|
||||||
|
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if data.Sid == "" || data.Sid == p.Sid() {
|
||||||
|
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
|
||||||
|
} else {
|
||||||
|
go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "endOfCandidates":
|
||||||
|
// Ignore
|
||||||
|
default:
|
||||||
|
go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error {
|
||||||
|
msg := map[string]interface{}{
|
||||||
|
"request": "publish_remotely",
|
||||||
|
"room": p.roomId,
|
||||||
|
"publisher_id": streamTypeUserIds[p.streamType],
|
||||||
|
"remote_id": p.id,
|
||||||
|
"host": hostname,
|
||||||
|
"port": port,
|
||||||
|
"rtcp_port": rtcpPort,
|
||||||
|
}
|
||||||
|
response, err := p.handle.Request(ctx, msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error")
|
||||||
|
errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code")
|
||||||
|
if errorMessage != "" || errorCode != 0 {
|
||||||
|
if errorMessage == "" {
|
||||||
|
errorMessage = "unknown error"
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%s (%d)", errorMessage, errorCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort)
|
||||||
|
return nil
|
||||||
|
}
|
110
mcu_janus_stream_selection.go
Normal file
110
mcu_janus_stream_selection.go
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Standalone signaling server for the Nextcloud Spreed app.
|
||||||
|
* Copyright (C) 2017 struktur AG
|
||||||
|
*
|
||||||
|
* @author Joachim Bauch <bauch@struktur.de>
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
package signaling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type streamSelection struct {
|
||||||
|
substream sql.NullInt16
|
||||||
|
temporal sql.NullInt16
|
||||||
|
audio sql.NullBool
|
||||||
|
video sql.NullBool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamSelection) HasValues() bool {
|
||||||
|
return s.substream.Valid || s.temporal.Valid || s.audio.Valid || s.video.Valid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamSelection) AddToMessage(message map[string]interface{}) {
|
||||||
|
if s.substream.Valid {
|
||||||
|
message["substream"] = s.substream.Int16
|
||||||
|
}
|
||||||
|
if s.temporal.Valid {
|
||||||
|
message["temporal"] = s.temporal.Int16
|
||||||
|
}
|
||||||
|
if s.audio.Valid {
|
||||||
|
message["audio"] = s.audio.Bool
|
||||||
|
}
|
||||||
|
if s.video.Valid {
|
||||||
|
message["video"] = s.video.Bool
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStreamSelection(payload map[string]interface{}) (*streamSelection, error) {
|
||||||
|
var stream streamSelection
|
||||||
|
if value, found := payload["substream"]; found {
|
||||||
|
switch value := value.(type) {
|
||||||
|
case int:
|
||||||
|
stream.substream.Valid = true
|
||||||
|
stream.substream.Int16 = int16(value)
|
||||||
|
case float32:
|
||||||
|
stream.substream.Valid = true
|
||||||
|
stream.substream.Int16 = int16(value)
|
||||||
|
case float64:
|
||||||
|
stream.substream.Valid = true
|
||||||
|
stream.substream.Int16 = int16(value)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unsupported substream value: %v", value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, found := payload["temporal"]; found {
|
||||||
|
switch value := value.(type) {
|
||||||
|
case int:
|
||||||
|
stream.temporal.Valid = true
|
||||||
|
stream.temporal.Int16 = int16(value)
|
||||||
|
case float32:
|
||||||
|
stream.temporal.Valid = true
|
||||||
|
stream.temporal.Int16 = int16(value)
|
||||||
|
case float64:
|
||||||
|
stream.temporal.Valid = true
|
||||||
|
stream.temporal.Int16 = int16(value)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unsupported temporal value: %v", value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, found := payload["audio"]; found {
|
||||||
|
switch value := value.(type) {
|
||||||
|
case bool:
|
||||||
|
stream.audio.Valid = true
|
||||||
|
stream.audio.Bool = value
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unsupported audio value: %v", value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, found := payload["video"]; found {
|
||||||
|
switch value := value.(type) {
|
||||||
|
case bool:
|
||||||
|
stream.video.Valid = true
|
||||||
|
stream.video.Bool = value
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unsupported video value: %v", value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &stream, nil
|
||||||
|
}
|
321
mcu_janus_subscriber.go
Normal file
321
mcu_janus_subscriber.go
Normal file
|
@ -0,0 +1,321 @@
|
||||||
|
/**
|
||||||
|
* Standalone signaling server for the Nextcloud Spreed app.
|
||||||
|
* Copyright (C) 2017 struktur AG
|
||||||
|
*
|
||||||
|
* @author Joachim Bauch <bauch@struktur.de>
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
package signaling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/notedit/janus-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mcuJanusSubscriber struct {
|
||||||
|
mcuJanusClient
|
||||||
|
|
||||||
|
publisher string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) Publisher() string {
|
||||||
|
return p.publisher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) {
|
||||||
|
if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" {
|
||||||
|
ctx := context.TODO()
|
||||||
|
switch videoroom {
|
||||||
|
case "destroyed":
|
||||||
|
log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId)
|
||||||
|
go p.Close(ctx)
|
||||||
|
case "event":
|
||||||
|
// Handle renegotiations, but ignore other events like selected
|
||||||
|
// substream / temporal layer.
|
||||||
|
if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" &&
|
||||||
|
event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil {
|
||||||
|
p.listener.OnUpdateOffer(p, event.Jsep)
|
||||||
|
}
|
||||||
|
case "slow_link":
|
||||||
|
// Ignore, processed through "handleSlowLink" in the general events.
|
||||||
|
default:
|
||||||
|
log.Printf("Unsupported videoroom event %s for subscriber %d: %+v", videoroom, p.handleId, event)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Printf("Unsupported event for subscriber %d: %+v", p.handleId, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) handleHangup(event *janus.HangupMsg) {
|
||||||
|
log.Printf("Subscriber %d received hangup (%s), closing", p.handleId, event.Reason)
|
||||||
|
go p.Close(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) handleDetached(event *janus.DetachedMsg) {
|
||||||
|
log.Printf("Subscriber %d received detached, closing", p.handleId)
|
||||||
|
go p.Close(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) handleConnected(event *janus.WebRTCUpMsg) {
|
||||||
|
log.Printf("Subscriber %d received connected", p.handleId)
|
||||||
|
p.mcu.SubscriberConnected(p.Id(), p.publisher, p.streamType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) {
|
||||||
|
if event.Uplink {
|
||||||
|
log.Printf("Subscriber %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost)
|
||||||
|
} else {
|
||||||
|
log.Printf("Subscriber %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) {
|
||||||
|
// Only triggered for publishers
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) NotifyReconnected() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
||||||
|
defer cancel()
|
||||||
|
handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
|
||||||
|
if err != nil {
|
||||||
|
// TODO(jojo): Retry?
|
||||||
|
log.Printf("Could not reconnect subscriber for publisher %s: %s", p.publisher, err)
|
||||||
|
p.Close(context.Background())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
p.handle = handle
|
||||||
|
p.handleId = handle.Id
|
||||||
|
p.roomId = pub.roomId
|
||||||
|
p.sid = strconv.FormatUint(handle.Id, 10)
|
||||||
|
p.listener.SubscriberSidUpdated(p)
|
||||||
|
log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) Close(ctx context.Context) {
|
||||||
|
p.mu.Lock()
|
||||||
|
closed := p.closeClient(ctx)
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
if closed {
|
||||||
|
p.mcu.SubscriberDisconnected(p.Id(), p.publisher, p.streamType)
|
||||||
|
statsSubscribersCurrent.WithLabelValues(string(p.streamType)).Dec()
|
||||||
|
}
|
||||||
|
p.mcu.unregisterClient(p)
|
||||||
|
p.listener.SubscriberClosed(p)
|
||||||
|
p.mcuJanusClient.Close(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) {
|
||||||
|
handle := p.handle
|
||||||
|
if handle == nil {
|
||||||
|
callback(ErrNotConnected, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
waiter := p.mcu.publisherConnected.NewWaiter(getStreamId(p.publisher, p.streamType))
|
||||||
|
defer p.mcu.publisherConnected.Release(waiter)
|
||||||
|
|
||||||
|
loggedNotPublishingYet := false
|
||||||
|
retry:
|
||||||
|
join_msg := map[string]interface{}{
|
||||||
|
"request": "join",
|
||||||
|
"ptype": "subscriber",
|
||||||
|
"room": p.roomId,
|
||||||
|
}
|
||||||
|
if p.mcu.isMultistream() {
|
||||||
|
join_msg["streams"] = []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"feed": streamTypeUserIds[p.streamType],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
join_msg["feed"] = streamTypeUserIds[p.streamType]
|
||||||
|
}
|
||||||
|
if stream != nil {
|
||||||
|
stream.AddToMessage(join_msg)
|
||||||
|
}
|
||||||
|
join_response, err := handle.Message(ctx, join_msg, nil)
|
||||||
|
if err != nil {
|
||||||
|
callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if error_code := getPluginIntValue(join_response.Plugindata, pluginVideoRoom, "error_code"); error_code > 0 {
|
||||||
|
switch error_code {
|
||||||
|
case JANUS_VIDEOROOM_ERROR_ALREADY_JOINED:
|
||||||
|
// The subscriber is already connected to the room. This can happen
|
||||||
|
// if a client leaves a call but keeps the subscriber objects active.
|
||||||
|
// On joining the call again, the subscriber tries to join on the
|
||||||
|
// MCU which will fail because he is still connected.
|
||||||
|
// To get a new Offer SDP, we have to tear down the session on the
|
||||||
|
// MCU and join again.
|
||||||
|
p.mu.Lock()
|
||||||
|
p.closeClient(ctx)
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
var pub *mcuJanusPublisher
|
||||||
|
handle, pub, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
|
||||||
|
if err != nil {
|
||||||
|
// Reconnection didn't work, need to unregister/remove subscriber
|
||||||
|
// so a new object will be created if the request is retried.
|
||||||
|
p.mcu.unregisterClient(p)
|
||||||
|
p.listener.SubscriberClosed(p)
|
||||||
|
callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
p.handle = handle
|
||||||
|
p.handleId = handle.Id
|
||||||
|
p.roomId = pub.roomId
|
||||||
|
p.sid = strconv.FormatUint(handle.Id, 10)
|
||||||
|
p.listener.SubscriberSidUpdated(p)
|
||||||
|
p.closeChan = make(chan struct{}, 1)
|
||||||
|
go p.run(p.handle, p.closeChan)
|
||||||
|
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
|
||||||
|
goto retry
|
||||||
|
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM:
|
||||||
|
fallthrough
|
||||||
|
case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED:
|
||||||
|
switch error_code {
|
||||||
|
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM:
|
||||||
|
log.Printf("Publisher %s not created yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId)
|
||||||
|
case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED:
|
||||||
|
log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !loggedNotPublishingYet {
|
||||||
|
loggedNotPublishingYet = true
|
||||||
|
statsWaitingForPublisherTotal.WithLabelValues(string(p.streamType)).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := waiter.Wait(ctx); err != nil {
|
||||||
|
callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher)
|
||||||
|
goto retry
|
||||||
|
default:
|
||||||
|
// TODO(jojo): Should we handle other errors, too?
|
||||||
|
callback(fmt.Errorf("Error joining room as subscriber: %+v", join_response), nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//log.Println("Joined as listener", join_response)
|
||||||
|
|
||||||
|
p.session = join_response.Session
|
||||||
|
callback(nil, join_response.Jsep)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) update(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) {
|
||||||
|
handle := p.handle
|
||||||
|
if handle == nil {
|
||||||
|
callback(ErrNotConnected, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
configure_msg := map[string]interface{}{
|
||||||
|
"request": "configure",
|
||||||
|
"update": true,
|
||||||
|
}
|
||||||
|
if stream != nil {
|
||||||
|
stream.AddToMessage(configure_msg)
|
||||||
|
}
|
||||||
|
configure_response, err := handle.Message(ctx, configure_msg, nil)
|
||||||
|
if err != nil {
|
||||||
|
callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
callback(nil, configure_response.Jsep)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
||||||
|
statsMcuMessagesTotal.WithLabelValues(data.Type).Inc()
|
||||||
|
jsep_msg := data.Payload
|
||||||
|
switch data.Type {
|
||||||
|
case "requestoffer":
|
||||||
|
fallthrough
|
||||||
|
case "sendoffer":
|
||||||
|
p.deferred <- func() {
|
||||||
|
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
stream, err := parseStreamSelection(jsep_msg)
|
||||||
|
if err != nil {
|
||||||
|
go callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if data.Sid == "" || data.Sid != p.Sid() {
|
||||||
|
p.joinRoom(msgctx, stream, callback)
|
||||||
|
} else {
|
||||||
|
p.update(msgctx, stream, callback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "answer":
|
||||||
|
p.deferred <- func() {
|
||||||
|
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if data.Sid == "" || data.Sid == p.Sid() {
|
||||||
|
p.sendAnswer(msgctx, jsep_msg, callback)
|
||||||
|
} else {
|
||||||
|
go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "candidate":
|
||||||
|
p.deferred <- func() {
|
||||||
|
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if data.Sid == "" || data.Sid == p.Sid() {
|
||||||
|
p.sendCandidate(msgctx, jsep_msg["candidate"], callback)
|
||||||
|
} else {
|
||||||
|
go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "endOfCandidates":
|
||||||
|
// Ignore
|
||||||
|
case "selectStream":
|
||||||
|
stream, err := parseStreamSelection(jsep_msg)
|
||||||
|
if err != nil {
|
||||||
|
go callback(err, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if stream == nil || !stream.HasValues() {
|
||||||
|
// Nothing to do
|
||||||
|
go callback(nil, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
p.deferred <- func() {
|
||||||
|
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
p.selectStream(msgctx, stream, callback)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// Return error asynchronously
|
||||||
|
go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil)
|
||||||
|
}
|
||||||
|
}
|
99
publisher_stats_counter.go
Normal file
99
publisher_stats_counter.go
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
/**
|
||||||
|
* Standalone signaling server for the Nextcloud Spreed app.
|
||||||
|
* Copyright (C) 2021 struktur AG
|
||||||
|
*
|
||||||
|
* @author Joachim Bauch <bauch@struktur.de>
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
package signaling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type publisherStatsCounter struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
|
streamTypes map[StreamType]bool
|
||||||
|
subscribers map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *publisherStatsCounter) Reset() {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
count := len(c.subscribers)
|
||||||
|
for streamType := range c.streamTypes {
|
||||||
|
statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec()
|
||||||
|
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(count))
|
||||||
|
}
|
||||||
|
c.streamTypes = nil
|
||||||
|
c.subscribers = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *publisherStatsCounter) EnableStream(streamType StreamType, enable bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if enable == c.streamTypes[streamType] {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if enable {
|
||||||
|
if c.streamTypes == nil {
|
||||||
|
c.streamTypes = make(map[StreamType]bool)
|
||||||
|
}
|
||||||
|
c.streamTypes[streamType] = true
|
||||||
|
statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Inc()
|
||||||
|
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Add(float64(len(c.subscribers)))
|
||||||
|
} else {
|
||||||
|
delete(c.streamTypes, streamType)
|
||||||
|
statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec()
|
||||||
|
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(len(c.subscribers)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *publisherStatsCounter) AddSubscriber(id string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if c.subscribers[id] {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.subscribers == nil {
|
||||||
|
c.subscribers = make(map[string]bool)
|
||||||
|
}
|
||||||
|
c.subscribers[id] = true
|
||||||
|
for streamType := range c.streamTypes {
|
||||||
|
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Inc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *publisherStatsCounter) RemoveSubscriber(id string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if !c.subscribers[id] {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(c.subscribers, id)
|
||||||
|
for streamType := range c.streamTypes {
|
||||||
|
statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Dec()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue