You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/core/opentelemetry/clientstream.go

125 lines
2.3 KiB
Go

package opentelemetry
import (
"context"
"io"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
const (
receiveEndEvent streamEventType = iota
errorEvent
)
var _ = proto.Marshal
type streamEventType int
type streamEvent struct {
Type streamEventType
Err error
}
type clientStream struct {
grpc.ClientStream
desc *grpc.StreamDesc
events chan streamEvent
eventsDone chan struct{}
Finished chan error
receivedMessageID int
sentMessageID int
}
func (w *clientStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m)
if err == nil && !w.desc.ServerStreams {
w.sendStreamEvent(receiveEndEvent, nil)
} else if err == io.EOF {
w.sendStreamEvent(receiveEndEvent, nil)
} else if err != nil {
w.sendStreamEvent(errorEvent, err)
} else {
w.receivedMessageID++
MessageReceived.Event(w.Context(), w.receivedMessageID, m)
}
return err
}
func (w *clientStream) SendMsg(m interface{}) error {
err := w.ClientStream.SendMsg(m)
w.sentMessageID++
MessageSent.Event(w.Context(), w.sentMessageID, m)
if err != nil {
w.sendStreamEvent(errorEvent, err)
}
return err
}
func (w *clientStream) Header() (metadata.MD, error) {
md, err := w.ClientStream.Header()
if err != nil {
w.sendStreamEvent(errorEvent, err)
}
return md, err
}
func (w *clientStream) CloseSend() error {
err := w.ClientStream.CloseSend()
if err != nil {
w.sendStreamEvent(errorEvent, err)
}
return err
}
func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
select {
case <-w.eventsDone:
case w.events <- streamEvent{Type: eventType, Err: err}:
}
}
// WrapClientStream wraps s with given ctx and desc.
func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
events := make(chan streamEvent)
eventsDone := make(chan struct{})
finished := make(chan error)
go func() {
defer close(eventsDone)
for {
select {
case event := <-events:
switch event.Type {
case receiveEndEvent:
finished <- nil
return
case errorEvent:
finished <- event.Err
return
}
case <-ctx.Done():
finished <- ctx.Err()
return
}
}
}()
return &clientStream{
ClientStream: s,
desc: desc,
events: events,
eventsDone: eventsDone,
Finished: finished,
}
}