feat: mapreduce generic version (#2827)

* feat: mapreduce generic version

* fix: gateway mr type issue

---------

Co-authored-by: kevin.wan <kevin.wan@yijinin.com>
master
Kevin Wan 2 years ago committed by GitHub
parent 413ee919e6
commit 464ed51728
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,7 +7,6 @@ import (
"sync/atomic" "sync/atomic"
"github.com/zeromicro/go-zero/core/errorx" "github.com/zeromicro/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/lang"
) )
const ( const (
@ -24,30 +23,30 @@ var (
type ( type (
// ForEachFunc is used to do element processing, but no output. // ForEachFunc is used to do element processing, but no output.
ForEachFunc func(item any) ForEachFunc[T any] func(item T)
// GenerateFunc is used to let callers send elements into source. // GenerateFunc is used to let callers send elements into source.
GenerateFunc func(source chan<- any) GenerateFunc[T any] func(source chan<- T)
// MapFunc is used to do element processing and write the output to writer. // MapFunc is used to do element processing and write the output to writer.
MapFunc func(item any, writer Writer) MapFunc[T, U any] func(item T, writer Writer[U])
// MapperFunc is used to do element processing and write the output to writer, // MapperFunc is used to do element processing and write the output to writer,
// use cancel func to cancel the processing. // use cancel func to cancel the processing.
MapperFunc func(item any, writer Writer, cancel func(error)) MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error))
// ReducerFunc is used to reduce all the mapping output and write to writer, // ReducerFunc is used to reduce all the mapping output and write to writer,
// use cancel func to cancel the processing. // use cancel func to cancel the processing.
ReducerFunc func(pipe <-chan any, writer Writer, cancel func(error)) ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error))
// VoidReducerFunc is used to reduce all the mapping output, but no output. // VoidReducerFunc is used to reduce all the mapping output, but no output.
// Use cancel func to cancel the processing. // Use cancel func to cancel the processing.
VoidReducerFunc func(pipe <-chan any, cancel func(error)) VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error))
// Option defines the method to customize the mapreduce. // Option defines the method to customize the mapreduce.
Option func(opts *mapReduceOptions) Option func(opts *mapReduceOptions)
mapperContext struct { mapperContext[T, U any] struct {
ctx context.Context ctx context.Context
mapper MapFunc mapper MapFunc[T, U]
source <-chan any source <-chan T
panicChan *onceChan panicChan *onceChan
collector chan<- any collector chan<- U
doneChan <-chan lang.PlaceholderType doneChan <-chan struct{}
workers int workers int
} }
@ -57,8 +56,8 @@ type (
} }
// Writer interface wraps Write method. // Writer interface wraps Write method.
Writer interface { Writer[T any] interface {
Write(v any) Write(v T)
} }
) )
@ -68,12 +67,11 @@ func Finish(fns ...func() error) error {
return nil return nil
} }
return MapReduceVoid(func(source chan<- any) { return MapReduceVoid(func(source chan<- func() error) {
for _, fn := range fns { for _, fn := range fns {
source <- fn source <- fn
} }
}, func(item any, writer Writer, cancel func(error)) { }, func(fn func() error, writer Writer[any], cancel func(error)) {
fn := item.(func() error)
if err := fn(); err != nil { if err := fn(); err != nil {
cancel(err) cancel(err)
} }
@ -87,27 +85,26 @@ func FinishVoid(fns ...func()) {
return return
} }
ForEach(func(source chan<- any) { ForEach(func(source chan<- func()) {
for _, fn := range fns { for _, fn := range fns {
source <- fn source <- fn
} }
}, func(item any) { }, func(fn func()) {
fn := item.(func())
fn() fn()
}, WithWorkers(len(fns))) }, WithWorkers(len(fns)))
} }
// ForEach maps all elements from given generate but no output. // ForEach maps all elements from given generate but no output.
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) { func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option) {
options := buildOptions(opts...) options := buildOptions(opts...)
panicChan := &onceChan{channel: make(chan any)} panicChan := &onceChan{channel: make(chan any)}
source := buildSource(generate, panicChan) source := buildSource(generate, panicChan)
collector := make(chan any) collector := make(chan any)
done := make(chan lang.PlaceholderType) done := make(chan struct{})
go executeMappers(mapperContext{ go executeMappers(mapperContext[T, any]{
ctx: options.ctx, ctx: options.ctx,
mapper: func(item any, _ Writer) { mapper: func(item T, _ Writer[any]) {
mapper(item) mapper(item)
}, },
source: source, source: source,
@ -131,26 +128,26 @@ func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
// MapReduce maps all elements generated from given generate func, // MapReduce maps all elements generated from given generate func,
// and reduces the output elements with given reducer. // and reduces the output elements with given reducer.
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
opts ...Option) (any, error) { opts ...Option) (V, error) {
panicChan := &onceChan{channel: make(chan any)} panicChan := &onceChan{channel: make(chan any)}
source := buildSource(generate, panicChan) source := buildSource(generate, panicChan)
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
} }
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer. // MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func MapReduceChan(source <-chan any, mapper MapperFunc, reducer ReducerFunc, func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
opts ...Option) (any, error) { opts ...Option) (V, error) {
panicChan := &onceChan{channel: make(chan any)} panicChan := &onceChan{channel: make(chan any)}
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
} }
// mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer. // mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer.
func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper MapperFunc, func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U],
reducer ReducerFunc, opts ...Option) (any, error) { reducer ReducerFunc[U, V], opts ...Option) (val V, err error) {
options := buildOptions(opts...) options := buildOptions(opts...)
// output is used to write the final result // output is used to write the final result
output := make(chan any) output := make(chan V)
defer func() { defer func() {
// reducer can only write once, if more, panic // reducer can only write once, if more, panic
for range output { for range output {
@ -159,12 +156,12 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe
}() }()
// collector is used to collect data from mapper, and consume in reducer // collector is used to collect data from mapper, and consume in reducer
collector := make(chan any, options.workers) collector := make(chan U, options.workers)
// if done is closed, all mappers and reducer should stop processing // if done is closed, all mappers and reducer should stop processing
done := make(chan lang.PlaceholderType) done := make(chan struct{})
writer := newGuardedWriter(options.ctx, output, done) writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once var closeOnce sync.Once
// use atomic.Value to avoid data race // use atomic type to avoid data race
var retErr errorx.AtomicError var retErr errorx.AtomicError
finish := func() { finish := func() {
closeOnce.Do(func() { closeOnce.Do(func() {
@ -195,9 +192,9 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe
reducer(collector, writer, cancel) reducer(collector, writer, cancel)
}() }()
go executeMappers(mapperContext{ go executeMappers(mapperContext[T, U]{
ctx: options.ctx, ctx: options.ctx,
mapper: func(item any, w Writer) { mapper: func(item T, w Writer[U]) {
mapper(item, w, cancel) mapper(item, w, cancel)
}, },
source: source, source: source,
@ -210,26 +207,29 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe
select { select {
case <-options.ctx.Done(): case <-options.ctx.Done():
cancel(context.DeadlineExceeded) cancel(context.DeadlineExceeded)
return nil, context.DeadlineExceeded err = context.DeadlineExceeded
case v := <-panicChan.channel: case v := <-panicChan.channel:
// drain output here, otherwise for loop panic in defer // drain output here, otherwise for loop panic in defer
drain(output) drain(output)
panic(v) panic(v)
case v, ok := <-output: case v, ok := <-output:
if err := retErr.Load(); err != nil { if e := retErr.Load(); e != nil {
return nil, err err = e
} else if ok { } else if ok {
return v, nil val = v
} else { } else {
return nil, ErrReduceNoOutput err = ErrReduceNoOutput
} }
} }
return
} }
// MapReduceVoid maps all elements generated from given generate, // MapReduceVoid maps all elements generated from given generate,
// and reduce the output elements with given reducer. // and reduce the output elements with given reducer.
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
_, err := MapReduce(generate, mapper, func(input <-chan any, writer Writer, cancel func(error)) { reducer VoidReducerFunc[U], opts ...Option) error {
_, err := MapReduce(generate, mapper, func(input <-chan U, writer Writer[any], cancel func(error)) {
reducer(input, cancel) reducer(input, cancel)
}, opts...) }, opts...)
if errors.Is(err, ErrReduceNoOutput) { if errors.Is(err, ErrReduceNoOutput) {
@ -266,8 +266,8 @@ func buildOptions(opts ...Option) *mapReduceOptions {
return options return options
} }
func buildSource(generate GenerateFunc, panicChan *onceChan) chan any { func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T {
source := make(chan any) source := make(chan T)
go func() { go func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -283,13 +283,13 @@ func buildSource(generate GenerateFunc, panicChan *onceChan) chan any {
} }
// drain drains the channel. // drain drains the channel.
func drain(channel <-chan any) { func drain[T any](channel <-chan T) {
// drain the channel // drain the channel
for range channel { for range channel {
} }
} }
func executeMappers(mCtx mapperContext) { func executeMappers[T, U any](mCtx mapperContext[T, U]) {
var wg sync.WaitGroup var wg sync.WaitGroup
defer func() { defer func() {
wg.Wait() wg.Wait()
@ -298,7 +298,7 @@ func executeMappers(mCtx mapperContext) {
}() }()
var failed int32 var failed int32
pool := make(chan lang.PlaceholderType, mCtx.workers) pool := make(chan struct{}, mCtx.workers)
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan) writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
for atomic.LoadInt32(&failed) == 0 { for atomic.LoadInt32(&failed) == 0 {
select { select {
@ -306,7 +306,7 @@ func executeMappers(mCtx mapperContext) {
return return
case <-mCtx.doneChan: case <-mCtx.doneChan:
return return
case pool <- lang.Placeholder: case pool <- struct{}{}:
item, ok := <-mCtx.source item, ok := <-mCtx.source
if !ok { if !ok {
<-pool <-pool
@ -346,22 +346,21 @@ func once(fn func(error)) func(error) {
} }
} }
type guardedWriter struct { type guardedWriter[T any] struct {
ctx context.Context ctx context.Context
channel chan<- any channel chan<- T
done <-chan lang.PlaceholderType done <-chan struct{}
} }
func newGuardedWriter(ctx context.Context, channel chan<- any, func newGuardedWriter[T any](ctx context.Context, channel chan<- T, done <-chan struct{}) guardedWriter[T] {
done <-chan lang.PlaceholderType) guardedWriter { return guardedWriter[T]{
return guardedWriter{
ctx: ctx, ctx: ctx,
channel: channel, channel: channel,
done: done, done: done,
} }
} }
func (gw guardedWriter) Write(v any) { func (gw guardedWriter[T]) Write(v T) {
select { select {
case <-gw.ctx.Done(): case <-gw.ctx.Done():
return return

@ -1,6 +1,3 @@
//go:build go1.18
// +build go1.18
package mr package mr
import ( import (
@ -18,9 +15,9 @@ import (
func FuzzMapReduce(f *testing.F) { func FuzzMapReduce(f *testing.F) {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
f.Add(uint(10), uint(runtime.NumCPU())) f.Add(int64(10), runtime.NumCPU())
f.Fuzz(func(t *testing.T, num, workers uint) { f.Fuzz(func(t *testing.T, n int64, workers int) {
n := int64(num)%5000 + 5000 n = n%5000 + 5000
genPanic := rand.Intn(100) == 0 genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0 mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0 reducerPanic := rand.Intn(100) == 0
@ -29,34 +26,33 @@ func FuzzMapReduce(f *testing.F) {
reducerIdx := rand.Int63n(n) reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6 squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (any, error) { fn := func() (int64, error) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
return MapReduce(func(source chan<- any) { return MapReduce(func(source chan<- int64) {
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
source <- i source <- i
if genPanic && i == genIdx { if genPanic && i == genIdx {
panic("foo") panic("foo")
} }
} }
}, func(item any, writer Writer, cancel func(error)) { }, func(v int64, writer Writer[int64], cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx { if mapperPanic && v == mapperIdx {
panic("bar") panic("bar")
} }
writer.Write(v * v) writer.Write(v * v)
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) {
var idx int64 var idx int64
var total int64 var total int64
for v := range pipe { for v := range pipe {
if reducerPanic && idx == reducerIdx { if reducerPanic && idx == reducerIdx {
panic("baz") panic("baz")
} }
total += v.(int64) total += v
idx++ idx++
} }
writer.Write(total) writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2)) }, WithWorkers(workers%50+runtime.NumCPU()))
} }
if genPanic || mapperPanic || reducerPanic { if genPanic || mapperPanic || reducerPanic {
@ -72,7 +68,7 @@ func FuzzMapReduce(f *testing.F) {
} else { } else {
val, err := fn() val, err := fn()
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64)) assert.Equal(t, squareSum, val)
} }
}) })
} }

@ -54,28 +54,27 @@ func TestMapReduceRandom(t *testing.T) {
reducerIdx := rand.Int63n(n) reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6 squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (any, error) { fn := func() (int64, error) {
return MapReduce(func(source chan<- any) { return MapReduce(func(source chan<- int64) {
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
source <- i source <- i
if genPanic && i == genIdx { if genPanic && i == genIdx {
panic("foo") panic("foo")
} }
} }
}, func(item any, writer Writer, cancel func(error)) { }, func(v int64, writer Writer[int64], cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx { if mapperPanic && v == mapperIdx {
panic("bar") panic("bar")
} }
writer.Write(v * v) writer.Write(v * v)
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) {
var idx int64 var idx int64
var total int64 var total int64
for v := range pipe { for v := range pipe {
if reducerPanic && idx == reducerIdx { if reducerPanic && idx == reducerIdx {
panic("baz") panic("baz")
} }
total += v.(int64) total += v
idx++ idx++
} }
writer.Write(total) writer.Write(total)
@ -95,7 +94,7 @@ func TestMapReduceRandom(t *testing.T) {
} else { } else {
val, err := fn() val, err := fn()
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64)) assert.Equal(t, squareSum, val)
} }
bar.Increment() bar.Increment()
}) })

@ -3,7 +3,7 @@ package mr
import ( import (
"context" "context"
"errors" "errors"
"io" "io/ioutil"
"log" "log"
"runtime" "runtime"
"sync/atomic" "sync/atomic"
@ -17,7 +17,7 @@ import (
var errDummy = errors.New("dummy") var errDummy = errors.New("dummy")
func init() { func init() {
log.SetOutput(io.Discard) log.SetOutput(ioutil.Discard)
} }
func TestFinish(t *testing.T) { func TestFinish(t *testing.T) {
@ -91,11 +91,11 @@ func TestForEach(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
var count uint32 var count uint32
ForEach(func(source chan<- any) { ForEach(func(source chan<- int) {
for i := 0; i < tasks; i++ { for i := 0; i < tasks; i++ {
source <- i source <- i
} }
}, func(item any) { }, func(item int) {
atomic.AddUint32(&count, 1) atomic.AddUint32(&count, 1)
}, WithWorkers(-1)) }, WithWorkers(-1))
@ -106,12 +106,12 @@ func TestForEach(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
var count uint32 var count uint32
ForEach(func(source chan<- any) { ForEach(func(source chan<- int) {
for i := 0; i < tasks; i++ { for i := 0; i < tasks; i++ {
source <- i source <- i
} }
}, func(item any) { }, func(item int) {
if item.(int)%2 == 0 { if item%2 == 0 {
atomic.AddUint32(&count, 1) atomic.AddUint32(&count, 1)
} }
}) })
@ -123,11 +123,11 @@ func TestForEach(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
assert.PanicsWithValue(t, "foo", func() { assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- any) { ForEach(func(source chan<- int) {
for i := 0; i < tasks; i++ { for i := 0; i < tasks; i++ {
source <- i source <- i
} }
}, func(item any) { }, func(item int) {
panic("foo") panic("foo")
}) })
}) })
@ -139,9 +139,9 @@ func TestGeneratePanic(t *testing.T) {
t.Run("all", func(t *testing.T) { t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() { assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- any) { ForEach(func(source chan<- int) {
panic("foo") panic("foo")
}, func(item any) { }, func(item int) {
}) })
}) })
}) })
@ -154,14 +154,14 @@ func TestMapperPanic(t *testing.T) {
var run int32 var run int32
t.Run("all", func(t *testing.T) { t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() { assert.PanicsWithValue(t, "foo", func() {
_, _ = MapReduce(func(source chan<- any) { _, _ = MapReduce(func(source chan<- int) {
for i := 0; i < tasks; i++ { for i := 0; i < tasks; i++ {
source <- i source <- i
} }
}, func(item any, writer Writer, cancel func(error)) { }, func(item int, writer Writer[int], cancel func(error)) {
atomic.AddInt32(&run, 1) atomic.AddInt32(&run, 1)
panic("foo") panic("foo")
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
}) })
}) })
assert.True(t, atomic.LoadInt32(&run) < tasks/2) assert.True(t, atomic.LoadInt32(&run) < tasks/2)
@ -173,10 +173,10 @@ func TestMapReduce(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
mapper MapperFunc mapper MapperFunc[int, int]
reducer ReducerFunc reducer ReducerFunc[int, int]
expectErr error expectErr error
expectValue any expectValue int
}{ }{
{ {
name: "simple", name: "simple",
@ -185,8 +185,7 @@ func TestMapReduce(t *testing.T) {
}, },
{ {
name: "cancel with error", name: "cancel with error",
mapper: func(item any, writer Writer, cancel func(error)) { mapper: func(v int, writer Writer[int], cancel func(error)) {
v := item.(int)
if v%3 == 0 { if v%3 == 0 {
cancel(errDummy) cancel(errDummy)
} }
@ -196,22 +195,20 @@ func TestMapReduce(t *testing.T) {
}, },
{ {
name: "cancel with nil", name: "cancel with nil",
mapper: func(item any, writer Writer, cancel func(error)) { mapper: func(v int, writer Writer[int], cancel func(error)) {
v := item.(int)
if v%3 == 0 { if v%3 == 0 {
cancel(nil) cancel(nil)
} }
writer.Write(v * v) writer.Write(v * v)
}, },
expectErr: ErrCancelWithNil, expectErr: ErrCancelWithNil,
expectValue: nil,
}, },
{ {
name: "cancel with more", name: "cancel with more",
reducer: func(pipe <-chan any, writer Writer, cancel func(error)) { reducer: func(pipe <-chan int, writer Writer[int], cancel func(error)) {
var result int var result int
for item := range pipe { for item := range pipe {
result += item.(int) result += item
if result > 10 { if result > 10 {
cancel(errDummy) cancel(errDummy)
} }
@ -226,21 +223,20 @@ func TestMapReduce(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
if test.mapper == nil { if test.mapper == nil {
test.mapper = func(item any, writer Writer, cancel func(error)) { test.mapper = func(v int, writer Writer[int], cancel func(error)) {
v := item.(int)
writer.Write(v * v) writer.Write(v * v)
} }
} }
if test.reducer == nil { if test.reducer == nil {
test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) { test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) {
var result int var result int
for item := range pipe { for item := range pipe {
result += item.(int) result += item
} }
writer.Write(result) writer.Write(result)
} }
} }
value, err := MapReduce(func(source chan<- any) { value, err := MapReduce(func(source chan<- int) {
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
source <- i source <- i
} }
@ -256,22 +252,21 @@ func TestMapReduce(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
if test.mapper == nil { if test.mapper == nil {
test.mapper = func(item any, writer Writer, cancel func(error)) { test.mapper = func(v int, writer Writer[int], cancel func(error)) {
v := item.(int)
writer.Write(v * v) writer.Write(v * v)
} }
} }
if test.reducer == nil { if test.reducer == nil {
test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) { test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) {
var result int var result int
for item := range pipe { for item := range pipe {
result += item.(int) result += item
} }
writer.Write(result) writer.Write(result)
} }
} }
source := make(chan any) source := make(chan int)
go func() { go func() {
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
source <- i source <- i
@ -291,13 +286,13 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
assert.Panics(t, func() { assert.Panics(t, func() {
MapReduce(func(source chan<- any) { MapReduce(func(source chan<- int) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
source <- i source <- i
} }
}, func(item any, writer Writer, cancel func(error)) { }, func(item int, writer Writer[int], cancel func(error)) {
writer.Write(item) writer.Write(item)
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int, writer Writer[string], cancel func(error)) {
drain(pipe) drain(pipe)
writer.Write("one") writer.Write("one")
writer.Write("two") writer.Write("two")
@ -311,8 +306,8 @@ func TestMapReduceVoid(t *testing.T) {
var value uint32 var value uint32
tests := []struct { tests := []struct {
name string name string
mapper MapperFunc mapper MapperFunc[int, int]
reducer VoidReducerFunc reducer VoidReducerFunc[int]
expectValue uint32 expectValue uint32
expectErr error expectErr error
}{ }{
@ -323,8 +318,7 @@ func TestMapReduceVoid(t *testing.T) {
}, },
{ {
name: "cancel with error", name: "cancel with error",
mapper: func(item any, writer Writer, cancel func(error)) { mapper: func(v int, writer Writer[int], cancel func(error)) {
v := item.(int)
if v%3 == 0 { if v%3 == 0 {
cancel(errDummy) cancel(errDummy)
} }
@ -334,8 +328,7 @@ func TestMapReduceVoid(t *testing.T) {
}, },
{ {
name: "cancel with nil", name: "cancel with nil",
mapper: func(item any, writer Writer, cancel func(error)) { mapper: func(v int, writer Writer[int], cancel func(error)) {
v := item.(int)
if v%3 == 0 { if v%3 == 0 {
cancel(nil) cancel(nil)
} }
@ -345,9 +338,9 @@ func TestMapReduceVoid(t *testing.T) {
}, },
{ {
name: "cancel with more", name: "cancel with more",
reducer: func(pipe <-chan any, cancel func(error)) { reducer: func(pipe <-chan int, cancel func(error)) {
for item := range pipe { for item := range pipe {
result := atomic.AddUint32(&value, uint32(item.(int))) result := atomic.AddUint32(&value, uint32(item))
if result > 10 { if result > 10 {
cancel(errDummy) cancel(errDummy)
} }
@ -362,19 +355,18 @@ func TestMapReduceVoid(t *testing.T) {
atomic.StoreUint32(&value, 0) atomic.StoreUint32(&value, 0)
if test.mapper == nil { if test.mapper == nil {
test.mapper = func(item any, writer Writer, cancel func(error)) { test.mapper = func(v int, writer Writer[int], cancel func(error)) {
v := item.(int)
writer.Write(v * v) writer.Write(v * v)
} }
} }
if test.reducer == nil { if test.reducer == nil {
test.reducer = func(pipe <-chan any, cancel func(error)) { test.reducer = func(pipe <-chan int, cancel func(error)) {
for item := range pipe { for item := range pipe {
atomic.AddUint32(&value, uint32(item.(int))) atomic.AddUint32(&value, uint32(item))
} }
} }
} }
err := MapReduceVoid(func(source chan<- any) { err := MapReduceVoid(func(source chan<- int) {
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
source <- i source <- i
} }
@ -392,18 +384,17 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
var result []int var result []int
err := MapReduceVoid(func(source chan<- any) { err := MapReduceVoid(func(source chan<- int) {
source <- 0 source <- 0
source <- 1 source <- 1
}, func(item any, writer Writer, cancel func(error)) { }, func(i int, writer Writer[int], cancel func(error)) {
i := item.(int)
if i == 0 { if i == 0 {
time.Sleep(time.Millisecond * 50) time.Sleep(time.Millisecond * 50)
} }
writer.Write(i) writer.Write(i)
}, func(pipe <-chan any, cancel func(error)) { }, func(pipe <-chan int, cancel func(error)) {
for item := range pipe { for item := range pipe {
i := item.(int) i := item
result = append(result, i) result = append(result, i)
} }
}) })
@ -417,13 +408,12 @@ func TestMapReducePanic(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
assert.Panics(t, func() { assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- any) { _, _ = MapReduce(func(source chan<- int) {
source <- 0 source <- 0
source <- 1 source <- 1
}, func(item any, writer Writer, cancel func(error)) { }, func(i int, writer Writer[int], cancel func(error)) {
i := item.(int)
writer.Write(i) writer.Write(i)
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
for range pipe { for range pipe {
panic("panic") panic("panic")
} }
@ -435,17 +425,16 @@ func TestMapReducePanicOnce(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
assert.Panics(t, func() { assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- any) { _, _ = MapReduce(func(source chan<- int) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
source <- i source <- i
} }
}, func(item any, writer Writer, cancel func(error)) { }, func(i int, writer Writer[int], cancel func(error)) {
i := item.(int)
if i == 0 { if i == 0 {
panic("foo") panic("foo")
} }
writer.Write(i) writer.Write(i)
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
for range pipe { for range pipe {
panic("bar") panic("bar")
} }
@ -457,12 +446,12 @@ func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
assert.Panics(t, func() { assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- any) { _, _ = MapReduce(func(source chan<- int) {
source <- 0 source <- 0
source <- 1 source <- 1
}, func(item any, writer Writer, cancel func(error)) { }, func(item int, writer Writer[int], cancel func(error)) {
panic("foo") panic("foo")
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
panic("bar") panic("bar")
}) })
}) })
@ -472,18 +461,17 @@ func TestMapReduceVoidCancel(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
var result []int var result []int
err := MapReduceVoid(func(source chan<- any) { err := MapReduceVoid(func(source chan<- int) {
source <- 0 source <- 0
source <- 1 source <- 1
}, func(item any, writer Writer, cancel func(error)) { }, func(i int, writer Writer[int], cancel func(error)) {
i := item.(int)
if i == 1 { if i == 1 {
cancel(errors.New("anything")) cancel(errors.New("anything"))
} }
writer.Write(i) writer.Write(i)
}, func(pipe <-chan any, cancel func(error)) { }, func(pipe <-chan int, cancel func(error)) {
for item := range pipe { for item := range pipe {
i := item.(int) i := item
result = append(result, i) result = append(result, i)
} }
}) })
@ -496,21 +484,19 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
var done int32 var done int32
var result []int var result []int
err := MapReduceVoid(func(source chan<- any) { err := MapReduceVoid(func(source chan<- int) {
for i := 0; i < defaultWorkers*2; i++ { for i := 0; i < defaultWorkers*2; i++ {
source <- i source <- i
} }
atomic.AddInt32(&done, 1) atomic.AddInt32(&done, 1)
}, func(item any, writer Writer, cancel func(error)) { }, func(i int, writer Writer[int], cancel func(error)) {
i := item.(int)
if i == defaultWorkers/2 { if i == defaultWorkers/2 {
cancel(errors.New("anything")) cancel(errors.New("anything"))
} }
writer.Write(i) writer.Write(i)
}, func(pipe <-chan any, cancel func(error)) { }, func(pipe <-chan int, cancel func(error)) {
for item := range pipe { for item := range pipe {
i := item.(int) result = append(result, item)
result = append(result, i)
} }
}) })
assert.NotNil(t, err) assert.NotNil(t, err)
@ -522,18 +508,18 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
uids := []int{1, 2, 3} uids := []int{1, 2, 3}
res, err := MapReduce(func(source chan<- any) { res, err := MapReduce(func(source chan<- int) {
for _, uid := range uids { for _, uid := range uids {
source <- uid source <- uid
} }
}, func(item any, writer Writer, cancel func(error)) { }, func(item int, writer Writer[int], cancel func(error)) {
writer.Write(item) writer.Write(item)
}, func(pipe <-chan any, writer Writer, cancel func(error)) { }, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
drain(pipe) drain(pipe)
// not calling writer.Write(...), should not panic // not calling writer.Write(...), should not panic
}) })
assert.Equal(t, ErrReduceNoOutput, err) assert.Equal(t, ErrReduceNoOutput, err)
assert.Nil(t, res) assert.Equal(t, 0, res)
} }
func TestMapReduceVoidPanicInReducer(t *testing.T) { func TestMapReduceVoidPanicInReducer(t *testing.T) {
@ -542,15 +528,14 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) {
const message = "foo" const message = "foo"
assert.Panics(t, func() { assert.Panics(t, func() {
var done int32 var done int32
_ = MapReduceVoid(func(source chan<- any) { _ = MapReduceVoid(func(source chan<- int) {
for i := 0; i < defaultWorkers*2; i++ { for i := 0; i < defaultWorkers*2; i++ {
source <- i source <- i
} }
atomic.AddInt32(&done, 1) atomic.AddInt32(&done, 1)
}, func(item any, writer Writer, cancel func(error)) { }, func(i int, writer Writer[int], cancel func(error)) {
i := item.(int)
writer.Write(i) writer.Write(i)
}, func(pipe <-chan any, cancel func(error)) { }, func(pipe <-chan int, cancel func(error)) {
panic(message) panic(message)
}, WithWorkers(1)) }, WithWorkers(1))
}) })
@ -561,13 +546,12 @@ func TestForEachWithContext(t *testing.T) {
var done int32 var done int32
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ForEach(func(source chan<- any) { ForEach(func(source chan<- int) {
for i := 0; i < defaultWorkers*2; i++ { for i := 0; i < defaultWorkers*2; i++ {
source <- i source <- i
} }
atomic.AddInt32(&done, 1) atomic.AddInt32(&done, 1)
}, func(item any) { }, func(i int) {
i := item.(int)
if i == defaultWorkers/2 { if i == defaultWorkers/2 {
cancel() cancel()
} }
@ -580,20 +564,19 @@ func TestMapReduceWithContext(t *testing.T) {
var done int32 var done int32
var result []int var result []int
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
err := MapReduceVoid(func(source chan<- any) { err := MapReduceVoid(func(source chan<- int) {
for i := 0; i < defaultWorkers*2; i++ { for i := 0; i < defaultWorkers*2; i++ {
source <- i source <- i
} }
atomic.AddInt32(&done, 1) atomic.AddInt32(&done, 1)
}, func(item any, writer Writer, c func(error)) { }, func(i int, writer Writer[int], c func(error)) {
i := item.(int)
if i == defaultWorkers/2 { if i == defaultWorkers/2 {
cancel() cancel()
} }
writer.Write(i) writer.Write(i)
}, func(pipe <-chan any, cancel func(error)) { }, func(pipe <-chan int, cancel func(error)) {
for item := range pipe { for item := range pipe {
i := item.(int) i := item
result = append(result, i) result = append(result, i)
} }
}, WithContext(ctx)) }, WithContext(ctx))
@ -604,19 +587,19 @@ func TestMapReduceWithContext(t *testing.T) {
func BenchmarkMapReduce(b *testing.B) { func BenchmarkMapReduce(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
mapper := func(v any, writer Writer, cancel func(error)) { mapper := func(v int64, writer Writer[int64], cancel func(error)) {
writer.Write(v.(int64) * v.(int64)) writer.Write(v * v)
} }
reducer := func(input <-chan any, writer Writer, cancel func(error)) { reducer := func(input <-chan int64, writer Writer[int64], cancel func(error)) {
var result int64 var result int64
for v := range input { for v := range input {
result += v.(int64) result += v
} }
writer.Write(result) writer.Write(result)
} }
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
MapReduce(func(input chan<- any) { MapReduce(func(input chan<- int64) {
for j := 0; j < 2; j++ { for j := 0; j < 2; j++ {
input <- int64(j) input <- int64(j)
} }

@ -58,20 +58,19 @@ import (
) )
func main() { func main() {
val, err := mr.MapReduce(func(source chan<- any) { val, err := mr.MapReduce(func(source chan<- int) {
// generator // generator
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
source <- i source <- i
} }
}, func(item any, writer mr.Writer, cancel func(error)) { }, func(i int, writer mr.Writer[int], cancel func(error)) {
// mapper // mapper
i := item.(int)
writer.Write(i * i) writer.Write(i * i)
}, func(pipe <-chan any, writer mr.Writer, cancel func(error)) { }, func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
// reducer // reducer
var sum int var sum int
for i := range pipe { for i := range pipe {
sum += i.(int) sum += i
} }
writer.Write(sum) writer.Write(sum)
}) })

@ -59,20 +59,19 @@ import (
) )
func main() { func main() {
val, err := mr.MapReduce(func(source chan<- any) { val, err := mr.MapReduce(func(source chan<- int) {
// generator // generator
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
source <- i source <- i
} }
}, func(item any, writer mr.Writer, cancel func(error)) { }, func(i int, writer mr.Writer[int], cancel func(error)) {
// mapper // mapper
i := item.(int)
writer.Write(i * i) writer.Write(i * i)
}, func(pipe <-chan any, writer mr.Writer, cancel func(error)) { }, func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
// reducer // reducer
var sum int var sum int
for i := range pipe { for i := range pipe {
sum += i.(int) sum += i
} }
writer.Write(sum) writer.Write(sum)
}) })

@ -63,12 +63,11 @@ func (s *Server) build() error {
return err return err
} }
return mr.MapReduceVoid(func(source chan<- any) { return mr.MapReduceVoid(func(source chan<- Upstream) {
for _, up := range s.upstreams { for _, up := range s.upstreams {
source <- up source <- up
} }
}, func(item any, writer mr.Writer, cancel func(error)) { }, func(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) {
up := item.(Upstream)
cli := zrpc.MustNewClient(up.Grpc) cli := zrpc.MustNewClient(up.Grpc)
source, err := s.createDescriptorSource(cli, up) source, err := s.createDescriptorSource(cli, up)
if err != nil { if err != nil {
@ -109,9 +108,8 @@ func (s *Server) build() error {
Handler: s.buildHandler(source, resolver, cli, m.RpcPath), Handler: s.buildHandler(source, resolver, cli, m.RpcPath),
}) })
} }
}, func(pipe <-chan any, cancel func(error)) { }, func(pipe <-chan rest.Route, cancel func(error)) {
for item := range pipe { for route := range pipe {
route := item.(rest.Route)
s.Server.AddRoute(route) s.Server.AddRoute(route)
} }
}) })

@ -9,7 +9,7 @@ import (
"github.com/zeromicro/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
) )
// defaultHealthManager is global comboHealthManager for byone self. // defaultHealthManager is global comboHealthManager.
var defaultHealthManager = newComboHealthManager() var defaultHealthManager = newComboHealthManager()
type ( type (

Loading…
Cancel
Save