From 1d8969c5f8c6ebf57d0194df614a64013d7ad289 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Fri, 25 Jun 2021 16:30:03 +0200 Subject: [PATCH] Support explicitly pinning streams to Janus event loops. If a fixed number of event loops is configured in Janus, the signaling server will assign streams to a dedicated loop. This should reduce context switches as the publisher and all subscribers will be using the same thread in Janus. --- eventloops.go | 99 ++++++++++++++++++++++++++++++++++++++++++++++ eventloops_test.go | 68 +++++++++++++++++++++++++++++++ janus_client.go | 31 +++++++++------ mcu_janus.go | 98 +++++++++++++++++++++++++++++++++++++++------ 4 files changed, 274 insertions(+), 22 deletions(-) create mode 100644 eventloops.go create mode 100644 eventloops_test.go diff --git a/eventloops.go b/eventloops.go new file mode 100644 index 0000000..e2798b3 --- /dev/null +++ b/eventloops.go @@ -0,0 +1,99 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "container/heap" + "fmt" +) + +type EventLoop struct { + loop int + count int + index int +} + +func (e *EventLoop) String() string { + return fmt.Sprintf("Loop %d at %d (%d streams)", e.loop, e.index, e.count) +} + +type EventLoops []*EventLoop + +func NewEventLoops(count int) EventLoops { + loops := make(EventLoops, count) + for i := 0; i < count; i++ { + loops[i] = &EventLoop{ + loop: i, + index: i, + } + } + heap.Init(&loops) + return loops +} + +func (l EventLoops) Len() int { + return len(l) +} + +func (l EventLoops) Less(i, j int) bool { + if l[i].count < l[j].count { + return true + } else if l[i].loop < l[j].loop { + // Consistent ordering for tests, same number of streams are sorted by loop. + return true + } + return false +} + +func (l EventLoops) Swap(i, j int) { + l[i], l[j] = l[j], l[i] + l[i].index = i + l[j].index = j +} + +func (l *EventLoops) Push(x interface{}) { + n := len(*l) + item := x.(*EventLoop) + item.index = n + *l = append(*l, item) +} + +func (l *EventLoops) Pop() interface{} { + old := *l + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *l = old[0 : n-1] + return item +} + +func (l *EventLoops) Update(loop *EventLoop, change int) { + loop.count += change + heap.Fix(l, loop.index) +} + +func (l *EventLoops) GetLowest() *EventLoop { + loop := (*l)[0] + l.Update(loop, 1) + return loop +} diff --git a/eventloops_test.go b/eventloops_test.go new file mode 100644 index 0000000..c55c1fc --- /dev/null +++ b/eventloops_test.go @@ -0,0 +1,68 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "testing" +) + +func Test_EventLoops(t *testing.T) { + loops := NewEventLoops(2) + + l0 := loops.GetLowest() + if l0.loop != 0 { + t.Errorf("Expected loop 0, got %+v", l0) + } + if l0.count != 1 { + t.Errorf("Expected count 1, got %+v", l0) + } + + l1 := loops.GetLowest() + if l1.loop != 1 { + t.Errorf("Expected loop 1, got %+v", l1) + } + if l1.count != 1 { + t.Errorf("Expected count 1, got %+v", l1) + } + + l2 := loops.GetLowest() + if l2.loop != 0 { + t.Errorf("Expected loop 0, got %+v", l2) + } + if l2.count != 2 { + t.Errorf("Expected count 1, got %+v", l2) + } + + loops.Update(l2, -1) + if l0.count != 1 { + t.Errorf("Expected count 1, got %+v", l0) + } + + loops.Update(l1, -1) + l3 := loops.GetLowest() + if l3.loop != 1 { + t.Errorf("Expected loop 1, got %+v", l3) + } + if l3.count != 1 { + t.Errorf("Expected count 1, got %+v", l3) + } +} diff --git a/janus_client.go b/janus_client.go index 8e644c6..f53ea9d 100644 --- a/janus_client.go +++ b/janus_client.go @@ -140,17 +140,19 @@ var msgtypes = map[string]func() interface{}{ } type InfoMsg struct { - Name string - Version int - VersionString string `json:"version_string"` - Author string - DataChannels bool `json:"data_channels"` - IPv6 bool `json:"ipv6"` - LocalIP string `json:"local-ip"` - ICE_TCP bool `json:"ice-tcp"` - FullTrickle bool `json:"full-trickle"` - Transports map[string]janus.PluginInfo - Plugins map[string]janus.PluginInfo + Name string + Version int + VersionString string `json:"version_string"` + Author string + DataChannels bool `json:"data_channels"` + IPv6 bool `json:"ipv6"` + LocalIP string `json:"local-ip"` + ICE_TCP bool `json:"ice-tcp"` + FullTrickle bool `json:"full-trickle"` + StaticEventLoops int `json:"static-event-loops"` + LoopIndication bool `json:"loop-indication"` + Transports map[string]janus.PluginInfo + Plugins map[string]janus.PluginInfo } type TrickleMsg struct { @@ -592,8 +594,15 @@ func (session *JanusSession) send(msg map[string]interface{}, t *transaction) (u // plugin should be the unique string of the plugin to attach to. // On success, a new Handle will be returned and error will be nil. func (session *JanusSession) Attach(ctx context.Context, plugin string) (*JanusHandle, error) { + return session.AttachLoop(ctx, plugin, -1) +} + +func (session *JanusSession) AttachLoop(ctx context.Context, plugin string, loop int) (*JanusHandle, error) { req, ch := newRequest("attach") req["plugin"] = plugin + if loop >= 0 { + req["loop_index"] = loop + } id, err := session.send(req, ch) if err != nil { return nil, err diff --git a/mcu_janus.go b/mcu_janus.go index 81ae35f..6716178 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -145,6 +145,9 @@ type mcuJanus struct { session *JanusSession handle *JanusHandle + loopsLock sync.Mutex + eventloops EventLoops + closeChan chan bool muClients sync.Mutex @@ -324,6 +327,16 @@ func (m *mcuJanus) Start() error { } log.Println("Created Janus handle", m.handle.Id) + m.loopsLock.Lock() + if info.StaticEventLoops > 0 && info.LoopIndication { + log.Printf("Found %d static event loops and loop indication is allowed, streams will be pinned", info.StaticEventLoops) + m.eventloops = NewEventLoops(info.StaticEventLoops) + } else { + log.Println("No static event loops and/or loop indication is setup, streams will not be pinned") + m.eventloops = nil + } + m.loopsLock.Unlock() + go m.run() m.notifyOnConnected() @@ -342,6 +355,41 @@ func (m *mcuJanus) unregisterClient(client clientInterface) { m.muClients.Unlock() } +func (m *mcuJanus) getEventLoop() *EventLoop { + m.loopsLock.Lock() + defer m.loopsLock.Unlock() + if len(m.eventloops) == 0 { + return nil + } + + loop := m.eventloops.GetLowest() + return loop +} + +func (m *mcuJanus) acquireEventLoop(loop *EventLoop) { + if loop == nil { + return + } + + m.loopsLock.Lock() + defer m.loopsLock.Unlock() + if len(m.eventloops) != 0 { + m.eventloops.Update(loop, 1) + } +} + +func (m *mcuJanus) releaseEventLoop(loop *EventLoop) { + if loop == nil { + return + } + + m.loopsLock.Lock() + defer m.loopsLock.Unlock() + if len(m.eventloops) != 0 { + m.eventloops.Update(loop, -1) + } +} + func (m *mcuJanus) run() { ticker := time.NewTicker(keepaliveInterval) defer ticker.Stop() @@ -441,6 +489,7 @@ type mcuJanusClient struct { handle *JanusHandle handleId uint64 + eventLoop *EventLoop closeChan chan bool deferred chan func() @@ -475,6 +524,10 @@ func (c *mcuJanusClient) closeClient(ctx context.Context) bool { log.Println("Could not detach client", handle.Id, err) } } + if c.eventLoop != nil { + c.mcu.releaseEventLoop(c.eventLoop) + c.eventLoop = nil + } return true } @@ -713,14 +766,22 @@ func min(a, b int) int { } } -func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string, bitrate int) (*JanusHandle, uint64, uint64, error) { +func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string, bitrate int) (*JanusHandle, uint64, *EventLoop, uint64, error) { session := m.session if session == nil { - return nil, 0, 0, ErrNotConnected + return nil, 0, nil, 0, ErrNotConnected + } + loop := m.getEventLoop() + var handle *JanusHandle + var err error + if loop != nil { + handle, err = session.AttachLoop(ctx, pluginVideoRoom, loop.loop) + } else { + handle, err = session.AttachLoop(ctx, pluginVideoRoom, -1) } - handle, err := session.Attach(ctx, pluginVideoRoom) if err != nil { - return nil, 0, 0, err + m.releaseEventLoop(loop) + return nil, 0, nil, 0, err } log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id) @@ -750,7 +811,8 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st if _, err2 := handle.Detach(ctx); err2 != nil { log.Printf("Error detaching handle %d: %s", handle.Id, err2) } - return nil, 0, 0, err + m.releaseEventLoop(loop) + return nil, 0, nil, 0, err } roomId := getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room") @@ -758,7 +820,8 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st if _, err := handle.Detach(ctx); err != nil { log.Printf("Error detaching handle %d: %s", handle.Id, err) } - return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response) + m.releaseEventLoop(loop) + return nil, 0, nil, 0, fmt.Errorf("No room id received: %+v", create_response) } log.Println("Created room", roomId, create_response.PluginData) @@ -775,10 +838,11 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st if _, err2 := handle.Detach(ctx); err2 != nil { log.Printf("Error detaching handle %d: %s", handle.Id, err2) } - return nil, 0, 0, err + m.releaseEventLoop(loop) + return nil, 0, nil, 0, err } - return handle, response.Session, roomId, nil + return handle, response.Session, loop, roomId, nil } func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) { @@ -786,7 +850,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st return nil, fmt.Errorf("Unsupported stream type %s", streamType) } - handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate) + handle, session, loop, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate) if err != nil { return nil, err } @@ -803,6 +867,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st handle: handle, handleId: handle.Id, + eventLoop: loop, closeChan: make(chan bool, 1), deferred: make(chan func(), 64), }, @@ -880,7 +945,7 @@ func (p *mcuJanusPublisher) handleMedia(event *janus.MediaMsg) { func (p *mcuJanusPublisher) NotifyReconnected() { ctx := context.TODO() - handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) + handle, session, loop, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) if err != nil { log.Printf("Could not reconnect publisher %s: %s", p.id, err) // TODO(jojo): Retry @@ -891,6 +956,8 @@ func (p *mcuJanusPublisher) NotifyReconnected() { p.handleId = handle.Id p.session = session p.roomId = roomId + p.eventLoop = loop + p.mcu.acquireEventLoop(loop) log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId) } @@ -997,7 +1064,12 @@ func (m *mcuJanus) getOrCreateSubscriberHandle(ctx context.Context, publisher st return nil, nil, ErrNotConnected } - handle, err := session.Attach(ctx, pluginVideoRoom) + var handle *JanusHandle + if pub.eventLoop != nil { + handle, err = session.AttachLoop(ctx, pluginVideoRoom, pub.eventLoop.loop) + } else { + handle, err = session.AttachLoop(ctx, pluginVideoRoom, -1) + } if err != nil { return nil, nil, err } @@ -1027,6 +1099,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ handle: handle, handleId: handle.Id, + eventLoop: pub.eventLoop, closeChan: make(chan bool, 1), deferred: make(chan func(), 64), }, @@ -1039,6 +1112,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ client.mcuJanusClient.handleSlowLink = client.handleSlowLink client.mcuJanusClient.handleMedia = client.handleMedia m.registerClient(client) + m.acquireEventLoop(client.eventLoop) go client.run(handle, client.closeChan) statsSubscribersCurrent.WithLabelValues(streamType).Inc() statsSubscribersTotal.WithLabelValues(streamType).Inc() @@ -1109,6 +1183,8 @@ func (p *mcuJanusSubscriber) NotifyReconnected() { p.handle = handle p.handleId = handle.Id p.roomId = pub.roomId + p.eventLoop = pub.eventLoop + p.mcu.acquireEventLoop(pub.eventLoop) log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId) }