From 771371e051a9591e9dd7a1ba78d0dfb16d86bf40 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sun, 3 Jan 2021 20:47:29 +0800 Subject: [PATCH] simplify rolling window code, and make tests run faster (#343) --- core/collection/rollingwindow.go | 4 +-- core/collection/rollingwindow_test.go | 51 ++++++++++++++------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/core/collection/rollingwindow.go b/core/collection/rollingwindow.go index 51b0d87c..189469c3 100644 --- a/core/collection/rollingwindow.go +++ b/core/collection/rollingwindow.go @@ -17,7 +17,7 @@ type ( interval time.Duration offset int ignoreCurrent bool - lastTime time.Duration + lastTime time.Duration // start time of the last bucket } ) @@ -96,7 +96,7 @@ func (rw *RollingWindow) updateOffset() { } rw.offset = (offset + span) % rw.size - rw.lastTime = time.Duration(int(rw.lastTime) + int(rw.interval)*span) + rw.lastTime = rw.lastTime + rw.interval*time.Duration(span) } type Bucket struct { diff --git a/core/collection/rollingwindow_test.go b/core/collection/rollingwindow_test.go index b4e05b14..bbdc7821 100644 --- a/core/collection/rollingwindow_test.go +++ b/core/collection/rollingwindow_test.go @@ -45,31 +45,6 @@ func TestRollingWindowAdd(t *testing.T) { assert.Equal(t, []float64{5, 15, 7}, listBuckets()) } -func TestRollingWindowAdd2(t *testing.T) { - const size = 3 - interval := time.Millisecond * 50 - r := NewRollingWindow(size, interval) - listBuckets := func() []float64 { - var buckets []float64 - r.Reduce(func(b *Bucket) { - buckets = append(buckets, b.Sum) - }) - return buckets - } - assert.Equal(t, []float64{0, 0, 0}, listBuckets()) - r.Add(1) - assert.Equal(t, []float64{0, 0, 1}, listBuckets()) - time.Sleep(time.Millisecond * 90) - r.Add(2) - r.Add(3) - assert.Equal(t, []float64{0, 1, 5}, listBuckets()) - time.Sleep(time.Millisecond * 20) - r.Add(4) - r.Add(5) - r.Add(6) - assert.Equal(t, []float64{1, 5, 15}, listBuckets()) -} - func TestRollingWindowReset(t *testing.T) { const size = 3 r := NewRollingWindow(size, duration, IgnoreCurrentBucket()) @@ -130,6 +105,32 @@ func TestRollingWindowReduce(t *testing.T) { } } +func TestRollingWindowBucketTimeBoundary(t *testing.T) { + const size = 3 + interval := time.Millisecond * 30 + r := NewRollingWindow(size, interval) + listBuckets := func() []float64 { + var buckets []float64 + r.Reduce(func(b *Bucket) { + buckets = append(buckets, b.Sum) + }) + return buckets + } + assert.Equal(t, []float64{0, 0, 0}, listBuckets()) + r.Add(1) + assert.Equal(t, []float64{0, 0, 1}, listBuckets()) + time.Sleep(time.Millisecond * 45) + r.Add(2) + r.Add(3) + assert.Equal(t, []float64{0, 1, 5}, listBuckets()) + // sleep time should be less than interval, and make the bucket change happen + time.Sleep(time.Millisecond * 20) + r.Add(4) + r.Add(5) + r.Add(6) + assert.Equal(t, []float64{1, 5, 15}, listBuckets()) +} + func TestRollingWindowDataRace(t *testing.T) { const size = 3 r := NewRollingWindow(size, duration)