diff --git a/example/graceful/dns/api/handler/gracefulhandler.go b/example/graceful/dns/api/handler/gracefulhandler.go index 830857c2..ac846ad8 100644 --- a/example/graceful/dns/api/handler/gracefulhandler.go +++ b/example/graceful/dns/api/handler/gracefulhandler.go @@ -18,20 +18,13 @@ import ( func gracefulHandler(ctx *svc.ServiceContext) http.HandlerFunc { logger := executors.NewLessExecutor(time.Second) return func(w http.ResponseWriter, r *http.Request) { - var resp types.Response - - conn, ok := ctx.Client.Next() - if !ok { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } - host, err := os.Hostname() if err != nil { http.Error(w, http.StatusText(http.StatusNotImplemented), http.StatusNotImplemented) return } + conn := ctx.Client.Conn() client := graceful.NewGraceServiceClient(conn) rp, err := client.Grace(context.Background(), &graceful.Request{From: host}) if err != nil { @@ -40,6 +33,7 @@ func gracefulHandler(ctx *svc.ServiceContext) http.HandlerFunc { return } + var resp types.Response resp.Host = rp.Host logger.DoOrDiscard(func() { fmt.Printf("%s from host: %s\n", time.Now().Format("15:04:05"), rp.Host) diff --git a/example/graceful/etcd/api/handler/gracefulhandler.go b/example/graceful/etcd/api/handler/gracefulhandler.go index bea35a76..6d0a6c08 100644 --- a/example/graceful/etcd/api/handler/gracefulhandler.go +++ b/example/graceful/etcd/api/handler/gracefulhandler.go @@ -18,20 +18,13 @@ import ( func gracefulHandler(ctx *svc.ServiceContext) http.HandlerFunc { logger := executors.NewLessExecutor(time.Second) return func(w http.ResponseWriter, r *http.Request) { - var resp types.Response - - conn, ok := ctx.Client.Next() - if !ok { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } - host, err := os.Hostname() if err != nil { http.Error(w, http.StatusText(http.StatusNotImplemented), http.StatusNotImplemented) return } + conn := ctx.Client.Conn() client := graceful.NewGraceServiceClient(conn) rp, err := client.Grace(context.Background(), &graceful.Request{From: host}) if err != nil { @@ -40,6 +33,7 @@ func gracefulHandler(ctx *svc.ServiceContext) http.HandlerFunc { return } + var resp types.Response resp.Host = rp.Host logger.DoOrDiscard(func() { fmt.Printf("%s from host: %s\n", time.Now().Format("15:04:05"), rp.Host) diff --git a/example/rpc/client/direct/client.go b/example/rpc/client/direct/client.go index e056ee7b..843a5cd1 100644 --- a/example/rpc/client/direct/client.go +++ b/example/rpc/client/direct/client.go @@ -28,12 +28,7 @@ func main() { for { select { case <-ticker.C: - conn, ok := client.Next() - if !ok { - time.Sleep(time.Second) - break - } - + conn := client.Conn() greet := unary.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second) resp, err := greet.Greet(ctx, &unary.Request{ diff --git a/example/rpc/client/lb/main.go b/example/rpc/client/lb/main.go new file mode 100644 index 00000000..e4bb6afc --- /dev/null +++ b/example/rpc/client/lb/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "fmt" + "time" + + "zero/core/discov" + "zero/example/rpc/remote/unary" + "zero/rpcx" +) + +func main() { + cli := rpcx.MustNewClient(rpcx.RpcClientConf{ + Etcd: discov.EtcdConf{ + Hosts: []string{"localhost:2379"}, + Key: "rpcx", + }, + }) + greet := unary.NewGreeterClient(cli.Conn()) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + resp, err := greet.Greet(context.Background(), &unary.Request{ + Name: "kevin", + }) + if err != nil { + fmt.Println("X", err.Error()) + } else { + fmt.Println("=>", resp.Greet) + } + } + } +} diff --git a/example/rpc/client/stream/client.go b/example/rpc/client/stream/client.go index d303ccaf..273ee092 100644 --- a/example/rpc/client/stream/client.go +++ b/example/rpc/client/stream/client.go @@ -26,11 +26,7 @@ func main() { log.Fatal(err) } - conn, ok := client.Next() - if !ok { - log.Fatal("no server") - } - + conn := client.Conn() greet := stream.NewStreamGreeterClient(conn) stm, err := greet.Greet(context.Background()) if err != nil { diff --git a/example/rpc/client/unary/client.go b/example/rpc/client/unary/client.go index d7873419..dda272fe 100644 --- a/example/rpc/client/unary/client.go +++ b/example/rpc/client/unary/client.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "log" "time" "zero/core/conf" @@ -25,11 +24,7 @@ func main() { for { select { case <-ticker.C: - conn, ok := client.Next() - if !ok { - log.Fatal("no server") - } - + conn := client.Conn() greet := unary.NewGreeterClient(conn) resp, err := greet.Greet(context.Background(), &unary.Request{ Name: "kevin", diff --git a/example/rpc/server/unary/etc/config1.json b/example/rpc/server/unary/etc/config1.json index 6af30d6b..1cc72a63 100644 --- a/example/rpc/server/unary/etc/config1.json +++ b/example/rpc/server/unary/etc/config1.json @@ -1,17 +1,13 @@ { "Name": "rpc.unary", - "MetricsUrl": "http://localhost:2222/add", + "Log": { + "Mode": "volume" + }, "ListenOn": "localhost:3457", - "Auth": false, "Etcd": { "Hosts": [ "localhost:2379" ], "Key": "rpcx" - }, - "Redis": { - "Host": "localhost:6379", - "Type": "node", - "Key": "apps" } } diff --git a/example/tracing/edge/main.go b/example/tracing/edge/main.go index 2f497c58..38a05903 100644 --- a/example/tracing/edge/main.go +++ b/example/tracing/edge/main.go @@ -2,7 +2,6 @@ package main import ( "flag" - "log" "net/http" "zero/core/conf" @@ -20,11 +19,7 @@ var ( ) func handle(w http.ResponseWriter, r *http.Request) { - conn, ok := client.Next() - if !ok { - log.Fatal("no server") - } - + conn := client.Conn() greet := portal.NewPortalClient(conn) resp, err := greet.Portal(r.Context(), &portal.PortalRequest{ Name: "kevin", diff --git a/example/tracing/portal/server.go b/example/tracing/portal/server.go index 340a7309..950e38c3 100644 --- a/example/tracing/portal/server.go +++ b/example/tracing/portal/server.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "flag" "zero/core/conf" @@ -33,11 +32,7 @@ func NewPortalServer(client *rpcx.RpcClient) *PortalServer { } func (gs *PortalServer) Portal(ctx context.Context, req *portal.PortalRequest) (*portal.PortalResponse, error) { - conn, ok := gs.userRpc.Next() - if !ok { - return nil, errors.New("internal error") - } - + conn := gs.userRpc.Conn() greet := user.NewUserClient(conn) resp, err := greet.GetGrade(ctx, &user.UserRequest{ Name: req.Name, diff --git a/rpcx/client.go b/rpcx/client.go index 8fd09492..552b6183 100644 --- a/rpcx/client.go +++ b/rpcx/client.go @@ -42,7 +42,7 @@ func NewClient(c RpcClientConf, options ...internal.ClientOption) (*RpcClient, e if len(c.Server) > 0 { client, err = internal.NewDirectClient(c.Server, opts...) } else if err = c.Etcd.Validate(); err == nil { - client, err = internal.NewRoundRobinRpcClient(c.Etcd.Hosts, c.Etcd.Key, opts...) + client, err = internal.NewDiscovClient(c.Etcd.Hosts, c.Etcd.Key, opts...) } if err != nil { return nil, err @@ -54,7 +54,7 @@ func NewClient(c RpcClientConf, options ...internal.ClientOption) (*RpcClient, e } func NewClientNoAuth(c discov.EtcdConf) (*RpcClient, error) { - client, err := internal.NewRoundRobinRpcClient(c.Hosts, c.Key) + client, err := internal.NewDiscovClient(c.Hosts, c.Key) if err != nil { return nil, err } @@ -64,6 +64,6 @@ func NewClientNoAuth(c discov.EtcdConf) (*RpcClient, error) { }, nil } -func (rc *RpcClient) Next() (*grpc.ClientConn, bool) { - return rc.client.Next() +func (rc *RpcClient) Conn() *grpc.ClientConn { + return rc.client.Conn() } diff --git a/rpcx/internal/balancer/p2c/p2c.go b/rpcx/internal/balancer/p2c/p2c.go new file mode 100644 index 00000000..d1028155 --- /dev/null +++ b/rpcx/internal/balancer/p2c/p2c.go @@ -0,0 +1,172 @@ +package p2c + +import ( + "context" + "math" + "math/rand" + "sync" + "sync/atomic" + "time" + + "zero/core/timex" + "zero/rpcx/internal/codes" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/resolver" +) + +const ( + Name = "p2c_ewma" + decayTime = int64(time.Millisecond * 600) + forcePick = int64(time.Second) + initSuccess = 1000 + throttleSuccess = initSuccess / 2 + penalty = int64(math.MaxInt32) + pickTimes = 3 +) + +func init() { + balancer.Register(newBuilder()) +} + +type p2cPickerBuilder struct { +} + +func newBuilder() balancer.Builder { + return base.NewBalancerBuilder(Name, new(p2cPickerBuilder)) +} + +func (b *p2cPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker { + if len(readySCs) == 0 { + return base.NewErrPicker(balancer.ErrNoSubConnAvailable) + } + + var conns []*subConn + for addr, conn := range readySCs { + conns = append(conns, &subConn{ + addr: addr, + conn: conn, + success: initSuccess, + }) + } + + return &p2cPicker{ + conns: conns, + r: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +type p2cPicker struct { + conns []*subConn + r *rand.Rand + lock sync.Mutex +} + +func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) ( + conn balancer.SubConn, done func(balancer.DoneInfo), err error) { + p.lock.Lock() + defer p.lock.Unlock() + + var chosen *subConn + switch len(p.conns) { + case 0: + return nil, nil, balancer.ErrNoSubConnAvailable + case 1: + chosen = p.choose(p.conns[0], nil) + case 2: + chosen = p.choose(p.conns[0], p.conns[1]) + default: + var node1, node2 *subConn + for i := 0; i < pickTimes; i++ { + a := p.r.Intn(len(p.conns)) + b := p.r.Intn(len(p.conns) - 1) + if b >= a { + b++ + } + node1 = p.conns[a] + node2 = p.conns[b] + if node1.healthy() && node2.healthy() { + break + } + } + + chosen = p.choose(node1, node2) + } + + atomic.AddInt64(&chosen.inflight, 1) + return chosen.conn, p.buildDoneFunc(chosen), nil +} + +func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) { + start := int64(timex.Now()) + return func(info balancer.DoneInfo) { + atomic.AddInt64(&c.inflight, -1) + now := int64(timex.Now()) + last := atomic.SwapInt64(&c.last, int64(now)) + td := now - last + if td < 0 { + td = 0 + } + w := math.Exp(float64(-td) / float64(decayTime)) + lag := now - start + if lag < 0 { + lag = 0 + } + olag := atomic.LoadUint64(&c.lag) + if olag == 0 { + w = 0 + } + atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w))) + success := initSuccess + if info.Err != nil && !codes.Acceptable(info.Err) { + success = 0 + } + osucc := atomic.LoadUint64(&c.success) + atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w))) + } +} + +func (p *p2cPicker) choose(c1, c2 *subConn) *subConn { + start := int64(timex.Now()) + if c2 == nil { + atomic.StoreInt64(&c1.pick, start) + return c1 + } + + if c1.load() > c2.load() { + c1, c2 = c2, c1 + } + + pick := atomic.LoadInt64(&c2.pick) + if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) { + return c2 + } else { + atomic.StoreInt64(&c1.pick, start) + return c1 + } +} + +type subConn struct { + addr resolver.Address + conn balancer.SubConn + lag uint64 + inflight int64 + success uint64 + last int64 + pick int64 +} + +func (c *subConn) healthy() bool { + return atomic.LoadUint64(&c.success) > throttleSuccess +} + +func (c *subConn) load() int64 { + lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1))) + load := lag * atomic.LoadInt64(&c.inflight) + if load == 0 { + return penalty + } else { + return load + } +} diff --git a/rpcx/internal/balancer/p2c/p2c_test.go b/rpcx/internal/balancer/p2c/p2c_test.go new file mode 100644 index 00000000..685780df --- /dev/null +++ b/rpcx/internal/balancer/p2c/p2c_test.go @@ -0,0 +1,7 @@ +package p2c + +import "testing" + +func TestP2cPicker_Pick(t *testing.T) { + +} diff --git a/rpcx/internal/balancer/roundrobin/roundrobin.go b/rpcx/internal/balancer/roundrobin/roundrobin.go index 8a4c366d..0e54ec6c 100644 --- a/rpcx/internal/balancer/roundrobin/roundrobin.go +++ b/rpcx/internal/balancer/roundrobin/roundrobin.go @@ -2,7 +2,6 @@ package roundrobin import ( "context" - "fmt" "math/rand" "sync" "time" @@ -12,20 +11,23 @@ import ( "google.golang.org/grpc/resolver" ) -const Name = "zero_rr" +const Name = "rr" func init() { - balancer.Register(newRoundRobinBuilder()) + balancer.Register(newBuilder()) } -type roundRobinPickerBuilder struct { -} +type roundRobinPickerBuilder struct{} -func newRoundRobinBuilder() balancer.Builder { +func newBuilder() balancer.Builder { return base.NewBalancerBuilder(Name, new(roundRobinPickerBuilder)) } func (b *roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker { + if len(readySCs) == 0 { + return base.NewErrPicker(balancer.ErrNoSubConnAvailable) + } + rand.Seed(time.Now().UnixNano()) picker := &roundRobinPicker{ index: rand.Int(), @@ -49,7 +51,6 @@ type roundRobinPicker struct { func (p *roundRobinPicker) Pick(ctx context.Context, info balancer.PickInfo) ( conn balancer.SubConn, done func(balancer.DoneInfo), err error) { - fmt.Println(p.conns) p.lock.Lock() defer p.lock.Unlock() diff --git a/rpcx/internal/client.go b/rpcx/internal/client.go index 58ed3e29..84e55ec8 100644 --- a/rpcx/internal/client.go +++ b/rpcx/internal/client.go @@ -21,7 +21,7 @@ type ( ClientOption func(options *ClientOptions) Client interface { - Next() (*grpc.ClientConn, bool) + Conn() *grpc.ClientConn } ) diff --git a/rpcx/internal/clientinterceptors/breakerinterceptor.go b/rpcx/internal/clientinterceptors/breakerinterceptor.go index 43233520..cb86d929 100644 --- a/rpcx/internal/clientinterceptors/breakerinterceptor.go +++ b/rpcx/internal/clientinterceptors/breakerinterceptor.go @@ -5,25 +5,15 @@ import ( "path" "zero/core/breaker" + "zero/rpcx/internal/codes" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) -func acceptable(err error) bool { - switch status.Code(err) { - case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss: - return false - default: - return true - } -} - func BreakerInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { breakerName := path.Join(cc.Target(), method) return breaker.DoWithAcceptable(breakerName, func() error { return invoker(ctx, method, req, reply, cc, opts...) - }, acceptable) + }, codes.Acceptable) } diff --git a/rpcx/internal/clientinterceptors/breakerinterceptor_test.go b/rpcx/internal/clientinterceptors/breakerinterceptor_test.go index b134dca6..6982836a 100644 --- a/rpcx/internal/clientinterceptors/breakerinterceptor_test.go +++ b/rpcx/internal/clientinterceptors/breakerinterceptor_test.go @@ -5,6 +5,7 @@ import ( "zero/core/breaker" "zero/core/stat" + rcodes "zero/rpcx/internal/codes" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" @@ -32,7 +33,7 @@ func TestBreakerInterceptorNotFound(t *testing.T) { for i := 0; i < 1000; i++ { assert.Equal(t, err, breaker.DoWithAcceptable("call", func() error { return err - }, acceptable)) + }, rcodes.Acceptable)) } } @@ -42,7 +43,7 @@ func TestBreakerInterceptorDeadlineExceeded(t *testing.T) { for i := 0; i < 1000; i++ { e := breaker.DoWithAcceptable("call", func() error { return err - }, acceptable) + }, rcodes.Acceptable) errs[e]++ } assert.Equal(t, 2, len(errs)) diff --git a/rpcx/internal/codes/error.go b/rpcx/internal/codes/error.go new file mode 100644 index 00000000..91191083 --- /dev/null +++ b/rpcx/internal/codes/error.go @@ -0,0 +1,15 @@ +package codes + +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func Acceptable(err error) bool { + switch status.Code(err) { + case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss: + return false + default: + return true + } +} diff --git a/rpcx/internal/directclient.go b/rpcx/internal/directclient.go index a7373fa3..de460828 100644 --- a/rpcx/internal/directclient.go +++ b/rpcx/internal/directclient.go @@ -3,7 +3,6 @@ package internal import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/connectivity" ) type DirectClient struct { @@ -22,11 +21,6 @@ func NewDirectClient(server string, opts ...ClientOption) (*DirectClient, error) }, nil } -func (c *DirectClient) Next() (*grpc.ClientConn, bool) { - state := c.conn.GetState() - if state == connectivity.Ready { - return c.conn, true - } else { - return nil, false - } +func (c *DirectClient) Conn() *grpc.ClientConn { + return c.conn } diff --git a/rpcx/internal/discovclient.go b/rpcx/internal/discovclient.go index dbdf4628..b45ffce4 100644 --- a/rpcx/internal/discovclient.go +++ b/rpcx/internal/discovclient.go @@ -1,36 +1,35 @@ package internal import ( - "zero/core/discov" - "zero/rpcx/internal/balancer/roundrobin" + "fmt" + "strings" + + "zero/rpcx/internal/balancer/p2c" "zero/rpcx/internal/resolver" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" ) +func init() { + resolver.RegisterResolver() +} + type DiscovClient struct { conn *grpc.ClientConn } -func NewDiscovClient(etcd discov.EtcdConf, opts ...ClientOption) (*DiscovClient, error) { - resolver.RegisterResolver(etcd) - opts = append(opts, WithDialOption(grpc.WithBalancerName(roundrobin.Name))) - conn, err := dial("discov:///", opts...) +func NewDiscovClient(endpoints []string, key string, opts ...ClientOption) (*DiscovClient, error) { + opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name))) + target := fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme, + strings.Join(endpoints, resolver.EndpointSep), key) + conn, err := dial(target, opts...) if err != nil { return nil, err } - return &DiscovClient{ - conn: conn, - }, nil + return &DiscovClient{conn: conn}, nil } -func (c *DiscovClient) Next() (*grpc.ClientConn, bool) { - state := c.conn.GetState() - if state == connectivity.Ready { - return c.conn, true - } else { - return nil, false - } +func (c *DiscovClient) Conn() *grpc.ClientConn { + return c.conn } diff --git a/rpcx/internal/resolver/resolver.go b/rpcx/internal/resolver/resolver.go index 1d2f3fe0..c4b64f75 100644 --- a/rpcx/internal/resolver/resolver.go +++ b/rpcx/internal/resolver/resolver.go @@ -1,32 +1,42 @@ package resolver import ( + "fmt" + "strings" + "zero/core/discov" "google.golang.org/grpc/resolver" ) -const discovScheme = "discov" +const ( + DiscovScheme = "discov" + EndpointSep = "," +) + +var builder discovBuilder -type discovBuilder struct { - etcd discov.EtcdConf -} +type discovBuilder struct{} func (b *discovBuilder) Scheme() string { - return discovScheme + return DiscovScheme } func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) ( resolver.Resolver, error) { - sub, err := discov.NewSubscriber(b.etcd.Hosts, b.etcd.Key) + if target.Scheme != DiscovScheme { + return nil, fmt.Errorf("bad scheme: %s", target.Scheme) + } + + hosts := strings.Split(target.Authority, EndpointSep) + sub, err := discov.NewSubscriber(hosts, target.Endpoint) if err != nil { return nil, err } - sub.AddListener(func() { - vals := sub.Values() + update := func() { var addrs []resolver.Address - for _, val := range vals { + for _, val := range sub.Values() { addrs = append(addrs, resolver.Address{ Addr: val, }) @@ -34,7 +44,9 @@ func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, op cc.UpdateState(resolver.State{ Addresses: addrs, }) - }) + } + sub.AddListener(update) + update() return &discovResolver{ cc: cc, @@ -51,8 +63,6 @@ func (r *discovResolver) Close() { func (r *discovResolver) ResolveNow(options resolver.ResolveNowOptions) { } -func RegisterResolver(etcd discov.EtcdConf) { - resolver.Register(&discovBuilder{ - etcd: etcd, - }) +func RegisterResolver() { + resolver.Register(&builder) } diff --git a/rpcx/internal/rpcsubclient.go b/rpcx/internal/rpcsubclient.go deleted file mode 100644 index 47347ef2..00000000 --- a/rpcx/internal/rpcsubclient.go +++ /dev/null @@ -1,102 +0,0 @@ -package internal - -import ( - "time" - - "zero/core/discov" - "zero/core/logx" - "zero/core/threading" - - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" -) - -const ( - coolOffTime = time.Second * 5 - retryTimes = 3 -) - -type ( - RoundRobinSubClient struct { - *discov.RoundRobinSubClient - } - - ConsistentSubClient struct { - *discov.ConsistentSubClient - } -) - -func NewRoundRobinRpcClient(endpoints []string, key string, opts ...ClientOption) (*RoundRobinSubClient, error) { - subClient, err := discov.NewRoundRobinSubClient(endpoints, key, func(server string) (interface{}, error) { - return dial(server, opts...) - }, func(server string, conn interface{}) error { - return closeConn(conn.(*grpc.ClientConn)) - }, discov.Exclusive()) - if err != nil { - return nil, err - } else { - return &RoundRobinSubClient{subClient}, nil - } -} - -func NewConsistentRpcClient(endpoints []string, key string, opts ...ClientOption) (*ConsistentSubClient, error) { - subClient, err := discov.NewConsistentSubClient(endpoints, key, func(server string) (interface{}, error) { - return dial(server, opts...) - }, func(server string, conn interface{}) error { - return closeConn(conn.(*grpc.ClientConn)) - }) - if err != nil { - return nil, err - } else { - return &ConsistentSubClient{subClient}, nil - } -} - -func (cli *RoundRobinSubClient) Next() (*grpc.ClientConn, bool) { - return next(func() (interface{}, bool) { - return cli.RoundRobinSubClient.Next() - }) -} - -func (cli *ConsistentSubClient) Next(key string) (*grpc.ClientConn, bool) { - return next(func() (interface{}, bool) { - return cli.ConsistentSubClient.Next(key) - }) -} - -func closeConn(conn *grpc.ClientConn) error { - // why to close the conn asynchronously is because maybe another goroutine - // is using the same conn, we can wait the coolOffTime to let the other - // goroutine to finish using the conn. - // after the conn unregistered, the balancer will not assign the conn, - // but maybe the already assigned tasks are still using it. - threading.GoSafe(func() { - time.Sleep(coolOffTime) - if err := conn.Close(); err != nil { - logx.Error(err) - } - }) - - return nil -} - -func next(nextFn func() (interface{}, bool)) (*grpc.ClientConn, bool) { - for i := 0; i < retryTimes; i++ { - v, ok := nextFn() - if !ok { - break - } - - conn, yes := v.(*grpc.ClientConn) - if !yes { - break - } - - switch conn.GetState() { - case connectivity.Ready: - return conn, true - } - } - - return nil, false -} diff --git a/rpcx/internal/rrclient.go b/rpcx/internal/rrclient.go deleted file mode 100644 index d0228be1..00000000 --- a/rpcx/internal/rrclient.go +++ /dev/null @@ -1,40 +0,0 @@ -package internal - -import ( - "math/rand" - "sync" - "time" - - "google.golang.org/grpc" -) - -type RRClient struct { - conns []*grpc.ClientConn - index int - lock sync.Mutex -} - -func NewRRClient(endpoints []string) (*RRClient, error) { - var conns []*grpc.ClientConn - for _, endpoint := range endpoints { - conn, err := dial(endpoint) - if err != nil { - return nil, err - } - - conns = append(conns, conn) - } - - rand.Seed(time.Now().UnixNano()) - return &RRClient{ - conns: conns, - index: rand.Intn(len(conns)), - }, nil -} - -func (c *RRClient) Next() *grpc.ClientConn { - c.lock.Lock() - defer c.lock.Unlock() - c.index = (c.index + 1) % len(c.conns) - return c.conns[c.index] -} diff --git a/rpcx/lb/main.go b/rpcx/lb/main.go deleted file mode 100644 index 0c558ccb..00000000 --- a/rpcx/lb/main.go +++ /dev/null @@ -1,17 +0,0 @@ -package main - -import ( - "zero/core/discov" - "zero/core/lang" - "zero/rpcx/internal" -) - -func main() { - cli, err := internal.NewDiscovClient(discov.EtcdConf{ - Hosts: []string{"localhost:2379"}, - Key: "rpcx", - }) - lang.Must(err) - - cli.Next() -} diff --git a/rpcx/proxy.go b/rpcx/proxy.go index b6243e34..07e51c13 100644 --- a/rpcx/proxy.go +++ b/rpcx/proxy.go @@ -57,10 +57,5 @@ func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) { return nil, err } - conn, ok := val.(*RpcClient).Next() - if !ok { - return nil, grpc.ErrServerStopped - } - - return conn, nil + return val.(*RpcClient).Conn(), nil }