diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index 2a2990c8..f797dfc2 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -134,9 +134,6 @@ func (pe *PeriodicalExecutor) backgroundFlush() { } else if pe.Flush() { last = timex.Now() } else if pe.shallQuit(last) { - pe.lock.Lock() - pe.guarded = false - pe.lock.Unlock() return } } @@ -180,8 +177,18 @@ func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool { } } -func (pe *PeriodicalExecutor) shallQuit(last time.Duration) bool { - idleEnough := timex.Since(last) > pe.interval*idleRound - noPending := atomic.LoadInt32(&pe.inflight) == 0 - return idleEnough && noPending +func (pe *PeriodicalExecutor) shallQuit(last time.Duration) (stop bool) { + if timex.Since(last) <= pe.interval*idleRound { + return + } + + // checking pe.inflight and setting pe.guarded should be locked together + pe.lock.Lock() + if atomic.LoadInt32(&pe.inflight) == 0 { + pe.guarded = false + stop = true + } + pe.lock.Unlock() + + return }