You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/core/executors/chunkexecutor.go

114 lines
2.4 KiB
Go

4 years ago
package executors
import "time"
const defaultChunkSize = 1024 * 1024 // 1M
type (
// ChunkOption defines the method to customize a ChunkExecutor.
4 years ago
ChunkOption func(options *chunkOptions)
// A ChunkExecutor is an executor to execute tasks when either requirement meets:
// 1. up to given chunk size
// 2. flush interval elapsed
4 years ago
ChunkExecutor struct {
executor *PeriodicalExecutor
container *chunkContainer
}
chunkOptions struct {
chunkSize int
flushInterval time.Duration
}
)
// NewChunkExecutor returns a ChunkExecutor.
4 years ago
func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
options := newChunkOptions()
for _, opt := range opts {
opt(&options)
}
container := &chunkContainer{
execute: execute,
maxChunkSize: options.chunkSize,
}
executor := &ChunkExecutor{
executor: NewPeriodicalExecutor(options.flushInterval, container),
container: container,
}
return executor
}
// Add adds task with given chunk size into ce.
4 years ago
func (ce *ChunkExecutor) Add(task interface{}, size int) error {
ce.executor.Add(chunk{
val: task,
size: size,
})
return nil
}
// Flush forces ce to flush and execute tasks.
4 years ago
func (ce *ChunkExecutor) Flush() {
ce.executor.Flush()
}
// Wait waits the execution to be done.
4 years ago
func (ce *ChunkExecutor) Wait() {
ce.executor.Wait()
}
// WithChunkBytes customizes a ChunkExecutor with the given chunk size.
4 years ago
func WithChunkBytes(size int) ChunkOption {
return func(options *chunkOptions) {
options.chunkSize = size
}
}
// WithFlushInterval customizes a ChunkExecutor with the given flush interval.
4 years ago
func WithFlushInterval(duration time.Duration) ChunkOption {
return func(options *chunkOptions) {
options.flushInterval = duration
}
}
func newChunkOptions() chunkOptions {
return chunkOptions{
chunkSize: defaultChunkSize,
flushInterval: defaultFlushInterval,
}
}
type chunkContainer struct {
tasks []interface{}
execute Execute
size int
maxChunkSize int
}
func (bc *chunkContainer) AddTask(task interface{}) bool {
ck := task.(chunk)
bc.tasks = append(bc.tasks, ck.val)
bc.size += ck.size
return bc.size >= bc.maxChunkSize
}
func (bc *chunkContainer) Execute(tasks interface{}) {
vals := tasks.([]interface{})
bc.execute(vals)
}
func (bc *chunkContainer) RemoveAll() interface{} {
tasks := bc.tasks
bc.tasks = nil
bc.size = 0
return tasks
}
type chunk struct {
val interface{}
size int
}