Simplify stopping of deferred executor.

This commit is contained in:
Joachim Bauch 2023-01-19 12:33:52 +01:00
parent 15490b802a
commit dc55e7d5c8
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 22 additions and 23 deletions

View file

@ -33,8 +33,7 @@ import (
// their order. // their order.
type DeferredExecutor struct { type DeferredExecutor struct {
queue chan func() queue chan func()
closeChan chan bool closed chan struct{}
closed chan bool
closeOnce sync.Once closeOnce sync.Once
} }
@ -43,28 +42,24 @@ func NewDeferredExecutor(queueSize int) *DeferredExecutor {
queueSize = 0 queueSize = 0
} }
result := &DeferredExecutor{ result := &DeferredExecutor{
queue: make(chan func(), queueSize), queue: make(chan func(), queueSize),
closeChan: make(chan bool, 1), closed: make(chan struct{}),
closed: make(chan bool, 1),
} }
go result.run() go result.run()
return result return result
} }
func (e *DeferredExecutor) run() { func (e *DeferredExecutor) run() {
loop: defer close(e.closed)
for { for {
select { f := <-e.queue
case f := <-e.queue: if f == nil {
if f == nil { break
break loop
}
f()
case <-e.closeChan:
break loop
} }
f()
} }
e.closed <- true
} }
func getFunctionName(i interface{}) string { func getFunctionName(i interface{}) string {
@ -83,14 +78,9 @@ func (e *DeferredExecutor) Execute(f func()) {
} }
func (e *DeferredExecutor) Close() { func (e *DeferredExecutor) Close() {
select { e.closeOnce.Do(func() {
case e.closeChan <- true: close(e.queue)
e.closeOnce.Do(func() { })
close(e.queue)
})
default:
// Already closed.
}
} }
func (e *DeferredExecutor) waitForStop() { func (e *DeferredExecutor) waitForStop() {

View file

@ -109,3 +109,12 @@ func TestDeferredExecutor_DeferAfterClose(t *testing.T) {
t.Error("method should not have been called") t.Error("method should not have been called")
}) })
} }
func TestDeferredExecutor_WaitForStopTwice(t *testing.T) {
e := NewDeferredExecutor(64)
defer e.waitForStop()
e.Close()
e.waitForStop()
}