Use "struct{}" channel if only used as signaling mechanism.

This commit is contained in:
Joachim Bauch 2023-06-12 12:46:47 +02:00
parent e83cdd67ad
commit fd29f83454
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
12 changed files with 104 additions and 78 deletions

View file

@ -505,8 +505,7 @@ func TestBackendConfiguration_Etcd(t *testing.T) {
defer cfg.Close()
storage := cfg.storage.(*backendStorageEtcd)
ch := make(chan bool, 1)
storage.SetWakeupForTesting(ch)
ch := storage.getWakeupChannelForTesting()
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

View file

@ -44,7 +44,7 @@ type backendStorageEtcd struct {
initializedCtx context.Context
initializedFunc context.CancelFunc
initializedWg sync.WaitGroup
wakeupChanForTesting chan bool
wakeupChanForTesting chan struct{}
}
func NewBackendStorageEtcd(config *goconf.ConfigFile, etcdClient *EtcdClient) (BackendStorage, error) {
@ -83,20 +83,13 @@ func (s *backendStorageEtcd) WaitForInitialized(ctx context.Context) error {
}
}
func (s *backendStorageEtcd) SetWakeupForTesting(ch chan bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.wakeupChanForTesting = ch
}
func (s *backendStorageEtcd) wakeupForTesting() {
if s.wakeupChanForTesting == nil {
return
}
select {
case s.wakeupChanForTesting <- true:
case s.wakeupChanForTesting <- struct{}{}:
default:
}
}

View file

@ -0,0 +1,35 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2023 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
func (s *backendStorageEtcd) getWakeupChannelForTesting() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
if s.wakeupChanForTesting != nil {
return s.wakeupChanForTesting
}
ch := make(chan struct{}, 1)
s.wakeupChanForTesting = ch
return ch
}

View file

@ -127,7 +127,7 @@ type SignalingClient struct {
stats *Stats
closed uint32
stopChan chan bool
stopChan chan struct{}
lock sync.Mutex
privateSessionId string
@ -149,7 +149,7 @@ func NewSignalingClient(cookie *securecookie.SecureCookie, url string, stats *St
stats: stats,
stopChan: make(chan bool),
stopChan: make(chan struct{}),
}
doneWg.Add(2)
go func() {
@ -169,10 +169,7 @@ func (c *SignalingClient) Close() {
}
// Signal writepump to terminate
select {
case c.stopChan <- true:
default:
}
close(c.stopChan)
c.lock.Lock()
c.publicSessionId = ""

View file

@ -69,13 +69,13 @@ func TestDeferredExecutor_Order(t *testing.T) {
}
}
done := make(chan bool)
done := make(chan struct{})
for x := 0; x < 10; x++ {
e.Execute(getFunc(x))
}
e.Execute(func() {
done <- true
close(done)
})
<-done
@ -90,10 +90,10 @@ func TestDeferredExecutor_CloseFromFunc(t *testing.T) {
e := NewDeferredExecutor(64)
defer e.waitForStop()
done := make(chan bool)
done := make(chan struct{})
e.Execute(func() {
defer close(done)
e.Close()
done <- true
})
<-done

View file

@ -204,7 +204,7 @@ type EtcdClientTestListener struct {
ctx context.Context
cancel context.CancelFunc
initial chan bool
initial chan struct{}
initialWg sync.WaitGroup
events chan etcdEvent
}
@ -217,7 +217,7 @@ func NewEtcdClientTestListener(ctx context.Context, t *testing.T) *EtcdClientTes
ctx: ctx,
cancel: cancel,
initial: make(chan bool),
initial: make(chan struct{}),
events: make(chan etcdEvent),
}
}
@ -235,6 +235,7 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
}()
go func() {
defer close(l.initial)
client.WaitForConnection()
ctx, cancel := context.WithTimeout(l.ctx, time.Second)
@ -250,7 +251,6 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {
l.t.Errorf("expected value \"1\", got \"%s\"", string(response.Kvs[0].Value))
}
l.initialWg.Wait()
l.initial <- true
}()
}

View file

