You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/core/stores/cache/cache.go

135 lines
2.8 KiB
Go

package cache
4 years ago
import (
"fmt"
"log"
"time"
"github.com/tal-tech/go-zero/core/errorx"
"github.com/tal-tech/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/syncx"
4 years ago
)
type (
Cache interface {
Del(keys ...string) error
Get(key string, v interface{}) error
IsNotFound(err error) bool
Set(key string, v interface{}) error
SetWithExpire(key string, v interface{}, expire time.Duration) error
4 years ago
Take(v interface{}, key string, query func(v interface{}) error) error
TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error
}
cacheCluster struct {
dispatcher *hash.ConsistentHash
errNotFound error
}
)
func New(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
4 years ago
opts ...Option) Cache {
if len(c) == 0 || TotalWeights(c) <= 0 {
log.Fatal("no cache nodes")
}
if len(c) == 1 {
return NewNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
4 years ago
}
dispatcher := hash.NewConsistentHash()
for _, node := range c {
cn := NewNode(node.NewRedis(), barrier, st, errNotFound, opts...)
4 years ago
dispatcher.AddWithWeight(cn, node.Weight)
}
return cacheCluster{
dispatcher: dispatcher,
errNotFound: errNotFound,
}
}
func (cc cacheCluster) Del(keys ...string) error {
4 years ago
switch len(keys) {
case 0:
return nil
case 1:
key := keys[0]
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).Del(key)
4 years ago
default:
var be errorx.BatchError
nodes := make(map[interface{}][]string)
for _, key := range keys {
c, ok := cc.dispatcher.Get(key)
if !ok {
be.Add(fmt.Errorf("key %q not found", key))
continue
}
nodes[c] = append(nodes[c], key)
}
for c, ks := range nodes {
if err := c.(Cache).Del(ks...); err != nil {
4 years ago
be.Add(err)
}
}
return be.Err()
}
}
func (cc cacheCluster) Get(key string, v interface{}) error {
4 years ago
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).Get(key, v)
4 years ago
}
func (cc cacheCluster) IsNotFound(err error) bool {
return err == cc.errNotFound
}
func (cc cacheCluster) Set(key string, v interface{}) error {
4 years ago
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).Set(key, v)
4 years ago
}
func (cc cacheCluster) SetWithExpire(key string, v interface{}, expire time.Duration) error {
4 years ago
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).SetWithExpire(key, v, expire)
4 years ago
}
func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error {
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).Take(v, key, query)
}
func (cc cacheCluster) TakeWithExpire(v interface{}, key string,
query func(v interface{}, expire time.Duration) error) error {
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).TakeWithExpire(v, key, query)
}