From d643007c79aea8e78e57690e587bcff95a048639 Mon Sep 17 00:00:00 2001 From: weibobo Date: Sat, 2 Jan 2021 18:04:04 +0800 Subject: [PATCH] fix bug #317 (#335) * fix bug #317. * add counter for current task. If it's bigger then zero, do not quit background thread * Revert "fix issue #317 (#331)" This reverts commit fc43876cc591fb422943814efbb58bb8e2e68131. --- core/executors/periodicalexecutor.go | 38 +++++++++++------------ core/executors/periodicalexecutor_test.go | 27 +++++----------- 2 files changed, 26 insertions(+), 39 deletions(-) diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index 96a7aa26..be84c3b2 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -37,6 +37,7 @@ type ( confirmChan chan lang.PlaceholderType guarded bool newTicker func(duration time.Duration) timex.Ticker + currTask int lock sync.Mutex } ) @@ -103,7 +104,9 @@ func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) }() if pe.container.AddTask(task) { - return pe.container.RemoveAll(), true + vals := pe.container.RemoveAll() + pe.currTask++ + return vals, true } return nil, false @@ -122,6 +125,9 @@ func (pe *PeriodicalExecutor) backgroundFlush() { commanded = true pe.enterExecution() pe.confirmChan <- lang.Placeholder + pe.lock.Lock() + pe.currTask-- + pe.lock.Unlock() pe.executeTasks(vals) last = timex.Now() case <-ticker.Chan(): @@ -130,34 +136,26 @@ func (pe *PeriodicalExecutor) backgroundFlush() { } else if pe.Flush() { last = timex.Now() } else if timex.Since(last) > pe.interval*idleRound { + var exit bool = true pe.lock.Lock() - pe.guarded = false + if pe.currTask > 0 { + exit = false + } else { + pe.guarded = false + } pe.lock.Unlock() - pe.cleanup() - return + if exit { + // flush again to avoid missing tasks + pe.Flush() + return + } } } } }) } -func (pe *PeriodicalExecutor) cleanup() { - // avoid deadlock in Add() - for { - select { - case vals := <-pe.commander: - pe.enterExecution() - pe.confirmChan <- lang.Placeholder - pe.executeTasks(vals) - default: - // flush again to avoid missing tasks - pe.Flush() - return - } - } -} - func (pe *PeriodicalExecutor) doneExecution() { pe.waitGroup.Done() } diff --git a/core/executors/periodicalexecutor_test.go b/core/executors/periodicalexecutor_test.go index 80ce4a7b..66291057 100644 --- a/core/executors/periodicalexecutor_test.go +++ b/core/executors/periodicalexecutor_test.go @@ -140,25 +140,6 @@ func TestPeriodicalExecutor_WaitFast(t *testing.T) { assert.Equal(t, total, cnt) } -func TestPeriodicalExecutor_Deadlock(t *testing.T) { - ticker := timex.NewFakeTicker() - defer ticker.Stop() - - exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil)) - exec.newTicker = func(d time.Duration) timex.Ticker { - return ticker - } - ticker.Tick() - exec.lock.Lock() - exec.backgroundFlush() - time.Sleep(20 * time.Millisecond) - ticker.Tick() - exec.commander <- 1 - exec.lock.Unlock() - <-exec.confirmChan - exec.Wait() -} - // go test -benchtime 10s -bench . func BenchmarkExecutor(b *testing.B) { b.ReportAllocs() @@ -168,3 +149,11 @@ func BenchmarkExecutor(b *testing.B) { executor.Add(1) } } + +func TestPeriodicalExecutor_Deadlock(t *testing.T) { + executer := NewBulkExecutor(func(tasks []interface{}) { + }, WithBulkTasks(1), WithBulkInterval(time.Millisecond)) + for i := 0; i < 1e6; i++ { + executer.Add(1) + } +}