chore: simplify mapreduce (#1401)

master
Kevin Wan 3 years ago committed by GitHub
parent 8e9110cedf
commit fe1da14332
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,8 +4,8 @@ package lang
var Placeholder PlaceholderType var Placeholder PlaceholderType
type ( type (
// GenericType can be used to hold any type. // AnyType can be used to hold any type.
GenericType = interface{} AnyType = interface{}
// PlaceholderType represents a placeholder type. // PlaceholderType represents a placeholder type.
PlaceholderType = struct{} PlaceholderType = struct{}
) )

@ -8,7 +8,6 @@ import (
"github.com/tal-tech/go-zero/core/errorx" "github.com/tal-tech/go-zero/core/errorx"
"github.com/tal-tech/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
) )
@ -95,9 +94,9 @@ func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{}
options := buildOptions(opts...) options := buildOptions(opts...)
source := buildSource(generate) source := buildSource(generate)
collector := make(chan interface{}, options.workers) collector := make(chan interface{}, options.workers)
done := syncx.NewDoneChan() done := make(chan lang.PlaceholderType)
go executeMappers(options.ctx, mapper, source, collector, done.Done(), options.workers) go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
return collector return collector
} }
@ -122,13 +121,13 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
}() }()
collector := make(chan interface{}, options.workers) collector := make(chan interface{}, options.workers)
done := syncx.NewDoneChan() done := make(chan lang.PlaceholderType)
writer := newGuardedWriter(options.ctx, output, done.Done()) writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once var closeOnce sync.Once
var retErr errorx.AtomicError var retErr errorx.AtomicError
finish := func() { finish := func() {
closeOnce.Do(func() { closeOnce.Do(func() {
done.Close() close(done)
close(output) close(output)
}) })
} }
@ -159,7 +158,7 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
go executeMappers(options.ctx, func(item interface{}, w Writer) { go executeMappers(options.ctx, func(item interface{}, w Writer) {
mapper(item, w, cancel) mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers) }, source, collector, done, options.workers)
value, ok := <-output value, ok := <-output
if err := retErr.Load(); err != nil { if err := retErr.Load(); err != nil {

Loading…
Cancel
Save