diff --git a/core/fx/stream.go b/core/fx/stream.go index 9f1565ae..e832ffba 100644 --- a/core/fx/stream.go +++ b/core/fx/stream.go @@ -49,19 +49,9 @@ type ( } ) -// empty a empty Stream. -var empty Stream - -func init() { - // initial empty Stream. - source := make(chan interface{}) - close(source) - empty.source = source -} - -// Empty Returns a empty stream. -func Empty() Stream { - return empty +// Concat returns a concatenated Stream. +func Concat(s Stream, others ...Stream) Stream { + return s.Concat(others...) } // From constructs a Stream from the given GenerateFunc. @@ -94,21 +84,42 @@ func Range(source <-chan interface{}) Stream { } } -// Concat Returns a concat Stream. -func Concat(a Stream, others ...Stream) Stream { - return a.Concat(others...) +// AllMach returns whether all elements of this stream match the provided predicate. +// May not evaluate the predicate on all elements if not necessary for determining the result. +// If the stream is empty then true is returned and the predicate is not evaluated. +func (s Stream) AllMach(predicate func(item interface{}) bool) bool { + for item := range s.source { + if !predicate(item) { + return false + } + } + + return true +} + +// AnyMach returns whether any elements of this stream match the provided predicate. +// May not evaluate the predicate on all elements if not necessary for determining the result. +// If the stream is empty then false is returned and the predicate is not evaluated. +func (s Stream) AnyMach(predicate func(item interface{}) bool) bool { + for item := range s.source { + if predicate(item) { + return true + } + } + + return false } // Buffer buffers the items into a queue with size n. // It can balance the producer and the consumer if their processing throughput don't match. -func (p Stream) Buffer(n int) Stream { +func (s Stream) Buffer(n int) Stream { if n < 0 { n = 0 } source := make(chan interface{}, n) go func() { - for item := range p.source { + for item := range s.source { source <- item } close(source) @@ -117,23 +128,51 @@ func (p Stream) Buffer(n int) Stream { return Range(source) } +// Concat returns a Stream that concatenated other streams +func (s Stream) Concat(others ...Stream) Stream { + source := make(chan interface{}) + + go func() { + group := threading.NewRoutineGroup() + group.Run(func() { + for item := range s.source { + source <- item + } + }) + + for _, each := range others { + each := each + group.Run(func() { + for item := range each.source { + source <- item + } + }) + } + + group.Wait() + close(source) + }() + + return Range(source) +} + // Count counts the number of elements in the result. -func (p Stream) Count() (count int) { - for range p.source { +func (s Stream) Count() (count int) { + for range s.source { count++ } return } // Distinct removes the duplicated items base on the given KeyFunc. -func (p Stream) Distinct(fn KeyFunc) Stream { +func (s Stream) Distinct(fn KeyFunc) Stream { source := make(chan interface{}) threading.GoSafe(func() { defer close(source) keys := make(map[interface{}]lang.PlaceholderType) - for item := range p.source { + for item := range s.source { key := fn(item) if _, ok := keys[key]; !ok { source <- item @@ -146,14 +185,14 @@ func (p Stream) Distinct(fn KeyFunc) Stream { } // Done waits all upstreaming operations to be done. -func (p Stream) Done() { - for range p.source { +func (s Stream) Done() { + for range s.source { } } // Filter filters the items by the given FilterFunc. -func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream { - return p.Walk(func(item interface{}, pipe chan<- interface{}) { +func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream { + return s.Walk(func(item interface{}, pipe chan<- interface{}) { if fn(item) { pipe <- item } @@ -161,21 +200,21 @@ func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream { } // ForAll handles the streaming elements from the source and no later streams. -func (p Stream) ForAll(fn ForAllFunc) { - fn(p.source) +func (s Stream) ForAll(fn ForAllFunc) { + fn(s.source) } // ForEach seals the Stream with the ForEachFunc on each item, no successive operations. -func (p Stream) ForEach(fn ForEachFunc) { - for item := range p.source { +func (s Stream) ForEach(fn ForEachFunc) { + for item := range s.source { fn(item) } } // Group groups the elements into different groups based on their keys. -func (p Stream) Group(fn KeyFunc) Stream { +func (s Stream) Group(fn KeyFunc) Stream { groups := make(map[interface{}][]interface{}) - for item := range p.source { + for item := range s.source { key := fn(item) groups[key] = append(groups[key], item) } @@ -192,7 +231,7 @@ func (p Stream) Group(fn KeyFunc) Stream { } // Head returns the first n elements in p. -func (p Stream) Head(n int64) Stream { +func (s Stream) Head(n int64) Stream { if n < 1 { panic("n must be greater than 0") } @@ -200,7 +239,7 @@ func (p Stream) Head(n int64) Stream { source := make(chan interface{}) go func() { - for item := range p.source { + for item := range s.source { n-- if n >= 0 { source <- item @@ -221,16 +260,16 @@ func (p Stream) Head(n int64) Stream { } // Map converts each item to another corresponding item, which means it's a 1:1 model. -func (p Stream) Map(fn MapFunc, opts ...Option) Stream { - return p.Walk(func(item interface{}, pipe chan<- interface{}) { +func (s Stream) Map(fn MapFunc, opts ...Option) Stream { + return s.Walk(func(item interface{}, pipe chan<- interface{}) { pipe <- fn(item) }, opts...) } // Merge merges all the items into a slice and generates a new stream. -func (p Stream) Merge() Stream { +func (s Stream) Merge() Stream { var items []interface{} - for item := range p.source { + for item := range s.source { items = append(items, item) } @@ -242,21 +281,21 @@ func (p Stream) Merge() Stream { } // Parallel applies the given ParallelFunc to each item concurrently with given number of workers. -func (p Stream) Parallel(fn ParallelFunc, opts ...Option) { - p.Walk(func(item interface{}, pipe chan<- interface{}) { +func (s Stream) Parallel(fn ParallelFunc, opts ...Option) { + s.Walk(func(item interface{}, pipe chan<- interface{}) { fn(item) }, opts...).Done() } // Reduce is a utility method to let the caller deal with the underlying channel. -func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) { - return fn(p.source) +func (s Stream) Reduce(fn ReduceFunc) (interface{}, error) { + return fn(s.source) } // Reverse reverses the elements in the stream. -func (p Stream) Reverse() Stream { +func (s Stream) Reverse() Stream { var items []interface{} - for item := range p.source { + for item := range s.source { items = append(items, item) } // reverse, official method @@ -268,10 +307,36 @@ func (p Stream) Reverse() Stream { return Just(items...) } +// Skip returns a Stream that skips size elements. +func (s Stream) Skip(n int64) Stream { + if n < 0 { + panic("n must not be negative") + } + if n == 0 { + return s + } + + source := make(chan interface{}) + + go func() { + for item := range s.source { + n-- + if n >= 0 { + continue + } else { + source <- item + } + } + close(source) + }() + + return Range(source) +} + // Sort sorts the items from the underlying source. -func (p Stream) Sort(less LessFunc) Stream { +func (s Stream) Sort(less LessFunc) Stream { var items []interface{} - for item := range p.source { + for item := range s.source { items = append(items, item) } sort.Slice(items, func(i, j int) bool { @@ -283,7 +348,7 @@ func (p Stream) Sort(less LessFunc) Stream { // Split splits the elements into chunk with size up to n, // might be less than n on tailing elements. -func (p Stream) Split(n int) Stream { +func (s Stream) Split(n int) Stream { if n < 1 { panic("n should be greater than 0") } @@ -291,7 +356,7 @@ func (p Stream) Split(n int) Stream { source := make(chan interface{}) go func() { var chunk []interface{} - for item := range p.source { + for item := range s.source { chunk = append(chunk, item) if len(chunk) == n { source <- chunk @@ -308,7 +373,7 @@ func (p Stream) Split(n int) Stream { } // Tail returns the last n elements in p. -func (p Stream) Tail(n int64) Stream { +func (s Stream) Tail(n int64) Stream { if n < 1 { panic("n should be greater than 0") } @@ -317,7 +382,7 @@ func (p Stream) Tail(n int64) Stream { go func() { ring := collection.NewRing(int(n)) - for item := range p.source { + for item := range s.source { ring.Add(item) } for _, item := range ring.Take() { @@ -330,16 +395,16 @@ func (p Stream) Tail(n int64) Stream { } // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item. -func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream { +func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream { option := buildOptions(opts...) if option.unlimitedWorkers { - return p.walkUnlimited(fn, option) + return s.walkUnlimited(fn, option) } - return p.walkLimited(fn, option) + return s.walkLimited(fn, option) } -func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { +func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, option.workers) go func() { @@ -348,7 +413,7 @@ func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { for { pool <- lang.Placeholder - item, ok := <-p.source + item, ok := <-s.source if !ok { <-pool break @@ -373,14 +438,14 @@ func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { return Range(pipe) } -func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream { +func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, defaultWorkers) go func() { var wg sync.WaitGroup for { - item, ok := <-p.source + item, ok := <-s.source if !ok { break } @@ -400,93 +465,6 @@ func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream { return Range(pipe) } -// AnyMach Returns whether any elements of this stream match the provided predicate. -// May not evaluate the predicate on all elements if not necessary for determining the result. -// If the stream is empty then false is returned and the predicate is not evaluated. -func (p Stream) AnyMach(f func(item interface{}) bool) (isFind bool) { - for item := range p.source { - if f(item) { - isFind = true - return - } - } - return -} - -// AllMach Returns whether all elements of this stream match the provided predicate. -// May not evaluate the predicate on all elements if not necessary for determining the result. -// If the stream is empty then true is returned and the predicate is not evaluated. -func (p Stream) AllMach(f func(item interface{}) bool) (isFind bool) { - isFind = true - for item := range p.source { - if !f(item) { - isFind = false - return - } - } - return -} - -// Concat Returns a Stream that concat others streams -func (p Stream) Concat(others ...Stream) Stream { - source := make(chan interface{}) - wg := sync.WaitGroup{} - for _, other := range others { - if p == other { - continue - } - wg.Add(1) - go func(iother Stream) { - for item := range iother.source { - source <- item - } - wg.Done() - }(other) - - } - - wg.Add(1) - go func() { - for item := range p.source { - source <- item - } - wg.Done() - }() - go func() { - wg.Wait() - close(source) - }() - return Range(source) -} - -// Skip Returns a Stream that skips size elements. -func (p Stream) Skip(size int64) Stream { - if size == 0 { - return p - } - if size < 0 { - panic("size must be greater than -1") - } - source := make(chan interface{}) - - go func() { - i := 0 - for item := range p.source { - if i >= int(size) { - source <- item - } - i++ - } - close(source) - }() - return Range(source) -} - -// Chan Returns a channel of Stream. -func (p Stream) Chan() <-chan interface{} { - return p.source -} - // UnlimitedWorkers lets the caller to use as many workers as the tasks. func UnlimitedWorkers() Option { return func(opts *rxOptions) { diff --git a/core/fx/stream_test.go b/core/fx/stream_test.go index 84868073..dab08c89 100644 --- a/core/fx/stream_test.go +++ b/core/fx/stream_test.go @@ -353,9 +353,7 @@ func BenchmarkParallelMapReduce(b *testing.B) { input <- int64(rand.Int()) } }) - }).Map(mapper).Reduce(reducer) - } func BenchmarkMapReduce(b *testing.B) { @@ -377,7 +375,6 @@ func BenchmarkMapReduce(b *testing.B) { input <- int64(rand.Int()) } }).Map(mapper).Reduce(reducer) - } func equal(t *testing.T, stream Stream, data []interface{}) { @@ -389,12 +386,13 @@ func equal(t *testing.T, stream Stream, data []interface{}) { t.Errorf(" %v, want %v", items, data) } } -func assetEqual(t *testing.T, except interface{}, data interface{}) { + +func assetEqual(t *testing.T, except, data interface{}) { if !reflect.DeepEqual(except, data) { t.Errorf(" %v, want %v", data, except) } - } + func TestStream_AnyMach(t *testing.T) { assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool { return 4 == item.(int) @@ -409,6 +407,7 @@ func TestStream_AnyMach(t *testing.T) { return 2 == item.(int) })) } + func TestStream_AllMach(t *testing.T) { assetEqual( t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool { @@ -426,11 +425,7 @@ func TestStream_AllMach(t *testing.T) { }), ) } -func TestEmpty(t *testing.T) { - empty := Empty() - assetEqual(t, len(empty.source), 0) - assetEqual(t, cap(empty.source), 0) -} + func TestConcat(t *testing.T) { a1 := []interface{}{1, 2, 3} a2 := []interface{}{4, 5, 6} @@ -448,15 +443,6 @@ func TestConcat(t *testing.T) { ints = append(ints, a1...) ints = append(ints, a2...) assetEqual(t, ints, items) - -} -func TestStream_Chan(t *testing.T) { - var items []interface{} - - for item := range Just(1, 2, 3).Chan() { - items = append(items, item) - } - assetEqual(t, items, []interface{}{1, 2, 3}) } func TestStream_Skip(t *testing.T) {