|
|
|
package executors
|
|
|
|
|
|
|
|
import "time"
|
|
|
|
|
|
|
|
const defaultChunkSize = 1024 * 1024 // 1M
|
|
|
|
|
|
|
|
type (
|
|
|
|
// ChunkOption defines the method to customize a ChunkExecutor.
|
|
|
|
ChunkOption func(options *chunkOptions)
|
|
|
|
|
|
|
|
// A ChunkExecutor is an executor to execute tasks when either requirement meets:
|
|
|
|
// 1. up to given chunk size
|
|
|
|
// 2. flush interval elapsed
|
|
|
|
ChunkExecutor struct {
|
|
|
|
executor *PeriodicalExecutor
|
|
|
|
container *chunkContainer
|
|
|
|
}
|
|
|
|
|
|
|
|
chunkOptions struct {
|
|
|
|
chunkSize int
|
|
|
|
flushInterval time.Duration
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// NewChunkExecutor returns a ChunkExecutor.
|
|
|
|
func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
|
|
|
|
options := newChunkOptions()
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(&options)
|
|
|
|
}
|
|
|
|
|
|
|
|
container := &chunkContainer{
|
|
|
|
execute: execute,
|
|
|
|
maxChunkSize: options.chunkSize,
|
|
|
|
}
|
|
|
|
executor := &ChunkExecutor{
|
|
|
|
executor: NewPeriodicalExecutor(options.flushInterval, container),
|
|
|
|
container: container,
|
|
|
|
}
|
|
|
|
|
|
|
|
return executor
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add adds task with given chunk size into ce.
|
|
|
|
func (ce *ChunkExecutor) Add(task interface{}, size int) error {
|
|
|
|
ce.executor.Add(chunk{
|
|
|
|
val: task,
|
|
|
|
size: size,
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Flush forces ce to flush and execute tasks.
|
|
|
|
func (ce *ChunkExecutor) Flush() {
|
|
|
|
ce.executor.Flush()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait waits the execution to be done.
|
|
|
|
func (ce *ChunkExecutor) Wait() {
|
|
|
|
ce.executor.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithChunkBytes customizes a ChunkExecutor with the given chunk size.
|
|
|
|
func WithChunkBytes(size int) ChunkOption {
|
|
|
|
return func(options *chunkOptions) {
|
|
|
|
options.chunkSize = size
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithFlushInterval customizes a ChunkExecutor with the given flush interval.
|
|
|
|
func WithFlushInterval(duration time.Duration) ChunkOption {
|
|
|
|
return func(options *chunkOptions) {
|
|
|
|
options.flushInterval = duration
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newChunkOptions() chunkOptions {
|
|
|
|
return chunkOptions{
|
|
|
|
chunkSize: defaultChunkSize,
|
|
|
|
flushInterval: defaultFlushInterval,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type chunkContainer struct {
|
|
|
|
tasks []interface{}
|
|
|
|
execute Execute
|
|
|
|
size int
|
|
|
|
maxChunkSize int
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bc *chunkContainer) AddTask(task interface{}) bool {
|
|
|
|
ck := task.(chunk)
|
|
|
|
bc.tasks = append(bc.tasks, ck.val)
|
|
|
|
bc.size += ck.size
|
|
|
|
return bc.size >= bc.maxChunkSize
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bc *chunkContainer) Execute(tasks interface{}) {
|
|
|
|
vals := tasks.([]interface{})
|
|
|
|
bc.execute(vals)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bc *chunkContainer) RemoveAll() interface{} {
|
|
|
|
tasks := bc.tasks
|
|
|
|
bc.tasks = nil
|
|
|
|
bc.size = 0
|
|
|
|
return tasks
|
|
|
|
}
|
|
|
|
|
|
|
|
type chunk struct {
|
|
|
|
val interface{}
|
|
|
|
size int
|
|
|
|
}
|