confirm addition after add called in periodical executor

master
kevin 4 years ago
parent e7dd04701c
commit 9d9399ad10

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/proc" "github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
@ -32,19 +33,21 @@ type (
container TaskContainer container TaskContainer
waitGroup sync.WaitGroup waitGroup sync.WaitGroup
// avoid race condition on waitGroup when calling wg.Add/Done/Wait(...) // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
wgBarrier syncx.Barrier wgBarrier syncx.Barrier
guarded bool confirmChan chan lang.PlaceholderType
newTicker func(duration time.Duration) timex.Ticker guarded bool
lock sync.Mutex newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
} }
) )
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor { func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{ executor := &PeriodicalExecutor{
// buffer 1 to let the caller go quickly // buffer 1 to let the caller go quickly
commander: make(chan interface{}, 1), commander: make(chan interface{}, 1),
interval: interval, interval: interval,
container: container, container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker { newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval) return timex.NewTicker(interval)
}, },
@ -59,10 +62,12 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per
func (pe *PeriodicalExecutor) Add(task interface{}) { func (pe *PeriodicalExecutor) Add(task interface{}) {
if vals, ok := pe.addAndCheck(task); ok { if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals pe.commander <- vals
<-pe.confirmChan
} }
} }
func (pe *PeriodicalExecutor) Flush() bool { func (pe *PeriodicalExecutor) Flush() bool {
pe.enterExecution()
return pe.executeTasks(func() interface{} { return pe.executeTasks(func() interface{} {
pe.lock.Lock() pe.lock.Lock()
defer pe.lock.Unlock() defer pe.lock.Unlock()
@ -114,6 +119,8 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
select { select {
case vals := <-pe.commander: case vals := <-pe.commander:
commanded = true commanded = true
pe.enterExecution()
pe.confirmChan <- lang.Placeholder
pe.executeTasks(vals) pe.executeTasks(vals)
last = timex.Now() last = timex.Now()
case <-ticker.Chan(): case <-ticker.Chan():
@ -135,11 +142,18 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
}) })
} }
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool { func (pe *PeriodicalExecutor) doneExecution() {
pe.waitGroup.Done()
}
func (pe *PeriodicalExecutor) enterExecution() {
pe.wgBarrier.Guard(func() { pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1) pe.waitGroup.Add(1)
}) })
defer pe.waitGroup.Done() }
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
defer pe.doneExecution()
ok := pe.hasTasks(tasks) ok := pe.hasTasks(tasks)
if ok { if ok {

@ -120,6 +120,26 @@ func TestPeriodicalExecutor_Wait(t *testing.T) {
executer.Wait() executer.Wait()
} }
func TestPeriodicalExecutor_WaitFast(t *testing.T) {
const total = 3
var cnt int
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []interface{}) {
defer func() {
cnt++
}()
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
for i := 0; i < total; i++ {
executer.Add(2)
}
executer.Flush()
executer.Wait()
assert.Equal(t, total, cnt)
}
// go test -benchtime 10s -bench . // go test -benchtime 10s -bench .
func BenchmarkExecutor(b *testing.B) { func BenchmarkExecutor(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()

Loading…
Cancel
Save