|
|
@ -12,14 +12,14 @@ func TestChunkExecutor(t *testing.T) {
|
|
|
|
var values []int
|
|
|
|
var values []int
|
|
|
|
var lock sync.Mutex
|
|
|
|
var lock sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
exeutor := NewChunkExecutor(func(items []interface{}) {
|
|
|
|
executor := NewChunkExecutor(func(items []interface{}) {
|
|
|
|
lock.Lock()
|
|
|
|
lock.Lock()
|
|
|
|
values = append(values, len(items))
|
|
|
|
values = append(values, len(items))
|
|
|
|
lock.Unlock()
|
|
|
|
lock.Unlock()
|
|
|
|
}, WithChunkBytes(10), WithFlushInterval(time.Minute))
|
|
|
|
}, WithChunkBytes(10), WithFlushInterval(time.Minute))
|
|
|
|
|
|
|
|
|
|
|
|
for i := 0; i < 50; i++ {
|
|
|
|
for i := 0; i < 50; i++ {
|
|
|
|
exeutor.Add(1, 1)
|
|
|
|
executor.Add(1, 1)
|
|
|
|
time.Sleep(time.Millisecond)
|
|
|
|
time.Sleep(time.Millisecond)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -40,13 +40,13 @@ func TestChunkExecutorFlushInterval(t *testing.T) {
|
|
|
|
var wait sync.WaitGroup
|
|
|
|
var wait sync.WaitGroup
|
|
|
|
|
|
|
|
|
|
|
|
wait.Add(1)
|
|
|
|
wait.Add(1)
|
|
|
|
exeutor := NewChunkExecutor(func(items []interface{}) {
|
|
|
|
executor := NewChunkExecutor(func(items []interface{}) {
|
|
|
|
assert.Equal(t, size, len(items))
|
|
|
|
assert.Equal(t, size, len(items))
|
|
|
|
wait.Done()
|
|
|
|
wait.Done()
|
|
|
|
}, WithChunkBytes(caches), WithFlushInterval(time.Millisecond*100))
|
|
|
|
}, WithChunkBytes(caches), WithFlushInterval(time.Millisecond*100))
|
|
|
|
|
|
|
|
|
|
|
|
for i := 0; i < size; i++ {
|
|
|
|
for i := 0; i < size; i++ {
|
|
|
|
exeutor.Add(1, 1)
|
|
|
|
executor.Add(1, 1)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
wait.Wait()
|
|
|
|
wait.Wait()
|
|
|
|