|
|
|
@ -3,6 +3,7 @@ package sqlx
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"database/sql"
|
|
|
|
|
"errors"
|
|
|
|
|
|
|
|
|
|
"github.com/zeromicro/go-zero/core/breaker"
|
|
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
|
@ -157,7 +158,7 @@ func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...any) (
|
|
|
|
|
result, err = exec(ctx, conn, q, args...)
|
|
|
|
|
return err
|
|
|
|
|
}, db.acceptable)
|
|
|
|
|
if err == breaker.ErrServiceUnavailable {
|
|
|
|
|
if errors.Is(err, breaker.ErrServiceUnavailable) {
|
|
|
|
|
metricReqErr.Inc("Exec", "breaker")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -193,7 +194,7 @@ func (db *commonSqlConn) PrepareCtx(ctx context.Context, query string) (stmt Stm
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}, db.acceptable)
|
|
|
|
|
if err == breaker.ErrServiceUnavailable {
|
|
|
|
|
if errors.Is(err, breaker.ErrServiceUnavailable) {
|
|
|
|
|
metricReqErr.Inc("Prepare", "breaker")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -283,7 +284,7 @@ func (db *commonSqlConn) TransactCtx(ctx context.Context, fn func(context.Contex
|
|
|
|
|
err = db.brk.DoWithAcceptable(func() error {
|
|
|
|
|
return transact(ctx, db, db.beginTx, fn)
|
|
|
|
|
}, db.acceptable)
|
|
|
|
|
if err == breaker.ErrServiceUnavailable {
|
|
|
|
|
if errors.Is(err, breaker.ErrServiceUnavailable) {
|
|
|
|
|
metricReqErr.Inc("Transact", "breaker")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -291,11 +292,13 @@ func (db *commonSqlConn) TransactCtx(ctx context.Context, fn func(context.Contex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonSqlConn) acceptable(err error) bool {
|
|
|
|
|
if err == nil || err == sql.ErrNoRows || err == sql.ErrTxDone || err == context.Canceled {
|
|
|
|
|
if err == nil || errors.Is(err, sql.ErrNoRows) || errors.Is(err, sql.ErrTxDone) ||
|
|
|
|
|
errors.Is(err, context.Canceled) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, ok := err.(acceptableError); ok {
|
|
|
|
|
var e acceptableError
|
|
|
|
|
if errors.As(err, &e) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -321,9 +324,9 @@ func (db *commonSqlConn) queryRows(ctx context.Context, scanner func(*sql.Rows)
|
|
|
|
|
return qerr
|
|
|
|
|
}, q, args...)
|
|
|
|
|
}, func(err error) bool {
|
|
|
|
|
return qerr == err || db.acceptable(err)
|
|
|
|
|
return errors.Is(err, qerr) || db.acceptable(err)
|
|
|
|
|
})
|
|
|
|
|
if err == breaker.ErrServiceUnavailable {
|
|
|
|
|
if errors.Is(err, breaker.ErrServiceUnavailable) {
|
|
|
|
|
metricReqErr.Inc("queryRows", "breaker")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|