diff --git a/core/fs/files.go b/core/fs/files.go index b0cb870c..f29f7142 100644 --- a/core/fs/files.go +++ b/core/fs/files.go @@ -7,6 +7,7 @@ import ( "syscall" ) +// CloseOnExec makes sure closing the file on process forking. func CloseOnExec(file *os.File) { if file != nil { syscall.CloseOnExec(int(file.Fd())) diff --git a/core/fx/fn.go b/core/fx/fn.go index 4e560e52..2250f5d6 100644 --- a/core/fx/fn.go +++ b/core/fx/fn.go @@ -20,18 +20,30 @@ type ( workers int } - FilterFunc func(item interface{}) bool - ForAllFunc func(pipe <-chan interface{}) - ForEachFunc func(item interface{}) + // FilterFunc defines the method to filter a Stream. + FilterFunc func(item interface{}) bool + // ForAllFunc defines the method to handle all elements in a Stream. + ForAllFunc func(pipe <-chan interface{}) + // ForEachFunc defines the method to handle each element in a Stream. + ForEachFunc func(item interface{}) + // GenerateFunc defines the method to send elements into a Stream. GenerateFunc func(source chan<- interface{}) - KeyFunc func(item interface{}) interface{} - LessFunc func(a, b interface{}) bool - MapFunc func(item interface{}) interface{} - Option func(opts *rxOptions) + // KeyFunc defines the method to generate keys for the elements in a Stream. + KeyFunc func(item interface{}) interface{} + // LessFunc defines the method to compare the elements in a Stream. + LessFunc func(a, b interface{}) bool + // MapFunc defines the method to map each element to another object in a Stream. + MapFunc func(item interface{}) interface{} + // Option defines the method to customize a Stream. + Option func(opts *rxOptions) + // ParallelFunc defines the method to handle elements parallelly. ParallelFunc func(item interface{}) - ReduceFunc func(pipe <-chan interface{}) (interface{}, error) - WalkFunc func(item interface{}, pipe chan<- interface{}) + // ReduceFunc defines the method to reduce all the elements in a Stream. + ReduceFunc func(pipe <-chan interface{}) (interface{}, error) + // WalkFunc defines the method to walk through all the elements in a Stream. + WalkFunc func(item interface{}, pipe chan<- interface{}) + // A Stream is a stream that can be used to do stream processing. Stream struct { source <-chan interface{} } @@ -159,6 +171,7 @@ func (p Stream) Group(fn KeyFunc) Stream { return Range(source) } +// Head returns the first n elements in p. func (p Stream) Head(n int64) Stream { if n < 1 { panic("n must be greater than 0") @@ -187,7 +200,7 @@ func (p Stream) Head(n int64) Stream { return Range(source) } -// Maps converts each item to another corresponding item, which means it's a 1:1 model. +// Map converts each item to another corresponding item, which means it's a 1:1 model. func (p Stream) Map(fn MapFunc, opts ...Option) Stream { return p.Walk(func(item interface{}, pipe chan<- interface{}) { pipe <- fn(item) @@ -274,6 +287,7 @@ func (p Stream) Split(n int) Stream { return Range(source) } +// Tail returns the last n elements in p. func (p Stream) Tail(n int64) Stream { if n < 1 { panic("n should be greater than 0") diff --git a/core/fx/parallel.go b/core/fx/parallel.go index d4cefea8..d7146284 100644 --- a/core/fx/parallel.go +++ b/core/fx/parallel.go @@ -2,6 +2,7 @@ package fx import "github.com/tal-tech/go-zero/core/threading" +// Parallel runs fns parallelly and waits for done. func Parallel(fns ...func()) { group := threading.NewRoutineGroup() for _, fn := range fns { diff --git a/core/fx/retry.go b/core/fx/retry.go index 4aa99767..40475e0f 100644 --- a/core/fx/retry.go +++ b/core/fx/retry.go @@ -5,6 +5,7 @@ import "github.com/tal-tech/go-zero/core/errorx" const defaultRetryTimes = 3 type ( + // RetryOption defines the method to customize DoWithRetry. RetryOption func(*retryOptions) retryOptions struct { @@ -12,7 +13,8 @@ type ( } ) -func DoWithRetries(fn func() error, opts ...RetryOption) error { +// DoWithRetry runs fn, and retries if failed. Default to retry 3 times. +func DoWithRetry(fn func() error, opts ...RetryOption) error { var options = newRetryOptions() for _, opt := range opts { opt(options) @@ -30,7 +32,8 @@ func DoWithRetries(fn func() error, opts ...RetryOption) error { return berr.Err() } -func WithRetries(times int) RetryOption { +// WithRetry customize a DoWithRetry call with given retry times. +func WithRetry(times int) RetryOption { return func(options *retryOptions) { options.times = times } diff --git a/core/fx/retry_test.go b/core/fx/retry_test.go index 73f8b408..1664b6ed 100644 --- a/core/fx/retry_test.go +++ b/core/fx/retry_test.go @@ -8,12 +8,12 @@ import ( ) func TestRetry(t *testing.T) { - assert.NotNil(t, DoWithRetries(func() error { + assert.NotNil(t, DoWithRetry(func() error { return errors.New("any") })) var times int - assert.Nil(t, DoWithRetries(func() error { + assert.Nil(t, DoWithRetry(func() error { times++ if times == defaultRetryTimes { return nil @@ -22,7 +22,7 @@ func TestRetry(t *testing.T) { })) times = 0 - assert.NotNil(t, DoWithRetries(func() error { + assert.NotNil(t, DoWithRetry(func() error { times++ if times == defaultRetryTimes+1 { return nil @@ -32,11 +32,11 @@ func TestRetry(t *testing.T) { var total = 2 * defaultRetryTimes times = 0 - assert.Nil(t, DoWithRetries(func() error { + assert.Nil(t, DoWithRetry(func() error { times++ if times == total { return nil } return errors.New("any") - }, WithRetries(total))) + }, WithRetry(total))) } diff --git a/core/fx/timeout.go b/core/fx/timeout.go index 2cd1b6c7..afd0748c 100644 --- a/core/fx/timeout.go +++ b/core/fx/timeout.go @@ -3,21 +3,27 @@ package fx import ( "context" "time" + + "github.com/tal-tech/go-zero/core/contextx" ) var ( + // ErrCanceled is the error returned when the context is canceled. ErrCanceled = context.Canceled - ErrTimeout = context.DeadlineExceeded + // ErrTimeout is the error returned when the context's deadline passes. + ErrTimeout = context.DeadlineExceeded ) -type FxOption func() context.Context +// DoOption defines the method to customize a DoWithTimeout call. +type DoOption func() context.Context -func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) error { +// DoWithTimeout runs fn with timeout control. +func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error { parentCtx := context.Background() for _, opt := range opts { parentCtx = opt() } - ctx, cancel := context.WithTimeout(parentCtx, timeout) + ctx, cancel := contextx.ShrinkDeadline(parentCtx, timeout) defer cancel() done := make(chan error) @@ -42,7 +48,8 @@ func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) err } } -func WithContext(ctx context.Context) FxOption { +// WithContext customizes a DoWithTimeout call with given ctx. +func WithContext(ctx context.Context) DoOption { return func() context.Context { return ctx }