From 353cb5000f3f127c574cb8b8fb36048cdc4b20a1 Mon Sep 17 00:00:00 2001 From: kevin Date: Tue, 4 Aug 2020 11:59:39 +0800 Subject: [PATCH] try grpc lb interface --- .../balancer/{ => roundrobin}/roundrobin.go | 10 +++-- rpcx/internal/discovclient.go | 36 ++++++++++++++++++ rpcx/internal/resolver/resolver.go | 37 +++++++++++-------- rpcx/lb/main.go | 17 +++++++++ 4 files changed, 80 insertions(+), 20 deletions(-) rename rpcx/internal/balancer/{ => roundrobin}/roundrobin.go (87%) create mode 100644 rpcx/internal/discovclient.go create mode 100644 rpcx/lb/main.go diff --git a/rpcx/internal/balancer/roundrobin.go b/rpcx/internal/balancer/roundrobin/roundrobin.go similarity index 87% rename from rpcx/internal/balancer/roundrobin.go rename to rpcx/internal/balancer/roundrobin/roundrobin.go index bb173972..8a4c366d 100644 --- a/rpcx/internal/balancer/roundrobin.go +++ b/rpcx/internal/balancer/roundrobin/roundrobin.go @@ -1,7 +1,8 @@ -package balancer +package roundrobin import ( "context" + "fmt" "math/rand" "sync" "time" @@ -11,16 +12,16 @@ import ( "google.golang.org/grpc/resolver" ) -const Name = "roundrobin" +const Name = "zero_rr" func init() { - balancer.Register(newBuilder()) + balancer.Register(newRoundRobinBuilder()) } type roundRobinPickerBuilder struct { } -func newBuilder() balancer.Builder { +func newRoundRobinBuilder() balancer.Builder { return base.NewBalancerBuilder(Name, new(roundRobinPickerBuilder)) } @@ -48,6 +49,7 @@ type roundRobinPicker struct { func (p *roundRobinPicker) Pick(ctx context.Context, info balancer.PickInfo) ( conn balancer.SubConn, done func(balancer.DoneInfo), err error) { + fmt.Println(p.conns) p.lock.Lock() defer p.lock.Unlock() diff --git a/rpcx/internal/discovclient.go b/rpcx/internal/discovclient.go new file mode 100644 index 00000000..dbdf4628 --- /dev/null +++ b/rpcx/internal/discovclient.go @@ -0,0 +1,36 @@ +package internal + +import ( + "zero/core/discov" + "zero/rpcx/internal/balancer/roundrobin" + "zero/rpcx/internal/resolver" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +type DiscovClient struct { + conn *grpc.ClientConn +} + +func NewDiscovClient(etcd discov.EtcdConf, opts ...ClientOption) (*DiscovClient, error) { + resolver.RegisterResolver(etcd) + opts = append(opts, WithDialOption(grpc.WithBalancerName(roundrobin.Name))) + conn, err := dial("discov:///", opts...) + if err != nil { + return nil, err + } + + return &DiscovClient{ + conn: conn, + }, nil +} + +func (c *DiscovClient) Next() (*grpc.ClientConn, bool) { + state := c.conn.GetState() + if state == connectivity.Ready { + return c.conn, true + } else { + return nil, false + } +} diff --git a/rpcx/internal/resolver/resolver.go b/rpcx/internal/resolver/resolver.go index 9639e26e..1d2f3fe0 100644 --- a/rpcx/internal/resolver/resolver.go +++ b/rpcx/internal/resolver/resolver.go @@ -6,15 +6,19 @@ import ( "google.golang.org/grpc/resolver" ) -type discovResolver struct { - scheme string - etcd discov.EtcdConf - cc resolver.ClientConn +const discovScheme = "discov" + +type discovBuilder struct { + etcd discov.EtcdConf } -func (r *discovResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) ( +func (b *discovBuilder) Scheme() string { + return discovScheme +} + +func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) ( resolver.Resolver, error) { - sub, err := discov.NewSubscriber(r.etcd.Hosts, r.etcd.Key) + sub, err := discov.NewSubscriber(b.etcd.Hosts, b.etcd.Key) if err != nil { return nil, err } @@ -27,27 +31,28 @@ func (r *discovResolver) Build(target resolver.Target, cc resolver.ClientConn, o Addr: val, }) } - r.cc.UpdateState(resolver.State{ + cc.UpdateState(resolver.State{ Addresses: addrs, }) }) - return r, nil + return &discovResolver{ + cc: cc, + }, nil } -func (r *discovResolver) Close() { +type discovResolver struct { + cc resolver.ClientConn } -func (r *discovResolver) ResolveNow(options resolver.ResolveNowOptions) { +func (r *discovResolver) Close() { } -func (r *discovResolver) Scheme() string { - return r.scheme +func (r *discovResolver) ResolveNow(options resolver.ResolveNowOptions) { } -func RegisterResolver(scheme string, etcd discov.EtcdConf) { - resolver.Register(&discovResolver{ - scheme: scheme, - etcd: etcd, +func RegisterResolver(etcd discov.EtcdConf) { + resolver.Register(&discovBuilder{ + etcd: etcd, }) } diff --git a/rpcx/lb/main.go b/rpcx/lb/main.go new file mode 100644 index 00000000..0c558ccb --- /dev/null +++ b/rpcx/lb/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "zero/core/discov" + "zero/core/lang" + "zero/rpcx/internal" +) + +func main() { + cli, err := internal.NewDiscovClient(discov.EtcdConf{ + Hosts: []string{"localhost:2379"}, + Key: "rpcx", + }) + lang.Must(err) + + cli.Next() +}