From e567a0c718f8e245f7ac6a171e1cfdd3ad0c0416 Mon Sep 17 00:00:00 2001 From: chen quan Date: Thu, 12 May 2022 23:32:34 -0500 Subject: [PATCH] refactor: refactor trace in redis & sql & mongo (#1865) * refactor: refactor tracing in redis & sql & mongo Signed-off-by: chenquan * fix: fix some tests Signed-off-by: chenquan * refactor: add missing content Signed-off-by: chenquan * refactor: adjust `log` and `return` Signed-off-by: chenquan * refactor: reformat code Signed-off-by: chenquan * refactor: reformat code Signed-off-by: chenquan * refactor: reformat code Signed-off-by: chenquan * refactor: simpler span name Signed-off-by: chenquan * refactor: fix a bug Signed-off-by: chenquan * refactor: fix a bug Signed-off-by: chenquan --- core/stores/mon/collection.go | 208 +++++++++++++++++++---------- core/stores/mon/collection_test.go | 14 +- core/stores/mon/model.go | 70 ++++++++-- core/stores/mon/trace.go | 37 +++++ core/stores/mon/util.go | 8 +- core/stores/mon/util_test.go | 5 +- core/stores/redis/hook.go | 67 +++++++--- core/stores/redis/hook_test.go | 10 +- core/stores/sqlx/sqlconn.go | 106 ++++++++------- core/stores/sqlx/trace.go | 37 +++++ core/stores/sqlx/tx.go | 52 +++++--- 11 files changed, 434 insertions(+), 180 deletions(-) create mode 100644 core/stores/mon/trace.go create mode 100644 core/stores/sqlx/trace.go diff --git a/core/stores/mon/collection.go b/core/stores/mon/collection.go index 9bb5bb44..6d2f7047 100644 --- a/core/stores/mon/collection.go +++ b/core/stores/mon/collection.go @@ -8,18 +8,35 @@ import ( "github.com/zeromicro/go-zero/core/breaker" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/timex" - "github.com/zeromicro/go-zero/core/trace" "go.mongodb.org/mongo-driver/mongo" mopt "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/mongo/driver/session" - "go.opentelemetry.io/otel" - tracesdk "go.opentelemetry.io/otel/trace" ) const ( defaultSlowThreshold = time.Millisecond * 500 // spanName is the span name of the mongo calls. spanName = "mongo" + + // mongodb method names + aggregate = "Aggregate" + bulkWrite = "BulkWrite" + countDocuments = "CountDocuments" + deleteMany = "DeleteMany" + deleteOne = "DeleteOne" + distinct = "Distinct" + estimatedDocumentCount = "EstimatedDocumentCount" + find = "Find" + findOne = "FindOne" + findOneAndDelete = "FindOneAndDelete" + findOneAndReplace = "FindOneAndReplace" + findOneAndUpdate = "FindOneAndUpdate" + insertMany = "InsertMany" + insertOne = "InsertOne" + replaceOne = "ReplaceOne" + updateByID = "UpdateByID" + updateMany = "UpdateMany" + updateOne = "UpdateOne" ) // ErrNotFound is an alias of mongo.ErrNoDocuments @@ -120,341 +137,397 @@ func newCollection(collection *mongo.Collection, brk breaker.Breaker) Collection func (c *decoratedCollection) Aggregate(ctx context.Context, pipeline interface{}, opts ...*mopt.AggregateOptions) (cur *mongo.Cursor, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, aggregate) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { starTime := timex.Now() defer func() { - c.logDurationSimple("Aggregate", starTime, err) + c.logDurationSimple(ctx, aggregate, starTime, err) }() cur, err = c.Collection.Aggregate(ctx, pipeline, opts...) return err }, acceptable) + return } func (c *decoratedCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel, opts ...*mopt.BulkWriteOptions) (res *mongo.BulkWriteResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, bulkWrite) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDurationSimple("BulkWrite", startTime, err) + c.logDurationSimple(ctx, bulkWrite, startTime, err) }() res, err = c.Collection.BulkWrite(ctx, models, opts...) return err }, acceptable) + return } func (c *decoratedCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*mopt.CountOptions) (count int64, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, countDocuments) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDurationSimple("CountDocuments", startTime, err) + c.logDurationSimple(ctx, countDocuments, startTime, err) }() count, err = c.Collection.CountDocuments(ctx, filter, opts...) return err }, acceptable) + return } func (c *decoratedCollection) DeleteMany(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, deleteMany) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDurationSimple("DeleteMany", startTime, err) + c.logDurationSimple(ctx, deleteMany, startTime, err) }() res, err = c.Collection.DeleteMany(ctx, filter, opts...) return err }, acceptable) + return } func (c *decoratedCollection) DeleteOne(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, deleteOne) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("DeleteOne", startTime, err, filter) + c.logDuration(ctx, deleteOne, startTime, err, filter) }() res, err = c.Collection.DeleteOne(ctx, filter, opts...) return err }, acceptable) + return } func (c *decoratedCollection) Distinct(ctx context.Context, fieldName string, filter interface{}, opts ...*mopt.DistinctOptions) (val []interface{}, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, distinct) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDurationSimple("Distinct", startTime, err) + c.logDurationSimple(ctx, distinct, startTime, err) }() val, err = c.Collection.Distinct(ctx, fieldName, filter, opts...) return err }, acceptable) + return } func (c *decoratedCollection) EstimatedDocumentCount(ctx context.Context, opts ...*mopt.EstimatedDocumentCountOptions) (val int64, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, estimatedDocumentCount) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDurationSimple("EstimatedDocumentCount", startTime, err) + c.logDurationSimple(ctx, estimatedDocumentCount, startTime, err) }() val, err = c.Collection.EstimatedDocumentCount(ctx, opts...) return err }, acceptable) + return } func (c *decoratedCollection) Find(ctx context.Context, filter interface{}, opts ...*mopt.FindOptions) (cur *mongo.Cursor, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, find) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("Find", startTime, err, filter) + c.logDuration(ctx, find, startTime, err, filter) }() cur, err = c.Collection.Find(ctx, filter, opts...) return err }, acceptable) + return } func (c *decoratedCollection) FindOne(ctx context.Context, filter interface{}, opts ...*mopt.FindOneOptions) (res *mongo.SingleResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, findOne) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("FindOne", startTime, err, filter) + c.logDuration(ctx, findOne, startTime, err, filter) }() res = c.Collection.FindOne(ctx, filter, opts...) err = res.Err() return err }, acceptable) + return } func (c *decoratedCollection) FindOneAndDelete(ctx context.Context, filter interface{}, opts ...*mopt.FindOneAndDeleteOptions) (res *mongo.SingleResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, findOneAndDelete) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("FindOneAndDelete", startTime, err, filter) + c.logDuration(ctx, findOneAndDelete, startTime, err, filter) }() res = c.Collection.FindOneAndDelete(ctx, filter, opts...) err = res.Err() return err }, acceptable) + return } func (c *decoratedCollection) FindOneAndReplace(ctx context.Context, filter interface{}, replacement interface{}, opts ...*mopt.FindOneAndReplaceOptions) ( res *mongo.SingleResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, findOneAndReplace) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("FindOneAndReplace", startTime, err, filter, replacement) + c.logDuration(ctx, findOneAndReplace, startTime, err, filter, replacement) }() res = c.Collection.FindOneAndReplace(ctx, filter, replacement, opts...) err = res.Err() return err }, acceptable) + return } func (c *decoratedCollection) FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, opts ...*mopt.FindOneAndUpdateOptions) (res *mongo.SingleResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, findOneAndUpdate) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("FindOneAndUpdate", startTime, err, filter, update) + c.logDuration(ctx, findOneAndUpdate, startTime, err, filter, update) }() res = c.Collection.FindOneAndUpdate(ctx, filter, update, opts...) err = res.Err() return err }, acceptable) + return } func (c *decoratedCollection) InsertMany(ctx context.Context, documents []interface{}, opts ...*mopt.InsertManyOptions) (res *mongo.InsertManyResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, insertMany) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDurationSimple("InsertMany", startTime, err) + c.logDurationSimple(ctx, insertMany, startTime, err) }() res, err = c.Collection.InsertMany(ctx, documents, opts...) return err }, acceptable) + return } func (c *decoratedCollection) InsertOne(ctx context.Context, document interface{}, opts ...*mopt.InsertOneOptions) (res *mongo.InsertOneResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, insertOne) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("InsertOne", startTime, err, document) + c.logDuration(ctx, insertOne, startTime, err, document) }() res, err = c.Collection.InsertOne(ctx, document, opts...) return err }, acceptable) + return } func (c *decoratedCollection) ReplaceOne(ctx context.Context, filter interface{}, replacement interface{}, opts ...*mopt.ReplaceOptions) (res *mongo.UpdateResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, replaceOne) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("ReplaceOne", startTime, err, filter, replacement) + c.logDuration(ctx, replaceOne, startTime, err, filter, replacement) }() res, err = c.Collection.ReplaceOne(ctx, filter, replacement, opts...) return err }, acceptable) + return } func (c *decoratedCollection) UpdateByID(ctx context.Context, id interface{}, update interface{}, opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, updateByID) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("UpdateByID", startTime, err, id, update) + c.logDuration(ctx, updateByID, startTime, err, id, update) }() res, err = c.Collection.UpdateByID(ctx, id, update, opts...) return err }, acceptable) + return } func (c *decoratedCollection) UpdateMany(ctx context.Context, filter interface{}, update interface{}, opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, updateMany) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDurationSimple("UpdateMany", startTime, err) + c.logDurationSimple(ctx, updateMany, startTime, err) }() res, err = c.Collection.UpdateMany(ctx, filter, update, opts...) return err }, acceptable) + return } func (c *decoratedCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, updateOne) + defer func() { + endSpan(span, err) + }() err = c.brk.DoWithAcceptable(func() error { startTime := timex.Now() defer func() { - c.logDuration("UpdateOne", startTime, err, filter, update) + c.logDuration(ctx, updateOne, startTime, err, filter, update) }() res, err = c.Collection.UpdateOne(ctx, filter, update, opts...) return err }, acceptable) + return } -func (c *decoratedCollection) logDuration(method string, startTime time.Duration, err error, +func (c *decoratedCollection) logDuration(ctx context.Context, method string, startTime time.Duration, err error, docs ...interface{}) { duration := timex.Since(startTime) + logger := logx.WithContext(ctx).WithDuration(duration) + content, e := json.Marshal(docs) if e != nil { - logx.Error(err) + logger.Error(err) } else if err != nil { if duration > slowThreshold.Load() { - logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s", + logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s", c.name, method, err.Error(), string(content)) } else { - logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s", + logger.Infof("mongo(%s) - %s - fail(%s) - %s", c.name, method, err.Error(), string(content)) } } else { if duration > slowThreshold.Load() { - logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s", + logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s", c.name, method, string(content)) } else { - logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content)) + logger.Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content)) } } } -func (c *decoratedCollection) logDurationSimple(method string, startTime time.Duration, err error) { - logDuration(c.name, method, startTime, err) +func (c *decoratedCollection) logDurationSimple(ctx context.Context, method string, startTime time.Duration, err error) { + logDuration(ctx, c.name, method, startTime, err) } func (p keepablePromise) accept(err error) error { @@ -483,8 +556,3 @@ func acceptable(err error) bool { err == session.ErrAbortTwice || err == session.ErrCommitAfterAbort || err == session.ErrUnackWCUnsupported || err == session.ErrSnapshotTransaction } - -func startSpan(ctx context.Context) (context.Context, tracesdk.Span) { - tracer := otel.GetTracerProvider().Tracer(trace.TraceName) - return tracer.Start(ctx, spanName) -} diff --git a/core/stores/mon/collection_test.go b/core/stores/mon/collection_test.go index 3b39b373..4e2e9d1d 100644 --- a/core/stores/mon/collection_test.go +++ b/core/stores/mon/collection_test.go @@ -588,21 +588,21 @@ func Test_DecoratedCollectionLogDuration(t *testing.T) { }() buf.Reset() - c.logDuration("foo", time.Millisecond, nil, "bar") + c.logDuration(context.Background(), "foo", time.Millisecond, nil, "bar") assert.Contains(t, buf.String(), "foo") assert.Contains(t, buf.String(), "bar") buf.Reset() - c.logDuration("foo", time.Millisecond, errors.New("bar"), make(chan int)) + c.logDuration(context.Background(), "foo", time.Millisecond, errors.New("bar"), make(chan int)) assert.Contains(t, buf.String(), "bar") buf.Reset() - c.logDuration("foo", slowThreshold.Load()+time.Millisecond, errors.New("bar")) + c.logDuration(context.Background(), "foo", slowThreshold.Load()+time.Millisecond, errors.New("bar")) assert.Contains(t, buf.String(), "foo") assert.Contains(t, buf.String(), "slowcall") buf.Reset() - c.logDuration("foo", slowThreshold.Load()+time.Millisecond, nil) + c.logDuration(context.Background(), "foo", slowThreshold.Load()+time.Millisecond, nil) assert.Contains(t, buf.String(), "foo") assert.Contains(t, buf.String(), "slowcall") } @@ -630,15 +630,15 @@ func (d *dropBreaker) Allow() (breaker.Promise, error) { return nil, errDummy } -func (d *dropBreaker) Do(req func() error) error { +func (d *dropBreaker) Do(_ func() error) error { return nil } -func (d *dropBreaker) DoWithAcceptable(req func() error, acceptable breaker.Acceptable) error { +func (d *dropBreaker) DoWithAcceptable(_ func() error, _ breaker.Acceptable) error { return errDummy } -func (d *dropBreaker) DoWithFallback(req func() error, fallback func(err error) error) error { +func (d *dropBreaker) DoWithFallback(_ func() error, _ func(err error) error) error { return nil } diff --git a/core/stores/mon/model.go b/core/stores/mon/model.go index 220a3776..c963f988 100644 --- a/core/stores/mon/model.go +++ b/core/stores/mon/model.go @@ -11,6 +11,14 @@ import ( mopt "go.mongodb.org/mongo-driver/mongo/options" ) +const ( + startSession = "StartSession" + abortTransaction = "AbortTransaction" + commitTransaction = "CommitTransaction" + withTransaction = "WithTransaction" + endSession = "EndSession" +) + type ( // Model is a mongodb store model that represents a collection. Model struct { @@ -23,7 +31,8 @@ type ( wrappedSession struct { mongo.Session - brk breaker.Breaker + name string + brk breaker.Breaker } ) @@ -66,7 +75,7 @@ func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, err = m.brk.DoWithAcceptable(func() error { starTime := timex.Now() defer func() { - logDuration(m.name, "StartSession", starTime, err) + logDuration(context.Background(), m.name, startSession, starTime, err) }() session, sessionErr := m.cli.StartSession(opts...) @@ -76,11 +85,13 @@ func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, sess = &wrappedSession{ Session: session, + name: m.name, brk: m.brk, } return nil }, acceptable) + return } @@ -169,33 +180,57 @@ func (m *Model) FindOneAndUpdate(ctx context.Context, v, filter interface{}, upd return res.Decode(v) } -func (w *wrappedSession) AbortTransaction(ctx context.Context) error { - ctx, span := startSpan(ctx) - defer span.End() +// AbortTransaction implements the mongo.Session interface. +func (w *wrappedSession) AbortTransaction(ctx context.Context) (err error) { + ctx, span := startSpan(ctx, abortTransaction) + defer func() { + endSpan(span, err) + }() return w.brk.DoWithAcceptable(func() error { + starTime := timex.Now() + defer func() { + logDuration(ctx, w.name, abortTransaction, starTime, err) + }() + return w.Session.AbortTransaction(ctx) }, acceptable) } -func (w *wrappedSession) CommitTransaction(ctx context.Context) error { - ctx, span := startSpan(ctx) - defer span.End() +// CommitTransaction implements the mongo.Session interface. +func (w *wrappedSession) CommitTransaction(ctx context.Context) (err error) { + ctx, span := startSpan(ctx, commitTransaction) + defer func() { + endSpan(span, err) + }() return w.brk.DoWithAcceptable(func() error { + starTime := timex.Now() + defer func() { + logDuration(ctx, w.name, commitTransaction, starTime, err) + }() + return w.Session.CommitTransaction(ctx) }, acceptable) } +// WithTransaction implements the mongo.Session interface. func (w *wrappedSession) WithTransaction( ctx context.Context, fn func(sessCtx mongo.SessionContext) (interface{}, error), opts ...*mopt.TransactionOptions, ) (res interface{}, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, withTransaction) + defer func() { + endSpan(span, err) + }() err = w.brk.DoWithAcceptable(func() error { + starTime := timex.Now() + defer func() { + logDuration(ctx, w.name, withTransaction, starTime, err) + }() + res, err = w.Session.WithTransaction(ctx, fn, opts...) return err }, acceptable) @@ -203,11 +238,20 @@ func (w *wrappedSession) WithTransaction( return } +// EndSession implements the mongo.Session interface. func (w *wrappedSession) EndSession(ctx context.Context) { - ctx, span := startSpan(ctx) - defer span.End() + var err error + ctx, span := startSpan(ctx, endSession) + defer func() { + endSpan(span, err) + }() + + err = w.brk.DoWithAcceptable(func() error { + starTime := timex.Now() + defer func() { + logDuration(ctx, w.name, endSession, starTime, err) + }() - _ = w.brk.DoWithAcceptable(func() error { w.Session.EndSession(ctx) return nil }, acceptable) diff --git a/core/stores/mon/trace.go b/core/stores/mon/trace.go new file mode 100644 index 00000000..2f222f23 --- /dev/null +++ b/core/stores/mon/trace.go @@ -0,0 +1,37 @@ +package mon + +import ( + "context" + + "github.com/zeromicro/go-zero/core/trace" + "go.mongodb.org/mongo-driver/mongo" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + oteltrace "go.opentelemetry.io/otel/trace" +) + +var mongoCmdAttributeKey = attribute.Key("mongo.cmd") + +func startSpan(ctx context.Context, cmd string) (context.Context, oteltrace.Span) { + tracer := otel.GetTracerProvider().Tracer(trace.TraceName) + ctx, span := tracer.Start(ctx, + spanName, + oteltrace.WithSpanKind(oteltrace.SpanKindClient), + ) + span.SetAttributes(mongoCmdAttributeKey.String(cmd)) + return ctx, span +} + +func endSpan(span oteltrace.Span, err error) { + defer span.End() + + if err == nil || err == mongo.ErrNoDocuments || + err == mongo.ErrNilValue || err == mongo.ErrNilDocument { + span.SetStatus(codes.Ok, "") + return + } + + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) +} diff --git a/core/stores/mon/util.go b/core/stores/mon/util.go index 619f872b..e401df31 100644 --- a/core/stores/mon/util.go +++ b/core/stores/mon/util.go @@ -1,6 +1,7 @@ package mon import ( + "context" "strings" "time" @@ -15,11 +16,12 @@ func FormatAddr(hosts []string) string { return strings.Join(hosts, mongoAddrSep) } -func logDuration(name, method string, startTime time.Duration, err error) { +func logDuration(ctx context.Context, name, method string, startTime time.Duration, err error) { duration := timex.Since(startTime) + logger := logx.WithContext(ctx).WithDuration(duration) if err != nil { - logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s)", name, method, err.Error()) + logger.Infof("mongo(%s) - %s - fail(%s)", name, method, err.Error()) } else { - logx.WithDuration(duration).Infof("mongo(%s) - %s - ok", name, method) + logger.Infof("mongo(%s) - %s - ok", name, method) } } diff --git a/core/stores/mon/util_test.go b/core/stores/mon/util_test.go index c23a7933..b042f854 100644 --- a/core/stores/mon/util_test.go +++ b/core/stores/mon/util_test.go @@ -1,6 +1,7 @@ package mon import ( + "context" "errors" "strings" "testing" @@ -50,12 +51,12 @@ func Test_logDuration(t *testing.T) { }() buf.Reset() - logDuration("foo", "bar", time.Millisecond, nil) + logDuration(context.Background(), "foo", "bar", time.Millisecond, nil) assert.Contains(t, buf.String(), "foo") assert.Contains(t, buf.String(), "bar") buf.Reset() - logDuration("foo", "bar", time.Millisecond, errors.New("bar")) + logDuration(context.Background(), "foo", "bar", time.Millisecond, errors.New("bar")) assert.Contains(t, buf.String(), "foo") assert.Contains(t, buf.String(), "bar") assert.Contains(t, buf.String(), "fail") diff --git a/core/stores/redis/hook.go b/core/stores/redis/hook.go index b3b1f144..6cd0ab42 100644 --- a/core/stores/redis/hook.go +++ b/core/stores/redis/hook.go @@ -6,35 +6,40 @@ import ( "time" red "github.com/go-redis/redis/v8" + "github.com/zeromicro/go-zero/core/errorx" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/mapping" "github.com/zeromicro/go-zero/core/timex" "github.com/zeromicro/go-zero/core/trace" "go.opentelemetry.io/otel" - tracestd "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + oteltrace "go.opentelemetry.io/otel/trace" ) // spanName is the span name of the redis calls. const spanName = "redis" var ( - startTimeKey = contextKey("startTime") - durationHook = hook{tracer: otel.GetTracerProvider().Tracer(trace.TraceName)} + startTimeKey = contextKey("startTime") + durationHook = hook{tracer: otel.GetTracerProvider().Tracer(trace.TraceName)} + redisCmdsAttributeKey = attribute.Key("redis.cmds") ) type ( contextKey string hook struct { - tracer tracestd.Tracer + tracer oteltrace.Tracer } ) -func (h hook) BeforeProcess(ctx context.Context, _ red.Cmder) (context.Context, error) { - return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now())), nil +func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) { + return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil } func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error { - h.endSpan(ctx) + err := cmd.Err() + h.endSpan(ctx, err) val := ctx.Value(startTimeKey) if val == nil { @@ -54,17 +59,30 @@ func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error { return nil } -func (h hook) BeforeProcessPipeline(ctx context.Context, _ []red.Cmder) (context.Context, error) { - return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now())), nil +func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) { + if len(cmds) == 0 { + return ctx, nil + } + + return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil } func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error { - h.endSpan(ctx) - if len(cmds) == 0 { return nil } + batchError := errorx.BatchError{} + for _, cmd := range cmds { + err := cmd.Err() + if err == nil { + continue + } + + batchError.Add(err) + } + h.endSpan(ctx, batchError.Err()) + val := ctx.Value(startTimeKey) if val == nil { return nil @@ -94,11 +112,30 @@ func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) { logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String()) } -func (h hook) startSpan(ctx context.Context) context.Context { - ctx, _ = h.tracer.Start(ctx, spanName) +func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context { + ctx, span := h.tracer.Start(ctx, + spanName, + oteltrace.WithSpanKind(oteltrace.SpanKindClient), + ) + + cmdStrs := make([]string, 0, len(cmds)) + for _, cmd := range cmds { + cmdStrs = append(cmdStrs, cmd.Name()) + } + span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs)) + return ctx } -func (h hook) endSpan(ctx context.Context) { - tracestd.SpanFromContext(ctx).End() +func (h hook) endSpan(ctx context.Context, err error) { + span := oteltrace.SpanFromContext(ctx) + defer span.End() + + if err == nil || err == red.Nil { + span.SetStatus(codes.Ok, "") + return + } + + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) } diff --git a/core/stores/redis/hook_test.go b/core/stores/redis/hook_test.go index 2c634f61..f8480a93 100644 --- a/core/stores/redis/hook_test.go +++ b/core/stores/redis/hook_test.go @@ -27,7 +27,7 @@ func TestHookProcessCase1(t *testing.T) { log.SetOutput(&buf) defer log.SetOutput(writer) - ctx, err := durationHook.BeforeProcess(context.Background(), nil) + ctx, err := durationHook.BeforeProcess(context.Background(), red.NewCmd(context.Background())) if err != nil { t.Fatal(err) } @@ -48,7 +48,7 @@ func TestHookProcessCase2(t *testing.T) { w, restore := injectLog() defer restore() - ctx, err := durationHook.BeforeProcess(context.Background(), nil) + ctx, err := durationHook.BeforeProcess(context.Background(), red.NewCmd(context.Background())) if err != nil { t.Fatal(err) } @@ -89,7 +89,7 @@ func TestHookProcessPipelineCase1(t *testing.T) { log.SetOutput(&buf) defer log.SetOutput(writer) - ctx, err := durationHook.BeforeProcessPipeline(context.Background(), nil) + ctx, err := durationHook.BeforeProcessPipeline(context.Background(), []red.Cmder{red.NewCmd(context.Background())}) if err != nil { t.Fatal(err) } @@ -112,7 +112,7 @@ func TestHookProcessPipelineCase2(t *testing.T) { w, restore := injectLog() defer restore() - ctx, err := durationHook.BeforeProcessPipeline(context.Background(), nil) + ctx, err := durationHook.BeforeProcessPipeline(context.Background(), []red.Cmder{red.NewCmd(context.Background())}) if err != nil { t.Fatal(err) } @@ -156,7 +156,7 @@ func TestHookProcessPipelineCase5(t *testing.T) { defer log.SetOutput(writer) ctx := context.WithValue(context.Background(), startTimeKey, "foo") - assert.Nil(t, durationHook.AfterProcessPipeline(ctx, nil)) + assert.Nil(t, durationHook.AfterProcessPipeline(ctx, []red.Cmder{red.NewCmd(context.Background())})) assert.True(t, buf.Len() == 0) } diff --git a/core/stores/sqlx/sqlconn.go b/core/stores/sqlx/sqlconn.go index 2d880711..6e2b6d4f 100644 --- a/core/stores/sqlx/sqlconn.go +++ b/core/stores/sqlx/sqlconn.go @@ -6,9 +6,6 @@ import ( "github.com/zeromicro/go-zero/core/breaker" "github.com/zeromicro/go-zero/core/logx" - "github.com/zeromicro/go-zero/core/trace" - "go.opentelemetry.io/otel" - tracesdk "go.opentelemetry.io/otel/trace" ) // spanName is used to identify the span name for the SQL execution. @@ -140,8 +137,10 @@ func (db *commonSqlConn) Exec(q string, args ...interface{}) (result sql.Result, func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...interface{}) ( result sql.Result, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, "Exec") + defer func() { + endSpan(span, err) + }() err = db.brk.DoWithAcceptable(func() error { var conn *sql.DB @@ -163,8 +162,10 @@ func (db *commonSqlConn) Prepare(query string) (stmt StmtSession, err error) { } func (db *commonSqlConn) PrepareCtx(ctx context.Context, query string) (stmt StmtSession, err error) { - ctx, span := startSpan(ctx) - defer span.End() + ctx, span := startSpan(ctx, "Prepare") + defer func() { + endSpan(span, err) + }() err = db.brk.DoWithAcceptable(func() error { var conn *sql.DB @@ -194,9 +195,11 @@ func (db *commonSqlConn) QueryRow(v interface{}, q string, args ...interface{}) } func (db *commonSqlConn) QueryRowCtx(ctx context.Context, v interface{}, q string, - args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() + args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRow") + defer func() { + endSpan(span, err) + }() return db.queryRows(ctx, func(rows *sql.Rows) error { return unmarshalRow(v, rows, true) @@ -208,9 +211,11 @@ func (db *commonSqlConn) QueryRowPartial(v interface{}, q string, args ...interf } func (db *commonSqlConn) QueryRowPartialCtx(ctx context.Context, v interface{}, - q string, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() + q string, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRowPartial") + defer func() { + endSpan(span, err) + }() return db.queryRows(ctx, func(rows *sql.Rows) error { return unmarshalRow(v, rows, false) @@ -222,9 +227,11 @@ func (db *commonSqlConn) QueryRows(v interface{}, q string, args ...interface{}) } func (db *commonSqlConn) QueryRowsCtx(ctx context.Context, v interface{}, q string, - args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() + args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRows") + defer func() { + endSpan(span, err) + }() return db.queryRows(ctx, func(rows *sql.Rows) error { return unmarshalRows(v, rows, true) @@ -236,9 +243,11 @@ func (db *commonSqlConn) QueryRowsPartial(v interface{}, q string, args ...inter } func (db *commonSqlConn) QueryRowsPartialCtx(ctx context.Context, v interface{}, - q string, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() + q string, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRowsPartial") + defer func() { + endSpan(span, err) + }() return db.queryRows(ctx, func(rows *sql.Rows) error { return unmarshalRows(v, rows, false) @@ -255,9 +264,11 @@ func (db *commonSqlConn) Transact(fn func(Session) error) error { }) } -func (db *commonSqlConn) TransactCtx(ctx context.Context, fn func(context.Context, Session) error) error { - ctx, span := startSpan(ctx) - defer span.End() +func (db *commonSqlConn) TransactCtx(ctx context.Context, fn func(context.Context, Session) error) (err error) { + ctx, span := startSpan(ctx, "Transact") + defer func() { + endSpan(span, err) + }() return db.brk.DoWithAcceptable(func() error { return transact(ctx, db, db.beginTx, fn) @@ -274,10 +285,7 @@ func (db *commonSqlConn) acceptable(err error) bool { } func (db *commonSqlConn) queryRows(ctx context.Context, scanner func(*sql.Rows) error, - q string, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() - + q string, args ...interface{}) (err error) { var qerr error return db.brk.DoWithAcceptable(func() error { conn, err := db.connProv() @@ -303,9 +311,11 @@ func (s statement) Exec(args ...interface{}) (sql.Result, error) { return s.ExecCtx(context.Background(), args...) } -func (s statement) ExecCtx(ctx context.Context, args ...interface{}) (sql.Result, error) { - ctx, span := startSpan(ctx) - defer span.End() +func (s statement) ExecCtx(ctx context.Context, args ...interface{}) (result sql.Result, err error) { + ctx, span := startSpan(ctx, "Exec") + defer func() { + endSpan(span, err) + }() return execStmt(ctx, s.stmt, s.query, args...) } @@ -314,22 +324,27 @@ func (s statement) QueryRow(v interface{}, args ...interface{}) error { return s.QueryRowCtx(context.Background(), v, args...) } -func (s statement) QueryRowCtx(ctx context.Context, v interface{}, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() +func (s statement) QueryRowCtx(ctx context.Context, v interface{}, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRow") + defer func() { + endSpan(span, err) + }() return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error { return unmarshalRow(v, rows, true) }, s.query, args...) + } func (s statement) QueryRowPartial(v interface{}, args ...interface{}) error { return s.QueryRowPartialCtx(context.Background(), v, args...) } -func (s statement) QueryRowPartialCtx(ctx context.Context, v interface{}, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() +func (s statement) QueryRowPartialCtx(ctx context.Context, v interface{}, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRowPartial") + defer func() { + endSpan(span, err) + }() return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error { return unmarshalRow(v, rows, false) @@ -340,9 +355,11 @@ func (s statement) QueryRows(v interface{}, args ...interface{}) error { return s.QueryRowsCtx(context.Background(), v, args...) } -func (s statement) QueryRowsCtx(ctx context.Context, v interface{}, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() +func (s statement) QueryRowsCtx(ctx context.Context, v interface{}, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRows") + defer func() { + endSpan(span, err) + }() return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error { return unmarshalRows(v, rows, true) @@ -353,16 +370,13 @@ func (s statement) QueryRowsPartial(v interface{}, args ...interface{}) error { return s.QueryRowsPartialCtx(context.Background(), v, args...) } -func (s statement) QueryRowsPartialCtx(ctx context.Context, v interface{}, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() +func (s statement) QueryRowsPartialCtx(ctx context.Context, v interface{}, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRowsPartial") + defer func() { + endSpan(span, err) + }() return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error { return unmarshalRows(v, rows, false) }, s.query, args...) } - -func startSpan(ctx context.Context) (context.Context, tracesdk.Span) { - tracer := otel.GetTracerProvider().Tracer(trace.TraceName) - return tracer.Start(ctx, spanName) -} diff --git a/core/stores/sqlx/trace.go b/core/stores/sqlx/trace.go new file mode 100644 index 00000000..2c862264 --- /dev/null +++ b/core/stores/sqlx/trace.go @@ -0,0 +1,37 @@ +package sqlx + +import ( + "context" + "database/sql" + + "github.com/zeromicro/go-zero/core/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + oteltrace "go.opentelemetry.io/otel/trace" +) + +var sqlAttributeKey = attribute.Key("sql.method") + +func startSpan(ctx context.Context, method string) (context.Context, oteltrace.Span) { + tracer := otel.GetTracerProvider().Tracer(trace.TraceName) + start, span := tracer.Start(ctx, + spanName, + oteltrace.WithSpanKind(oteltrace.SpanKindClient), + ) + span.SetAttributes(sqlAttributeKey.String(method)) + + return start, span +} + +func endSpan(span oteltrace.Span, err error) { + defer span.End() + + if err == nil || err == sql.ErrNoRows { + span.SetStatus(codes.Ok, "") + return + } + + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) +} diff --git a/core/stores/sqlx/tx.go b/core/stores/sqlx/tx.go index 98d3b1d8..0ddd7e41 100644 --- a/core/stores/sqlx/tx.go +++ b/core/stores/sqlx/tx.go @@ -30,20 +30,26 @@ func (t txSession) Exec(q string, args ...interface{}) (sql.Result, error) { return t.ExecCtx(context.Background(), q, args...) } -func (t txSession) ExecCtx(ctx context.Context, q string, args ...interface{}) (sql.Result, error) { - ctx, span := startSpan(ctx) - defer span.End() +func (t txSession) ExecCtx(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) { + ctx, span := startSpan(ctx, "Exec") + defer func() { + endSpan(span, err) + }() - return exec(ctx, t.Tx, q, args...) + result, err = exec(ctx, t.Tx, q, args...) + + return } func (t txSession) Prepare(q string) (StmtSession, error) { return t.PrepareCtx(context.Background(), q) } -func (t txSession) PrepareCtx(ctx context.Context, q string) (StmtSession, error) { - ctx, span := startSpan(ctx) - defer span.End() +func (t txSession) PrepareCtx(ctx context.Context, q string) (stmtSession StmtSession, err error) { + ctx, span := startSpan(ctx, "Prepare") + defer func() { + endSpan(span, err) + }() stmt, err := t.Tx.PrepareContext(ctx, q) if err != nil { @@ -60,9 +66,11 @@ func (t txSession) QueryRow(v interface{}, q string, args ...interface{}) error return t.QueryRowCtx(context.Background(), v, q, args...) } -func (t txSession) QueryRowCtx(ctx context.Context, v interface{}, q string, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() +func (t txSession) QueryRowCtx(ctx context.Context, v interface{}, q string, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRow") + defer func() { + endSpan(span, err) + }() return query(ctx, t.Tx, func(rows *sql.Rows) error { return unmarshalRow(v, rows, true) @@ -74,9 +82,11 @@ func (t txSession) QueryRowPartial(v interface{}, q string, args ...interface{}) } func (t txSession) QueryRowPartialCtx(ctx context.Context, v interface{}, q string, - args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() + args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRowPartial") + defer func() { + endSpan(span, err) + }() return query(ctx, t.Tx, func(rows *sql.Rows) error { return unmarshalRow(v, rows, false) @@ -87,9 +97,11 @@ func (t txSession) QueryRows(v interface{}, q string, args ...interface{}) error return t.QueryRowsCtx(context.Background(), v, q, args...) } -func (t txSession) QueryRowsCtx(ctx context.Context, v interface{}, q string, args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() +func (t txSession) QueryRowsCtx(ctx context.Context, v interface{}, q string, args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRows") + defer func() { + endSpan(span, err) + }() return query(ctx, t.Tx, func(rows *sql.Rows) error { return unmarshalRows(v, rows, true) @@ -101,9 +113,11 @@ func (t txSession) QueryRowsPartial(v interface{}, q string, args ...interface{} } func (t txSession) QueryRowsPartialCtx(ctx context.Context, v interface{}, q string, - args ...interface{}) error { - ctx, span := startSpan(ctx) - defer span.End() + args ...interface{}) (err error) { + ctx, span := startSpan(ctx, "QueryRowsPartial") + defer func() { + endSpan(span, err) + }() return query(ctx, t.Tx, func(rows *sql.Rows) error { return unmarshalRows(v, rows, false)