Migrate to closer helper class.

This commit is contained in:
Joachim Bauch 2023-01-19 14:51:37 +01:00
parent e6b2d1e0aa
commit 8353cbbb0f
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
6 changed files with 44 additions and 58 deletions

View file

@ -105,7 +105,7 @@ type Client struct {
mu sync.Mutex mu sync.Mutex
closeChan chan struct{} closer *Closer
closeOnce sync.Once closeOnce sync.Once
messagesDone chan struct{} messagesDone chan struct{}
messageChan chan *bytes.Buffer messageChan chan *bytes.Buffer
@ -137,7 +137,7 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien
func (c *Client) SetConn(conn *websocket.Conn, remoteAddress string) { func (c *Client) SetConn(conn *websocket.Conn, remoteAddress string) {
c.conn = conn c.conn = conn
c.addr = remoteAddress c.addr = remoteAddress
c.closeChan = make(chan struct{}) c.closer = NewCloser()
c.messageChan = make(chan *bytes.Buffer, 16) c.messageChan = make(chan *bytes.Buffer, 16)
c.messagesDone = make(chan struct{}) c.messagesDone = make(chan struct{})
c.OnLookupCountry = func(client *Client) string { return unknownCountry } c.OnLookupCountry = func(client *Client) string { return unknownCountry }
@ -204,7 +204,7 @@ func (c *Client) doClose() {
} }
} else if closed == 2 { } else if closed == 2 {
// Both the read pump and message processing must be finished before closing. // Both the read pump and message processing must be finished before closing.
close(c.closeChan) c.closer.Close()
<-c.messagesDone <-c.messagesDone
c.OnClosed(c) c.OnClosed(c)
@ -480,7 +480,7 @@ func (c *Client) WritePump() {
if !c.sendPing() { if !c.sendPing() {
return return
} }
case <-c.closeChan: case <-c.closer.C:
return return
} }
} }

15
hub.go
View file

