You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
5.9 KiB
Go
243 lines
5.9 KiB
Go
package mongo
|
|
|
|
import (
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/globalsign/mgo"
|
|
"github.com/tal-tech/go-zero/core/breaker"
|
|
"github.com/tal-tech/go-zero/core/logx"
|
|
"github.com/tal-tech/go-zero/core/stores/mongo/internal"
|
|
"github.com/tal-tech/go-zero/core/timex"
|
|
)
|
|
|
|
const slowThreshold = time.Millisecond * 500
|
|
|
|
// ErrNotFound is an alias of mgo.ErrNotFound.
|
|
var ErrNotFound = mgo.ErrNotFound
|
|
|
|
type (
|
|
// Collection interface represents a mongo connection.
|
|
Collection interface {
|
|
Find(query interface{}) Query
|
|
FindId(id interface{}) Query
|
|
Insert(docs ...interface{}) error
|
|
Pipe(pipeline interface{}) Pipe
|
|
Remove(selector interface{}) error
|
|
RemoveAll(selector interface{}) (*mgo.ChangeInfo, error)
|
|
RemoveId(id interface{}) error
|
|
Update(selector, update interface{}) error
|
|
UpdateId(id, update interface{}) error
|
|
Upsert(selector, update interface{}) (*mgo.ChangeInfo, error)
|
|
}
|
|
|
|
decoratedCollection struct {
|
|
name string
|
|
collection internal.MgoCollection
|
|
brk breaker.Breaker
|
|
}
|
|
|
|
keepablePromise struct {
|
|
promise breaker.Promise
|
|
log func(error)
|
|
}
|
|
)
|
|
|
|
func newCollection(collection *mgo.Collection, brk breaker.Breaker) Collection {
|
|
return &decoratedCollection{
|
|
name: collection.FullName,
|
|
collection: collection,
|
|
brk: brk,
|
|
}
|
|
}
|
|
|
|
func (c *decoratedCollection) Find(query interface{}) Query {
|
|
promise, err := c.brk.Allow()
|
|
if err != nil {
|
|
return rejectedQuery{}
|
|
}
|
|
|
|
startTime := timex.Now()
|
|
return promisedQuery{
|
|
Query: c.collection.Find(query),
|
|
promise: keepablePromise{
|
|
promise: promise,
|
|
log: func(err error) {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("find", duration, err, query)
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c *decoratedCollection) FindId(id interface{}) Query {
|
|
promise, err := c.brk.Allow()
|
|
if err != nil {
|
|
return rejectedQuery{}
|
|
}
|
|
|
|
startTime := timex.Now()
|
|
return promisedQuery{
|
|
Query: c.collection.FindId(id),
|
|
promise: keepablePromise{
|
|
promise: promise,
|
|
log: func(err error) {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("findId", duration, err, id)
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c *decoratedCollection) Insert(docs ...interface{}) (err error) {
|
|
return c.brk.DoWithAcceptable(func() error {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("insert", duration, err, docs...)
|
|
}()
|
|
|
|
return c.collection.Insert(docs...)
|
|
}, acceptable)
|
|
}
|
|
|
|
func (c *decoratedCollection) Pipe(pipeline interface{}) Pipe {
|
|
promise, err := c.brk.Allow()
|
|
if err != nil {
|
|
return rejectedPipe{}
|
|
}
|
|
|
|
startTime := timex.Now()
|
|
return promisedPipe{
|
|
Pipe: c.collection.Pipe(pipeline),
|
|
promise: keepablePromise{
|
|
promise: promise,
|
|
log: func(err error) {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("pipe", duration, err, pipeline)
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c *decoratedCollection) Remove(selector interface{}) (err error) {
|
|
return c.brk.DoWithAcceptable(func() error {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("remove", duration, err, selector)
|
|
}()
|
|
|
|
return c.collection.Remove(selector)
|
|
}, acceptable)
|
|
}
|
|
|
|
func (c *decoratedCollection) RemoveAll(selector interface{}) (info *mgo.ChangeInfo, err error) {
|
|
err = c.brk.DoWithAcceptable(func() error {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("removeAll", duration, err, selector)
|
|
}()
|
|
|
|
info, err = c.collection.RemoveAll(selector)
|
|
return err
|
|
}, acceptable)
|
|
|
|
return
|
|
}
|
|
|
|
func (c *decoratedCollection) RemoveId(id interface{}) (err error) {
|
|
return c.brk.DoWithAcceptable(func() error {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("removeId", duration, err, id)
|
|
}()
|
|
|
|
return c.collection.RemoveId(id)
|
|
}, acceptable)
|
|
}
|
|
|
|
func (c *decoratedCollection) Update(selector, update interface{}) (err error) {
|
|
return c.brk.DoWithAcceptable(func() error {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("update", duration, err, selector, update)
|
|
}()
|
|
|
|
return c.collection.Update(selector, update)
|
|
}, acceptable)
|
|
}
|
|
|
|
func (c *decoratedCollection) UpdateId(id, update interface{}) (err error) {
|
|
return c.brk.DoWithAcceptable(func() error {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("updateId", duration, err, id, update)
|
|
}()
|
|
|
|
return c.collection.UpdateId(id, update)
|
|
}, acceptable)
|
|
}
|
|
|
|
func (c *decoratedCollection) Upsert(selector, update interface{}) (info *mgo.ChangeInfo, err error) {
|
|
err = c.brk.DoWithAcceptable(func() error {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
c.logDuration("upsert", duration, err, selector, update)
|
|
}()
|
|
|
|
info, err = c.collection.Upsert(selector, update)
|
|
return err
|
|
}, acceptable)
|
|
|
|
return
|
|
}
|
|
|
|
func (c *decoratedCollection) logDuration(method string, duration time.Duration, err error, docs ...interface{}) {
|
|
content, e := json.Marshal(docs)
|
|
if e != nil {
|
|
logx.Error(err)
|
|
} else if err != nil {
|
|
if duration > slowThreshold {
|
|
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
|
|
c.name, method, err.Error(), string(content))
|
|
} else {
|
|
logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s",
|
|
c.name, method, err.Error(), string(content))
|
|
}
|
|
} else {
|
|
if duration > slowThreshold {
|
|
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
|
|
c.name, method, string(content))
|
|
} else {
|
|
logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p keepablePromise) accept(err error) error {
|
|
p.promise.Accept()
|
|
p.log(err)
|
|
return err
|
|
}
|
|
|
|
func (p keepablePromise) keep(err error) error {
|
|
if acceptable(err) {
|
|
p.promise.Accept()
|
|
} else {
|
|
p.promise.Reject(err.Error())
|
|
}
|
|
|
|
p.log(err)
|
|
return err
|
|
}
|
|
|
|
func acceptable(err error) bool {
|
|
return err == nil || err == mgo.ErrNotFound
|
|
}
|