diff --git a/mcu_janus.go b/mcu_janus.go index 487bf42..8af5cb4 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -23,12 +23,10 @@ package signaling import ( "context" - "database/sql" "encoding/json" "errors" "fmt" "log" - "reflect" "strconv" "sync" "sync/atomic" @@ -445,272 +443,6 @@ func (m *mcuJanus) sendKeepalive() { } } -type mcuJanusClient struct { - mcu *mcuJanus - listener McuListener - mu sync.Mutex // nolint - - id uint64 - session uint64 - roomId uint64 - sid string - streamType StreamType - maxBitrate int - - handle *JanusHandle - handleId uint64 - closeChan chan struct{} - deferred chan func() - - handleEvent func(event *janus.EventMsg) - handleHangup func(event *janus.HangupMsg) - handleDetached func(event *janus.DetachedMsg) - handleConnected func(event *janus.WebRTCUpMsg) - handleSlowLink func(event *janus.SlowLinkMsg) - handleMedia func(event *janus.MediaMsg) -} - -func (c *mcuJanusClient) Id() string { - return strconv.FormatUint(c.id, 10) -} - -func (c *mcuJanusClient) Sid() string { - return c.sid -} - -func (c *mcuJanusClient) StreamType() StreamType { - return c.streamType -} - -func (c *mcuJanusClient) MaxBitrate() int { - return c.maxBitrate -} - -func (c *mcuJanusClient) Close(ctx context.Context) { -} - -func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { -} - -func (c *mcuJanusClient) closeClient(ctx context.Context) bool { - if handle := c.handle; handle != nil { - c.handle = nil - close(c.closeChan) - if _, err := handle.Detach(ctx); err != nil { - if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND { - log.Println("Could not detach client", handle.Id, err) - } - } - return true - } - - return false -} - -func (c *mcuJanusClient) run(handle *JanusHandle, closeChan <-chan struct{}) { -loop: - for { - select { - case msg := <-handle.Events: - switch t := msg.(type) { - case *janus.EventMsg: - c.handleEvent(t) - case *janus.HangupMsg: - c.handleHangup(t) - case *janus.DetachedMsg: - c.handleDetached(t) - case *janus.MediaMsg: - c.handleMedia(t) - case *janus.WebRTCUpMsg: - c.handleConnected(t) - case *janus.SlowLinkMsg: - c.handleSlowLink(t) - case *TrickleMsg: - c.handleTrickle(t) - default: - log.Println("Received unsupported event type", msg, reflect.TypeOf(msg)) - } - case f := <-c.deferred: - f() - case <-closeChan: - break loop - } - } -} - -func (c *mcuJanusClient) sendOffer(ctx context.Context, offer map[string]interface{}, callback func(error, map[string]interface{})) { - handle := c.handle - if handle == nil { - callback(ErrNotConnected, nil) - return - } - - configure_msg := map[string]interface{}{ - "request": "configure", - "audio": true, - "video": true, - "data": true, - } - answer_msg, err := handle.Message(ctx, configure_msg, offer) - if err != nil { - callback(err, nil) - return - } - - callback(nil, answer_msg.Jsep) -} - -func (c *mcuJanusClient) sendAnswer(ctx context.Context, answer map[string]interface{}, callback func(error, map[string]interface{})) { - handle := c.handle - if handle == nil { - callback(ErrNotConnected, nil) - return - } - - start_msg := map[string]interface{}{ - "request": "start", - "room": c.roomId, - } - start_response, err := handle.Message(ctx, start_msg, answer) - if err != nil { - callback(err, nil) - return - } - log.Println("Started listener", start_response) - callback(nil, nil) -} - -func (c *mcuJanusClient) sendCandidate(ctx context.Context, candidate interface{}, callback func(error, map[string]interface{})) { - handle := c.handle - if handle == nil { - callback(ErrNotConnected, nil) - return - } - - if _, err := handle.Trickle(ctx, candidate); err != nil { - callback(err, nil) - return - } - callback(nil, nil) -} - -func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) { - if event.Candidate.Completed { - c.listener.OnIceCompleted(c) - } else { - c.listener.OnIceCandidate(c, event.Candidate) - } -} - -func (c *mcuJanusClient) selectStream(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) { - handle := c.handle - if handle == nil { - callback(ErrNotConnected, nil) - return - } - - if stream == nil || !stream.HasValues() { - callback(nil, nil) - return - } - - configure_msg := map[string]interface{}{ - "request": "configure", - } - if stream != nil { - stream.AddToMessage(configure_msg) - } - _, err := handle.Message(ctx, configure_msg, nil) - if err != nil { - callback(err, nil) - return - } - - callback(nil, nil) -} - -type publisherStatsCounter struct { - mu sync.Mutex - - streamTypes map[StreamType]bool - subscribers map[string]bool -} - -func (c *publisherStatsCounter) Reset() { - c.mu.Lock() - defer c.mu.Unlock() - - count := len(c.subscribers) - for streamType := range c.streamTypes { - statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec() - statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(count)) - } - c.streamTypes = nil - c.subscribers = nil -} - -func (c *publisherStatsCounter) EnableStream(streamType StreamType, enable bool) { - c.mu.Lock() - defer c.mu.Unlock() - - if enable == c.streamTypes[streamType] { - return - } - - if enable { - if c.streamTypes == nil { - c.streamTypes = make(map[StreamType]bool) - } - c.streamTypes[streamType] = true - statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Inc() - statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Add(float64(len(c.subscribers))) - } else { - delete(c.streamTypes, streamType) - statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec() - statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(len(c.subscribers))) - } -} - -func (c *publisherStatsCounter) AddSubscriber(id string) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.subscribers[id] { - return - } - - if c.subscribers == nil { - c.subscribers = make(map[string]bool) - } - c.subscribers[id] = true - for streamType := range c.streamTypes { - statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Inc() - } -} - -func (c *publisherStatsCounter) RemoveSubscriber(id string) { - c.mu.Lock() - defer c.mu.Unlock() - - if !c.subscribers[id] { - return - } - - delete(c.subscribers, id) - for streamType := range c.streamTypes { - statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Dec() - } -} - -type mcuJanusPublisher struct { - mcuJanusClient - - id string - bitrate int - mediaTypes MediaType - stats publisherStatsCounter -} - func (m *mcuJanus) SubscriberConnected(id string, publisher string, streamType StreamType) { m.mu.Lock() defer m.mu.Unlock() @@ -867,178 +599,6 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st return client, nil } -func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) { - if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" { - ctx := context.TODO() - switch videoroom { - case "destroyed": - log.Printf("Publisher %d: associated room has been destroyed, closing", p.handleId) - go p.Close(ctx) - case "slow_link": - // Ignore, processed through "handleSlowLink" in the general events. - default: - log.Printf("Unsupported videoroom publisher event in %d: %+v", p.handleId, event) - } - } else { - log.Printf("Unsupported publisher event in %d: %+v", p.handleId, event) - } -} - -func (p *mcuJanusPublisher) handleHangup(event *janus.HangupMsg) { - log.Printf("Publisher %d received hangup (%s), closing", p.handleId, event.Reason) - go p.Close(context.Background()) -} - -func (p *mcuJanusPublisher) handleDetached(event *janus.DetachedMsg) { - log.Printf("Publisher %d received detached, closing", p.handleId) - go p.Close(context.Background()) -} - -func (p *mcuJanusPublisher) handleConnected(event *janus.WebRTCUpMsg) { - log.Printf("Publisher %d received connected", p.handleId) - p.mcu.publisherConnected.Notify(getStreamId(p.id, p.streamType)) -} - -func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) { - if event.Uplink { - log.Printf("Publisher %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost) - } else { - log.Printf("Publisher %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost) - } -} - -func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) { - mediaType := StreamType(event.Type) - if mediaType == StreamTypeVideo && p.streamType == StreamTypeScreen { - // We want to differentiate between audio, video and screensharing - mediaType = p.streamType - } - - p.stats.EnableStream(mediaType, event.Receiving) -} - -func (p *mcuJanusPublisher) HasMedia(mt MediaType) bool { - return (p.mediaTypes & mt) == mt -} - -func (p *mcuJanusPublisher) SetMedia(mt MediaType) { - p.mediaTypes = mt -} - -func (p *mcuJanusPublisher) NotifyReconnected() { - ctx := context.TODO() - handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) - if err != nil { - log.Printf("Could not reconnect publisher %s: %s", p.id, err) - // TODO(jojo): Retry - return - } - - p.handle = handle - p.handleId = handle.Id - p.session = session - p.roomId = roomId - - log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId) -} - -func (p *mcuJanusPublisher) Close(ctx context.Context) { - notify := false - p.mu.Lock() - if handle := p.handle; handle != nil && p.roomId != 0 { - destroy_msg := map[string]interface{}{ - "request": "destroy", - "room": p.roomId, - } - if _, err := handle.Request(ctx, destroy_msg); err != nil { - log.Printf("Error destroying room %d: %s", p.roomId, err) - } else { - log.Printf("Room %d destroyed", p.roomId) - } - p.mcu.mu.Lock() - delete(p.mcu.publishers, getStreamId(p.id, p.streamType)) - p.mcu.mu.Unlock() - p.roomId = 0 - notify = true - } - p.closeClient(ctx) - p.mu.Unlock() - - p.stats.Reset() - - if notify { - statsPublishersCurrent.WithLabelValues(string(p.streamType)).Dec() - p.mcu.unregisterClient(p) - p.listener.PublisherClosed(p) - } - p.mcuJanusClient.Close(ctx) -} - -func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { - statsMcuMessagesTotal.WithLabelValues(data.Type).Inc() - jsep_msg := data.Payload - switch data.Type { - case "offer": - p.deferred <- func() { - msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) - defer cancel() - - // TODO Tear down previous publisher and get a new one if sid does - // not match? - p.sendOffer(msgctx, jsep_msg, callback) - } - case "candidate": - p.deferred <- func() { - msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) - defer cancel() - - if data.Sid == "" || data.Sid == p.Sid() { - p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - } else { - go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil) - } - } - case "endOfCandidates": - // Ignore - default: - go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil) - } -} - -func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { - msg := map[string]interface{}{ - "request": "publish_remotely", - "room": p.roomId, - "publisher_id": streamTypeUserIds[p.streamType], - "remote_id": p.id, - "host": hostname, - "port": port, - "rtcp_port": rtcpPort, - } - response, err := p.handle.Request(ctx, msg) - if err != nil { - return err - } - - errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error") - errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code") - if errorMessage != "" || errorCode != 0 { - if errorMessage == "" { - errorMessage = "unknown error" - } - return fmt.Errorf("%s (%d)", errorMessage, errorCode) - } - - log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort) - return nil -} - -type mcuJanusSubscriber struct { - mcuJanusClient - - publisher string -} - func (m *mcuJanus) getPublisher(ctx context.Context, publisher string, streamType StreamType) (*mcuJanusPublisher, error) { // Do the direct check immediately as this should be the normal case. key := getStreamId(publisher, streamType) @@ -1302,370 +862,3 @@ func (m *mcuJanus) NewRemoteSubscriber(ctx context.Context, listener McuListener statsSubscribersTotal.WithLabelValues(string(publisher.StreamType())).Inc() return client, nil } - -func (p *mcuJanusSubscriber) Publisher() string { - return p.publisher -} - -func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) { - if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" { - ctx := context.TODO() - switch videoroom { - case "destroyed": - log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId) - go p.Close(ctx) - case "event": - // Handle renegotiations, but ignore other events like selected - // substream / temporal layer. - if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" && - event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil { - p.listener.OnUpdateOffer(p, event.Jsep) - } - case "slow_link": - // Ignore, processed through "handleSlowLink" in the general events. - default: - log.Printf("Unsupported videoroom event %s for subscriber %d: %+v", videoroom, p.handleId, event) - } - } else { - log.Printf("Unsupported event for subscriber %d: %+v", p.handleId, event) - } -} - -func (p *mcuJanusSubscriber) handleHangup(event *janus.HangupMsg) { - log.Printf("Subscriber %d received hangup (%s), closing", p.handleId, event.Reason) - go p.Close(context.Background()) -} - -func (p *mcuJanusSubscriber) handleDetached(event *janus.DetachedMsg) { - log.Printf("Subscriber %d received detached, closing", p.handleId) - go p.Close(context.Background()) -} - -func (p *mcuJanusSubscriber) handleConnected(event *janus.WebRTCUpMsg) { - log.Printf("Subscriber %d received connected", p.handleId) - p.mcu.SubscriberConnected(p.Id(), p.publisher, p.streamType) -} - -func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) { - if event.Uplink { - log.Printf("Subscriber %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost) - } else { - log.Printf("Subscriber %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost) - } -} - -func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) { - // Only triggered for publishers -} - -func (p *mcuJanusSubscriber) NotifyReconnected() { - ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) - defer cancel() - handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) - if err != nil { - // TODO(jojo): Retry? - log.Printf("Could not reconnect subscriber for publisher %s: %s", p.publisher, err) - p.Close(context.Background()) - return - } - - p.handle = handle - p.handleId = handle.Id - p.roomId = pub.roomId - p.sid = strconv.FormatUint(handle.Id, 10) - p.listener.SubscriberSidUpdated(p) - log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) -} - -func (p *mcuJanusSubscriber) Close(ctx context.Context) { - p.mu.Lock() - closed := p.closeClient(ctx) - p.mu.Unlock() - - if closed { - p.mcu.SubscriberDisconnected(p.Id(), p.publisher, p.streamType) - statsSubscribersCurrent.WithLabelValues(string(p.streamType)).Dec() - } - p.mcu.unregisterClient(p) - p.listener.SubscriberClosed(p) - p.mcuJanusClient.Close(ctx) -} - -func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) { - handle := p.handle - if handle == nil { - callback(ErrNotConnected, nil) - return - } - - waiter := p.mcu.publisherConnected.NewWaiter(getStreamId(p.publisher, p.streamType)) - defer p.mcu.publisherConnected.Release(waiter) - - loggedNotPublishingYet := false -retry: - join_msg := map[string]interface{}{ - "request": "join", - "ptype": "subscriber", - "room": p.roomId, - } - if p.mcu.isMultistream() { - join_msg["streams"] = []map[string]interface{}{ - { - "feed": streamTypeUserIds[p.streamType], - }, - } - } else { - join_msg["feed"] = streamTypeUserIds[p.streamType] - } - if stream != nil { - stream.AddToMessage(join_msg) - } - join_response, err := handle.Message(ctx, join_msg, nil) - if err != nil { - callback(err, nil) - return - } - - if error_code := getPluginIntValue(join_response.Plugindata, pluginVideoRoom, "error_code"); error_code > 0 { - switch error_code { - case JANUS_VIDEOROOM_ERROR_ALREADY_JOINED: - // The subscriber is already connected to the room. This can happen - // if a client leaves a call but keeps the subscriber objects active. - // On joining the call again, the subscriber tries to join on the - // MCU which will fail because he is still connected. - // To get a new Offer SDP, we have to tear down the session on the - // MCU and join again. - p.mu.Lock() - p.closeClient(ctx) - p.mu.Unlock() - - var pub *mcuJanusPublisher - handle, pub, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) - if err != nil { - // Reconnection didn't work, need to unregister/remove subscriber - // so a new object will be created if the request is retried. - p.mcu.unregisterClient(p) - p.listener.SubscriberClosed(p) - callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil) - return - } - - p.handle = handle - p.handleId = handle.Id - p.roomId = pub.roomId - p.sid = strconv.FormatUint(handle.Id, 10) - p.listener.SubscriberSidUpdated(p) - p.closeChan = make(chan struct{}, 1) - go p.run(p.handle, p.closeChan) - log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) - goto retry - case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM: - fallthrough - case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED: - switch error_code { - case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM: - log.Printf("Publisher %s not created yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId) - case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED: - log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId) - } - - if !loggedNotPublishingYet { - loggedNotPublishingYet = true - statsWaitingForPublisherTotal.WithLabelValues(string(p.streamType)).Inc() - } - - if err := waiter.Wait(ctx); err != nil { - callback(err, nil) - return - } - log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher) - goto retry - default: - // TODO(jojo): Should we handle other errors, too? - callback(fmt.Errorf("Error joining room as subscriber: %+v", join_response), nil) - return - } - } - //log.Println("Joined as listener", join_response) - - p.session = join_response.Session - callback(nil, join_response.Jsep) -} - -func (p *mcuJanusSubscriber) update(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) { - handle := p.handle - if handle == nil { - callback(ErrNotConnected, nil) - return - } - - configure_msg := map[string]interface{}{ - "request": "configure", - "update": true, - } - if stream != nil { - stream.AddToMessage(configure_msg) - } - configure_response, err := handle.Message(ctx, configure_msg, nil) - if err != nil { - callback(err, nil) - return - } - - callback(nil, configure_response.Jsep) -} - -type streamSelection struct { - substream sql.NullInt16 - temporal sql.NullInt16 - audio sql.NullBool - video sql.NullBool -} - -func (s *streamSelection) HasValues() bool { - return s.substream.Valid || s.temporal.Valid || s.audio.Valid || s.video.Valid -} - -func (s *streamSelection) AddToMessage(message map[string]interface{}) { - if s.substream.Valid { - message["substream"] = s.substream.Int16 - } - if s.temporal.Valid { - message["temporal"] = s.temporal.Int16 - } - if s.audio.Valid { - message["audio"] = s.audio.Bool - } - if s.video.Valid { - message["video"] = s.video.Bool - } -} - -func parseStreamSelection(payload map[string]interface{}) (*streamSelection, error) { - var stream streamSelection - if value, found := payload["substream"]; found { - switch value := value.(type) { - case int: - stream.substream.Valid = true - stream.substream.Int16 = int16(value) - case float32: - stream.substream.Valid = true - stream.substream.Int16 = int16(value) - case float64: - stream.substream.Valid = true - stream.substream.Int16 = int16(value) - default: - return nil, fmt.Errorf("Unsupported substream value: %v", value) - } - } - - if value, found := payload["temporal"]; found { - switch value := value.(type) { - case int: - stream.temporal.Valid = true - stream.temporal.Int16 = int16(value) - case float32: - stream.temporal.Valid = true - stream.temporal.Int16 = int16(value) - case float64: - stream.temporal.Valid = true - stream.temporal.Int16 = int16(value) - default: - return nil, fmt.Errorf("Unsupported temporal value: %v", value) - } - } - - if value, found := payload["audio"]; found { - switch value := value.(type) { - case bool: - stream.audio.Valid = true - stream.audio.Bool = value - default: - return nil, fmt.Errorf("Unsupported audio value: %v", value) - } - } - - if value, found := payload["video"]; found { - switch value := value.(type) { - case bool: - stream.video.Valid = true - stream.video.Bool = value - default: - return nil, fmt.Errorf("Unsupported video value: %v", value) - } - } - - return &stream, nil -} - -func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { - statsMcuMessagesTotal.WithLabelValues(data.Type).Inc() - jsep_msg := data.Payload - switch data.Type { - case "requestoffer": - fallthrough - case "sendoffer": - p.deferred <- func() { - msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) - defer cancel() - - stream, err := parseStreamSelection(jsep_msg) - if err != nil { - go callback(err, nil) - return - } - - if data.Sid == "" || data.Sid != p.Sid() { - p.joinRoom(msgctx, stream, callback) - } else { - p.update(msgctx, stream, callback) - } - } - case "answer": - p.deferred <- func() { - msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) - defer cancel() - - if data.Sid == "" || data.Sid == p.Sid() { - p.sendAnswer(msgctx, jsep_msg, callback) - } else { - go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) - } - } - case "candidate": - p.deferred <- func() { - msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) - defer cancel() - - if data.Sid == "" || data.Sid == p.Sid() { - p.sendCandidate(msgctx, jsep_msg["candidate"], callback) - } else { - go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) - } - } - case "endOfCandidates": - // Ignore - case "selectStream": - stream, err := parseStreamSelection(jsep_msg) - if err != nil { - go callback(err, nil) - return - } - - if stream == nil || !stream.HasValues() { - // Nothing to do - go callback(nil, nil) - return - } - - p.deferred <- func() { - msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) - defer cancel() - - p.selectStream(msgctx, stream, callback) - } - default: - // Return error asynchronously - go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil) - } -} diff --git a/mcu_janus_client.go b/mcu_janus_client.go new file mode 100644 index 0000000..f1d254b --- /dev/null +++ b/mcu_janus_client.go @@ -0,0 +1,216 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2017 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "log" + "reflect" + "strconv" + "sync" + + "github.com/notedit/janus-go" +) + +type mcuJanusClient struct { + mcu *mcuJanus + listener McuListener + mu sync.Mutex // nolint + + id uint64 + session uint64 + roomId uint64 + sid string + streamType StreamType + maxBitrate int + + handle *JanusHandle + handleId uint64 + closeChan chan struct{} + deferred chan func() + + handleEvent func(event *janus.EventMsg) + handleHangup func(event *janus.HangupMsg) + handleDetached func(event *janus.DetachedMsg) + handleConnected func(event *janus.WebRTCUpMsg) + handleSlowLink func(event *janus.SlowLinkMsg) + handleMedia func(event *janus.MediaMsg) +} + +func (c *mcuJanusClient) Id() string { + return strconv.FormatUint(c.id, 10) +} + +func (c *mcuJanusClient) Sid() string { + return c.sid +} + +func (c *mcuJanusClient) StreamType() StreamType { + return c.streamType +} + +func (c *mcuJanusClient) MaxBitrate() int { + return c.maxBitrate +} + +func (c *mcuJanusClient) Close(ctx context.Context) { +} + +func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { +} + +func (c *mcuJanusClient) closeClient(ctx context.Context) bool { + if handle := c.handle; handle != nil { + c.handle = nil + close(c.closeChan) + if _, err := handle.Detach(ctx); err != nil { + if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND { + log.Println("Could not detach client", handle.Id, err) + } + } + return true + } + + return false +} + +func (c *mcuJanusClient) run(handle *JanusHandle, closeChan <-chan struct{}) { +loop: + for { + select { + case msg := <-handle.Events: + switch t := msg.(type) { + case *janus.EventMsg: + c.handleEvent(t) + case *janus.HangupMsg: + c.handleHangup(t) + case *janus.DetachedMsg: + c.handleDetached(t) + case *janus.MediaMsg: + c.handleMedia(t) + case *janus.WebRTCUpMsg: + c.handleConnected(t) + case *janus.SlowLinkMsg: + c.handleSlowLink(t) + case *TrickleMsg: + c.handleTrickle(t) + default: + log.Println("Received unsupported event type", msg, reflect.TypeOf(msg)) + } + case f := <-c.deferred: + f() + case <-closeChan: + break loop + } + } +} + +func (c *mcuJanusClient) sendOffer(ctx context.Context, offer map[string]interface{}, callback func(error, map[string]interface{})) { + handle := c.handle + if handle == nil { + callback(ErrNotConnected, nil) + return + } + + configure_msg := map[string]interface{}{ + "request": "configure", + "audio": true, + "video": true, + "data": true, + } + answer_msg, err := handle.Message(ctx, configure_msg, offer) + if err != nil { + callback(err, nil) + return + } + + callback(nil, answer_msg.Jsep) +} + +func (c *mcuJanusClient) sendAnswer(ctx context.Context, answer map[string]interface{}, callback func(error, map[string]interface{})) { + handle := c.handle + if handle == nil { + callback(ErrNotConnected, nil) + return + } + + start_msg := map[string]interface{}{ + "request": "start", + "room": c.roomId, + } + start_response, err := handle.Message(ctx, start_msg, answer) + if err != nil { + callback(err, nil) + return + } + log.Println("Started listener", start_response) + callback(nil, nil) +} + +func (c *mcuJanusClient) sendCandidate(ctx context.Context, candidate interface{}, callback func(error, map[string]interface{})) { + handle := c.handle + if handle == nil { + callback(ErrNotConnected, nil) + return + } + + if _, err := handle.Trickle(ctx, candidate); err != nil { + callback(err, nil) + return + } + callback(nil, nil) +} + +func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) { + if event.Candidate.Completed { + c.listener.OnIceCompleted(c) + } else { + c.listener.OnIceCandidate(c, event.Candidate) + } +} + +func (c *mcuJanusClient) selectStream(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) { + handle := c.handle + if handle == nil { + callback(ErrNotConnected, nil) + return + } + + if stream == nil || !stream.HasValues() { + callback(nil, nil) + return + } + + configure_msg := map[string]interface{}{ + "request": "configure", + } + if stream != nil { + stream.AddToMessage(configure_msg) + } + _, err := handle.Message(ctx, configure_msg, nil) + if err != nil { + callback(err, nil) + return + } + + callback(nil, nil) +} diff --git a/mcu_janus_publisher.go b/mcu_janus_publisher.go new file mode 100644 index 0000000..ffb5b35 --- /dev/null +++ b/mcu_janus_publisher.go @@ -0,0 +1,205 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2017 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "fmt" + "log" + + "github.com/notedit/janus-go" +) + +type mcuJanusPublisher struct { + mcuJanusClient + + id string + bitrate int + mediaTypes MediaType + stats publisherStatsCounter +} + +func (p *mcuJanusPublisher) handleEvent(event *janus.EventMsg) { + if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" { + ctx := context.TODO() + switch videoroom { + case "destroyed": + log.Printf("Publisher %d: associated room has been destroyed, closing", p.handleId) + go p.Close(ctx) + case "slow_link": + // Ignore, processed through "handleSlowLink" in the general events. + default: + log.Printf("Unsupported videoroom publisher event in %d: %+v", p.handleId, event) + } + } else { + log.Printf("Unsupported publisher event in %d: %+v", p.handleId, event) + } +} + +func (p *mcuJanusPublisher) handleHangup(event *janus.HangupMsg) { + log.Printf("Publisher %d received hangup (%s), closing", p.handleId, event.Reason) + go p.Close(context.Background()) +} + +func (p *mcuJanusPublisher) handleDetached(event *janus.DetachedMsg) { + log.Printf("Publisher %d received detached, closing", p.handleId) + go p.Close(context.Background()) +} + +func (p *mcuJanusPublisher) handleConnected(event *janus.WebRTCUpMsg) { + log.Printf("Publisher %d received connected", p.handleId) + p.mcu.publisherConnected.Notify(getStreamId(p.id, p.streamType)) +} + +func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) { + if event.Uplink { + log.Printf("Publisher %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost) + } else { + log.Printf("Publisher %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost) + } +} + +func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) { + mediaType := StreamType(event.Type) + if mediaType == StreamTypeVideo && p.streamType == StreamTypeScreen { + // We want to differentiate between audio, video and screensharing + mediaType = p.streamType + } + + p.stats.EnableStream(mediaType, event.Receiving) +} + +func (p *mcuJanusPublisher) HasMedia(mt MediaType) bool { + return (p.mediaTypes & mt) == mt +} + +func (p *mcuJanusPublisher) SetMedia(mt MediaType) { + p.mediaTypes = mt +} + +func (p *mcuJanusPublisher) NotifyReconnected() { + ctx := context.TODO() + handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) + if err != nil { + log.Printf("Could not reconnect publisher %s: %s", p.id, err) + // TODO(jojo): Retry + return + } + + p.handle = handle + p.handleId = handle.Id + p.session = session + p.roomId = roomId + + log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId) +} + +func (p *mcuJanusPublisher) Close(ctx context.Context) { + notify := false + p.mu.Lock() + if handle := p.handle; handle != nil && p.roomId != 0 { + destroy_msg := map[string]interface{}{ + "request": "destroy", + "room": p.roomId, + } + if _, err := handle.Request(ctx, destroy_msg); err != nil { + log.Printf("Error destroying room %d: %s", p.roomId, err) + } else { + log.Printf("Room %d destroyed", p.roomId) + } + p.mcu.mu.Lock() + delete(p.mcu.publishers, getStreamId(p.id, p.streamType)) + p.mcu.mu.Unlock() + p.roomId = 0 + notify = true + } + p.closeClient(ctx) + p.mu.Unlock() + + p.stats.Reset() + + if notify { + statsPublishersCurrent.WithLabelValues(string(p.streamType)).Dec() + p.mcu.unregisterClient(p) + p.listener.PublisherClosed(p) + } + p.mcuJanusClient.Close(ctx) +} + +func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { + statsMcuMessagesTotal.WithLabelValues(data.Type).Inc() + jsep_msg := data.Payload + switch data.Type { + case "offer": + p.deferred <- func() { + msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + + // TODO Tear down previous publisher and get a new one if sid does + // not match? + p.sendOffer(msgctx, jsep_msg, callback) + } + case "candidate": + p.deferred <- func() { + msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + + if data.Sid == "" || data.Sid == p.Sid() { + p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + } else { + go callback(fmt.Errorf("Candidate message sid (%s) does not match publisher sid (%s)", data.Sid, p.Sid()), nil) + } + } + case "endOfCandidates": + // Ignore + default: + go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil) + } +} + +func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, hostname string, port int, rtcpPort int) error { + msg := map[string]interface{}{ + "request": "publish_remotely", + "room": p.roomId, + "publisher_id": streamTypeUserIds[p.streamType], + "remote_id": p.id, + "host": hostname, + "port": port, + "rtcp_port": rtcpPort, + } + response, err := p.handle.Request(ctx, msg) + if err != nil { + return err + } + + errorMessage := getPluginStringValue(response.PluginData, pluginVideoRoom, "error") + errorCode := getPluginIntValue(response.PluginData, pluginVideoRoom, "error_code") + if errorMessage != "" || errorCode != 0 { + if errorMessage == "" { + errorMessage = "unknown error" + } + return fmt.Errorf("%s (%d)", errorMessage, errorCode) + } + + log.Printf("Publishing %s to %s (port=%d, rtcpPort=%d)", p.id, hostname, port, rtcpPort) + return nil +} diff --git a/mcu_janus_stream_selection.go b/mcu_janus_stream_selection.go new file mode 100644 index 0000000..9381ef3 --- /dev/null +++ b/mcu_janus_stream_selection.go @@ -0,0 +1,110 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2017 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "database/sql" + "fmt" +) + +type streamSelection struct { + substream sql.NullInt16 + temporal sql.NullInt16 + audio sql.NullBool + video sql.NullBool +} + +func (s *streamSelection) HasValues() bool { + return s.substream.Valid || s.temporal.Valid || s.audio.Valid || s.video.Valid +} + +func (s *streamSelection) AddToMessage(message map[string]interface{}) { + if s.substream.Valid { + message["substream"] = s.substream.Int16 + } + if s.temporal.Valid { + message["temporal"] = s.temporal.Int16 + } + if s.audio.Valid { + message["audio"] = s.audio.Bool + } + if s.video.Valid { + message["video"] = s.video.Bool + } +} + +func parseStreamSelection(payload map[string]interface{}) (*streamSelection, error) { + var stream streamSelection + if value, found := payload["substream"]; found { + switch value := value.(type) { + case int: + stream.substream.Valid = true + stream.substream.Int16 = int16(value) + case float32: + stream.substream.Valid = true + stream.substream.Int16 = int16(value) + case float64: + stream.substream.Valid = true + stream.substream.Int16 = int16(value) + default: + return nil, fmt.Errorf("Unsupported substream value: %v", value) + } + } + + if value, found := payload["temporal"]; found { + switch value := value.(type) { + case int: + stream.temporal.Valid = true + stream.temporal.Int16 = int16(value) + case float32: + stream.temporal.Valid = true + stream.temporal.Int16 = int16(value) + case float64: + stream.temporal.Valid = true + stream.temporal.Int16 = int16(value) + default: + return nil, fmt.Errorf("Unsupported temporal value: %v", value) + } + } + + if value, found := payload["audio"]; found { + switch value := value.(type) { + case bool: + stream.audio.Valid = true + stream.audio.Bool = value + default: + return nil, fmt.Errorf("Unsupported audio value: %v", value) + } + } + + if value, found := payload["video"]; found { + switch value := value.(type) { + case bool: + stream.video.Valid = true + stream.video.Bool = value + default: + return nil, fmt.Errorf("Unsupported video value: %v", value) + } + } + + return &stream, nil +} diff --git a/mcu_janus_subscriber.go b/mcu_janus_subscriber.go new file mode 100644 index 0000000..b63f4e9 --- /dev/null +++ b/mcu_janus_subscriber.go @@ -0,0 +1,321 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2017 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "fmt" + "log" + "strconv" + + "github.com/notedit/janus-go" +) + +type mcuJanusSubscriber struct { + mcuJanusClient + + publisher string +} + +func (p *mcuJanusSubscriber) Publisher() string { + return p.publisher +} + +func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) { + if videoroom := getPluginStringValue(event.Plugindata, pluginVideoRoom, "videoroom"); videoroom != "" { + ctx := context.TODO() + switch videoroom { + case "destroyed": + log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId) + go p.Close(ctx) + case "event": + // Handle renegotiations, but ignore other events like selected + // substream / temporal layer. + if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" && + event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil { + p.listener.OnUpdateOffer(p, event.Jsep) + } + case "slow_link": + // Ignore, processed through "handleSlowLink" in the general events. + default: + log.Printf("Unsupported videoroom event %s for subscriber %d: %+v", videoroom, p.handleId, event) + } + } else { + log.Printf("Unsupported event for subscriber %d: %+v", p.handleId, event) + } +} + +func (p *mcuJanusSubscriber) handleHangup(event *janus.HangupMsg) { + log.Printf("Subscriber %d received hangup (%s), closing", p.handleId, event.Reason) + go p.Close(context.Background()) +} + +func (p *mcuJanusSubscriber) handleDetached(event *janus.DetachedMsg) { + log.Printf("Subscriber %d received detached, closing", p.handleId) + go p.Close(context.Background()) +} + +func (p *mcuJanusSubscriber) handleConnected(event *janus.WebRTCUpMsg) { + log.Printf("Subscriber %d received connected", p.handleId) + p.mcu.SubscriberConnected(p.Id(), p.publisher, p.streamType) +} + +func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) { + if event.Uplink { + log.Printf("Subscriber %s (%d) is reporting %d lost packets on the uplink (Janus -> client)", p.listener.PublicId(), p.handleId, event.Lost) + } else { + log.Printf("Subscriber %s (%d) is reporting %d lost packets on the downlink (client -> Janus)", p.listener.PublicId(), p.handleId, event.Lost) + } +} + +func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) { + // Only triggered for publishers +} + +func (p *mcuJanusSubscriber) NotifyReconnected() { + ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) + if err != nil { + // TODO(jojo): Retry? + log.Printf("Could not reconnect subscriber for publisher %s: %s", p.publisher, err) + p.Close(context.Background()) + return + } + + p.handle = handle + p.handleId = handle.Id + p.roomId = pub.roomId + p.sid = strconv.FormatUint(handle.Id, 10) + p.listener.SubscriberSidUpdated(p) + log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) +} + +func (p *mcuJanusSubscriber) Close(ctx context.Context) { + p.mu.Lock() + closed := p.closeClient(ctx) + p.mu.Unlock() + + if closed { + p.mcu.SubscriberDisconnected(p.Id(), p.publisher, p.streamType) + statsSubscribersCurrent.WithLabelValues(string(p.streamType)).Dec() + } + p.mcu.unregisterClient(p) + p.listener.SubscriberClosed(p) + p.mcuJanusClient.Close(ctx) +} + +func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) { + handle := p.handle + if handle == nil { + callback(ErrNotConnected, nil) + return + } + + waiter := p.mcu.publisherConnected.NewWaiter(getStreamId(p.publisher, p.streamType)) + defer p.mcu.publisherConnected.Release(waiter) + + loggedNotPublishingYet := false +retry: + join_msg := map[string]interface{}{ + "request": "join", + "ptype": "subscriber", + "room": p.roomId, + } + if p.mcu.isMultistream() { + join_msg["streams"] = []map[string]interface{}{ + { + "feed": streamTypeUserIds[p.streamType], + }, + } + } else { + join_msg["feed"] = streamTypeUserIds[p.streamType] + } + if stream != nil { + stream.AddToMessage(join_msg) + } + join_response, err := handle.Message(ctx, join_msg, nil) + if err != nil { + callback(err, nil) + return + } + + if error_code := getPluginIntValue(join_response.Plugindata, pluginVideoRoom, "error_code"); error_code > 0 { + switch error_code { + case JANUS_VIDEOROOM_ERROR_ALREADY_JOINED: + // The subscriber is already connected to the room. This can happen + // if a client leaves a call but keeps the subscriber objects active. + // On joining the call again, the subscriber tries to join on the + // MCU which will fail because he is still connected. + // To get a new Offer SDP, we have to tear down the session on the + // MCU and join again. + p.mu.Lock() + p.closeClient(ctx) + p.mu.Unlock() + + var pub *mcuJanusPublisher + handle, pub, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) + if err != nil { + // Reconnection didn't work, need to unregister/remove subscriber + // so a new object will be created if the request is retried. + p.mcu.unregisterClient(p) + p.listener.SubscriberClosed(p) + callback(fmt.Errorf("Already connected as subscriber for %s, error during re-joining: %s", p.streamType, err), nil) + return + } + + p.handle = handle + p.handleId = handle.Id + p.roomId = pub.roomId + p.sid = strconv.FormatUint(handle.Id, 10) + p.listener.SubscriberSidUpdated(p) + p.closeChan = make(chan struct{}, 1) + go p.run(p.handle, p.closeChan) + log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) + goto retry + case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM: + fallthrough + case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED: + switch error_code { + case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM: + log.Printf("Publisher %s not created yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId) + case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED: + log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId) + } + + if !loggedNotPublishingYet { + loggedNotPublishingYet = true + statsWaitingForPublisherTotal.WithLabelValues(string(p.streamType)).Inc() + } + + if err := waiter.Wait(ctx); err != nil { + callback(err, nil) + return + } + log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher) + goto retry + default: + // TODO(jojo): Should we handle other errors, too? + callback(fmt.Errorf("Error joining room as subscriber: %+v", join_response), nil) + return + } + } + //log.Println("Joined as listener", join_response) + + p.session = join_response.Session + callback(nil, join_response.Jsep) +} + +func (p *mcuJanusSubscriber) update(ctx context.Context, stream *streamSelection, callback func(error, map[string]interface{})) { + handle := p.handle + if handle == nil { + callback(ErrNotConnected, nil) + return + } + + configure_msg := map[string]interface{}{ + "request": "configure", + "update": true, + } + if stream != nil { + stream.AddToMessage(configure_msg) + } + configure_response, err := handle.Message(ctx, configure_msg, nil) + if err != nil { + callback(err, nil) + return + } + + callback(nil, configure_response.Jsep) +} + +func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { + statsMcuMessagesTotal.WithLabelValues(data.Type).Inc() + jsep_msg := data.Payload + switch data.Type { + case "requestoffer": + fallthrough + case "sendoffer": + p.deferred <- func() { + msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + + stream, err := parseStreamSelection(jsep_msg) + if err != nil { + go callback(err, nil) + return + } + + if data.Sid == "" || data.Sid != p.Sid() { + p.joinRoom(msgctx, stream, callback) + } else { + p.update(msgctx, stream, callback) + } + } + case "answer": + p.deferred <- func() { + msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + + if data.Sid == "" || data.Sid == p.Sid() { + p.sendAnswer(msgctx, jsep_msg, callback) + } else { + go callback(fmt.Errorf("Answer message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) + } + } + case "candidate": + p.deferred <- func() { + msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + + if data.Sid == "" || data.Sid == p.Sid() { + p.sendCandidate(msgctx, jsep_msg["candidate"], callback) + } else { + go callback(fmt.Errorf("Candidate message sid (%s) does not match subscriber sid (%s)", data.Sid, p.Sid()), nil) + } + } + case "endOfCandidates": + // Ignore + case "selectStream": + stream, err := parseStreamSelection(jsep_msg) + if err != nil { + go callback(err, nil) + return + } + + if stream == nil || !stream.HasValues() { + // Nothing to do + go callback(nil, nil) + return + } + + p.deferred <- func() { + msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) + defer cancel() + + p.selectStream(msgctx, stream, callback) + } + default: + // Return error asynchronously + go callback(fmt.Errorf("Unsupported message type: %s", data.Type), nil) + } +} diff --git a/publisher_stats_counter.go b/publisher_stats_counter.go new file mode 100644 index 0000000..ba8b293 --- /dev/null +++ b/publisher_stats_counter.go @@ -0,0 +1,99 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "sync" +) + +type publisherStatsCounter struct { + mu sync.Mutex + + streamTypes map[StreamType]bool + subscribers map[string]bool +} + +func (c *publisherStatsCounter) Reset() { + c.mu.Lock() + defer c.mu.Unlock() + + count := len(c.subscribers) + for streamType := range c.streamTypes { + statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec() + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(count)) + } + c.streamTypes = nil + c.subscribers = nil +} + +func (c *publisherStatsCounter) EnableStream(streamType StreamType, enable bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if enable == c.streamTypes[streamType] { + return + } + + if enable { + if c.streamTypes == nil { + c.streamTypes = make(map[StreamType]bool) + } + c.streamTypes[streamType] = true + statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Inc() + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Add(float64(len(c.subscribers))) + } else { + delete(c.streamTypes, streamType) + statsMcuPublisherStreamTypesCurrent.WithLabelValues(string(streamType)).Dec() + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Sub(float64(len(c.subscribers))) + } +} + +func (c *publisherStatsCounter) AddSubscriber(id string) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.subscribers[id] { + return + } + + if c.subscribers == nil { + c.subscribers = make(map[string]bool) + } + c.subscribers[id] = true + for streamType := range c.streamTypes { + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Inc() + } +} + +func (c *publisherStatsCounter) RemoveSubscriber(id string) { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.subscribers[id] { + return + } + + delete(c.subscribers, id) + for streamType := range c.streamTypes { + statsMcuSubscriberStreamTypesCurrent.WithLabelValues(string(streamType)).Dec() + } +} diff --git a/mcu_janus_test.go b/publisher_stats_counter_test.go similarity index 100% rename from mcu_janus_test.go rename to publisher_stats_counter_test.go