chore: refactor cache (#1532)

master
Kevin Wan 3 years ago committed by GitHub
parent e8c307e4dc
commit 2732d3cdae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,31 +20,32 @@ type (
// DelCtx deletes cached values with keys. // DelCtx deletes cached values with keys.
DelCtx(ctx context.Context, keys ...string) error DelCtx(ctx context.Context, keys ...string) error
// Get gets the cache with key and fills into v. // Get gets the cache with key and fills into v.
Get(key string, v interface{}) error Get(key string, val interface{}) error
// GetCtx gets the cache with key and fills into v. // GetCtx gets the cache with key and fills into v.
GetCtx(ctx context.Context, key string, v interface{}) error GetCtx(ctx context.Context, key string, val interface{}) error
// IsNotFound checks if the given error is the defined errNotFound. // IsNotFound checks if the given error is the defined errNotFound.
IsNotFound(err error) bool IsNotFound(err error) bool
// Set sets the cache with key and v, using c.expiry. // Set sets the cache with key and v, using c.expiry.
Set(key string, v interface{}) error Set(key string, val interface{}) error
// SetCtx sets the cache with key and v, using c.expiry. // SetCtx sets the cache with key and v, using c.expiry.
SetCtx(ctx context.Context, key string, v interface{}) error SetCtx(ctx context.Context, key string, val interface{}) error
// SetWithExpire sets the cache with key and v, using given expire. // SetWithExpire sets the cache with key and v, using given expire.
SetWithExpire(key string, v interface{}, expire time.Duration) error SetWithExpire(key string, val interface{}, expire time.Duration) error
// SetWithExpireCtx sets the cache with key and v, using given expire. // SetWithExpireCtx sets the cache with key and v, using given expire.
SetWithExpireCtx(ctx context.Context, key string, v interface{}, expire time.Duration) error SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error
// Take takes the result from cache first, if not found, // Take takes the result from cache first, if not found,
// query from DB and set cache using c.expiry, then return the result. // query from DB and set cache using c.expiry, then return the result.
Take(v interface{}, key string, query func(v interface{}) error) error Take(val interface{}, key string, query func(val interface{}) error) error
// TakeCtx takes the result from cache first, if not found, // TakeCtx takes the result from cache first, if not found,
// query from DB and set cache using c.expiry, then return the result. // query from DB and set cache using c.expiry, then return the result.
TakeCtx(ctx context.Context, v interface{}, key string, query func(v interface{}) error) error TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error
// TakeWithExpire takes the result from cache first, if not found, // TakeWithExpire takes the result from cache first, if not found,
// query from DB and set cache using given expire, then return the result. // query from DB and set cache using given expire, then return the result.
TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error
// TakeWithExpireCtx takes the result from cache first, if not found, // TakeWithExpireCtx takes the result from cache first, if not found,
// query from DB and set cache using given expire, then return the result. // query from DB and set cache using given expire, then return the result.
TakeWithExpireCtx(ctx context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error TakeWithExpireCtx(ctx context.Context, val interface{}, key string,
query func(val interface{}, expire time.Duration) error) error
} }
cacheCluster struct { cacheCluster struct {
@ -117,18 +118,18 @@ func (cc cacheCluster) DelCtx(ctx context.Context, keys ...string) error {
} }
// Get gets the cache with key and fills into v. // Get gets the cache with key and fills into v.
func (cc cacheCluster) Get(key string, v interface{}) error { func (cc cacheCluster) Get(key string, val interface{}) error {
return cc.GetCtx(context.Background(), key, v) return cc.GetCtx(context.Background(), key, val)
} }
// GetCtx gets the cache with key and fills into v. // GetCtx gets the cache with key and fills into v.
func (cc cacheCluster) GetCtx(ctx context.Context, key string, v interface{}) error { func (cc cacheCluster) GetCtx(ctx context.Context, key string, val interface{}) error {
c, ok := cc.dispatcher.Get(key) c, ok := cc.dispatcher.Get(key)
if !ok { if !ok {
return cc.errNotFound return cc.errNotFound
} }
return c.(Cache).GetCtx(ctx, key, v) return c.(Cache).GetCtx(ctx, key, val)
} }
// IsNotFound checks if the given error is the defined errNotFound. // IsNotFound checks if the given error is the defined errNotFound.
@ -137,66 +138,65 @@ func (cc cacheCluster) IsNotFound(err error) bool {
} }
// Set sets the cache with key and v, using c.expiry. // Set sets the cache with key and v, using c.expiry.
func (cc cacheCluster) Set(key string, v interface{}) error { func (cc cacheCluster) Set(key string, val interface{}) error {
return cc.SetCtx(context.Background(), key, v) return cc.SetCtx(context.Background(), key, val)
} }
// SetCtx sets the cache with key and v, using c.expiry. // SetCtx sets the cache with key and v, using c.expiry.
func (cc cacheCluster) SetCtx(ctx context.Context, key string, v interface{}) error { func (cc cacheCluster) SetCtx(ctx context.Context, key string, val interface{}) error {
c, ok := cc.dispatcher.Get(key) c, ok := cc.dispatcher.Get(key)
if !ok { if !ok {
return cc.errNotFound return cc.errNotFound
} }
return c.(Cache).SetCtx(ctx, key, v) return c.(Cache).SetCtx(ctx, key, val)
} }
// SetWithExpire sets the cache with key and v, using given expire. // SetWithExpire sets the cache with key and v, using given expire.
func (cc cacheCluster) SetWithExpire(key string, v interface{}, expire time.Duration) error { func (cc cacheCluster) SetWithExpire(key string, val interface{}, expire time.Duration) error {
return cc.SetWithExpireCtx(context.Background(), key, v, expire) return cc.SetWithExpireCtx(context.Background(), key, val, expire)
} }
// SetWithExpireCtx sets the cache with key and v, using given expire. // SetWithExpireCtx sets the cache with key and v, using given expire.
func (cc cacheCluster) SetWithExpireCtx(ctx context.Context, key string, v interface{}, expire time.Duration) error { func (cc cacheCluster) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error {
c, ok := cc.dispatcher.Get(key) c, ok := cc.dispatcher.Get(key)
if !ok { if !ok {
return cc.errNotFound return cc.errNotFound
} }
return c.(Cache).SetWithExpireCtx(ctx, key, v, expire) return c.(Cache).SetWithExpireCtx(ctx, key, val, expire)
} }
// Take takes the result from cache first, if not found, // Take takes the result from cache first, if not found,
// query from DB and set cache using c.expiry, then return the result. // query from DB and set cache using c.expiry, then return the result.
func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error { func (cc cacheCluster) Take(val interface{}, key string, query func(val interface{}) error) error {
return cc.TakeCtx(context.Background(), v, key, query) return cc.TakeCtx(context.Background(), val, key, query)
} }
// TakeCtx takes the result from cache first, if not found, // TakeCtx takes the result from cache first, if not found,
// query from DB and set cache using c.expiry, then return the result. // query from DB and set cache using c.expiry, then return the result.
func (cc cacheCluster) TakeCtx(ctx context.Context, v interface{}, key string, query func(v interface{}) error) error { func (cc cacheCluster) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error {
c, ok := cc.dispatcher.Get(key) c, ok := cc.dispatcher.Get(key)
if !ok { if !ok {
return cc.errNotFound return cc.errNotFound
} }
return c.(Cache).TakeCtx(ctx, v, key, query) return c.(Cache).TakeCtx(ctx, val, key, query)
} }
// TakeWithExpire takes the result from cache first, if not found, // TakeWithExpire takes the result from cache first, if not found,
// query from DB and set cache using given expire, then return the result. // query from DB and set cache using given expire, then return the result.
func (cc cacheCluster) TakeWithExpire(v interface{}, key string, func (cc cacheCluster) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
query func(v interface{}, expire time.Duration) error) error { return cc.TakeWithExpireCtx(context.Background(), val, key, query)
return cc.TakeWithExpireCtx(context.Background(), v, key, query)
} }
// TakeWithExpireCtx takes the result from cache first, if not found, // TakeWithExpireCtx takes the result from cache first, if not found,
// query from DB and set cache using given expire, then return the result. // query from DB and set cache using given expire, then return the result.
func (cc cacheCluster) TakeWithExpireCtx(ctx context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { func (cc cacheCluster) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
c, ok := cc.dispatcher.Get(key) c, ok := cc.dispatcher.Get(key)
if !ok { if !ok {
return cc.errNotFound return cc.errNotFound
} }
return c.(Cache).TakeWithExpireCtx(ctx, v, key, query) return c.(Cache).TakeWithExpireCtx(ctx, val, key, query)
} }

