From 9d9399ad1014c171cc9bd9c87f78b5d2ac238ce4 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 14 Aug 2020 11:50:01 +0800 Subject: [PATCH] confirm addition after add called in periodical executor --- core/executors/periodicalexecutor.go | 32 ++++++++++++++++------- core/executors/periodicalexecutor_test.go | 20 ++++++++++++++ 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index 7db54a5d..98bb7cb5 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/tal-tech/go-zero/core/lang" "github.com/tal-tech/go-zero/core/proc" "github.com/tal-tech/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/threading" @@ -32,19 +33,21 @@ type ( container TaskContainer waitGroup sync.WaitGroup // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...) - wgBarrier syncx.Barrier - guarded bool - newTicker func(duration time.Duration) timex.Ticker - lock sync.Mutex + wgBarrier syncx.Barrier + confirmChan chan lang.PlaceholderType + guarded bool + newTicker func(duration time.Duration) timex.Ticker + lock sync.Mutex } ) func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor { executor := &PeriodicalExecutor{ // buffer 1 to let the caller go quickly - commander: make(chan interface{}, 1), - interval: interval, - container: container, + commander: make(chan interface{}, 1), + interval: interval, + container: container, + confirmChan: make(chan lang.PlaceholderType), newTicker: func(d time.Duration) timex.Ticker { return timex.NewTicker(interval) }, @@ -59,10 +62,12 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per func (pe *PeriodicalExecutor) Add(task interface{}) { if vals, ok := pe.addAndCheck(task); ok { pe.commander <- vals + <-pe.confirmChan } } func (pe *PeriodicalExecutor) Flush() bool { + pe.enterExecution() return pe.executeTasks(func() interface{} { pe.lock.Lock() defer pe.lock.Unlock() @@ -114,6 +119,8 @@ func (pe *PeriodicalExecutor) backgroundFlush() { select { case vals := <-pe.commander: commanded = true + pe.enterExecution() + pe.confirmChan <- lang.Placeholder pe.executeTasks(vals) last = timex.Now() case <-ticker.Chan(): @@ -135,11 +142,18 @@ func (pe *PeriodicalExecutor) backgroundFlush() { }) } -func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool { +func (pe *PeriodicalExecutor) doneExecution() { + pe.waitGroup.Done() +} + +func (pe *PeriodicalExecutor) enterExecution() { pe.wgBarrier.Guard(func() { pe.waitGroup.Add(1) }) - defer pe.waitGroup.Done() +} + +func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool { + defer pe.doneExecution() ok := pe.hasTasks(tasks) if ok { diff --git a/core/executors/periodicalexecutor_test.go b/core/executors/periodicalexecutor_test.go index f4eba517..ed25188f 100644 --- a/core/executors/periodicalexecutor_test.go +++ b/core/executors/periodicalexecutor_test.go @@ -120,6 +120,26 @@ func TestPeriodicalExecutor_Wait(t *testing.T) { executer.Wait() } +func TestPeriodicalExecutor_WaitFast(t *testing.T) { + const total = 3 + var cnt int + var lock sync.Mutex + executer := NewBulkExecutor(func(tasks []interface{}) { + defer func() { + cnt++ + }() + lock.Lock() + defer lock.Unlock() + time.Sleep(10 * time.Millisecond) + }, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond)) + for i := 0; i < total; i++ { + executer.Add(2) + } + executer.Flush() + executer.Wait() + assert.Equal(t, total, cnt) +} + // go test -benchtime 10s -bench . func BenchmarkExecutor(b *testing.B) { b.ReportAllocs()