|
|
@ -5,8 +5,12 @@ import "time"
|
|
|
|
const defaultChunkSize = 1024 * 1024 // 1M
|
|
|
|
const defaultChunkSize = 1024 * 1024 // 1M
|
|
|
|
|
|
|
|
|
|
|
|
type (
|
|
|
|
type (
|
|
|
|
|
|
|
|
// ChunkOption defines the method to customize a ChunkExecutor.
|
|
|
|
ChunkOption func(options *chunkOptions)
|
|
|
|
ChunkOption func(options *chunkOptions)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// A ChunkExecutor is an executor to execute tasks when either requirement meets:
|
|
|
|
|
|
|
|
// 1. up to given chunk size
|
|
|
|
|
|
|
|
// 2. flush interval elapsed
|
|
|
|
ChunkExecutor struct {
|
|
|
|
ChunkExecutor struct {
|
|
|
|
executor *PeriodicalExecutor
|
|
|
|
executor *PeriodicalExecutor
|
|
|
|
container *chunkContainer
|
|
|
|
container *chunkContainer
|
|
|
@ -18,6 +22,7 @@ type (
|
|
|
|
}
|
|
|
|
}
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// NewChunkExecutor returns a ChunkExecutor.
|
|
|
|
func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
|
|
|
|
func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
|
|
|
|
options := newChunkOptions()
|
|
|
|
options := newChunkOptions()
|
|
|
|
for _, opt := range opts {
|
|
|
|
for _, opt := range opts {
|
|
|
@ -36,6 +41,7 @@ func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
|
|
|
|
return executor
|
|
|
|
return executor
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Add adds task with given chunk size into ce.
|
|
|
|
func (ce *ChunkExecutor) Add(task interface{}, size int) error {
|
|
|
|
func (ce *ChunkExecutor) Add(task interface{}, size int) error {
|
|
|
|
ce.executor.Add(chunk{
|
|
|
|
ce.executor.Add(chunk{
|
|
|
|
val: task,
|
|
|
|
val: task,
|
|
|
@ -44,20 +50,24 @@ func (ce *ChunkExecutor) Add(task interface{}, size int) error {
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Flush forces ce to flush and execute tasks.
|
|
|
|
func (ce *ChunkExecutor) Flush() {
|
|
|
|
func (ce *ChunkExecutor) Flush() {
|
|
|
|
ce.executor.Flush()
|
|
|
|
ce.executor.Flush()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Wait waits the execution to be done.
|
|
|
|
func (ce *ChunkExecutor) Wait() {
|
|
|
|
func (ce *ChunkExecutor) Wait() {
|
|
|
|
ce.executor.Wait()
|
|
|
|
ce.executor.Wait()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// WithChunkBytes customizes a ChunkExecutor with the given chunk size.
|
|
|
|
func WithChunkBytes(size int) ChunkOption {
|
|
|
|
func WithChunkBytes(size int) ChunkOption {
|
|
|
|
return func(options *chunkOptions) {
|
|
|
|
return func(options *chunkOptions) {
|
|
|
|
options.chunkSize = size
|
|
|
|
options.chunkSize = size
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// WithFlushInterval customizes a ChunkExecutor with the given flush interval.
|
|
|
|
func WithFlushInterval(duration time.Duration) ChunkOption {
|
|
|
|
func WithFlushInterval(duration time.Duration) ChunkOption {
|
|
|
|
return func(options *chunkOptions) {
|
|
|
|
return func(options *chunkOptions) {
|
|
|
|
options.flushInterval = duration
|
|
|
|
options.flushInterval = duration
|
|
|
|