From cfda972d500487ff2246371158ca15e831eac306 Mon Sep 17 00:00:00 2001 From: maizige <37949155+wsx864321@users.noreply.github.com> Date: Wed, 7 Sep 2022 10:33:01 +0800 Subject: [PATCH] fix:trace graceful stop,pre loss trace (#2358) --- core/service/serviceconf.go | 5 +++++ core/stores/redis/hook_test.go | 3 +++ core/trace/agent.go | 9 ++++++++- rest/handler/tracinghandler_test.go | 1 + rest/httpc/requests_test.go | 1 + .../clientinterceptors/tracinginterceptor_test.go | 1 + zrpc/internal/rpcserver.go | 1 + .../serverinterceptors/tracinginterceptor_test.go | 2 ++ 8 files changed, 22 insertions(+), 1 deletion(-) diff --git a/core/service/serviceconf.go b/core/service/serviceconf.go index 51fcd605..17542814 100644 --- a/core/service/serviceconf.go +++ b/core/service/serviceconf.go @@ -3,6 +3,8 @@ package service import ( "log" + "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/core/load" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/prometheus" @@ -56,6 +58,9 @@ func (sc ServiceConf) SetUp() error { sc.Telemetry.Name = sc.Name } trace.StartAgent(sc.Telemetry) + proc.AddShutdownListener(func() { + trace.StopAgent() + }) if len(sc.MetricsUrl) > 0 { stat.SetReportWriter(stat.NewRemoteWriter(sc.MetricsUrl)) diff --git a/core/stores/redis/hook_test.go b/core/stores/redis/hook_test.go index f8480a93..ed10f549 100644 --- a/core/stores/redis/hook_test.go +++ b/core/stores/redis/hook_test.go @@ -21,6 +21,7 @@ func TestHookProcessCase1(t *testing.T) { Batcher: "jaeger", Sampler: 1.0, }) + defer ztrace.StopAgent() writer := log.Writer() var buf strings.Builder @@ -44,6 +45,7 @@ func TestHookProcessCase2(t *testing.T) { Batcher: "jaeger", Sampler: 1.0, }) + defer ztrace.StopAgent() w, restore := injectLog() defer restore() @@ -108,6 +110,7 @@ func TestHookProcessPipelineCase2(t *testing.T) { Batcher: "jaeger", Sampler: 1.0, }) + defer ztrace.StopAgent() w, restore := injectLog() defer restore() diff --git a/core/trace/agent.go b/core/trace/agent.go index 4e1f60b6..9c19cc82 100644 --- a/core/trace/agent.go +++ b/core/trace/agent.go @@ -1,6 +1,7 @@ package trace import ( + "context" "fmt" "sync" @@ -23,6 +24,7 @@ const ( var ( agents = make(map[string]lang.PlaceholderType) lock sync.Mutex + tp *sdktrace.TracerProvider ) // StartAgent starts a opentelemetry agent. @@ -43,6 +45,11 @@ func StartAgent(c Config) { agents[c.Endpoint] = lang.Placeholder } +// StopAgent shuts down the span processors in the order they were registered. +func StopAgent() { + _ = tp.Shutdown(context.Background()) +} + func createExporter(c Config) (sdktrace.SpanExporter, error) { // Just support jaeger and zipkin now, more for later switch c.Batcher { @@ -74,7 +81,7 @@ func startAgent(c Config) error { opts = append(opts, sdktrace.WithBatcher(exp)) } - tp := sdktrace.NewTracerProvider(opts...) + tp = sdktrace.NewTracerProvider(opts...) otel.SetTracerProvider(tp) otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, propagation.Baggage{})) diff --git a/rest/handler/tracinghandler_test.go b/rest/handler/tracinghandler_test.go index 7f1948dd..abbc5127 100644 --- a/rest/handler/tracinghandler_test.go +++ b/rest/handler/tracinghandler_test.go @@ -21,6 +21,7 @@ func TestOtelHandler(t *testing.T) { Batcher: "jaeger", Sampler: 1.0, }) + defer ztrace.StopAgent() for _, test := range []string{"", "bar"} { t.Run(test, func(t *testing.T) { diff --git a/rest/httpc/requests_test.go b/rest/httpc/requests_test.go index e8edf2b1..b5bf9612 100644 --- a/rest/httpc/requests_test.go +++ b/rest/httpc/requests_test.go @@ -22,6 +22,7 @@ func TestDoRequest(t *testing.T) { Batcher: "jaeger", Sampler: 1.0, }) + defer ztrace.StopAgent() svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { })) diff --git a/zrpc/internal/clientinterceptors/tracinginterceptor_test.go b/zrpc/internal/clientinterceptors/tracinginterceptor_test.go index 00c3db1b..963013e2 100644 --- a/zrpc/internal/clientinterceptors/tracinginterceptor_test.go +++ b/zrpc/internal/clientinterceptors/tracinginterceptor_test.go @@ -23,6 +23,7 @@ func TestOpenTracingInterceptor(t *testing.T) { Batcher: "jaeger", Sampler: 1.0, }) + defer trace.StopAgent() cc := new(grpc.ClientConn) ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{}) diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index 36408776..8e76c475 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -6,6 +6,7 @@ import ( "github.com/zeromicro/go-zero/core/proc" "github.com/zeromicro/go-zero/core/stat" "github.com/zeromicro/go-zero/zrpc/internal/serverinterceptors" + "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) diff --git a/zrpc/internal/serverinterceptors/tracinginterceptor_test.go b/zrpc/internal/serverinterceptors/tracinginterceptor_test.go index 727a4b50..81c53ce5 100644 --- a/zrpc/internal/serverinterceptors/tracinginterceptor_test.go +++ b/zrpc/internal/serverinterceptors/tracinginterceptor_test.go @@ -32,6 +32,8 @@ func TestUnaryOpenTracingInterceptor_Enabled(t *testing.T) { Batcher: "jaeger", Sampler: 1.0, }) + defer trace.StopAgent() + _, err := UnaryTracingInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ FullMethod: "/package.TestService.GetUser", }, func(ctx context.Context, req interface{}) (interface{}, error) {