diff --git a/core/opentelemetry/attributes.go b/core/opentelemetry/attributes.go index 0c5f5483..45bfd0ea 100644 --- a/core/opentelemetry/attributes.go +++ b/core/opentelemetry/attributes.go @@ -3,43 +3,38 @@ package opentelemetry import ( "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - grpc_codes "google.golang.org/grpc/codes" + gcodes "google.golang.org/grpc/codes" ) const ( // GRPCStatusCodeKey is convention for numeric status code of a gRPC request. GRPCStatusCodeKey = attribute.Key("rpc.grpc.status_code") - - // Name of message transmitted or received. + // RPCNameKey is the name of message transmitted or received. RPCNameKey = attribute.Key("name") - - // Type of message transmitted or received. + // RPCMessageTypeKey is the type of message transmitted or received. RPCMessageTypeKey = attribute.Key("message.type") - - // Identifier of message transmitted or received. + // RPCMessageIDKey is the identifier of message transmitted or received. RPCMessageIDKey = attribute.Key("message.id") - - // The compressed size of the message transmitted or received in bytes. + // RPCMessageCompressedSizeKey is the compressed size of the message transmitted or received in bytes. RPCMessageCompressedSizeKey = attribute.Key("message.compressed_size") - - // The uncompressed size of the message transmitted or received in - // bytes. + // RPCMessageUncompressedSizeKey is the uncompressed size of the message + // transmitted or received in bytes. RPCMessageUncompressedSizeKey = attribute.Key("message.uncompressed_size") ) // Semantic conventions for common RPC attributes. var ( - // Semantic convention for gRPC as the remoting system. + // RPCSystemGRPC is the semantic convention for gRPC as the remoting system. RPCSystemGRPC = semconv.RPCSystemKey.String("grpc") - - // Semantic convention for a message named message. + // RPCNameMessage is the semantic convention for a message named message. RPCNameMessage = RPCNameKey.String("message") - - // Semantic conventions for RPC message types. - RPCMessageTypeSent = RPCMessageTypeKey.String("SENT") + // RPCMessageTypeSent is the semantic conventions for sent RPC message types. + RPCMessageTypeSent = RPCMessageTypeKey.String("SENT") + // RPCMessageTypeReceived is the semantic conventions for the received RPC message types. RPCMessageTypeReceived = RPCMessageTypeKey.String("RECEIVED") ) -func StatusCodeAttr(c grpc_codes.Code) attribute.KeyValue { +// StatusCodeAttr returns a attribute.KeyValue that represents the give c. +func StatusCodeAttr(c gcodes.Code) attribute.KeyValue { return GRPCStatusCodeKey.Int64(int64(c)) } diff --git a/core/opentelemetry/clientstream.go b/core/opentelemetry/clientstream.go index a5a01095..408527a6 100644 --- a/core/opentelemetry/clientstream.go +++ b/core/opentelemetry/clientstream.go @@ -9,6 +9,13 @@ import ( "google.golang.org/protobuf/proto" ) +const ( + receiveEndEvent streamEventType = iota + errorEvent +) + +var _ = proto.Marshal + type streamEventType int type streamEvent struct { @@ -16,11 +23,6 @@ type streamEvent struct { Err error } -const ( - receiveEndEvent streamEventType = iota - errorEvent -) - type clientStream struct { grpc.ClientStream @@ -33,11 +35,8 @@ type clientStream struct { sentMessageID int } -var _ = proto.Marshal - func (w *clientStream) RecvMsg(m interface{}) error { err := w.ClientStream.RecvMsg(m) - if err == nil && !w.desc.ServerStreams { w.sendStreamEvent(receiveEndEvent, nil) } else if err == io.EOF { @@ -54,10 +53,8 @@ func (w *clientStream) RecvMsg(m interface{}) error { func (w *clientStream) SendMsg(m interface{}) error { err := w.ClientStream.SendMsg(m) - w.sentMessageID++ MessageSent.Event(w.Context(), w.sentMessageID, m) - if err != nil { w.sendStreamEvent(errorEvent, err) } @@ -90,6 +87,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { } } +// WrapClientStream wraps s with given ctx and desc. func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { events := make(chan streamEvent) eventsDone := make(chan struct{}) diff --git a/core/opentelemetry/config.go b/core/opentelemetry/config.go index a34d79fc..0d3b2689 100644 --- a/core/opentelemetry/config.go +++ b/core/opentelemetry/config.go @@ -1,5 +1,6 @@ package opentelemetry +// TraceName represents the tracing name. const TraceName = "go-zero" // A Config is a opentelemetry config. diff --git a/core/opentelemetry/message.go b/core/opentelemetry/message.go index fa24ae36..1199db50 100644 --- a/core/opentelemetry/message.go +++ b/core/opentelemetry/message.go @@ -9,7 +9,9 @@ import ( ) var ( - MessageSent = messageType(RPCMessageTypeSent) + // MessageSent is the type of sent messages. + MessageSent = messageType(RPCMessageTypeSent) + // MessageReceived is the type of received messages. MessageReceived = messageType(RPCMessageTypeReceived) ) diff --git a/core/opentelemetry/serverstream.go b/core/opentelemetry/serverstream.go index 7ad5e5b5..8f740a5a 100644 --- a/core/opentelemetry/serverstream.go +++ b/core/opentelemetry/serverstream.go @@ -40,6 +40,7 @@ func (w *serverStream) SendMsg(m interface{}) error { return err } +// WrapServerStream wraps the given grpc.ServerStream with the given context. func WrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { return &serverStream{ ServerStream: ss, diff --git a/core/opentelemetry/tracer.go b/core/opentelemetry/tracer.go index ea0dfb15..2328f2e9 100644 --- a/core/opentelemetry/tracer.go +++ b/core/opentelemetry/tracer.go @@ -36,12 +36,14 @@ func (s *metadataSupplier) Keys() []string { return out } +// Inject injects the metadata into ctx. func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) { p.Inject(ctx, &metadataSupplier{ metadata: metadata, }) } +// Extract extracts the metadata from ctx. func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext) { ctx = p.Extract(ctx, &metadataSupplier{ metadata: metadata, diff --git a/core/opentelemetry/utils.go b/core/opentelemetry/utils.go index 56e27af3..79f56782 100644 --- a/core/opentelemetry/utils.go +++ b/core/opentelemetry/utils.go @@ -10,14 +10,17 @@ import ( "google.golang.org/grpc/peer" ) +// PeerFromCtx returns the peer from ctx. func PeerFromCtx(ctx context.Context) string { p, ok := peer.FromContext(ctx) if !ok { return "" } + return p.Addr.String() } +// SpanInfo returns the span info. func SpanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) { attrs := []attribute.KeyValue{RPCSystemGRPC} name, mAttrs := ParseFullMethod(fullMethod) @@ -26,6 +29,7 @@ func SpanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) { return name, attrs } +// ParseFullMethod returns the method name and attributes. func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) { name := strings.TrimLeft(fullMethod, "/") parts := strings.SplitN(name, "/", 2) @@ -41,9 +45,11 @@ func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) { if method := parts[1]; method != "" { attrs = append(attrs, semconv.RPCMethodKey.String(method)) } + return name, attrs } +// PeerAttr returns the peer attributes. func PeerAttr(addr string) []attribute.KeyValue { host, port, err := net.SplitHostPort(addr) if err != nil { diff --git a/core/proc/signals.go b/core/proc/signals.go index 25c75f46..80ef4bdc 100644 --- a/core/proc/signals.go +++ b/core/proc/signals.go @@ -51,6 +51,7 @@ func init() { }() } +// Done returns the channel that notifies the process quitting. func Done() <-chan struct{} { return done } diff --git a/core/trace/constants.go b/core/trace/constants.go index 332988d9..33be4a19 100644 --- a/core/trace/constants.go +++ b/core/trace/constants.go @@ -1,6 +1,7 @@ package trace const ( + // TraceIdKey is the trace id header. TraceIdKey = "X-Trace-ID" spanIdKey = "X-Span-ID" diff --git a/rest/handler/otelhandler.go b/rest/handler/otelhandler.go index 8872974f..3dc00341 100644 --- a/rest/handler/otelhandler.go +++ b/rest/handler/otelhandler.go @@ -10,6 +10,7 @@ import ( oteltrace "go.opentelemetry.io/otel/trace" ) +// OtelHandler return a middleware that process the opentelemetry. func OtelHandler(path string) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { if !opentelemetry.Enabled() { diff --git a/zrpc/internal/clientinterceptors/opentracinginterceptor.go b/zrpc/internal/clientinterceptors/opentracinginterceptor.go index 13f1ea63..657b286a 100644 --- a/zrpc/internal/clientinterceptors/opentracinginterceptor.go +++ b/zrpc/internal/clientinterceptors/opentracinginterceptor.go @@ -13,9 +13,11 @@ import ( "google.golang.org/grpc/status" ) +// OpenTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry. func OpenTracingInterceptor() grpc.UnaryClientInterceptor { propagator := otel.GetTextMapPropagator() - return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { if !opentelemetry.Enabled() { return invoker(ctx, method, req, reply, cc, opts...) } @@ -24,13 +26,8 @@ func OpenTracingInterceptor() grpc.UnaryClientInterceptor { metadataCopy := requestMetadata.Copy() tr := otel.Tracer(opentelemetry.TraceName) name, attr := opentelemetry.SpanInfo(method, cc.Target()) - - var span trace.Span - ctx, span = tr.Start(ctx, - name, - trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(attr...), - ) + ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attr...)) defer span.End() opentelemetry.Inject(ctx, propagator, &metadataCopy) @@ -50,30 +47,23 @@ func OpenTracingInterceptor() grpc.UnaryClientInterceptor { } } +// StreamOpenTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry. func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor { propagator := otel.GetTextMapPropagator() - return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, + streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { if !opentelemetry.Enabled() { return streamer(ctx, desc, cc, method, opts...) } requestMetadata, _ := metadata.FromOutgoingContext(ctx) metadataCopy := requestMetadata.Copy() - tr := otel.Tracer(opentelemetry.TraceName) - name, attr := opentelemetry.SpanInfo(method, cc.Target()) - var span trace.Span - ctx, span = tr.Start( - ctx, - name, - trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(attr...), - ) - + ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attr...)) opentelemetry.Inject(ctx, propagator, &metadataCopy) ctx = metadata.NewOutgoingContext(ctx, metadataCopy) - s, err := streamer(ctx, desc, cc, method, opts...) if err != nil { grpcStatus, _ := status.FromError(err) @@ -82,12 +72,11 @@ func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor { span.End() return s, err } + stream := opentelemetry.WrapClientStream(ctx, s, desc) go func() { - err := <-stream.Finished - - if err != nil { + if err := <-stream.Finished; err != nil { s, _ := status.FromError(err) span.SetStatus(codes.Error, s.Message()) span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) diff --git a/zrpc/internal/resolver/kube/eventhandler.go b/zrpc/internal/resolver/kube/eventhandler.go index 977eadca..ae7e6d91 100644 --- a/zrpc/internal/resolver/kube/eventhandler.go +++ b/zrpc/internal/resolver/kube/eventhandler.go @@ -8,12 +8,14 @@ import ( v1 "k8s.io/api/core/v1" ) +// EventHandler is ResourceEventHandler implementation. type EventHandler struct { update func([]string) endpoints map[string]lang.PlaceholderType lock sync.Mutex } +// NewEventHandler returns an EventHandler. func NewEventHandler(update func([]string)) *EventHandler { return &EventHandler{ update: update, @@ -21,6 +23,7 @@ func NewEventHandler(update func([]string)) *EventHandler { } } +// OnAdd handles the endpoints add events. func (h *EventHandler) OnAdd(obj interface{}) { endpoints, ok := obj.(*v1.Endpoints) if !ok { @@ -46,6 +49,7 @@ func (h *EventHandler) OnAdd(obj interface{}) { } } +// OnDelete handles the endpoints delete events. func (h *EventHandler) OnDelete(obj interface{}) { endpoints, ok := obj.(*v1.Endpoints) if !ok { @@ -71,6 +75,7 @@ func (h *EventHandler) OnDelete(obj interface{}) { } } +// OnUpdate handles the endpoints update events. func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) { oldEndpoints, ok := oldObj.(*v1.Endpoints) if !ok { @@ -91,6 +96,7 @@ func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) { h.Update(newEndpoints) } +// Update updates the endpoints. func (h *EventHandler) Update(endpoints *v1.Endpoints) { h.lock.Lock() defer h.lock.Unlock() diff --git a/zrpc/internal/resolver/kube/targetparser.go b/zrpc/internal/resolver/kube/targetparser.go index 569fdcd5..66fc6e51 100644 --- a/zrpc/internal/resolver/kube/targetparser.go +++ b/zrpc/internal/resolver/kube/targetparser.go @@ -15,12 +15,14 @@ const ( var emptyService Service +// Service represents a service with namespace, name and port. type Service struct { Namespace string Name string Port int } +// ParseTarget parses the resolver.Target. func ParseTarget(target resolver.Target) (Service, error) { var service Service service.Namespace = target.Authority diff --git a/zrpc/internal/serverinterceptors/opentracinginterceptor.go b/zrpc/internal/serverinterceptors/opentracinginterceptor.go index 5d8b9ae1..904dd634 100644 --- a/zrpc/internal/serverinterceptors/opentracinginterceptor.go +++ b/zrpc/internal/serverinterceptors/opentracinginterceptor.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/status" ) +// UnaryOpenTracingInterceptor returns a grpc.UnaryServerInterceptor for opentelemetry. func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor { propagator := otel.GetTextMapPropagator() return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, @@ -24,20 +25,12 @@ func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor { requestMetadata, _ := metadata.FromIncomingContext(ctx) metadataCopy := requestMetadata.Copy() - bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) ctx = baggage.ContextWithBaggage(ctx, bags) - tr := otel.Tracer(opentelemetry.TraceName) name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) - - var span trace.Span - ctx, span = tr.Start( - trace.ContextWithRemoteSpanContext(ctx, spanCtx), - name, - trace.WithSpanKind(trace.SpanKindServer), - trace.WithAttributes(attr...), - ) + ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name, + trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...)) defer span.End() opentelemetry.MessageReceived.Event(ctx, 1, req) @@ -57,6 +50,7 @@ func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor { } } +// StreamOpenTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry. func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor { propagator := otel.GetTextMapPropagator() return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { @@ -67,30 +61,22 @@ func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor { requestMetadata, _ := metadata.FromIncomingContext(ctx) metadataCopy := requestMetadata.Copy() - bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) ctx = baggage.ContextWithBaggage(ctx, bags) - tr := otel.Tracer(opentelemetry.TraceName) name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) - ctx, span := tr.Start( - trace.ContextWithRemoteSpanContext(ctx, spanCtx), - name, - trace.WithSpanKind(trace.SpanKindServer), - trace.WithAttributes(attr...), - ) + ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name, + trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...)) defer span.End() - err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)) - - if err != nil { + if err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)); err != nil { s, _ := status.FromError(err) span.SetStatus(codes.Error, s.Message()) span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) - } else { - span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK)) + return err } - return err + span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK)) + return nil } }