|
|
@ -20,18 +20,30 @@ type (
|
|
|
|
workers int
|
|
|
|
workers int
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// FilterFunc defines the method to filter a Stream.
|
|
|
|
FilterFunc func(item interface{}) bool
|
|
|
|
FilterFunc func(item interface{}) bool
|
|
|
|
|
|
|
|
// ForAllFunc defines the method to handle all elements in a Stream.
|
|
|
|
ForAllFunc func(pipe <-chan interface{})
|
|
|
|
ForAllFunc func(pipe <-chan interface{})
|
|
|
|
|
|
|
|
// ForEachFunc defines the method to handle each element in a Stream.
|
|
|
|
ForEachFunc func(item interface{})
|
|
|
|
ForEachFunc func(item interface{})
|
|
|
|
|
|
|
|
// GenerateFunc defines the method to send elements into a Stream.
|
|
|
|
GenerateFunc func(source chan<- interface{})
|
|
|
|
GenerateFunc func(source chan<- interface{})
|
|
|
|
|
|
|
|
// KeyFunc defines the method to generate keys for the elements in a Stream.
|
|
|
|
KeyFunc func(item interface{}) interface{}
|
|
|
|
KeyFunc func(item interface{}) interface{}
|
|
|
|
|
|
|
|
// LessFunc defines the method to compare the elements in a Stream.
|
|
|
|
LessFunc func(a, b interface{}) bool
|
|
|
|
LessFunc func(a, b interface{}) bool
|
|
|
|
|
|
|
|
// MapFunc defines the method to map each element to another object in a Stream.
|
|
|
|
MapFunc func(item interface{}) interface{}
|
|
|
|
MapFunc func(item interface{}) interface{}
|
|
|
|
|
|
|
|
// Option defines the method to customize a Stream.
|
|
|
|
Option func(opts *rxOptions)
|
|
|
|
Option func(opts *rxOptions)
|
|
|
|
|
|
|
|
// ParallelFunc defines the method to handle elements parallelly.
|
|
|
|
ParallelFunc func(item interface{})
|
|
|
|
ParallelFunc func(item interface{})
|
|
|
|
|
|
|
|
// ReduceFunc defines the method to reduce all the elements in a Stream.
|
|
|
|
ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
|
|
|
|
ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
|
|
|
|
|
|
|
|
// WalkFunc defines the method to walk through all the elements in a Stream.
|
|
|
|
WalkFunc func(item interface{}, pipe chan<- interface{})
|
|
|
|
WalkFunc func(item interface{}, pipe chan<- interface{})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// A Stream is a stream that can be used to do stream processing.
|
|
|
|
Stream struct {
|
|
|
|
Stream struct {
|
|
|
|
source <-chan interface{}
|
|
|
|
source <-chan interface{}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -159,6 +171,7 @@ func (p Stream) Group(fn KeyFunc) Stream {
|
|
|
|
return Range(source)
|
|
|
|
return Range(source)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Head returns the first n elements in p.
|
|
|
|
func (p Stream) Head(n int64) Stream {
|
|
|
|
func (p Stream) Head(n int64) Stream {
|
|
|
|
if n < 1 {
|
|
|
|
if n < 1 {
|
|
|
|
panic("n must be greater than 0")
|
|
|
|
panic("n must be greater than 0")
|
|
|
@ -187,7 +200,7 @@ func (p Stream) Head(n int64) Stream {
|
|
|
|
return Range(source)
|
|
|
|
return Range(source)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Maps converts each item to another corresponding item, which means it's a 1:1 model.
|
|
|
|
// Map converts each item to another corresponding item, which means it's a 1:1 model.
|
|
|
|
func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
|
|
|
|
func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
|
|
|
|
return p.Walk(func(item interface{}, pipe chan<- interface{}) {
|
|
|
|
return p.Walk(func(item interface{}, pipe chan<- interface{}) {
|
|
|
|
pipe <- fn(item)
|
|
|
|
pipe <- fn(item)
|
|
|
@ -274,6 +287,7 @@ func (p Stream) Split(n int) Stream {
|
|
|
|
return Range(source)
|
|
|
|
return Range(source)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Tail returns the last n elements in p.
|
|
|
|
func (p Stream) Tail(n int64) Stream {
|
|
|
|
func (p Stream) Tail(n int64) Stream {
|
|
|
|
if n < 1 {
|
|
|
|
if n < 1 {
|
|
|
|
panic("n should be greater than 0")
|
|
|
|
panic("n should be greater than 0")
|
|
|
|