@ -269,8 +269,8 @@ type GrpcClients struct {
clients []*GrpcClient
dnsDiscovery bool
stopping chan bool
stopped chan bool
stopping chan struct{}
stopped chan struct{}
etcdClient *EtcdClient
targetPrefix string
@ -280,7 +280,7 @@ type GrpcClients struct {
initializedCtx context.Context
initializedFunc context.CancelFunc
initializedWg sync.WaitGroup
wakeupChanForTesting chan bool
wakeupChanForTesting chan struct{}
selfCheckWaitGroup sync.WaitGroup
}
@ -291,8 +291,8 @@ func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcCli
initializedCtx: initializedCtx,
initializedFunc: initializedFunc,
stopping: make(chan bool, 1),
stopped: make(chan bool, 1),
stopping: make(chan struct{}, 1),
stopped: make(chan struct{}, 1),
}
if err := result.load(config, false); err != nil {
return nil, err
@ -480,7 +480,7 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo
dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery")
if dnsDiscovery != c.dnsDiscovery {
if !dnsDiscovery && fromReload {
c.stopping <- true
c.stopping <- struct{}{}
<-c.stopped
}
c.dnsDiscovery = dnsDiscovery
@ -504,7 +504,7 @@ func (c *GrpcClients) monitorGrpcIPs() {
case <-ticker.C:
c.updateGrpcIPs()
case <-c.stopping:
c.stopped <- true
c.stopped <- struct{}{}
return
}
}
@ -746,7 +746,7 @@ func (c *GrpcClients) wakeupForTesting() {
}
select {
case c.wakeupChanForTesting <- true:
case c.wakeupChanForTesting <- struct{}{}:
default:
}
}
@ -772,7 +772,7 @@ func (c *GrpcClients) Close() {
c.clients = nil
c.clientsMap = nil
if c.dnsDiscovery {
c.stopping <- true
c.stopping <- struct{}{}
<-c.stopped
c.dnsDiscovery = false
}

View file

@ -36,6 +36,16 @@ import (
"go.etcd.io/etcd/server/v3/embed"
)
func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} {
if c.wakeupChanForTesting != nil {
return c.wakeupChanForTesting
}
ch := make(chan struct{}, 1)
c.wakeupChanForTesting = ch
return ch
}
func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) *GrpcClients {
client, err := NewGrpcClients(config, etcdClient)
if err != nil {
@ -76,7 +86,7 @@ func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients
return NewGrpcClientsForTestWithConfig(t, config, etcdClient)
}
func drainWakeupChannel(ch chan bool) {
func drainWakeupChannel(ch <-chan struct{}) {
for {
select {
case <-ch:
@ -86,7 +96,7 @@ func drainWakeupChannel(ch chan bool) {
}
}
func waitForEvent(ctx context.Context, t *testing.T, ch chan bool) {
func waitForEvent(ctx context.Context, t *testing.T, ch <-chan struct{}) {
t.Helper()
select {
@ -121,8 +131,7 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) {
func Test_GrpcClients_EtcdUpdate(t *testing.T) {
etcd := NewEtcdForTest(t)
client := NewGrpcClientsWithEtcdForTest(t, etcd)
ch := make(chan bool, 1)
client.wakeupChanForTesting = ch
ch := client.getWakeupChannelForTesting()
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -176,8 +185,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) {
func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
etcd := NewEtcdForTest(t)
client := NewGrpcClientsWithEtcdForTest(t, etcd)
ch := make(chan bool, 1)
client.wakeupChanForTesting = ch
ch := client.getWakeupChannelForTesting()
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -234,8 +242,7 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) {
targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2)
ipsResult = []net.IP{ip1}
client := NewGrpcClientsForTest(t, target)
ch := make(chan bool, 1)
client.wakeupChanForTesting = ch
ch := client.getWakeupChannelForTesting()
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -296,8 +303,7 @@ func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) {
ip1 := net.ParseIP("192.168.0.1")
targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
client := NewGrpcClientsForTest(t, target)
ch := make(chan bool, 1)
client.wakeupChanForTesting = ch
ch := client.getWakeupChannelForTesting()
testCtx, testCtxCancel := context.WithTimeout(context.Background(), testTimeout)
defer testCtxCancel()

View file

@ -146,7 +146,7 @@ type mcuJanus struct {
session *JanusSession
handle *JanusHandle
closeChan chan bool
closeChan chan struct{}
muClients sync.Mutex
clients map[clientInterface]bool
@ -186,7 +186,7 @@ func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) {
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
mcuTimeout: mcuTimeout,
closeChan: make(chan bool, 1),
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
publishers: make(map[string]*mcuJanusPublisher),
@ -205,14 +205,14 @@ func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) {
}
func (m *mcuJanus) disconnect() {
if m.handle != nil {
if _, err := m.handle.Detach(context.TODO()); err != nil {
log.Printf("Error detaching handle %d: %s", m.handle.Id, err)
}
if handle := m.handle; handle != nil {
m.handle = nil
m.closeChan <- struct{}{}
if _, err := handle.Detach(context.TODO()); err != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err)
}
}
if m.session != nil {
m.closeChan <- true
if _, err := m.session.Destroy(context.TODO()); err != nil {
log.Printf("Error destroying session %d: %s", m.session.Id, err)
}
@ -442,7 +442,7 @@ type mcuJanusClient struct {
handle *JanusHandle
handleId uint64
closeChan chan bool
closeChan chan struct{}
deferred chan func()
handleEvent func(event *janus.EventMsg)
@ -474,7 +474,7 @@ func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClient
func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
if handle := c.handle; handle != nil {
c.handle = nil
c.closeChan <- true
close(c.closeChan)
if _, err := handle.Detach(ctx); err != nil {
if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND {
log.Println("Could not detach client", handle.Id, err)
@ -486,7 +486,7 @@ func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
return false
}
func (c *mcuJanusClient) run(handle *JanusHandle, closeChan chan bool) {
func (c *mcuJanusClient) run(handle *JanusHandle, closeChan <-chan struct{}) {
loop:
for {
select {
@ -807,7 +807,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
handle: handle,
handleId: handle.Id,
closeChan: make(chan bool, 1),
closeChan: make(chan struct{}, 1),
deferred: make(chan func(), 64),
},
id: id,
@ -1047,7 +1047,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
handle: handle,
handleId: handle.Id,
closeChan: make(chan bool, 1),
closeChan: make(chan struct{}, 1),
deferred: make(chan func(), 64),
},
publisher: publisher,
@ -1209,7 +1209,7 @@ retry:
p.roomId = pub.roomId
p.sid = strconv.FormatUint(handle.Id, 10)
p.listener.SubscriberSidUpdated(p)
p.closeChan = make(chan bool, 1)
p.closeChan = make(chan struct{}, 1)
go p.run(p.handle, p.closeChan)
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
goto retry

View file

@ -1115,8 +1115,8 @@ type mcuProxy struct {
proxyTimeout time.Duration
dnsDiscovery bool
stopping chan bool
stopped chan bool
stopping chan struct{}
stopped chan struct{}
maxStreamBitrate int
maxScreenBitrate int
@ -1184,8 +1184,8 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
connectionsMap: make(map[string][]*mcuProxyConnection),
proxyTimeout: proxyTimeout,
stopping: make(chan bool, 1),
stopped: make(chan bool, 1),
stopping: make(chan struct{}, 1),
stopped: make(chan struct{}, 1),
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
@ -1303,7 +1303,7 @@ func (m *mcuProxy) Stop() {
}
if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery {
m.stopping <- true
m.stopping <- struct{}{}
<-m.stopped
}
}
@ -1316,7 +1316,7 @@ func (m *mcuProxy) monitorProxyIPs() {
case <-ticker.C:
m.updateProxyIPs()
case <-m.stopping:
m.stopped <- true
m.stopped <- struct{}{}
return
}
}
@ -1408,7 +1408,7 @@ func (m *mcuProxy) configureStatic(config *goconf.ConfigFile, fromReload bool) e
dnsDiscovery, _ := config.GetBool("mcu", "dnsdiscovery")
if dnsDiscovery != m.dnsDiscovery {
if !dnsDiscovery && fromReload {
m.stopping <- true
m.stopping <- struct{}{}
<-m.stopped
}
m.dnsDiscovery = dnsDiscovery

View file

@ -61,14 +61,15 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
if err != nil {
t.Fatal(err)
}
ch := make(chan bool)
ch := make(chan struct{})
received := int32(0)
max := int32(20)
ready := make(chan bool)
quit := make(chan bool)
ready := make(chan struct{})
quit := make(chan struct{})
defer close(quit)
go func() {
ready <- true
close(ready)
for {
select {
case <-dest:
@ -79,7 +80,7 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
t.Errorf("Unsubscribe failed with err: %s", err)
return
}
ch <- true
close(ch)
}
case <-quit:
return
@ -101,7 +102,6 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
if r != max {
t.Fatalf("Received wrong # of messages: %d vs %d", r, max)
}
quit <- true
}
func TestNatsClient_Subscribe(t *testing.T) {

View file

@ -92,7 +92,7 @@ type ProxyServer struct {
mcu signaling.Mcu
stopped uint32
shutdownChannel chan bool
shutdownChannel chan struct{}
shutdownScheduled uint32
upgrader websocket.Upgrader
@ -167,7 +167,7 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
version: version,
country: country,
shutdownChannel: make(chan bool, 1),
shutdownChannel: make(chan struct{}),
upgrader: websocket.Upgrader{
ReadBufferSize: websocketReadBufferSize,
@ -359,7 +359,7 @@ func (s *ProxyServer) Stop() {
s.tokens.Close()
}
func (s *ProxyServer) ShutdownChannel() chan bool {
func (s *ProxyServer) ShutdownChannel() <-chan struct{} {
return s.shutdownChannel
}
@ -379,9 +379,7 @@ func (s *ProxyServer) ScheduleShutdown() {
})
if s.GetClientCount() == 0 {
go func() {
s.shutdownChannel <- true
}()
go close(s.shutdownChannel)
}
}
@ -957,9 +955,7 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) bool {
delete(s.clientIds, client.Id())
if len(s.clients) == 0 && atomic.LoadUint32(&s.shutdownScheduled) != 0 {
go func() {
s.shutdownChannel <- true
}()
go close(s.shutdownChannel)
}
return true
}