diff --git a/core/logx/logs_test.go b/core/logx/logs_test.go index a0f20710..71038ca3 100644 --- a/core/logx/logs_test.go +++ b/core/logx/logs_test.go @@ -462,7 +462,7 @@ func TestStructedLogWithDuration(t *testing.T) { WithDuration(time.Second).Info(message) var entry logEntry - if err := json.Unmarshal([]byte(w.builder.String()), &entry); err != nil { + if err := json.Unmarshal([]byte(w.String()), &entry); err != nil { t.Error(err) } assert.Equal(t, levelInfo, entry.Level) @@ -515,7 +515,7 @@ func TestErrorfWithWrappedError(t *testing.T) { defer writer.Store(old) Errorf("hello %w", errors.New(message)) - assert.True(t, strings.Contains(w.builder.String(), "hello there")) + assert.True(t, strings.Contains(w.String(), "hello there")) } func TestMustNil(t *testing.T) { diff --git a/core/logx/syslog_test.go b/core/logx/syslog_test.go index 780f346d..7e389093 100644 --- a/core/logx/syslog_test.go +++ b/core/logx/syslog_test.go @@ -38,7 +38,7 @@ func captureOutput(f func()) string { f() SetLevel(prevLevel) - return w.builder.String() + return w.String() } func getContent(jsonStr string) string { diff --git a/zrpc/client.go b/zrpc/client.go index 699486a4..5240a479 100644 --- a/zrpc/client.go +++ b/zrpc/client.go @@ -15,6 +15,8 @@ var ( WithDialOption = internal.WithDialOption // WithNonBlock sets the dialing to be nonblock. WithNonBlock = internal.WithNonBlock + // WithStreamClientInterceptor is an alias of internal.WithStreamClientInterceptor. + WithStreamClientInterceptor = internal.WithStreamClientInterceptor // WithTimeout is an alias of internal.WithTimeout. WithTimeout = internal.WithTimeout // WithTransportCredentials return a func to make the gRPC calls secured with given credentials. diff --git a/zrpc/internal/client.go b/zrpc/internal/client.go index 53a920df..6b3151bf 100644 --- a/zrpc/internal/client.go +++ b/zrpc/internal/client.go @@ -131,6 +131,13 @@ func WithNonBlock() ClientOption { } } +// WithStreamClientInterceptor returns a func to customize a ClientOptions with given interceptor. +func WithStreamClientInterceptor(interceptor grpc.StreamClientInterceptor) ClientOption { + return func(options *ClientOptions) { + options.DialOptions = append(options.DialOptions, WithStreamClientInterceptors(interceptor)) + } +} + // WithTimeout returns a func to customize a ClientOptions with given timeout. func WithTimeout(timeout time.Duration) ClientOption { return func(options *ClientOptions) { diff --git a/zrpc/internal/client_test.go b/zrpc/internal/client_test.go index 5352f7ef..487049f6 100644 --- a/zrpc/internal/client_test.go +++ b/zrpc/internal/client_test.go @@ -31,6 +31,17 @@ func TestWithNonBlock(t *testing.T) { assert.True(t, options.NonBlock) } +func TestWithStreamClientInterceptor(t *testing.T) { + var options ClientOptions + opt := WithStreamClientInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, + cc *grpc.ClientConn, method string, streamer grpc.Streamer, + opts ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, nil + }) + opt(&options) + assert.Equal(t, 1, len(options.DialOptions)) +} + func TestWithTransportCredentials(t *testing.T) { var options ClientOptions opt := WithTransportCredentials(nil) diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index a05d4565..be378deb 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -71,7 +71,7 @@ func (s *rpcServer) Start(register RegisterFn) error { WithStreamServerInterceptors(streamInterceptors...)) server := grpc.NewServer(options...) register(server) - // we need to make sure all others are wrapped up + // we need to make sure all others are wrapped up, // so we do graceful stop at shutdown phase instead of wrap up phase waitForCalled := proc.AddWrapUpListener(func() { server.GracefulStop()