From fd29f8345432e9cacda8714a4eb2132bd4d97680 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 12 Jun 2023 12:46:47 +0200 Subject: [PATCH] Use "struct{}" channel if only used as signaling mechanism. --- backend_configuration_test.go | 3 +-- backend_storage_etcd.go | 11 ++--------- backend_storage_etcd_test.go | 35 +++++++++++++++++++++++++++++++++++ client/main.go | 9 +++------ deferred_executor_test.go | 8 ++++---- etcd_client_test.go | 6 +++--- grpc_client.go | 18 +++++++++--------- grpc_client_test.go | 26 ++++++++++++++++---------- mcu_janus.go | 26 +++++++++++++------------- mcu_proxy.go | 14 +++++++------- natsclient_test.go | 12 ++++++------ proxy/proxy_server.go | 14 +++++--------- 12 files changed, 104 insertions(+), 78 deletions(-) create mode 100644 backend_storage_etcd_test.go diff --git a/backend_configuration_test.go b/backend_configuration_test.go index d06cf6b..0a6389f 100644 --- a/backend_configuration_test.go +++ b/backend_configuration_test.go @@ -505,8 +505,7 @@ func TestBackendConfiguration_Etcd(t *testing.T) { defer cfg.Close() storage := cfg.storage.(*backendStorageEtcd) - ch := make(chan bool, 1) - storage.SetWakeupForTesting(ch) + ch := storage.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go index a33b216..08717c2 100644 --- a/backend_storage_etcd.go +++ b/backend_storage_etcd.go @@ -44,7 +44,7 @@ type backendStorageEtcd struct { initializedCtx context.Context initializedFunc context.CancelFunc initializedWg sync.WaitGroup - wakeupChanForTesting chan bool + wakeupChanForTesting chan struct{} } func NewBackendStorageEtcd(config *goconf.ConfigFile, etcdClient *EtcdClient) (BackendStorage, error) { @@ -83,20 +83,13 @@ func (s *backendStorageEtcd) WaitForInitialized(ctx context.Context) error { } } -func (s *backendStorageEtcd) SetWakeupForTesting(ch chan bool) { - s.mu.Lock() - defer s.mu.Unlock() - - s.wakeupChanForTesting = ch -} - func (s *backendStorageEtcd) wakeupForTesting() { if s.wakeupChanForTesting == nil { return } select { - case s.wakeupChanForTesting <- true: + case s.wakeupChanForTesting <- struct{}{}: default: } } diff --git a/backend_storage_etcd_test.go b/backend_storage_etcd_test.go new file mode 100644 index 0000000..bc1f83d --- /dev/null +++ b/backend_storage_etcd_test.go @@ -0,0 +1,35 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2023 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +func (s *backendStorageEtcd) getWakeupChannelForTesting() <-chan struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + if s.wakeupChanForTesting != nil { + return s.wakeupChanForTesting + } + + ch := make(chan struct{}, 1) + s.wakeupChanForTesting = ch + return ch +} diff --git a/client/main.go b/client/main.go index a9b2229..e3d3a35 100644 --- a/client/main.go +++ b/client/main.go @@ -127,7 +127,7 @@ type SignalingClient struct { stats *Stats closed uint32 - stopChan chan bool + stopChan chan struct{} lock sync.Mutex privateSessionId string @@ -149,7 +149,7 @@ func NewSignalingClient(cookie *securecookie.SecureCookie, url string, stats *St stats: stats, - stopChan: make(chan bool), + stopChan: make(chan struct{}), } doneWg.Add(2) go func() { @@ -169,10 +169,7 @@ func (c *SignalingClient) Close() { } // Signal writepump to terminate - select { - case c.stopChan <- true: - default: - } + close(c.stopChan) c.lock.Lock() c.publicSessionId = "" diff --git a/deferred_executor_test.go b/deferred_executor_test.go index 6e1b12c..4cf3d1f 100644 --- a/deferred_executor_test.go +++ b/deferred_executor_test.go @@ -69,13 +69,13 @@ func TestDeferredExecutor_Order(t *testing.T) { } } - done := make(chan bool) + done := make(chan struct{}) for x := 0; x < 10; x++ { e.Execute(getFunc(x)) } e.Execute(func() { - done <- true + close(done) }) <-done @@ -90,10 +90,10 @@ func TestDeferredExecutor_CloseFromFunc(t *testing.T) { e := NewDeferredExecutor(64) defer e.waitForStop() - done := make(chan bool) + done := make(chan struct{}) e.Execute(func() { + defer close(done) e.Close() - done <- true }) <-done diff --git a/etcd_client_test.go b/etcd_client_test.go index 0a673e2..dd85ddb 100644 --- a/etcd_client_test.go +++ b/etcd_client_test.go @@ -204,7 +204,7 @@ type EtcdClientTestListener struct { ctx context.Context cancel context.CancelFunc - initial chan bool + initial chan struct{} initialWg sync.WaitGroup events chan etcdEvent } @@ -217,7 +217,7 @@ func NewEtcdClientTestListener(ctx context.Context, t *testing.T) *EtcdClientTes ctx: ctx, cancel: cancel, - initial: make(chan bool), + initial: make(chan struct{}), events: make(chan etcdEvent), } } @@ -235,6 +235,7 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { }() go func() { + defer close(l.initial) client.WaitForConnection() ctx, cancel := context.WithTimeout(l.ctx, time.Second) @@ -250,7 +251,6 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { l.t.Errorf("expected value \"1\", got \"%s\"", string(response.Kvs[0].Value)) } l.initialWg.Wait() - l.initial <- true }() } diff --git a/grpc_client.go b/grpc_client.go index 18ef04a..82b649d 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -269,8 +269,8 @@ type GrpcClients struct { clients []*GrpcClient dnsDiscovery bool - stopping chan bool - stopped chan bool + stopping chan struct{} + stopped chan struct{} etcdClient *EtcdClient targetPrefix string @@ -280,7 +280,7 @@ type GrpcClients struct { initializedCtx context.Context initializedFunc context.CancelFunc initializedWg sync.WaitGroup - wakeupChanForTesting chan bool + wakeupChanForTesting chan struct{} selfCheckWaitGroup sync.WaitGroup } @@ -291,8 +291,8 @@ func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcCli initializedCtx: initializedCtx, initializedFunc: initializedFunc, - stopping: make(chan bool, 1), - stopped: make(chan bool, 1), + stopping: make(chan struct{}, 1), + stopped: make(chan struct{}, 1), } if err := result.load(config, false); err != nil { return nil, err @@ -480,7 +480,7 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery") if dnsDiscovery != c.dnsDiscovery { if !dnsDiscovery && fromReload { - c.stopping <- true + c.stopping <- struct{}{} <-c.stopped } c.dnsDiscovery = dnsDiscovery @@ -504,7 +504,7 @@ func (c *GrpcClients) monitorGrpcIPs() { case <-ticker.C: c.updateGrpcIPs() case <-c.stopping: - c.stopped <- true + c.stopped <- struct{}{} return } } @@ -746,7 +746,7 @@ func (c *GrpcClients) wakeupForTesting() { } select { - case c.wakeupChanForTesting <- true: + case c.wakeupChanForTesting <- struct{}{}: default: } } @@ -772,7 +772,7 @@ func (c *GrpcClients) Close() { c.clients = nil c.clientsMap = nil if c.dnsDiscovery { - c.stopping <- true + c.stopping <- struct{}{} <-c.stopped c.dnsDiscovery = false } diff --git a/grpc_client_test.go b/grpc_client_test.go index 8f79485..bc63c9b 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -36,6 +36,16 @@ import ( "go.etcd.io/etcd/server/v3/embed" ) +func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} { + if c.wakeupChanForTesting != nil { + return c.wakeupChanForTesting + } + + ch := make(chan struct{}, 1) + c.wakeupChanForTesting = ch + return ch +} + func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) *GrpcClients { client, err := NewGrpcClients(config, etcdClient) if err != nil { @@ -76,7 +86,7 @@ func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients return NewGrpcClientsForTestWithConfig(t, config, etcdClient) } -func drainWakeupChannel(ch chan bool) { +func drainWakeupChannel(ch <-chan struct{}) { for { select { case <-ch: @@ -86,7 +96,7 @@ func drainWakeupChannel(ch chan bool) { } } -func waitForEvent(ctx context.Context, t *testing.T, ch chan bool) { +func waitForEvent(ctx context.Context, t *testing.T, ch <-chan struct{}) { t.Helper() select { @@ -121,8 +131,7 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) { func Test_GrpcClients_EtcdUpdate(t *testing.T) { etcd := NewEtcdForTest(t) client := NewGrpcClientsWithEtcdForTest(t, etcd) - ch := make(chan bool, 1) - client.wakeupChanForTesting = ch + ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() @@ -176,8 +185,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) { func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { etcd := NewEtcdForTest(t) client := NewGrpcClientsWithEtcdForTest(t, etcd) - ch := make(chan bool, 1) - client.wakeupChanForTesting = ch + ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() @@ -234,8 +242,7 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) { targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2) ipsResult = []net.IP{ip1} client := NewGrpcClientsForTest(t, target) - ch := make(chan bool, 1) - client.wakeupChanForTesting = ch + ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() @@ -296,8 +303,7 @@ func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) { ip1 := net.ParseIP("192.168.0.1") targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1) client := NewGrpcClientsForTest(t, target) - ch := make(chan bool, 1) - client.wakeupChanForTesting = ch + ch := client.getWakeupChannelForTesting() testCtx, testCtxCancel := context.WithTimeout(context.Background(), testTimeout) defer testCtxCancel() diff --git a/mcu_janus.go b/mcu_janus.go index 06f6a9b..c5ca17f 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -146,7 +146,7 @@ type mcuJanus struct { session *JanusSession handle *JanusHandle - closeChan chan bool + closeChan chan struct{} muClients sync.Mutex clients map[clientInterface]bool @@ -186,7 +186,7 @@ func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) { maxStreamBitrate: maxStreamBitrate, maxScreenBitrate: maxScreenBitrate, mcuTimeout: mcuTimeout, - closeChan: make(chan bool, 1), + closeChan: make(chan struct{}, 1), clients: make(map[clientInterface]bool), publishers: make(map[string]*mcuJanusPublisher), @@ -205,14 +205,14 @@ func NewMcuJanus(url string, config *goconf.ConfigFile) (Mcu, error) { } func (m *mcuJanus) disconnect() { - if m.handle != nil { - if _, err := m.handle.Detach(context.TODO()); err != nil { - log.Printf("Error detaching handle %d: %s", m.handle.Id, err) - } + if handle := m.handle; handle != nil { m.handle = nil + m.closeChan <- struct{}{} + if _, err := handle.Detach(context.TODO()); err != nil { + log.Printf("Error detaching handle %d: %s", handle.Id, err) + } } if m.session != nil { - m.closeChan <- true if _, err := m.session.Destroy(context.TODO()); err != nil { log.Printf("Error destroying session %d: %s", m.session.Id, err) } @@ -442,7 +442,7 @@ type mcuJanusClient struct { handle *JanusHandle handleId uint64 - closeChan chan bool + closeChan chan struct{} deferred chan func() handleEvent func(event *janus.EventMsg) @@ -474,7 +474,7 @@ func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClient func (c *mcuJanusClient) closeClient(ctx context.Context) bool { if handle := c.handle; handle != nil { c.handle = nil - c.closeChan <- true + close(c.closeChan) if _, err := handle.Detach(ctx); err != nil { if e, ok := err.(*janus.ErrorMsg); !ok || e.Err.Code != JANUS_ERROR_HANDLE_NOT_FOUND { log.Println("Could not detach client", handle.Id, err) @@ -486,7 +486,7 @@ func (c *mcuJanusClient) closeClient(ctx context.Context) bool { return false } -func (c *mcuJanusClient) run(handle *JanusHandle, closeChan chan bool) { +func (c *mcuJanusClient) run(handle *JanusHandle, closeChan <-chan struct{}) { loop: for { select { @@ -807,7 +807,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st handle: handle, handleId: handle.Id, - closeChan: make(chan bool, 1), + closeChan: make(chan struct{}, 1), deferred: make(chan func(), 64), }, id: id, @@ -1047,7 +1047,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ handle: handle, handleId: handle.Id, - closeChan: make(chan bool, 1), + closeChan: make(chan struct{}, 1), deferred: make(chan func(), 64), }, publisher: publisher, @@ -1209,7 +1209,7 @@ retry: p.roomId = pub.roomId p.sid = strconv.FormatUint(handle.Id, 10) p.listener.SubscriberSidUpdated(p) - p.closeChan = make(chan bool, 1) + p.closeChan = make(chan struct{}, 1) go p.run(p.handle, p.closeChan) log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId) goto retry diff --git a/mcu_proxy.go b/mcu_proxy.go index 3e1e9cd..a0d03fa 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -1115,8 +1115,8 @@ type mcuProxy struct { proxyTimeout time.Duration dnsDiscovery bool - stopping chan bool - stopped chan bool + stopping chan struct{} + stopped chan struct{} maxStreamBitrate int maxScreenBitrate int @@ -1184,8 +1184,8 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients * connectionsMap: make(map[string][]*mcuProxyConnection), proxyTimeout: proxyTimeout, - stopping: make(chan bool, 1), - stopped: make(chan bool, 1), + stopping: make(chan struct{}, 1), + stopped: make(chan struct{}, 1), maxStreamBitrate: maxStreamBitrate, maxScreenBitrate: maxScreenBitrate, @@ -1303,7 +1303,7 @@ func (m *mcuProxy) Stop() { } if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery { - m.stopping <- true + m.stopping <- struct{}{} <-m.stopped } } @@ -1316,7 +1316,7 @@ func (m *mcuProxy) monitorProxyIPs() { case <-ticker.C: m.updateProxyIPs() case <-m.stopping: - m.stopped <- true + m.stopped <- struct{}{} return } } @@ -1408,7 +1408,7 @@ func (m *mcuProxy) configureStatic(config *goconf.ConfigFile, fromReload bool) e dnsDiscovery, _ := config.GetBool("mcu", "dnsdiscovery") if dnsDiscovery != m.dnsDiscovery { if !dnsDiscovery && fromReload { - m.stopping <- true + m.stopping <- struct{}{} <-m.stopped } m.dnsDiscovery = dnsDiscovery diff --git a/natsclient_test.go b/natsclient_test.go index b72d291..cc6cbfa 100644 --- a/natsclient_test.go +++ b/natsclient_test.go @@ -61,14 +61,15 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) { if err != nil { t.Fatal(err) } - ch := make(chan bool) + ch := make(chan struct{}) received := int32(0) max := int32(20) - ready := make(chan bool) - quit := make(chan bool) + ready := make(chan struct{}) + quit := make(chan struct{}) + defer close(quit) go func() { - ready <- true + close(ready) for { select { case <-dest: @@ -79,7 +80,7 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) { t.Errorf("Unsubscribe failed with err: %s", err) return } - ch <- true + close(ch) } case <-quit: return @@ -101,7 +102,6 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) { if r != max { t.Fatalf("Received wrong # of messages: %d vs %d", r, max) } - quit <- true } func TestNatsClient_Subscribe(t *testing.T) { diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index e057b59..75c58b1 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -92,7 +92,7 @@ type ProxyServer struct { mcu signaling.Mcu stopped uint32 - shutdownChannel chan bool + shutdownChannel chan struct{} shutdownScheduled uint32 upgrader websocket.Upgrader @@ -167,7 +167,7 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (* version: version, country: country, - shutdownChannel: make(chan bool, 1), + shutdownChannel: make(chan struct{}), upgrader: websocket.Upgrader{ ReadBufferSize: websocketReadBufferSize, @@ -359,7 +359,7 @@ func (s *ProxyServer) Stop() { s.tokens.Close() } -func (s *ProxyServer) ShutdownChannel() chan bool { +func (s *ProxyServer) ShutdownChannel() <-chan struct{} { return s.shutdownChannel } @@ -379,9 +379,7 @@ func (s *ProxyServer) ScheduleShutdown() { }) if s.GetClientCount() == 0 { - go func() { - s.shutdownChannel <- true - }() + go close(s.shutdownChannel) } } @@ -957,9 +955,7 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) bool { delete(s.clientIds, client.Id()) if len(s.clients) == 0 && atomic.LoadUint32(&s.shutdownScheduled) != 0 { - go func() { - s.shutdownChannel <- true - }() + go close(s.shutdownChannel) } return true }