From d0f9e5702289b32b5d289ad2418d8b91b0807c6e Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Thu, 26 Aug 2021 16:47:28 +0800 Subject: [PATCH] fix #957 (#959) --- core/mr/mapreduce.go | 6 ++++++ core/mr/mapreduce_test.go | 16 ++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index e5f34947..83c3c4dd 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -112,6 +112,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R opts ...Option) (interface{}, error) { options := buildOptions(opts...) output := make(chan interface{}) + defer func() { + for range output { + panic("more than one element written in reducer") + } + }() + collector := make(chan interface{}, options.workers) done := syncx.NewDoneChan() writer := newGuardedWriter(output, done.Done()) diff --git a/core/mr/mapreduce_test.go b/core/mr/mapreduce_test.go index 9262b027..5c736c9c 100644 --- a/core/mr/mapreduce_test.go +++ b/core/mr/mapreduce_test.go @@ -202,6 +202,22 @@ func TestMapReduce(t *testing.T) { } } +func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) { + assert.Panics(t, func() { + MapReduce(func(source chan<- interface{}) { + for i := 0; i < 10; i++ { + source <- i + } + }, func(item interface{}, writer Writer, cancel func(error)) { + writer.Write(item) + }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) { + drain(pipe) + writer.Write("one") + writer.Write("two") + }) + }) +} + func TestMapReduceVoid(t *testing.T) { var value uint32 tests := []struct {