You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
94 lines
1.6 KiB
Go
94 lines
1.6 KiB
Go
4 years ago
|
package executors
|
||
|
|
||
|
import (
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// defaultBulkTasks is the default value of bulkOptions.cachedTasks, i.e. the
// buffered-task count at which bulkContainer.AddTask starts reporting true
// unless WithBulkTasks overrides it.
const defaultBulkTasks = 1000
|
||
|
|
||
|
type (
|
||
|
BulkOption func(options *bulkOptions)
|
||
|
|
||
|
BulkExecutor struct {
|
||
|
executor *PeriodicalExecutor
|
||
|
container *bulkContainer
|
||
|
}
|
||
|
|
||
|
bulkOptions struct {
|
||
|
cachedTasks int
|
||
|
flushInterval time.Duration
|
||
|
}
|
||
|
)
|
||
|
|
||
|
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
|
||
|
options := newBulkOptions()
|
||
|
for _, opt := range opts {
|
||
|
opt(&options)
|
||
|
}
|
||
|
|
||
|
container := &bulkContainer{
|
||
|
execute: execute,
|
||
|
maxTasks: options.cachedTasks,
|
||
|
}
|
||
|
executor := &BulkExecutor{
|
||
|
executor: NewPeriodicalExecutor(options.flushInterval, container),
|
||
|
container: container,
|
||
|
}
|
||
|
|
||
|
return executor
|
||
|
}
|
||
|
|
||
|
func (be *BulkExecutor) Add(task interface{}) error {
|
||
|
be.executor.Add(task)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (be *BulkExecutor) Flush() {
|
||
|
be.executor.Flush()
|
||
|
}
|
||
|
|
||
|
func (be *BulkExecutor) Wait() {
|
||
|
be.executor.Wait()
|
||
|
}
|
||
|
|
||
|
func WithBulkTasks(tasks int) BulkOption {
|
||
|
return func(options *bulkOptions) {
|
||
|
options.cachedTasks = tasks
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithBulkInterval(duration time.Duration) BulkOption {
|
||
|
return func(options *bulkOptions) {
|
||
|
options.flushInterval = duration
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func newBulkOptions() bulkOptions {
|
||
|
return bulkOptions{
|
||
|
cachedTasks: defaultBulkTasks,
|
||
|
flushInterval: defaultFlushInterval,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type bulkContainer struct {
|
||
|
tasks []interface{}
|
||
|
execute Execute
|
||
|
maxTasks int
|
||
|
}
|
||
|
|
||
|
func (bc *bulkContainer) AddTask(task interface{}) bool {
|
||
|
bc.tasks = append(bc.tasks, task)
|
||
|
return len(bc.tasks) >= bc.maxTasks
|
||
|
}
|
||
|
|
||
|
func (bc *bulkContainer) Execute(tasks interface{}) {
|
||
|
vals := tasks.([]interface{})
|
||
|
bc.execute(vals)
|
||
|
}
|
||
|
|
||
|
func (bc *bulkContainer) RemoveAll() interface{} {
|
||
|
tasks := bc.tasks
|
||
|
bc.tasks = nil
|
||
|
return tasks
|
||
|
}
|