From ae7f1aabdd237c3d51d588d415da73dfac1ebe71 Mon Sep 17 00:00:00 2001 From: dawn_zhou <15951703783@163.com> Date: Sat, 17 Sep 2022 19:35:30 +0800 Subject: [PATCH] feat: mysql and redis metric support (#2355) * feat: mysql and redis metric support * feat: mysql and redis metric support * feat: mysql and redis metric support Co-authored-by: dawn.zhou --- core/metric/counter.go | 12 +++++- core/metric/counter_test.go | 12 ++++++ core/metric/gauge.go | 16 +++++++- core/metric/gauge_test.go | 3 ++ core/metric/histogram.go | 8 +++- core/metric/histogram_test.go | 1 + core/stores/redis/hook.go | 39 +++++++++++++++++++ core/stores/redis/metrics.go | 23 +++++++++++ core/stores/sqlx/metrics.go | 23 +++++++++++ core/stores/sqlx/sqlconn.go | 24 +++++++++++- core/stores/sqlx/sqlconn_test.go | 7 ++-- core/stores/sqlx/stmt.go | 2 + rest/handler/prometheushandler.go | 5 --- .../prometheusinterceptor.go | 6 +-- .../prometheusinterceptor.go | 6 +-- 15 files changed, 164 insertions(+), 23 deletions(-) create mode 100644 core/stores/redis/metrics.go create mode 100644 core/stores/sqlx/metrics.go diff --git a/core/metric/counter.go b/core/metric/counter.go index 5344773c..578f91f9 100644 --- a/core/metric/counter.go +++ b/core/metric/counter.go @@ -1,8 +1,10 @@ package metric import ( - prom "github.com/prometheus/client_golang/prometheus" "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/core/prometheus" + + prom "github.com/prometheus/client_golang/prometheus" ) type ( @@ -47,10 +49,18 @@ func NewCounterVec(cfg *CounterVecOpts) CounterVec { } func (cv *promCounterVec) Inc(labels ...string) { + if !prometheus.Enabled() { + return + } + cv.counter.WithLabelValues(labels...).Inc() } func (cv *promCounterVec) Add(v float64, labels ...string) { + if !prometheus.Enabled() { + return + } + cv.counter.WithLabelValues(labels...).Add(v) } diff --git a/core/metric/counter_test.go b/core/metric/counter_test.go index d8612fd6..64cb5056 100644 --- a/core/metric/counter_test.go +++ b/core/metric/counter_test.go @@ -3,6 +3,8 @@ package metric import ( "testing" + "github.com/zeromicro/go-zero/core/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" ) @@ -21,6 +23,7 @@ func TestNewCounterVec(t *testing.T) { } func TestCounterIncr(t *testing.T) { + startAgent() counterVec := NewCounterVec(&CounterVecOpts{ Namespace: "http_client", Subsystem: "call", @@ -37,6 +40,7 @@ func TestCounterIncr(t *testing.T) { } func TestCounterAdd(t *testing.T) { + startAgent() counterVec := NewCounterVec(&CounterVecOpts{ Namespace: "rpc_server", Subsystem: "requests", @@ -51,3 +55,11 @@ func TestCounterAdd(t *testing.T) { r := testutil.ToFloat64(cv.counter) assert.Equal(t, float64(33), r) } + +func startAgent() { + prometheus.StartAgent(prometheus.Config{ + Host: "127.0.0.1", + Port: 9101, + Path: "/metrics", + }) +} diff --git a/core/metric/gauge.go b/core/metric/gauge.go index b4452983..763bd4d9 100644 --- a/core/metric/gauge.go +++ b/core/metric/gauge.go @@ -1,8 +1,10 @@ package metric import ( - prom "github.com/prometheus/client_golang/prometheus" "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/core/prometheus" + + prom "github.com/prometheus/client_golang/prometheus" ) type ( @@ -50,14 +52,26 @@ func NewGaugeVec(cfg *GaugeVecOpts) GaugeVec { } func (gv *promGaugeVec) Inc(labels ...string) { + if !prometheus.Enabled() { + return + } + gv.gauge.WithLabelValues(labels...).Inc() } func (gv *promGaugeVec) Add(v float64, labels ...string) { + if !prometheus.Enabled() { + return + } + gv.gauge.WithLabelValues(labels...).Add(v) } func (gv *promGaugeVec) Set(v float64, labels ...string) { + if !prometheus.Enabled() { + return + } + gv.gauge.WithLabelValues(labels...).Set(v) } diff --git a/core/metric/gauge_test.go b/core/metric/gauge_test.go index 534889fb..ad3ea549 100644 --- a/core/metric/gauge_test.go +++ b/core/metric/gauge_test.go @@ -21,6 +21,7 @@ func TestNewGaugeVec(t *testing.T) { } func TestGaugeInc(t *testing.T) { + startAgent() gaugeVec := NewGaugeVec(&GaugeVecOpts{ Namespace: "rpc_client2", Subsystem: "requests", @@ -37,6 +38,7 @@ func TestGaugeInc(t *testing.T) { } func TestGaugeAdd(t *testing.T) { + startAgent() gaugeVec := NewGaugeVec(&GaugeVecOpts{ Namespace: "rpc_client", Subsystem: "request", @@ -53,6 +55,7 @@ func TestGaugeAdd(t *testing.T) { } func TestGaugeSet(t *testing.T) { + startAgent() gaugeVec := NewGaugeVec(&GaugeVecOpts{ Namespace: "http_client", Subsystem: "request", diff --git a/core/metric/histogram.go b/core/metric/histogram.go index 2053f6bb..77cb5e39 100644 --- a/core/metric/histogram.go +++ b/core/metric/histogram.go @@ -1,8 +1,10 @@ package metric import ( - prom "github.com/prometheus/client_golang/prometheus" "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/core/prometheus" + + prom "github.com/prometheus/client_golang/prometheus" ) type ( @@ -53,6 +55,10 @@ func NewHistogramVec(cfg *HistogramVecOpts) HistogramVec { } func (hv *promHistogramVec) Observe(v int64, labels ...string) { + if !prometheus.Enabled() { + return + } + hv.histogram.WithLabelValues(labels...).Observe(float64(v)) } diff --git a/core/metric/histogram_test.go b/core/metric/histogram_test.go index e8328649..4874617b 100644 --- a/core/metric/histogram_test.go +++ b/core/metric/histogram_test.go @@ -21,6 +21,7 @@ func TestNewHistogramVec(t *testing.T) { } func TestHistogramObserve(t *testing.T) { + startAgent() histogramVec := NewHistogramVec(&HistogramVecOpts{ Name: "counts", Help: "rpc server requests duration(ms).", diff --git a/core/stores/redis/hook.go b/core/stores/redis/hook.go index 6cd0ab42..4cd8e97c 100644 --- a/core/stores/redis/hook.go +++ b/core/stores/redis/hook.go @@ -56,6 +56,11 @@ func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error { logDuration(ctx, cmd, duration) } + metricReqDur.Observe(int64(duration/time.Millisecond), cmd.Name()) + if msg := errFormat(err); len(msg) > 0 { + metricReqErr.Inc(cmd.Name(), msg) + } + return nil } @@ -98,9 +103,43 @@ func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error logDuration(ctx, cmds[0], duration) } + metricReqDur.Observe(int64(duration/time.Millisecond), "Pipeline") + if msg := errFormat(batchError.Err()); len(msg) > 0 { + metricReqErr.Inc("Pipeline", msg) + } + return nil } +func errFormat(err error) string { + if err == nil || err == red.Nil { + return "" + } + + es := err.Error() + switch { + case strings.HasPrefix(es, "read"): + return "read timeout" + case strings.HasPrefix(es, "dial"): + if strings.Contains(es, "connection refused") { + return "connection refused" + } + return "dial timeout" + case strings.HasPrefix(es, "write"): + return "write timeout" + case strings.Contains(es, "EOF"): + return "eof" + case strings.Contains(es, "reset"): + return "reset" + case strings.Contains(es, "broken"): + return "broken pipe" + case strings.Contains(es, "breaker"): + return "breaker" + default: + return "unexpected error" + } +} + func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) { var buf strings.Builder for i, arg := range cmd.Args() { diff --git a/core/stores/redis/metrics.go b/core/stores/redis/metrics.go new file mode 100644 index 00000000..2cb381f4 --- /dev/null +++ b/core/stores/redis/metrics.go @@ -0,0 +1,23 @@ +package redis + +import "github.com/zeromicro/go-zero/core/metric" + +const namespace = "redis_client" + +var ( + metricReqDur = metric.NewHistogramVec(&metric.HistogramVecOpts{ + Namespace: namespace, + Subsystem: "requests", + Name: "duration_ms", + Help: "redis client requests duration(ms).", + Labels: []string{"command"}, + Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2500}, + }) + metricReqErr = metric.NewCounterVec(&metric.CounterVecOpts{ + Namespace: namespace, + Subsystem: "requests", + Name: "error_total", + Help: "redis client requests error count.", + Labels: []string{"command", "error"}, + }) +) diff --git a/core/stores/sqlx/metrics.go b/core/stores/sqlx/metrics.go new file mode 100644 index 00000000..3a4c7e0b --- /dev/null +++ b/core/stores/sqlx/metrics.go @@ -0,0 +1,23 @@ +package sqlx + +import "github.com/zeromicro/go-zero/core/metric" + +const namespace = "mysql_client" + +var ( + metricReqDur = metric.NewHistogramVec(&metric.HistogramVecOpts{ + Namespace: namespace, + Subsystem: "requests", + Name: "durations_ms", + Help: "mysql client requests duration(ms).", + Labels: []string{"command"}, + Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2500}, + }) + metricReqErr = metric.NewCounterVec(&metric.CounterVecOpts{ + Namespace: namespace, + Subsystem: "requests", + Name: "error_total", + Help: "mysql client requests error count.", + Labels: []string{"command", "error"}, + }) +) diff --git a/core/stores/sqlx/sqlconn.go b/core/stores/sqlx/sqlconn.go index c3e0269d..e5771c54 100644 --- a/core/stores/sqlx/sqlconn.go +++ b/core/stores/sqlx/sqlconn.go @@ -154,6 +154,10 @@ func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...interfac return err }, db.acceptable) + if err == breaker.ErrServiceUnavailable { + metricReqErr.Inc("Exec", "breaker") + } + return } @@ -187,6 +191,10 @@ func (db *commonSqlConn) PrepareCtx(ctx context.Context, query string) (stmt Stm return nil }, db.acceptable) + if err == breaker.ErrServiceUnavailable { + metricReqErr.Inc("Prepare", "breaker") + } + return } @@ -270,9 +278,15 @@ func (db *commonSqlConn) TransactCtx(ctx context.Context, fn func(context.Contex endSpan(span, err) }() - return db.brk.DoWithAcceptable(func() error { + err = db.brk.DoWithAcceptable(func() error { return transact(ctx, db, db.beginTx, fn) }, db.acceptable) + + if err == breaker.ErrServiceUnavailable { + metricReqErr.Inc("Transact", "breaker") + } + + return } func (db *commonSqlConn) acceptable(err error) bool { @@ -287,7 +301,7 @@ func (db *commonSqlConn) acceptable(err error) bool { func (db *commonSqlConn) queryRows(ctx context.Context, scanner func(*sql.Rows) error, q string, args ...interface{}) (err error) { var qerr error - return db.brk.DoWithAcceptable(func() error { + err = db.brk.DoWithAcceptable(func() error { conn, err := db.connProv() if err != nil { db.onError(err) @@ -301,6 +315,12 @@ func (db *commonSqlConn) queryRows(ctx context.Context, scanner func(*sql.Rows) }, func(err error) bool { return qerr == err || db.acceptable(err) }) + + if err == breaker.ErrServiceUnavailable { + metricReqErr.Inc("queryRows", "breaker") + } + + return } func (s statement) Close() error { diff --git a/core/stores/sqlx/sqlconn_test.go b/core/stores/sqlx/sqlconn_test.go index 3f2723ae..8c68c5e6 100644 --- a/core/stores/sqlx/sqlconn_test.go +++ b/core/stores/sqlx/sqlconn_test.go @@ -17,7 +17,8 @@ func init() { } func TestSqlConn(t *testing.T) { - mock := buildConn() + mock, err := buildConn() + assert.Nil(t, err) mock.ExpectExec("any") mock.ExpectQuery("any").WillReturnRows(sqlmock.NewRows([]string{"foo"})) conn := NewMysql(mockedDatasource) @@ -50,8 +51,8 @@ func TestSqlConn(t *testing.T) { })) } -func buildConn() (mock sqlmock.Sqlmock) { - connManager.GetResource(mockedDatasource, func() (io.Closer, error) { +func buildConn() (mock sqlmock.Sqlmock, err error) { + _, err = connManager.GetResource(mockedDatasource, func() (io.Closer, error) { var db *sql.DB var err error db, mock, err = sqlmock.New() diff --git a/core/stores/sqlx/stmt.go b/core/stores/sqlx/stmt.go index 7b6ad26b..35796b1b 100644 --- a/core/stores/sqlx/stmt.go +++ b/core/stores/sqlx/stmt.go @@ -135,6 +135,8 @@ func (e *realSqlGuard) finish(ctx context.Context, err error) { if err != nil { logSqlError(ctx, e.stmt, err) } + + metricReqDur.Observe(int64(duration/time.Millisecond), e.command) } func (e *realSqlGuard) start(q string, args ...interface{}) error { diff --git a/rest/handler/prometheushandler.go b/rest/handler/prometheushandler.go index 639d59c0..fcafa0a1 100644 --- a/rest/handler/prometheushandler.go +++ b/rest/handler/prometheushandler.go @@ -6,7 +6,6 @@ import ( "time" "github.com/zeromicro/go-zero/core/metric" - "github.com/zeromicro/go-zero/core/prometheus" "github.com/zeromicro/go-zero/core/timex" "github.com/zeromicro/go-zero/rest/internal/response" ) @@ -35,10 +34,6 @@ var ( // PrometheusHandler returns a middleware that reports stats to prometheus. func PrometheusHandler(path string) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { - if !prometheus.Enabled() { - return next - } - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { startTime := timex.Now() cw := &response.WithCodeResponseWriter{Writer: w} diff --git a/zrpc/internal/clientinterceptors/prometheusinterceptor.go b/zrpc/internal/clientinterceptors/prometheusinterceptor.go index b713f09f..56017053 100644 --- a/zrpc/internal/clientinterceptors/prometheusinterceptor.go +++ b/zrpc/internal/clientinterceptors/prometheusinterceptor.go @@ -6,8 +6,8 @@ import ( "time" "github.com/zeromicro/go-zero/core/metric" - "github.com/zeromicro/go-zero/core/prometheus" "github.com/zeromicro/go-zero/core/timex" + "google.golang.org/grpc" "google.golang.org/grpc/status" ) @@ -36,10 +36,6 @@ var ( // PrometheusInterceptor is an interceptor that reports to prometheus server. func PrometheusInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if !prometheus.Enabled() { - return invoker(ctx, method, req, reply, cc, opts...) - } - startTime := timex.Now() err := invoker(ctx, method, req, reply, cc, opts...) metricClientReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), method) diff --git a/zrpc/internal/serverinterceptors/prometheusinterceptor.go b/zrpc/internal/serverinterceptors/prometheusinterceptor.go index 86070e38..97ac073e 100644 --- a/zrpc/internal/serverinterceptors/prometheusinterceptor.go +++ b/zrpc/internal/serverinterceptors/prometheusinterceptor.go @@ -6,8 +6,8 @@ import ( "time" "github.com/zeromicro/go-zero/core/metric" - "github.com/zeromicro/go-zero/core/prometheus" "github.com/zeromicro/go-zero/core/timex" + "google.golang.org/grpc" "google.golang.org/grpc/status" ) @@ -36,10 +36,6 @@ var ( // UnaryPrometheusInterceptor reports the statistics to the prometheus server. func UnaryPrometheusInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - if !prometheus.Enabled() { - return handler(ctx, req) - } - startTime := timex.Now() resp, err := handler(ctx, req) metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)