fix golint issues in core/stores (#527)

master
Kevin Wan 4 years ago committed by GitHub
parent 490241d639
commit c566b5ff82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -11,6 +11,7 @@ import (
)
type (
// Cache interface is used to define the cache implementation.
Cache interface {
Del(keys ...string) error
Get(key string, v interface{}) error
@ -27,7 +28,8 @@ type (
}
)
func New(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
// New returns a Cache.
func New(c ClusterConf, barrier syncx.SharedCalls, st *Stat, errNotFound error,
opts ...Option) Cache {
if len(c) == 0 || TotalWeights(c) <= 0 {
log.Fatal("no cache nodes")

@ -102,7 +102,7 @@ func TestCache_SetDel(t *testing.T) {
Weight: 100,
},
}
c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder)
for i := 0; i < total; i++ {
if i%2 == 0 {
assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))
@ -140,7 +140,7 @@ func TestCache_OneNode(t *testing.T) {
Weight: 100,
},
}
c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder)
for i := 0; i < total; i++ {
if i%2 == 0 {
assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))

@ -1,3 +1,4 @@
package cache
// CacheConf is an alias of ClusterConf.
type CacheConf = ClusterConf

@ -33,7 +33,7 @@ type cacheNode struct {
r *rand.Rand
lock *sync.Mutex
unstableExpiry mathx.Unstable
stat *CacheStat
stat *Stat
errNotFound error
}
@ -43,7 +43,7 @@ type cacheNode struct {
// st is used to stat the cache.
// errNotFound defines the error that returned on cache not found.
// opts are the options that customize the cacheNode.
func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *Stat,
errNotFound error, opts ...Option) Cache {
o := newOptions(opts...)
return cacheNode{

@ -36,7 +36,7 @@ func TestCacheNode_DelCache(t *testing.T) {
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
stat: NewStat("any"),
errNotFound: errTestNotFound,
}
assert.Nil(t, cn.Del())
@ -59,7 +59,7 @@ func TestCacheNode_InvalidCache(t *testing.T) {
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
stat: NewStat("any"),
errNotFound: errTestNotFound,
}
s.Set("any", "value")
@ -81,7 +81,7 @@ func TestCacheNode_Take(t *testing.T) {
barrier: syncx.NewSharedCalls(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
stat: NewStat("any"),
errNotFound: errTestNotFound,
}
var str string
@ -108,7 +108,7 @@ func TestCacheNode_TakeNotFound(t *testing.T) {
barrier: syncx.NewSharedCalls(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
stat: NewStat("any"),
errNotFound: errTestNotFound,
}
var str string
@ -147,7 +147,7 @@ func TestCacheNode_TakeWithExpire(t *testing.T) {
barrier: syncx.NewSharedCalls(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
stat: NewStat("any"),
errNotFound: errors.New("any"),
}
var str string
@ -174,7 +174,7 @@ func TestCacheNode_String(t *testing.T) {
barrier: syncx.NewSharedCalls(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
stat: NewStat("any"),
errNotFound: errors.New("any"),
}
assert.Equal(t, store.Addr, cn.String())
@ -191,7 +191,7 @@ func TestCacheValueWithBigInt(t *testing.T) {
barrier: syncx.NewSharedCalls(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
stat: NewStat("any"),
errNotFound: errors.New("any"),
}

@ -8,11 +8,13 @@ const (
)
type (
// An Options is used to store the cache options.
Options struct {
Expiry time.Duration
NotFoundExpiry time.Duration
}
// Option defines the method to customize an Options.
Option func(o *Options)
)
@ -32,12 +34,14 @@ func newOptions(opts ...Option) Options {
return o
}
// WithExpiry returns a func to customize a Options with given expiry.
func WithExpiry(expiry time.Duration) Option {
return func(o *Options) {
o.Expiry = expiry
}
}
// WithNotFoundExpiry returns a func to customize a Options with given not found expiry.
func WithNotFoundExpiry(expiry time.Duration) Option {
return func(o *Options) {
o.NotFoundExpiry = expiry

@ -9,7 +9,8 @@ import (
const statInterval = time.Minute
type CacheStat struct {
// A Stat is used to stat the cache.
type Stat struct {
name string
// export the fields to let the unit tests working,
// reside in internal package, doesn't matter.
@ -19,8 +20,9 @@ type CacheStat struct {
DbFails uint64
}
func NewCacheStat(name string) *CacheStat {
ret := &CacheStat{
// NewStat returns a Stat.
func NewStat(name string) *Stat {
ret := &Stat{
name: name,
}
go ret.statLoop()
@ -28,37 +30,41 @@ func NewCacheStat(name string) *CacheStat {
return ret
}
func (cs *CacheStat) IncrementTotal() {
atomic.AddUint64(&cs.Total, 1)
// IncrementTotal increments the total count.
func (s *Stat) IncrementTotal() {
atomic.AddUint64(&s.Total, 1)
}
func (cs *CacheStat) IncrementHit() {
atomic.AddUint64(&cs.Hit, 1)
// IncrementHit increments the hit count.
func (s *Stat) IncrementHit() {
atomic.AddUint64(&s.Hit, 1)
}
func (cs *CacheStat) IncrementMiss() {
atomic.AddUint64(&cs.Miss, 1)
// IncrementMiss increments the miss count.
func (s *Stat) IncrementMiss() {
atomic.AddUint64(&s.Miss, 1)
}
func (cs *CacheStat) IncrementDbFails() {
atomic.AddUint64(&cs.DbFails, 1)
// IncrementDbFails increments the db fail count.
func (s *Stat) IncrementDbFails() {
atomic.AddUint64(&s.DbFails, 1)
}
func (cs *CacheStat) statLoop() {
func (s *Stat) statLoop() {
ticker := time.NewTicker(statInterval)
defer ticker.Stop()
for range ticker.C {
total := atomic.SwapUint64(&cs.Total, 0)
total := atomic.SwapUint64(&s.Total, 0)
if total == 0 {
continue
}
hit := atomic.SwapUint64(&cs.Hit, 0)
hit := atomic.SwapUint64(&s.Hit, 0)
percent := 100 * float32(hit) / float32(total)
miss := atomic.SwapUint64(&cs.Miss, 0)
dbf := atomic.SwapUint64(&cs.DbFails, 0)
miss := atomic.SwapUint64(&s.Miss, 0)
dbf := atomic.SwapUint64(&s.DbFails, 0)
logx.Statf("dbcache(%s) - qpm: %d, hit_ratio: %.1f%%, hit: %d, miss: %d, db_fails: %d",
cs.name, total, percent, hit, miss, dbf)
s.name, total, percent, hit, miss, dbf)
}
}

@ -39,6 +39,7 @@ func init() {
})
}
// AddCleanTask adds a clean task on given keys.
func AddCleanTask(task func() error, keys ...string) {
timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{
delay: time.Second,

@ -3,8 +3,10 @@ package cache
import "github.com/tal-tech/go-zero/core/stores/redis"
type (
// A ClusterConf is the config of a redis cluster that used as cache.
ClusterConf []NodeConf
// A NodeConf is the config of a redis node that used as cache.
NodeConf struct {
redis.RedisConf
Weight int `json:",default=100"`

@ -4,6 +4,7 @@ import "strings"
const keySeparator = ","
// TotalWeights returns the total weights of given nodes.
func TotalWeights(c []NodeConf) int {
var weights int

@ -8,6 +8,7 @@ import (
const clickHouseDriverName = "clickhouse"
// New returns a clickhouse connection.
func New(datasource string, opts ...sqlx.SqlOption) sqlx.SqlConn {
return sqlx.NewSqlConn(clickHouseDriverName, datasource, opts...)
}

@ -1,7 +1,6 @@
package kv
import (
"github.com/tal-tech/go-zero/core/stores/cache"
)
import "github.com/tal-tech/go-zero/core/stores/cache"
// KvConf is an alias of cache.ClusterConf.
type KvConf = cache.ClusterConf

@ -10,9 +10,11 @@ import (
"github.com/tal-tech/go-zero/core/stores/redis"
)
// ErrNoRedisNode is an error that indicates no redis node.
var ErrNoRedisNode = errors.New("no redis node")
type (
// Store interface represents a KV store.
Store interface {
Del(keys ...string) (int, error)
Eval(script string, key string, args ...interface{}) (interface{}, error)
@ -81,6 +83,7 @@ type (
}
)
// NewStore returns a Store.
func NewStore(c KvConf) Store {
if len(c) == 0 || cache.TotalWeights(c) <= 0 {
log.Fatal("no cache nodes")

@ -14,14 +14,17 @@ const (
)
type (
// ResultHandler is a handler that used to handle results.
ResultHandler func(*mgo.BulkResult, error)
// A BulkInserter is used to insert bulk of mongo records.
BulkInserter struct {
executor *executors.PeriodicalExecutor
inserter *dbInserter
}
)
// NewBulkInserter returns a BulkInserter.
func NewBulkInserter(session *mgo.Session, dbName string, collectionNamer func() string) *BulkInserter {
inserter := &dbInserter{
session: session,
@ -35,14 +38,17 @@ func NewBulkInserter(session *mgo.Session, dbName string, collectionNamer func()
}
}
// Flush flushes the inserter, writes all pending records.
func (bi *BulkInserter) Flush() {
bi.executor.Flush()
}
// Insert inserts doc.
func (bi *BulkInserter) Insert(doc interface{}) {
bi.executor.Add(doc)
}
// SetResultHandler sets the result handler.
func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
bi.executor.Sync(func() {
bi.inserter.resultHandler = handler

@ -13,9 +13,11 @@ import (
const slowThreshold = time.Millisecond * 500
// ErrNotFound is an alias of mgo.ErrNotFound.
var ErrNotFound = mgo.ErrNotFound
type (
// Collection interface represents a mongo connection.
Collection interface {
Find(query interface{}) Query
FindId(id interface{}) Query

@ -4,6 +4,7 @@ package internal
import "github.com/globalsign/mgo"
// MgoCollection interface represents a mgo collection.
type MgoCollection interface {
Find(query interface{}) *mgo.Query
FindId(id interface{}) *mgo.Query

@ -8,6 +8,7 @@ import (
)
type (
// Iter interface represents a mongo iter.
Iter interface {
All(result interface{}) error
Close() error
@ -19,6 +20,7 @@ type (
Timeout() bool
}
// A ClosableIter is a closable mongo iter.
ClosableIter struct {
Iter
Cleanup func()
@ -57,6 +59,7 @@ func (i promisedIter) For(result interface{}, f func() error) error {
return i.promise.keep(err)
}
// Close closes a mongo iter.
func (it *ClosableIter) Close() error {
err := it.Iter.Close()
it.Cleanup()

@ -12,8 +12,10 @@ type (
timeout time.Duration
}
// Option defines the method to customize a mongo model.
Option func(opts *options)
// A Model is a mongo model.
Model struct {
session *concurrentSession
db *mgo.Database
@ -22,6 +24,7 @@ type (
}
)
// MustNewModel returns a Model, exits on errors.
func MustNewModel(url, collection string, opts ...Option) *Model {
model, err := NewModel(url, collection, opts...)
if err != nil {
@ -31,6 +34,7 @@ func MustNewModel(url, collection string, opts ...Option) *Model {
return model
}
// NewModel returns a Model.
func NewModel(url, collection string, opts ...Option) (*Model, error) {
session, err := getConcurrentSession(url)
if err != nil {
@ -46,72 +50,85 @@ func NewModel(url, collection string, opts ...Option) (*Model, error) {
}, nil
}
// Find finds a record with given query.
func (mm *Model) Find(query interface{}) (Query, error) {
return mm.query(func(c Collection) Query {
return c.Find(query)
})
}
// FindId finds a record with given id.
func (mm *Model) FindId(id interface{}) (Query, error) {
return mm.query(func(c Collection) Query {
return c.FindId(id)
})
}
// GetCollection returns a Collection with given session.
func (mm *Model) GetCollection(session *mgo.Session) Collection {
return newCollection(mm.db.C(mm.collection).With(session))
}
// Insert inserts docs into mm.
func (mm *Model) Insert(docs ...interface{}) error {
return mm.execute(func(c Collection) error {
return c.Insert(docs...)
})
}
// Pipe returns a Pipe with given pipeline.
func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) {
return mm.pipe(func(c Collection) Pipe {
return c.Pipe(pipeline)
})
}
// PutSession returns the given session.
func (mm *Model) PutSession(session *mgo.Session) {
mm.session.putSession(session)
}
// Remove removes the records with given selector.
func (mm *Model) Remove(selector interface{}) error {
return mm.execute(func(c Collection) error {
return c.Remove(selector)
})
}
// RemoveAll removes all with given selector and returns a mgo.ChangeInfo.
func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
return c.RemoveAll(selector)
})
}
// RemoveId removes a record with given id.
func (mm *Model) RemoveId(id interface{}) error {
return mm.execute(func(c Collection) error {
return c.RemoveId(id)
})
}
// TakeSession gets a session.
func (mm *Model) TakeSession() (*mgo.Session, error) {
return mm.session.takeSession(mm.opts...)
}
// Update updates a record with given selector.
func (mm *Model) Update(selector, update interface{}) error {
return mm.execute(func(c Collection) error {
return c.Update(selector, update)
})
}
// UpdateId updates a record with given id.
func (mm *Model) UpdateId(id, update interface{}) error {
return mm.execute(func(c Collection) error {
return c.UpdateId(id, update)
})
}
// Upsert upserts a record with given selector, and returns a mgo.ChangeInfo.
func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
return c.Upsert(selector, update)
@ -158,6 +175,7 @@ func (mm *Model) query(fn func(c Collection) Query) (Query, error) {
return fn(mm.GetCollection(session)), nil
}
// WithTimeout customizes an operation with given timeout.
func WithTimeout(timeout time.Duration) Option {
return func(opts *options) {
opts.timeout = timeout

@ -8,6 +8,7 @@ import (
)
type (
// Pipe interface represents a mongo pipe.
Pipe interface {
All(result interface{}) error
AllowDiskUse() Pipe

@ -8,6 +8,7 @@ import (
)
type (
// Query interface represents a mongo query.
Query interface {
All(result interface{}) error
Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error)

@ -4,6 +4,7 @@ import "strings"
const mongoAddrSep = ","
// FormatAddr formats mongo hosts to a string.
func FormatAddr(hosts []string) string {
return strings.Join(hosts, mongoAddrSep)
}

@ -8,23 +8,52 @@ import (
)
var (
// ErrNotFound is an alias of mgo.ErrNotFound.
ErrNotFound = mgo.ErrNotFound
// can't use one SharedCalls per conn, because multiple conns may share the same cache key.
sharedCalls = syncx.NewSharedCalls()
stats = cache.NewCacheStat("mongoc")
stats = cache.NewStat("mongoc")
)
type (
// QueryOption defines the method to customize a mongo query.
QueryOption func(query mongo.Query) mongo.Query
// CachedCollection interface represents a mongo collection with cache.
CachedCollection interface {
Count(query interface{}) (int, error)
DelCache(keys ...string) error
FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error
FindOne(v interface{}, key string, query interface{}) error
FindOneNoCache(v interface{}, query interface{}) error
FindOneId(v interface{}, key string, id interface{}) error
FindOneIdNoCache(v interface{}, id interface{}) error
GetCache(key string, v interface{}) error
Insert(docs ...interface{}) error
Pipe(pipeline interface{}) mongo.Pipe
Remove(selector interface{}, keys ...string) error
RemoveNoCache(selector interface{}) error
RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error)
RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error)
RemoveId(id interface{}, keys ...string) error
RemoveIdNoCache(id interface{}) error
SetCache(key string, v interface{}) error
Update(selector, update interface{}, keys ...string) error
UpdateNoCache(selector, update interface{}) error
UpdateId(id, update interface{}, keys ...string) error
UpdateIdNoCache(id, update interface{}) error
Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error)
UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error)
}
cachedCollection struct {
collection mongo.Collection
cache cache.Cache
}
)
func newCollection(collection mongo.Collection, c cache.Cache) *cachedCollection {
func newCollection(collection mongo.Collection, c cache.Cache) CachedCollection {
return &cachedCollection{
collection: collection,
cache: c,
@ -39,10 +68,6 @@ func (c *cachedCollection) DelCache(keys ...string) error {
return c.cache.Del(keys...)
}
func (c *cachedCollection) GetCache(key string, v interface{}) error {
return c.cache.Get(key, v)
}
func (c *cachedCollection) FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error {
q := c.collection.Find(query)
for _, opt := range opts {
@ -75,6 +100,10 @@ func (c *cachedCollection) FindOneIdNoCache(v interface{}, id interface{}) error
return q.One(v)
}
func (c *cachedCollection) GetCache(key string, v interface{}) error {
return c.cache.Get(key, v)
}
func (c *cachedCollection) Insert(docs ...interface{}) error {
return c.collection.Insert(docs...)
}

@ -9,12 +9,14 @@ import (
"github.com/tal-tech/go-zero/core/stores/redis"
)
// A Model is a mongo model that built with cache capability.
type Model struct {
*mongo.Model
cache cache.Cache
generateCollection func(*mgo.Session) *cachedCollection
generateCollection func(*mgo.Session) CachedCollection
}
// MustNewNodeModel returns a Model with a cache node, exists on errors.
func MustNewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) *Model {
model, err := NewNodeModel(url, collection, rds, opts...)
if err != nil {
@ -24,6 +26,7 @@ func MustNewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Op
return model
}
// MustNewModel returns a Model with a cache cluster, exists on errors.
func MustNewModel(url, collection string, c cache.CacheConf, opts ...cache.Option) *Model {
model, err := NewModel(url, collection, c, opts...)
if err != nil {
@ -33,157 +36,183 @@ func MustNewModel(url, collection string, c cache.CacheConf, opts ...cache.Optio
return model
}
// NewNodeModel returns a Model with a cache node.
func NewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) {
c := cache.NewNode(rds, sharedCalls, stats, mgo.ErrNotFound, opts...)
return createModel(url, collection, c, func(collection mongo.Collection) *cachedCollection {
return createModel(url, collection, c, func(collection mongo.Collection) CachedCollection {
return newCollection(collection, c)
})
}
// NewModel returns a Model with a cache cluster.
func NewModel(url, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) {
c := cache.New(conf, sharedCalls, stats, mgo.ErrNotFound, opts...)
return createModel(url, collection, c, func(collection mongo.Collection) *cachedCollection {
return createModel(url, collection, c, func(collection mongo.Collection) CachedCollection {
return newCollection(collection, c)
})
}
// Count returns the count of given query.
func (mm *Model) Count(query interface{}) (int, error) {
return mm.executeInt(func(c *cachedCollection) (int, error) {
return mm.executeInt(func(c CachedCollection) (int, error) {
return c.Count(query)
})
}
// DelCache deletes the cache with given keys.
func (mm *Model) DelCache(keys ...string) error {
return mm.cache.Del(keys...)
}
// GetCache unmarshal the cache into v with given key.
func (mm *Model) GetCache(key string, v interface{}) error {
return mm.cache.Get(key, v)
}
func (mm *Model) GetCollection(session *mgo.Session) *cachedCollection {
// GetCollection returns a cache collection.
func (mm *Model) GetCollection(session *mgo.Session) CachedCollection {
return mm.generateCollection(session)
}
// FindAllNoCache finds all records without cache.
func (mm *Model) FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.FindAllNoCache(v, query, opts...)
})
}
// FindOne unmarshals a record into v with given key and query.
func (mm *Model) FindOne(v interface{}, key string, query interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOne(v, key, query)
})
}
// FindOneNoCache unmarshals a record into v with query, without cache.
func (mm *Model) FindOneNoCache(v interface{}, query interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOneNoCache(v, query)
})
}
// FindOneId unmarshals a record into v with query.
func (mm *Model) FindOneId(v interface{}, key string, id interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOneId(v, key, id)
})
}
// FindOneIdNoCache unmarshals a record into v with query, without cache.
func (mm *Model) FindOneIdNoCache(v interface{}, id interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOneIdNoCache(v, id)
})
}
// Insert inserts docs.
func (mm *Model) Insert(docs ...interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.Insert(docs...)
})
}
// Pipe returns a mongo pipe with given pipeline.
func (mm *Model) Pipe(pipeline interface{}) (mongo.Pipe, error) {
return mm.pipe(func(c *cachedCollection) mongo.Pipe {
return mm.pipe(func(c CachedCollection) mongo.Pipe {
return c.Pipe(pipeline)
})
}
// Remove removes a record with given selector, and remove it from cache with given keys.
func (mm *Model) Remove(selector interface{}, keys ...string) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.Remove(selector, keys...)
})
}
// RemoveNoCache removes a record with given selector.
func (mm *Model) RemoveNoCache(selector interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.RemoveNoCache(selector)
})
}
// RemoveAll removes all records with given selector, and removes cache with given keys.
func (mm *Model) RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error) {
return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.RemoveAll(selector, keys...)
})
}
// RemoveAllNoCache removes all records with given selector, and returns a mgo.ChangeInfo.
func (mm *Model) RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.RemoveAllNoCache(selector)
})
}
// RemoveId removes a record with given id, and removes cache with given keys.
func (mm *Model) RemoveId(id interface{}, keys ...string) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.RemoveId(id, keys...)
})
}
// RemoveIdNoCache removes a record with given id.
func (mm *Model) RemoveIdNoCache(id interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.RemoveIdNoCache(id)
})
}
// SetCache sets the cache with given key and value.
func (mm *Model) SetCache(key string, v interface{}) error {
return mm.cache.Set(key, v)
}
// Update updates the record with given selector, and delete cache with given keys.
func (mm *Model) Update(selector, update interface{}, keys ...string) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.Update(selector, update, keys...)
})
}
// UpdateNoCache updates the record with given selector.
func (mm *Model) UpdateNoCache(selector, update interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.UpdateNoCache(selector, update)
})
}
// UpdateId updates the record with given id, and delete cache with given keys.
func (mm *Model) UpdateId(id, update interface{}, keys ...string) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.UpdateId(id, update, keys...)
})
}
// UpdateIdNoCache updates the record with given id.
func (mm *Model) UpdateIdNoCache(id, update interface{}) error {
return mm.execute(func(c *cachedCollection) error {
return mm.execute(func(c CachedCollection) error {
return c.UpdateIdNoCache(id, update)
})
}
// Upsert upserts a record with given selector, and delete cache with given keys.
func (mm *Model) Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error) {
return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.Upsert(selector, update, keys...)
})
}
// UpsertNoCache upserts a record with given selector.
func (mm *Model) UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.UpsertNoCache(selector, update)
})
}
func (mm *Model) change(fn func(c *cachedCollection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
func (mm *Model) change(fn func(c CachedCollection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
session, err := mm.TakeSession()
if err != nil {
return nil, err
@ -193,7 +222,7 @@ func (mm *Model) change(fn func(c *cachedCollection) (*mgo.ChangeInfo, error)) (
return fn(mm.GetCollection(session))
}
func (mm *Model) execute(fn func(c *cachedCollection) error) error {
func (mm *Model) execute(fn func(c CachedCollection) error) error {
session, err := mm.TakeSession()
if err != nil {
return err
@ -203,7 +232,7 @@ func (mm *Model) execute(fn func(c *cachedCollection) error) error {
return fn(mm.GetCollection(session))
}
func (mm *Model) executeInt(fn func(c *cachedCollection) (int, error)) (int, error) {
func (mm *Model) executeInt(fn func(c CachedCollection) (int, error)) (int, error) {
session, err := mm.TakeSession()
if err != nil {
return 0, err
@ -213,7 +242,7 @@ func (mm *Model) executeInt(fn func(c *cachedCollection) (int, error)) (int, err
return fn(mm.GetCollection(session))
}
func (mm *Model) pipe(fn func(c *cachedCollection) mongo.Pipe) (mongo.Pipe, error) {
func (mm *Model) pipe(fn func(c CachedCollection) mongo.Pipe) (mongo.Pipe, error) {
session, err := mm.TakeSession()
if err != nil {
return nil, err
@ -224,7 +253,7 @@ func (mm *Model) pipe(fn func(c *cachedCollection) mongo.Pipe) (mongo.Pipe, erro
}
func createModel(url, collection string, c cache.Cache,
create func(mongo.Collection) *cachedCollection) (*Model, error) {
create func(mongo.Collection) CachedCollection) (*Model, error) {
model, err := mongo.NewModel(url, collection)
if err != nil {
return nil, err
@ -233,7 +262,7 @@ func createModel(url, collection string, c cache.Cache,
return &Model{
Model: model,
cache: c,
generateCollection: func(session *mgo.Session) *cachedCollection {
generateCollection: func(session *mgo.Session) CachedCollection {
collection := model.GetCollection(session)
return create(collection)
},

@ -8,6 +8,7 @@ import (
const postgresDriverName = "postgres"
// New returns a postgres connection.
func New(datasource string, opts ...sqlx.SqlOption) sqlx.SqlConn {
return sqlx.NewSqlConn(postgresDriverName, datasource, opts...)
}

@ -3,28 +3,35 @@ package redis
import "errors"
var (
// ErrEmptyHost is an error that indicates no redis host is set.
ErrEmptyHost = errors.New("empty redis host")
// ErrEmptyType is an error that indicates no redis type is set.
ErrEmptyType = errors.New("empty redis type")
ErrEmptyKey = errors.New("empty redis key")
// ErrEmptyKey is an error that indicates no redis key is set.
ErrEmptyKey = errors.New("empty redis key")
)
type (
// A RedisConf is a redis config.
RedisConf struct {
Host string
Type string `json:",default=node,options=node|cluster"`
Pass string `json:",optional"`
}
// A RedisKeyConf is a redis config with key.
RedisKeyConf struct {
RedisConf
Key string `json:",optional"`
}
)
// NewRedis returns a Redis.
func (rc RedisConf) NewRedis() *Redis {
return NewRedis(rc.Host, rc.Type, rc.Pass)
}
// Validate validates the RedisConf.
func (rc RedisConf) Validate() error {
if len(rc.Host) == 0 {
return ErrEmptyHost
@ -37,6 +44,7 @@ func (rc RedisConf) Validate() error {
return nil
}
// Validate validates the RedisKeyConf.
func (rkc RedisKeyConf) Validate() error {
if err := rkc.RedisConf.Validate(); err != nil {
return err

@ -12,9 +12,12 @@ import (
)
const (
// ClusterType means redis cluster.
ClusterType = "cluster"
NodeType = "node"
Nil = red.Nil
// NodeType means redis node.
NodeType = "node"
// Nil is an alias of redis.Nil.
Nil = red.Nil
blockingQueryTimeout = 5 * time.Second
readWriteTimeout = 2 * time.Second
@ -22,9 +25,11 @@ const (
slowThreshold = time.Millisecond * 100
)
// ErrNilNode is an error that indicates a nil redis node.
var ErrNilNode = errors.New("nil redis node")
type (
// A Pair is a key/pair set used in redis zset.
Pair struct {
Key string
Score int64
@ -38,6 +43,7 @@ type (
brk breaker.Breaker
}
// RedisNode interface represents a redis node.
RedisNode interface {
red.Cmdable
}
@ -46,18 +52,24 @@ type (
GeoLocation = red.GeoLocation
// GeoRadiusQuery is used with GeoRadius to query geospatial index.
GeoRadiusQuery = red.GeoRadiusQuery
GeoPos = red.GeoPos
// GeoPos is used to represent a geo position.
GeoPos = red.GeoPos
// Pipeliner is an alias of redis.Pipeliner.
Pipeliner = red.Pipeliner
// Z represents sorted set member.
Z = red.Z
Z = red.Z
// ZStore is an alias of redis.ZStore.
ZStore = red.ZStore
IntCmd = red.IntCmd
// IntCmd is an alias of redis.IntCmd.
IntCmd = red.IntCmd
// FloatCmd is an alias of redis.FloatCmd.
FloatCmd = red.FloatCmd
)
// NewRedis returns a Redis.
func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis {
var pass string
for _, v := range redisPass {
@ -184,6 +196,8 @@ func (s *Redis) Blpop(redisNode RedisNode, key string) (string, error) {
return vals[1], nil
}
// BlpopEx uses passed in redis connection to execute blpop command.
// The difference against Blpop is that this method returns a bool to indicate success.
func (s *Redis) BlpopEx(redisNode RedisNode, key string) (string, bool, error) {
if redisNode == nil {
return "", false, ErrNilNode
@ -201,6 +215,7 @@ func (s *Redis) BlpopEx(redisNode RedisNode, key string) (string, bool, error) {
return vals[1], true, nil
}
// Del deletes keys.
func (s *Redis) Del(keys ...string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -220,6 +235,7 @@ func (s *Redis) Del(keys ...string) (val int, err error) {
return
}
// Eval is the implementation of redis eval command.
func (s *Redis) Eval(script string, keys []string, args ...interface{}) (val interface{}, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -234,6 +250,7 @@ func (s *Redis) Eval(script string, keys []string, args ...interface{}) (val int
return
}
// Exists is the implementation of redis exists command.
func (s *Redis) Exists(key string) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -253,6 +270,7 @@ func (s *Redis) Exists(key string) (val bool, err error) {
return
}
// Expire is the implementation of redis expire command.
func (s *Redis) Expire(key string, seconds int) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -264,6 +282,7 @@ func (s *Redis) Expire(key string, seconds int) error {
}, acceptable)
}
// Expireat is the implementation of redis expireat command.
func (s *Redis) Expireat(key string, expireTime int64) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -275,6 +294,7 @@ func (s *Redis) Expireat(key string, expireTime int64) error {
}, acceptable)
}
// GeoAdd is the implementation of redis geoadd command.
func (s *Redis) GeoAdd(key string, geoLocation ...*GeoLocation) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -293,6 +313,7 @@ func (s *Redis) GeoAdd(key string, geoLocation ...*GeoLocation) (val int64, err
return
}
// GeoDist is the implementation of redis geodist command.
func (s *Redis) GeoDist(key string, member1, member2, unit string) (val float64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -311,6 +332,7 @@ func (s *Redis) GeoDist(key string, member1, member2, unit string) (val float64,
return
}
// GeoHash is the implementation of redis geohash command.
func (s *Redis) GeoHash(key string, members ...string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -329,6 +351,7 @@ func (s *Redis) GeoHash(key string, members ...string) (val []string, err error)
return
}
// GeoRadius is the implementation of redis georadius command.
func (s *Redis) GeoRadius(key string, longitude, latitude float64, query *GeoRadiusQuery) (val []GeoLocation, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -346,6 +369,8 @@ func (s *Redis) GeoRadius(key string, longitude, latitude float64, query *GeoRad
}, acceptable)
return
}
// GeoRadiusByMember is the implementation of redis georadiusbymember command.
func (s *Redis) GeoRadiusByMember(key, member string, query *GeoRadiusQuery) (val []GeoLocation, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -364,6 +389,7 @@ func (s *Redis) GeoRadiusByMember(key, member string, query *GeoRadiusQuery) (va
return
}
// GeoPos is the implementation of redis geopos command.
func (s *Redis) GeoPos(key string, members ...string) (val []*GeoPos, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -382,6 +408,7 @@ func (s *Redis) GeoPos(key string, members ...string) (val []*GeoPos, err error)
return
}
// Get is the implementation of redis get command.
func (s *Redis) Get(key string) (val string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -401,6 +428,7 @@ func (s *Redis) Get(key string) (val string, err error) {
return
}
// GetBit is the implementation of redis getbit command.
func (s *Redis) GetBit(key string, offset int64) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -420,6 +448,7 @@ func (s *Redis) GetBit(key string, offset int64) (val int, err error) {
return
}
// Hdel is the implementation of redis hdel command.
func (s *Redis) Hdel(key, field string) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -439,6 +468,7 @@ func (s *Redis) Hdel(key, field string) (val bool, err error) {
return
}
// Hexists is the implementation of redis hexists command.
func (s *Redis) Hexists(key, field string) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -453,6 +483,7 @@ func (s *Redis) Hexists(key, field string) (val bool, err error) {
return
}
// Hget is the implementation of redis hget command.
func (s *Redis) Hget(key, field string) (val string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -467,6 +498,7 @@ func (s *Redis) Hget(key, field string) (val string, err error) {
return
}
// Hgetall is the implementation of redis hgetall command.
func (s *Redis) Hgetall(key string) (val map[string]string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -481,6 +513,7 @@ func (s *Redis) Hgetall(key string) (val map[string]string, err error) {
return
}
// Hincrby is the implementation of redis hincrby command.
func (s *Redis) Hincrby(key, field string, increment int) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -500,6 +533,7 @@ func (s *Redis) Hincrby(key, field string, increment int) (val int, err error) {
return
}
// Hkeys is the implementation of redis hkeys command.
func (s *Redis) Hkeys(key string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -514,6 +548,7 @@ func (s *Redis) Hkeys(key string) (val []string, err error) {
return
}
// Hlen is the implementation of redis hlen command.
func (s *Redis) Hlen(key string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -533,6 +568,7 @@ func (s *Redis) Hlen(key string) (val int, err error) {
return
}
// Hmget is the implementation of redis hmget command.
func (s *Redis) Hmget(key string, fields ...string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -552,6 +588,7 @@ func (s *Redis) Hmget(key string, fields ...string) (val []string, err error) {
return
}
// Hset is the implementation of redis hset command.
func (s *Redis) Hset(key, field, value string) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -563,6 +600,7 @@ func (s *Redis) Hset(key, field, value string) error {
}, acceptable)
}
// Hsetnx is the implementation of redis hsetnx command.
func (s *Redis) Hsetnx(key, field, value string) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -577,6 +615,7 @@ func (s *Redis) Hsetnx(key, field, value string) (val bool, err error) {
return
}
// Hmset is the implementation of redis hmset command.
func (s *Redis) Hmset(key string, fieldsAndValues map[string]string) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -593,6 +632,7 @@ func (s *Redis) Hmset(key string, fieldsAndValues map[string]string) error {
}, acceptable)
}
// Hscan is the implementation of redis hscan command.
func (s *Redis) Hscan(key string, cursor uint64, match string, count int64) (keys []string, cur uint64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -607,6 +647,7 @@ func (s *Redis) Hscan(key string, cursor uint64, match string, count int64) (key
return
}
// Hvals is the implementation of redis hvals command.
func (s *Redis) Hvals(key string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -621,6 +662,7 @@ func (s *Redis) Hvals(key string) (val []string, err error) {
return
}
// Incr is the implementation of redis incr command.
func (s *Redis) Incr(key string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -635,6 +677,7 @@ func (s *Redis) Incr(key string) (val int64, err error) {
return
}
// Incrby is the implementation of redis incrby command.
func (s *Redis) Incrby(key string, increment int64) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -649,6 +692,7 @@ func (s *Redis) Incrby(key string, increment int64) (val int64, err error) {
return
}
// Keys is the implementation of redis keys command.
func (s *Redis) Keys(pattern string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -663,6 +707,7 @@ func (s *Redis) Keys(pattern string) (val []string, err error) {
return
}
// Llen is the implementation of redis llen command.
func (s *Redis) Llen(key string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -682,6 +727,7 @@ func (s *Redis) Llen(key string) (val int, err error) {
return
}
// Lpop is the implementation of redis lpop command.
func (s *Redis) Lpop(key string) (val string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -696,6 +742,7 @@ func (s *Redis) Lpop(key string) (val string, err error) {
return
}
// Lpush is the implementation of redis lpush command.
func (s *Redis) Lpush(key string, values ...interface{}) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -715,6 +762,7 @@ func (s *Redis) Lpush(key string, values ...interface{}) (val int, err error) {
return
}
// Lrange is the implementation of redis lrange command.
func (s *Redis) Lrange(key string, start int, stop int) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -729,6 +777,7 @@ func (s *Redis) Lrange(key string, start int, stop int) (val []string, err error
return
}
// Lrem is the implementation of redis lrem command.
func (s *Redis) Lrem(key string, count int, value string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -748,6 +797,7 @@ func (s *Redis) Lrem(key string, count int, value string) (val int, err error) {
return
}
// Mget is the implementation of redis mget command.
func (s *Redis) Mget(keys ...string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -767,6 +817,7 @@ func (s *Redis) Mget(keys ...string) (val []string, err error) {
return
}
// Persist is the implementation of redis persist command.
func (s *Redis) Persist(key string) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -781,6 +832,7 @@ func (s *Redis) Persist(key string) (val bool, err error) {
return
}
// Pfadd is the implementation of redis pfadd command.
func (s *Redis) Pfadd(key string, values ...interface{}) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -800,6 +852,7 @@ func (s *Redis) Pfadd(key string, values ...interface{}) (val bool, err error) {
return
}
// Pfcount is the implementation of redis pfcount command.
func (s *Redis) Pfcount(key string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -814,6 +867,7 @@ func (s *Redis) Pfcount(key string) (val int64, err error) {
return
}
// Pfmerge is the implementation of redis pfmerge command.
func (s *Redis) Pfmerge(dest string, keys ...string) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -826,6 +880,7 @@ func (s *Redis) Pfmerge(dest string, keys ...string) error {
}, acceptable)
}
// Ping is the implementation of redis ping command.
func (s *Redis) Ping() (val bool) {
// ignore error, error means false
_ = s.brk.DoWithAcceptable(func() error {
@ -848,6 +903,7 @@ func (s *Redis) Ping() (val bool) {
return
}
// Pipelined lets fn to execute pipelined commands.
func (s *Redis) Pipelined(fn func(Pipeliner) error) (err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -863,6 +919,7 @@ func (s *Redis) Pipelined(fn func(Pipeliner) error) (err error) {
return
}
// Rpop is the implementation of redis rpop command.
func (s *Redis) Rpop(key string) (val string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -877,6 +934,7 @@ func (s *Redis) Rpop(key string) (val string, err error) {
return
}
// Rpush is the implementation of redis rpush command.
func (s *Redis) Rpush(key string, values ...interface{}) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -896,6 +954,7 @@ func (s *Redis) Rpush(key string, values ...interface{}) (val int, err error) {
return
}
// Sadd is the implementation of redis sadd command.
func (s *Redis) Sadd(key string, values ...interface{}) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -915,6 +974,7 @@ func (s *Redis) Sadd(key string, values ...interface{}) (val int, err error) {
return
}
// Scan is the implementation of redis scan command.
func (s *Redis) Scan(cursor uint64, match string, count int64) (keys []string, cur uint64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -929,6 +989,7 @@ func (s *Redis) Scan(cursor uint64, match string, count int64) (keys []string, c
return
}
// SetBit is the implementation of redis setbit command.
func (s *Redis) SetBit(key string, offset int64, value int) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -941,6 +1002,7 @@ func (s *Redis) SetBit(key string, offset int64, value int) error {
}, acceptable)
}
// Sscan is the implementation of redis sscan command.
func (s *Redis) Sscan(key string, cursor uint64, match string, count int64) (keys []string, cur uint64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -955,6 +1017,7 @@ func (s *Redis) Sscan(key string, cursor uint64, match string, count int64) (key
return
}
// Scard is the implementation of redis scard command.
func (s *Redis) Scard(key string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -969,6 +1032,7 @@ func (s *Redis) Scard(key string) (val int64, err error) {
return
}
// Set is the implementation of redis set command.
func (s *Redis) Set(key string, value string) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -980,6 +1044,7 @@ func (s *Redis) Set(key string, value string) error {
}, acceptable)
}
// Setex is the implementation of redis setex command.
func (s *Redis) Setex(key, value string, seconds int) error {
return s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -991,6 +1056,7 @@ func (s *Redis) Setex(key, value string, seconds int) error {
}, acceptable)
}
// Setnx is the implementation of redis setnx command.
func (s *Redis) Setnx(key, value string) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1005,6 +1071,7 @@ func (s *Redis) Setnx(key, value string) (val bool, err error) {
return
}
// SetnxEx is the implementation of redis setnx command with expire.
func (s *Redis) SetnxEx(key, value string, seconds int) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1019,6 +1086,7 @@ func (s *Redis) SetnxEx(key, value string, seconds int) (val bool, err error) {
return
}
// Sismember is the implementation of redis sismember command.
func (s *Redis) Sismember(key string, value interface{}) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1033,6 +1101,7 @@ func (s *Redis) Sismember(key string, value interface{}) (val bool, err error) {
return
}
// Srem is the implementation of redis srem command.
func (s *Redis) Srem(key string, values ...interface{}) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1052,6 +1121,7 @@ func (s *Redis) Srem(key string, values ...interface{}) (val int, err error) {
return
}
// Smembers is the implementation of redis smembers command.
func (s *Redis) Smembers(key string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1066,6 +1136,7 @@ func (s *Redis) Smembers(key string) (val []string, err error) {
return
}
// Spop is the implementation of redis spop command.
func (s *Redis) Spop(key string) (val string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1080,6 +1151,7 @@ func (s *Redis) Spop(key string) (val string, err error) {
return
}
// Srandmember is the implementation of redis srandmember command.
func (s *Redis) Srandmember(key string, count int) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1094,6 +1166,7 @@ func (s *Redis) Srandmember(key string, count int) (val []string, err error) {
return
}
// Sunion is the implementation of redis sunion command.
func (s *Redis) Sunion(keys ...string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1108,6 +1181,7 @@ func (s *Redis) Sunion(keys ...string) (val []string, err error) {
return
}
// Sunionstore is the implementation of redis sunionstore command.
func (s *Redis) Sunionstore(destination string, keys ...string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1127,6 +1201,7 @@ func (s *Redis) Sunionstore(destination string, keys ...string) (val int, err er
return
}
// Sdiff is the implementation of redis sdiff command.
func (s *Redis) Sdiff(keys ...string) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1141,6 +1216,7 @@ func (s *Redis) Sdiff(keys ...string) (val []string, err error) {
return
}
// Sdiffstore is the implementation of redis sdiffstore command.
func (s *Redis) Sdiffstore(destination string, keys ...string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1160,6 +1236,7 @@ func (s *Redis) Sdiffstore(destination string, keys ...string) (val int, err err
return
}
// Ttl is the implementation of redis ttl command.
func (s *Redis) Ttl(key string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1179,6 +1256,7 @@ func (s *Redis) Ttl(key string) (val int, err error) {
return
}
// Zadd is the implementation of redis zadd command.
func (s *Redis) Zadd(key string, score int64, value string) (val bool, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1201,6 +1279,7 @@ func (s *Redis) Zadd(key string, score int64, value string) (val bool, err error
return
}
// Zadds is the implementation of redis zadds command.
func (s *Redis) Zadds(key string, ps ...Pair) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1226,6 +1305,7 @@ func (s *Redis) Zadds(key string, ps ...Pair) (val int64, err error) {
return
}
// Zcard is the implementation of redis zcard command.
func (s *Redis) Zcard(key string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1245,6 +1325,7 @@ func (s *Redis) Zcard(key string) (val int, err error) {
return
}
// Zcount is the implementation of redis zcount command.
func (s *Redis) Zcount(key string, start, stop int64) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1264,6 +1345,7 @@ func (s *Redis) Zcount(key string, start, stop int64) (val int, err error) {
return
}
// Zincrby is the implementation of redis zincrby command.
func (s *Redis) Zincrby(key string, increment int64, field string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1283,6 +1365,7 @@ func (s *Redis) Zincrby(key string, increment int64, field string) (val int64, e
return
}
// Zscore is the implementation of redis zscore command.
func (s *Redis) Zscore(key string, value string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1302,6 +1385,7 @@ func (s *Redis) Zscore(key string, value string) (val int64, err error) {
return
}
// Zrank is the implementation of redis zrank command.
func (s *Redis) Zrank(key, field string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1316,6 +1400,7 @@ func (s *Redis) Zrank(key, field string) (val int64, err error) {
return
}
// Zrem is the implementation of redis zrem command.
func (s *Redis) Zrem(key string, values ...interface{}) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1335,6 +1420,7 @@ func (s *Redis) Zrem(key string, values ...interface{}) (val int, err error) {
return
}
// Zremrangebyscore is the implementation of redis zremrangebyscore command.
func (s *Redis) Zremrangebyscore(key string, start, stop int64) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1355,6 +1441,7 @@ func (s *Redis) Zremrangebyscore(key string, start, stop int64) (val int, err er
return
}
// Zremrangebyrank is the implementation of redis zremrangebyrank command.
func (s *Redis) Zremrangebyrank(key string, start, stop int64) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1374,6 +1461,7 @@ func (s *Redis) Zremrangebyrank(key string, start, stop int64) (val int, err err
return
}
// Zrange is the implementation of redis zrange command.
func (s *Redis) Zrange(key string, start, stop int64) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1388,6 +1476,7 @@ func (s *Redis) Zrange(key string, start, stop int64) (val []string, err error)
return
}
// ZrangeWithScores is the implementation of redis zrange command with scores.
func (s *Redis) ZrangeWithScores(key string, start, stop int64) (val []Pair, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1407,6 +1496,7 @@ func (s *Redis) ZrangeWithScores(key string, start, stop int64) (val []Pair, err
return
}
// ZRevRangeWithScores is the implementation of redis zrevrange command with scores.
func (s *Redis) ZRevRangeWithScores(key string, start, stop int64) (val []Pair, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1426,6 +1516,7 @@ func (s *Redis) ZRevRangeWithScores(key string, start, stop int64) (val []Pair,
return
}
// ZrangebyscoreWithScores is the implementation of redis zrangebyscore command with scores.
func (s *Redis) ZrangebyscoreWithScores(key string, start, stop int64) (val []Pair, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1448,6 +1539,7 @@ func (s *Redis) ZrangebyscoreWithScores(key string, start, stop int64) (val []Pa
return
}
// ZrangebyscoreWithScoresAndLimit is the implementation of redis zrangebyscore command with scores and limit.
func (s *Redis) ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
val []Pair, err error) {
err = s.brk.DoWithAcceptable(func() error {
@ -1477,6 +1569,7 @@ func (s *Redis) ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, p
return
}
// Zrevrange is the implementation of redis zrevrange command.
func (s *Redis) Zrevrange(key string, start, stop int64) (val []string, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1491,6 +1584,7 @@ func (s *Redis) Zrevrange(key string, start, stop int64) (val []string, err erro
return
}
// ZrevrangebyscoreWithScores is the implementation of redis zrevrangebyscore command with scores.
func (s *Redis) ZrevrangebyscoreWithScores(key string, start, stop int64) (val []Pair, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1513,6 +1607,7 @@ func (s *Redis) ZrevrangebyscoreWithScores(key string, start, stop int64) (val [
return
}
// ZrevrangebyscoreWithScoresAndLimit is the implementation of redis zrevrangebyscore command with scores and limit.
func (s *Redis) ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
val []Pair, err error) {
err = s.brk.DoWithAcceptable(func() error {
@ -1542,6 +1637,7 @@ func (s *Redis) ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64
return
}
// Zrevrank is the implementation of redis zrevrank command.
func (s *Redis) Zrevrank(key string, field string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1556,6 +1652,7 @@ func (s *Redis) Zrevrank(key string, field string) (val int64, err error) {
return
}
// Zunionstore is the implementation of redis zunionstore command.
func (s *Redis) Zunionstore(dest string, store ZStore, keys ...string) (val int64, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
@ -1570,6 +1667,7 @@ func (s *Redis) Zunionstore(dest string, store ZStore, keys ...string) (val int6
return
}
// String returns the string representation of s.
func (s *Redis) String() string {
return s.Addr
}

@ -7,11 +7,13 @@ import (
"github.com/tal-tech/go-zero/core/logx"
)
// ClosableNode interface represents a closable redis node.
type ClosableNode interface {
RedisNode
Close()
}
// CreateBlockingNode returns a ClosableNode.
func CreateBlockingNode(r *Redis) (ClosableNode, error) {
timeout := readWriteTimeout + blockingQueryTimeout

@ -28,6 +28,7 @@ end`
millisPerSecond = 1000
)
// A RedisLock is a redis lock.
type RedisLock struct {
store *Redis
seconds uint32
@ -39,6 +40,7 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, key string) *RedisLock {
return &RedisLock{
store: store,
@ -47,6 +49,7 @@ func NewRedisLock(store *Redis, key string) *RedisLock {
}
}
// Acquire acquires the lock.
func (rl *RedisLock) Acquire() (bool, error) {
seconds := atomic.LoadUint32(&rl.seconds)
resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{
@ -69,6 +72,7 @@ func (rl *RedisLock) Acquire() (bool, error) {
return false, nil
}
// Release releases the lock.
func (rl *RedisLock) Release() (bool, error) {
resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id})
if err != nil {
@ -83,6 +87,7 @@ func (rl *RedisLock) Release() (bool, error) {
return reply == 1, nil
}
// SetExpire sets the expire.
func (rl *RedisLock) SetExpire(seconds int) {
atomic.StoreUint32(&rl.seconds, uint32(seconds))
}

@ -8,6 +8,7 @@ import (
"github.com/tal-tech/go-zero/core/stores/redis"
)
// CreateRedis returns a in process redis.Redis.
func CreateRedis() (r *redis.Redis, clean func(), err error) {
mr, err := miniredis.Run()
if err != nil {

@ -12,13 +12,16 @@ var (
)
type (
// Map is an alias of map[string]string.
Map map[string]string
// A ScriptCache is a cache that stores a script with its sha key.
ScriptCache struct {
atomic.Value
}
)
// GetScriptCache returns a ScriptCache.
func GetScriptCache() *ScriptCache {
once.Do(func() {
instance = &ScriptCache{}
@ -28,12 +31,14 @@ func GetScriptCache() *ScriptCache {
return instance
}
// GetSha returns the sha string of given script.
func (sc *ScriptCache) GetSha(script string) (string, bool) {
cache := sc.Load().(Map)
ret, ok := cache[script]
return ret, ok
}
// SetSha sets script with sha into the ScriptCache.
func (sc *ScriptCache) SetSha(script, sha string) {
lock.Lock()
defer lock.Unlock()

@ -14,25 +14,32 @@ import (
const cacheSafeGapBetweenIndexAndPrimary = time.Second * 5
var (
// ErrNotFound is an alias of sqlx.ErrNotFound.
ErrNotFound = sqlx.ErrNotFound
// can't use one SharedCalls per conn, because multiple conns may share the same cache key.
exclusiveCalls = syncx.NewSharedCalls()
stats = cache.NewCacheStat("sqlc")
stats = cache.NewStat("sqlc")
)
type (
ExecFn func(conn sqlx.SqlConn) (sql.Result, error)
IndexQueryFn func(conn sqlx.SqlConn, v interface{}) (interface{}, error)
// ExecFn defines the sql exec method.
ExecFn func(conn sqlx.SqlConn) (sql.Result, error)
// IndexQueryFn defines the query method that based on unique indexes.
IndexQueryFn func(conn sqlx.SqlConn, v interface{}) (interface{}, error)
// PrimaryQueryFn defines the query method that based on primary keys.
PrimaryQueryFn func(conn sqlx.SqlConn, v, primary interface{}) error
QueryFn func(conn sqlx.SqlConn, v interface{}) error
// QueryFn defines the query method.
QueryFn func(conn sqlx.SqlConn, v interface{}) error
// A CachedConn is a DB connection with cache capability.
CachedConn struct {
db sqlx.SqlConn
cache cache.Cache
}
)
// NewNodeConn returns a CachedConn with a redis node cache.
func NewNodeConn(db sqlx.SqlConn, rds *redis.Redis, opts ...cache.Option) CachedConn {
return CachedConn{
db: db,
@ -40,6 +47,7 @@ func NewNodeConn(db sqlx.SqlConn, rds *redis.Redis, opts ...cache.Option) Cached
}
}
// NewConn returns a CachedConn with a redis cluster cache.
func NewConn(db sqlx.SqlConn, c cache.CacheConf, opts ...cache.Option) CachedConn {
return CachedConn{
db: db,
@ -47,14 +55,17 @@ func NewConn(db sqlx.SqlConn, c cache.CacheConf, opts ...cache.Option) CachedCon
}
}
// DelCache deletes cache with keys.
func (cc CachedConn) DelCache(keys ...string) error {
return cc.cache.Del(keys...)
}
// GetCache unmarshals cache with given key into v.
func (cc CachedConn) GetCache(key string, v interface{}) error {
return cc.cache.Get(key, v)
}
// Exec runs given exec on given keys, and returns execution result.
func (cc CachedConn) Exec(exec ExecFn, keys ...string) (sql.Result, error) {
res, err := exec(cc.db)
if err != nil {
@ -68,16 +79,19 @@ func (cc CachedConn) Exec(exec ExecFn, keys ...string) (sql.Result, error) {
return res, nil
}
// ExecNoCache runs exec with given sql statement, without affecting cache.
func (cc CachedConn) ExecNoCache(q string, args ...interface{}) (sql.Result, error) {
return cc.db.Exec(q, args...)
}
// QueryRow unmarshals into v with given key and query func.
func (cc CachedConn) QueryRow(v interface{}, key string, query QueryFn) error {
return cc.cache.Take(v, key, func(v interface{}) error {
return query(cc.db, v)
})
}
// QueryRowIndex unmarshals into v with given key.
func (cc CachedConn) QueryRowIndex(v interface{}, key string, keyer func(primary interface{}) string,
indexQuery IndexQueryFn, primaryQuery PrimaryQueryFn) error {
var primaryKey interface{}
@ -104,19 +118,23 @@ func (cc CachedConn) QueryRowIndex(v interface{}, key string, keyer func(primary
})
}
// QueryRowNoCache unmarshals into v with given statement.
func (cc CachedConn) QueryRowNoCache(v interface{}, q string, args ...interface{}) error {
return cc.db.QueryRow(v, q, args...)
}
// QueryRowsNoCache doesn't use cache, because it might cause consistency problem.
// QueryRowsNoCache unmarshals into v with given statement.
// It doesn't use cache, because it might cause consistency problem.
func (cc CachedConn) QueryRowsNoCache(v interface{}, q string, args ...interface{}) error {
return cc.db.QueryRows(v, q, args...)
}
// SetCache sets v into cache with given key.
func (cc CachedConn) SetCache(key string, v interface{}) error {
return cc.cache.Set(key, v)
}
// Transact runs given fn in transaction mode.
func (cc CachedConn) Transact(fn func(sqlx.Session) error) error {
return cc.db.Transact(fn)
}

@ -20,8 +20,10 @@ const (
var emptyBulkStmt bulkStmt
type (
// ResultHandler defines the method of result handlers.
ResultHandler func(sql.Result, error)
// A BulkInserter is used to batch insert records.
BulkInserter struct {
executor *executors.PeriodicalExecutor
inserter *dbInserter
@ -35,6 +37,7 @@ type (
}
)
// NewBulkInserter returns a BulkInserter.
func NewBulkInserter(sqlConn SqlConn, stmt string) (*BulkInserter, error) {
bkStmt, err := parseInsertStmt(stmt)
if err != nil {
@ -53,10 +56,12 @@ func NewBulkInserter(sqlConn SqlConn, stmt string) (*BulkInserter, error) {
}, nil
}
// Flush flushes all the pending records.
func (bi *BulkInserter) Flush() {
bi.executor.Flush()
}
// Insert inserts given args.
func (bi *BulkInserter) Insert(args ...interface{}) error {
value, err := format(bi.stmt.valueFormat, args...)
if err != nil {
@ -68,17 +73,20 @@ func (bi *BulkInserter) Insert(args ...interface{}) error {
return nil
}
// SetResultHandler sets the given handler.
func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
bi.executor.Sync(func() {
bi.inserter.resultHandler = handler
})
}
// UpdateOrDelete runs update or delete queries, which flushes pending records first.
func (bi *BulkInserter) UpdateOrDelete(fn func()) {
bi.executor.Flush()
fn()
}
// UpdateStmt updates the insert statement.
func (bi *BulkInserter) UpdateStmt(stmt string) error {
bkStmt, err := parseInsertStmt(stmt)
if err != nil {

@ -7,6 +7,7 @@ const (
duplicateEntryCode uint16 = 1062
)
// NewMysql returns a mysql connection.
func NewMysql(datasource string, opts ...SqlOption) SqlConn {
opts = append(opts, withMysqlAcceptable())
return NewSqlConn(mysqlDriverName, datasource, opts...)

@ -11,9 +11,13 @@ import (
const tagName = "db"
var (
ErrNotMatchDestination = errors.New("not matching destination to scan")
ErrNotReadableValue = errors.New("value not addressable or interfaceable")
ErrNotSettable = errors.New("passed in variable is not settable")
// ErrNotMatchDestination is an error that indicates not matching destination to scan.
ErrNotMatchDestination = errors.New("not matching destination to scan")
// ErrNotReadableValue is an error that indicates value is not addressable or interfaceable.
ErrNotReadableValue = errors.New("value not addressable or interfaceable")
// ErrNotSettable is an error that indicates the passed in variable is not settable.
ErrNotSettable = errors.New("passed in variable is not settable")
// ErrUnsupportedValueType is an error that indicates unsupported unmarshal type.
ErrUnsupportedValueType = errors.New("unsupported unmarshal type")
)

@ -6,6 +6,7 @@ import (
"github.com/tal-tech/go-zero/core/breaker"
)
// ErrNotFound is an alias of sql.ErrNoRows
var ErrNotFound = sql.ErrNoRows
type (
@ -25,8 +26,10 @@ type (
Transact(func(session Session) error) error
}
// SqlOption defines the method to customize a sql connection.
SqlOption func(*commonSqlConn)
// StmtSession interface represents a session that can be used to execute statements.
StmtSession interface {
Close() error
Exec(args ...interface{}) (sql.Result, error)
@ -62,6 +65,7 @@ type (
}
)
// NewSqlConn returns a SqlConn with given driver name and datasource.
func NewSqlConn(driverName, datasource string, opts ...SqlOption) SqlConn {
conn := &commonSqlConn{
driverName: driverName,

Loading…
Cancel
Save