From a87978568ae2352965509a76d37663b60042cc80 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Mon, 10 May 2021 23:10:57 +0800 Subject: [PATCH] fix #676 (#682) --- core/mr/mapreduce.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index 89ef834b..e5f34947 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -136,14 +136,16 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R go func() { defer func() { + drain(collector) + if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() + reducer(collector, writer, cancel) - drain(collector) }() go executeMappers(func(item interface{}, w Writer) { @@ -165,7 +167,6 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) - drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder)