Assume one Janus server is only used by one signaling server or proxy.

With that, we can deliver publisher events locally instead of through NATS
and can remove the code that looks up publisher rooms by their IDs on Janus.
Joachim Bauch 2021-06-04 14:52:23 +02:00
parent d7bd809c54
commit 9b83993d48
5 changed files with 92 additions and 235 deletions
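The commit replaces the NATS round-trip (and the fallback "list" request against Janus) with an in-process notifier: NewPublisher() stores the publisher in m.publishers and calls publisherCreated.Notify(), while getPublisher() and joinRoom() block on a waiter instead of a NATS subscription. The standalone sketch below illustrates that wait/notify pattern. The actual Notifier/Waiter implementation is not part of this diff, so the types here are hypothetical stand-ins that only mirror the NewWaiter/Notify/Release/Wait(ctx) calls seen in the changes; the key "foo|video" is an invented example.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Waiter blocks a single caller until it is notified for its key.
type Waiter struct {
	key string
	ch  chan struct{}
}

// Wait returns once the waiter has been notified or the context expires.
func (w *Waiter) Wait(ctx context.Context) error {
	select {
	case <-w.ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Notifier wakes up registered waiters for a given key.
type Notifier struct {
	mu      sync.Mutex
	waiters map[*Waiter]bool
}

// NewWaiter registers a new waiter for the given key.
func (n *Notifier) NewWaiter(key string) *Waiter {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.waiters == nil {
		n.waiters = make(map[*Waiter]bool)
	}
	w := &Waiter{key: key, ch: make(chan struct{}, 1)}
	n.waiters[w] = true
	return w
}

// Release removes a previously registered waiter.
func (n *Notifier) Release(w *Waiter) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.waiters, w)
}

// Notify wakes all waiters registered for the given key.
func (n *Notifier) Notify(key string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for w := range n.waiters {
		if w.key != key {
			continue
		}
		select {
		case w.ch <- struct{}{}:
		default:
		}
	}
}

func main() {
	var publisherCreated Notifier

	// Subscriber side: wait until publisher "foo|video" has been created
	// on this server, instead of subscribing to a NATS subject.
	waiter := publisherCreated.NewWaiter("foo|video")
	defer publisherCreated.Release(waiter)

	// Publisher side: once created, notify locally instead of publishing
	// a "created" message through NATS.
	go func() {
		time.Sleep(100 * time.Millisecond)
		publisherCreated.Notify("foo|video")
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := waiter.Wait(ctx); err != nil {
		fmt.Println("publisher did not appear:", err)
		return
	}
	fmt.Println("publisher created, subscriber can join its room")
}

This in-process wait only works because the publisher is guaranteed to live in the same process as its subscribers, which is exactly the assumption stated in the commit title.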

View File

@ -33,7 +33,6 @@ import (
"time"
"github.com/dlintw/goconf"
"github.com/nats-io/nats.go"
"github.com/notedit/janus-go"
)
@ -135,9 +134,8 @@ type mcuJanus struct {
// 64-bit members that are accessed atomically must be 64-bit aligned.
clientId uint64
url string
mu sync.Mutex
nats NatsClient
url string
mu sync.Mutex
maxStreamBitrate int
maxScreenBitrate int
@ -152,7 +150,9 @@ type mcuJanus struct {
muClients sync.Mutex
clients map[clientInterface]bool
publisherRoomIds map[string]uint64
publishers map[string]*mcuJanusPublisher
publisherCreated Notifier
publisherConnected Notifier
reconnectTimer *time.Timer
reconnectInterval time.Duration
@ -165,7 +165,7 @@ type mcuJanus struct {
func emptyOnConnected() {}
func emptyOnDisconnected() {}
func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) {
func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
@ -182,13 +182,13 @@ func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, e
mcu := &mcuJanus{
url: url,
nats: nats,
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
mcuTimeout: mcuTimeout,
closeChan: make(chan bool, 1),
clients: make(map[clientInterface]bool),
publisherRoomIds: make(map[string]uint64),
publishers: make(map[string]*mcuJanusPublisher),
reconnectInterval: initialReconnectInterval,
}
@ -249,7 +249,9 @@ func (m *mcuJanus) doReconnect() {
log.Println("Reconnection to Janus gateway successful")
m.mu.Lock()
m.publisherRoomIds = make(map[string]uint64)
m.publishers = make(map[string]*mcuJanusPublisher)
m.publisherCreated.Reset()
m.publisherConnected.Reset()
m.reconnectInterval = initialReconnectInterval
m.mu.Unlock()
@ -306,7 +308,6 @@ func (m *mcuJanus) Start() error {
} else {
log.Println("Full-Trickle is enabled")
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate)
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate)
@ -407,7 +408,7 @@ func (m *mcuJanus) GetStats() interface{} {
result.Uptime = &m.connectedSince
}
m.mu.Lock()
result.Publishers = int64(len(m.publisherRoomIds))
result.Publishers = int64(len(m.publishers))
m.mu.Unlock()
m.muClients.Lock()
result.Clients = int64(len(m.clients))
@ -597,53 +598,44 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
}
log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id)
roomId, err := m.searchPublisherRoom(ctx, id, streamType)
if err != nil {
log.Printf("Could not search for room of publisher %s: %s", id, err)
create_msg := map[string]interface{}{
"request": "create",
"description": id + "|" + streamType,
// We publish every stream in its own Janus room.
"publishers": 1,
// Do not use the video-orientation RTP extension as it breaks video
// orientation changes in Firefox.
"videoorient_ext": false,
}
if roomId == 0 {
create_msg := map[string]interface{}{
"request": "create",
"description": id + "|" + streamType,
// We publish every stream in its own Janus room.
"publishers": 1,
// Do not use the video-orientation RTP extension as it breaks video
// orientation changes in Firefox.
"videoorient_ext": false,
}
var maxBitrate int
if streamType == streamTypeScreen {
maxBitrate = m.maxScreenBitrate
} else {
maxBitrate = m.maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
}
create_msg["bitrate"] = bitrate
create_response, err := handle.Request(ctx, create_msg)
if err != nil {
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
}
return nil, 0, 0, err
}
roomId = getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room")
if roomId == 0 {
if _, err := handle.Detach(ctx); err != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err)
}
return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response)
}
log.Println("Created room", roomId, create_response.PluginData)
var maxBitrate int
if streamType == streamTypeScreen {
maxBitrate = m.maxScreenBitrate
} else {
log.Println("Use existing room", roomId)
maxBitrate = m.maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
}
create_msg["bitrate"] = bitrate
create_response, err := handle.Request(ctx, create_msg)
if err != nil {
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
}
return nil, 0, 0, err
}
roomId := getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room")
if roomId == 0 {
if _, err := handle.Detach(ctx); err != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err)
}
return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response)
}
log.Println("Created room", roomId, create_response.PluginData)
msg := map[string]interface{}{
"request": "join",
@ -696,16 +688,14 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
client.mcuJanusClient.handleDetached = client.handleDetached
client.mcuJanusClient.handleConnected = client.handleConnected
client.mcuJanusClient.handleSlowLink = client.handleSlowLink
m.mu.Lock()
m.publisherRoomIds[id+"|"+streamType] = roomId
m.mu.Unlock()
m.registerClient(client)
if err := client.publishNats("created"); err != nil {
log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err)
}
log.Printf("Publisher %s is using handle %d", client.id, client.handleId)
go client.run(handle, client.closeChan)
m.mu.Lock()
m.publishers[id+"|"+streamType] = client
m.publisherCreated.Notify(id + "|" + streamType)
m.mu.Unlock()
return client, nil
}
@ -738,9 +728,7 @@ func (p *mcuJanusPublisher) handleDetached(event *janus.DetachedMsg) {
func (p *mcuJanusPublisher) handleConnected(event *janus.WebRTCUpMsg) {
log.Printf("Publisher %d received connected", p.handleId)
if err := p.publishNats("connected"); err != nil {
log.Printf("Could not publish \"connected\" event for publisher %s: %s\n", p.id, err)
}
p.mcu.publisherConnected.Notify(p.id + "|" + p.streamType)
}
func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) {
@ -751,10 +739,6 @@ func (p *mcuJanusPublisher) handleSlowLink(event *janus.SlowLinkMsg) {
}
}
func (p *mcuJanusPublisher) publishNats(messageType string) error {
return p.mcu.nats.PublishNats("publisher-"+p.id+"|"+p.streamType, &NatsMessage{Type: messageType})
}
func (p *mcuJanusPublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
@ -769,9 +753,6 @@ func (p *mcuJanusPublisher) NotifyReconnected() {
p.session = session
p.roomId = roomId
p.mcu.mu.Lock()
p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId
p.mcu.mu.Unlock()
log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId)
}
@ -789,7 +770,7 @@ func (p *mcuJanusPublisher) Close(ctx context.Context) {
log.Printf("Room %d destroyed", p.roomId)
}
p.mcu.mu.Lock()
delete(p.mcu.publisherRoomIds, p.id+"|"+p.streamType)
delete(p.mcu.publishers, p.id+"|"+p.streamType)
p.mcu.mu.Unlock()
p.roomId = 0
notify = true
@ -833,134 +814,52 @@ type mcuJanusSubscriber struct {
publisher string
}
func (m *mcuJanus) lookupPublisherRoom(ctx context.Context, publisher string, streamType string) (uint64, error) {
handle := m.handle
if handle == nil {
return 0, ErrNotConnected
}
list_msg := map[string]interface{}{
"request": "list",
}
response_msg, err := handle.Request(ctx, list_msg)
if err != nil {
return 0, err
}
list, found := response_msg.PluginData.Data["list"]
if !found {
return 0, fmt.Errorf("no room list received")
}
entries, ok := list.([]interface{})
if !ok {
return 0, fmt.Errorf("Unsupported list received: %+v (%s)", list, reflect.TypeOf(list))
}
for _, entry := range entries {
if entry, ok := entry.(map[string]interface{}); ok {
description, found := entry["description"]
if !found {
continue
}
if description, ok := description.(string); ok {
if description != publisher+"|"+streamType {
continue
}
roomIdInterface, found := entry["room"]
if !found {
continue
}
roomId, err := convertIntValue(roomIdInterface)
if err != nil {
return 0, fmt.Errorf("Invalid room id received: %+v: %s", entry, err)
}
return roomId, nil
}
}
}
return 0, nil
}
func (m *mcuJanus) searchPublisherRoom(ctx context.Context, publisher string, streamType string) (uint64, error) {
// Check for publishers connected to this signaling server.
m.mu.Lock()
roomId, found := m.publisherRoomIds[publisher+"|"+streamType]
m.mu.Unlock()
if found {
return roomId, nil
}
// Check for publishers connected to a different signaling server.
roomId, err := m.lookupPublisherRoom(ctx, publisher, streamType)
if err != nil {
return 0, err
}
return roomId, nil
}
func (m *mcuJanus) getPublisherRoomId(ctx context.Context, publisher string, streamType string) (uint64, error) {
func (m *mcuJanus) getPublisher(ctx context.Context, publisher string, streamType string) (*mcuJanusPublisher, error) {
// Do the direct check immediately as this should be the normal case.
key := publisher + "|" + streamType
m.mu.Lock()
roomId, found := m.publisherRoomIds[publisher+"|"+streamType]
if found {
if result, found := m.publishers[key]; found {
m.mu.Unlock()
return roomId, nil
return result, nil
}
wakeupChan := make(chan *nats.Msg, 1)
sub, err := m.nats.Subscribe("publisher-"+publisher+"|"+streamType, wakeupChan)
waiter := m.publisherCreated.NewWaiter(key)
m.mu.Unlock()
if err != nil {
return 0, err
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing channel for %s publisher %s: %s", streamType, publisher, err)
}
}()
defer m.publisherCreated.Release(waiter)
for roomId == 0 {
var err error
if roomId, err = m.searchPublisherRoom(ctx, publisher, streamType); err != nil {
log.Printf("Could not search for room of publisher %s: %s", publisher, err)
} else if roomId > 0 {
break
for {
m.mu.Lock()
result := m.publishers[key]
m.mu.Unlock()
if result != nil {
return result, nil
}
select {
case <-wakeupChan:
// We got the wakeup event through NATS, the publisher should be
// ready now.
case <-ctx.Done():
return 0, ctx.Err()
if err := waiter.Wait(ctx); err != nil {
return nil, err
}
}
return roomId, nil
}
func (m *mcuJanus) getOrCreateSubscriberHandle(ctx context.Context, publisher string, streamType string) (*JanusHandle, uint64, error) {
var roomId uint64
func (m *mcuJanus) getOrCreateSubscriberHandle(ctx context.Context, publisher string, streamType string) (*JanusHandle, *mcuJanusPublisher, error) {
var pub *mcuJanusPublisher
var err error
if roomId, err = m.getPublisherRoomId(ctx, publisher, streamType); err != nil {
return nil, 0, err
if pub, err = m.getPublisher(ctx, publisher, streamType); err != nil {
return nil, nil, err
}
session := m.session
if session == nil {
return nil, 0, ErrNotConnected
return nil, nil, ErrNotConnected
}
handle, err := session.Attach(ctx, pluginVideoRoom)
if err != nil {
return nil, 0, err
return nil, nil, err
}
log.Printf("Attached subscriber to room %d of publisher %s in plugin %s in session %d as %d", roomId, publisher, pluginVideoRoom, session.Id, handle.Id)
return handle, roomId, nil
log.Printf("Attached subscriber to room %d of publisher %s in plugin %s in session %d as %d", pub.roomId, publisher, pluginVideoRoom, session.Id, handle.Id)
return handle, pub, nil
}
func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) {
@ -968,7 +867,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
return nil, fmt.Errorf("Unsupported stream type %s", streamType)
}
handle, roomId, err := m.getOrCreateSubscriberHandle(ctx, publisher, streamType)
handle, pub, err := m.getOrCreateSubscriberHandle(ctx, publisher, streamType)
if err != nil {
return nil, err
}
@ -979,7 +878,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
listener: listener,
id: atomic.AddUint64(&m.clientId, 1),
roomId: roomId,
roomId: pub.roomId,
streamType: streamType,
handle: handle,
@ -1047,7 +946,7 @@ func (p *mcuJanusSubscriber) handleSlowLink(event *janus.SlowLinkMsg) {
func (p *mcuJanusSubscriber) NotifyReconnected() {
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
defer cancel()
handle, roomId, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
if err != nil {
// TODO(jojo): Retry?
log.Printf("Could not reconnect subscriber for publisher %s: %s\n", p.publisher, err)
@ -1057,7 +956,7 @@ func (p *mcuJanusSubscriber) NotifyReconnected() {
p.handle = handle
p.handleId = handle.Id
p.roomId = roomId
p.roomId = pub.roomId
log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
}
@ -1077,17 +976,8 @@ func (p *mcuJanusSubscriber) joinRoom(ctx context.Context, callback func(error,
return
}
wakeupChan := make(chan *nats.Msg, 1)
sub, err := p.mcu.nats.Subscribe("publisher-"+p.publisher+"|"+p.streamType, wakeupChan)
if err != nil {
callback(err, nil)
return
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing channel for %s publisher %s: %s", p.streamType, p.publisher, err)
}
}()
waiter := p.mcu.publisherConnected.NewWaiter(p.publisher + "|" + p.streamType)
defer p.mcu.publisherConnected.Release(waiter)
retry:
join_msg := map[string]interface{}{
@ -1115,8 +1005,8 @@ retry:
p.closeClient(ctx)
p.mu.Unlock()
var roomId uint64
handle, roomId, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
var pub *mcuJanusPublisher
handle, pub, err = p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
if err != nil {
// Reconnection didn't work, need to unregister/remove subscriber
// so a new object will be created if the request is retried.
@ -1128,7 +1018,7 @@ retry:
p.handle = handle
p.handleId = handle.Id
p.roomId = roomId
p.roomId = pub.roomId
p.closeChan = make(chan bool, 1)
go p.run(p.handle, p.closeChan)
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
@ -1142,22 +1032,12 @@ retry:
case JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED:
log.Printf("Publisher %s not sending yet for %s, wait and retry to join room %d as subscriber", p.publisher, p.streamType, p.roomId)
}
wait:
select {
case msg := <-wakeupChan:
var message NatsMessage
if err := p.mcu.nats.Decode(msg, &message); err != nil {
log.Printf("Error decoding wakeup NATS message %s (%s)\n", string(msg.Data), err)
goto wait
} else if message.Type != "connected" {
log.Printf("Unsupported NATS message waiting for publisher %s: %+v\n", p.publisher, message)
goto wait
}
log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher)
case <-ctx.Done():
callback(ctx.Err(), nil)
if err := waiter.Wait(ctx); err != nil {
callback(err, nil)
return
}
log.Printf("Retry subscribing %s from %s", p.streamType, p.publisher)
goto retry
default:
// TODO(jojo): Should we handle other errors, too?

View File

@ -20,13 +20,6 @@
# - etcd: Token information are retrieved from an etcd cluster (see below).
tokentype = static
[nats]
# Url of NATS backend to use. This can also be a list of URLs to connect to
# multiple backends. For local development, this can be set to ":loopback:"
# to process NATS messages internally instead of sending them through an
# external NATS backend.
#url = nats://localhost:4222
[tokens]
# For token type "static": Mapping of <tokenid> = <publickey> of signaling
# servers allowed to connect.

View File

@ -36,9 +36,6 @@ import (
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
"github.com/nats-io/nats.go"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
)
var (
@ -81,19 +78,9 @@ func main() {
runtime.GOMAXPROCS(cpus)
log.Printf("Using a maximum of %d CPUs\n", cpus)
natsUrl, _ := config.GetString("nats", "url")
if natsUrl == "" {
natsUrl = nats.DefaultURL
}
nats, err := signaling.NewNatsClient(natsUrl)
if err != nil {
log.Fatal("Could not create NATS client: ", err)
}
r := mux.NewRouter()
proxy, err := NewProxyServer(r, version, config, nats)
proxy, err := NewProxyServer(r, version, config)
if err != nil {
log.Fatal(err)
}

View File

@ -88,7 +88,6 @@ type ProxyServer struct {
country string
url string
nats signaling.NatsClient
mcu signaling.Mcu
stopped uint32
@ -110,7 +109,7 @@ type ProxyServer struct {
clientsLock sync.RWMutex
}
func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile, nats signaling.NatsClient) (*ProxyServer, error) {
func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*ProxyServer, error) {
hashKey := make([]byte, 64)
if _, err := rand.Read(hashKey); err != nil {
return nil, fmt.Errorf("Could not generate random hash key: %s", err)
@ -172,8 +171,6 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile, na
version: version,
country: country,
nats: nats,
shutdownChannel: make(chan bool, 1),
upgrader: websocket.Upgrader{
@ -238,7 +235,7 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error {
for {
switch mcuType {
case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(s.url, config, s.nats)
mcu, err = signaling.NewMcuJanus(s.url, config)
default:
return fmt.Errorf("Unsupported MCU type: %s", mcuType)
}

View File

@ -175,7 +175,7 @@ func main() {
for {
switch mcuType {
case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(mcuUrl, config, nats)
mcu, err = signaling.NewMcuJanus(mcuUrl, config)
case signaling.McuTypeProxy:
mcu, err = signaling.NewMcuProxy(config)
default: