From bc3c9484d12c98f070220801e1a7ab3be9100c33 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Fri, 22 Apr 2022 09:37:09 +0800 Subject: [PATCH] chore: refactor (#1814) --- core/stores/mon/collection.go | 7 ++++--- core/stores/mon/model.go | 19 +++++++++++++------ core/stores/mon/model_test.go | 3 +-- core/stores/redis/hook.go | 17 ++++++++++------- core/stores/sqlx/sqlconn.go | 6 ++++-- 5 files changed, 32 insertions(+), 20 deletions(-) diff --git a/core/stores/mon/collection.go b/core/stores/mon/collection.go index cfed2f68..41cedc06 100644 --- a/core/stores/mon/collection.go +++ b/core/stores/mon/collection.go @@ -473,9 +473,10 @@ func (p keepablePromise) keep(err error) error { func acceptable(err error) bool { return err == nil || err == mongo.ErrNoDocuments || err == mongo.ErrNilValue || err == mongo.ErrNilDocument || err == mongo.ErrNilCursor || err == mongo.ErrEmptySlice || - // session err - err == session.ErrSessionEnded || err == session.ErrNoTransactStarted || err == session.ErrTransactInProgress || - err == session.ErrAbortAfterCommit || err == session.ErrAbortTwice || err == session.ErrCommitAfterAbort || + // session errors + err == session.ErrSessionEnded || err == session.ErrNoTransactStarted || + err == session.ErrTransactInProgress || err == session.ErrAbortAfterCommit || + err == session.ErrAbortTwice || err == session.ErrCommitAfterAbort || err == session.ErrUnackWCUnsupported || err == session.ErrSnapshotTransaction } diff --git a/core/stores/mon/model.go b/core/stores/mon/model.go index dd4bab2e..220a3776 100644 --- a/core/stores/mon/model.go +++ b/core/stores/mon/model.go @@ -21,7 +21,7 @@ type ( opts []Option } - wrapSession struct { + wrappedSession struct { mongo.Session brk breaker.Breaker } @@ -74,7 +74,10 @@ func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, return sessionErr } - sess = &wrapSession{Session: session, brk: m.brk} + sess = &wrappedSession{ + Session: session, + brk: m.brk, + } return nil }, acceptable) @@ -166,7 +169,7 @@ func (m *Model) FindOneAndUpdate(ctx context.Context, v, filter interface{}, upd return res.Decode(v) } -func (w *wrapSession) AbortTransaction(ctx context.Context) error { +func (w *wrappedSession) AbortTransaction(ctx context.Context) error { ctx, span := startSpan(ctx) defer span.End() @@ -175,7 +178,7 @@ func (w *wrapSession) AbortTransaction(ctx context.Context) error { }, acceptable) } -func (w *wrapSession) CommitTransaction(ctx context.Context) error { +func (w *wrappedSession) CommitTransaction(ctx context.Context) error { ctx, span := startSpan(ctx) defer span.End() @@ -184,7 +187,11 @@ func (w *wrapSession) CommitTransaction(ctx context.Context) error { }, acceptable) } -func (w *wrapSession) WithTransaction(ctx context.Context, fn func(sessCtx mongo.SessionContext) (interface{}, error), opts ...*mopt.TransactionOptions) (res interface{}, err error) { +func (w *wrappedSession) WithTransaction( + ctx context.Context, + fn func(sessCtx mongo.SessionContext) (interface{}, error), + opts ...*mopt.TransactionOptions, +) (res interface{}, err error) { ctx, span := startSpan(ctx) defer span.End() @@ -196,7 +203,7 @@ func (w *wrapSession) WithTransaction(ctx context.Context, fn func(sessCtx mongo return } -func (w *wrapSession) EndSession(ctx context.Context) { +func (w *wrappedSession) EndSession(ctx context.Context) { ctx, span := startSpan(ctx) defer span.End() diff --git a/core/stores/mon/model_test.go b/core/stores/mon/model_test.go index f7dcd4e2..d86b15ef 100644 --- a/core/stores/mon/model_test.go +++ b/core/stores/mon/model_test.go @@ -18,6 +18,7 @@ func TestModel_StartSession(t *testing.T) { m := createModel(mt) sess, err := m.StartSession() assert.Nil(t, err) + defer sess.EndSession(context.Background()) _, err = sess.WithTransaction(context.Background(), func(sessCtx mongo.SessionContext) (interface{}, error) { _ = sessCtx.StartTransaction() @@ -26,10 +27,8 @@ func TestModel_StartSession(t *testing.T) { return nil, nil }) assert.Nil(t, err) - assert.NoError(t, sess.CommitTransaction(context.Background())) assert.Error(t, sess.AbortTransaction(context.Background())) - sess.EndSession(context.Background()) }) } diff --git a/core/stores/redis/hook.go b/core/stores/redis/hook.go index 00373113..43043be4 100644 --- a/core/stores/redis/hook.go +++ b/core/stores/redis/hook.go @@ -14,6 +14,9 @@ import ( tracestd "go.opentelemetry.io/otel/trace" ) +// spanName is the span name of the redis calls. +const spanName = "redis" + var ( startTimeKey = contextKey("startTime") spanKey = contextKey("span") @@ -28,11 +31,11 @@ type ( ) func (h hook) BeforeProcess(ctx context.Context, _ red.Cmder) (context.Context, error) { - return h.spanStart(context.WithValue(ctx, startTimeKey, timex.Now())), nil + return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now())), nil } func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error { - h.spanEnd(ctx) + h.endSpan(ctx) val := ctx.Value(startTimeKey) if val == nil { @@ -53,11 +56,11 @@ func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error { } func (h hook) BeforeProcessPipeline(ctx context.Context, _ []red.Cmder) (context.Context, error) { - return h.spanStart(context.WithValue(ctx, startTimeKey, timex.Now())), nil + return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now())), nil } func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error { - h.spanEnd(ctx) + h.endSpan(ctx) if len(cmds) == 0 { return nil @@ -92,12 +95,12 @@ func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) { logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String()) } -func (h hook) spanStart(ctx context.Context) context.Context { - ctx, span := h.tracer.Start(ctx, "redis") +func (h hook) startSpan(ctx context.Context) context.Context { + ctx, span := h.tracer.Start(ctx, spanName) return context.WithValue(ctx, spanKey, span) } -func (h hook) spanEnd(ctx context.Context) { +func (h hook) endSpan(ctx context.Context) { spanVal := ctx.Value(spanKey) if spanVal == nil { return diff --git a/core/stores/sqlx/sqlconn.go b/core/stores/sqlx/sqlconn.go index 6a2737c9..2d880711 100644 --- a/core/stores/sqlx/sqlconn.go +++ b/core/stores/sqlx/sqlconn.go @@ -11,6 +11,9 @@ import ( tracesdk "go.opentelemetry.io/otel/trace" ) +// spanName is used to identify the span name for the SQL execution. +const spanName = "sql" + // ErrNotFound is an alias of sql.ErrNoRows var ErrNotFound = sql.ErrNoRows @@ -240,7 +243,6 @@ func (db *commonSqlConn) QueryRowsPartialCtx(ctx context.Context, v interface{}, return db.queryRows(ctx, func(rows *sql.Rows) error { return unmarshalRows(v, rows, false) }, q, args...) - } func (db *commonSqlConn) RawDB() (*sql.DB, error) { @@ -362,5 +364,5 @@ func (s statement) QueryRowsPartialCtx(ctx context.Context, v interface{}, args func startSpan(ctx context.Context) (context.Context, tracesdk.Span) { tracer := otel.GetTracerProvider().Tracer(trace.TraceName) - return tracer.Start(ctx, "sql") + return tracer.Start(ctx, spanName) }