From 8353cbbb0f2cf60a303c681c852e61dfe32a7811 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 19 Jan 2023 14:51:37 +0100 Subject: [PATCH] Migrate to closer helper class. --- client.go | 8 ++++---- hub.go | 15 +++++---------- janus_client.go | 31 ++++++++++++++----------------- mcu_proxy.go | 16 ++++++++-------- room.go | 19 ++++++++----------- room_ping.go | 13 +++++-------- 6 files changed, 44 insertions(+), 58 deletions(-) diff --git a/client.go b/client.go index 434f138..a45a0f9 100644 --- a/client.go +++ b/client.go @@ -105,7 +105,7 @@ type Client struct { mu sync.Mutex - closeChan chan struct{} + closer *Closer closeOnce sync.Once messagesDone chan struct{} messageChan chan *bytes.Buffer @@ -137,7 +137,7 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien func (c *Client) SetConn(conn *websocket.Conn, remoteAddress string) { c.conn = conn c.addr = remoteAddress - c.closeChan = make(chan struct{}) + c.closer = NewCloser() c.messageChan = make(chan *bytes.Buffer, 16) c.messagesDone = make(chan struct{}) c.OnLookupCountry = func(client *Client) string { return unknownCountry } @@ -204,7 +204,7 @@ func (c *Client) doClose() { } } else if closed == 2 { // Both the read pump and message processing must be finished before closing. - close(c.closeChan) + c.closer.Close() <-c.messagesDone c.OnClosed(c) @@ -480,7 +480,7 @@ func (c *Client) WritePump() { if !c.sendPing() { return } - case <-c.closeChan: + case <-c.closer.C: return } } diff --git a/hub.go b/hub.go index 5afb393..d0163f0 100644 --- a/hub.go +++ b/hub.go @@ -119,8 +119,7 @@ type Hub struct { infoInternal *WelcomeServerMessage welcome atomic.Value // *ServerMessage - stopped int32 - stopChan chan bool + closer *Closer readPumpActive uint32 writePumpActive uint32 @@ -314,7 +313,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer info: NewWelcomeServerMessage(version, DefaultFeatures...), infoInternal: NewWelcomeServerMessage(version, DefaultFeaturesInternal...), - stopChan: make(chan bool), + closer: NewCloser(), roomUpdated: make(chan *BackendServerRoomRequest), roomDeleted: make(chan *BackendServerRoomRequest), @@ -417,7 +416,7 @@ func (h *Hub) updateGeoDatabase() { defer atomic.CompareAndSwapInt32(&h.geoipUpdating, 1, 0) delay := time.Second - for atomic.LoadInt32(&h.stopped) == 0 { + for !h.closer.IsClosed() { err := h.geoip.Update() if err == nil { break @@ -458,7 +457,7 @@ loop: h.performHousekeeping(now) case <-geoipUpdater.C: go h.updateGeoDatabase() - case <-h.stopChan: + case <-h.closer.C: break loop } } @@ -468,11 +467,7 @@ loop: } func (h *Hub) Stop() { - atomic.StoreInt32(&h.stopped, 1) - select { - case h.stopChan <- true: - default: - } + h.closer.Close() } func (h *Hub) Reload(config *goconf.ConfigFile) { diff --git a/janus_client.go b/janus_client.go index e4cfeb4..5dc1991 100644 --- a/janus_client.go +++ b/janus_client.go @@ -172,7 +172,7 @@ func unexpected(request string) error { type transaction struct { ch chan interface{} incoming chan interface{} - quitChan chan bool + closer *Closer } func (t *transaction) run() { @@ -180,7 +180,7 @@ func (t *transaction) run() { select { case msg := <-t.incoming: t.ch <- msg - case <-t.quitChan: + case <-t.closer.C: return } } @@ -191,18 +191,14 @@ func (t *transaction) add(msg interface{}) { } func (t *transaction) quit() { - select { - case t.quitChan <- true: - default: - // Already scheduled to quit. - } + t.closer.Close() } func newTransaction() *transaction { t := &transaction{ ch: make(chan interface{}, 1), incoming: make(chan interface{}, 8), - quitChan: make(chan bool, 1), + closer: NewCloser(), } return t } @@ -239,7 +235,7 @@ type JanusGateway struct { conn *websocket.Conn transactions map[uint64]*transaction - closeChan chan bool + closer *Closer writeMu sync.Mutex } @@ -269,15 +265,16 @@ func NewJanusGateway(wsURL string, listener GatewayListener) (*JanusGateway, err return nil, err } - gateway := new(JanusGateway) - gateway.conn = conn - gateway.transactions = make(map[uint64]*transaction) - gateway.Sessions = make(map[uint64]*JanusSession) - gateway.closeChan = make(chan bool) if listener == nil { listener = new(dummyGatewayListener) } - gateway.listener = listener + gateway := &JanusGateway{ + conn: conn, + listener: listener, + transactions: make(map[uint64]*transaction), + Sessions: make(map[uint64]*JanusSession), + closer: NewCloser(), + } go gateway.ping() go gateway.recv() @@ -286,7 +283,7 @@ func NewJanusGateway(wsURL string, listener GatewayListener) (*JanusGateway, err // Close closes the underlying connection to the Gateway. func (gateway *JanusGateway) Close() error { - gateway.closeChan <- true + gateway.closer.Close() gateway.writeMu.Lock() if gateway.conn == nil { gateway.writeMu.Unlock() @@ -382,7 +379,7 @@ loop: if err != nil { log.Println("Error sending ping to MCU:", err) } - case <-gateway.closeChan: + case <-gateway.closer.C: break loop } } diff --git a/mcu_proxy.go b/mcu_proxy.go index 9c1047d..c3e22b7 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -305,8 +305,8 @@ type mcuProxyConnection struct { ip net.IP mu sync.Mutex - closeChan chan bool - closedChan chan bool + closer *Closer + closedDone *Closer closed uint32 conn *websocket.Conn @@ -344,8 +344,8 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProx rawUrl: baseUrl, url: parsed, ip: ip, - closeChan: make(chan bool, 1), - closedChan: make(chan bool, 1), + closer: NewCloser(), + closedDone: NewCloser(), reconnectInterval: int64(initialReconnectInterval), load: loadNotConnected, callbacks: make(map[string]func(*ProxyServerMessage)), @@ -433,7 +433,7 @@ func (c *mcuProxyConnection) readPump() { if atomic.LoadUint32(&c.closed) == 0 { c.scheduleReconnect() } else { - c.closedChan <- true + c.closedDone.Close() } }() defer c.close() @@ -515,7 +515,7 @@ func (c *mcuProxyConnection) writePump() { c.reconnect() case <-ticker.C: c.sendPing() - case <-c.closeChan: + case <-c.closer.C: return } } @@ -543,7 +543,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) { return } - c.closeChan <- true + c.closer.Close() if err := c.sendClose(); err != nil { if err != ErrNotConnected { log.Printf("Could not send close message to %s: %s", c, err) @@ -553,7 +553,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) { } select { - case <-c.closedChan: + case <-c.closedDone.C: case <-ctx.Done(): if err := ctx.Err(); err != nil { log.Printf("Error waiting for connection to %s get closed: %s", c, err) diff --git a/room.go b/room.go index a3f8e20..a4d0e09 100644 --- a/room.go +++ b/room.go @@ -67,9 +67,9 @@ type Room struct { properties *json.RawMessage - closeChan chan bool - mu *sync.RWMutex - sessions map[string]Session + closer *Closer + mu *sync.RWMutex + sessions map[string]Session internalSessions map[Session]bool virtualSessions map[*VirtualSession]bool @@ -104,9 +104,9 @@ func NewRoom(roomId string, properties *json.RawMessage, hub *Hub, events AsyncE properties: properties, - closeChan: make(chan bool, 1), - mu: &sync.RWMutex{}, - sessions: make(map[string]Session), + closer: NewCloser(), + mu: &sync.RWMutex{}, + sessions: make(map[string]Session), internalSessions: make(map[Session]bool), virtualSessions: make(map[*VirtualSession]bool), @@ -173,7 +173,7 @@ func (r *Room) run() { loop: for { select { - case <-r.closeChan: + case <-r.closer.C: break loop case <-ticker.C: r.publishActiveSessions() @@ -182,10 +182,7 @@ loop: } func (r *Room) doClose() { - select { - case r.closeChan <- true: - default: - } + r.closer.Close() } func (r *Room) unsubscribeBackend() { diff --git a/room_ping.go b/room_ping.go index 48c301a..c51cb91 100644 --- a/room_ping.go +++ b/room_ping.go @@ -63,8 +63,8 @@ func (e *pingEntries) RemoveRoom(room *Room) { // For that, all ping requests across rooms of enabled instances are combined // and sent out batched every "updateActiveSessionsInterval" seconds. type RoomPing struct { - mu sync.Mutex - closeChan chan bool + mu sync.Mutex + closer *Closer backend *BackendClient capabilities *Capabilities @@ -74,7 +74,7 @@ type RoomPing struct { func NewRoomPing(backend *BackendClient, capabilities *Capabilities) (*RoomPing, error) { result := &RoomPing{ - closeChan: make(chan bool, 1), + closer: NewCloser(), backend: backend, capabilities: capabilities, } @@ -87,10 +87,7 @@ func (p *RoomPing) Start() { } func (p *RoomPing) Stop() { - select { - case p.closeChan <- true: - default: - } + p.closer.Close() } func (p *RoomPing) run() { @@ -98,7 +95,7 @@ func (p *RoomPing) run() { loop: for { select { - case <-p.closeChan: + case <-p.closer.C: break loop case <-ticker.C: p.publishActiveSessions()