diff --git a/core/collection/timingwheel.go b/core/collection/timingwheel.go index 6ff06b7e..0763e6f6 100644 --- a/core/collection/timingwheel.go +++ b/core/collection/timingwheel.go @@ -69,10 +69,11 @@ func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*Tim interval, numSlots, execute) } - return NewTimingWheelWithClock(interval, numSlots, execute, timex.NewTicker(interval)) + return NewTimingWheelWithTicker(interval, numSlots, execute, timex.NewTicker(interval)) } -func NewTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, +// NewTimingWheelWithTicker returns a TimingWheel with the given ticker. +func NewTimingWheelWithTicker(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (*TimingWheel, error) { tw := &TimingWheel{ interval: interval, diff --git a/core/collection/timingwheel_test.go b/core/collection/timingwheel_test.go index 1bd60486..0ae83b08 100644 --- a/core/collection/timingwheel_test.go +++ b/core/collection/timingwheel_test.go @@ -26,7 +26,7 @@ func TestNewTimingWheel(t *testing.T) { func TestTimingWheel_Drain(t *testing.T) { ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) { + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) { }, ticker) tw.SetTimer("first", 3, testStep*4) tw.SetTimer("second", 5, testStep*7) @@ -62,7 +62,7 @@ func TestTimingWheel_Drain(t *testing.T) { func TestTimingWheel_SetTimerSoon(t *testing.T) { run := syncx.NewAtomicBool() ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) { + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) { assert.True(t, run.CompareAndSwap(false, true)) assert.Equal(t, "any", k) assert.Equal(t, 3, v.(int)) @@ -78,7 +78,7 @@ func TestTimingWheel_SetTimerSoon(t *testing.T) { func TestTimingWheel_SetTimerTwice(t *testing.T) { run := syncx.NewAtomicBool() ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) { + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) { assert.True(t, run.CompareAndSwap(false, true)) assert.Equal(t, "any", k) assert.Equal(t, 5, v.(int)) @@ -96,7 +96,7 @@ func TestTimingWheel_SetTimerTwice(t *testing.T) { func TestTimingWheel_SetTimerWrongDelay(t *testing.T) { ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) {}, ticker) + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) {}, ticker) defer tw.Stop() assert.NotPanics(t, func() { tw.SetTimer("any", 3, -testStep) @@ -105,7 +105,7 @@ func TestTimingWheel_SetTimerWrongDelay(t *testing.T) { func TestTimingWheel_SetTimerAfterClose(t *testing.T) { ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) {}, ticker) + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) {}, ticker) tw.Stop() assert.Equal(t, ErrClosed, tw.SetTimer("any", 3, testStep)) } @@ -113,7 +113,7 @@ func TestTimingWheel_SetTimerAfterClose(t *testing.T) { func TestTimingWheel_MoveTimer(t *testing.T) { run := syncx.NewAtomicBool() ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 3, func(k, v interface{}) { + tw, _ := NewTimingWheelWithTicker(testStep, 3, func(k, v interface{}) { assert.True(t, run.CompareAndSwap(false, true)) assert.Equal(t, "any", k) assert.Equal(t, 3, v.(int)) @@ -139,7 +139,7 @@ func TestTimingWheel_MoveTimer(t *testing.T) { func TestTimingWheel_MoveTimerSoon(t *testing.T) { run := syncx.NewAtomicBool() ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 3, func(k, v interface{}) { + tw, _ := NewTimingWheelWithTicker(testStep, 3, func(k, v interface{}) { assert.True(t, run.CompareAndSwap(false, true)) assert.Equal(t, "any", k) assert.Equal(t, 3, v.(int)) @@ -155,7 +155,7 @@ func TestTimingWheel_MoveTimerSoon(t *testing.T) { func TestTimingWheel_MoveTimerEarlier(t *testing.T) { run := syncx.NewAtomicBool() ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) { + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) { assert.True(t, run.CompareAndSwap(false, true)) assert.Equal(t, "any", k) assert.Equal(t, 3, v.(int)) @@ -173,7 +173,7 @@ func TestTimingWheel_MoveTimerEarlier(t *testing.T) { func TestTimingWheel_RemoveTimer(t *testing.T) { ticker := timex.NewFakeTicker() - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) {}, ticker) + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) {}, ticker) tw.SetTimer("any", 3, testStep) assert.NotPanics(t, func() { tw.RemoveTimer("any") @@ -236,7 +236,7 @@ func TestTimingWheel_SetTimer(t *testing.T) { } var actual int32 done := make(chan lang.PlaceholderType) - tw, err := NewTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) { + tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value interface{}) { assert.Equal(t, 1, key.(int)) assert.Equal(t, 2, value.(int)) actual = atomic.LoadInt32(&count) @@ -317,7 +317,7 @@ func TestTimingWheel_SetAndMoveThenStart(t *testing.T) { } var actual int32 done := make(chan lang.PlaceholderType) - tw, err := NewTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) { + tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value interface{}) { actual = atomic.LoadInt32(&count) close(done) }, ticker) @@ -405,7 +405,7 @@ func TestTimingWheel_SetAndMoveTwice(t *testing.T) { } var actual int32 done := make(chan lang.PlaceholderType) - tw, err := NewTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) { + tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value interface{}) { actual = atomic.LoadInt32(&count) close(done) }, ticker) @@ -486,7 +486,7 @@ func TestTimingWheel_ElapsedAndSet(t *testing.T) { } var actual int32 done := make(chan lang.PlaceholderType) - tw, err := NewTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) { + tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value interface{}) { actual = atomic.LoadInt32(&count) close(done) }, ticker) @@ -577,7 +577,7 @@ func TestTimingWheel_ElapsedAndSetThenMove(t *testing.T) { } var actual int32 done := make(chan lang.PlaceholderType) - tw, err := NewTimingWheelWithClock(testStep, test.slots, func(key, value interface{}) { + tw, err := NewTimingWheelWithTicker(testStep, test.slots, func(key, value interface{}) { actual = atomic.LoadInt32(&count) close(done) }, ticker) @@ -612,7 +612,7 @@ func TestMoveAndRemoveTask(t *testing.T) { } } var keys []int - tw, _ := NewTimingWheelWithClock(testStep, 10, func(k, v interface{}) { + tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v interface{}) { assert.Equal(t, "any", k) assert.Equal(t, 3, v.(int)) keys = append(keys, v.(int)) diff --git a/core/stores/cache/cachenode_test.go b/core/stores/cache/cachenode_test.go index bd6ee355..f6511011 100644 --- a/core/stores/cache/cachenode_test.go +++ b/core/stores/cache/cachenode_test.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math/rand" + "runtime" "strconv" "sync" "testing" @@ -11,12 +12,14 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/stretchr/testify/assert" + "github.com/zeromicro/go-zero/core/collection" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/redis/redistest" "github.com/zeromicro/go-zero/core/syncx" + "github.com/zeromicro/go-zero/core/timex" ) var errTestNotFound = errors.New("not found") @@ -27,27 +30,54 @@ func init() { } func TestCacheNode_DelCache(t *testing.T) { - store, clean, err := redistest.CreateRedis() - assert.Nil(t, err) - store.Type = redis.ClusterType - defer clean() + t.Run("del cache", func(t *testing.T) { + store, clean, err := redistest.CreateRedis() + assert.Nil(t, err) + store.Type = redis.ClusterType + defer clean() - cn := cacheNode{ - rds: store, - r: rand.New(rand.NewSource(time.Now().UnixNano())), - lock: new(sync.Mutex), - unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewStat("any"), - errNotFound: errTestNotFound, - } - assert.Nil(t, cn.Del()) - assert.Nil(t, cn.Del([]string{}...)) - assert.Nil(t, cn.Del(make([]string, 0)...)) - cn.Set("first", "one") - assert.Nil(t, cn.Del("first")) - cn.Set("first", "one") - cn.Set("second", "two") - assert.Nil(t, cn.Del("first", "second")) + cn := cacheNode{ + rds: store, + r: rand.New(rand.NewSource(time.Now().UnixNano())), + lock: new(sync.Mutex), + unstableExpiry: mathx.NewUnstable(expiryDeviation), + stat: NewStat("any"), + errNotFound: errTestNotFound, + } + assert.Nil(t, cn.Del()) + assert.Nil(t, cn.Del([]string{}...)) + assert.Nil(t, cn.Del(make([]string, 0)...)) + cn.Set("first", "one") + assert.Nil(t, cn.Del("first")) + cn.Set("first", "one") + cn.Set("second", "two") + assert.Nil(t, cn.Del("first", "second")) + }) + + t.Run("del cache with errors", func(t *testing.T) { + old := timingWheel + ticker := timex.NewFakeTicker() + var err error + timingWheel, err = collection.NewTimingWheelWithTicker( + time.Millisecond, timingWheelSlots, func(key, value interface{}) { + clean(key, value) + }, ticker) + assert.NoError(t, err) + t.Cleanup(func() { + timingWheel = old + }) + + r, err := miniredis.Run() + assert.NoError(t, err) + defer r.Close() + r.SetError("mock error") + + node := NewNode(redis.New(r.Addr(), redis.Cluster()), syncx.NewSingleFlight(), + NewStat("any"), errTestNotFound) + assert.NoError(t, node.Del("foo", "bar")) + ticker.Tick() + runtime.Gosched() + }) } func TestCacheNode_DelCacheWithErrors(t *testing.T) { @@ -125,6 +155,21 @@ func TestCacheNode_Take(t *testing.T) { assert.Equal(t, `"value"`, val) } +func TestCacheNode_TakeBadRedis(t *testing.T) { + r, err := miniredis.Run() + assert.NoError(t, err) + defer r.Close() + r.SetError("mock error") + + cn := NewNode(redis.New(r.Addr()), syncx.NewSingleFlight(), NewStat("any"), + errTestNotFound, WithExpiry(time.Second), WithNotFoundExpiry(time.Second)) + var str string + assert.Error(t, cn.Take(&str, "any", func(v interface{}) error { + *v.(*string) = "value" + return nil + })) +} + func TestCacheNode_TakeNotFound(t *testing.T) { store, clean, err := redistest.CreateRedis() assert.Nil(t, err) diff --git a/core/stores/cache/cachestat.go b/core/stores/cache/cachestat.go index 24eb26af..90df42e2 100644 --- a/core/stores/cache/cachestat.go +++ b/core/stores/cache/cachestat.go @@ -5,6 +5,7 @@ import ( "time" "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/timex" ) const statInterval = time.Minute @@ -25,7 +26,13 @@ func NewStat(name string) *Stat { ret := &Stat{ name: name, } - go ret.statLoop() + + go func() { + ticker := timex.NewTicker(statInterval) + defer ticker.Stop() + + ret.statLoop(ticker) + }() return ret } @@ -50,11 +57,8 @@ func (s *Stat) IncrementDbFails() { atomic.AddUint64(&s.DbFails, 1) } -func (s *Stat) statLoop() { - ticker := time.NewTicker(statInterval) - defer ticker.Stop() - - for range ticker.C { +func (s *Stat) statLoop(ticker timex.Ticker) { + for range ticker.Chan() { total := atomic.SwapUint64(&s.Total, 0) if total == 0 { continue diff --git a/core/stores/cache/cachestat_test.go b/core/stores/cache/cachestat_test.go new file mode 100644 index 00000000..44c7d3d5 --- /dev/null +++ b/core/stores/cache/cachestat_test.go @@ -0,0 +1,28 @@ +package cache + +import ( + "testing" + + "github.com/zeromicro/go-zero/core/timex" +) + +func TestCacheStat_statLoop(t *testing.T) { + t.Run("stat loop total 0", func(t *testing.T) { + var stat Stat + ticker := timex.NewFakeTicker() + go stat.statLoop(ticker) + ticker.Tick() + ticker.Tick() + ticker.Stop() + }) + + t.Run("stat loop total not 0", func(t *testing.T) { + var stat Stat + stat.IncrementTotal() + ticker := timex.NewFakeTicker() + go stat.statLoop(ticker) + ticker.Tick() + ticker.Tick() + ticker.Stop() + }) +}