package opentelemetry

import (
	"context"
	"io"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// Kinds of events a clientStream method can report to the goroutine started
// by WrapClientStream.
const (
	// receiveEndEvent reports that receiving ended without error: either a
	// successful RecvMsg on a stream that is not server-streaming, or io.EOF.
	receiveEndEvent streamEventType = iota
	// errorEvent reports that an operation on the underlying stream failed.
	errorEvent
)

// streamEventType discriminates the events carried by streamEvent.
type streamEventType int

// streamEvent is the message passed from the clientStream methods to the
// goroutine started by WrapClientStream.
type streamEvent struct {
	Type streamEventType
	// Err is the error attached to an errorEvent; the methods in this file
	// pass nil for receiveEndEvent.
	Err error
}

// clientStream decorates a grpc.ClientStream: it records an event for
// messages that pass through it and reports the outcome of the stream.
type clientStream struct {
	grpc.ClientStream

	// Finished is the channel on which the goroutine started by
	// WrapClientStream publishes the outcome of the stream.
	Finished chan error

	desc *grpc.StreamDesc
	// events carries notifications from the stream methods to that goroutine.
	events chan streamEvent
	// eventsDone is closed when that goroutine exits, so that later
	// notifications are dropped instead of blocking.
	eventsDone chan struct{}

	// receivedMessageID counts the messages recorded by RecvMsg.
	receivedMessageID int
	// sentMessageID counts the calls made to SendMsg.
	sentMessageID int
}

// RecvMsg forwards to the wrapped stream and classifies the result.
//
// A successful receive on a stream that is not server-streaming, or an
// io.EOF, is reported as receiveEndEvent; any other error is reported as
// errorEvent. Only a successful receive on a server-streaming RPC increments
// the received-message counter and calls MessageReceived.Event. The result of
// the wrapped call is returned unchanged.
func (w *clientStream) RecvMsg(m interface{}) error {
	err := w.ClientStream.RecvMsg(m)

	if err == nil {
		if w.desc.ServerStreams {
			// More messages may follow; just record this one.
			w.receivedMessageID++
			MessageReceived.Event(w.Context(), w.receivedMessageID, m)
		} else {
			// Single-response RPC: the first message ends the receive side.
			w.sendStreamEvent(receiveEndEvent, nil)
		}
		return err
	}

	if err == io.EOF {
		w.sendStreamEvent(receiveEndEvent, nil)
	} else {
		w.sendStreamEvent(errorEvent, err)
	}
	return err
}

// SendMsg forwards to the wrapped stream, then increments the sent-message
// counter and calls MessageSent.Event regardless of the outcome. A non-nil
// error is additionally reported as errorEvent. The result of the wrapped
// call is returned unchanged.
func (w *clientStream) SendMsg(m interface{}) error {
	err := w.ClientStream.SendMsg(m)

	w.sentMessageID++
	MessageSent.Event(w.Context(), w.sentMessageID, m)

	if err == nil {
		return err
	}
	w.sendStreamEvent(errorEvent, err)
	return err
}

// Header forwards to the wrapped stream and reports a non-nil error as
// errorEvent. Both results of the wrapped call are returned unchanged.
func (w *clientStream) Header() (metadata.MD, error) {
	md, err := w.ClientStream.Header()
	if err == nil {
		return md, err
	}
	w.sendStreamEvent(errorEvent, err)
	return md, err
}

// CloseSend forwards to the wrapped stream and reports a non-nil error as
// errorEvent. The result of the wrapped call is returned unchanged.
func (w *clientStream) CloseSend() error {
	err := w.ClientStream.CloseSend()
	if err == nil {
		return err
	}
	w.sendStreamEvent(errorEvent, err)
	return err
}

// sendStreamEvent delivers an event on w.events, or returns without
// delivering it once w.eventsDone has been closed.
func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
	ev := streamEvent{Type: eventType, Err: err}
	select {
	case w.events <- ev:
	case <-w.eventsDone:
	}
}

// WrapClientStream wraps s with given ctx and desc.
func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { events := make(chan streamEvent) eventsDone := make(chan struct{}) finished := make(chan error) go func() { defer close(eventsDone) for { select { case event := <-events: switch event.Type { case receiveEndEvent: finished <- nil return case errorEvent: finished <- event.Err return } case <-ctx.Done(): finished <- ctx.Err() return } } }() return &clientStream{ ClientStream: s, desc: desc, events: events, eventsDone: eventsDone, Finished: finished, } }