From a8fb010333cb578b7ef2cba0e468de53739dd7ef Mon Sep 17 00:00:00 2001 From: kevin Date: Sat, 12 Sep 2020 17:13:24 +0800 Subject: [PATCH] drain pipe if reducer not drained --- core/mr/mapreduce.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index 517c94b8..4c7e6a45 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -124,6 +124,7 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R } }() reducer(collector, writer, cancel) + drain(collector) }() go mapperDispatcher(mapper, source, collector, done.Done(), cancel, options.workers) @@ -140,6 +141,7 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) + drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder)