diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index e5f34947..83c3c4dd 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -112,6 +112,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R opts ...Option) (interface{}, error) { options := buildOptions(opts...) output := make(chan interface{}) + defer func() { + for range output { + panic("more than one element written in reducer") + } + }() + collector := make(chan interface{}, options.workers) done := syncx.NewDoneChan() writer := newGuardedWriter(output, done.Done()) diff --git a/core/mr/mapreduce_test.go b/core/mr/mapreduce_test.go index 9262b027..5c736c9c 100644 --- a/core/mr/mapreduce_test.go +++ b/core/mr/mapreduce_test.go @@ -202,6 +202,22 @@ func TestMapReduce(t *testing.T) { } } +func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) { + assert.Panics(t, func() { + MapReduce(func(source chan<- interface{}) { + for i := 0; i < 10; i++ { + source <- i + } + }, func(item interface{}, writer Writer, cancel func(error)) { + writer.Write(item) + }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) { + drain(pipe) + writer.Write("one") + writer.Write("two") + }) + }) +} + func TestMapReduceVoid(t *testing.T) { var value uint32 tests := []struct {