diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index f3e2a7a2..b31eb874 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -1298,8 +1298,9 @@ func (s *Redis) Pipelined(fn func(Pipeliner) error) error { } // PipelinedCtx lets fn execute pipelined commands. -func (s *Redis) PipelinedCtx(ctx context.Context, fn func(Pipeliner) error) (err error) { - err = s.brk.DoWithAcceptable(func() error { +// Results need to be retrieved by calling Pipeline.Exec() +func (s *Redis) PipelinedCtx(ctx context.Context, fn func(Pipeliner) error) error { + return s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s) if err != nil { return err @@ -1308,8 +1309,6 @@ func (s *Redis) PipelinedCtx(ctx context.Context, fn func(Pipeliner) error) (err _, err = conn.Pipelined(ctx, fn) return err }, acceptable) - - return } // Rpop is the implementation of redis rpop command. diff --git a/rest/handler/authhandler.go b/rest/handler/authhandler.go index 9099e62b..c45ee81a 100644 --- a/rest/handler/authhandler.go +++ b/rest/handler/authhandler.go @@ -41,7 +41,7 @@ type ( AuthorizeOption func(opts *AuthorizeOptions) ) -// Authorize returns an authorize middleware. +// Authorize returns an authorization middleware. func Authorize(secret string, opts ...AuthorizeOption) func(http.Handler) http.Handler { var authOpts AuthorizeOptions for _, opt := range opts { diff --git a/zrpc/client_test.go b/zrpc/client_test.go index 0506fbcd..9b131902 100644 --- a/zrpc/client_test.go +++ b/zrpc/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/zeromicro/go-zero/zrpc/internal/mock" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" ) @@ -103,14 +104,15 @@ func TestDepositServer_Deposit(t *testing.T) { Token: "bar", Timeout: 1000, }, - WithDialOption(grpc.WithInsecure()), + WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), WithDialOption(grpc.WithContextDialer(dialer())), WithUnaryClientInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return invoker(ctx, method, req, reply, cc, opts...) }), ) - targetClient, err := NewClientWithTarget("foo", WithDialOption(grpc.WithInsecure()), + targetClient, err := NewClientWithTarget("foo", + WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), WithDialOption(grpc.WithContextDialer(dialer())), WithUnaryClientInterceptor( func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -162,7 +164,7 @@ func TestNewClientWithError(t *testing.T) { Token: "bar", Timeout: 1000, }, - WithDialOption(grpc.WithInsecure()), + WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), WithDialOption(grpc.WithContextDialer(dialer())), WithUnaryClientInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -181,7 +183,7 @@ func TestNewClientWithError(t *testing.T) { Token: "bar", Timeout: 1, }, - WithDialOption(grpc.WithInsecure()), + WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), WithDialOption(grpc.WithContextDialer(dialer())), WithUnaryClientInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { diff --git a/zrpc/proxy_test.go b/zrpc/proxy_test.go index 935914a4..e562eba8 100644 --- a/zrpc/proxy_test.go +++ b/zrpc/proxy_test.go @@ -9,6 +9,7 @@ import ( "github.com/zeromicro/go-zero/zrpc/internal/mock" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" ) @@ -36,7 +37,7 @@ func TestProxy(t *testing.T) { }, } - proxy := NewProxy("foo", WithDialOption(grpc.WithInsecure()), + proxy := NewProxy("foo", WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), WithDialOption(grpc.WithContextDialer(dialer()))) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -66,7 +67,8 @@ func TestProxy(t *testing.T) { } func TestRpcProxy_TakeConnNewClientFailed(t *testing.T) { - proxy := NewProxy("foo", WithDialOption(grpc.WithInsecure()), WithDialOption(grpc.WithBlock())) + proxy := NewProxy("foo", WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + WithDialOption(grpc.WithBlock())) _, err := proxy.TakeConn(context.Background()) assert.NotNil(t, err) } diff --git a/zrpc/resolver/internal/directbuilder.go b/zrpc/resolver/internal/directbuilder.go index ed50c9a1..81840df2 100644 --- a/zrpc/resolver/internal/directbuilder.go +++ b/zrpc/resolver/internal/directbuilder.go @@ -3,15 +3,16 @@ package internal import ( "strings" + "github.com/zeromicro/go-zero/zrpc/resolver/internal/targets" "google.golang.org/grpc/resolver" ) type directBuilder struct{} -func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) ( +func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) ( resolver.Resolver, error) { var addrs []resolver.Address - endpoints := strings.FieldsFunc(target.Endpoint, func(r rune) bool { + endpoints := strings.FieldsFunc(targets.GetEndpoints(target), func(r rune) bool { return r == EndpointSepChar }) diff --git a/zrpc/resolver/internal/directbuilder_test.go b/zrpc/resolver/internal/directbuilder_test.go index d6fb0133..e311fcba 100644 --- a/zrpc/resolver/internal/directbuilder_test.go +++ b/zrpc/resolver/internal/directbuilder_test.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "net/url" "strconv" "strings" "testing" @@ -31,9 +32,11 @@ func TestDirectBuilder_Build(t *testing.T) { } var b directBuilder cc := new(mockedClientConn) - _, err := b.Build(resolver.Target{ - Scheme: DirectScheme, - Endpoint: strings.Join(servers, ","), + target := fmt.Sprintf("%s:///%s", DirectScheme, strings.Join(servers, ",")) + uri, err := url.Parse(target) + assert.Nil(t, err) + _, err = b.Build(resolver.Target{ + URL: *uri, }, cc, resolver.BuildOptions{}) assert.Nil(t, err) size := mathx.MinInt(test, subsetSize) diff --git a/zrpc/resolver/internal/discovbuilder.go b/zrpc/resolver/internal/discovbuilder.go index b3a587bb..cf432e23 100644 --- a/zrpc/resolver/internal/discovbuilder.go +++ b/zrpc/resolver/internal/discovbuilder.go @@ -5,6 +5,7 @@ import ( "github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/zrpc/resolver/internal/targets" "google.golang.org/grpc/resolver" ) @@ -12,10 +13,10 @@ type discovBuilder struct{} func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) ( resolver.Resolver, error) { - hosts := strings.FieldsFunc(target.Authority, func(r rune) bool { + hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool { return r == EndpointSepChar }) - sub, err := discov.NewSubscriber(hosts, target.Endpoint) + sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target)) if err != nil { return nil, err } diff --git a/zrpc/resolver/internal/kube/targetparser.go b/zrpc/resolver/internal/kube/targetparser.go index 66fc6e51..570ad7d5 100644 --- a/zrpc/resolver/internal/kube/targetparser.go +++ b/zrpc/resolver/internal/kube/targetparser.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" + "github.com/zeromicro/go-zero/zrpc/resolver/internal/targets" "google.golang.org/grpc/resolver" ) @@ -25,14 +26,15 @@ type Service struct { // ParseTarget parses the resolver.Target. func ParseTarget(target resolver.Target) (Service, error) { var service Service - service.Namespace = target.Authority + service.Namespace = targets.GetAuthority(target) if len(service.Namespace) == 0 { service.Namespace = defaultNamespace } - segs := strings.SplitN(target.Endpoint, colon, 2) + endpoints := targets.GetEndpoints(target) + segs := strings.SplitN(endpoints, colon, 2) if len(segs) < 2 { - return emptyService, fmt.Errorf("bad endpoint: %s", target.Endpoint) + return emptyService, fmt.Errorf("bad endpoint: %s", endpoints) } service.Name = segs[0] diff --git a/zrpc/resolver/internal/kube/targetparser_test.go b/zrpc/resolver/internal/kube/targetparser_test.go index 7f53e426..4dce9e43 100644 --- a/zrpc/resolver/internal/kube/targetparser_test.go +++ b/zrpc/resolver/internal/kube/targetparser_test.go @@ -1,6 +1,7 @@ package kube import ( + "net/url" "testing" "github.com/stretchr/testify/assert" @@ -10,17 +11,13 @@ import ( func TestParseTarget(t *testing.T) { tests := []struct { name string - input resolver.Target + input string expect Service hasErr bool }{ { - name: "normal case", - input: resolver.Target{ - Scheme: "k8s", - Authority: "ns1", - Endpoint: "my-svc:8080", - }, + name: "normal case", + input: "k8s://ns1/my-svc:8080", expect: Service{ Namespace: "ns1", Name: "my-svc", @@ -28,12 +25,8 @@ func TestParseTarget(t *testing.T) { }, }, { - name: "normal case", - input: resolver.Target{ - Scheme: "k8s", - Authority: "", - Endpoint: "my-svc:8080", - }, + name: "normal case", + input: "k8s:///my-svc:8080", expect: Service{ Namespace: defaultNamespace, Name: "my-svc", @@ -41,37 +34,27 @@ func TestParseTarget(t *testing.T) { }, }, { - name: "no port", - input: resolver.Target{ - Scheme: "k8s", - Authority: "ns1", - Endpoint: "my-svc:", - }, + name: "no port", + input: "k8s://ns1/my-svc:", hasErr: true, }, { - name: "no port, no colon", - input: resolver.Target{ - Scheme: "k8s", - Authority: "ns1", - Endpoint: "my-svc", - }, + name: "no port, no colon", + input: "k8s://ns1/my-svc", hasErr: true, }, { - name: "bad port", - input: resolver.Target{ - Scheme: "k8s", - Authority: "ns1", - Endpoint: "my-svc:800a", - }, + name: "bad port", + input: "k8s://ns1/my-svc:800a", hasErr: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - svc, err := ParseTarget(test.input) + uri, err := url.Parse(test.input) + assert.Nil(t, err) + svc, err := ParseTarget(resolver.Target{URL: *uri}) if test.hasErr { assert.NotNil(t, err) } else { diff --git a/zrpc/resolver/internal/targets/endpoints.go b/zrpc/resolver/internal/targets/endpoints.go new file mode 100644 index 00000000..2685f6d2 --- /dev/null +++ b/zrpc/resolver/internal/targets/endpoints.go @@ -0,0 +1,19 @@ +package targets + +import ( + "strings" + + "google.golang.org/grpc/resolver" +) + +const slashSeparator = "/" + +// GetAuthority returns the authority of the target. +func GetAuthority(target resolver.Target) string { + return target.URL.Host +} + +// GetEndpoints returns the endpoints from the given target. +func GetEndpoints(target resolver.Target) string { + return strings.Trim(target.URL.Path, slashSeparator) +} diff --git a/zrpc/resolver/internal/targets/endpoints_test.go b/zrpc/resolver/internal/targets/endpoints_test.go new file mode 100644 index 00000000..a22e2d39 --- /dev/null +++ b/zrpc/resolver/internal/targets/endpoints_test.go @@ -0,0 +1,89 @@ +package targets + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/resolver" +) + +func TestGetAuthority(t *testing.T) { + tests := []struct { + name string + url string + want string + }{ + { + name: "test", + url: "direct://my_authority/localhost", + want: "my_authority", + }, + { + name: "test with port", + url: "direct://my_authority/localhost:8080", + want: "my_authority", + }, + { + name: "test with multiple hosts", + url: "direct://my_authority1,my_authority2/localhost,localhost", + want: "my_authority1,my_authority2", + }, + { + name: "test with multiple hosts with port", + url: "direct://my_authority1:3000,my_authority2:3001/localhost:8080,localhost:8081", + want: "my_authority1:3000,my_authority2:3001", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + uri, err := url.Parse(test.url) + assert.Nil(t, err) + target := resolver.Target{ + URL: *uri, + } + assert.Equal(t, test.want, GetAuthority(target)) + }) + } +} + +func TestGetEndpoints(t *testing.T) { + tests := []struct { + name string + url string + want string + }{ + { + name: "test", + url: "direct:///localhost", + want: "localhost", + }, + { + name: "test with port", + url: "direct:///localhost:8080", + want: "localhost:8080", + }, + { + name: "test with multiple hosts", + url: "direct:///localhost,localhost", + want: "localhost,localhost", + }, + { + name: "test with multiple hosts with port", + url: "direct:///localhost:8080,localhost:8081", + want: "localhost:8080,localhost:8081", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + uri, err := url.Parse(test.url) + assert.Nil(t, err) + target := resolver.Target{ + URL: *uri, + } + assert.Equal(t, test.want, GetEndpoints(target)) + }) + } +}