From ba43214daef192f7cae57e2b099561627382cdfc Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Mon, 1 Nov 2021 15:04:38 +0800 Subject: [PATCH] feat: slow threshold customizable in zrpc (#1191) * feat: slow threshold customizable in rest * feat: slow threshold customizable in rest * feat: slow threshold customizable in rest * feat: slow threshold customizable in zrpc --- rest/server.go | 3 +++ zrpc/client.go | 6 ++++++ zrpc/client_test.go | 2 ++ .../clientinterceptors/durationinterceptor.go | 12 ++++++++++-- .../clientinterceptors/durationinterceptor_test.go | 7 +++++++ zrpc/internal/serverinterceptors/statinterceptor.go | 12 ++++++++++-- .../serverinterceptors/statinterceptor_test.go | 6 ++++++ zrpc/server.go | 5 +++++ zrpc/server_test.go | 1 + 9 files changed, 50 insertions(+), 4 deletions(-) diff --git a/rest/server.go b/rest/server.go index 0046712c..56ef157e 100644 --- a/rest/server.go +++ b/rest/server.go @@ -11,6 +11,9 @@ import ( "github.com/tal-tech/go-zero/rest/router" ) +// SetSlowThreshold sets the slow threshold. +var SetSlowThreshold = handler.SetSlowThreshold + type ( runOptions struct { start func(*engine) error diff --git a/zrpc/client.go b/zrpc/client.go index c475e411..242d1924 100644 --- a/zrpc/client.go +++ b/zrpc/client.go @@ -7,6 +7,7 @@ import ( "github.com/tal-tech/go-zero/core/discov" "github.com/tal-tech/go-zero/zrpc/internal" "github.com/tal-tech/go-zero/zrpc/internal/auth" + "github.com/tal-tech/go-zero/zrpc/internal/clientinterceptors" "google.golang.org/grpc" ) @@ -101,3 +102,8 @@ func NewClientWithTarget(target string, opts ...ClientOption) (Client, error) { func (rc *RpcClient) Conn() *grpc.ClientConn { return rc.client.Conn() } + +// SetClientSlowThreshold sets the slow threshold on client side. +func SetClientSlowThreshold(threshold time.Duration) { + clientinterceptors.SetSlowThreshold(threshold) +} diff --git a/zrpc/client_test.go b/zrpc/client_test.go index c42931c7..545ae124 100644 --- a/zrpc/client_test.go +++ b/zrpc/client_test.go @@ -108,6 +108,8 @@ func TestDepositServer_Deposit(t *testing.T) { tarConfClient, targetClient, } + SetClientSlowThreshold(time.Second) + for _, tt := range tests { tt := tt for _, client := range clients { diff --git a/zrpc/internal/clientinterceptors/durationinterceptor.go b/zrpc/internal/clientinterceptors/durationinterceptor.go index 5eda2ebf..b268a3d0 100644 --- a/zrpc/internal/clientinterceptors/durationinterceptor.go +++ b/zrpc/internal/clientinterceptors/durationinterceptor.go @@ -6,11 +6,14 @@ import ( "time" "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/timex" "google.golang.org/grpc" ) -const slowThreshold = time.Millisecond * 500 +const defaultSlowThreshold = time.Millisecond * 500 + +var slowThreshold = syncx.ForAtomicDuration(defaultSlowThreshold) // DurationInterceptor is an interceptor that logs the processing time. func DurationInterceptor(ctx context.Context, method string, req, reply interface{}, @@ -23,7 +26,7 @@ func DurationInterceptor(ctx context.Context, method string, req, reply interfac serverName, req, err.Error()) } else { elapsed := timex.Since(start) - if elapsed > slowThreshold { + if elapsed > slowThreshold.Load() { logx.WithContext(ctx).WithDuration(elapsed).Slowf("[RPC] ok - slowcall - %s - %v - %v", serverName, req, reply) } @@ -31,3 +34,8 @@ func DurationInterceptor(ctx context.Context, method string, req, reply interfac return err } + +// SetSlowThreshold sets the slow threshold. +func SetSlowThreshold(threshold time.Duration) { + slowThreshold.Set(threshold) +} diff --git a/zrpc/internal/clientinterceptors/durationinterceptor_test.go b/zrpc/internal/clientinterceptors/durationinterceptor_test.go index 0d88302c..9acd0efb 100644 --- a/zrpc/internal/clientinterceptors/durationinterceptor_test.go +++ b/zrpc/internal/clientinterceptors/durationinterceptor_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -35,3 +36,9 @@ func TestDurationInterceptor(t *testing.T) { }) } } + +func TestSetSlowThreshold(t *testing.T) { + assert.Equal(t, defaultSlowThreshold, slowThreshold.Load()) + SetSlowThreshold(time.Second) + assert.Equal(t, time.Second, slowThreshold.Load()) +} diff --git a/zrpc/internal/serverinterceptors/statinterceptor.go b/zrpc/internal/serverinterceptors/statinterceptor.go index 8eacd4a0..71952abc 100644 --- a/zrpc/internal/serverinterceptors/statinterceptor.go +++ b/zrpc/internal/serverinterceptors/statinterceptor.go @@ -7,12 +7,20 @@ import ( "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stat" + "github.com/tal-tech/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/timex" "google.golang.org/grpc" "google.golang.org/grpc/peer" ) -const serverSlowThreshold = time.Millisecond * 500 +const defaultSlowThreshold = time.Millisecond * 500 + +var slowThreshold = syncx.ForAtomicDuration(defaultSlowThreshold) + +// SetSlowThreshold sets the slow threshold. +func SetSlowThreshold(threshold time.Duration) { + slowThreshold.Set(threshold) +} // UnaryStatInterceptor returns a func that uses given metrics to report stats. func UnaryStatInterceptor(metrics *stat.Metrics) grpc.UnaryServerInterceptor { @@ -44,7 +52,7 @@ func logDuration(ctx context.Context, method string, req interface{}, duration t content, err := json.Marshal(req) if err != nil { logx.WithContext(ctx).Errorf("%s - %s", addr, err.Error()) - } else if duration > serverSlowThreshold { + } else if duration > slowThreshold.Load() { logx.WithContext(ctx).WithDuration(duration).Slowf("[RPC] slowcall - %s - %s - %s", addr, method, string(content)) } else { diff --git a/zrpc/internal/serverinterceptors/statinterceptor_test.go b/zrpc/internal/serverinterceptors/statinterceptor_test.go index dfd67c49..fb53dfa9 100644 --- a/zrpc/internal/serverinterceptors/statinterceptor_test.go +++ b/zrpc/internal/serverinterceptors/statinterceptor_test.go @@ -13,6 +13,12 @@ import ( "google.golang.org/grpc/peer" ) +func TestSetSlowThreshold(t *testing.T) { + assert.Equal(t, defaultSlowThreshold, slowThreshold.Load()) + SetSlowThreshold(time.Second) + assert.Equal(t, time.Second, slowThreshold.Load()) +} + func TestUnaryStatInterceptor(t *testing.T) { metrics := stat.NewMetrics("mock") interceptor := UnaryStatInterceptor(metrics) diff --git a/zrpc/server.go b/zrpc/server.go index f2dd0648..dc7099db 100644 --- a/zrpc/server.go +++ b/zrpc/server.go @@ -95,6 +95,11 @@ func (rs *RpcServer) Stop() { logx.Close() } +// SetServerSlowThreshold sets the slow threshold on server side. +func SetServerSlowThreshold(threshold time.Duration) { + serverinterceptors.SetSlowThreshold(threshold) +} + func setupInterceptors(server internal.Server, c RpcServerConf, metrics *stat.Metrics) error { if c.CpuThreshold > 0 { shedder := load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold)) diff --git a/zrpc/server_test.go b/zrpc/server_test.go index 4f24c68f..8adcd04d 100644 --- a/zrpc/server_test.go +++ b/zrpc/server_test.go @@ -35,6 +35,7 @@ func TestServer_setupInterceptors(t *testing.T) { } func TestServer(t *testing.T) { + SetServerSlowThreshold(time.Second) srv := MustNewServer(RpcServerConf{ ServiceConf: service.ServiceConf{ Log: logx.LogConf{