@ -119,8 +119,7 @@ type Hub struct {
infoInternal *WelcomeServerMessage infoInternal *WelcomeServerMessage
welcome atomic.Value // *ServerMessage welcome atomic.Value // *ServerMessage
stopped int32 closer *Closer
stopChan chan bool
readPumpActive uint32 readPumpActive uint32
writePumpActive uint32 writePumpActive uint32
@ -314,7 +313,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
info: NewWelcomeServerMessage(version, DefaultFeatures...), info: NewWelcomeServerMessage(version, DefaultFeatures...),
infoInternal: NewWelcomeServerMessage(version, DefaultFeaturesInternal...), infoInternal: NewWelcomeServerMessage(version, DefaultFeaturesInternal...),
stopChan: make(chan bool), closer: NewCloser(),
roomUpdated: make(chan *BackendServerRoomRequest), roomUpdated: make(chan *BackendServerRoomRequest),
roomDeleted: make(chan *BackendServerRoomRequest), roomDeleted: make(chan *BackendServerRoomRequest),
@ -417,7 +416,7 @@ func (h *Hub) updateGeoDatabase() {
defer atomic.CompareAndSwapInt32(&h.geoipUpdating, 1, 0) defer atomic.CompareAndSwapInt32(&h.geoipUpdating, 1, 0)
delay := time.Second delay := time.Second
for atomic.LoadInt32(&h.stopped) == 0 { for !h.closer.IsClosed() {
err := h.geoip.Update() err := h.geoip.Update()
if err == nil { if err == nil {
break break
@ -458,7 +457,7 @@ loop:
h.performHousekeeping(now) h.performHousekeeping(now)
case <-geoipUpdater.C: case <-geoipUpdater.C:
go h.updateGeoDatabase() go h.updateGeoDatabase()
case <-h.stopChan: case <-h.closer.C:
break loop break loop
} }
} }
@ -468,11 +467,7 @@ loop:
} }
func (h *Hub) Stop() { func (h *Hub) Stop() {
atomic.StoreInt32(&h.stopped, 1) h.closer.Close()
select {
case h.stopChan <- true:
default:
}
} }
func (h *Hub) Reload(config *goconf.ConfigFile) { func (h *Hub) Reload(config *goconf.ConfigFile) {

View file

@ -172,7 +172,7 @@ func unexpected(request string) error {
type transaction struct { type transaction struct {
ch chan interface{} ch chan interface{}
incoming chan interface{} incoming chan interface{}
quitChan chan bool closer *Closer
} }
func (t *transaction) run() { func (t *transaction) run() {
@ -180,7 +180,7 @@ func (t *transaction) run() {
select { select {
case msg := <-t.incoming: case msg := <-t.incoming:
t.ch <- msg t.ch <- msg
case <-t.quitChan: case <-t.closer.C:
return return
} }
} }
@ -191,18 +191,14 @@ func (t *transaction) add(msg interface{}) {
} }
func (t *transaction) quit() { func (t *transaction) quit() {
select { t.closer.Close()
case t.quitChan <- true:
default:
// Already scheduled to quit.
}
} }
func newTransaction() *transaction { func newTransaction() *transaction {
t := &transaction{ t := &transaction{
ch: make(chan interface{}, 1), ch: make(chan interface{}, 1),
incoming: make(chan interface{}, 8), incoming: make(chan interface{}, 8),
quitChan: make(chan bool, 1), closer: NewCloser(),
} }
return t return t
} }
@ -239,7 +235,7 @@ type JanusGateway struct {
conn *websocket.Conn conn *websocket.Conn
transactions map[uint64]*transaction transactions map[uint64]*transaction
closeChan chan bool closer *Closer
writeMu sync.Mutex writeMu sync.Mutex
} }
@ -269,15 +265,16 @@ func NewJanusGateway(wsURL string, listener GatewayListener) (*JanusGateway, err
return nil, err return nil, err
} }
gateway := new(JanusGateway)
gateway.conn = conn
gateway.transactions = make(map[uint64]*transaction)
gateway.Sessions = make(map[uint64]*JanusSession)
gateway.closeChan = make(chan bool)
if listener == nil { if listener == nil {
listener = new(dummyGatewayListener) listener = new(dummyGatewayListener)
} }
gateway.listener = listener gateway := &JanusGateway{
conn: conn,
listener: listener,
transactions: make(map[uint64]*transaction),
Sessions: make(map[uint64]*JanusSession),
closer: NewCloser(),
}
go gateway.ping() go gateway.ping()
go gateway.recv() go gateway.recv()
@ -286,7 +283,7 @@ func NewJanusGateway(wsURL string, listener GatewayListener) (*JanusGateway, err
// Close closes the underlying connection to the Gateway. // Close closes the underlying connection to the Gateway.
func (gateway *JanusGateway) Close() error { func (gateway *JanusGateway) Close() error {
gateway.closeChan <- true gateway.closer.Close()
gateway.writeMu.Lock() gateway.writeMu.Lock()
if gateway.conn == nil { if gateway.conn == nil {
gateway.writeMu.Unlock() gateway.writeMu.Unlock()
@ -382,7 +379,7 @@ loop:
if err != nil { if err != nil {
log.Println("Error sending ping to MCU:", err) log.Println("Error sending ping to MCU:", err)
} }
case <-gateway.closeChan: case <-gateway.closer.C:
break loop break loop
} }
} }

View file

@ -305,8 +305,8 @@ type mcuProxyConnection struct {
ip net.IP ip net.IP
mu sync.Mutex mu sync.Mutex
closeChan chan bool closer *Closer
closedChan chan bool closedDone *Closer
closed uint32 closed uint32
conn *websocket.Conn conn *websocket.Conn
@ -344,8 +344,8 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProx
rawUrl: baseUrl, rawUrl: baseUrl,
url: parsed, url: parsed,
ip: ip, ip: ip,
closeChan: make(chan bool, 1), closer: NewCloser(),
closedChan: make(chan bool, 1), closedDone: NewCloser(),
reconnectInterval: int64(initialReconnectInterval), reconnectInterval: int64(initialReconnectInterval),
load: loadNotConnected, load: loadNotConnected,
callbacks: make(map[string]func(*ProxyServerMessage)), callbacks: make(map[string]func(*ProxyServerMessage)),
@ -433,7 +433,7 @@ func (c *mcuProxyConnection) readPump() {
if atomic.LoadUint32(&c.closed) == 0 { if atomic.LoadUint32(&c.closed) == 0 {
c.scheduleReconnect() c.scheduleReconnect()
} else { } else {
c.closedChan <- true c.closedDone.Close()
} }
}() }()
defer c.close() defer c.close()
@ -515,7 +515,7 @@ func (c *mcuProxyConnection) writePump() {
c.reconnect() c.reconnect()
case <-ticker.C: case <-ticker.C:
c.sendPing() c.sendPing()
case <-c.closeChan: case <-c.closer.C:
return return
} }
} }
@ -543,7 +543,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) {
return return
} }
c.closeChan <- true c.closer.Close()
if err := c.sendClose(); err != nil { if err := c.sendClose(); err != nil {
if err != ErrNotConnected { if err != ErrNotConnected {
log.Printf("Could not send close message to %s: %s", c, err) log.Printf("Could not send close message to %s: %s", c, err)
@ -553,7 +553,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) {
} }
select { select {
case <-c.closedChan: case <-c.closedDone.C:
case <-ctx.Done(): case <-ctx.Done():
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
log.Printf("Error waiting for connection to %s get closed: %s", c, err) log.Printf("Error waiting for connection to %s get closed: %s", c, err)

19
room.go
View file

@ -67,9 +67,9 @@ type Room struct {
properties *json.RawMessage properties *json.RawMessage
closeChan chan bool closer *Closer
mu *sync.RWMutex mu *sync.RWMutex
sessions map[string]Session sessions map[string]Session
internalSessions map[Session]bool internalSessions map[Session]bool
virtualSessions map[*VirtualSession]bool virtualSessions map[*VirtualSession]bool
@ -104,9 +104,9 @@ func NewRoom(roomId string, properties *json.RawMessage, hub *Hub, events AsyncE
properties: properties, properties: properties,
closeChan: make(chan bool, 1), closer: NewCloser(),
mu: &sync.RWMutex{}, mu: &sync.RWMutex{},
sessions: make(map[string]Session), sessions: make(map[string]Session),
internalSessions: make(map[Session]bool), internalSessions: make(map[Session]bool),
virtualSessions: make(map[*VirtualSession]bool), virtualSessions: make(map[*VirtualSession]bool),
@ -173,7 +173,7 @@ func (r *Room) run() {
loop: loop:
for { for {
select { select {
case <-r.closeChan: case <-r.closer.C:
break loop break loop
case <-ticker.C: case <-ticker.C:
r.publishActiveSessions() r.publishActiveSessions()
@ -182,10 +182,7 @@ loop:
} }
func (r *Room) doClose() { func (r *Room) doClose() {
select { r.closer.Close()
case r.closeChan <- true:
default:
}
} }
func (r *Room) unsubscribeBackend() { func (r *Room) unsubscribeBackend() {

View file

@ -63,8 +63,8 @@ func (e *pingEntries) RemoveRoom(room *Room) {
// For that, all ping requests across rooms of enabled instances are combined // For that, all ping requests across rooms of enabled instances are combined
// and sent out batched every "updateActiveSessionsInterval" seconds. // and sent out batched every "updateActiveSessionsInterval" seconds.
type RoomPing struct { type RoomPing struct {
mu sync.Mutex mu sync.Mutex
closeChan chan bool closer *Closer
backend *BackendClient backend *BackendClient
capabilities *Capabilities capabilities *Capabilities
@ -74,7 +74,7 @@ type RoomPing struct {
func NewRoomPing(backend *BackendClient, capabilities *Capabilities) (*RoomPing, error) { func NewRoomPing(backend *BackendClient, capabilities *Capabilities) (*RoomPing, error) {
result := &RoomPing{ result := &RoomPing{
closeChan: make(chan bool, 1), closer: NewCloser(),
backend: backend, backend: backend,
capabilities: capabilities, capabilities: capabilities,
} }
@ -87,10 +87,7 @@ func (p *RoomPing) Start() {
} }
func (p *RoomPing) Stop() { func (p *RoomPing) Stop() {
select { p.closer.Close()
case p.closeChan <- true:
default:
}
} }
func (p *RoomPing) run() { func (p *RoomPing) run() {
@ -98,7 +95,7 @@ func (p *RoomPing) run() {
loop: loop:
for { for {
select { select {
case <-p.closeChan: case <-p.closer.C:
break loop break loop
case <-ticker.C: case <-ticker.C:
p.publishActiveSessions() p.publishActiveSessions()