diff --git a/mcu_janus.go b/mcu_janus.go index d779879..f1fb81b 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -33,7 +33,6 @@ import ( "time" "github.com/dlintw/goconf" - "github.com/nats-io/nats.go" "github.com/notedit/janus-go" ) @@ -135,9 +134,8 @@ type mcuJanus struct { // 64-bit members that are accessed atomically must be 64-bit aligned. clientId uint64 - url string - mu sync.Mutex - nats NatsClient + url string + mu sync.Mutex maxStreamBitrate int maxScreenBitrate int @@ -152,7 +150,9 @@ type mcuJanus struct { muClients sync.Mutex clients map[clientInterface]bool - publisherRoomIds map[string]uint64 + publishers map[string]*mcuJanusPublisher + publisherCreated Notifier + publisherConnected Notifier reconnectTimer *time.Timer reconnectInterval time.Duration @@ -165,7 +165,7 @@ type mcuJanus struct { func emptyOnConnected() {} func emptyOnDisconnected() {} -func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) { +func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) { maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate") if maxStreamBitrate <= 0 { maxStreamBitrate = defaultMaxStreamBitrate @@ -182,13 +182,13 @@ func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, e mcu := &mcuJanus{ url: url, - nats: nats, maxStreamBitrate: maxStreamBitrate, maxScreenBitrate: maxScreenBitrate, mcuTimeout: mcuTimeout, closeChan: make(chan bool, 1), clients: make(map[clientInterface]bool), - publisherRoomIds: make(map[string]uint64), + + publishers: make(map[string]*mcuJanusPublisher), reconnectInterval: initialReconnectInterval, } @@ -249,7 +249,9 @@ func (m *mcuJanus) doReconnect() { log.Println("Reconnection to Janus gateway successful") m.mu.Lock() - m.publisherRoomIds = make(map[string]uint64) + m.publishers = make(map[string]*mcuJanusPublisher) + m.publisherCreated.Reset() + m.publisherConnected.Reset() m.reconnectInterval = initialReconnectInterval m.mu.Unlock() @@ -306,7 +308,6 @@ func (m *mcuJanus) Start() error { } else { log.Println("Full-Trickle is enabled") } - log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate) log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate) @@ -407,7 +408,7 @@ func (m *mcuJanus) GetStats() interface{} { result.Uptime = &m.connectedSince } m.mu.Lock() - result.Publishers = int64(len(m.publisherRoomIds)) + result.Publishers = int64(len(m.publishers)) m.mu.Unlock() m.muClients.Lock() result.Clients = int64(len(m.clients)) @@ -597,53 +598,44 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st } log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id) - roomId, err := m.searchPublisherRoom(ctx, id, streamType) - if err != nil { - log.Printf("Could not search for room of publisher %s: %s", id, err) + create_msg := map[string]interface{}{ + "request": "create", + "description": id + "|" + streamType, + // We publish every stream in its own Janus room. + "publishers": 1, + // Do not use the video-orientation RTP extension as it breaks video + // orientation changes in Firefox. + "videoorient_ext": false, } - - if roomId == 0 { - create_msg := map[string]interface{}{ - "request": "create", - "description": id + "|" + streamType, - // We publish every stream in its own Janus room. - "publishers": 1, - // Do not use the video-orientation RTP extension as it breaks video - // orientation changes in Firefox. - "videoorient_ext": false, - } - var maxBitrate int - if streamType == streamTypeScreen { - maxBitrate = m.maxScreenBitrate - } else { - maxBitrate = m.maxStreamBitrate - } - if bitrate <= 0 { - bitrate = maxBitrate - } else { - bitrate = min(bitrate, maxBitrate) - } - create_msg["bitrate"] = bitrate - create_response, err := handle.Request(ctx, create_msg) - if err != nil { - if _, err2 := handle.Detach(ctx); err2 != nil { - log.Printf("Error detaching handle %d: %s", handle.Id, err2) - } - return nil, 0, 0, err - } - - roomId = getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room") - if roomId == 0 { - if _, err := handle.Detach(ctx); err != nil { - log.Printf("Error detaching handle %d: %s", handle.Id, err) - } - return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response) - } - - log.Println("Created room", roomId, create_response.PluginData) + var maxBitrate int + if streamType == streamTypeScreen { + maxBitrate = m.maxScreenBitrate } else { - log.Println("Use existing room", roomId) + maxBitrate = m.maxStreamBitrate } + if bitrate <= 0 { + bitrate = maxBitrate + } else { + bitrate = min(bitrate, maxBitrate) + } + create_msg["bitrate"] = bitrate + create_response, err := handle.Request(ctx, create_msg) + if err != nil { + if _, err2 := handle.Detach(ctx); err2 != nil { + log.Printf("Error detaching handle %d: %s", handle.Id, err2) + } + return nil, 0, 0, err + } + + roomId := getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room") + if roomId == 0 { + if _, err := handle.Detach(ctx); err != nil { + log.Printf("Error detaching handle %d: %s", handle.Id, err) + } + return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response) + } + + log.Println("Created room", roomId, create_response.PluginData) msg := map[string]interface{}{ "request": "join", @@ -696,16 +688,14 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st client.mcuJanusClient.handleDetached = client.handleDetached client.mcuJanusClient.handleConnected = client.handleConnected client.mcuJanusClient.handleSlowLink = client.handleSlowLink - m.mu.Lock() - m.publisherRoomIds[id+"|"+streamType] = roomId - m.mu.Unlock() m.registerClient(client) - if err := client.publishNats("created"); err != nil { - log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err) - } log.Printf("Publisher %s is using handle %d", client.id, client.handleId) go client.run(handle, client.closeChan) + m.mu.Lock() + m.publishers[id+"|"+streamType] = client + m.publisherCreated.Notify(id + "|" + streamType) + m.mu.Unlock() return client, nil } @@ -738,9 +728,7 @@ func (p *mcuJanusPublisher) handleDetached(event *janus.DetachedMsg) { func (p *mcuJanusPublisher) handleConnected(event *janus.WebRTCUpMsg) { log.Printf("Publisher %d received connected", p.handleId) - if err := p.publishNats("connected"); err != nil { - log.Printf("Could not publish \"connected\" event for publisher %s: %s\n", p.id, err) - } + p.mcu.publisherConnected.Notify(p.id + "|" + p.streamType) } func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) { @@ -751,10 +739,6 @@ func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) { } } -func (p *mcuJanusPublisher) publishNats(messageType string) error { - return p.mcu.nats.PublishNats("publisher-"+p.id+"|"+p.streamType, &NatsMessage{Type: messageType}) -} - func (p *mcuJanusPublisher) NotifyReconnected() { ctx := context.TODO() handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) @@ -769,9 +753,6 @@ func (p *mcuJanusPublisher) NotifyReconnected() { p.session = session p.roomId = roomId - p.mcu.mu.Lock() - p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId - p.mcu.mu.Unlock() log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId) } @@ -789,7 +770,7 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) { log.Printf("Room %d destroyed", p.roomId) } p.mcu.mu.Lock() - delete(p.mcu.publisherRoomIds, p.id+"|"+p.streamType) + delete(p.mcu.publishers, p.id+"|"+p.streamType) p.mcu.mu.Unlock() p.roomId = 0 notify = true @@ -833,134 +814,52 @@ type mcuJanusSubscriber struct { publisher string } -func (m *mcuJanus) lookupPublisherRoom(ctx context.Context, publisher string, streamType string) (uint64, error) { - handle := m.handle - if handle == nil { - return 0, ErrNotConnected - } - list_msg := map[string]interface{}{ - "request": "list", - } - response_msg, err := handle.Request(ctx, list_msg) - if err != nil { - return 0, err - } - list, found := response_msg.PluginData.Data["list"] - if !found { - return 0, fmt.Errorf("no room list received") - } - - entries, ok := list.([]interface{}) - if !ok { - return 0, fmt.Errorf("Unsupported list received: %+v (%s)", list, reflect.TypeOf(list)) - } - - for _, entry := range entries { - if entry, ok := entry.(map[string]interface{}); ok { - description, found := entry["description"] - if !found { - continue - } - if description, ok := description.(string); ok { - if description != publisher+"|"+streamType { - continue - } - - roomIdInterface, found := entry["room"] - if !found { - continue - } - - roomId, err := convertIntValue(roomIdInterface) - if err != nil { - return 0, fmt.Errorf("Invalid room id received: %+v: %s", entry, err) - } - - return roomId, nil - } - } - } - - return 0, nil -} - -func (m *mcuJanus) searchPublisherRoom(ctx context.Context, publisher string, streamType string) (uint64, error) { - // Check for publishers connected to this signaling server. - m.mu.Lock() - roomId, found := m.publisherRoomIds[publisher+"|"+streamType] - m.mu.Unlock() - if found { - return roomId, nil - } - - // Check for publishers connected to a different signaling server. - roomId, err := m.lookupPublisherRoom(ctx, publisher, streamType) - if err != nil { - return 0, err - } - - return roomId, nil -} - -func (m *mcuJanus) getPublisherRoomId(ctx context.Context, publisher string, streamType string) (uint64, error) { +func (m *mcuJanus) getPublisher(ctx context.Context, publisher string, streamType string) (*mcuJanusPublisher, error) { // Do the direct check immediately as this should be the normal case. + key := publisher + "|" + streamType m.mu.Lock() - roomId, found := m.publisherRoomIds[publisher+"|"+streamType] - if found { + if result, found := m.publishers[key]; found { m.mu.Unlock() - return roomId, nil + return result, nil } - wakeupChan := make(chan *nats.Msg, 1) - sub, err := m.nats.Subscribe("publisher-"+publisher+"|"+streamType, wakeupChan) + waiter := m.publisherCreated.NewWaiter(key) m.mu.Unlock() - if err != nil { - return 0, err - } - defer func() { - if err := sub.Unsubscribe(); err != nil { - log.Printf("Error unsubscribing channel for %s publisher %s: %s", streamType, publisher, err) - } - }() + defer m.publisherCreated.Release(waiter) - for roomId == 0 { - var err error - if roomId, err = m.searchPublisherRoom(ctx, publisher, streamType); err != nil { - log.Printf("Could not search for room of publisher %s: %s", publisher, err) - } else if roomId > 0 { - break + for { + m.mu.Lock() + result := m.publishers[key] + m.mu.Unlock() + if result != nil { + return result, nil } - select { - case <-wakeupChan: - // We got the wakeup event through NATS, the publisher should be - // ready now. - case <-ctx.Done(): - return 0, ctx.Err() + if err := waiter.Wait(ctx); err != nil { + return nil, err } } - return roomId, nil } -func (m *mcuJanus) getOrCreateSubscriberHandle(ctx context.Context, publisher string, streamType string) (*JanusHandle, uint64, error) { - var roomId uint64 +func (m *mcuJanus) getOrCreateSubscriberHandle(ctx context.Context, publisher string, streamType string) (*JanusHandle, *mcuJanusPublisher, error) { + var pub *mcuJanusPublisher var err error - if roomId, err = m.getPublisherRoomId(ctx, publisher, streamType); err != nil { - return nil, 0, err + if pub, err = m.getPublisher(ctx, publisher, streamType); err != nil { + return nil, nil, err } session := m.session if session == nil { - return nil, 0, ErrNotConnected + return nil, nil, ErrNotConnected } handle, err := session.Attach(ctx, pluginVideoRoom) if err != nil { - return nil, 0, err + return nil, nil, err } - log.Printf("Attached subscriber to room %d of publisher %s in plugin %s in session %d as %d", roomId, publisher, pluginVideoRoom, session.Id, handle.Id) - return handle, roomId, nil + log.Printf("Attached subscriber to room %d of publisher %s in plugin %s in session %d as %d", pub.roomId, publisher, pluginVideoRoom, session.Id, handle.Id) + return handle, pub, nil } func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) { @@ -968,7 +867,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ return nil, fmt.Errorf("Unsupported stream type %s", streamType) } - handle, roomId, err := m.getOrCreateSubscriberHandle(ctx, publisher, streamType) + handle, pub, err := m.getOrCreateSubscriberHandle(ctx, publisher, streamType) if err != nil { return nil, err } @@ -979,7 +878,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ listener: listener, id: atomic.AddUint64(&m.clientId, 1), - roomId: roomId, + roomId: pub.roomId, streamType: streamType, handle: handle, @@ -1047,7 +946,7 @@ func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) { func (p *mcuJanusSubscriber) NotifyReconnected() { ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout) defer cancel() - handle, roomId, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) + handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) if err != nil { // TODO(jojo): Retry? log.Printf("Could not reconnect subscriber for publisher %s: %s\n", p.publisher, err) @@ -1057,7 +956,7 @@ func (p *mcuJanusSubscriber) NotifyReconnected() { p.handle = handle p.handleId = handle.Id - p.roomId = roomId + p.roomId = pub.roomId log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) } @@ -1077,17 +976,8 @@ func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, callback func(error, return } - wakeupChan := make(chan *nats.Msg, 1) - sub, err := p.mcu.nats.Subscribe("publisher-"+p.publisher+"|"+p.streamType, wakeupChan) - if err != nil { - callback(err, nil) - return - } - defer func() { - if err := sub.Unsubscribe(); err != nil { - log.Printf("Error unsubscribing channel for %s publisher %s: %s", p.streamType, p.publisher, err) - } - }() + waiter := p.mcu.publisherConnected.NewWaiter(p.publisher + "|" + p.streamType) + defer p.mcu.publisherConnected.Release(waiter) retry: join_msg := map[string]interface{}{ @@ -1115,8 +1005,8 @@ retry: p.closeClient(ctx) p.mu.Unlock() - var roomId uint64 - handle, roomId, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) + var pub *mcuJanusPublisher + handle, pub, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType) if err != nil { // Reconnection didn't work, need to unregister/remove subscriber // so a new object will be created if the request is retried. @@ -1128,7 +1018,7 @@ retry: p.handle = handle p.handleId = handle.Id - p.roomId = roomId + p.roomId = pub.roomId p.closeChan = make(chan bool, 1) go p.run(p.handle, p.closeChan) log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) @@ -1142,22 +1032,12 @@ retry: case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED: log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId) } - wait: - select { - case msg := <-wakeupChan: - var message NatsMessage - if err := p.mcu.nats.Decode(msg, &message); err != nil { - log.Printf("Error decoding wakeup NATS message %s (%s)\n", string(msg.Data), err) - goto wait - } else if message.Type != "connected" { - log.Printf("Unsupported NATS message waiting for publisher %s: %+v\n", p.publisher, message) - goto wait - } - log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher) - case <-ctx.Done(): - callback(ctx.Err(), nil) + + if err := waiter.Wait(ctx); err != nil { + callback(err, nil) return } + log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher) goto retry default: // TODO(jojo): Should we handle other errors, too? diff --git a/proxy.conf.in b/proxy.conf.in index cd62a60..c978939 100644 --- a/proxy.conf.in +++ b/proxy.conf.in @@ -20,13 +20,6 @@ # - etcd: Token information are retrieved from an etcd cluster (see below). tokentype = static -[nats] -# Url of NATS backend to use. This can also be a list of URLs to connect to -# multiple backends. For local development, this can be set to ":loopback:" -# to process NATS messages internally instead of sending them through an -# external NATS backend. -#url = nats://localhost:4222 - [tokens] # For token type "static": Mapping of = of signaling # servers allowed to connect. diff --git a/proxy/main.go b/proxy/main.go index a354525..510c959 100644 --- a/proxy/main.go +++ b/proxy/main.go @@ -36,9 +36,6 @@ import ( "github.com/dlintw/goconf" "github.com/gorilla/mux" - "github.com/nats-io/nats.go" - - signaling "github.com/strukturag/nextcloud-spreed-signaling" ) var ( @@ -81,19 +78,9 @@ func main() { runtime.GOMAXPROCS(cpus) log.Printf("Using a maximum of %d CPUs\n", cpus) - natsUrl, _ := config.GetString("nats", "url") - if natsUrl == "" { - natsUrl = nats.DefaultURL - } - - nats, err := signaling.NewNatsClient(natsUrl) - if err != nil { - log.Fatal("Could not create NATS client: ", err) - } - r := mux.NewRouter() - proxy, err := NewProxyServer(r, version, config, nats) + proxy, err := NewProxyServer(r, version, config) if err != nil { log.Fatal(err) } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index f683fcb..bbea72b 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -88,7 +88,6 @@ type ProxyServer struct { country string url string - nats signaling.NatsClient mcu signaling.Mcu stopped uint32 @@ -110,7 +109,7 @@ type ProxyServer struct { clientsLock sync.RWMutex } -func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile, nats signaling.NatsClient) (*ProxyServer, error) { +func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*ProxyServer, error) { hashKey := make([]byte, 64) if _, err := rand.Read(hashKey); err != nil { return nil, fmt.Errorf("Could not generate random hash key: %s", err) @@ -172,8 +171,6 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile, na version: version, country: country, - nats: nats, - shutdownChannel: make(chan bool, 1), upgrader: websocket.Upgrader{ @@ -238,7 +235,7 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error { for { switch mcuType { case signaling.McuTypeJanus: - mcu, err = signaling.NewMcuJanus(s.url, config, s.nats) + mcu, err = signaling.NewMcuJanus(s.url, config) default: return fmt.Errorf("Unsupported MCU type: %s", mcuType) } diff --git a/server/main.go b/server/main.go index ccf86ce..02165b1 100644 --- a/server/main.go +++ b/server/main.go @@ -175,7 +175,7 @@ func main() { for { switch mcuType { case signaling.McuTypeJanus: - mcu, err = signaling.NewMcuJanus(mcuUrl, config, nats) + mcu, err = signaling.NewMcuJanus(mcuUrl, config) case signaling.McuTypeProxy: mcu, err = signaling.NewMcuProxy(config) default: