|
|
@ -93,15 +93,12 @@ func (pe *PeriodicalExecutor) Wait() {
|
|
|
|
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
|
|
|
|
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
|
|
|
|
pe.lock.Lock()
|
|
|
|
pe.lock.Lock()
|
|
|
|
defer func() {
|
|
|
|
defer func() {
|
|
|
|
var start bool
|
|
|
|
|
|
|
|
if !pe.guarded {
|
|
|
|
if !pe.guarded {
|
|
|
|
pe.guarded = true
|
|
|
|
pe.guarded = true
|
|
|
|
start = true
|
|
|
|
// defer to unlock quickly
|
|
|
|
|
|
|
|
defer pe.backgroundFlush()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
pe.lock.Unlock()
|
|
|
|
pe.lock.Unlock()
|
|
|
|
if start {
|
|
|
|
|
|
|
|
pe.backgroundFlush()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}()
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
if pe.container.AddTask(task) {
|
|
|
|
if pe.container.AddTask(task) {
|
|
|
@ -148,13 +145,11 @@ func (pe *PeriodicalExecutor) cleanup() (stop bool) {
|
|
|
|
pe.guarded = false
|
|
|
|
pe.guarded = false
|
|
|
|
if atomic.LoadInt32(&pe.inflight) == 0 {
|
|
|
|
if atomic.LoadInt32(&pe.inflight) == 0 {
|
|
|
|
stop = true
|
|
|
|
stop = true
|
|
|
|
}
|
|
|
|
// defer to unlock quickly
|
|
|
|
pe.lock.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if stop {
|
|
|
|
|
|
|
|
// flush again to avoid missing tasks
|
|
|
|
// flush again to avoid missing tasks
|
|
|
|
pe.Flush()
|
|
|
|
defer pe.Flush()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pe.lock.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|