From ea4f2af67fa7794c7d67996e379481eaf3214edb Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Mon, 10 Jan 2022 22:06:10 +0800 Subject: [PATCH] fix: mr goroutine leak on context deadline (#1433) * fix: mr goroutine leak on context deadline * test: update fx test check --- core/fx/stream_test.go | 6 ++---- core/mr/mapreduce.go | 19 ++++++++++++------- core/mr/mapreduce_test.go | 37 ++++++++++++++++++++++++++++++++++++- go.mod | 1 + go.sum | 3 +++ 5 files changed, 54 insertions(+), 12 deletions(-) diff --git a/core/fx/stream_test.go b/core/fx/stream_test.go index 3eecae65..4d5976b2 100644 --- a/core/fx/stream_test.go +++ b/core/fx/stream_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/stringx" + "go.uber.org/goleak" ) func TestBuffer(t *testing.T) { @@ -563,9 +564,6 @@ func equal(t *testing.T, stream Stream, data []interface{}) { } func runCheckedTest(t *testing.T, fn func(t *testing.T)) { - goroutines := runtime.NumGoroutine() + defer goleak.VerifyNone(t) fn(t) - // let scheduler schedule first - time.Sleep(time.Millisecond) - assert.True(t, runtime.NumGoroutine() <= goroutines) } diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index c12e6330..70ea52d5 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -160,13 +160,18 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R mapper(item, w, cancel) }, source, collector, done, options.workers) - value, ok := <-output - if err := retErr.Load(); err != nil { - return nil, err - } else if ok { - return value, nil - } else { - return nil, ErrReduceNoOutput + select { + case <-options.ctx.Done(): + cancel(context.DeadlineExceeded) + return nil, context.DeadlineExceeded + case value, ok := <-output: + if err := retErr.Load(); err != nil { + return nil, err + } else if ok { + return value, nil + } else { + return nil, ErrReduceNoOutput + } } } diff --git a/core/mr/mapreduce_test.go b/core/mr/mapreduce_test.go index a9f6b4ff..324f4ad8 100644 --- a/core/mr/mapreduce_test.go +++ b/core/mr/mapreduce_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/syncx" + "go.uber.org/goleak" ) var errDummy = errors.New("dummy") @@ -22,6 +23,8 @@ func init() { } func TestFinish(t *testing.T) { + defer goleak.VerifyNone(t) + var total uint32 err := Finish(func() error { atomic.AddUint32(&total, 2) @@ -39,14 +42,20 @@ func TestFinish(t *testing.T) { } func TestFinishNone(t *testing.T) { + defer goleak.VerifyNone(t) + assert.Nil(t, Finish()) } func TestFinishVoidNone(t *testing.T) { + defer goleak.VerifyNone(t) + FinishVoid() } func TestFinishErr(t *testing.T) { + defer goleak.VerifyNone(t) + var total uint32 err := Finish(func() error { atomic.AddUint32(&total, 2) @@ -63,6 +72,8 @@ func TestFinishErr(t *testing.T) { } func TestFinishVoid(t *testing.T) { + defer goleak.VerifyNone(t) + var total uint32 FinishVoid(func() { atomic.AddUint32(&total, 2) @@ -76,6 +87,8 @@ func TestFinishVoid(t *testing.T) { } func TestMap(t *testing.T) { + defer goleak.VerifyNone(t) + tests := []struct { mapper MapFunc expect int @@ -128,6 +141,8 @@ func TestMap(t *testing.T) { } func TestMapReduce(t *testing.T) { + defer goleak.VerifyNone(t) + tests := []struct { mapper MapperFunc reducer ReducerFunc @@ -204,6 +219,8 @@ func TestMapReduce(t *testing.T) { } func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) { + defer goleak.VerifyNone(t) + assert.Panics(t, func() { MapReduce(func(source chan<- interface{}) { for i := 0; i < 10; i++ { @@ -220,6 +237,8 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) { } func TestMapReduceVoid(t *testing.T) { + defer goleak.VerifyNone(t) + var value uint32 tests := []struct { mapper MapperFunc @@ -296,6 +315,8 @@ func TestMapReduceVoid(t *testing.T) { } func TestMapReduceVoidWithDelay(t *testing.T) { + defer goleak.VerifyNone(t) + var result []int err := MapReduceVoid(func(source chan<- interface{}) { source <- 0 @@ -319,6 +340,8 @@ func TestMapReduceVoidWithDelay(t *testing.T) { } func TestMapVoid(t *testing.T) { + defer goleak.VerifyNone(t) + const tasks = 1000 var count uint32 MapVoid(func(source chan<- interface{}) { @@ -333,6 +356,8 @@ func TestMapVoid(t *testing.T) { } func TestMapReducePanic(t *testing.T) { + defer goleak.VerifyNone(t) + v, err := MapReduce(func(source chan<- interface{}) { source <- 0 source <- 1 @@ -350,6 +375,8 @@ func TestMapReducePanic(t *testing.T) { } func TestMapReduceVoidCancel(t *testing.T) { + defer goleak.VerifyNone(t) + var result []int err := MapReduceVoid(func(source chan<- interface{}) { source <- 0 @@ -371,6 +398,8 @@ func TestMapReduceVoidCancel(t *testing.T) { } func TestMapReduceVoidCancelWithRemains(t *testing.T) { + defer goleak.VerifyNone(t) + var done syncx.AtomicBool var result []int err := MapReduceVoid(func(source chan<- interface{}) { @@ -396,6 +425,8 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) { } func TestMapReduceWithoutReducerWrite(t *testing.T) { + defer goleak.VerifyNone(t) + uids := []int{1, 2, 3} res, err := MapReduce(func(source chan<- interface{}) { for _, uid := range uids { @@ -412,6 +443,8 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) { } func TestMapReduceVoidPanicInReducer(t *testing.T) { + defer goleak.VerifyNone(t) + const message = "foo" var done syncx.AtomicBool err := MapReduceVoid(func(source chan<- interface{}) { @@ -431,6 +464,8 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) { } func TestMapReduceWithContext(t *testing.T) { + defer goleak.VerifyNone(t) + var done syncx.AtomicBool var result []int ctx, cancel := context.WithCancel(context.Background()) @@ -452,7 +487,7 @@ func TestMapReduceWithContext(t *testing.T) { } }, WithContext(ctx)) assert.NotNil(t, err) - assert.Equal(t, ErrReduceNoOutput, err) + assert.Equal(t, context.DeadlineExceeded, err) } func BenchmarkMapReduce(b *testing.B) { diff --git a/go.mod b/go.mod index 68455559..240efed3 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( go.opentelemetry.io/otel/sdk v1.1.0 go.opentelemetry.io/otel/trace v1.1.0 go.uber.org/automaxprocs v1.4.0 + go.uber.org/goleak v1.1.12 // indirect golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect golang.org/x/sys v0.0.0-20211106132015-ebca88c72f68 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac diff --git a/go.sum b/go.sum index e6bd20bc..bdfa1808 100644 --- a/go.sum +++ b/go.sum @@ -410,6 +410,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0= go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= @@ -606,6 +608,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=