|
|
@ -134,9 +134,6 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
|
|
|
|
} else if pe.Flush() {
|
|
|
|
} else if pe.Flush() {
|
|
|
|
last = timex.Now()
|
|
|
|
last = timex.Now()
|
|
|
|
} else if pe.shallQuit(last) {
|
|
|
|
} else if pe.shallQuit(last) {
|
|
|
|
pe.lock.Lock()
|
|
|
|
|
|
|
|
pe.guarded = false
|
|
|
|
|
|
|
|
pe.lock.Unlock()
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -180,8 +177,18 @@ func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (pe *PeriodicalExecutor) shallQuit(last time.Duration) bool {
|
|
|
|
func (pe *PeriodicalExecutor) shallQuit(last time.Duration) (stop bool) {
|
|
|
|
idleEnough := timex.Since(last) > pe.interval*idleRound
|
|
|
|
if timex.Since(last) <= pe.interval*idleRound {
|
|
|
|
noPending := atomic.LoadInt32(&pe.inflight) == 0
|
|
|
|
return
|
|
|
|
return idleEnough && noPending
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// checking pe.inflight and setting pe.guarded should be locked together
|
|
|
|
|
|
|
|
pe.lock.Lock()
|
|
|
|
|
|
|
|
if atomic.LoadInt32(&pe.inflight) == 0 {
|
|
|
|
|
|
|
|
pe.guarded = false
|
|
|
|
|
|
|
|
stop = true
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
pe.lock.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|