You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/core/stores/mongo/collection.go

241 lines
5.8 KiB
Go

4 years ago
package mongo
import (
"encoding/json"
"time"
"github.com/globalsign/mgo"
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/logx"
4 years ago
"github.com/tal-tech/go-zero/core/stores/mongo/internal"
"github.com/tal-tech/go-zero/core/timex"
4 years ago
)
// slowThreshold is the call duration above which logDuration reports the
// operation through the slow-call logger instead of the regular info logger.
const slowThreshold = time.Millisecond * 500

// ErrNotFound re-exports mgo.ErrNotFound so callers can compare against it
// through this package.
var ErrNotFound = mgo.ErrNotFound
type (
Collection interface {
Find(query interface{}) Query
FindId(id interface{}) Query
Insert(docs ...interface{}) error
Pipe(pipeline interface{}) Pipe
Remove(selector interface{}) error
RemoveAll(selector interface{}) (*mgo.ChangeInfo, error)
RemoveId(id interface{}) error
Update(selector, update interface{}) error
UpdateId(id, update interface{}) error
Upsert(selector, update interface{}) (*mgo.ChangeInfo, error)
}
decoratedCollection struct {
4 years ago
name string
collection internal.MgoCollection
brk breaker.Breaker
4 years ago
}
keepablePromise struct {
promise breaker.Promise
log func(error)
}
)
func newCollection(collection *mgo.Collection) Collection {
return &decoratedCollection{
4 years ago
name: collection.FullName,
collection: collection,
4 years ago
brk: breaker.NewBreaker(),
}
}
func (c *decoratedCollection) Find(query interface{}) Query {
promise, err := c.brk.Allow()
if err != nil {
return rejectedQuery{}
}
startTime := timex.Now()
return promisedQuery{
4 years ago
Query: c.collection.Find(query),
4 years ago
promise: keepablePromise{
promise: promise,
log: func(err error) {
duration := timex.Since(startTime)
c.logDuration("find", duration, err, query)
},
},
}
}
func (c *decoratedCollection) FindId(id interface{}) Query {
promise, err := c.brk.Allow()
if err != nil {
return rejectedQuery{}
}
startTime := timex.Now()
return promisedQuery{
4 years ago
Query: c.collection.FindId(id),
4 years ago
promise: keepablePromise{
promise: promise,
log: func(err error) {
duration := timex.Since(startTime)
c.logDuration("findId", duration, err, id)
},
},
}
}
func (c *decoratedCollection) Insert(docs ...interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("insert", duration, err, docs...)
}()
4 years ago
return c.collection.Insert(docs...)
4 years ago
}, acceptable)
}
func (c *decoratedCollection) Pipe(pipeline interface{}) Pipe {
promise, err := c.brk.Allow()
if err != nil {
return rejectedPipe{}
}
startTime := timex.Now()
return promisedPipe{
4 years ago
Pipe: c.collection.Pipe(pipeline),
4 years ago
promise: keepablePromise{
promise: promise,
log: func(err error) {
duration := timex.Since(startTime)
c.logDuration("pipe", duration, err, pipeline)
},
},
}
}
func (c *decoratedCollection) Remove(selector interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("remove", duration, err, selector)
}()
4 years ago
return c.collection.Remove(selector)
4 years ago
}, acceptable)
}
func (c *decoratedCollection) RemoveAll(selector interface{}) (info *mgo.ChangeInfo, err error) {
err = c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("removeAll", duration, err, selector)
}()
4 years ago
info, err = c.collection.RemoveAll(selector)
4 years ago
return err
}, acceptable)
return
}
func (c *decoratedCollection) RemoveId(id interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("removeId", duration, err, id)
}()
4 years ago
return c.collection.RemoveId(id)
4 years ago
}, acceptable)
}
func (c *decoratedCollection) Update(selector, update interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("update", duration, err, selector, update)
}()
4 years ago
return c.collection.Update(selector, update)
4 years ago
}, acceptable)
}
func (c *decoratedCollection) UpdateId(id, update interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("updateId", duration, err, id, update)
}()
4 years ago
return c.collection.UpdateId(id, update)
4 years ago
}, acceptable)
}
func (c *decoratedCollection) Upsert(selector, update interface{}) (info *mgo.ChangeInfo, err error) {
err = c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("upsert", duration, err, selector, update)
}()
4 years ago
info, err = c.collection.Upsert(selector, update)
4 years ago
return err
}, acceptable)
return
}
func (c *decoratedCollection) logDuration(method string, duration time.Duration, err error, docs ...interface{}) {
content, e := json.Marshal(docs)
if e != nil {
logx.Error(err)
} else if err != nil {
if duration > slowThreshold {
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
4 years ago
c.name, method, err.Error(), string(content))
4 years ago
} else {
logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s",
4 years ago
c.name, method, err.Error(), string(content))
4 years ago
}
} else {
if duration > slowThreshold {
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
4 years ago
c.name, method, string(content))
4 years ago
} else {
4 years ago
logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
4 years ago
}
}
}
// accept unconditionally marks the promise as accepted, passes err to the
// logging callback and returns err unchanged. Unlike keep, it does not
// consult acceptable, so err never causes the promise to be rejected.
func (p keepablePromise) accept(err error) error {
p.promise.Accept()
p.log(err)
return err
}
// keep settles the promise according to err: the promise is accepted when
// acceptable(err) holds and rejected with err's message otherwise. In both
// cases err is then handed to the logging callback and returned unchanged.
func (p keepablePromise) keep(err error) error {
	if !acceptable(err) {
		// err is non-nil on this path, because acceptable(nil) is true.
		p.promise.Reject(err.Error())
	} else {
		p.promise.Accept()
	}

	p.log(err)

	return err
}
// acceptable reports whether err should be treated as a successful outcome:
// either no error at all, or mgo.ErrNotFound.
func acceptable(err error) bool {
	switch err {
	case nil, mgo.ErrNotFound:
		return true
	default:
		return false
	}
}