From 5b35fa17de22f9f39c7905e4fd661b11017089fd Mon Sep 17 00:00:00 2001 From: neosu Date: Sun, 22 Aug 2021 10:03:56 +0800 Subject: [PATCH] add the opentelemetry tracing (#908) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add the opentelemetry tracing * fix the error sampler config * 添加stream的链路跟踪 * fix the error field name --- core/opentelemetry/agent.go | 60 ++++++++ core/opentelemetry/attributes.go | 45 ++++++ core/opentelemetry/clientstream.go | 128 ++++++++++++++++++ core/opentelemetry/config.go | 13 ++ core/opentelemetry/message.go | 34 +++++ core/opentelemetry/serverstream.go | 48 +++++++ core/opentelemetry/tracer.go | 50 +++++++ core/opentelemetry/utils.go | 61 +++++++++ core/service/serviceconf.go | 14 +- go.mod | 7 +- go.sum | 24 ++-- zrpc/internal/client.go | 4 + .../opentracinginterceptor.go | 108 +++++++++++++++ .../opentracinginterceptor_test.go | 27 ++++ zrpc/internal/rpcserver.go | 2 + .../opentracinginterceptor.go | 96 +++++++++++++ .../opentracinginterceptor_test.go | 36 +++++ 17 files changed, 739 insertions(+), 18 deletions(-) create mode 100644 core/opentelemetry/agent.go create mode 100644 core/opentelemetry/attributes.go create mode 100644 core/opentelemetry/clientstream.go create mode 100644 core/opentelemetry/config.go create mode 100644 core/opentelemetry/message.go create mode 100644 core/opentelemetry/serverstream.go create mode 100644 core/opentelemetry/tracer.go create mode 100644 core/opentelemetry/utils.go create mode 100644 zrpc/internal/clientinterceptors/opentracinginterceptor.go create mode 100644 zrpc/internal/clientinterceptors/opentracinginterceptor_test.go create mode 100644 zrpc/internal/serverinterceptors/opentracinginterceptor.go create mode 100644 zrpc/internal/serverinterceptors/opentracinginterceptor_test.go diff --git a/core/opentelemetry/agent.go b/core/opentelemetry/agent.go new file mode 100644 index 00000000..5dd518d5 --- /dev/null +++ b/core/opentelemetry/agent.go @@ -0,0 +1,60 @@ +package opentelemetry + +import ( + "sync" + + "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/syncx" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +var ( + once sync.Once + enabled syncx.AtomicBool +) + +// Enabled returns if prometheus is enabled. +func Enabled() bool { + return enabled.True() +} + +// StartAgent starts a prometheus agent. +func StartAgent(c Config) { + once.Do(func() { + if len(c.Endpoint) == 0 { + return + } + // Just support jaeger now + if c.Batcher != "jaeger" { + return + } + + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(c.Endpoint))) + if err != nil { + logx.Error(err) + return + } + + tp := tracesdk.NewTracerProvider( + // Set the sampling rate based on the parent span to 100% + tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(c.Sampler))), + // Always be sure to batch in production. + tracesdk.WithBatcher(exp), + // Record information about this application in an Resource. + tracesdk.WithResource(resource.NewSchemaless( + semconv.ServiceNameKey.String(c.Name), + )), + ) + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + + enabled.Set(true) + + }) +} diff --git a/core/opentelemetry/attributes.go b/core/opentelemetry/attributes.go new file mode 100644 index 00000000..0c5f5483 --- /dev/null +++ b/core/opentelemetry/attributes.go @@ -0,0 +1,45 @@ +package opentelemetry + +import ( + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + grpc_codes "google.golang.org/grpc/codes" +) + +const ( + // GRPCStatusCodeKey is convention for numeric status code of a gRPC request. + GRPCStatusCodeKey = attribute.Key("rpc.grpc.status_code") + + // Name of message transmitted or received. + RPCNameKey = attribute.Key("name") + + // Type of message transmitted or received. + RPCMessageTypeKey = attribute.Key("message.type") + + // Identifier of message transmitted or received. + RPCMessageIDKey = attribute.Key("message.id") + + // The compressed size of the message transmitted or received in bytes. + RPCMessageCompressedSizeKey = attribute.Key("message.compressed_size") + + // The uncompressed size of the message transmitted or received in + // bytes. + RPCMessageUncompressedSizeKey = attribute.Key("message.uncompressed_size") +) + +// Semantic conventions for common RPC attributes. +var ( + // Semantic convention for gRPC as the remoting system. + RPCSystemGRPC = semconv.RPCSystemKey.String("grpc") + + // Semantic convention for a message named message. + RPCNameMessage = RPCNameKey.String("message") + + // Semantic conventions for RPC message types. + RPCMessageTypeSent = RPCMessageTypeKey.String("SENT") + RPCMessageTypeReceived = RPCMessageTypeKey.String("RECEIVED") +) + +func StatusCodeAttr(c grpc_codes.Code) attribute.KeyValue { + return GRPCStatusCodeKey.Int64(int64(c)) +} diff --git a/core/opentelemetry/clientstream.go b/core/opentelemetry/clientstream.go new file mode 100644 index 00000000..f292a5bb --- /dev/null +++ b/core/opentelemetry/clientstream.go @@ -0,0 +1,128 @@ +package opentelemetry + +import ( + "context" + "io" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" +) + +type streamEventType int + +type streamEvent struct { + Type streamEventType + Err error +} + +const ( + receiveEndEvent streamEventType = iota + errorEvent +) + +type clientStream struct { + grpc.ClientStream + + desc *grpc.StreamDesc + events chan streamEvent + eventsDone chan struct{} + Finished chan error + + receivedMessageID int + sentMessageID int +} + +var _ = proto.Marshal + +func (w *clientStream) RecvMsg(m interface{}) error { + err := w.ClientStream.RecvMsg(m) + + if err == nil && !w.desc.ServerStreams { + w.sendStreamEvent(receiveEndEvent, nil) + } else if err == io.EOF { + w.sendStreamEvent(receiveEndEvent, nil) + } else if err != nil { + w.sendStreamEvent(errorEvent, err) + } else { + w.receivedMessageID++ + MessageReceived.Event(w.Context(), w.receivedMessageID, m) + } + + return err +} + +func (w *clientStream) SendMsg(m interface{}) error { + err := w.ClientStream.SendMsg(m) + + w.sentMessageID++ + MessageSent.Event(w.Context(), w.sentMessageID, m) + + if err != nil { + w.sendStreamEvent(errorEvent, err) + } + + return err +} + +func (w *clientStream) Header() (metadata.MD, error) { + md, err := w.ClientStream.Header() + + if err != nil { + w.sendStreamEvent(errorEvent, err) + } + + return md, err +} + +func (w *clientStream) CloseSend() error { + err := w.ClientStream.CloseSend() + + if err != nil { + w.sendStreamEvent(errorEvent, err) + } + + return err +} + +func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { + select { + case <-w.eventsDone: + case w.events <- streamEvent{Type: eventType, Err: err}: + } +} + +func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { + events := make(chan streamEvent) + eventsDone := make(chan struct{}) + finished := make(chan error) + + go func() { + defer close(eventsDone) + + for { + select { + case event := <-events: + switch event.Type { + case receiveEndEvent: + finished <- nil + return + case errorEvent: + finished <- event.Err + return + } + case <-ctx.Done(): + finished <- ctx.Err() + return + } + } + }() + + return &clientStream{ + ClientStream: s, + desc: desc, + events: events, + eventsDone: eventsDone, + Finished: finished, + } +} diff --git a/core/opentelemetry/config.go b/core/opentelemetry/config.go new file mode 100644 index 00000000..2d5dbef4 --- /dev/null +++ b/core/opentelemetry/config.go @@ -0,0 +1,13 @@ +package opentelemetry + +const ( + TraceName = "go-zero" +) + +// A Config is a opentelemetry config. +type Config struct { + Name string `json:",optional"` + Endpoint string `json:",optional"` + Sampler float64 `json:",default=1.0"` + Batcher string `json:",default=jaeger"` +} diff --git a/core/opentelemetry/message.go b/core/opentelemetry/message.go new file mode 100644 index 00000000..fa24ae36 --- /dev/null +++ b/core/opentelemetry/message.go @@ -0,0 +1,34 @@ +package opentelemetry + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/proto" +) + +var ( + MessageSent = messageType(RPCMessageTypeSent) + MessageReceived = messageType(RPCMessageTypeReceived) +) + +type messageType attribute.KeyValue + +// Event adds an event of the messageType to the span associated with the +// passed context with id and size (if message is a proto message). +func (m messageType) Event(ctx context.Context, id int, message interface{}) { + span := trace.SpanFromContext(ctx) + if p, ok := message.(proto.Message); ok { + span.AddEvent("message", trace.WithAttributes( + attribute.KeyValue(m), + RPCMessageIDKey.Int(id), + RPCMessageUncompressedSizeKey.Int(proto.Size(p)), + )) + } else { + span.AddEvent("message", trace.WithAttributes( + attribute.KeyValue(m), + RPCMessageIDKey.Int(id), + )) + } +} diff --git a/core/opentelemetry/serverstream.go b/core/opentelemetry/serverstream.go new file mode 100644 index 00000000..7ad5e5b5 --- /dev/null +++ b/core/opentelemetry/serverstream.go @@ -0,0 +1,48 @@ +package opentelemetry + +import ( + "context" + + "google.golang.org/grpc" +) + +// serverStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and +// SendMsg method call. +type serverStream struct { + grpc.ServerStream + ctx context.Context + + receivedMessageID int + sentMessageID int +} + +func (w *serverStream) Context() context.Context { + return w.ctx +} + +func (w *serverStream) RecvMsg(m interface{}) error { + err := w.ServerStream.RecvMsg(m) + + if err == nil { + w.receivedMessageID++ + MessageReceived.Event(w.Context(), w.receivedMessageID, m) + } + + return err +} + +func (w *serverStream) SendMsg(m interface{}) error { + err := w.ServerStream.SendMsg(m) + + w.sentMessageID++ + MessageSent.Event(w.Context(), w.sentMessageID, m) + + return err +} + +func WrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { + return &serverStream{ + ServerStream: ss, + ctx: ctx, + } +} diff --git a/core/opentelemetry/tracer.go b/core/opentelemetry/tracer.go new file mode 100644 index 00000000..0273787d --- /dev/null +++ b/core/opentelemetry/tracer.go @@ -0,0 +1,50 @@ +package opentelemetry + +import ( + "context" + + "go.opentelemetry.io/otel/baggage" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/metadata" +) + +type metadataSupplier struct { + metadata *metadata.MD +} + +// assert that metadataSupplier implements the TextMapCarrier interface +var _ propagation.TextMapCarrier = &metadataSupplier{} + +func (s *metadataSupplier) Get(key string) string { + values := s.metadata.Get(key) + if len(values) == 0 { + return "" + } + return values[0] +} + +func (s *metadataSupplier) Set(key string, value string) { + s.metadata.Set(key, value) +} + +func (s *metadataSupplier) Keys() []string { + out := make([]string, 0, len(*s.metadata)) + for key := range *s.metadata { + out = append(out, key) + } + return out +} +func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) { + p.Inject(ctx, &metadataSupplier{ + metadata: metadata, + }) +} + +func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext) { + ctx = p.Extract(ctx, &metadataSupplier{ + metadata: metadata, + }) + + return baggage.FromContext(ctx), trace.SpanContextFromContext(ctx) +} diff --git a/core/opentelemetry/utils.go b/core/opentelemetry/utils.go new file mode 100644 index 00000000..56e27af3 --- /dev/null +++ b/core/opentelemetry/utils.go @@ -0,0 +1,61 @@ +package opentelemetry + +import ( + "context" + "net" + "strings" + + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "google.golang.org/grpc/peer" +) + +func PeerFromCtx(ctx context.Context) string { + p, ok := peer.FromContext(ctx) + if !ok { + return "" + } + return p.Addr.String() +} + +func SpanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) { + attrs := []attribute.KeyValue{RPCSystemGRPC} + name, mAttrs := ParseFullMethod(fullMethod) + attrs = append(attrs, mAttrs...) + attrs = append(attrs, PeerAttr(peerAddress)...) + return name, attrs +} + +func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) { + name := strings.TrimLeft(fullMethod, "/") + parts := strings.SplitN(name, "/", 2) + if len(parts) != 2 { + // Invalid format, does not follow `/package.service/method`. + return name, []attribute.KeyValue(nil) + } + + var attrs []attribute.KeyValue + if service := parts[0]; service != "" { + attrs = append(attrs, semconv.RPCServiceKey.String(service)) + } + if method := parts[1]; method != "" { + attrs = append(attrs, semconv.RPCMethodKey.String(method)) + } + return name, attrs +} + +func PeerAttr(addr string) []attribute.KeyValue { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return []attribute.KeyValue(nil) + } + + if host == "" { + host = "127.0.0.1" + } + + return []attribute.KeyValue{ + semconv.NetPeerIPKey.String(host), + semconv.NetPeerPortKey.String(port), + } +} diff --git a/core/service/serviceconf.go b/core/service/serviceconf.go index e5b64579..d31dba20 100644 --- a/core/service/serviceconf.go +++ b/core/service/serviceconf.go @@ -5,6 +5,7 @@ import ( "github.com/tal-tech/go-zero/core/load" "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/opentelemetry" "github.com/tal-tech/go-zero/core/prometheus" "github.com/tal-tech/go-zero/core/stat" ) @@ -24,11 +25,12 @@ const ( // A ServiceConf is a service config. type ServiceConf struct { - Name string - Log logx.LogConf - Mode string `json:",default=pro,options=dev|test|rt|pre|pro"` - MetricsUrl string `json:",optional"` - Prometheus prometheus.Config `json:",optional"` + Name string + Log logx.LogConf + Mode string `json:",default=pro,options=dev|test|rt|pre|pro"` + MetricsUrl string `json:",optional"` + Prometheus prometheus.Config `json:",optional"` + OpenTelemetry opentelemetry.Config `json:",optional"` } // MustSetUp sets up the service, exits on error. @@ -49,6 +51,8 @@ func (sc ServiceConf) SetUp() error { sc.initMode() prometheus.StartAgent(sc.Prometheus) + opentelemetry.StartAgent(sc.OpenTelemetry) + if len(sc.MetricsUrl) > 0 { stat.SetReportWriter(stat.NewRemoteWriter(sc.MetricsUrl)) } diff --git a/go.mod b/go.mod index ba466e9e..67319ac3 100644 --- a/go.mod +++ b/go.mod @@ -36,15 +36,20 @@ require ( github.com/stretchr/testify v1.7.0 github.com/urfave/cli v1.22.5 github.com/zeromicro/antlr v0.0.1 - github.com/zeromicro/ddl-parser v0.0.0-20210712021150-63520aca7348 // indirect + github.com/zeromicro/ddl-parser v0.0.0-20210712021150-63520aca7348 go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 + go.opentelemetry.io/otel v1.0.0-RC2 + go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2 + go.opentelemetry.io/otel/sdk v1.0.0-RC2 + go.opentelemetry.io/otel/trace v1.0.0-RC2 go.uber.org/automaxprocs v1.3.0 golang.org/x/net v0.0.0-20210716203947-853a461950ff golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f // indirect google.golang.org/grpc v1.39.0 + google.golang.org/protobuf v1.27.1 gopkg.in/cheggaaa/pb.v1 v1.0.28 gopkg.in/h2non/gock.v1 v1.0.15 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 0f0342d2..2b4ac855 100644 --- a/go.sum +++ b/go.sum @@ -114,8 +114,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= @@ -221,6 +222,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -237,8 +239,6 @@ github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= github.com/zeromicro/antlr v0.0.1 h1:CQpIn/dc0pUjgGQ81y98s/NGOm2Hfru2NNio2I9mQgk= github.com/zeromicro/antlr v0.0.1/go.mod h1:nfpjEwFR6Q4xGDJMcZnCL9tEfQRgszMwu3rDz2Z+p5M= -github.com/zeromicro/ddl-parser v0.0.0-20210710132903-bc9dbb9789b1 h1:zItUIfobEHTYD9X0fAt9QWEWIFWDa8CypF+Z62zIR+M= -github.com/zeromicro/ddl-parser v0.0.0-20210710132903-bc9dbb9789b1/go.mod h1:ISU/8NuPyEpl9pa17Py9TBPetMjtsiHrb9f5XGiYbo8= github.com/zeromicro/ddl-parser v0.0.0-20210712021150-63520aca7348 h1:OhxL9tn28gDeJVzreIUiE5oVxZCjL3tBJ0XBNw8p5R8= github.com/zeromicro/ddl-parser v0.0.0-20210712021150-63520aca7348/go.mod h1:ISU/8NuPyEpl9pa17Py9TBPetMjtsiHrb9f5XGiYbo8= go.etcd.io/etcd/api/v3 v3.5.0 h1:GsV3S+OfZEOCNXdtNkBSR7kgLobAa/SO6tCxRa0GAYw= @@ -247,6 +247,14 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.0 h1:2aQv6F436YnN7I4VbI8PPYrBhu+SmrTaADcf8Mi/ go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.0 h1:62Eh0XOro+rDwkrypAGDfgmNh5Joq+z+W9HZdlXMzek= go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0= +go.opentelemetry.io/otel v1.0.0-RC2 h1:SHhxSjB+omnGZPgGlKe+QMp3MyazcOHdQ8qwo89oKbg= +go.opentelemetry.io/otel v1.0.0-RC2/go.mod h1:w1thVQ7qbAy8MHb0IFj8a5Q2QU0l2ksf8u/CN8m3NOM= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2 h1:RF0nWsIDpDBe+s06lkLxUw9CWQUAhO6hBSxxB7dz45s= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2/go.mod h1:sZZqN3Vb0iT+NE6mZ1S7sNyH3t4PFk6ElK5TLGFBZ7E= +go.opentelemetry.io/otel/sdk v1.0.0-RC2 h1:ROuteeSCBaZNjiT9JcFzZepmInDvLktR28Y6qKo8bCs= +go.opentelemetry.io/otel/sdk v1.0.0-RC2/go.mod h1:fgwHyiDn4e5k40TD9VX243rOxXR+jzsWBZYA2P5jpEw= +go.opentelemetry.io/otel/trace v1.0.0-RC2 h1:dunAP0qDULMIT82atj34m5RgvsIK6LcsXf1c/MsYg1w= +go.opentelemetry.io/otel/trace v1.0.0-RC2/go.mod h1:JPQ+z6nNw9mqEGT8o3eoPTdnNI+Aj5JcxEsVGREIAy4= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -287,8 +295,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds= golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -324,10 +330,9 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio= -golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -353,7 +358,6 @@ golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -367,8 +371,6 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced h1:c5geK1iMU3cDKtFrCVQIcjR3W+JOZMuhIyICMCTbtus= -google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f h1:YORWxaStkWBnWgELOHTmDrqNlFXuVGEbhwbB5iK94bQ= google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -377,7 +379,6 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= @@ -391,7 +392,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/zrpc/internal/client.go b/zrpc/internal/client.go index 96111782..945e0234 100644 --- a/zrpc/internal/client.go +++ b/zrpc/internal/client.go @@ -72,6 +72,10 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption { clientinterceptors.PrometheusInterceptor, clientinterceptors.BreakerInterceptor, clientinterceptors.TimeoutInterceptor(cliOpts.Timeout), + clientinterceptors.OpenTracingInterceptor(), + ), + WithStreamClientInterceptors( + clientinterceptors.StreamOpenTracingInterceptor(), ), } diff --git a/zrpc/internal/clientinterceptors/opentracinginterceptor.go b/zrpc/internal/clientinterceptors/opentracinginterceptor.go new file mode 100644 index 00000000..8311f799 --- /dev/null +++ b/zrpc/internal/clientinterceptors/opentracinginterceptor.go @@ -0,0 +1,108 @@ +package clientinterceptors + +import ( + "context" + + "github.com/tal-tech/go-zero/core/opentelemetry" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +func OpenTracingInterceptor() grpc.UnaryClientInterceptor { + propagator := otel.GetTextMapPropagator() + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if !opentelemetry.Enabled() { + return invoker(ctx, method, req, reply, cc, opts...) + } + + requestMetadata, _ := metadata.FromOutgoingContext(ctx) + metadataCopy := requestMetadata.Copy() + + tr := otel.Tracer(opentelemetry.TraceName) + name, attr := opentelemetry.SpanInfo(method, cc.Target()) + + var span trace.Span + ctx, span = tr.Start(ctx, + name, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attr...), + ) + defer span.End() + + opentelemetry.Inject(ctx, propagator, &metadataCopy) + ctx = metadata.NewOutgoingContext(ctx, metadataCopy) + + opentelemetry.MessageSent.Event(ctx, 1, req) + + err := invoker(ctx, method, req, reply, cc, opts...) + + opentelemetry.MessageReceived.Event(ctx, 1, reply) + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) + } else { + span.SetAttributes(opentelemetry.StatusCodeAttr(grpc_codes.OK)) + } + + return err + } +} + +func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor { + propagator := otel.GetTextMapPropagator() + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + if !opentelemetry.Enabled() { + return streamer(ctx, desc, cc, method, opts...) + } + + requestMetadata, _ := metadata.FromOutgoingContext(ctx) + metadataCopy := requestMetadata.Copy() + + tr := otel.Tracer("ecoplants") + + name, attr := opentelemetry.SpanInfo(method, cc.Target()) + var span trace.Span + ctx, span = tr.Start( + ctx, + name, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attr...), + ) + + opentelemetry.Inject(ctx, propagator, &metadataCopy) + ctx = metadata.NewOutgoingContext(ctx, metadataCopy) + + s, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + grpcStatus, _ := status.FromError(err) + span.SetStatus(codes.Error, grpcStatus.Message()) + span.SetAttributes(opentelemetry.StatusCodeAttr(grpcStatus.Code())) + span.End() + return s, err + } + stream := opentelemetry.WrapClientStream(ctx, s, desc) + + go func() { + err := <-stream.Finished + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) + } else { + span.SetAttributes(opentelemetry.StatusCodeAttr(grpc_codes.OK)) + } + + span.End() + }() + + return stream, nil + } +} diff --git a/zrpc/internal/clientinterceptors/opentracinginterceptor_test.go b/zrpc/internal/clientinterceptors/opentracinginterceptor_test.go new file mode 100644 index 00000000..ed01d2f9 --- /dev/null +++ b/zrpc/internal/clientinterceptors/opentracinginterceptor_test.go @@ -0,0 +1,27 @@ +package clientinterceptors + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tal-tech/go-zero/core/opentelemetry" + "google.golang.org/grpc" +) + +func TestOpenTracingInterceptor(t *testing.T) { + opentelemetry.StartAgent(opentelemetry.Config{ + Name: "go-zero-test", + Endpoint: "http://localhost:14268/api/traces", + Batcher: "jaeger", + Sampler: 1.0, + }) + + cc := new(grpc.ClientConn) + err := OpenTracingInterceptor()(context.Background(), "/ListUser", nil, nil, cc, + func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, + opts ...grpc.CallOption) error { + return nil + }) + assert.Nil(t, err) +} diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index 3b5c9a3e..42d4defd 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -59,11 +59,13 @@ func (s *rpcServer) Start(register RegisterFn) error { serverinterceptors.UnaryStatInterceptor(s.metrics), serverinterceptors.UnaryPrometheusInterceptor(), serverinterceptors.UnaryBreakerInterceptor(), + serverinterceptors.UnaryOpenTracingInterceptor(), } unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...) streamInterceptors := []grpc.StreamServerInterceptor{ serverinterceptors.StreamCrashInterceptor, serverinterceptors.StreamBreakerInterceptor, + serverinterceptors.StreamOpenTracingInterceptor(), } streamInterceptors = append(streamInterceptors, s.streamInterceptors...) options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...), diff --git a/zrpc/internal/serverinterceptors/opentracinginterceptor.go b/zrpc/internal/serverinterceptors/opentracinginterceptor.go new file mode 100644 index 00000000..76e0d921 --- /dev/null +++ b/zrpc/internal/serverinterceptors/opentracinginterceptor.go @@ -0,0 +1,96 @@ +package serverinterceptors + +import ( + "context" + + "github.com/tal-tech/go-zero/core/opentelemetry" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/baggage" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor { + propagator := otel.GetTextMapPropagator() + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if !opentelemetry.Enabled() { + return handler(ctx, req) + } + + requestMetadata, _ := metadata.FromIncomingContext(ctx) + metadataCopy := requestMetadata.Copy() + + bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) + ctx = baggage.ContextWithBaggage(ctx, bags) + + tr := otel.Tracer(opentelemetry.TraceName) + name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) + + var span trace.Span + ctx, span = tr.Start( + trace.ContextWithRemoteSpanContext(ctx, spanCtx), + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attr...), + ) + defer span.End() + + opentelemetry.MessageReceived.Event(ctx, 1, req) + + resp, err := handler(ctx, req) + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) + opentelemetry.MessageSent.Event(ctx, 1, s.Proto()) + } else { + span.SetAttributes(opentelemetry.StatusCodeAttr(grpc_codes.OK)) + opentelemetry.MessageSent.Event(ctx, 1, resp) + } + + return resp, err + } +} + +func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor { + propagator := otel.GetTextMapPropagator() + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := ss.Context() + if !opentelemetry.Enabled() { + return handler(srv, opentelemetry.WrapServerStream(ctx, ss)) + } + + requestMetadata, _ := metadata.FromIncomingContext(ctx) + metadataCopy := requestMetadata.Copy() + + bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) + ctx = baggage.ContextWithBaggage(ctx, bags) + + tr := otel.Tracer(opentelemetry.TraceName) + name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) + ctx, span := tr.Start( + trace.ContextWithRemoteSpanContext(ctx, spanCtx), + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attr...), + ) + defer span.End() + + err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)) + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) + } else { + span.SetAttributes(opentelemetry.StatusCodeAttr(grpc_codes.OK)) + } + + return err + } +} diff --git a/zrpc/internal/serverinterceptors/opentracinginterceptor_test.go b/zrpc/internal/serverinterceptors/opentracinginterceptor_test.go new file mode 100644 index 00000000..3feb2a79 --- /dev/null +++ b/zrpc/internal/serverinterceptors/opentracinginterceptor_test.go @@ -0,0 +1,36 @@ +package serverinterceptors + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tal-tech/go-zero/core/opentelemetry" + "google.golang.org/grpc" +) + +func TestUnaryOpenTracingInterceptor_Disable(t *testing.T) { + interceptor := UnaryOpenTracingInterceptor() + _, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + FullMethod: "/", + }, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + assert.Nil(t, err) +} + +func TestUnaryOpenTracingInterceptor_Enabled(t *testing.T) { + opentelemetry.StartAgent(opentelemetry.Config{ + Name: "go-zero-test", + Endpoint: "http://localhost:14268/api/traces", + Batcher: "jaeger", + Sampler: 1.0, + }) + interceptor := UnaryOpenTracingInterceptor() + _, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + FullMethod: "/package.TestService.GetUser", + }, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + assert.Nil(t, err) +}