feat: implement fx.NoneMatch, fx.First, fx.Last (#1402)

* chore: use workers from options in fx.unlimitedWalk

* feat: add fx.NoneMatch

* feat: add fx.First, fx.Last

* chore: add more comments

* docs: add mr readme
master
Kevin Wan 3 years ago committed by GitHub
parent fe1da14332
commit c19d2637ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -90,6 +90,7 @@ func Range(source <-chan interface{}) Stream {
func (s Stream) AllMach(predicate func(item interface{}) bool) bool { func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
for item := range s.source { for item := range s.source {
if !predicate(item) { if !predicate(item) {
drain(s.source)
return false return false
} }
} }
@ -103,6 +104,7 @@ func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
func (s Stream) AnyMach(predicate func(item interface{}) bool) bool { func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
for item := range s.source { for item := range s.source {
if predicate(item) { if predicate(item) {
drain(s.source)
return true return true
} }
} }
@ -186,8 +188,7 @@ func (s Stream) Distinct(fn KeyFunc) Stream {
// Done waits all upstreaming operations to be done. // Done waits all upstreaming operations to be done.
func (s Stream) Done() { func (s Stream) Done() {
for range s.source { drain(s.source)
}
} }
// Filter filters the items by the given FilterFunc. // Filter filters the items by the given FilterFunc.
@ -199,9 +200,22 @@ func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
}, opts...) }, opts...)
} }
// First returns the first item in the stream, or nil if the stream is empty.
func (s Stream) First() interface{} {
	item, ok := <-s.source
	if !ok {
		return nil
	}
	// Drain the remaining items in the background so the upstream
	// goroutine never blocks on a send and this call returns immediately.
	go drain(s.source)
	return item
}
// ForAll handles the streaming elements from the source and no later streams. // ForAll handles the streaming elements from the source and no later streams.
func (s Stream) ForAll(fn ForAllFunc) { func (s Stream) ForAll(fn ForAllFunc) {
fn(s.source) fn(s.source)
// avoid goroutine leak on fn not consuming all items.
drain(s.source)
} }
// ForEach seals the Stream with the ForEachFunc on each item, no successive operations. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
@ -246,11 +260,14 @@ func (s Stream) Head(n int64) Stream {
} }
if n == 0 { if n == 0 {
// let successive method go ASAP even we have more items to skip // let successive method go ASAP even we have more items to skip
// why we don't just break the loop, because if breaks,
// this former goroutine will block forever, which will cause goroutine leak.
close(source) close(source)
// why we don't just break the loop, and drain to consume all items.
// because if breaks, this former goroutine will block forever,
// which will cause goroutine leak.
drain(s.source)
} }
} }
// not enough items in s.source, but we need to let successive method to go ASAP.
if n > 0 { if n > 0 {
close(source) close(source)
} }
@ -259,6 +276,13 @@ func (s Stream) Head(n int64) Stream {
return Range(source) return Range(source)
} }
// Last returns the last item in the stream, or nil if the stream is empty.
func (s Stream) Last() interface{} {
	var last interface{}
	for each := range s.source {
		last = each
	}
	return last
}
// Map converts each item to another corresponding item, which means it's a 1:1 model. // Map converts each item to another corresponding item, which means it's a 1:1 model.
func (s Stream) Map(fn MapFunc, opts ...Option) Stream { func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
return s.Walk(func(item interface{}, pipe chan<- interface{}) { return s.Walk(func(item interface{}, pipe chan<- interface{}) {
@ -280,6 +304,20 @@ func (s Stream) Merge() Stream {
return Range(source) return Range(source)
} }
// NoneMatch returns whether no element of this stream matches the provided predicate.
// It short-circuits: once a matching item is found, the remaining items are
// drained without evaluating the predicate and false is returned.
// If the stream is empty, true is returned and the predicate is never evaluated.
func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool {
	for each := range s.source {
		if !predicate(each) {
			continue
		}
		// consume the rest so the upstream goroutine can finish.
		drain(s.source)
		return false
	}
	return true
}
// Parallel applies the given ParallelFunc to each item concurrently with given number of workers. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
func (s Stream) Parallel(fn ParallelFunc, opts ...Option) { func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
s.Walk(func(item interface{}, pipe chan<- interface{}) { s.Walk(func(item interface{}, pipe chan<- interface{}) {
@ -411,15 +449,12 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
var wg sync.WaitGroup var wg sync.WaitGroup
pool := make(chan lang.PlaceholderType, option.workers) pool := make(chan lang.PlaceholderType, option.workers)
for { for item := range s.source {
// important, used in another goroutine
val := item
pool <- lang.Placeholder pool <- lang.Placeholder
item, ok := <-s.source
if !ok {
<-pool
break
}
wg.Add(1) wg.Add(1)
// better to safely run caller defined method // better to safely run caller defined method
threading.GoSafe(func() { threading.GoSafe(func() {
defer func() { defer func() {
@ -427,7 +462,7 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
<-pool <-pool
}() }()
fn(item, pipe) fn(val, pipe)
}) })
} }
@ -439,22 +474,19 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
} }
func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream { func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
pipe := make(chan interface{}, defaultWorkers) pipe := make(chan interface{}, option.workers)
go func() { go func() {
var wg sync.WaitGroup var wg sync.WaitGroup
for { for item := range s.source {
item, ok := <-s.source // important, used in another goroutine
if !ok { val := item
break
}
wg.Add(1) wg.Add(1)
// better to safely run caller defined method // better to safely run caller defined method
threading.GoSafe(func() { threading.GoSafe(func() {
defer wg.Done() defer wg.Done()
fn(item, pipe) fn(val, pipe)
}) })
} }
@ -465,14 +497,14 @@ func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
return Range(pipe) return Range(pipe)
} }
// UnlimitedWorkers lets the caller to use as many workers as the tasks. // UnlimitedWorkers lets the caller use as many workers as the tasks.
func UnlimitedWorkers() Option { func UnlimitedWorkers() Option {
return func(opts *rxOptions) { return func(opts *rxOptions) {
opts.unlimitedWorkers = true opts.unlimitedWorkers = true
} }
} }
// WithWorkers lets the caller to customize the concurrent workers. // WithWorkers lets the caller customize the concurrent workers.
func WithWorkers(workers int) Option { func WithWorkers(workers int) Option {
return func(opts *rxOptions) { return func(opts *rxOptions) {
if workers < minWorkers { if workers < minWorkers {
@ -483,6 +515,7 @@ func WithWorkers(workers int) Option {
} }
} }
// buildOptions returns a rxOptions with given customizations.
func buildOptions(opts ...Option) *rxOptions { func buildOptions(opts ...Option) *rxOptions {
options := newOptions() options := newOptions()
for _, opt := range opts { for _, opt := range opts {
@ -492,6 +525,13 @@ func buildOptions(opts ...Option) *rxOptions {
return options return options
} }
// drain consumes and discards every remaining item on channel until it is closed,
// so that upstream senders are never left blocked.
func drain(channel <-chan interface{}) {
	for {
		if _, ok := <-channel; !ok {
			return
		}
	}
}
// newOptions returns a default rxOptions.
func newOptions() *rxOptions { func newOptions() *rxOptions {
return &rxOptions{ return &rxOptions{
workers: defaultWorkers, workers: defaultWorkers,

@ -17,320 +17,489 @@ import (
) )
func TestBuffer(t *testing.T) { func TestBuffer(t *testing.T) {
const N = 5 runCheckedTest(t, func(t *testing.T) {
var count int32 const N = 5
var wait sync.WaitGroup var count int32
wait.Add(1) var wait sync.WaitGroup
From(func(source chan<- interface{}) { wait.Add(1)
ticker := time.NewTicker(10 * time.Millisecond) From(func(source chan<- interface{}) {
defer ticker.Stop() ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 2*N; i++ {
select { for i := 0; i < 2*N; i++ {
case source <- i: select {
atomic.AddInt32(&count, 1) case source <- i:
case <-ticker.C: atomic.AddInt32(&count, 1)
wait.Done() case <-ticker.C:
return wait.Done()
return
}
} }
} }).Buffer(N).ForAll(func(pipe <-chan interface{}) {
}).Buffer(N).ForAll(func(pipe <-chan interface{}) { wait.Wait()
wait.Wait() // why N+1, because take one more to wait for sending into the channel
// why N+1, because take one more to wait for sending into the channel assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
assert.Equal(t, int32(N+1), atomic.LoadInt32(&count)) })
}) })
} }
func TestBufferNegative(t *testing.T) { func TestBufferNegative(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) { var result int
for item := range pipe { Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
result += item.(int) for item := range pipe {
} result += item.(int)
return result, nil }
return result, nil
})
assert.Equal(t, 10, result)
}) })
assert.Equal(t, 10, result)
} }
func TestCount(t *testing.T) { func TestCount(t *testing.T) {
tests := []struct { runCheckedTest(t, func(t *testing.T) {
name string tests := []struct {
elements []interface{} name string
}{ elements []interface{}
{ }{
name: "no elements with nil", {
}, name: "no elements with nil",
{ },
name: "no elements", {
elements: []interface{}{}, name: "no elements",
}, elements: []interface{}{},
{ },
name: "1 element", {
elements: []interface{}{1}, name: "1 element",
}, elements: []interface{}{1},
{ },
name: "multiple elements", {
elements: []interface{}{1, 2, 3}, name: "multiple elements",
}, elements: []interface{}{1, 2, 3},
} },
}
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
val := Just(test.elements...).Count() val := Just(test.elements...).Count()
assert.Equal(t, len(test.elements), val) assert.Equal(t, len(test.elements), val)
}) })
} }
})
} }
func TestDone(t *testing.T) { func TestDone(t *testing.T) {
var count int32 runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) { var count int32
time.Sleep(time.Millisecond * 100) Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
atomic.AddInt32(&count, int32(item.(int))) time.Sleep(time.Millisecond * 100)
}).Done() atomic.AddInt32(&count, int32(item.(int)))
assert.Equal(t, int32(6), count) }).Done()
assert.Equal(t, int32(6), count)
})
} }
func TestJust(t *testing.T) { func TestJust(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) { var result int
for item := range pipe { Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
result += item.(int) for item := range pipe {
} result += item.(int)
return result, nil }
return result, nil
})
assert.Equal(t, 10, result)
}) })
assert.Equal(t, 10, result)
} }
func TestDistinct(t *testing.T) { func TestDistinct(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} { var result int
return item Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) { return item
for item := range pipe { }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
result += item.(int) for item := range pipe {
} result += item.(int)
return result, nil }
return result, nil
})
assert.Equal(t, 10, result)
}) })
assert.Equal(t, 10, result)
} }
func TestFilter(t *testing.T) { func TestFilter(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Filter(func(item interface{}) bool { var result int
return item.(int)%2 == 0 Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) { return item.(int)%2 == 0
for item := range pipe { }).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
result += item.(int) for item := range pipe {
} result += item.(int)
return result, nil }
return result, nil
})
assert.Equal(t, 6, result)
})
}
func TestFirst(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assert.Nil(t, Just().First())
assert.Equal(t, "foo", Just("foo").First())
assert.Equal(t, "foo", Just("foo", "bar").First())
}) })
assert.Equal(t, 6, result)
} }
func TestForAll(t *testing.T) { func TestForAll(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Filter(func(item interface{}) bool { var result int
return item.(int)%2 == 0 Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
}).ForAll(func(pipe <-chan interface{}) { return item.(int)%2 == 0
for item := range pipe { }).ForAll(func(pipe <-chan interface{}) {
result += item.(int) for item := range pipe {
} result += item.(int)
}
})
assert.Equal(t, 6, result)
}) })
assert.Equal(t, 6, result)
} }
func TestGroup(t *testing.T) { func TestGroup(t *testing.T) {
var groups [][]int runCheckedTest(t, func(t *testing.T) {
Just(10, 11, 20, 21).Group(func(item interface{}) interface{} { var groups [][]int
v := item.(int) Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
return v / 10 v := item.(int)
}).ForEach(func(item interface{}) { return v / 10
v := item.([]interface{}) }).ForEach(func(item interface{}) {
var group []int v := item.([]interface{})
for _, each := range v { var group []int
group = append(group, each.(int)) for _, each := range v {
group = append(group, each.(int))
}
groups = append(groups, group)
})
assert.Equal(t, 2, len(groups))
for _, group := range groups {
assert.Equal(t, 2, len(group))
assert.True(t, group[0]/10 == group[1]/10)
} }
groups = append(groups, group)
}) })
assert.Equal(t, 2, len(groups))
for _, group := range groups {
assert.Equal(t, 2, len(group))
assert.True(t, group[0]/10 == group[1]/10)
}
} }
func TestHead(t *testing.T) { func TestHead(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) { var result int
for item := range pipe { Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
result += item.(int) for item := range pipe {
} result += item.(int)
return result, nil }
return result, nil
})
assert.Equal(t, 3, result)
}) })
assert.Equal(t, 3, result)
} }
func TestHeadZero(t *testing.T) { func TestHeadZero(t *testing.T) {
assert.Panics(t, func() { runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) { assert.Panics(t, func() {
return nil, nil Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
return nil, nil
})
}) })
}) })
} }
func TestHeadMore(t *testing.T) { func TestHeadMore(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) { var result int
for item := range pipe { Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
result += item.(int) for item := range pipe {
} result += item.(int)
return result, nil }
return result, nil
})
assert.Equal(t, 10, result)
})
}
func TestLast(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
goroutines := runtime.NumGoroutine()
assert.Nil(t, Just().Last())
assert.Equal(t, "foo", Just("foo").Last())
assert.Equal(t, "bar", Just("foo", "bar").Last())
// let scheduler schedule first
runtime.Gosched()
assert.Equal(t, goroutines, runtime.NumGoroutine())
}) })
assert.Equal(t, 10, result)
} }
func TestMap(t *testing.T) { func TestMap(t *testing.T) {
log.SetOutput(ioutil.Discard) runCheckedTest(t, func(t *testing.T) {
log.SetOutput(ioutil.Discard)
tests := []struct {
mapper MapFunc tests := []struct {
expect int mapper MapFunc
}{ expect int
{ }{
mapper: func(item interface{}) interface{} { {
v := item.(int) mapper: func(item interface{}) interface{} {
return v * v v := item.(int)
return v * v
},
expect: 30,
}, },
expect: 30, {
}, mapper: func(item interface{}) interface{} {
{ v := item.(int)
mapper: func(item interface{}) interface{} { if v%2 == 0 {
v := item.(int) return 0
if v%2 == 0 { }
return 0 return v * v
} },
return v * v expect: 10,
}, },
expect: 10, {
}, mapper: func(item interface{}) interface{} {
{ v := item.(int)
mapper: func(item interface{}) interface{} { if v%2 == 0 {
v := item.(int) panic(v)
if v%2 == 0 { }
panic(v) return v * v
} },
return v * v expect: 10,
}, },
expect: 10, }
},
}
// Map(...) works even WithWorkers(0) // Map(...) works even WithWorkers(0)
for i, test := range tests { for i, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) { t.Run(stringx.Rand(), func(t *testing.T) {
var result int var result int
var workers int var workers int
if i%2 == 0 { if i%2 == 0 {
workers = 0 workers = 0
} else { } else {
workers = runtime.NumCPU() workers = runtime.NumCPU()
}
From(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
} }
}).Map(test.mapper, WithWorkers(workers)).Reduce( From(func(source chan<- interface{}) {
func(pipe <-chan interface{}) (interface{}, error) { for i := 1; i < 5; i++ {
for item := range pipe { source <- i
result += item.(int)
} }
return result, nil }).Map(test.mapper, WithWorkers(workers)).Reduce(
}) func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
assert.Equal(t, test.expect, result) result += item.(int)
}) }
} return result, nil
})
assert.Equal(t, test.expect, result)
})
}
})
} }
func TestMerge(t *testing.T) { func TestMerge(t *testing.T) {
Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) { runCheckedTest(t, func(t *testing.T) {
assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{})) Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
})
}) })
} }
func TestParallelJust(t *testing.T) { func TestParallelJust(t *testing.T) {
var count int32 runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3).Parallel(func(item interface{}) { var count int32
time.Sleep(time.Millisecond * 100) Just(1, 2, 3).Parallel(func(item interface{}) {
atomic.AddInt32(&count, int32(item.(int))) time.Sleep(time.Millisecond * 100)
}, UnlimitedWorkers()) atomic.AddInt32(&count, int32(item.(int)))
assert.Equal(t, int32(6), count) }, UnlimitedWorkers())
assert.Equal(t, int32(6), count)
})
} }
func TestReverse(t *testing.T) { func TestReverse(t *testing.T) {
Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) { runCheckedTest(t, func(t *testing.T) {
assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{})) Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
})
}) })
} }
func TestSort(t *testing.T) { func TestSort(t *testing.T) {
var prev int runCheckedTest(t, func(t *testing.T) {
Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool { var prev int
return a.(int) < b.(int) Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
}).ForEach(func(item interface{}) { return a.(int) < b.(int)
next := item.(int) }).ForEach(func(item interface{}) {
assert.True(t, prev < next) next := item.(int)
prev = next assert.True(t, prev < next)
prev = next
})
}) })
} }
func TestSplit(t *testing.T) { func TestSplit(t *testing.T) {
assert.Panics(t, func() { runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done() assert.Panics(t, func() {
}) Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
var chunks [][]interface{} })
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) { var chunks [][]interface{}
chunk := item.([]interface{}) Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
chunks = append(chunks, chunk) chunk := item.([]interface{})
chunks = append(chunks, chunk)
})
assert.EqualValues(t, [][]interface{}{
{1, 2, 3, 4},
{5, 6, 7, 8},
{9, 10},
}, chunks)
}) })
assert.EqualValues(t, [][]interface{}{
{1, 2, 3, 4},
{5, 6, 7, 8},
{9, 10},
}, chunks)
} }
func TestTail(t *testing.T) { func TestTail(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) { var result int
for item := range pipe { Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
result += item.(int) for item := range pipe {
} result += item.(int)
return result, nil }
return result, nil
})
assert.Equal(t, 7, result)
}) })
assert.Equal(t, 7, result)
} }
func TestTailZero(t *testing.T) { func TestTailZero(t *testing.T) {
assert.Panics(t, func() { runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) { assert.Panics(t, func() {
return nil, nil Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
return nil, nil
})
}) })
}) })
} }
func TestWalk(t *testing.T) { func TestWalk(t *testing.T) {
var result int runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) { var result int
if item.(int)%2 != 0 { Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
pipe <- item if item.(int)%2 != 0 {
pipe <- item
}
}, UnlimitedWorkers()).ForEach(func(item interface{}) {
result += item.(int)
})
assert.Equal(t, 9, result)
})
}
func TestStream_AnyMach(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 4
}))
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 0
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
})
}
func TestStream_AllMach(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return item.(int) == 1
}),
)
})
}
func TestStream_NoneMatch(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
return item.(int) == 4
}),
)
})
}
func TestConcat(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
a1 := []interface{}{1, 2, 3}
a2 := []interface{}{4, 5, 6}
s1 := Just(a1...)
s2 := Just(a2...)
stream := Concat(s1, s2)
var items []interface{}
for item := range stream.source {
items = append(items, item)
} }
}, UnlimitedWorkers()).ForEach(func(item interface{}) { sort.Slice(items, func(i, j int) bool {
result += item.(int) return items[i].(int) < items[j].(int)
})
ints := make([]interface{}, 0)
ints = append(ints, a1...)
ints = append(ints, a2...)
assetEqual(t, ints, items)
})
}
func TestStream_Skip(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
assert.Panics(t, func() {
Just(1, 2, 3, 4).Skip(-1)
})
})
}
func TestStream_Concat(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
stream := Just(1).Concat(Just(2), Just(3))
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
assetEqual(t, []interface{}{1, 2, 3}, items)
just := Just(1)
equal(t, just.Concat(just), []interface{}{1})
}) })
assert.Equal(t, 9, result)
} }
func BenchmarkParallelMapReduce(b *testing.B) { func BenchmarkParallelMapReduce(b *testing.B) {
@ -377,95 +546,26 @@ func BenchmarkMapReduce(b *testing.B) {
}).Map(mapper).Reduce(reducer) }).Map(mapper).Reduce(reducer)
} }
func equal(t *testing.T, stream Stream, data []interface{}) {
items := make([]interface{}, 0)
for item := range stream.source {
items = append(items, item)
}
if !reflect.DeepEqual(items, data) {
t.Errorf(" %v, want %v", items, data)
}
}
func assetEqual(t *testing.T, except, data interface{}) { func assetEqual(t *testing.T, except, data interface{}) {
if !reflect.DeepEqual(except, data) { if !reflect.DeepEqual(except, data) {
t.Errorf(" %v, want %v", data, except) t.Errorf(" %v, want %v", data, except)
} }
} }
func TestStream_AnyMach(t *testing.T) { func equal(t *testing.T, stream Stream, data []interface{}) {
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool { items := make([]interface{}, 0)
return item.(int) == 4
}))
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 0
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
}
func TestStream_AllMach(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return item.(int) == 1
}),
)
}
func TestConcat(t *testing.T) {
a1 := []interface{}{1, 2, 3}
a2 := []interface{}{4, 5, 6}
s1 := Just(a1...)
s2 := Just(a2...)
stream := Concat(s1, s2)
var items []interface{}
for item := range stream.source { for item := range stream.source {
items = append(items, item) items = append(items, item)
} }
sort.Slice(items, func(i, j int) bool { if !reflect.DeepEqual(items, data) {
return items[i].(int) < items[j].(int) t.Errorf(" %v, want %v", items, data)
})
ints := make([]interface{}, 0)
ints = append(ints, a1...)
ints = append(ints, a2...)
assetEqual(t, ints, items)
}
func TestStream_Skip(t *testing.T) {
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
assert.Panics(t, func() {
Just(1, 2, 3, 4).Skip(-1)
})
}
func TestStream_Concat(t *testing.T) {
stream := Just(1).Concat(Just(2), Just(3))
var items []interface{}
for item := range stream.source {
items = append(items, item)
} }
sort.Slice(items, func(i, j int) bool { }
return items[i].(int) < items[j].(int)
})
assetEqual(t, []interface{}{1, 2, 3}, items)
just := Just(1) func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
equal(t, just.Concat(just), []interface{}{1}) goroutines := runtime.NumGoroutine()
fn(t)
// let scheduler schedule first
time.Sleep(time.Millisecond)
assert.Equal(t, goroutines, runtime.NumGoroutine())
} }

