From e20b02f3110264041e139705c0b114261f710829 Mon Sep 17 00:00:00 2001 From: MarkJoyMa <64180138+MarkJoyMa@users.noreply.github.com> Date: Sat, 22 Apr 2023 13:23:47 +0800 Subject: [PATCH] gateway: open timeout function cabinet (#3047) Co-authored-by: Kevin Wan --- .codecov.yml | 3 +- gateway/server.go | 52 ++++++++-- gateway/server_test.go | 95 +++++++++++++++++++ .../internal => internal}/mock/deposit.pb.go | 0 .../internal => internal}/mock/deposit.proto | 0 .../mock/depositserver.go | 0 zrpc/client_test.go | 2 +- zrpc/internal/rpcserver_test.go | 2 +- zrpc/proxy_test.go | 2 +- 9 files changed, 144 insertions(+), 12 deletions(-) create mode 100644 gateway/server_test.go rename {zrpc/internal => internal}/mock/deposit.pb.go (100%) rename {zrpc/internal => internal}/mock/deposit.proto (100%) rename {zrpc/internal => internal}/mock/depositserver.go (100%) diff --git a/.codecov.yml b/.codecov.yml index f93e9963..ea2695f5 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -3,4 +3,5 @@ comment: behavior: once require_changes: true ignore: - - "tools" \ No newline at end of file + - "tools" + - "internal/mock" \ No newline at end of file diff --git a/gateway/server.go b/gateway/server.go index cdbc0dfd..e022e460 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -23,22 +23,28 @@ import ( type ( // Server is a gateway server. Server struct { + c GatewayConf *rest.Server - upstreams []Upstream + upstreams []*upstream timeout time.Duration processHeader func(http.Header) []string } // Option defines the method to customize Server. Option func(svr *Server) + + upstream struct { + Upstream + client zrpc.Client + } ) // MustNewServer creates a new gateway server. func MustNewServer(c GatewayConf, opts ...Option) *Server { svr := &Server{ - Server: rest.MustNewServer(c.RestConf), - upstreams: c.Upstreams, - timeout: time.Duration(c.Timeout) * time.Millisecond, + c: c, + Server: rest.MustNewServer(c.RestConf), + timeout: time.Duration(c.Timeout) * time.Millisecond, } for _, opt := range opts { opt(svr) @@ -59,17 +65,47 @@ func (s *Server) Stop() { } func (s *Server) build() error { + if err := s.buildClient(); err != nil { + return err + } + return s.buildUpstream() +} + +func (s *Server) buildClient() error { if err := s.ensureUpstreamNames(); err != nil { return err } return mr.MapReduceVoid(func(source chan<- Upstream) { - for _, up := range s.upstreams { + for _, up := range s.c.Upstreams { source <- up } - }, func(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) { + }, func(up Upstream, writer mr.Writer[*upstream], cancel func(error)) { + target, err := up.Grpc.BuildTarget() + if err != nil { + cancel(err) + } + up.Name = target cli := zrpc.MustNewClient(up.Grpc) - source, err := s.createDescriptorSource(cli, up) + writer.Write(&upstream{ + Upstream: up, + client: cli, + }) + }, func(pipe <-chan *upstream, cancel func(error)) { + for up := range pipe { + s.upstreams = append(s.upstreams, up) + } + }) +} + +func (s *Server) buildUpstream() error { + return mr.MapReduceVoid(func(source chan<- *upstream) { + for _, up := range s.upstreams { + source <- up + } + }, func(up *upstream, writer mr.Writer[rest.Route], cancel func(error)) { + cli := up.client + source, err := s.createDescriptorSource(cli, up.Upstream) if err != nil { cancel(fmt.Errorf("%s: %w", up.Name, err)) return @@ -161,7 +197,7 @@ func (s *Server) createDescriptorSource(cli zrpc.Client, up Upstream) (grpcurl.D } func (s *Server) ensureUpstreamNames() error { - for _, up := range s.upstreams { + for _, up := range s.c.Upstreams { target, err := up.Grpc.BuildTarget() if err != nil { return err diff --git a/gateway/server_test.go b/gateway/server_test.go new file mode 100644 index 00000000..2c420c3b --- /dev/null +++ b/gateway/server_test.go @@ -0,0 +1,95 @@ +package gateway + +import ( + "context" + "log" + "net" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/zeromicro/go-zero/core/conf" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/internal/mock" + "github.com/zeromicro/go-zero/rest/httpc" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/test/bufconn" +) + +func init() { + logx.Disable() +} + +func dialer() func(context.Context, string) (net.Conn, error) { + listener := bufconn.Listen(1024 * 1024) + server := grpc.NewServer() + mock.RegisterDepositServiceServer(server, &mock.DepositServer{}) + + reflection.Register(server) + + go func() { + if err := server.Serve(listener); err != nil { + log.Fatal(err) + } + }() + + return func(context.Context, string) (net.Conn, error) { + return listener.Dial() + } +} + +func TestMustNewServer(t *testing.T) { + var c GatewayConf + assert.NoError(t, conf.FillDefault(&c)) + c.Port = 18881 + + s := MustNewServer(c) + + s.upstreams = []*upstream{ + { + Upstream: Upstream{ + Mappings: []RouteMapping{ + { + Method: "get", + Path: "/deposit/:amount", + RpcPath: "mock.DepositService/Deposit", + }, + }, + }, + client: zrpc.MustNewClient(zrpc.RpcClientConf{ + Endpoints: []string{"foo"}, + Timeout: 1000, + Middlewares: zrpc.ClientMiddlewaresConf{ + Trace: true, + Duration: true, + Prometheus: true, + Breaker: true, + Timeout: true, + }, + }, + zrpc.WithDialOption(grpc.WithContextDialer(dialer())), + zrpc.WithUnaryClientInterceptor(func(ctx context.Context, method string, req, reply any, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(ctx, method, req, reply, cc, opts...) + })), + }, + } + + assert.NoError(t, s.buildUpstream()) + go s.Server.Start() + + time.Sleep(time.Millisecond * 100) + + ctx := context.Background() + + resp, err := httpc.Do(ctx, http.MethodGet, "http://localhost:18881/deposit/100", nil) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + resp, err = httpc.Do(ctx, http.MethodGet, "http://localhost:18881/deposit_fail/100", nil) + assert.NoError(t, err) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) +} diff --git a/zrpc/internal/mock/deposit.pb.go b/internal/mock/deposit.pb.go similarity index 100% rename from zrpc/internal/mock/deposit.pb.go rename to internal/mock/deposit.pb.go diff --git a/zrpc/internal/mock/deposit.proto b/internal/mock/deposit.proto similarity index 100% rename from zrpc/internal/mock/deposit.proto rename to internal/mock/deposit.proto diff --git a/zrpc/internal/mock/depositserver.go b/internal/mock/depositserver.go similarity index 100% rename from zrpc/internal/mock/depositserver.go rename to internal/mock/depositserver.go diff --git a/zrpc/client_test.go b/zrpc/client_test.go index 70da01b2..808f97c0 100644 --- a/zrpc/client_test.go +++ b/zrpc/client_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/core/logx" - "github.com/zeromicro/go-zero/zrpc/internal/mock" + "github.com/zeromicro/go-zero/internal/mock" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" diff --git a/zrpc/internal/rpcserver_test.go b/zrpc/internal/rpcserver_test.go index da4b2358..90f4bc9c 100644 --- a/zrpc/internal/rpcserver_test.go +++ b/zrpc/internal/rpcserver_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/proc" "github.com/zeromicro/go-zero/core/stat" - "github.com/zeromicro/go-zero/zrpc/internal/mock" + "github.com/zeromicro/go-zero/internal/mock" "google.golang.org/grpc" ) diff --git a/zrpc/proxy_test.go b/zrpc/proxy_test.go index e562eba8..bdca97f7 100644 --- a/zrpc/proxy_test.go +++ b/zrpc/proxy_test.go @@ -6,7 +6,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/zeromicro/go-zero/zrpc/internal/mock" + "github.com/zeromicro/go-zero/internal/mock" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure"