Support explicitly pinning streams to Janus event loops.

If a fixed number of event loops is configured in Janus, the signaling server
will assign streams to a dedicated loop. This should reduce context switches
as the publisher and all subscribers will be using the same thread in Janus.
This commit is contained in:
Joachim Bauch 2021-06-25 16:30:03 +02:00
parent fb12b359e0
commit 1d8969c5f8
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
4 changed files with 274 additions and 22 deletions

99
eventloops.go Normal file
View File

@ -0,0 +1,99 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"container/heap"
"fmt"
)
type EventLoop struct {
loop int
count int
index int
}
func (e *EventLoop) String() string {
return fmt.Sprintf("Loop %d at %d (%d streams)", e.loop, e.index, e.count)
}
type EventLoops []*EventLoop
func NewEventLoops(count int) EventLoops {
loops := make(EventLoops, count)
for i := 0; i < count; i++ {
loops[i] = &EventLoop{
loop: i,
index: i,
}
}
heap.Init(&loops)
return loops
}
func (l EventLoops) Len() int {
return len(l)
}
func (l EventLoops) Less(i, j int) bool {
if l[i].count < l[j].count {
return true
} else if l[i].loop < l[j].loop {
// Consistent ordering for tests, same number of streams are sorted by loop.
return true
}
return false
}
func (l EventLoops) Swap(i, j int) {
l[i], l[j] = l[j], l[i]
l[i].index = i
l[j].index = j
}
func (l *EventLoops) Push(x interface{}) {
n := len(*l)
item := x.(*EventLoop)
item.index = n
*l = append(*l, item)
}
func (l *EventLoops) Pop() interface{} {
old := *l
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*l = old[0 : n-1]
return item
}
func (l *EventLoops) Update(loop *EventLoop, change int) {
loop.count += change
heap.Fix(l, loop.index)
}
func (l *EventLoops) GetLowest() *EventLoop {
loop := (*l)[0]
l.Update(loop, 1)
return loop
}

68
eventloops_test.go Normal file
View File

@ -0,0 +1,68 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"testing"
)
func Test_EventLoops(t *testing.T) {
loops := NewEventLoops(2)
l0 := loops.GetLowest()
if l0.loop != 0 {
t.Errorf("Expected loop 0, got %+v", l0)
}
if l0.count != 1 {
t.Errorf("Expected count 1, got %+v", l0)
}
l1 := loops.GetLowest()
if l1.loop != 1 {
t.Errorf("Expected loop 1, got %+v", l1)
}
if l1.count != 1 {
t.Errorf("Expected count 1, got %+v", l1)
}
l2 := loops.GetLowest()
if l2.loop != 0 {
t.Errorf("Expected loop 0, got %+v", l2)
}
if l2.count != 2 {
t.Errorf("Expected count 1, got %+v", l2)
}
loops.Update(l2, -1)
if l0.count != 1 {
t.Errorf("Expected count 1, got %+v", l0)
}
loops.Update(l1, -1)
l3 := loops.GetLowest()
if l3.loop != 1 {
t.Errorf("Expected loop 1, got %+v", l3)
}
if l3.count != 1 {
t.Errorf("Expected count 1, got %+v", l3)
}
}

View File

@ -140,17 +140,19 @@ var msgtypes = map[string]func() interface{}{
}
type InfoMsg struct {
Name string
Version int
VersionString string `json:"version_string"`
Author string
DataChannels bool `json:"data_channels"`
IPv6 bool `json:"ipv6"`
LocalIP string `json:"local-ip"`
ICE_TCP bool `json:"ice-tcp"`
FullTrickle bool `json:"full-trickle"`
Transports map[string]janus.PluginInfo
Plugins map[string]janus.PluginInfo
Name string
Version int
VersionString string `json:"version_string"`
Author string
DataChannels bool `json:"data_channels"`
IPv6 bool `json:"ipv6"`
LocalIP string `json:"local-ip"`
ICE_TCP bool `json:"ice-tcp"`
FullTrickle bool `json:"full-trickle"`
StaticEventLoops int `json:"static-event-loops"`
LoopIndication bool `json:"loop-indication"`
Transports map[string]janus.PluginInfo
Plugins map[string]janus.PluginInfo
}
type TrickleMsg struct {
@ -592,8 +594,15 @@ func (session *JanusSession) send(msg map[string]interface{}, t *transaction) (u
// plugin should be the unique string of the plugin to attach to.
// On success, a new Handle will be returned and error will be nil.
func (session *JanusSession) Attach(ctx context.Context, plugin string) (*JanusHandle, error) {
return session.AttachLoop(ctx, plugin, -1)
}
func (session *JanusSession) AttachLoop(ctx context.Context, plugin string, loop int) (*JanusHandle, error) {
req, ch := newRequest("attach")
req["plugin"] = plugin
if loop >= 0 {
req["loop_index"] = loop
}
id, err := session.send(req, ch)
if err != nil {
return nil, err

View File

@ -145,6 +145,9 @@ type mcuJanus struct {
session *JanusSession
handle *JanusHandle
loopsLock sync.Mutex
eventloops EventLoops
closeChan chan bool
muClients sync.Mutex
@ -324,6 +327,16 @@ func (m *mcuJanus) Start() error {
}
log.Println("Created Janus handle", m.handle.Id)
m.loopsLock.Lock()
if info.StaticEventLoops > 0 && info.LoopIndication {
log.Printf("Found %d static event loops and loop indication is allowed, streams will be pinned", info.StaticEventLoops)
m.eventloops = NewEventLoops(info.StaticEventLoops)
} else {
log.Println("No static event loops and/or loop indication is setup, streams will not be pinned")
m.eventloops = nil
}
m.loopsLock.Unlock()
go m.run()
m.notifyOnConnected()
@ -342,6 +355,41 @@ func (m *mcuJanus) unregisterClient(client clientInterface) {
m.muClients.Unlock()
}
func (m *mcuJanus) getEventLoop() *EventLoop {
m.loopsLock.Lock()
defer m.loopsLock.Unlock()
if len(m.eventloops) == 0 {
return nil
}
loop := m.eventloops.GetLowest()
return loop
}
func (m *mcuJanus) acquireEventLoop(loop *EventLoop) {
if loop == nil {
return
}
m.loopsLock.Lock()
defer m.loopsLock.Unlock()
if len(m.eventloops) != 0 {
m.eventloops.Update(loop, 1)
}
}
func (m *mcuJanus) releaseEventLoop(loop *EventLoop) {
if loop == nil {
return
}
m.loopsLock.Lock()
defer m.loopsLock.Unlock()
if len(m.eventloops) != 0 {
m.eventloops.Update(loop, -1)
}
}
func (m *mcuJanus) run() {
ticker := time.NewTicker(keepaliveInterval)
defer ticker.Stop()
@ -441,6 +489,7 @@ type mcuJanusClient struct {
handle *JanusHandle
handleId uint64
eventLoop *EventLoop
closeChan chan bool
deferred chan func()
@ -475,6 +524,10 @@ func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
log.Println("Could not detach client", handle.Id, err)
}
}
if c.eventLoop != nil {
c.mcu.releaseEventLoop(c.eventLoop)
c.eventLoop = nil
}
return true
}
@ -713,14 +766,22 @@ func min(a, b int) int {
}
}
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string, bitrate int) (*JanusHandle, uint64, uint64, error) {
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string, bitrate int) (*JanusHandle, uint64, *EventLoop, uint64, error) {
session := m.session
if session == nil {
return nil, 0, 0, ErrNotConnected
return nil, 0, nil, 0, ErrNotConnected
}
loop := m.getEventLoop()
var handle *JanusHandle
var err error
if loop != nil {
handle, err = session.AttachLoop(ctx, pluginVideoRoom, loop.loop)
} else {
handle, err = session.AttachLoop(ctx, pluginVideoRoom, -1)
}
handle, err := session.Attach(ctx, pluginVideoRoom)
if err != nil {
return nil, 0, 0, err
m.releaseEventLoop(loop)
return nil, 0, nil, 0, err
}
log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id)
@ -750,7 +811,8 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
}
return nil, 0, 0, err
m.releaseEventLoop(loop)
return nil, 0, nil, 0, err
}
roomId := getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room")
@ -758,7 +820,8 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
if _, err := handle.Detach(ctx); err != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err)
}
return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response)
m.releaseEventLoop(loop)
return nil, 0, nil, 0, fmt.Errorf("No room id received: %+v", create_response)
}
log.Println("Created room", roomId, create_response.PluginData)
@ -775,10 +838,11 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
}
return nil, 0, 0, err
m.releaseEventLoop(loop)
return nil, 0, nil, 0, err
}
return handle, response.Session, roomId, nil
return handle, response.Session, loop, roomId, nil
}
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) {
@ -786,7 +850,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
return nil, fmt.Errorf("Unsupported stream type %s", streamType)
}
handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate)
handle, session, loop, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate)
if err != nil {
return nil, err
}
@ -803,6 +867,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
handle: handle,
handleId: handle.Id,
eventLoop: loop,
closeChan: make(chan bool, 1),
deferred: make(chan func(), 64),
},
@ -880,7 +945,7 @@ func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) {
func (p *mcuJanusPublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
handle, session, loop, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
if err != nil {
log.Printf("Could not reconnect publisher %s: %s", p.id, err)
// TODO(jojo): Retry
@ -891,6 +956,8 @@ func (p *mcuJanusPublisher) NotifyReconnected() {
p.handleId = handle.Id
p.session = session
p.roomId = roomId
p.eventLoop = loop
p.mcu.acquireEventLoop(loop)
log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId)
}
@ -997,7 +1064,12 @@ func (m *mcuJanus) getOrCreateSubscriberHandle(ctx context.Context, publisher st
return nil, nil, ErrNotConnected
}
handle, err := session.Attach(ctx, pluginVideoRoom)
var handle *JanusHandle
if pub.eventLoop != nil {
handle, err = session.AttachLoop(ctx, pluginVideoRoom, pub.eventLoop.loop)
} else {
handle, err = session.AttachLoop(ctx, pluginVideoRoom, -1)
}
if err != nil {
return nil, nil, err
}
@ -1027,6 +1099,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
handle: handle,
handleId: handle.Id,
eventLoop: pub.eventLoop,
closeChan: make(chan bool, 1),
deferred: make(chan func(), 64),
},
@ -1039,6 +1112,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
client.mcuJanusClient.handleSlowLink = client.handleSlowLink
client.mcuJanusClient.handleMedia = client.handleMedia
m.registerClient(client)
m.acquireEventLoop(client.eventLoop)
go client.run(handle, client.closeChan)
statsSubscribersCurrent.WithLabelValues(streamType).Inc()
statsSubscribersTotal.WithLabelValues(streamType).Inc()
@ -1109,6 +1183,8 @@ func (p *mcuJanusSubscriber) NotifyReconnected() {
p.handle = handle
p.handleId = handle.Id
p.roomId = pub.roomId
p.eventLoop = pub.eventLoop
p.mcu.acquireEventLoop(pub.eventLoop)
log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
}