@ -26,6 +26,10 @@ type mockedNode struct {
} }
func (mc *mockedNode) Del(keys ...string) error { func (mc *mockedNode) Del(keys ...string) error {
return mc.DelCtx(context.Background(), keys...)
}
func (mc *mockedNode) DelCtx(_ context.Context, keys ...string) error {
var be errorx.BatchError var be errorx.BatchError
for _, key := range keys { for _, key := range keys {
@ -39,10 +43,14 @@ func (mc *mockedNode) Del(keys ...string) error {
return be.Err() return be.Err()
} }
func (mc *mockedNode) Get(key string, v interface{}) error { func (mc *mockedNode) Get(key string, val interface{}) error {
return mc.GetCtx(context.Background(), key, val)
}
func (mc *mockedNode) GetCtx(ctx context.Context, key string, val interface{}) error {
bs, ok := mc.vals[key] bs, ok := mc.vals[key]
if ok { if ok {
return json.Unmarshal(bs, v) return json.Unmarshal(bs, val)
} }
return mc.errNotFound return mc.errNotFound
@ -52,8 +60,12 @@ func (mc *mockedNode) IsNotFound(err error) bool {
return errors.Is(err, mc.errNotFound) return errors.Is(err, mc.errNotFound)
} }
func (mc *mockedNode) Set(key string, v interface{}) error { func (mc *mockedNode) Set(key string, val interface{}) error {
data, err := json.Marshal(v) return mc.SetCtx(context.Background(), key, val)
}
func (mc *mockedNode) SetCtx(ctx context.Context, key string, val interface{}) error {
data, err := json.Marshal(val)
if err != nil { if err != nil {
return err return err
} }
@ -62,50 +74,38 @@ func (mc *mockedNode) Set(key string, v interface{}) error {
return nil return nil
} }
func (mc *mockedNode) SetWithExpire(key string, v interface{}, _ time.Duration) error { func (mc *mockedNode) SetWithExpire(key string, val interface{}, expire time.Duration) error {
return mc.Set(key, v) return mc.SetWithExpireCtx(context.Background(), key, val, expire)
}
func (mc *mockedNode) Take(v interface{}, key string, query func(v interface{}) error) error {
if _, ok := mc.vals[key]; ok {
return mc.Get(key, v)
}
if err := query(v); err != nil {
return err
}
return mc.Set(key, v)
} }
func (mc *mockedNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { func (mc *mockedNode) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error {
return mc.Take(v, key, func(v interface{}) error { return mc.Set(key, val)
return query(v, 0)
})
} }
func (mc *mockedNode) DelCtx(_ context.Context, keys ...string) error { func (mc *mockedNode) Take(val interface{}, key string, query func(val interface{}) error) error {
return mc.Del(keys...) return mc.TakeCtx(context.Background(), val, key, query)
} }
func (mc *mockedNode) GetCtx(_ context.Context, key string, v interface{}) error { func (mc *mockedNode) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error {
return mc.Get(key, v) if _, ok := mc.vals[key]; ok {
} return mc.GetCtx(ctx, key, val)
}
func (mc *mockedNode) SetCtx(_ context.Context, key string, v interface{}) error { if err := query(val); err != nil {
return mc.Set(key, v) return err
} }
func (mc *mockedNode) SetWithExpireCtx(_ context.Context, key string, v interface{}, expire time.Duration) error { return mc.SetCtx(ctx, key, val)
return mc.SetWithExpire(key, v, expire)
} }
func (mc *mockedNode) TakeCtx(_ context.Context, v interface{}, key string, query func(v interface{}) error) error { func (mc *mockedNode) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
return mc.Take(v, key, query) return mc.TakeWithExpireCtx(context.Background(), val, key, query)
} }
func (mc *mockedNode) TakeWithExpireCtx(_ context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { func (mc *mockedNode) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
return mc.TakeWithExpire(v, key, query) return mc.Take(val, key, func(val interface{}) error {
return query(val, 0)
})
} }
func TestCache_SetDel(t *testing.T) { func TestCache_SetDel(t *testing.T) {
@ -141,18 +141,18 @@ func TestCache_SetDel(t *testing.T) {
} }
} }
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
var v int var val int
assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v)) assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &val))
assert.Equal(t, i, v) assert.Equal(t, i, val)
} }
assert.Nil(t, c.Del()) assert.Nil(t, c.Del())
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i))) assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i)))
} }
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
var v int var val int
assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v))) assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &val)))
assert.Equal(t, 0, v) assert.Equal(t, 0, val)
} }
} }
@ -179,18 +179,18 @@ func TestCache_OneNode(t *testing.T) {
} }
} }
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
var v int var val int
assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v)) assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &val))
assert.Equal(t, i, v) assert.Equal(t, i, val)
} }
assert.Nil(t, c.Del()) assert.Nil(t, c.Del())
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i))) assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i)))
} }
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
var v int var val int
assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v))) assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &val)))
assert.Equal(t, 0, v) assert.Equal(t, 0, val)
} }
} }
@ -230,9 +230,9 @@ func TestCache_Balance(t *testing.T) {
assert.True(t, entropy > .95, fmt.Sprintf("entropy should be greater than 0.95, but got %.2f", entropy)) assert.True(t, entropy > .95, fmt.Sprintf("entropy should be greater than 0.95, but got %.2f", entropy))
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
var v int var val int
assert.Nil(t, c.Get(strconv.Itoa(i), &v)) assert.Nil(t, c.Get(strconv.Itoa(i), &val))
assert.Equal(t, i, v) assert.Equal(t, i, val)
} }
for i := 0; i < total/10; i++ { for i := 0; i < total/10; i++ {
@ -244,14 +244,14 @@ func TestCache_Balance(t *testing.T) {
for i := 0; i < total/10; i++ { for i := 0; i < total/10; i++ {
var val int var val int
if i%2 == 0 { if i%2 == 0 {
assert.Nil(t, c.Take(&val, strconv.Itoa(i*10), func(v interface{}) error { assert.Nil(t, c.Take(&val, strconv.Itoa(i*10), func(val interface{}) error {
*v.(*int) = i *val.(*int) = i
count++ count++
return nil return nil
})) }))
} else { } else {
assert.Nil(t, c.TakeWithExpire(&val, strconv.Itoa(i*10), func(v interface{}, expire time.Duration) error { assert.Nil(t, c.TakeWithExpire(&val, strconv.Itoa(i*10), func(val interface{}, expire time.Duration) error {
*v.(*int) = i *val.(*int) = i
count++ count++
return nil return nil
})) }))
@ -272,10 +272,10 @@ func TestCacheNoNode(t *testing.T) {
assert.NotNil(t, c.Get("foo", nil)) assert.NotNil(t, c.Get("foo", nil))
assert.NotNil(t, c.Set("foo", nil)) assert.NotNil(t, c.Set("foo", nil))
assert.NotNil(t, c.SetWithExpire("foo", nil, time.Second)) assert.NotNil(t, c.SetWithExpire("foo", nil, time.Second))
assert.NotNil(t, c.Take(nil, "foo", func(v interface{}) error { assert.NotNil(t, c.Take(nil, "foo", func(val interface{}) error {
return nil return nil
})) }))
assert.NotNil(t, c.TakeWithExpire(nil, "foo", func(v interface{}, duration time.Duration) error { assert.NotNil(t, c.TakeWithExpire(nil, "foo", func(val interface{}, duration time.Duration) error {
return nil return nil
})) }))
} }
@ -283,8 +283,8 @@ func TestCacheNoNode(t *testing.T) {
func calcEntropy(m map[int]int, total int) float64 { func calcEntropy(m map[int]int, total int) float64 {
var entropy float64 var entropy float64
for _, v := range m { for _, val := range m {
proba := float64(v) / float64(total) proba := float64(val) / float64(total)
entropy -= proba * math.Log2(proba) entropy -= proba * math.Log2(proba)
} }

@ -72,7 +72,6 @@ func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error {
} }
logger := logx.WithContext(ctx) logger := logx.WithContext(ctx)
if len(keys) > 1 && c.rds.Type == redis.ClusterType { if len(keys) > 1 && c.rds.Type == redis.ClusterType {
for _, key := range keys { for _, key := range keys {
if _, err := c.rds.DelCtx(ctx, key); err != nil { if _, err := c.rds.DelCtx(ctx, key); err != nil {
@ -80,24 +79,22 @@ func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error {
c.asyncRetryDelCache(key) c.asyncRetryDelCache(key)
} }
} }
} else { } else if _, err := c.rds.DelCtx(ctx, keys...); err != nil {
if _, err := c.rds.DelCtx(ctx, keys...); err != nil {
logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err) logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
c.asyncRetryDelCache(keys...) c.asyncRetryDelCache(keys...)
} }
}
return nil return nil
} }
// Get gets the cache with key and fills into v. // Get gets the cache with key and fills into v.
func (c cacheNode) Get(key string, v interface{}) error { func (c cacheNode) Get(key string, val interface{}) error {
return c.GetCtx(context.Background(), key, v) return c.GetCtx(context.Background(), key, val)
} }
// GetCtx gets the cache with key and fills into v. // GetCtx gets the cache with key and fills into v.
func (c cacheNode) GetCtx(ctx context.Context, key string, v interface{}) error { func (c cacheNode) GetCtx(ctx context.Context, key string, val interface{}) error {
err := c.doGetCache(ctx, key, v) err := c.doGetCache(ctx, key, val)
if err == errPlaceholder { if err == errPlaceholder {
return c.errNotFound return c.errNotFound
} }
@ -111,23 +108,23 @@ func (c cacheNode) IsNotFound(err error) bool {
} }
// Set sets the cache with key and v, using c.expiry. // Set sets the cache with key and v, using c.expiry.
func (c cacheNode) Set(key string, v interface{}) error { func (c cacheNode) Set(key string, val interface{}) error {
return c.SetCtx(context.Background(), key, v) return c.SetCtx(context.Background(), key, val)
} }
// SetCtx sets the cache with key and v, using c.expiry. // SetCtx sets the cache with key and v, using c.expiry.
func (c cacheNode) SetCtx(ctx context.Context, key string, v interface{}) error { func (c cacheNode) SetCtx(ctx context.Context, key string, val interface{}) error {
return c.SetWithExpireCtx(ctx, key, v, c.aroundDuration(c.expiry)) return c.SetWithExpireCtx(ctx, key, val, c.aroundDuration(c.expiry))
} }
// SetWithExpire sets the cache with key and v, using given expire. // SetWithExpire sets the cache with key and v, using given expire.
func (c cacheNode) SetWithExpire(key string, v interface{}, expire time.Duration) error { func (c cacheNode) SetWithExpire(key string, val interface{}, expire time.Duration) error {
return c.SetWithExpireCtx(context.Background(), key, v, expire) return c.SetWithExpireCtx(context.Background(), key, val, expire)
} }
// SetWithExpireCtx sets the cache with key and v, using given expire. // SetWithExpireCtx sets the cache with key and v, using given expire.
func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, v interface{}, expire time.Duration) error { func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error {
data, err := jsonx.Marshal(v) data, err := jsonx.Marshal(val)
if err != nil { if err != nil {
return err return err
} }
@ -142,30 +139,29 @@ func (c cacheNode) String() string {
// Take takes the result from cache first, if not found, // Take takes the result from cache first, if not found,
// query from DB and set cache using c.expiry, then return the result. // query from DB and set cache using c.expiry, then return the result.
func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error { func (c cacheNode) Take(val interface{}, key string, query func(val interface{}) error) error {
return c.TakeCtx(context.Background(), v, key, query) return c.TakeCtx(context.Background(), val, key, query)
} }
// TakeCtx takes the result from cache first, if not found, // TakeCtx takes the result from cache first, if not found,
// query from DB and set cache using c.expiry, then return the result. // query from DB and set cache using c.expiry, then return the result.
func (c cacheNode) TakeCtx(ctx context.Context, v interface{}, key string, query func(v interface{}) error) error { func (c cacheNode) TakeCtx(ctx context.Context, val interface{}, key string, query func(val interface{}) error) error {
return c.doTake(ctx, v, key, query, func(v interface{}) error { return c.doTake(ctx, val, key, query, func(v interface{}) error {
return c.SetCtx(ctx, key, v) return c.SetCtx(ctx, key, v)
}) })
} }
// TakeWithExpire takes the result from cache first, if not found, // TakeWithExpire takes the result from cache first, if not found,
// query from DB and set cache using given expire, then return the result. // query from DB and set cache using given expire, then return the result.
func (c cacheNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, func (c cacheNode) TakeWithExpire(val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
expire time.Duration) error) error { return c.TakeWithExpireCtx(context.Background(), val, key, query)
return c.TakeWithExpireCtx(context.Background(), v, key, query)
} }
// TakeWithExpireCtx takes the result from cache first, if not found, // TakeWithExpireCtx takes the result from cache first, if not found,
// query from DB and set cache using given expire, then return the result. // query from DB and set cache using given expire, then return the result.
func (c cacheNode) TakeWithExpireCtx(ctx context.Context, v interface{}, key string, query func(v interface{}, expire time.Duration) error) error { func (c cacheNode) TakeWithExpireCtx(ctx context.Context, val interface{}, key string, query func(val interface{}, expire time.Duration) error) error {
expire := c.aroundDuration(c.expiry) expire := c.aroundDuration(c.expiry)
return c.doTake(ctx, v, key, func(v interface{}) error { return c.doTake(ctx, val, key, func(v interface{}) error {
return query(v, expire) return query(v, expire)
}, func(v interface{}) error { }, func(v interface{}) error {
return c.SetWithExpireCtx(ctx, key, v, expire) return c.SetWithExpireCtx(ctx, key, v, expire)
@ -204,10 +200,9 @@ func (c cacheNode) doGetCache(ctx context.Context, key string, v interface{}) er
return c.processCache(ctx, key, data, v) return c.processCache(ctx, key, data, v)
} }
func (c cacheNode) doTake(ctx context.Context, v interface{}, key string, query func(v interface{}) error, func (c cacheNode) doTake(ctx context.Context, v interface{}, key string,
cacheVal func(v interface{}) error) error { query func(v interface{}) error, cacheVal func(v interface{}) error) error {
logger := logx.WithContext(ctx) logger := logx.WithContext(ctx)
val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) { val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
if err := c.doGetCache(ctx, key, v); err != nil { if err := c.doGetCache(ctx, key, v); err != nil {
if err == errPlaceholder { if err == errPlaceholder {
@ -252,8 +247,6 @@ func (c cacheNode) doTake(ctx context.Context, v interface{}, key string, query
} }
func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error { func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error {
logger := logx.WithContext(ctx)
err := jsonx.Unmarshal([]byte(data), v) err := jsonx.Unmarshal([]byte(data), v)
if err == nil { if err == nil {
return nil return nil
@ -261,6 +254,7 @@ func (c cacheNode) processCache(ctx context.Context, key, data string, v interfa
report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v", report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
c.rds.Addr, key, data, err) c.rds.Addr, key, data, err)
logger := logx.WithContext(ctx)
logger.Error(report) logger.Error(report)
stat.Report(report) stat.Report(report)
if _, e := c.rds.DelCtx(ctx, key); e != nil { if _, e := c.rds.DelCtx(ctx, key); e != nil {

Loading…
Cancel
Save