@ -0,0 +1,89 @@
# mapreduce
[English](readme.md) | 简体中文
## 为什么需要 MapReduce
在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。
比如要查询商品详情:
1. 商品服务-查询商品属性
2. 库存服务-查询库存属性
3. 价格服务-查询价格属性
4. 营销服务-查询营销属性
如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。
简单的场景下使用 `WaitGroup` 也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 `WaitGroup` 就有点力不从心了。go 的官方库中并没有这种工具(java 中提供了 CompletableFuture),我们依据 MapReduce 架构思想实现了进程内的数据批处理 MapReduce 并发工具类。
## 设计思路
我们尝试把自己代入到作者的角色梳理一下并发工具可能的业务场景:
1. 查询商品详情:支持并发调用多个服务来组合产品属性,支持调用错误可以立即结束。
2. 商品详情页自动推荐用户卡券:支持并发校验卡券,校验失败自动剔除,返回全部卡券。
以上实际都是在进行对输入数据进行处理最后输出清洗后的数据,针对数据处理有个非常经典的异步模式:生产者消费者模式。于是我们可以抽象一下数据批处理的生命周期,大致可以分为三个阶段:
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-serial-cn.png" width="500">
1. 数据生产 generate
2. 数据加工 mapper
3. 数据聚合 reducer
其中数据生产是不可或缺的阶段,数据加工、数据聚合是可选阶段,数据生产与加工支持并发调用,数据聚合基本属于纯内存操作单协程即可。
再来思考一下不同阶段之间数据应该如何流转,既然不同阶段的数据处理都是由不同 goroutine 执行的,那么很自然的可以考虑采用 channel 来实现 goroutine 之间的通信。
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-cn.png" width="500">
如何实现随时终止流程呢?
`goroutine` 中监听一个全局的结束 `channel` 和调用方提供的 `ctx` 就行。
## 简单示例
并行求平方和(不要嫌弃示例简单,只是模拟并发)
```go
package main
import (
"fmt"
"log"
"github.com/tal-tech/go-zero/core/mr"
)
func main() {
val, err := mr.MapReduce(func(source chan<- interface{}) {
// generator
for i := 0; i < 10; i++ {
source <- i
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
// mapper
i := item.(int)
writer.Write(i * i)
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
// reducer
var sum int
for i := range pipe {
sum += i.(int)
}
writer.Write(sum)
})
if err != nil {
log.Fatal(err)
}
fmt.Println("result:", val)
}
```
更多示例:[https://github.com/zeromicro/zero-examples/tree/main/mapreduce](https://github.com/zeromicro/zero-examples/tree/main/mapreduce)
## 欢迎 star
如果你正在使用或者觉得这个项目对你有帮助,请 **star** 支持,感谢!

@ -0,0 +1,90 @@
<img align="right" width="150px" src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/go-zero.png">
# mapreduce
English | [简体中文](readme-cn.md)
## Why MapReduce is needed
In practical business scenarios we often need to get the corresponding properties from different rpc services to assemble complex objects.
For example, to query product details.
1. product service - query product attributes
2. inventory service - query inventory properties
3. price service - query price attributes
4. marketing service - query marketing properties
If it is a serial call, the response time will increase linearly with the number of rpc calls, so we will generally change serial to parallel to optimize response time.
Simple scenarios using `WaitGroup` can also meet the needs, but what if we need to validate, transform, or aggregate the data returned by the rpc calls? The official Go library does not have such a tool (CompletableFuture is provided in Java), so we implemented an in-process data batching MapReduce concurrent tool based on the MapReduce architecture.
## Design ideas
Let's try to put ourselves in the author's shoes and sort out the possible business scenarios for the concurrency tool:
1. querying product details: supporting concurrent calls to multiple services to combine product attributes, and supporting call errors that can be ended immediately.
2. automatic recommendation of user card coupons on product details page: support concurrently verifying card coupons, automatically rejecting them if they fail, and returning all of them.
The above is actually processing the input data and finally outputting the cleaned data. There is a very classic asynchronous pattern for data processing: the producer-consumer pattern. So we can abstract the life cycle of data batch processing, which can be roughly divided into three phases.
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-serial-en.png" width="500">
1. data production generate
2. data processing mapper
3. data aggregation reducer
Data producing is an indispensable stage, data processing and data aggregation are optional stages, data producing and processing support concurrent calls, data aggregation is basically a pure memory operation, so a single concurrent process can do it.
Since different stages of data processing are performed by different goroutines, it is natural to consider the use of channel to achieve communication between goroutines.
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-en.png" width="500">
How can I terminate the process at any time?
It's simple, just receive from a channel or the given context in the goroutine.
## A simple example
Calculate the sum of squares, simulating the concurrency.
```go
package main
import (
"fmt"
"log"
"github.com/tal-tech/go-zero/core/mr"
)
func main() {
val, err := mr.MapReduce(func(source chan<- interface{}) {
// generator
for i := 0; i < 10; i++ {
source <- i
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
// mapper
i := item.(int)
writer.Write(i * i)
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
// reducer
var sum int
for i := range pipe {
sum += i.(int)
}
writer.Write(sum)
})
if err != nil {
log.Fatal(err)
}
fmt.Println("result:", val)
}
```
More examples: [https://github.com/zeromicro/zero-examples/tree/main/mapreduce](https://github.com/zeromicro/zero-examples/tree/main/mapreduce)
## Give a Star! ⭐
If you like or are using this project to learn or start your solution, please give it a star. Thanks!

@ -43,7 +43,7 @@ func TestServer(t *testing.T) {
Mode: "console", Mode: "console",
}, },
}, },
ListenOn: ":8080", ListenOn: "localhost:8080",
Etcd: discov.EtcdConf{}, Etcd: discov.EtcdConf{},
Auth: false, Auth: false,
Redis: redis.RedisKeyConf{}, Redis: redis.RedisKeyConf{},
@ -67,7 +67,7 @@ func TestServerError(t *testing.T) {
Mode: "console", Mode: "console",
}, },
}, },
ListenOn: ":8080", ListenOn: "localhost:8080",
Etcd: discov.EtcdConf{ Etcd: discov.EtcdConf{
Hosts: []string{"localhost"}, Hosts: []string{"localhost"},
}, },
@ -86,7 +86,7 @@ func TestServer_HasEtcd(t *testing.T) {
Mode: "console", Mode: "console",
}, },
}, },
ListenOn: ":8080", ListenOn: "localhost:8080",
Etcd: discov.EtcdConf{ Etcd: discov.EtcdConf{
Hosts: []string{"notexist"}, Hosts: []string{"notexist"},
Key: "any", Key: "any",

Loading…
Cancel
Save