mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-06-16 12:45:14 +02:00
Merge pull request #491 from strukturag/struct-chan
Use "struct{}" channel if only used as signaling mechanism.
This commit is contained in:
commit
18335071e9
|
@ -505,8 +505,7 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
|
||||||
defer cfg.Close()
|
defer cfg.Close()
|
||||||
|
|
||||||
storage := cfg.storage.(*backendStorageEtcd)
|
storage := cfg.storage.(*backendStorageEtcd)
|
||||||
ch := make(chan bool, 1)
|
ch := storage.getWakeupChannelForTesting()
|
||||||
storage.SetWakeupForTesting(ch)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
|
@ -44,7 +44,7 @@ type backendStorageEtcd struct {
|
||||||
initializedCtx context.Context
|
initializedCtx context.Context
|
||||||
initializedFunc context.CancelFunc
|
initializedFunc context.CancelFunc
|
||||||
initializedWg sync.WaitGroup
|
initializedWg sync.WaitGroup
|
||||||
wakeupChanForTesting chan bool
|
wakeupChanForTesting chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBackendStorageEtcd(config *goconf.ConfigFile, etcdClient *EtcdClient) (BackendStorage, error) {
|
func NewBackendStorageEtcd(config *goconf.ConfigFile, etcdClient *EtcdClient) (BackendStorage, error) {
|
||||||
|
@ -83,20 +83,13 @@ func (s *backendStorageEtcd) WaitForInitialized(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backendStorageEtcd) SetWakeupForTesting(ch chan bool) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
s.wakeupChanForTesting = ch
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *backendStorageEtcd) wakeupForTesting() {
|
func (s *backendStorageEtcd) wakeupForTesting() {
|
||||||
if s.wakeupChanForTesting == nil {
|
if s.wakeupChanForTesting == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s.wakeupChanForTesting <- true:
|
case s.wakeupChanForTesting <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
35
backend_storage_etcd_test.go
Normal file
35
backend_storage_etcd_test.go
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Standalone signaling server for the Nextcloud Spreed app.
|
||||||
|
* Copyright (C) 2023 struktur AG
|
||||||
|
*
|
||||||
|
* @author Joachim Bauch <bauch@struktur.de>
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
package signaling
|
||||||
|
|
||||||
|
func (s *backendStorageEtcd) getWakeupChannelForTesting() <-chan struct{} {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if s.wakeupChanForTesting != nil {
|
||||||
|
return s.wakeupChanForTesting
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan struct{}, 1)
|
||||||
|
s.wakeupChanForTesting = ch
|
||||||
|
return ch
|
||||||
|
}
|
|
@ -127,7 +127,7 @@ type SignalingClient struct {
|
||||||
stats *Stats
|
stats *Stats
|
||||||
closed uint32
|
closed uint32
|
||||||
|
|
||||||
stopChan chan bool
|
stopChan chan struct{}
|
||||||
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
privateSessionId string
|
privateSessionId string
|
||||||
|
@ -149,7 +149,7 @@ func NewSignalingClient(cookie *securecookie.SecureCookie, url string, stats *St
|
||||||
|
|
||||||
stats: stats,
|
stats: stats,
|
||||||
|
|
||||||
stopChan: make(chan bool),
|
stopChan: make(chan struct{}),
|
||||||
}
|
}
|
||||||
doneWg.Add(2)
|
doneWg.Add(2)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -169,10 +169,7 @@ func (c *SignalingClient) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Signal writepump to terminate
|
// Signal writepump to terminate
|
||||||
select {
|
close(c.stopChan)
|
||||||
case c.stopChan <- true:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
c.publicSessionId = ""
|
c.publicSessionId = ""
|
||||||
|
|
|
@ -69,13 +69,13 @@ func TestDeferredExecutor_Order(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
done := make(chan bool)
|
done := make(chan struct{})
|
||||||
for x := 0; x < 10; x++ {
|
for x := 0; x < 10; x++ {
|
||||||
e.Execute(getFunc(x))
|
e.Execute(getFunc(x))
|
||||||
}
|
}
|
||||||
|
|
||||||
e.Execute(func() {
|
e.Execute(func() {
|
||||||
done <- true
|
close(done)
|
||||||
})
|
})
|
||||||
<-done
|
<-done
|
||||||
|
|
||||||
|
@ -90,10 +90,10 @@ func TestDeferredExecutor_CloseFromFunc(t *testing.T) {
|
||||||
e := NewDeferredExecutor(64)
|
e := NewDeferredExecutor(64)
|
||||||
defer e.waitForStop()
|
defer e.waitForStop()
|
||||||
|
|
||||||
done := make(chan bool)
|
done := make(chan struct{})
|
||||||
e.Execute(func() {
|
e.Execute(func() {
|
||||||
|
defer close(done)
|
||||||
e.Close()
|
e.Close()
|
||||||
done <- true
|
|
||||||
})
|
})
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
|
|
|
@ -204,7 +204,7 @@ type EtcdClientTestListener struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
|
||||||
initial chan bool
|
initial chan struct{}
|
||||||
initialWg sync.WaitGroup
|
initialWg sync.WaitGroup
|
||||||
events chan etcdEvent
|
events chan etcdEvent
|
||||||
}
|
}
|
||||||
|
@ -217,7 +217,7 @@ func NewEtcdClientTestListener(ctx context.Context, t *testing.T) *EtcdClientTes
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
|
||||||
initial: make(chan bool),
|
initial: make(chan struct{}),
|
||||||
events: make(chan etcdEvent),
|
events: make(chan etcdEvent),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -235,6 +235,7 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer close(l.initial)
|
||||||
client.WaitForConnection()
|
client.WaitForConnection()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(l.ctx, time.Second)
|
ctx, cancel := context.WithTimeout(l.ctx, time.Second)
|
||||||
|
@ -250,7 +251,6 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
|
||||||
l.t.Errorf("expected value \"1\", got \"%s\"", string(response.Kvs[0].Value))
|
l.t.Errorf("expected value \"1\", got \"%s\"", string(response.Kvs[0].Value))
|
||||||
}
|
}
|
||||||
l.initialWg.Wait()
|
l.initialWg.Wait()
|
||||||
l.initial <- true
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -269,8 +269,8 @@ type GrpcClients struct {
|
||||||
clients []*GrpcClient
|
clients []*GrpcClient
|
||||||
|
|
||||||
dnsDiscovery bool
|
dnsDiscovery bool
|
||||||
stopping chan bool
|
stopping chan struct{}
|
||||||
stopped chan bool
|
stopped chan struct{}
|
||||||
|
|
||||||
etcdClient *EtcdClient
|
etcdClient *EtcdClient
|
||||||
targetPrefix string
|
targetPrefix string
|
||||||
|
@ -280,7 +280,7 @@ type GrpcClients struct {
|
||||||
initializedCtx context.Context
|
initializedCtx context.Context
|
||||||
initializedFunc context.CancelFunc
|
initializedFunc context.CancelFunc
|
||||||
initializedWg sync.WaitGroup
|
initializedWg sync.WaitGroup
|
||||||
wakeupChanForTesting chan bool
|
wakeupChanForTesting chan struct{}
|
||||||
selfCheckWaitGroup sync.WaitGroup
|
selfCheckWaitGroup sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,8 +291,8 @@ func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcCli
|
||||||
initializedCtx: initializedCtx,
|
initializedCtx: initializedCtx,
|
||||||
initializedFunc: initializedFunc,
|
initializedFunc: initializedFunc,
|
||||||
|
|
||||||
stopping: make(chan bool, 1),
|
stopping: make(chan struct{}, 1),
|
||||||
stopped: make(chan bool, 1),
|
stopped: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
if err := result.load(config, false); err != nil {
|
if err := result.load(config, false); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -480,7 +480,7 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo
|
||||||
dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery")
|
dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery")
|
||||||
if dnsDiscovery != c.dnsDiscovery {
|
if dnsDiscovery != c.dnsDiscovery {
|
||||||
if !dnsDiscovery && fromReload {
|
if !dnsDiscovery && fromReload {
|
||||||
c.stopping <- true
|
c.stopping <- struct{}{}
|
||||||
<-c.stopped
|
<-c.stopped
|
||||||
}
|
}
|
||||||
c.dnsDiscovery = dnsDiscovery
|
c.dnsDiscovery = dnsDiscovery
|
||||||
|
@ -504,7 +504,7 @@ func (c *GrpcClients) monitorGrpcIPs() {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
c.updateGrpcIPs()
|
c.updateGrpcIPs()
|
||||||
case <-c.stopping:
|
case <-c.stopping:
|
||||||
c.stopped <- true
|
c.stopped <- struct{}{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -746,7 +746,7 @@ func (c *GrpcClients) wakeupForTesting() {
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case c.wakeupChanForTesting <- true:
|
case c.wakeupChanForTesting <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -772,7 +772,7 @@ func (c *GrpcClients) Close() {
|
||||||
c.clients = nil
|
c.clients = nil
|
||||||
c.clientsMap = nil
|
c.clientsMap = nil
|
||||||
if c.dnsDiscovery {
|
if c.dnsDiscovery {
|
||||||
c.stopping <- true
|
c.stopping <- struct{}{}
|
||||||
<-c.stopped
|
<-c.stopped
|
||||||
c.dnsDiscovery = false
|
c.dnsDiscovery = false
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,16 @@ import (
|
||||||
"go.etcd.io/etcd/server/v3/embed"
|
"go.etcd.io/etcd/server/v3/embed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} {
|
||||||
|
if c.wakeupChanForTesting != nil {
|
||||||
|
return c.wakeupChanForTesting
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan struct{}, 1)
|
||||||
|
c.wakeupChanForTesting = ch
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) *GrpcClients {
|
func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) *GrpcClients {
|
||||||
client, err := NewGrpcClients(config, etcdClient)
|
client, err := NewGrpcClients(config, etcdClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -76,7 +86,7 @@ func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients
|
||||||
return NewGrpcClientsForTestWithConfig(t, config, etcdClient)
|
return NewGrpcClientsForTestWithConfig(t, config, etcdClient)
|
||||||
}
|
}
|
||||||
|
|
||||||
func drainWakeupChannel(ch chan bool) {
|
func drainWakeupChannel(ch <-chan struct{}) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case <-ch:
|
||||||
|
@ -86,7 +96,7 @@ func drainWakeupChannel(ch chan bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForEvent(ctx context.Context, t *testing.T, ch chan bool) {
|
func waitForEvent(ctx context.Context, t *testing.T, ch <-chan struct{}) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -121,8 +131,7 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) {
|
||||||
func Test_GrpcClients_EtcdUpdate(t *testing.T) {
|
func Test_GrpcClients_EtcdUpdate(t *testing.T) {
|
||||||
etcd := NewEtcdForTest(t)
|
etcd := NewEtcdForTest(t)
|
||||||
client := NewGrpcClientsWithEtcdForTest(t, etcd)
|
client := NewGrpcClientsWithEtcdForTest(t, etcd)
|
||||||
ch := make(chan bool, 1)
|
ch := client.getWakeupChannelForTesting()
|
||||||
client.wakeupChanForTesting = ch
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -176,8 +185,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) {
|
||||||
func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
|
func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
|
||||||
etcd := NewEtcdForTest(t)
|
etcd := NewEtcdForTest(t)
|
||||||
client := NewGrpcClientsWithEtcdForTest(t, etcd)
|
client := NewGrpcClientsWithEtcdForTest(t, etcd)
|
||||||
ch := make(chan bool, 1)
|
ch := client.getWakeupChannelForTesting()
|
||||||
client.wakeupChanForTesting = ch
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -234,8 +242,7 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) {
|
||||||
targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2)
|
targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2)
|
||||||
ipsResult = []net.IP{ip1}
|
ipsResult = []net.IP{ip1}
|
||||||
client := NewGrpcClientsForTest(t, target)
|
client := NewGrpcClientsForTest(t, target)
|
||||||
ch := make(chan bool, 1)
|
ch := client.getWakeupChannelForTesting()
|
||||||
client.wakeupChanForTesting = ch
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -296,8 +303,7 @@ func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) {
|
||||||
ip1 := net.ParseIP("192.168.0.1")
|
ip1 := net.ParseIP("192.168.0.1")
|
||||||
targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
|
targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
|
||||||
client := NewGrpcClientsForTest(t, target)
|
client := NewGrpcClientsForTest(t, target)
|
||||||
ch := make(chan bool, 1)
|
ch := client.getWakeupChannelForTesting()
|
||||||
client.wakeupChanForTesting = ch
|
|
||||||
|
|
||||||
testCtx, testCtxCancel := context.WithTimeout(context.Background(), testTimeout)
|
testCtx, testCtxCancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
defer testCtxCancel()
|
defer testCtxCancel()
|
||||||
|
|
26
mcu_janus.go
26
mcu_janus.go
|
@ -146,7 +146,7 @@ type mcuJanus struct {
|
||||||
session *JanusSession
|
session *JanusSession
|
||||||
handle *JanusHandle
|
handle *JanusHandle
|
||||||
|
|
||||||
closeChan chan bool
|
closeChan chan struct{}
|
||||||
|
|
||||||
muClients sync.Mutex
|
muClients sync.Mutex
|
||||||
clients map[clientInterface]bool
|
clients map[clientInterface]bool
|
||||||
|
@ -186,7 +186,7 @@ func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) {
|
||||||
maxStreamBitrate: maxStreamBitrate,
|
maxStreamBitrate: maxStreamBitrate,
|
||||||
maxScreenBitrate: maxScreenBitrate,
|
maxScreenBitrate: maxScreenBitrate,
|
||||||
mcuTimeout: mcuTimeout,
|
mcuTimeout: mcuTimeout,
|
||||||
closeChan: make(chan bool, 1),
|
closeChan: make(chan struct{}, 1),
|
||||||
clients: make(map[clientInterface]bool),
|
clients: make(map[clientInterface]bool),
|
||||||
|
|
||||||
publishers: make(map[string]*mcuJanusPublisher),
|
publishers: make(map[string]*mcuJanusPublisher),
|
||||||
|
@ -205,14 +205,14 @@ func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mcuJanus) disconnect() {
|
func (m *mcuJanus) disconnect() {
|
||||||
if m.handle != nil {
|
if handle := m.handle; handle != nil {
|
||||||
if _, err := m.handle.Detach(context.TODO()); err != nil {
|
|
||||||
log.Printf("Error detaching handle %d: %s", m.handle.Id, err)
|
|
||||||
}
|
|
||||||
m.handle = nil
|
m.handle = nil
|
||||||
|
m.closeChan <- struct{}{}
|
||||||
|
if _, err := handle.Detach(context.TODO()); err != nil {
|
||||||
|
log.Printf("Error detaching handle %d: %s", handle.Id, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if m.session != nil {
|
if m.session != nil {
|
||||||
m.closeChan <- true
|
|
||||||
if _, err := m.session.Destroy(context.TODO()); err != nil {
|
if _, err := m.session.Destroy(context.TODO()); err != nil {
|
||||||
log.Printf("Error destroying session %d: %s", m.session.Id, err)
|
log.Printf("Error destroying session %d: %s", m.session.Id, err)
|
||||||
}
|
}
|
||||||
|
@ -442,7 +442,7 @@ type mcuJanusClient struct {
|
||||||
|
|
||||||
handle *JanusHandle
|
handle *JanusHandle
|
||||||
handleId uint64
|
handleId uint64
|
||||||
closeChan chan bool
|
closeChan chan struct{}
|
||||||
deferred chan func()
|
deferred chan func()
|
||||||
|
|
||||||
handleEvent func(event *janus.EventMsg)
|
handleEvent func(event *janus.EventMsg)
|
||||||
|
@ -474,7 +474,7 @@ func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClient
|
||||||
func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
|
func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
|
||||||
if handle := c.handle; handle != nil {
|
if handle := c.handle; handle != nil {
|
||||||
c.handle = nil
|
c.handle = nil
|
||||||
c.closeChan <- true
|
close(c.closeChan)
|
||||||
if _, err := handle.Detach(ctx); err != nil {
|
if _, err := handle.Detach(ctx); err != nil {
|
||||||
if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND {
|
if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND {
|
||||||
log.Println("Could not detach client", handle.Id, err)
|
log.Println("Could not detach client", handle.Id, err)
|
||||||
|
@ -486,7 +486,7 @@ func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mcuJanusClient) run(handle *JanusHandle, closeChan chan bool) {
|
func (c *mcuJanusClient) run(handle *JanusHandle, closeChan <-chan struct{}) {
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -807,7 +807,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
|
||||||
|
|
||||||
handle: handle,
|
handle: handle,
|
||||||
handleId: handle.Id,
|
handleId: handle.Id,
|
||||||
closeChan: make(chan bool, 1),
|
closeChan: make(chan struct{}, 1),
|
||||||
deferred: make(chan func(), 64),
|
deferred: make(chan func(), 64),
|
||||||
},
|
},
|
||||||
id: id,
|
id: id,
|
||||||
|
@ -1047,7 +1047,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
|
||||||
|
|
||||||
handle: handle,
|
handle: handle,
|
||||||
handleId: handle.Id,
|
handleId: handle.Id,
|
||||||
closeChan: make(chan bool, 1),
|
closeChan: make(chan struct{}, 1),
|
||||||
deferred: make(chan func(), 64),
|
deferred: make(chan func(), 64),
|
||||||
},
|
},
|
||||||
publisher: publisher,
|
publisher: publisher,
|
||||||
|
@ -1209,7 +1209,7 @@ retry:
|
||||||
p.roomId = pub.roomId
|
p.roomId = pub.roomId
|
||||||
p.sid = strconv.FormatUint(handle.Id, 10)
|
p.sid = strconv.FormatUint(handle.Id, 10)
|
||||||
p.listener.SubscriberSidUpdated(p)
|
p.listener.SubscriberSidUpdated(p)
|
||||||
p.closeChan = make(chan bool, 1)
|
p.closeChan = make(chan struct{}, 1)
|
||||||
go p.run(p.handle, p.closeChan)
|
go p.run(p.handle, p.closeChan)
|
||||||
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
|
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
|
||||||
goto retry
|
goto retry
|
||||||
|
|
14
mcu_proxy.go
14
mcu_proxy.go
|
@ -1115,8 +1115,8 @@ type mcuProxy struct {
|
||||||
proxyTimeout time.Duration
|
proxyTimeout time.Duration
|
||||||
|
|
||||||
dnsDiscovery bool
|
dnsDiscovery bool
|
||||||
stopping chan bool
|
stopping chan struct{}
|
||||||
stopped chan bool
|
stopped chan struct{}
|
||||||
|
|
||||||
maxStreamBitrate int
|
maxStreamBitrate int
|
||||||
maxScreenBitrate int
|
maxScreenBitrate int
|
||||||
|
@ -1184,8 +1184,8 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
|
||||||
connectionsMap: make(map[string][]*mcuProxyConnection),
|
connectionsMap: make(map[string][]*mcuProxyConnection),
|
||||||
proxyTimeout: proxyTimeout,
|
proxyTimeout: proxyTimeout,
|
||||||
|
|
||||||
stopping: make(chan bool, 1),
|
stopping: make(chan struct{}, 1),
|
||||||
stopped: make(chan bool, 1),
|
stopped: make(chan struct{}, 1),
|
||||||
|
|
||||||
maxStreamBitrate: maxStreamBitrate,
|
maxStreamBitrate: maxStreamBitrate,
|
||||||
maxScreenBitrate: maxScreenBitrate,
|
maxScreenBitrate: maxScreenBitrate,
|
||||||
|
@ -1303,7 +1303,7 @@ func (m *mcuProxy) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery {
|
if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery {
|
||||||
m.stopping <- true
|
m.stopping <- struct{}{}
|
||||||
<-m.stopped
|
<-m.stopped
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1316,7 +1316,7 @@ func (m *mcuProxy) monitorProxyIPs() {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
m.updateProxyIPs()
|
m.updateProxyIPs()
|
||||||
case <-m.stopping:
|
case <-m.stopping:
|
||||||
m.stopped <- true
|
m.stopped <- struct{}{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1408,7 +1408,7 @@ func (m *mcuProxy) configureStatic(config *goconf.ConfigFile, fromReload bool) e
|
||||||
dnsDiscovery, _ := config.GetBool("mcu", "dnsdiscovery")
|
dnsDiscovery, _ := config.GetBool("mcu", "dnsdiscovery")
|
||||||
if dnsDiscovery != m.dnsDiscovery {
|
if dnsDiscovery != m.dnsDiscovery {
|
||||||
if !dnsDiscovery && fromReload {
|
if !dnsDiscovery && fromReload {
|
||||||
m.stopping <- true
|
m.stopping <- struct{}{}
|
||||||
<-m.stopped
|
<-m.stopped
|
||||||
}
|
}
|
||||||
m.dnsDiscovery = dnsDiscovery
|
m.dnsDiscovery = dnsDiscovery
|
||||||
|
|
|
@ -61,14 +61,15 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
ch := make(chan bool)
|
ch := make(chan struct{})
|
||||||
|
|
||||||
received := int32(0)
|
received := int32(0)
|
||||||
max := int32(20)
|
max := int32(20)
|
||||||
ready := make(chan bool)
|
ready := make(chan struct{})
|
||||||
quit := make(chan bool)
|
quit := make(chan struct{})
|
||||||
|
defer close(quit)
|
||||||
go func() {
|
go func() {
|
||||||
ready <- true
|
close(ready)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-dest:
|
case <-dest:
|
||||||
|
@ -79,7 +80,7 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
|
||||||
t.Errorf("Unsubscribe failed with err: %s", err)
|
t.Errorf("Unsubscribe failed with err: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ch <- true
|
close(ch)
|
||||||
}
|
}
|
||||||
case <-quit:
|
case <-quit:
|
||||||
return
|
return
|
||||||
|
@ -101,7 +102,6 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
|
||||||
if r != max {
|
if r != max {
|
||||||
t.Fatalf("Received wrong # of messages: %d vs %d", r, max)
|
t.Fatalf("Received wrong # of messages: %d vs %d", r, max)
|
||||||
}
|
}
|
||||||
quit <- true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNatsClient_Subscribe(t *testing.T) {
|
func TestNatsClient_Subscribe(t *testing.T) {
|
||||||
|
|
|
@ -92,7 +92,7 @@ type ProxyServer struct {
|
||||||
mcu signaling.Mcu
|
mcu signaling.Mcu
|
||||||
stopped uint32
|
stopped uint32
|
||||||
|
|
||||||
shutdownChannel chan bool
|
shutdownChannel chan struct{}
|
||||||
shutdownScheduled uint32
|
shutdownScheduled uint32
|
||||||
|
|
||||||
upgrader websocket.Upgrader
|
upgrader websocket.Upgrader
|
||||||
|
@ -167,7 +167,7 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
|
||||||
version: version,
|
version: version,
|
||||||
country: country,
|
country: country,
|
||||||
|
|
||||||
shutdownChannel: make(chan bool, 1),
|
shutdownChannel: make(chan struct{}),
|
||||||
|
|
||||||
upgrader: websocket.Upgrader{
|
upgrader: websocket.Upgrader{
|
||||||
ReadBufferSize: websocketReadBufferSize,
|
ReadBufferSize: websocketReadBufferSize,
|
||||||
|
@ -359,7 +359,7 @@ func (s *ProxyServer) Stop() {
|
||||||
s.tokens.Close()
|
s.tokens.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ProxyServer) ShutdownChannel() chan bool {
|
func (s *ProxyServer) ShutdownChannel() <-chan struct{} {
|
||||||
return s.shutdownChannel
|
return s.shutdownChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,9 +379,7 @@ func (s *ProxyServer) ScheduleShutdown() {
|
||||||
})
|
})
|
||||||
|
|
||||||
if s.GetClientCount() == 0 {
|
if s.GetClientCount() == 0 {
|
||||||
go func() {
|
go close(s.shutdownChannel)
|
||||||
s.shutdownChannel <- true
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -957,9 +955,7 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) bool {
|
||||||
delete(s.clientIds, client.Id())
|
delete(s.clientIds, client.Id())
|
||||||
|
|
||||||
if len(s.clients) == 0 && atomic.LoadUint32(&s.shutdownScheduled) != 0 {
|
if len(s.clients) == 0 && atomic.LoadUint32(&s.shutdownScheduled) != 0 {
|
||||||
go func() {
|
go close(s.shutdownChannel)
|
||||||
s.shutdownChannel <- true
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue