move opentelemetry into trace package, and refactoring (#996)

* move opentelemetry into trace package, and refactoring

* rename rewritten package names
master
Kevin Wan 3 years ago committed by GitHub
parent 8829c31c0d
commit 58874779e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,9 +5,9 @@ import (
"github.com/tal-tech/go-zero/core/load" "github.com/tal-tech/go-zero/core/load"
"github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/opentelemetry"
"github.com/tal-tech/go-zero/core/prometheus" "github.com/tal-tech/go-zero/core/prometheus"
"github.com/tal-tech/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/trace/opentelemetry"
) )
const ( const (

@ -29,6 +29,7 @@ func StartAgent(c Config) {
if len(c.Endpoint) == 0 { if len(c.Endpoint) == 0 {
return return
} }
// Just support jaeger now // Just support jaeger now
if c.Batcher != "jaeger" { if c.Batcher != "jaeger" {
return return
@ -46,9 +47,7 @@ func StartAgent(c Config) {
// Always be sure to batch in production. // Always be sure to batch in production.
tracesdk.WithBatcher(exp), tracesdk.WithBatcher(exp),
// Record information about this application in an Resource. // Record information about this application in an Resource.
tracesdk.WithResource(resource.NewSchemaless( tracesdk.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String(c.Name))),
semconv.ServiceNameKey.String(c.Name),
)),
) )
otel.SetTracerProvider(tp) otel.SetTracerProvider(tp)

@ -6,7 +6,6 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
) )
const ( const (
@ -14,26 +13,24 @@ const (
errorEvent errorEvent
) )
var _ = proto.Marshal type (
streamEventType int
type streamEventType int streamEvent struct {
type streamEvent struct {
Type streamEventType Type streamEventType
Err error Err error
} }
type clientStream struct { clientStream struct {
grpc.ClientStream grpc.ClientStream
Finished chan error
desc *grpc.StreamDesc desc *grpc.StreamDesc
events chan streamEvent events chan streamEvent
eventsDone chan struct{} eventsDone chan struct{}
Finished chan error
receivedMessageID int receivedMessageID int
sentMessageID int sentMessageID int
} }
)
func (w *clientStream) RecvMsg(m interface{}) error { func (w *clientStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m) err := w.ClientStream.RecvMsg(m)

@ -8,6 +8,8 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
const messageEvent = "message"
var ( var (
// MessageSent is the type of sent messages. // MessageSent is the type of sent messages.
MessageSent = messageType(RPCMessageTypeSent) MessageSent = messageType(RPCMessageTypeSent)
@ -22,13 +24,13 @@ type messageType attribute.KeyValue
func (m messageType) Event(ctx context.Context, id int, message interface{}) { func (m messageType) Event(ctx context.Context, id int, message interface{}) {
span := trace.SpanFromContext(ctx) span := trace.SpanFromContext(ctx)
if p, ok := message.(proto.Message); ok { if p, ok := message.(proto.Message); ok {
span.AddEvent("message", trace.WithAttributes( span.AddEvent(messageEvent, trace.WithAttributes(
attribute.KeyValue(m), attribute.KeyValue(m),
RPCMessageIDKey.Int(id), RPCMessageIDKey.Int(id),
RPCMessageUncompressedSizeKey.Int(proto.Size(p)), RPCMessageUncompressedSizeKey.Int(proto.Size(p)),
)) ))
} else { } else {
span.AddEvent("message", trace.WithAttributes( span.AddEvent(messageEvent, trace.WithAttributes(
attribute.KeyValue(m), attribute.KeyValue(m),
RPCMessageIDKey.Int(id), RPCMessageIDKey.Int(id),
)) ))

@ -22,7 +22,6 @@ func (w *serverStream) Context() context.Context {
func (w *serverStream) RecvMsg(m interface{}) error { func (w *serverStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m) err := w.ServerStream.RecvMsg(m)
if err == nil { if err == nil {
w.receivedMessageID++ w.receivedMessageID++
MessageReceived.Event(w.Context(), w.receivedMessageID, m) MessageReceived.Event(w.Context(), w.receivedMessageID, m)
@ -33,7 +32,6 @@ func (w *serverStream) RecvMsg(m interface{}) error {
func (w *serverStream) SendMsg(m interface{}) error { func (w *serverStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m) err := w.ServerStream.SendMsg(m)
w.sentMessageID++ w.sentMessageID++
MessageSent.Event(w.Context(), w.sentMessageID, m) MessageSent.Event(w.Context(), w.sentMessageID, m)

@ -9,18 +9,19 @@ import (
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
// assert that metadataSupplier implements the TextMapCarrier interface
var _ propagation.TextMapCarrier = new(metadataSupplier)
type metadataSupplier struct { type metadataSupplier struct {
metadata *metadata.MD metadata *metadata.MD
} }
// assert that metadataSupplier implements the TextMapCarrier interface
var _ propagation.TextMapCarrier = &metadataSupplier{}
func (s *metadataSupplier) Get(key string) string { func (s *metadataSupplier) Get(key string) string {
values := s.metadata.Get(key) values := s.metadata.Get(key)
if len(values) == 0 { if len(values) == 0 {
return "" return ""
} }
return values[0] return values[0]
} }
@ -33,6 +34,7 @@ func (s *metadataSupplier) Keys() []string {
for key := range *s.metadata { for key := range *s.metadata {
out = append(out, key) out = append(out, key)
} }
return out return out
} }

@ -10,6 +10,8 @@ import (
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
) )
const localhost = "127.0.0.1"
// PeerFromCtx returns the peer from ctx. // PeerFromCtx returns the peer from ctx.
func PeerFromCtx(ctx context.Context) string { func PeerFromCtx(ctx context.Context) string {
p, ok := peer.FromContext(ctx) p, ok := peer.FromContext(ctx)
@ -56,8 +58,8 @@ func PeerAttr(addr string) []attribute.KeyValue {
return []attribute.KeyValue(nil) return []attribute.KeyValue(nil)
} }
if host == "" { if len(host) == 0 {
host = "127.0.0.1" host = localhost
} }
return []attribute.KeyValue{ return []attribute.KeyValue{

@ -3,7 +3,7 @@ package handler
import ( import (
"net/http" "net/http"
"github.com/tal-tech/go-zero/core/opentelemetry" "github.com/tal-tech/go-zero/core/trace/opentelemetry"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

@ -7,7 +7,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/opentelemetry" "github.com/tal-tech/go-zero/core/trace/opentelemetry"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"

@ -3,7 +3,7 @@ package clientinterceptors
import ( import (
"context" "context"
"github.com/tal-tech/go-zero/core/opentelemetry" "github.com/tal-tech/go-zero/core/trace/opentelemetry"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"

@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/opentelemetry" "github.com/tal-tech/go-zero/core/trace/opentelemetry"
"google.golang.org/grpc" "google.golang.org/grpc"
) )

@ -3,7 +3,7 @@ package serverinterceptors
import ( import (
"context" "context"
"github.com/tal-tech/go-zero/core/opentelemetry" "github.com/tal-tech/go-zero/core/trace/opentelemetry"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"

@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/opentelemetry" "github.com/tal-tech/go-zero/core/trace/opentelemetry"
"google.golang.org/grpc" "google.golang.org/grpc"
) )

Loading…
Cancel
Save