From fc43876cc591fb422943814efbb58bb8e2e68131 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Fri, 1 Jan 2021 13:24:28 +0800 Subject: [PATCH] fix issue #317 (#331) --- core/executors/periodicalexecutor.go | 19 +++++++++++++++++-- core/executors/periodicalexecutor_test.go | 19 +++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index aaf2a6d8..96a7aa26 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -134,8 +134,7 @@ func (pe *PeriodicalExecutor) backgroundFlush() { pe.guarded = false pe.lock.Unlock() - // flush again to avoid missing tasks - pe.Flush() + pe.cleanup() return } } @@ -143,6 +142,22 @@ func (pe *PeriodicalExecutor) backgroundFlush() { }) } +func (pe *PeriodicalExecutor) cleanup() { + // avoid deadlock in Add() + for { + select { + case vals := <-pe.commander: + pe.enterExecution() + pe.confirmChan <- lang.Placeholder + pe.executeTasks(vals) + default: + // flush again to avoid missing tasks + pe.Flush() + return + } + } +} + func (pe *PeriodicalExecutor) doneExecution() { pe.waitGroup.Done() } diff --git a/core/executors/periodicalexecutor_test.go b/core/executors/periodicalexecutor_test.go index ed25188f..80ce4a7b 100644 --- a/core/executors/periodicalexecutor_test.go +++ b/core/executors/periodicalexecutor_test.go @@ -140,6 +140,25 @@ func TestPeriodicalExecutor_WaitFast(t *testing.T) { assert.Equal(t, total, cnt) } +func TestPeriodicalExecutor_Deadlock(t *testing.T) { + ticker := timex.NewFakeTicker() + defer ticker.Stop() + + exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil)) + exec.newTicker = func(d time.Duration) timex.Ticker { + return ticker + } + ticker.Tick() + exec.lock.Lock() + exec.backgroundFlush() + time.Sleep(20 * time.Millisecond) + ticker.Tick() + exec.commander <- 1 + exec.lock.Unlock() + <-exec.confirmChan + exec.Wait() +} + // go test -benchtime 10s -bench . func BenchmarkExecutor(b *testing.B) { b.ReportAllocs()