You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/core/stores/redis/hook.go

181 lines
4.0 KiB
Go

package redis
import (
"context"
"strings"
"time"
red "github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
oteltrace "go.opentelemetry.io/otel/trace"
)
// spanName is the span name of the redis calls.
const spanName = "redis"
var (
startTimeKey = contextKey("startTime")
durationHook = hook{tracer: otel.GetTracerProvider().Tracer(trace.TraceName)}
redisCmdsAttributeKey = attribute.Key("redis.cmds")
)
type (
contextKey string
hook struct {
tracer oteltrace.Tracer
}
)
func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) {
return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil
}
func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
err := cmd.Err()
h.endSpan(ctx, err)
val := ctx.Value(startTimeKey)
if val == nil {
return nil
}
start, ok := val.(time.Duration)
if !ok {
return nil
}
duration := timex.Since(start)
if duration > slowThreshold.Load() {
logDuration(ctx, cmd, duration)
}
metricReqDur.Observe(int64(duration/time.Millisecond), cmd.Name())
if msg := errFormat(err); len(msg) > 0 {
metricReqErr.Inc(cmd.Name(), msg)
}
return nil
}
func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) {
if len(cmds) == 0 {
return ctx, nil
}
return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil
}
func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
if len(cmds) == 0 {
return nil
}
batchError := errorx.BatchError{}
for _, cmd := range cmds {
err := cmd.Err()
if err == nil {
continue
}
batchError.Add(err)
}
h.endSpan(ctx, batchError.Err())
val := ctx.Value(startTimeKey)
if val == nil {
return nil
}
start, ok := val.(time.Duration)
if !ok {
return nil
}
duration := timex.Since(start)
if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
logDuration(ctx, cmds[0], duration)
}
metricReqDur.Observe(int64(duration/time.Millisecond), "Pipeline")
if msg := errFormat(batchError.Err()); len(msg) > 0 {
metricReqErr.Inc("Pipeline", msg)
}
return nil
}
func errFormat(err error) string {
if err == nil || err == red.Nil {
return ""
}
es := err.Error()
switch {
case strings.HasPrefix(es, "read"):
return "read timeout"
case strings.HasPrefix(es, "dial"):
if strings.Contains(es, "connection refused") {
return "connection refused"
}
return "dial timeout"
case strings.HasPrefix(es, "write"):
return "write timeout"
case strings.Contains(es, "EOF"):
return "eof"
case strings.Contains(es, "reset"):
return "reset"
case strings.Contains(es, "broken"):
return "broken pipe"
case strings.Contains(es, "breaker"):
return "breaker"
default:
return "unexpected error"
}
}
func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) {
var buf strings.Builder
for i, arg := range cmd.Args() {
if i > 0 {
buf.WriteByte(' ')
}
buf.WriteString(mapping.Repr(arg))
}
logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
}
func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context {
ctx, span := h.tracer.Start(ctx,
spanName,
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
)
cmdStrs := make([]string, 0, len(cmds))
for _, cmd := range cmds {
cmdStrs = append(cmdStrs, cmd.Name())
}
span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs))
return ctx
}
func (h hook) endSpan(ctx context.Context, err error) {
span := oteltrace.SpanFromContext(ctx)
defer span.End()
if err == nil || err == red.Nil {
span.SetStatus(codes.Ok, "")
return
}
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}