diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index 649afe14..6722a657 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -116,7 +116,7 @@ func (pe *PeriodicalExecutor) addAndCheck(task any) (any, bool) { } func (pe *PeriodicalExecutor) backgroundFlush() { - threading.GoSafe(func() { + go func() { // flush before quit goroutine to avoid missing tasks defer pe.Flush() @@ -144,7 +144,7 @@ func (pe *PeriodicalExecutor) backgroundFlush() { } } } - }) + }() } func (pe *PeriodicalExecutor) doneExecution() { @@ -162,7 +162,9 @@ func (pe *PeriodicalExecutor) executeTasks(tasks any) bool { ok := pe.hasTasks(tasks) if ok { - pe.container.Execute(tasks) + threading.RunSafe(func() { + pe.container.Execute(tasks) + }) } return ok diff --git a/core/executors/periodicalexecutor_test.go b/core/executors/periodicalexecutor_test.go index 32f5bd10..60582c69 100644 --- a/core/executors/periodicalexecutor_test.go +++ b/core/executors/periodicalexecutor_test.go @@ -108,6 +108,64 @@ func TestPeriodicalExecutor_Bulk(t *testing.T) { lock.Unlock() } +func TestPeriodicalExecutor_Panic(t *testing.T) { + // avoid data race + var lock sync.Mutex + ticker := timex.NewFakeTicker() + + var ( + executedTasks []int + expected []int + ) + executor := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks any) { + tt := tasks.([]int) + lock.Lock() + executedTasks = append(executedTasks, tt...) + lock.Unlock() + if tt[0] == 0 { + panic("test") + } + })) + executor.newTicker = func(duration time.Duration) timex.Ticker { + return ticker + } + for i := 0; i < 30; i++ { + executor.Add(i) + expected = append(expected, i) + } + ticker.Tick() + ticker.Tick() + time.Sleep(time.Millisecond) + lock.Lock() + assert.Equal(t, expected, executedTasks) + lock.Unlock() +} + +func TestPeriodicalExecutor_FlushPanic(t *testing.T) { + var ( + executedTasks []int + expected []int + lock sync.Mutex + ) + executor := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks any) { + tt := tasks.([]int) + lock.Lock() + executedTasks = append(executedTasks, tt...) + lock.Unlock() + if tt[0] == 0 { + panic("flush panic") + } + })) + for i := 0; i < 8; i++ { + executor.Add(i) + expected = append(expected, i) + } + executor.Flush() + lock.Lock() + assert.Equal(t, expected, executedTasks) + lock.Unlock() +} + func TestPeriodicalExecutor_Wait(t *testing.T) { var lock sync.Mutex executer := NewBulkExecutor(func(tasks []any) { @@ -151,13 +209,7 @@ func TestPeriodicalExecutor_Deadlock(t *testing.T) { } func TestPeriodicalExecutor_hasTasks(t *testing.T) { - ticker := timex.NewFakeTicker() - defer ticker.Stop() - exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil)) - exec.newTicker = func(d time.Duration) timex.Ticker { - return ticker - } assert.False(t, exec.hasTasks(nil)) assert.True(t, exec.hasTasks(1)) }