From c566b5ff82d63d8af672b42a173eb87ebd5eede5 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sun, 28 Feb 2021 23:02:49 +0800 Subject: [PATCH] fix golint issues in core/stores (#527) --- core/stores/cache/cache.go | 4 +- core/stores/cache/cache_test.go | 4 +- core/stores/cache/cacheconf.go | 1 + core/stores/cache/cachenode.go | 4 +- core/stores/cache/cachenode_test.go | 14 +-- core/stores/cache/cacheopt.go | 4 + core/stores/cache/cachestat.go | 40 +++++---- core/stores/cache/cleaner.go | 1 + core/stores/cache/config.go | 2 + core/stores/cache/util.go | 1 + core/stores/clickhouse/clickhouse.go | 1 + core/stores/kv/config.go | 5 +- core/stores/kv/store.go | 3 + core/stores/mongo/bulkinserter.go | 6 ++ core/stores/mongo/collection.go | 2 + core/stores/mongo/internal/collection.go | 1 + core/stores/mongo/iter.go | 3 + core/stores/mongo/model.go | 18 ++++ core/stores/mongo/pipe.go | 1 + core/stores/mongo/query.go | 1 + core/stores/mongo/util.go | 1 + core/stores/mongoc/cachedcollection.go | 41 +++++++-- core/stores/mongoc/cachedmodel.go | 89 ++++++++++++------- core/stores/postgres/postgresql.go | 1 + core/stores/redis/conf.go | 10 ++- core/stores/redis/redis.go | 108 +++++++++++++++++++++-- core/stores/redis/redisblockingnode.go | 2 + core/stores/redis/redislock.go | 5 ++ core/stores/redis/redistest/redistest.go | 1 + core/stores/redis/scriptcache.go | 5 ++ core/stores/sqlc/cachedsql.go | 28 ++++-- core/stores/sqlx/bulkinserter.go | 8 ++ core/stores/sqlx/mysql.go | 1 + core/stores/sqlx/orm.go | 10 ++- core/stores/sqlx/sqlconn.go | 4 + 35 files changed, 348 insertions(+), 82 deletions(-) diff --git a/core/stores/cache/cache.go b/core/stores/cache/cache.go index 215255cc..4ac13956 100644 --- a/core/stores/cache/cache.go +++ b/core/stores/cache/cache.go @@ -11,6 +11,7 @@ import ( ) type ( + // Cache interface is used to define the cache implementation. Cache interface { Del(keys ...string) error Get(key string, v interface{}) error @@ -27,7 +28,8 @@ type ( } ) -func New(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error, +// New returns a Cache. +func New(c ClusterConf, barrier syncx.SharedCalls, st *Stat, errNotFound error, opts ...Option) Cache { if len(c) == 0 || TotalWeights(c) <= 0 { log.Fatal("no cache nodes") diff --git a/core/stores/cache/cache_test.go b/core/stores/cache/cache_test.go index 1e1e1811..92d30aed 100644 --- a/core/stores/cache/cache_test.go +++ b/core/stores/cache/cache_test.go @@ -102,7 +102,7 @@ func TestCache_SetDel(t *testing.T) { Weight: 100, }, } - c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder) + c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder) for i := 0; i < total; i++ { if i%2 == 0 { assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i)) @@ -140,7 +140,7 @@ func TestCache_OneNode(t *testing.T) { Weight: 100, }, } - c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder) + c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder) for i := 0; i < total; i++ { if i%2 == 0 { assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i)) diff --git a/core/stores/cache/cacheconf.go b/core/stores/cache/cacheconf.go index 36ba1171..9c1e2bf4 100644 --- a/core/stores/cache/cacheconf.go +++ b/core/stores/cache/cacheconf.go @@ -1,3 +1,4 @@ package cache +// CacheConf is an alias of ClusterConf. type CacheConf = ClusterConf diff --git a/core/stores/cache/cachenode.go b/core/stores/cache/cachenode.go index 56b92cfc..2cf8a609 100644 --- a/core/stores/cache/cachenode.go +++ b/core/stores/cache/cachenode.go @@ -33,7 +33,7 @@ type cacheNode struct { r *rand.Rand lock *sync.Mutex unstableExpiry mathx.Unstable - stat *CacheStat + stat *Stat errNotFound error } @@ -43,7 +43,7 @@ type cacheNode struct { // st is used to stat the cache. // errNotFound defines the error that returned on cache not found. // opts are the options that customize the cacheNode. -func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat, +func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *Stat, errNotFound error, opts ...Option) Cache { o := newOptions(opts...) return cacheNode{ diff --git a/core/stores/cache/cachenode_test.go b/core/stores/cache/cachenode_test.go index c26cf9b7..b4979e58 100644 --- a/core/stores/cache/cachenode_test.go +++ b/core/stores/cache/cachenode_test.go @@ -36,7 +36,7 @@ func TestCacheNode_DelCache(t *testing.T) { r: rand.New(rand.NewSource(time.Now().UnixNano())), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewCacheStat("any"), + stat: NewStat("any"), errNotFound: errTestNotFound, } assert.Nil(t, cn.Del()) @@ -59,7 +59,7 @@ func TestCacheNode_InvalidCache(t *testing.T) { r: rand.New(rand.NewSource(time.Now().UnixNano())), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewCacheStat("any"), + stat: NewStat("any"), errNotFound: errTestNotFound, } s.Set("any", "value") @@ -81,7 +81,7 @@ func TestCacheNode_Take(t *testing.T) { barrier: syncx.NewSharedCalls(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewCacheStat("any"), + stat: NewStat("any"), errNotFound: errTestNotFound, } var str string @@ -108,7 +108,7 @@ func TestCacheNode_TakeNotFound(t *testing.T) { barrier: syncx.NewSharedCalls(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewCacheStat("any"), + stat: NewStat("any"), errNotFound: errTestNotFound, } var str string @@ -147,7 +147,7 @@ func TestCacheNode_TakeWithExpire(t *testing.T) { barrier: syncx.NewSharedCalls(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewCacheStat("any"), + stat: NewStat("any"), errNotFound: errors.New("any"), } var str string @@ -174,7 +174,7 @@ func TestCacheNode_String(t *testing.T) { barrier: syncx.NewSharedCalls(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewCacheStat("any"), + stat: NewStat("any"), errNotFound: errors.New("any"), } assert.Equal(t, store.Addr, cn.String()) @@ -191,7 +191,7 @@ func TestCacheValueWithBigInt(t *testing.T) { barrier: syncx.NewSharedCalls(), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), - stat: NewCacheStat("any"), + stat: NewStat("any"), errNotFound: errors.New("any"), } diff --git a/core/stores/cache/cacheopt.go b/core/stores/cache/cacheopt.go index 9ceff415..32b5d851 100644 --- a/core/stores/cache/cacheopt.go +++ b/core/stores/cache/cacheopt.go @@ -8,11 +8,13 @@ const ( ) type ( + // An Options is used to store the cache options. Options struct { Expiry time.Duration NotFoundExpiry time.Duration } + // Option defines the method to customize an Options. Option func(o *Options) ) @@ -32,12 +34,14 @@ func newOptions(opts ...Option) Options { return o } +// WithExpiry returns a func to customize a Options with given expiry. func WithExpiry(expiry time.Duration) Option { return func(o *Options) { o.Expiry = expiry } } +// WithNotFoundExpiry returns a func to customize a Options with given not found expiry. func WithNotFoundExpiry(expiry time.Duration) Option { return func(o *Options) { o.NotFoundExpiry = expiry diff --git a/core/stores/cache/cachestat.go b/core/stores/cache/cachestat.go index 88de7e56..67eb68f3 100644 --- a/core/stores/cache/cachestat.go +++ b/core/stores/cache/cachestat.go @@ -9,7 +9,8 @@ import ( const statInterval = time.Minute -type CacheStat struct { +// A Stat is used to stat the cache. +type Stat struct { name string // export the fields to let the unit tests working, // reside in internal package, doesn't matter. @@ -19,8 +20,9 @@ type CacheStat struct { DbFails uint64 } -func NewCacheStat(name string) *CacheStat { - ret := &CacheStat{ +// NewStat returns a Stat. +func NewStat(name string) *Stat { + ret := &Stat{ name: name, } go ret.statLoop() @@ -28,37 +30,41 @@ func NewCacheStat(name string) *CacheStat { return ret } -func (cs *CacheStat) IncrementTotal() { - atomic.AddUint64(&cs.Total, 1) +// IncrementTotal increments the total count. +func (s *Stat) IncrementTotal() { + atomic.AddUint64(&s.Total, 1) } -func (cs *CacheStat) IncrementHit() { - atomic.AddUint64(&cs.Hit, 1) +// IncrementHit increments the hit count. +func (s *Stat) IncrementHit() { + atomic.AddUint64(&s.Hit, 1) } -func (cs *CacheStat) IncrementMiss() { - atomic.AddUint64(&cs.Miss, 1) +// IncrementMiss increments the miss count. +func (s *Stat) IncrementMiss() { + atomic.AddUint64(&s.Miss, 1) } -func (cs *CacheStat) IncrementDbFails() { - atomic.AddUint64(&cs.DbFails, 1) +// IncrementDbFails increments the db fail count. +func (s *Stat) IncrementDbFails() { + atomic.AddUint64(&s.DbFails, 1) } -func (cs *CacheStat) statLoop() { +func (s *Stat) statLoop() { ticker := time.NewTicker(statInterval) defer ticker.Stop() for range ticker.C { - total := atomic.SwapUint64(&cs.Total, 0) + total := atomic.SwapUint64(&s.Total, 0) if total == 0 { continue } - hit := atomic.SwapUint64(&cs.Hit, 0) + hit := atomic.SwapUint64(&s.Hit, 0) percent := 100 * float32(hit) / float32(total) - miss := atomic.SwapUint64(&cs.Miss, 0) - dbf := atomic.SwapUint64(&cs.DbFails, 0) + miss := atomic.SwapUint64(&s.Miss, 0) + dbf := atomic.SwapUint64(&s.DbFails, 0) logx.Statf("dbcache(%s) - qpm: %d, hit_ratio: %.1f%%, hit: %d, miss: %d, db_fails: %d", - cs.name, total, percent, hit, miss, dbf) + s.name, total, percent, hit, miss, dbf) } } diff --git a/core/stores/cache/cleaner.go b/core/stores/cache/cleaner.go index 54798f13..09ac7175 100644 --- a/core/stores/cache/cleaner.go +++ b/core/stores/cache/cleaner.go @@ -39,6 +39,7 @@ func init() { }) } +// AddCleanTask adds a clean task on given keys. func AddCleanTask(task func() error, keys ...string) { timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{ delay: time.Second, diff --git a/core/stores/cache/config.go b/core/stores/cache/config.go index a5466a07..ca4cf30f 100644 --- a/core/stores/cache/config.go +++ b/core/stores/cache/config.go @@ -3,8 +3,10 @@ package cache import "github.com/tal-tech/go-zero/core/stores/redis" type ( + // A ClusterConf is the config of a redis cluster that used as cache. ClusterConf []NodeConf + // A NodeConf is the config of a redis node that used as cache. NodeConf struct { redis.RedisConf Weight int `json:",default=100"` diff --git a/core/stores/cache/util.go b/core/stores/cache/util.go index 390d20cc..203d79ad 100644 --- a/core/stores/cache/util.go +++ b/core/stores/cache/util.go @@ -4,6 +4,7 @@ import "strings" const keySeparator = "," +// TotalWeights returns the total weights of given nodes. func TotalWeights(c []NodeConf) int { var weights int diff --git a/core/stores/clickhouse/clickhouse.go b/core/stores/clickhouse/clickhouse.go index a78a0477..a46d8703 100644 --- a/core/stores/clickhouse/clickhouse.go +++ b/core/stores/clickhouse/clickhouse.go @@ -8,6 +8,7 @@ import ( const clickHouseDriverName = "clickhouse" +// New returns a clickhouse connection. func New(datasource string, opts ...sqlx.SqlOption) sqlx.SqlConn { return sqlx.NewSqlConn(clickHouseDriverName, datasource, opts...) } diff --git a/core/stores/kv/config.go b/core/stores/kv/config.go index dc86ee9f..e6a3a324 100644 --- a/core/stores/kv/config.go +++ b/core/stores/kv/config.go @@ -1,7 +1,6 @@ package kv -import ( - "github.com/tal-tech/go-zero/core/stores/cache" -) +import "github.com/tal-tech/go-zero/core/stores/cache" +// KvConf is an alias of cache.ClusterConf. type KvConf = cache.ClusterConf diff --git a/core/stores/kv/store.go b/core/stores/kv/store.go index bf835848..b022042f 100644 --- a/core/stores/kv/store.go +++ b/core/stores/kv/store.go @@ -10,9 +10,11 @@ import ( "github.com/tal-tech/go-zero/core/stores/redis" ) +// ErrNoRedisNode is an error that indicates no redis node. var ErrNoRedisNode = errors.New("no redis node") type ( + // Store interface represents a KV store. Store interface { Del(keys ...string) (int, error) Eval(script string, key string, args ...interface{}) (interface{}, error) @@ -81,6 +83,7 @@ type ( } ) +// NewStore returns a Store. func NewStore(c KvConf) Store { if len(c) == 0 || cache.TotalWeights(c) <= 0 { log.Fatal("no cache nodes") diff --git a/core/stores/mongo/bulkinserter.go b/core/stores/mongo/bulkinserter.go index 858bb33d..9791fc9b 100644 --- a/core/stores/mongo/bulkinserter.go +++ b/core/stores/mongo/bulkinserter.go @@ -14,14 +14,17 @@ const ( ) type ( + // ResultHandler is a handler that used to handle results. ResultHandler func(*mgo.BulkResult, error) + // A BulkInserter is used to insert bulk of mongo records. BulkInserter struct { executor *executors.PeriodicalExecutor inserter *dbInserter } ) +// NewBulkInserter returns a BulkInserter. func NewBulkInserter(session *mgo.Session, dbName string, collectionNamer func() string) *BulkInserter { inserter := &dbInserter{ session: session, @@ -35,14 +38,17 @@ func NewBulkInserter(session *mgo.Session, dbName string, collectionNamer func() } } +// Flush flushes the inserter, writes all pending records. func (bi *BulkInserter) Flush() { bi.executor.Flush() } +// Insert inserts doc. func (bi *BulkInserter) Insert(doc interface{}) { bi.executor.Add(doc) } +// SetResultHandler sets the result handler. func (bi *BulkInserter) SetResultHandler(handler ResultHandler) { bi.executor.Sync(func() { bi.inserter.resultHandler = handler diff --git a/core/stores/mongo/collection.go b/core/stores/mongo/collection.go index aee8a972..6fb199b2 100644 --- a/core/stores/mongo/collection.go +++ b/core/stores/mongo/collection.go @@ -13,9 +13,11 @@ import ( const slowThreshold = time.Millisecond * 500 +// ErrNotFound is an alias of mgo.ErrNotFound. var ErrNotFound = mgo.ErrNotFound type ( + // Collection interface represents a mongo connection. Collection interface { Find(query interface{}) Query FindId(id interface{}) Query diff --git a/core/stores/mongo/internal/collection.go b/core/stores/mongo/internal/collection.go index 054239a1..c6540ca3 100644 --- a/core/stores/mongo/internal/collection.go +++ b/core/stores/mongo/internal/collection.go @@ -4,6 +4,7 @@ package internal import "github.com/globalsign/mgo" +// MgoCollection interface represents a mgo collection. type MgoCollection interface { Find(query interface{}) *mgo.Query FindId(id interface{}) *mgo.Query diff --git a/core/stores/mongo/iter.go b/core/stores/mongo/iter.go index a664b303..544d8c1d 100644 --- a/core/stores/mongo/iter.go +++ b/core/stores/mongo/iter.go @@ -8,6 +8,7 @@ import ( ) type ( + // Iter interface represents a mongo iter. Iter interface { All(result interface{}) error Close() error @@ -19,6 +20,7 @@ type ( Timeout() bool } + // A ClosableIter is a closable mongo iter. ClosableIter struct { Iter Cleanup func() @@ -57,6 +59,7 @@ func (i promisedIter) For(result interface{}, f func() error) error { return i.promise.keep(err) } +// Close closes a mongo iter. func (it *ClosableIter) Close() error { err := it.Iter.Close() it.Cleanup() diff --git a/core/stores/mongo/model.go b/core/stores/mongo/model.go index 020bf07f..f8c298ac 100644 --- a/core/stores/mongo/model.go +++ b/core/stores/mongo/model.go @@ -12,8 +12,10 @@ type ( timeout time.Duration } + // Option defines the method to customize a mongo model. Option func(opts *options) + // A Model is a mongo model. Model struct { session *concurrentSession db *mgo.Database @@ -22,6 +24,7 @@ type ( } ) +// MustNewModel returns a Model, exits on errors. func MustNewModel(url, collection string, opts ...Option) *Model { model, err := NewModel(url, collection, opts...) if err != nil { @@ -31,6 +34,7 @@ func MustNewModel(url, collection string, opts ...Option) *Model { return model } +// NewModel returns a Model. func NewModel(url, collection string, opts ...Option) (*Model, error) { session, err := getConcurrentSession(url) if err != nil { @@ -46,72 +50,85 @@ func NewModel(url, collection string, opts ...Option) (*Model, error) { }, nil } +// Find finds a record with given query. func (mm *Model) Find(query interface{}) (Query, error) { return mm.query(func(c Collection) Query { return c.Find(query) }) } +// FindId finds a record with given id. func (mm *Model) FindId(id interface{}) (Query, error) { return mm.query(func(c Collection) Query { return c.FindId(id) }) } +// GetCollection returns a Collection with given session. func (mm *Model) GetCollection(session *mgo.Session) Collection { return newCollection(mm.db.C(mm.collection).With(session)) } +// Insert inserts docs into mm. func (mm *Model) Insert(docs ...interface{}) error { return mm.execute(func(c Collection) error { return c.Insert(docs...) }) } +// Pipe returns a Pipe with given pipeline. func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) { return mm.pipe(func(c Collection) Pipe { return c.Pipe(pipeline) }) } +// PutSession returns the given session. func (mm *Model) PutSession(session *mgo.Session) { mm.session.putSession(session) } +// Remove removes the records with given selector. func (mm *Model) Remove(selector interface{}) error { return mm.execute(func(c Collection) error { return c.Remove(selector) }) } +// RemoveAll removes all with given selector and returns a mgo.ChangeInfo. func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) { return mm.change(func(c Collection) (*mgo.ChangeInfo, error) { return c.RemoveAll(selector) }) } +// RemoveId removes a record with given id. func (mm *Model) RemoveId(id interface{}) error { return mm.execute(func(c Collection) error { return c.RemoveId(id) }) } +// TakeSession gets a session. func (mm *Model) TakeSession() (*mgo.Session, error) { return mm.session.takeSession(mm.opts...) } +// Update updates a record with given selector. func (mm *Model) Update(selector, update interface{}) error { return mm.execute(func(c Collection) error { return c.Update(selector, update) }) } +// UpdateId updates a record with given id. func (mm *Model) UpdateId(id, update interface{}) error { return mm.execute(func(c Collection) error { return c.UpdateId(id, update) }) } +// Upsert upserts a record with given selector, and returns a mgo.ChangeInfo. func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) { return mm.change(func(c Collection) (*mgo.ChangeInfo, error) { return c.Upsert(selector, update) @@ -158,6 +175,7 @@ func (mm *Model) query(fn func(c Collection) Query) (Query, error) { return fn(mm.GetCollection(session)), nil } +// WithTimeout customizes an operation with given timeout. func WithTimeout(timeout time.Duration) Option { return func(opts *options) { opts.timeout = timeout diff --git a/core/stores/mongo/pipe.go b/core/stores/mongo/pipe.go index 52bd2e2b..98ddc930 100644 --- a/core/stores/mongo/pipe.go +++ b/core/stores/mongo/pipe.go @@ -8,6 +8,7 @@ import ( ) type ( + // Pipe interface represents a mongo pipe. Pipe interface { All(result interface{}) error AllowDiskUse() Pipe diff --git a/core/stores/mongo/query.go b/core/stores/mongo/query.go index 101c74bc..b7e4077d 100644 --- a/core/stores/mongo/query.go +++ b/core/stores/mongo/query.go @@ -8,6 +8,7 @@ import ( ) type ( + // Query interface represents a mongo query. Query interface { All(result interface{}) error Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) diff --git a/core/stores/mongo/util.go b/core/stores/mongo/util.go index 0db2c035..aff7c6ae 100644 --- a/core/stores/mongo/util.go +++ b/core/stores/mongo/util.go @@ -4,6 +4,7 @@ import "strings" const mongoAddrSep = "," +// FormatAddr formats mongo hosts to a string. func FormatAddr(hosts []string) string { return strings.Join(hosts, mongoAddrSep) } diff --git a/core/stores/mongoc/cachedcollection.go b/core/stores/mongoc/cachedcollection.go index 32f25235..2be85e8f 100644 --- a/core/stores/mongoc/cachedcollection.go +++ b/core/stores/mongoc/cachedcollection.go @@ -8,23 +8,52 @@ import ( ) var ( + // ErrNotFound is an alias of mgo.ErrNotFound. ErrNotFound = mgo.ErrNotFound // can't use one SharedCalls per conn, because multiple conns may share the same cache key. sharedCalls = syncx.NewSharedCalls() - stats = cache.NewCacheStat("mongoc") + stats = cache.NewStat("mongoc") ) type ( + // QueryOption defines the method to customize a mongo query. QueryOption func(query mongo.Query) mongo.Query + // CachedCollection interface represents a mongo collection with cache. + CachedCollection interface { + Count(query interface{}) (int, error) + DelCache(keys ...string) error + FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error + FindOne(v interface{}, key string, query interface{}) error + FindOneNoCache(v interface{}, query interface{}) error + FindOneId(v interface{}, key string, id interface{}) error + FindOneIdNoCache(v interface{}, id interface{}) error + GetCache(key string, v interface{}) error + Insert(docs ...interface{}) error + Pipe(pipeline interface{}) mongo.Pipe + Remove(selector interface{}, keys ...string) error + RemoveNoCache(selector interface{}) error + RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error) + RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error) + RemoveId(id interface{}, keys ...string) error + RemoveIdNoCache(id interface{}) error + SetCache(key string, v interface{}) error + Update(selector, update interface{}, keys ...string) error + UpdateNoCache(selector, update interface{}) error + UpdateId(id, update interface{}, keys ...string) error + UpdateIdNoCache(id, update interface{}) error + Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error) + UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error) + } + cachedCollection struct { collection mongo.Collection cache cache.Cache } ) -func newCollection(collection mongo.Collection, c cache.Cache) *cachedCollection { +func newCollection(collection mongo.Collection, c cache.Cache) CachedCollection { return &cachedCollection{ collection: collection, cache: c, @@ -39,10 +68,6 @@ func (c *cachedCollection) DelCache(keys ...string) error { return c.cache.Del(keys...) } -func (c *cachedCollection) GetCache(key string, v interface{}) error { - return c.cache.Get(key, v) -} - func (c *cachedCollection) FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error { q := c.collection.Find(query) for _, opt := range opts { @@ -75,6 +100,10 @@ func (c *cachedCollection) FindOneIdNoCache(v interface{}, id interface{}) error return q.One(v) } +func (c *cachedCollection) GetCache(key string, v interface{}) error { + return c.cache.Get(key, v) +} + func (c *cachedCollection) Insert(docs ...interface{}) error { return c.collection.Insert(docs...) } diff --git a/core/stores/mongoc/cachedmodel.go b/core/stores/mongoc/cachedmodel.go index 765373d7..e5d59090 100644 --- a/core/stores/mongoc/cachedmodel.go +++ b/core/stores/mongoc/cachedmodel.go @@ -9,12 +9,14 @@ import ( "github.com/tal-tech/go-zero/core/stores/redis" ) +// A Model is a mongo model that built with cache capability. type Model struct { *mongo.Model cache cache.Cache - generateCollection func(*mgo.Session) *cachedCollection + generateCollection func(*mgo.Session) CachedCollection } +// MustNewNodeModel returns a Model with a cache node, exists on errors. func MustNewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) *Model { model, err := NewNodeModel(url, collection, rds, opts...) if err != nil { @@ -24,6 +26,7 @@ func MustNewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Op return model } +// MustNewModel returns a Model with a cache cluster, exists on errors. func MustNewModel(url, collection string, c cache.CacheConf, opts ...cache.Option) *Model { model, err := NewModel(url, collection, c, opts...) if err != nil { @@ -33,157 +36,183 @@ func MustNewModel(url, collection string, c cache.CacheConf, opts ...cache.Optio return model } +// NewNodeModel returns a Model with a cache node. func NewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) { c := cache.NewNode(rds, sharedCalls, stats, mgo.ErrNotFound, opts...) - return createModel(url, collection, c, func(collection mongo.Collection) *cachedCollection { + return createModel(url, collection, c, func(collection mongo.Collection) CachedCollection { return newCollection(collection, c) }) } +// NewModel returns a Model with a cache cluster. func NewModel(url, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) { c := cache.New(conf, sharedCalls, stats, mgo.ErrNotFound, opts...) - return createModel(url, collection, c, func(collection mongo.Collection) *cachedCollection { + return createModel(url, collection, c, func(collection mongo.Collection) CachedCollection { return newCollection(collection, c) }) } +// Count returns the count of given query. func (mm *Model) Count(query interface{}) (int, error) { - return mm.executeInt(func(c *cachedCollection) (int, error) { + return mm.executeInt(func(c CachedCollection) (int, error) { return c.Count(query) }) } +// DelCache deletes the cache with given keys. func (mm *Model) DelCache(keys ...string) error { return mm.cache.Del(keys...) } +// GetCache unmarshal the cache into v with given key. func (mm *Model) GetCache(key string, v interface{}) error { return mm.cache.Get(key, v) } -func (mm *Model) GetCollection(session *mgo.Session) *cachedCollection { +// GetCollection returns a cache collection. +func (mm *Model) GetCollection(session *mgo.Session) CachedCollection { return mm.generateCollection(session) } +// FindAllNoCache finds all records without cache. func (mm *Model) FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.FindAllNoCache(v, query, opts...) }) } +// FindOne unmarshals a record into v with given key and query. func (mm *Model) FindOne(v interface{}, key string, query interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.FindOne(v, key, query) }) } +// FindOneNoCache unmarshals a record into v with query, without cache. func (mm *Model) FindOneNoCache(v interface{}, query interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.FindOneNoCache(v, query) }) } +// FindOneId unmarshals a record into v with query. func (mm *Model) FindOneId(v interface{}, key string, id interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.FindOneId(v, key, id) }) } +// FindOneIdNoCache unmarshals a record into v with query, without cache. func (mm *Model) FindOneIdNoCache(v interface{}, id interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.FindOneIdNoCache(v, id) }) } +// Insert inserts docs. func (mm *Model) Insert(docs ...interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.Insert(docs...) }) } +// Pipe returns a mongo pipe with given pipeline. func (mm *Model) Pipe(pipeline interface{}) (mongo.Pipe, error) { - return mm.pipe(func(c *cachedCollection) mongo.Pipe { + return mm.pipe(func(c CachedCollection) mongo.Pipe { return c.Pipe(pipeline) }) } +// Remove removes a record with given selector, and remove it from cache with given keys. func (mm *Model) Remove(selector interface{}, keys ...string) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.Remove(selector, keys...) }) } +// RemoveNoCache removes a record with given selector. func (mm *Model) RemoveNoCache(selector interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.RemoveNoCache(selector) }) } +// RemoveAll removes all records with given selector, and removes cache with given keys. func (mm *Model) RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error) { - return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) { + return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) { return c.RemoveAll(selector, keys...) }) } +// RemoveAllNoCache removes all records with given selector, and returns a mgo.ChangeInfo. func (mm *Model) RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error) { - return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) { + return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) { return c.RemoveAllNoCache(selector) }) } +// RemoveId removes a record with given id, and removes cache with given keys. func (mm *Model) RemoveId(id interface{}, keys ...string) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.RemoveId(id, keys...) }) } +// RemoveIdNoCache removes a record with given id. func (mm *Model) RemoveIdNoCache(id interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.RemoveIdNoCache(id) }) } +// SetCache sets the cache with given key and value. func (mm *Model) SetCache(key string, v interface{}) error { return mm.cache.Set(key, v) } +// Update updates the record with given selector, and delete cache with given keys. func (mm *Model) Update(selector, update interface{}, keys ...string) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.Update(selector, update, keys...) }) } +// UpdateNoCache updates the record with given selector. func (mm *Model) UpdateNoCache(selector, update interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.UpdateNoCache(selector, update) }) } +// UpdateId updates the record with given id, and delete cache with given keys. func (mm *Model) UpdateId(id, update interface{}, keys ...string) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.UpdateId(id, update, keys...) }) } +// UpdateIdNoCache updates the record with given id. func (mm *Model) UpdateIdNoCache(id, update interface{}) error { - return mm.execute(func(c *cachedCollection) error { + return mm.execute(func(c CachedCollection) error { return c.UpdateIdNoCache(id, update) }) } +// Upsert upserts a record with given selector, and delete cache with given keys. func (mm *Model) Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error) { - return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) { + return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) { return c.Upsert(selector, update, keys...) }) } +// UpsertNoCache upserts a record with given selector. func (mm *Model) UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error) { - return mm.change(func(c *cachedCollection) (*mgo.ChangeInfo, error) { + return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) { return c.UpsertNoCache(selector, update) }) } -func (mm *Model) change(fn func(c *cachedCollection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) { +func (mm *Model) change(fn func(c CachedCollection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) { session, err := mm.TakeSession() if err != nil { return nil, err @@ -193,7 +222,7 @@ func (mm *Model) change(fn func(c *cachedCollection) (*mgo.ChangeInfo, error)) ( return fn(mm.GetCollection(session)) } -func (mm *Model) execute(fn func(c *cachedCollection) error) error { +func (mm *Model) execute(fn func(c CachedCollection) error) error { session, err := mm.TakeSession() if err != nil { return err @@ -203,7 +232,7 @@ func (mm *Model) execute(fn func(c *cachedCollection) error) error { return fn(mm.GetCollection(session)) } -func (mm *Model) executeInt(fn func(c *cachedCollection) (int, error)) (int, error) { +func (mm *Model) executeInt(fn func(c CachedCollection) (int, error)) (int, error) { session, err := mm.TakeSession() if err != nil { return 0, err @@ -213,7 +242,7 @@ func (mm *Model) executeInt(fn func(c *cachedCollection) (int, error)) (int, err return fn(mm.GetCollection(session)) } -func (mm *Model) pipe(fn func(c *cachedCollection) mongo.Pipe) (mongo.Pipe, error) { +func (mm *Model) pipe(fn func(c CachedCollection) mongo.Pipe) (mongo.Pipe, error) { session, err := mm.TakeSession() if err != nil { return nil, err @@ -224,7 +253,7 @@ func (mm *Model) pipe(fn func(c *cachedCollection) mongo.Pipe) (mongo.Pipe, erro } func createModel(url, collection string, c cache.Cache, - create func(mongo.Collection) *cachedCollection) (*Model, error) { + create func(mongo.Collection) CachedCollection) (*Model, error) { model, err := mongo.NewModel(url, collection) if err != nil { return nil, err @@ -233,7 +262,7 @@ func createModel(url, collection string, c cache.Cache, return &Model{ Model: model, cache: c, - generateCollection: func(session *mgo.Session) *cachedCollection { + generateCollection: func(session *mgo.Session) CachedCollection { collection := model.GetCollection(session) return create(collection) }, diff --git a/core/stores/postgres/postgresql.go b/core/stores/postgres/postgresql.go index 550c7367..84d9d486 100644 --- a/core/stores/postgres/postgresql.go +++ b/core/stores/postgres/postgresql.go @@ -8,6 +8,7 @@ import ( const postgresDriverName = "postgres" +// New returns a postgres connection. func New(datasource string, opts ...sqlx.SqlOption) sqlx.SqlConn { return sqlx.NewSqlConn(postgresDriverName, datasource, opts...) } diff --git a/core/stores/redis/conf.go b/core/stores/redis/conf.go index f2fc21c5..25621b22 100644 --- a/core/stores/redis/conf.go +++ b/core/stores/redis/conf.go @@ -3,28 +3,35 @@ package redis import "errors" var ( + // ErrEmptyHost is an error that indicates no redis host is set. ErrEmptyHost = errors.New("empty redis host") + // ErrEmptyType is an error that indicates no redis type is set. ErrEmptyType = errors.New("empty redis type") - ErrEmptyKey = errors.New("empty redis key") + // ErrEmptyKey is an error that indicates no redis key is set. + ErrEmptyKey = errors.New("empty redis key") ) type ( + // A RedisConf is a redis config. RedisConf struct { Host string Type string `json:",default=node,options=node|cluster"` Pass string `json:",optional"` } + // A RedisKeyConf is a redis config with key. RedisKeyConf struct { RedisConf Key string `json:",optional"` } ) +// NewRedis returns a Redis. func (rc RedisConf) NewRedis() *Redis { return NewRedis(rc.Host, rc.Type, rc.Pass) } +// Validate validates the RedisConf. func (rc RedisConf) Validate() error { if len(rc.Host) == 0 { return ErrEmptyHost @@ -37,6 +44,7 @@ func (rc RedisConf) Validate() error { return nil } +// Validate validates the RedisKeyConf. func (rkc RedisKeyConf) Validate() error { if err := rkc.RedisConf.Validate(); err != nil { return err diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 7b9eac1d..244f0732 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -12,9 +12,12 @@ import ( ) const ( + // ClusterType means redis cluster. ClusterType = "cluster" - NodeType = "node" - Nil = red.Nil + // NodeType means redis node. + NodeType = "node" + // Nil is an alias of redis.Nil. + Nil = red.Nil blockingQueryTimeout = 5 * time.Second readWriteTimeout = 2 * time.Second @@ -22,9 +25,11 @@ const ( slowThreshold = time.Millisecond * 100 ) +// ErrNilNode is an error that indicates a nil redis node. var ErrNilNode = errors.New("nil redis node") type ( + // A Pair is a key/pair set used in redis zset. Pair struct { Key string Score int64 @@ -38,6 +43,7 @@ type ( brk breaker.Breaker } + // RedisNode interface represents a redis node. RedisNode interface { red.Cmdable } @@ -46,18 +52,24 @@ type ( GeoLocation = red.GeoLocation // GeoRadiusQuery is used with GeoRadius to query geospatial index. GeoRadiusQuery = red.GeoRadiusQuery - GeoPos = red.GeoPos + // GeoPos is used to represent a geo position. + GeoPos = red.GeoPos + // Pipeliner is an alias of redis.Pipeliner. Pipeliner = red.Pipeliner // Z represents sorted set member. - Z = red.Z + Z = red.Z + // ZStore is an alias of redis.ZStore. ZStore = red.ZStore - IntCmd = red.IntCmd + // IntCmd is an alias of redis.IntCmd. + IntCmd = red.IntCmd + // FloatCmd is an alias of redis.FloatCmd. FloatCmd = red.FloatCmd ) +// NewRedis returns a Redis. func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis { var pass string for _, v := range redisPass { @@ -184,6 +196,8 @@ func (s *Redis) Blpop(redisNode RedisNode, key string) (string, error) { return vals[1], nil } +// BlpopEx uses passed in redis connection to execute blpop command. +// The difference against Blpop is that this method returns a bool to indicate success. func (s *Redis) BlpopEx(redisNode RedisNode, key string) (string, bool, error) { if redisNode == nil { return "", false, ErrNilNode @@ -201,6 +215,7 @@ func (s *Redis) BlpopEx(redisNode RedisNode, key string) (string, bool, error) { return vals[1], true, nil } +// Del deletes keys. func (s *Redis) Del(keys ...string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -220,6 +235,7 @@ func (s *Redis) Del(keys ...string) (val int, err error) { return } +// Eval is the implementation of redis eval command. func (s *Redis) Eval(script string, keys []string, args ...interface{}) (val interface{}, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -234,6 +250,7 @@ func (s *Redis) Eval(script string, keys []string, args ...interface{}) (val int return } +// Exists is the implementation of redis exists command. func (s *Redis) Exists(key string) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -253,6 +270,7 @@ func (s *Redis) Exists(key string) (val bool, err error) { return } +// Expire is the implementation of redis expire command. func (s *Redis) Expire(key string, seconds int) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -264,6 +282,7 @@ func (s *Redis) Expire(key string, seconds int) error { }, acceptable) } +// Expireat is the implementation of redis expireat command. func (s *Redis) Expireat(key string, expireTime int64) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -275,6 +294,7 @@ func (s *Redis) Expireat(key string, expireTime int64) error { }, acceptable) } +// GeoAdd is the implementation of redis geoadd command. func (s *Redis) GeoAdd(key string, geoLocation ...*GeoLocation) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -293,6 +313,7 @@ func (s *Redis) GeoAdd(key string, geoLocation ...*GeoLocation) (val int64, err return } +// GeoDist is the implementation of redis geodist command. func (s *Redis) GeoDist(key string, member1, member2, unit string) (val float64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -311,6 +332,7 @@ func (s *Redis) GeoDist(key string, member1, member2, unit string) (val float64, return } +// GeoHash is the implementation of redis geohash command. func (s *Redis) GeoHash(key string, members ...string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -329,6 +351,7 @@ func (s *Redis) GeoHash(key string, members ...string) (val []string, err error) return } +// GeoRadius is the implementation of redis georadius command. func (s *Redis) GeoRadius(key string, longitude, latitude float64, query *GeoRadiusQuery) (val []GeoLocation, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -346,6 +369,8 @@ func (s *Redis) GeoRadius(key string, longitude, latitude float64, query *GeoRad }, acceptable) return } + +// GeoRadiusByMember is the implementation of redis georadiusbymember command. func (s *Redis) GeoRadiusByMember(key, member string, query *GeoRadiusQuery) (val []GeoLocation, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -364,6 +389,7 @@ func (s *Redis) GeoRadiusByMember(key, member string, query *GeoRadiusQuery) (va return } +// GeoPos is the implementation of redis geopos command. func (s *Redis) GeoPos(key string, members ...string) (val []*GeoPos, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -382,6 +408,7 @@ func (s *Redis) GeoPos(key string, members ...string) (val []*GeoPos, err error) return } +// Get is the implementation of redis get command. func (s *Redis) Get(key string) (val string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -401,6 +428,7 @@ func (s *Redis) Get(key string) (val string, err error) { return } +// GetBit is the implementation of redis getbit command. func (s *Redis) GetBit(key string, offset int64) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -420,6 +448,7 @@ func (s *Redis) GetBit(key string, offset int64) (val int, err error) { return } +// Hdel is the implementation of redis hdel command. func (s *Redis) Hdel(key, field string) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -439,6 +468,7 @@ func (s *Redis) Hdel(key, field string) (val bool, err error) { return } +// Hexists is the implementation of redis hexists command. func (s *Redis) Hexists(key, field string) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -453,6 +483,7 @@ func (s *Redis) Hexists(key, field string) (val bool, err error) { return } +// Hget is the implementation of redis hget command. func (s *Redis) Hget(key, field string) (val string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -467,6 +498,7 @@ func (s *Redis) Hget(key, field string) (val string, err error) { return } +// Hgetall is the implementation of redis hgetall command. func (s *Redis) Hgetall(key string) (val map[string]string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -481,6 +513,7 @@ func (s *Redis) Hgetall(key string) (val map[string]string, err error) { return } +// Hincrby is the implementation of redis hincrby command. func (s *Redis) Hincrby(key, field string, increment int) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -500,6 +533,7 @@ func (s *Redis) Hincrby(key, field string, increment int) (val int, err error) { return } +// Hkeys is the implementation of redis hkeys command. func (s *Redis) Hkeys(key string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -514,6 +548,7 @@ func (s *Redis) Hkeys(key string) (val []string, err error) { return } +// Hlen is the implementation of redis hlen command. func (s *Redis) Hlen(key string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -533,6 +568,7 @@ func (s *Redis) Hlen(key string) (val int, err error) { return } +// Hmget is the implementation of redis hmget command. func (s *Redis) Hmget(key string, fields ...string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -552,6 +588,7 @@ func (s *Redis) Hmget(key string, fields ...string) (val []string, err error) { return } +// Hset is the implementation of redis hset command. func (s *Redis) Hset(key, field, value string) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -563,6 +600,7 @@ func (s *Redis) Hset(key, field, value string) error { }, acceptable) } +// Hsetnx is the implementation of redis hsetnx command. func (s *Redis) Hsetnx(key, field, value string) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -577,6 +615,7 @@ func (s *Redis) Hsetnx(key, field, value string) (val bool, err error) { return } +// Hmset is the implementation of redis hmset command. func (s *Redis) Hmset(key string, fieldsAndValues map[string]string) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -593,6 +632,7 @@ func (s *Redis) Hmset(key string, fieldsAndValues map[string]string) error { }, acceptable) } +// Hscan is the implementation of redis hscan command. func (s *Redis) Hscan(key string, cursor uint64, match string, count int64) (keys []string, cur uint64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -607,6 +647,7 @@ func (s *Redis) Hscan(key string, cursor uint64, match string, count int64) (key return } +// Hvals is the implementation of redis hvals command. func (s *Redis) Hvals(key string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -621,6 +662,7 @@ func (s *Redis) Hvals(key string) (val []string, err error) { return } +// Incr is the implementation of redis incr command. func (s *Redis) Incr(key string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -635,6 +677,7 @@ func (s *Redis) Incr(key string) (val int64, err error) { return } +// Incrby is the implementation of redis incrby command. func (s *Redis) Incrby(key string, increment int64) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -649,6 +692,7 @@ func (s *Redis) Incrby(key string, increment int64) (val int64, err error) { return } +// Keys is the implementation of redis keys command. func (s *Redis) Keys(pattern string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -663,6 +707,7 @@ func (s *Redis) Keys(pattern string) (val []string, err error) { return } +// Llen is the implementation of redis llen command. func (s *Redis) Llen(key string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -682,6 +727,7 @@ func (s *Redis) Llen(key string) (val int, err error) { return } +// Lpop is the implementation of redis lpop command. func (s *Redis) Lpop(key string) (val string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -696,6 +742,7 @@ func (s *Redis) Lpop(key string) (val string, err error) { return } +// Lpush is the implementation of redis lpush command. func (s *Redis) Lpush(key string, values ...interface{}) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -715,6 +762,7 @@ func (s *Redis) Lpush(key string, values ...interface{}) (val int, err error) { return } +// Lrange is the implementation of redis lrange command. func (s *Redis) Lrange(key string, start int, stop int) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -729,6 +777,7 @@ func (s *Redis) Lrange(key string, start int, stop int) (val []string, err error return } +// Lrem is the implementation of redis lrem command. func (s *Redis) Lrem(key string, count int, value string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -748,6 +797,7 @@ func (s *Redis) Lrem(key string, count int, value string) (val int, err error) { return } +// Mget is the implementation of redis mget command. func (s *Redis) Mget(keys ...string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -767,6 +817,7 @@ func (s *Redis) Mget(keys ...string) (val []string, err error) { return } +// Persist is the implementation of redis persist command. func (s *Redis) Persist(key string) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -781,6 +832,7 @@ func (s *Redis) Persist(key string) (val bool, err error) { return } +// Pfadd is the implementation of redis pfadd command. func (s *Redis) Pfadd(key string, values ...interface{}) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -800,6 +852,7 @@ func (s *Redis) Pfadd(key string, values ...interface{}) (val bool, err error) { return } +// Pfcount is the implementation of redis pfcount command. func (s *Redis) Pfcount(key string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -814,6 +867,7 @@ func (s *Redis) Pfcount(key string) (val int64, err error) { return } +// Pfmerge is the implementation of redis pfmerge command. func (s *Redis) Pfmerge(dest string, keys ...string) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -826,6 +880,7 @@ func (s *Redis) Pfmerge(dest string, keys ...string) error { }, acceptable) } +// Ping is the implementation of redis ping command. func (s *Redis) Ping() (val bool) { // ignore error, error means false _ = s.brk.DoWithAcceptable(func() error { @@ -848,6 +903,7 @@ func (s *Redis) Ping() (val bool) { return } +// Pipelined lets fn to execute pipelined commands. func (s *Redis) Pipelined(fn func(Pipeliner) error) (err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -863,6 +919,7 @@ func (s *Redis) Pipelined(fn func(Pipeliner) error) (err error) { return } +// Rpop is the implementation of redis rpop command. func (s *Redis) Rpop(key string) (val string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -877,6 +934,7 @@ func (s *Redis) Rpop(key string) (val string, err error) { return } +// Rpush is the implementation of redis rpush command. func (s *Redis) Rpush(key string, values ...interface{}) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -896,6 +954,7 @@ func (s *Redis) Rpush(key string, values ...interface{}) (val int, err error) { return } +// Sadd is the implementation of redis sadd command. func (s *Redis) Sadd(key string, values ...interface{}) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -915,6 +974,7 @@ func (s *Redis) Sadd(key string, values ...interface{}) (val int, err error) { return } +// Scan is the implementation of redis scan command. func (s *Redis) Scan(cursor uint64, match string, count int64) (keys []string, cur uint64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -929,6 +989,7 @@ func (s *Redis) Scan(cursor uint64, match string, count int64) (keys []string, c return } +// SetBit is the implementation of redis setbit command. func (s *Redis) SetBit(key string, offset int64, value int) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -941,6 +1002,7 @@ func (s *Redis) SetBit(key string, offset int64, value int) error { }, acceptable) } +// Sscan is the implementation of redis sscan command. func (s *Redis) Sscan(key string, cursor uint64, match string, count int64) (keys []string, cur uint64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -955,6 +1017,7 @@ func (s *Redis) Sscan(key string, cursor uint64, match string, count int64) (key return } +// Scard is the implementation of redis scard command. func (s *Redis) Scard(key string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -969,6 +1032,7 @@ func (s *Redis) Scard(key string) (val int64, err error) { return } +// Set is the implementation of redis set command. func (s *Redis) Set(key string, value string) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -980,6 +1044,7 @@ func (s *Redis) Set(key string, value string) error { }, acceptable) } +// Setex is the implementation of redis setex command. func (s *Redis) Setex(key, value string, seconds int) error { return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -991,6 +1056,7 @@ func (s *Redis) Setex(key, value string, seconds int) error { }, acceptable) } +// Setnx is the implementation of redis setnx command. func (s *Redis) Setnx(key, value string) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1005,6 +1071,7 @@ func (s *Redis) Setnx(key, value string) (val bool, err error) { return } +// SetnxEx is the implementation of redis setnx command with expire. func (s *Redis) SetnxEx(key, value string, seconds int) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1019,6 +1086,7 @@ func (s *Redis) SetnxEx(key, value string, seconds int) (val bool, err error) { return } +// Sismember is the implementation of redis sismember command. func (s *Redis) Sismember(key string, value interface{}) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1033,6 +1101,7 @@ func (s *Redis) Sismember(key string, value interface{}) (val bool, err error) { return } +// Srem is the implementation of redis srem command. func (s *Redis) Srem(key string, values ...interface{}) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1052,6 +1121,7 @@ func (s *Redis) Srem(key string, values ...interface{}) (val int, err error) { return } +// Smembers is the implementation of redis smembers command. func (s *Redis) Smembers(key string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1066,6 +1136,7 @@ func (s *Redis) Smembers(key string) (val []string, err error) { return } +// Spop is the implementation of redis spop command. func (s *Redis) Spop(key string) (val string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1080,6 +1151,7 @@ func (s *Redis) Spop(key string) (val string, err error) { return } +// Srandmember is the implementation of redis srandmember command. func (s *Redis) Srandmember(key string, count int) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1094,6 +1166,7 @@ func (s *Redis) Srandmember(key string, count int) (val []string, err error) { return } +// Sunion is the implementation of redis sunion command. func (s *Redis) Sunion(keys ...string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1108,6 +1181,7 @@ func (s *Redis) Sunion(keys ...string) (val []string, err error) { return } +// Sunionstore is the implementation of redis sunionstore command. func (s *Redis) Sunionstore(destination string, keys ...string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1127,6 +1201,7 @@ func (s *Redis) Sunionstore(destination string, keys ...string) (val int, err er return } +// Sdiff is the implementation of redis sdiff command. func (s *Redis) Sdiff(keys ...string) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1141,6 +1216,7 @@ func (s *Redis) Sdiff(keys ...string) (val []string, err error) { return } +// Sdiffstore is the implementation of redis sdiffstore command. func (s *Redis) Sdiffstore(destination string, keys ...string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1160,6 +1236,7 @@ func (s *Redis) Sdiffstore(destination string, keys ...string) (val int, err err return } +// Ttl is the implementation of redis ttl command. func (s *Redis) Ttl(key string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1179,6 +1256,7 @@ func (s *Redis) Ttl(key string) (val int, err error) { return } +// Zadd is the implementation of redis zadd command. func (s *Redis) Zadd(key string, score int64, value string) (val bool, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1201,6 +1279,7 @@ func (s *Redis) Zadd(key string, score int64, value string) (val bool, err error return } +// Zadds is the implementation of redis zadds command. func (s *Redis) Zadds(key string, ps ...Pair) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1226,6 +1305,7 @@ func (s *Redis) Zadds(key string, ps ...Pair) (val int64, err error) { return } +// Zcard is the implementation of redis zcard command. func (s *Redis) Zcard(key string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1245,6 +1325,7 @@ func (s *Redis) Zcard(key string) (val int, err error) { return } +// Zcount is the implementation of redis zcount command. func (s *Redis) Zcount(key string, start, stop int64) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1264,6 +1345,7 @@ func (s *Redis) Zcount(key string, start, stop int64) (val int, err error) { return } +// Zincrby is the implementation of redis zincrby command. func (s *Redis) Zincrby(key string, increment int64, field string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1283,6 +1365,7 @@ func (s *Redis) Zincrby(key string, increment int64, field string) (val int64, e return } +// Zscore is the implementation of redis zscore command. func (s *Redis) Zscore(key string, value string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1302,6 +1385,7 @@ func (s *Redis) Zscore(key string, value string) (val int64, err error) { return } +// Zrank is the implementation of redis zrank command. func (s *Redis) Zrank(key, field string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1316,6 +1400,7 @@ func (s *Redis) Zrank(key, field string) (val int64, err error) { return } +// Zrem is the implementation of redis zrem command. func (s *Redis) Zrem(key string, values ...interface{}) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1335,6 +1420,7 @@ func (s *Redis) Zrem(key string, values ...interface{}) (val int, err error) { return } +// Zremrangebyscore is the implementation of redis zremrangebyscore command. func (s *Redis) Zremrangebyscore(key string, start, stop int64) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1355,6 +1441,7 @@ func (s *Redis) Zremrangebyscore(key string, start, stop int64) (val int, err er return } +// Zremrangebyrank is the implementation of redis zremrangebyrank command. func (s *Redis) Zremrangebyrank(key string, start, stop int64) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1374,6 +1461,7 @@ func (s *Redis) Zremrangebyrank(key string, start, stop int64) (val int, err err return } +// Zrange is the implementation of redis zrange command. func (s *Redis) Zrange(key string, start, stop int64) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1388,6 +1476,7 @@ func (s *Redis) Zrange(key string, start, stop int64) (val []string, err error) return } +// ZrangeWithScores is the implementation of redis zrange command with scores. func (s *Redis) ZrangeWithScores(key string, start, stop int64) (val []Pair, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1407,6 +1496,7 @@ func (s *Redis) ZrangeWithScores(key string, start, stop int64) (val []Pair, err return } +// ZRevRangeWithScores is the implementation of redis zrevrange command with scores. func (s *Redis) ZRevRangeWithScores(key string, start, stop int64) (val []Pair, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1426,6 +1516,7 @@ func (s *Redis) ZRevRangeWithScores(key string, start, stop int64) (val []Pair, return } +// ZrangebyscoreWithScores is the implementation of redis zrangebyscore command with scores. func (s *Redis) ZrangebyscoreWithScores(key string, start, stop int64) (val []Pair, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1448,6 +1539,7 @@ func (s *Redis) ZrangebyscoreWithScores(key string, start, stop int64) (val []Pa return } +// ZrangebyscoreWithScoresAndLimit is the implementation of redis zrangebyscore command with scores and limit. func (s *Redis) ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ( val []Pair, err error) { err = s.brk.DoWithAcceptable(func() error { @@ -1477,6 +1569,7 @@ func (s *Redis) ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, p return } +// Zrevrange is the implementation of redis zrevrange command. func (s *Redis) Zrevrange(key string, start, stop int64) (val []string, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1491,6 +1584,7 @@ func (s *Redis) Zrevrange(key string, start, stop int64) (val []string, err erro return } +// ZrevrangebyscoreWithScores is the implementation of redis zrevrangebyscore command with scores. func (s *Redis) ZrevrangebyscoreWithScores(key string, start, stop int64) (val []Pair, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1513,6 +1607,7 @@ func (s *Redis) ZrevrangebyscoreWithScores(key string, start, stop int64) (val [ return } +// ZrevrangebyscoreWithScoresAndLimit is the implementation of redis zrevrangebyscore command with scores and limit. func (s *Redis) ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ( val []Pair, err error) { err = s.brk.DoWithAcceptable(func() error { @@ -1542,6 +1637,7 @@ func (s *Redis) ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64 return } +// Zrevrank is the implementation of redis zrevrank command. func (s *Redis) Zrevrank(key string, field string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1556,6 +1652,7 @@ func (s *Redis) Zrevrank(key string, field string) (val int64, err error) { return } +// Zunionstore is the implementation of redis zunionstore command. func (s *Redis) Zunionstore(dest string, store ZStore, keys ...string) (val int64, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) @@ -1570,6 +1667,7 @@ func (s *Redis) Zunionstore(dest string, store ZStore, keys ...string) (val int6 return } +// String returns the string representation of s. func (s *Redis) String() string { return s.Addr } diff --git a/core/stores/redis/redisblockingnode.go b/core/stores/redis/redisblockingnode.go index e70f745a..e6c6fa81 100644 --- a/core/stores/redis/redisblockingnode.go +++ b/core/stores/redis/redisblockingnode.go @@ -7,11 +7,13 @@ import ( "github.com/tal-tech/go-zero/core/logx" ) +// ClosableNode interface represents a closable redis node. type ClosableNode interface { RedisNode Close() } +// CreateBlockingNode returns a ClosableNode. func CreateBlockingNode(r *Redis) (ClosableNode, error) { timeout := readWriteTimeout + blockingQueryTimeout diff --git a/core/stores/redis/redislock.go b/core/stores/redis/redislock.go index bb059e79..61659a51 100644 --- a/core/stores/redis/redislock.go +++ b/core/stores/redis/redislock.go @@ -28,6 +28,7 @@ end` millisPerSecond = 1000 ) +// A RedisLock is a redis lock. type RedisLock struct { store *Redis seconds uint32 @@ -39,6 +40,7 @@ func init() { rand.Seed(time.Now().UnixNano()) } +// NewRedisLock returns a RedisLock. func NewRedisLock(store *Redis, key string) *RedisLock { return &RedisLock{ store: store, @@ -47,6 +49,7 @@ func NewRedisLock(store *Redis, key string) *RedisLock { } } +// Acquire acquires the lock. func (rl *RedisLock) Acquire() (bool, error) { seconds := atomic.LoadUint32(&rl.seconds) resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{ @@ -69,6 +72,7 @@ func (rl *RedisLock) Acquire() (bool, error) { return false, nil } +// Release releases the lock. func (rl *RedisLock) Release() (bool, error) { resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id}) if err != nil { @@ -83,6 +87,7 @@ func (rl *RedisLock) Release() (bool, error) { return reply == 1, nil } +// SetExpire sets the expire. func (rl *RedisLock) SetExpire(seconds int) { atomic.StoreUint32(&rl.seconds, uint32(seconds)) } diff --git a/core/stores/redis/redistest/redistest.go b/core/stores/redis/redistest/redistest.go index 029761a0..adebcb3b 100644 --- a/core/stores/redis/redistest/redistest.go +++ b/core/stores/redis/redistest/redistest.go @@ -8,6 +8,7 @@ import ( "github.com/tal-tech/go-zero/core/stores/redis" ) +// CreateRedis returns a in process redis.Redis. func CreateRedis() (r *redis.Redis, clean func(), err error) { mr, err := miniredis.Run() if err != nil { diff --git a/core/stores/redis/scriptcache.go b/core/stores/redis/scriptcache.go index 2e38f272..6f68ec8d 100644 --- a/core/stores/redis/scriptcache.go +++ b/core/stores/redis/scriptcache.go @@ -12,13 +12,16 @@ var ( ) type ( + // Map is an alias of map[string]string. Map map[string]string + // A ScriptCache is a cache that stores a script with its sha key. ScriptCache struct { atomic.Value } ) +// GetScriptCache returns a ScriptCache. func GetScriptCache() *ScriptCache { once.Do(func() { instance = &ScriptCache{} @@ -28,12 +31,14 @@ func GetScriptCache() *ScriptCache { return instance } +// GetSha returns the sha string of given script. func (sc *ScriptCache) GetSha(script string) (string, bool) { cache := sc.Load().(Map) ret, ok := cache[script] return ret, ok } +// SetSha sets script with sha into the ScriptCache. func (sc *ScriptCache) SetSha(script, sha string) { lock.Lock() defer lock.Unlock() diff --git a/core/stores/sqlc/cachedsql.go b/core/stores/sqlc/cachedsql.go index 332ea7d2..2e3618d4 100644 --- a/core/stores/sqlc/cachedsql.go +++ b/core/stores/sqlc/cachedsql.go @@ -14,25 +14,32 @@ import ( const cacheSafeGapBetweenIndexAndPrimary = time.Second * 5 var ( + // ErrNotFound is an alias of sqlx.ErrNotFound. ErrNotFound = sqlx.ErrNotFound // can't use one SharedCalls per conn, because multiple conns may share the same cache key. exclusiveCalls = syncx.NewSharedCalls() - stats = cache.NewCacheStat("sqlc") + stats = cache.NewStat("sqlc") ) type ( - ExecFn func(conn sqlx.SqlConn) (sql.Result, error) - IndexQueryFn func(conn sqlx.SqlConn, v interface{}) (interface{}, error) + // ExecFn defines the sql exec method. + ExecFn func(conn sqlx.SqlConn) (sql.Result, error) + // IndexQueryFn defines the query method that based on unique indexes. + IndexQueryFn func(conn sqlx.SqlConn, v interface{}) (interface{}, error) + // PrimaryQueryFn defines the query method that based on primary keys. PrimaryQueryFn func(conn sqlx.SqlConn, v, primary interface{}) error - QueryFn func(conn sqlx.SqlConn, v interface{}) error + // QueryFn defines the query method. + QueryFn func(conn sqlx.SqlConn, v interface{}) error + // A CachedConn is a DB connection with cache capability. CachedConn struct { db sqlx.SqlConn cache cache.Cache } ) +// NewNodeConn returns a CachedConn with a redis node cache. func NewNodeConn(db sqlx.SqlConn, rds *redis.Redis, opts ...cache.Option) CachedConn { return CachedConn{ db: db, @@ -40,6 +47,7 @@ func NewNodeConn(db sqlx.SqlConn, rds *redis.Redis, opts ...cache.Option) Cached } } +// NewConn returns a CachedConn with a redis cluster cache. func NewConn(db sqlx.SqlConn, c cache.CacheConf, opts ...cache.Option) CachedConn { return CachedConn{ db: db, @@ -47,14 +55,17 @@ func NewConn(db sqlx.SqlConn, c cache.CacheConf, opts ...cache.Option) CachedCon } } +// DelCache deletes cache with keys. func (cc CachedConn) DelCache(keys ...string) error { return cc.cache.Del(keys...) } +// GetCache unmarshals cache with given key into v. func (cc CachedConn) GetCache(key string, v interface{}) error { return cc.cache.Get(key, v) } +// Exec runs given exec on given keys, and returns execution result. func (cc CachedConn) Exec(exec ExecFn, keys ...string) (sql.Result, error) { res, err := exec(cc.db) if err != nil { @@ -68,16 +79,19 @@ func (cc CachedConn) Exec(exec ExecFn, keys ...string) (sql.Result, error) { return res, nil } +// ExecNoCache runs exec with given sql statement, without affecting cache. func (cc CachedConn) ExecNoCache(q string, args ...interface{}) (sql.Result, error) { return cc.db.Exec(q, args...) } +// QueryRow unmarshals into v with given key and query func. func (cc CachedConn) QueryRow(v interface{}, key string, query QueryFn) error { return cc.cache.Take(v, key, func(v interface{}) error { return query(cc.db, v) }) } +// QueryRowIndex unmarshals into v with given key. func (cc CachedConn) QueryRowIndex(v interface{}, key string, keyer func(primary interface{}) string, indexQuery IndexQueryFn, primaryQuery PrimaryQueryFn) error { var primaryKey interface{} @@ -104,19 +118,23 @@ func (cc CachedConn) QueryRowIndex(v interface{}, key string, keyer func(primary }) } +// QueryRowNoCache unmarshals into v with given statement. func (cc CachedConn) QueryRowNoCache(v interface{}, q string, args ...interface{}) error { return cc.db.QueryRow(v, q, args...) } -// QueryRowsNoCache doesn't use cache, because it might cause consistency problem. +// QueryRowsNoCache unmarshals into v with given statement. +// It doesn't use cache, because it might cause consistency problem. func (cc CachedConn) QueryRowsNoCache(v interface{}, q string, args ...interface{}) error { return cc.db.QueryRows(v, q, args...) } +// SetCache sets v into cache with given key. func (cc CachedConn) SetCache(key string, v interface{}) error { return cc.cache.Set(key, v) } +// Transact runs given fn in transaction mode. func (cc CachedConn) Transact(fn func(sqlx.Session) error) error { return cc.db.Transact(fn) } diff --git a/core/stores/sqlx/bulkinserter.go b/core/stores/sqlx/bulkinserter.go index 2f183b22..d51394cb 100644 --- a/core/stores/sqlx/bulkinserter.go +++ b/core/stores/sqlx/bulkinserter.go @@ -20,8 +20,10 @@ const ( var emptyBulkStmt bulkStmt type ( + // ResultHandler defines the method of result handlers. ResultHandler func(sql.Result, error) + // A BulkInserter is used to batch insert records. BulkInserter struct { executor *executors.PeriodicalExecutor inserter *dbInserter @@ -35,6 +37,7 @@ type ( } ) +// NewBulkInserter returns a BulkInserter. func NewBulkInserter(sqlConn SqlConn, stmt string) (*BulkInserter, error) { bkStmt, err := parseInsertStmt(stmt) if err != nil { @@ -53,10 +56,12 @@ func NewBulkInserter(sqlConn SqlConn, stmt string) (*BulkInserter, error) { }, nil } +// Flush flushes all the pending records. func (bi *BulkInserter) Flush() { bi.executor.Flush() } +// Insert inserts given args. func (bi *BulkInserter) Insert(args ...interface{}) error { value, err := format(bi.stmt.valueFormat, args...) if err != nil { @@ -68,17 +73,20 @@ func (bi *BulkInserter) Insert(args ...interface{}) error { return nil } +// SetResultHandler sets the given handler. func (bi *BulkInserter) SetResultHandler(handler ResultHandler) { bi.executor.Sync(func() { bi.inserter.resultHandler = handler }) } +// UpdateOrDelete runs update or delete queries, which flushes pending records first. func (bi *BulkInserter) UpdateOrDelete(fn func()) { bi.executor.Flush() fn() } +// UpdateStmt updates the insert statement. func (bi *BulkInserter) UpdateStmt(stmt string) error { bkStmt, err := parseInsertStmt(stmt) if err != nil { diff --git a/core/stores/sqlx/mysql.go b/core/stores/sqlx/mysql.go index c621415e..9f474f55 100644 --- a/core/stores/sqlx/mysql.go +++ b/core/stores/sqlx/mysql.go @@ -7,6 +7,7 @@ const ( duplicateEntryCode uint16 = 1062 ) +// NewMysql returns a mysql connection. func NewMysql(datasource string, opts ...SqlOption) SqlConn { opts = append(opts, withMysqlAcceptable()) return NewSqlConn(mysqlDriverName, datasource, opts...) diff --git a/core/stores/sqlx/orm.go b/core/stores/sqlx/orm.go index edba340a..933c41af 100644 --- a/core/stores/sqlx/orm.go +++ b/core/stores/sqlx/orm.go @@ -11,9 +11,13 @@ import ( const tagName = "db" var ( - ErrNotMatchDestination = errors.New("not matching destination to scan") - ErrNotReadableValue = errors.New("value not addressable or interfaceable") - ErrNotSettable = errors.New("passed in variable is not settable") + // ErrNotMatchDestination is an error that indicates not matching destination to scan. + ErrNotMatchDestination = errors.New("not matching destination to scan") + // ErrNotReadableValue is an error that indicates value is not addressable or interfaceable. + ErrNotReadableValue = errors.New("value not addressable or interfaceable") + // ErrNotSettable is an error that indicates the passed in variable is not settable. + ErrNotSettable = errors.New("passed in variable is not settable") + // ErrUnsupportedValueType is an error that indicates unsupported unmarshal type. ErrUnsupportedValueType = errors.New("unsupported unmarshal type") ) diff --git a/core/stores/sqlx/sqlconn.go b/core/stores/sqlx/sqlconn.go index c85a950b..5bf636e0 100644 --- a/core/stores/sqlx/sqlconn.go +++ b/core/stores/sqlx/sqlconn.go @@ -6,6 +6,7 @@ import ( "github.com/tal-tech/go-zero/core/breaker" ) +// ErrNotFound is an alias of sql.ErrNoRows var ErrNotFound = sql.ErrNoRows type ( @@ -25,8 +26,10 @@ type ( Transact(func(session Session) error) error } + // SqlOption defines the method to customize a sql connection. SqlOption func(*commonSqlConn) + // StmtSession interface represents a session that can be used to execute statements. StmtSession interface { Close() error Exec(args ...interface{}) (sql.Result, error) @@ -62,6 +65,7 @@ type ( } ) +// NewSqlConn returns a SqlConn with given driver name and datasource. func NewSqlConn(driverName, datasource string, opts ...SqlOption) SqlConn { conn := &commonSqlConn{ driverName: driverName,