You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/core/executors/bulkexecutor.go

102 lines
2.2 KiB
Go

package executors
import "time"
const defaultBulkTasks = 1000
type (
// BulkOption defines the method to customize a BulkExecutor.
BulkOption func(options *bulkOptions)
// A BulkExecutor is an executor that can execute tasks on either requirement meets:
// 1. up to given size of tasks
// 2. flush interval time elapsed
BulkExecutor struct {
executor *PeriodicalExecutor
container *bulkContainer
}
bulkOptions struct {
cachedTasks int
flushInterval time.Duration
}
)
// NewBulkExecutor returns a BulkExecutor.
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
options := newBulkOptions()
for _, opt := range opts {
opt(&options)
}
container := &bulkContainer{
execute: execute,
maxTasks: options.cachedTasks,
}
executor := &BulkExecutor{
executor: NewPeriodicalExecutor(options.flushInterval, container),
container: container,
}
return executor
}
// Add adds task into be.
func (be *BulkExecutor) Add(task interface{}) error {
be.executor.Add(task)
return nil
}
// Flush forces be to flush and execute tasks.
func (be *BulkExecutor) Flush() {
be.executor.Flush()
}
// Wait waits be to done with the task execution.
func (be *BulkExecutor) Wait() {
be.executor.Wait()
}
// WithBulkTasks customizes a BulkExecutor with given tasks limit.
func WithBulkTasks(tasks int) BulkOption {
return func(options *bulkOptions) {
options.cachedTasks = tasks
}
}
// WithBulkInterval customizes a BulkExecutor with given flush interval.
func WithBulkInterval(duration time.Duration) BulkOption {
return func(options *bulkOptions) {
options.flushInterval = duration
}
}
func newBulkOptions() bulkOptions {
return bulkOptions{
cachedTasks: defaultBulkTasks,
flushInterval: defaultFlushInterval,
}
}
type bulkContainer struct {
tasks []interface{}
execute Execute
maxTasks int
}
func (bc *bulkContainer) AddTask(task interface{}) bool {
bc.tasks = append(bc.tasks, task)
return len(bc.tasks) >= bc.maxTasks
}
func (bc *bulkContainer) Execute(tasks interface{}) {
vals := tasks.([]interface{})
bc.execute(vals)
}
func (bc *bulkContainer) RemoveAll() interface{} {
tasks := bc.tasks
bc.tasks = nil
return tasks
}