From 2732d3cdae5bf35dc07e926d3b5ed35e3c506393 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sun, 13 Feb 2022 18:04:31 +0800 Subject: [PATCH] chore: refactor cache (#1532) --- core/stores/cache/cache.go | 62 ++++++++--------- core/stores/cache/cache_test.go | 118 ++++++++++++++++---------------- core/stores/cache/cachenode.go | 58 +++++++--------- 3 files changed, 116 insertions(+), 122 deletions(-) diff --git a/core/stores/cache/cache.go b/core/stores/cache/cache.go index dc491eb9..05346e83 100644 --- a/core/stores/cache/cache.go +++ b/core/stores/cache/cache.go @@ -20,31 +20,32 @@ type ( // DelCtx deletes cached values with keys. DelCtx(ctx context.Context, keys ...string) error // Get gets the cache with key and fills into v. - Get(key string, v interface{}) error + Get(key string, val interface{}) error // GetCtx gets the cache with key and fills into v. - GetCtx(ctx context.Context, key string, v interface{}) error + GetCtx(ctx context.Context, key string, val interface{}) error // IsNotFound checks if the given error is the defined errNotFound. IsNotFound(err error) bool // Set sets the cache with key and v, using c.expiry. - Set(key string, v interface{}) error + Set(key string, val interface{}) error // SetCtx sets the cache with key and v, using c.expiry. - SetCtx(ctx context.Context, key string, v interface{}) error + SetCtx(ctx context.Context, key string, val interface{}) error // SetWithExpire sets the cache with key and v, using given expire. - SetWithExpire(key string, v interface{}, expire time.Duration) error + SetWithExpire(key string, val interface{}, expire time.Duration) error // SetWithExpireCtx sets the cache with key and v, using given expire. - SetWithExpireCtx(ctx context.Context, key string, v interface{}, expire time.Duration) error + SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error // Take takes the result from cache first, if not found, // query from DB and set cache using c.expiry, then return the result. - Take(v interface{}, key string, query func(v interface{}) error) error + Take(val interface{}, key string, query func(val interface{}) error) error // TakeCtx takes the result from cache first, if not found, // query from DB and set cache using c.expiry, then return the result. - TakeCtx(ctx context.Context, v interface{}, key string, query func(v interface{}) error) error + TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error // TakeWithExpire takes the result from cache first, if not found, // query from DB and set cache using given expire, then return the result. - TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error + TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error // TakeWithExpireCtx takes the result from cache first, if not found, // query from DB and set cache using given expire, then return the result. - TakeWithExpireCtx(ctx context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error + TakeWithExpireCtx(ctx context.Context, val interface{}, key string, + query func(val interface{}, expire time.Duration) error) error } cacheCluster struct { @@ -117,18 +118,18 @@ func (cc cacheCluster) DelCtx(ctx context.Context, keys ...string) error { } // Get gets the cache with key and fills into v. -func (cc cacheCluster) Get(key string, v interface{}) error { - return cc.GetCtx(context.Background(), key, v) +func (cc cacheCluster) Get(key string, val interface{}) error { + return cc.GetCtx(context.Background(), key, val) } // GetCtx gets the cache with key and fills into v. -func (cc cacheCluster) GetCtx(ctx context.Context, key string, v interface{}) error { +func (cc cacheCluster) GetCtx(ctx context.Context, key string, val interface{}) error { c, ok := cc.dispatcher.Get(key) if !ok { return cc.errNotFound } - return c.(Cache).GetCtx(ctx, key, v) + return c.(Cache).GetCtx(ctx, key, val) } // IsNotFound checks if the given error is the defined errNotFound. @@ -137,66 +138,65 @@ func (cc cacheCluster) IsNotFound(err error) bool { } // Set sets the cache with key and v, using c.expiry. -func (cc cacheCluster) Set(key string, v interface{}) error { - return cc.SetCtx(context.Background(), key, v) +func (cc cacheCluster) Set(key string, val interface{}) error { + return cc.SetCtx(context.Background(), key, val) } // SetCtx sets the cache with key and v, using c.expiry. -func (cc cacheCluster) SetCtx(ctx context.Context, key string, v interface{}) error { +func (cc cacheCluster) SetCtx(ctx context.Context, key string, val interface{}) error { c, ok := cc.dispatcher.Get(key) if !ok { return cc.errNotFound } - return c.(Cache).SetCtx(ctx, key, v) + return c.(Cache).SetCtx(ctx, key, val) } // SetWithExpire sets the cache with key and v, using given expire. -func (cc cacheCluster) SetWithExpire(key string, v interface{}, expire time.Duration) error { - return cc.SetWithExpireCtx(context.Background(), key, v, expire) +func (cc cacheCluster) SetWithExpire(key string, val interface{}, expire time.Duration) error { + return cc.SetWithExpireCtx(context.Background(), key, val, expire) } // SetWithExpireCtx sets the cache with key and v, using given expire. -func (cc cacheCluster) SetWithExpireCtx(ctx context.Context, key string, v interface{}, expire time.Duration) error { +func (cc cacheCluster) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error { c, ok := cc.dispatcher.Get(key) if !ok { return cc.errNotFound } - return c.(Cache).SetWithExpireCtx(ctx, key, v, expire) + return c.(Cache).SetWithExpireCtx(ctx, key, val, expire) } // Take takes the result from cache first, if not found, // query from DB and set cache using c.expiry, then return the result. -func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error { - return cc.TakeCtx(context.Background(), v, key, query) +func (cc cacheCluster) Take(val interface{}, key string, query func(val interface{}) error) error { + return cc.TakeCtx(context.Background(), val, key, query) } // TakeCtx takes the result from cache first, if not found, // query from DB and set cache using c.expiry, then return the result. -func (cc cacheCluster) TakeCtx(ctx context.Context, v interface{}, key string, query func(v interface{}) error) error { +func (cc cacheCluster) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error { c, ok := cc.dispatcher.Get(key) if !ok { return cc.errNotFound } - return c.(Cache).TakeCtx(ctx, v, key, query) + return c.(Cache).TakeCtx(ctx, val, key, query) } // TakeWithExpire takes the result from cache first, if not found, // query from DB and set cache using given expire, then return the result. -func (cc cacheCluster) TakeWithExpire(v interface{}, key string, - query func(v interface{}, expire time.Duration) error) error { - return cc.TakeWithExpireCtx(context.Background(), v, key, query) +func (cc cacheCluster) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error { + return cc.TakeWithExpireCtx(context.Background(), val, key, query) } // TakeWithExpireCtx takes the result from cache first, if not found, // query from DB and set cache using given expire, then return the result. -func (cc cacheCluster) TakeWithExpireCtx(ctx context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { +func (cc cacheCluster) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error { c, ok := cc.dispatcher.Get(key) if !ok { return cc.errNotFound } - return c.(Cache).TakeWithExpireCtx(ctx, v, key, query) + return c.(Cache).TakeWithExpireCtx(ctx, val, key, query) } diff --git a/core/stores/cache/cache_test.go b/core/stores/cache/cache_test.go index f819126e..2e12734a 100644 --- a/core/stores/cache/cache_test.go +++ b/core/stores/cache/cache_test.go @@ -26,6 +26,10 @@ type mockedNode struct { } func (mc *mockedNode) Del(keys ...string) error { + return mc.DelCtx(context.Background(), keys...) +} + +func (mc *mockedNode) DelCtx(_ context.Context, keys ...string) error { var be errorx.BatchError for _, key := range keys { @@ -39,10 +43,14 @@ func (mc *mockedNode) Del(keys ...string) error { return be.Err() } -func (mc *mockedNode) Get(key string, v interface{}) error { +func (mc *mockedNode) Get(key string, val interface{}) error { + return mc.GetCtx(context.Background(), key, val) +} + +func (mc *mockedNode) GetCtx(ctx context.Context, key string, val interface{}) error { bs, ok := mc.vals[key] if ok { - return json.Unmarshal(bs, v) + return json.Unmarshal(bs, val) } return mc.errNotFound @@ -52,8 +60,12 @@ func (mc *mockedNode) IsNotFound(err error) bool { return errors.Is(err, mc.errNotFound) } -func (mc *mockedNode) Set(key string, v interface{}) error { - data, err := json.Marshal(v) +func (mc *mockedNode) Set(key string, val interface{}) error { + return mc.SetCtx(context.Background(), key, val) +} + +func (mc *mockedNode) SetCtx(ctx context.Context, key string, val interface{}) error { + data, err := json.Marshal(val) if err != nil { return err } @@ -62,50 +74,38 @@ func (mc *mockedNode) Set(key string, v interface{}) error { return nil } -func (mc *mockedNode) SetWithExpire(key string, v interface{}, _ time.Duration) error { - return mc.Set(key, v) -} - -func (mc *mockedNode) Take(v interface{}, key string, query func(v interface{}) error) error { - if _, ok := mc.vals[key]; ok { - return mc.Get(key, v) - } - - if err := query(v); err != nil { - return err - } - - return mc.Set(key, v) +func (mc *mockedNode) SetWithExpire(key string, val interface{}, expire time.Duration) error { + return mc.SetWithExpireCtx(context.Background(), key, val, expire) } -func (mc *mockedNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { - return mc.Take(v, key, func(v interface{}) error { - return query(v, 0) - }) +func (mc *mockedNode) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error { + return mc.Set(key, val) } -func (mc *mockedNode) DelCtx(_ context.Context, keys ...string) error { - return mc.Del(keys...) +func (mc *mockedNode) Take(val interface{}, key string, query func(val interface{}) error) error { + return mc.TakeCtx(context.Background(), val, key, query) } -func (mc *mockedNode) GetCtx(_ context.Context, key string, v interface{}) error { - return mc.Get(key, v) -} +func (mc *mockedNode) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error { + if _, ok := mc.vals[key]; ok { + return mc.GetCtx(ctx, key, val) + } -func (mc *mockedNode) SetCtx(_ context.Context, key string, v interface{}) error { - return mc.Set(key, v) -} + if err := query(val); err != nil { + return err + } -func (mc *mockedNode) SetWithExpireCtx(_ context.Context, key string, v interface{}, expire time.Duration) error { - return mc.SetWithExpire(key, v, expire) + return mc.SetCtx(ctx, key, val) } -func (mc *mockedNode) TakeCtx(_ context.Context, v interface{}, key string, query func(v interface{}) error) error { - return mc.Take(v, key, query) +func (mc *mockedNode) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error { + return mc.TakeWithExpireCtx(context.Background(), val, key, query) } -func (mc *mockedNode) TakeWithExpireCtx(_ context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { - return mc.TakeWithExpire(v, key, query) +func (mc *mockedNode) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error { + return mc.Take(val, key, func(val interface{}) error { + return query(val, 0) + }) } func TestCache_SetDel(t *testing.T) { @@ -141,18 +141,18 @@ func TestCache_SetDel(t *testing.T) { } } for i := 0; i < total; i++ { - var v int - assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v)) - assert.Equal(t, i, v) + var val int + assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &val)) + assert.Equal(t, i, val) } assert.Nil(t, c.Del()) for i := 0; i < total; i++ { assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i))) } for i := 0; i < total; i++ { - var v int - assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v))) - assert.Equal(t, 0, v) + var val int + assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &val))) + assert.Equal(t, 0, val) } } @@ -179,18 +179,18 @@ func TestCache_OneNode(t *testing.T) { } } for i := 0; i < total; i++ { - var v int - assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v)) - assert.Equal(t, i, v) + var val int + assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &val)) + assert.Equal(t, i, val) } assert.Nil(t, c.Del()) for i := 0; i < total; i++ { assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i))) } for i := 0; i < total; i++ { - var v int - assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v))) - assert.Equal(t, 0, v) + var val int + assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &val))) + assert.Equal(t, 0, val) } } @@ -230,9 +230,9 @@ func TestCache_Balance(t *testing.T) { assert.True(t, entropy > .95, fmt.Sprintf("entropy should be greater than 0.95, but got %.2f", entropy)) for i := 0; i < total; i++ { - var v int - assert.Nil(t, c.Get(strconv.Itoa(i), &v)) - assert.Equal(t, i, v) + var val int + assert.Nil(t, c.Get(strconv.Itoa(i), &val)) + assert.Equal(t, i, val) } for i := 0; i < total/10; i++ { @@ -244,14 +244,14 @@ func TestCache_Balance(t *testing.T) { for i := 0; i < total/10; i++ { var val int if i%2 == 0 { - assert.Nil(t, c.Take(&val, strconv.Itoa(i*10), func(v interface{}) error { - *v.(*int) = i + assert.Nil(t, c.Take(&val, strconv.Itoa(i*10), func(val interface{}) error { + *val.(*int) = i count++ return nil })) } else { - assert.Nil(t, c.TakeWithExpire(&val, strconv.Itoa(i*10), func(v interface{}, expire time.Duration) error { - *v.(*int) = i + assert.Nil(t, c.TakeWithExpire(&val, strconv.Itoa(i*10), func(val interface{}, expire time.Duration) error { + *val.(*int) = i count++ return nil })) @@ -272,10 +272,10 @@ func TestCacheNoNode(t *testing.T) { assert.NotNil(t, c.Get("foo", nil)) assert.NotNil(t, c.Set("foo", nil)) assert.NotNil(t, c.SetWithExpire("foo", nil, time.Second)) - assert.NotNil(t, c.Take(nil, "foo", func(v interface{}) error { + assert.NotNil(t, c.Take(nil, "foo", func(val interface{}) error { return nil })) - assert.NotNil(t, c.TakeWithExpire(nil, "foo", func(v interface{}, duration time.Duration) error { + assert.NotNil(t, c.TakeWithExpire(nil, "foo", func(val interface{}, duration time.Duration) error { return nil })) } @@ -283,8 +283,8 @@ func TestCacheNoNode(t *testing.T) { func calcEntropy(m map[int]int, total int) float64 { var entropy float64 - for _, v := range m { - proba := float64(v) / float64(total) + for _, val := range m { + proba := float64(val) / float64(total) entropy -= proba * math.Log2(proba) } diff --git a/core/stores/cache/cachenode.go b/core/stores/cache/cachenode.go index 55a7fc27..adb93938 100644 --- a/core/stores/cache/cachenode.go +++ b/core/stores/cache/cachenode.go @@ -72,7 +72,6 @@ func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error { } logger := logx.WithContext(ctx) - if len(keys) > 1 && c.rds.Type == redis.ClusterType { for _, key := range keys { if _, err := c.rds.DelCtx(ctx, key); err != nil { @@ -80,24 +79,22 @@ func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error { c.asyncRetryDelCache(key) } } - } else { - if _, err := c.rds.DelCtx(ctx, keys...); err != nil { - logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err) - c.asyncRetryDelCache(keys...) - } + } else if _, err := c.rds.DelCtx(ctx, keys...); err != nil { + logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err) + c.asyncRetryDelCache(keys...) } return nil } // Get gets the cache with key and fills into v. -func (c cacheNode) Get(key string, v interface{}) error { - return c.GetCtx(context.Background(), key, v) +func (c cacheNode) Get(key string, val interface{}) error { + return c.GetCtx(context.Background(), key, val) } // GetCtx gets the cache with key and fills into v. -func (c cacheNode) GetCtx(ctx context.Context, key string, v interface{}) error { - err := c.doGetCache(ctx, key, v) +func (c cacheNode) GetCtx(ctx context.Context, key string, val interface{}) error { + err := c.doGetCache(ctx, key, val) if err == errPlaceholder { return c.errNotFound } @@ -111,23 +108,23 @@ func (c cacheNode) IsNotFound(err error) bool { } // Set sets the cache with key and v, using c.expiry. -func (c cacheNode) Set(key string, v interface{}) error { - return c.SetCtx(context.Background(), key, v) +func (c cacheNode) Set(key string, val interface{}) error { + return c.SetCtx(context.Background(), key, val) } // SetCtx sets the cache with key and v, using c.expiry. -func (c cacheNode) SetCtx(ctx context.Context, key string, v interface{}) error { - return c.SetWithExpireCtx(ctx, key, v, c.aroundDuration(c.expiry)) +func (c cacheNode) SetCtx(ctx context.Context, key string, val interface{}) error { + return c.SetWithExpireCtx(ctx, key, val, c.aroundDuration(c.expiry)) } // SetWithExpire sets the cache with key and v, using given expire. -func (c cacheNode) SetWithExpire(key string, v interface{}, expire time.Duration) error { - return c.SetWithExpireCtx(context.Background(), key, v, expire) +func (c cacheNode) SetWithExpire(key string, val interface{}, expire time.Duration) error { + return c.SetWithExpireCtx(context.Background(), key, val, expire) } // SetWithExpireCtx sets the cache with key and v, using given expire. -func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, v interface{}, expire time.Duration) error { - data, err := jsonx.Marshal(v) +func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error { + data, err := jsonx.Marshal(val) if err != nil { return err } @@ -142,30 +139,29 @@ func (c cacheNode) String() string { // Take takes the result from cache first, if not found, // query from DB and set cache using c.expiry, then return the result. -func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error { - return c.TakeCtx(context.Background(), v, key, query) +func (c cacheNode) Take(val interface{}, key string, query func(val interface{}) error) error { + return c.TakeCtx(context.Background(), val, key, query) } // TakeCtx takes the result from cache first, if not found, // query from DB and set cache using c.expiry, then return the result. -func (c cacheNode) TakeCtx(ctx context.Context, v interface{}, key string, query func(v interface{}) error) error { - return c.doTake(ctx, v, key, query, func(v interface{}) error { +func (c cacheNode) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error { + return c.doTake(ctx, val, key, query, func(v interface{}) error { return c.SetCtx(ctx, key, v) }) } // TakeWithExpire takes the result from cache first, if not found, // query from DB and set cache using given expire, then return the result. -func (c cacheNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, - expire time.Duration) error) error { - return c.TakeWithExpireCtx(context.Background(), v, key, query) +func (c cacheNode) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error { + return c.TakeWithExpireCtx(context.Background(), val, key, query) } // TakeWithExpireCtx takes the result from cache first, if not found, // query from DB and set cache using given expire, then return the result. -func (c cacheNode) TakeWithExpireCtx(ctx context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { +func (c cacheNode) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error { expire := c.aroundDuration(c.expiry) - return c.doTake(ctx, v, key, func(v interface{}) error { + return c.doTake(ctx, val, key, func(v interface{}) error { return query(v, expire) }, func(v interface{}) error { return c.SetWithExpireCtx(ctx, key, v, expire) @@ -204,10 +200,9 @@ func (c cacheNode) doGetCache(ctx context.Context, key string, v interface{}) er return c.processCache(ctx, key, data, v) } -func (c cacheNode) doTake(ctx context.Context, v interface{}, key string, query func(v interface{}) error, - cacheVal func(v interface{}) error) error { +func (c cacheNode) doTake(ctx context.Context, v interface{}, key string, + query func(v interface{}) error, cacheVal func(v interface{}) error) error { logger := logx.WithContext(ctx) - val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) { if err := c.doGetCache(ctx, key, v); err != nil { if err == errPlaceholder { @@ -252,8 +247,6 @@ func (c cacheNode) doTake(ctx context.Context, v interface{}, key string, query } func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error { - logger := logx.WithContext(ctx) - err := jsonx.Unmarshal([]byte(data), v) if err == nil { return nil @@ -261,6 +254,7 @@ func (c cacheNode) processCache(ctx context.Context, key, data string, v interfa report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v", c.rds.Addr, key, data, err) + logger := logx.WithContext(ctx) logger.Error(report) stat.Report(report) if _, e := c.rds.DelCtx(ctx, key); e != nil {