diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index 523b2aac..49da2583 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -17,28 +17,42 @@ const ( ) var ( - ErrCancelWithNil = errors.New("mapreduce cancelled with nil") + // ErrCancelWithNil is an error that mapreduce was cancelled with nil. + ErrCancelWithNil = errors.New("mapreduce cancelled with nil") + // ErrReduceNoOutput is an error that reduce did not output a value. ErrReduceNoOutput = errors.New("reduce not writing value") ) type ( - GenerateFunc func(source chan<- interface{}) - MapFunc func(item interface{}, writer Writer) - VoidMapFunc func(item interface{}) - MapperFunc func(item interface{}, writer Writer, cancel func(error)) - ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error)) + // GenerateFunc is used to let callers send elements into source. + GenerateFunc func(source chan<- interface{}) + // MapFunc is used to do element processing and write the output to writer. + MapFunc func(item interface{}, writer Writer) + // VoidMapFunc is used to do element processing, but no output. + VoidMapFunc func(item interface{}) + // MapperFunc is used to do element processing and write the output to writer, + // use cancel func to cancel the processing. + MapperFunc func(item interface{}, writer Writer, cancel func(error)) + // ReducerFunc is used to reduce all the mapping output and write to writer, + // use cancel func to cancel the processing. + ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error)) + // VoidReducerFunc is used to reduce all the mapping output, but no output. + // Use cancel func to cancel the processing. VoidReducerFunc func(pipe <-chan interface{}, cancel func(error)) - Option func(opts *mapReduceOptions) + // Option defines the method to customize the mapreduce. + Option func(opts *mapReduceOptions) mapReduceOptions struct { workers int } + // Writer interface wraps Write method. Writer interface { Write(v interface{}) } ) +// Finish runs fns parallelly, cancelled on any error. func Finish(fns ...func() error) error { if len(fns) == 0 { return nil @@ -58,6 +72,7 @@ func Finish(fns ...func() error) error { }, WithWorkers(len(fns))) } +// FinishVoid runs fns parallelly. func FinishVoid(fns ...func()) { if len(fns) == 0 { return @@ -73,6 +88,7 @@ func FinishVoid(fns ...func()) { }, WithWorkers(len(fns))) } +// Map maps all elements generated from given generate func, and returns an output channel. func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} { options := buildOptions(opts...) source := buildSource(generate) @@ -84,11 +100,14 @@ func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} return collector } +// MapReduce maps all elements generated from given generate func, +// and reduces the output elemenets with given reducer. func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { source := buildSource(generate) return MapReduceWithSource(source, mapper, reducer, opts...) } +// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer. func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { options := buildOptions(opts...) @@ -141,8 +160,10 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R } } -func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { - _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { +// MapReduceVoid maps all elements generated from given generate, +// and reduce the output elements with given reducer. +func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { + _, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, @@ -152,12 +173,14 @@ func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReduce return err } +// MapVoid maps all elements from given generate but no output. func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) { drain(Map(generate, func(item interface{}, writer Writer) { mapper(item) }, opts...)) } +// WithWorkers customizes a mapreduce processing with given workers. func WithWorkers(workers int) Option { return func(opts *mapReduceOptions) { if workers < minWorkers {