diff --git a/core/discov/subscriber.go b/core/discov/subscriber.go index 1e1bf508..8d7c5ce1 100644 --- a/core/discov/subscriber.go +++ b/core/discov/subscriber.go @@ -36,6 +36,10 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib return sub, nil } +func (s *Subscriber) AddListener(listener func()) { + s.items.addListener(listener) +} + func (s *Subscriber) Values() []string { return s.items.getValues() } @@ -54,6 +58,7 @@ type container struct { mapping map[string]string snapshot atomic.Value dirty *syncx.AtomicBool + listeners []func() lock sync.Mutex } @@ -68,10 +73,12 @@ func newContainer(exclusive bool) *container { func (c *container) OnAdd(kv internal.KV) { c.addKv(kv.Key, kv.Val) + c.notifyChange() } func (c *container) OnDelete(kv internal.KV) { c.removeKey(kv.Key) + c.notifyChange() } // addKv adds the kv, returns if there are already other keys associate with the value @@ -98,6 +105,12 @@ func (c *container) addKv(key, value string) ([]string, bool) { } } +func (c *container) addListener(listener func()) { + c.lock.Lock() + c.listeners = append(c.listeners, listener) + c.lock.Unlock() +} + func (c *container) doRemoveKey(key string) { server, ok := c.mapping[key] if !ok { @@ -139,6 +152,16 @@ func (c *container) getValues() []string { return vals } +func (c *container) notifyChange() { + c.lock.Lock() + listeners := append(([]func())(nil), c.listeners...) + c.lock.Unlock() + + for _, listener := range listeners { + listener() + } +} + // removeKey removes the kv, returns true if there are still other keys associate with the value func (c *container) removeKey(key string) { c.lock.Lock() diff --git a/core/discov/subscriber_test.go b/core/discov/subscriber_test.go index 6a147c3a..2c2d590b 100644 --- a/core/discov/subscriber_test.go +++ b/core/discov/subscriber_test.go @@ -168,8 +168,13 @@ func TestContainer(t *testing.T) { for _, test := range tests { for _, exclusive := range exclusives { t.Run(test.name, func(t *testing.T) { + var changed bool c := newContainer(exclusive) + c.addListener(func() { + changed = true + }) assert.Nil(t, c.getValues()) + assert.False(t, changed) for _, order := range test.do { if order.act == actionAdd { @@ -185,6 +190,7 @@ func TestContainer(t *testing.T) { } } + assert.True(t, changed) assert.True(t, c.dirty.True()) assert.ElementsMatch(t, test.expect, c.getValues()) assert.False(t, c.dirty.True()) diff --git a/rpcx/internal/balancer/roundrobin.go b/rpcx/internal/balancer/roundrobin.go index 7a7a4bf1..bb173972 100644 --- a/rpcx/internal/balancer/roundrobin.go +++ b/rpcx/internal/balancer/roundrobin.go @@ -2,15 +2,16 @@ package balancer import ( "context" + "math/rand" + "sync" + "time" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/resolver" ) -const ( - Name = "roundrobin" -) +const Name = "roundrobin" func init() { balancer.Register(newBuilder()) @@ -24,13 +25,38 @@ func newBuilder() balancer.Builder { } func (b *roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker { - panic("implement me") + rand.Seed(time.Now().UnixNano()) + picker := &roundRobinPicker{ + index: rand.Int(), + } + + for addr, conn := range readySCs { + picker.conns = append(picker.conns, &subConn{ + addr: addr, + conn: conn, + }) + } + + return picker } type roundRobinPicker struct { + conns []*subConn + index int + lock sync.Mutex } func (p *roundRobinPicker) Pick(ctx context.Context, info balancer.PickInfo) ( conn balancer.SubConn, done func(balancer.DoneInfo), err error) { - panic("implement me") + p.lock.Lock() + defer p.lock.Unlock() + + p.index = (p.index + 1) % len(p.conns) + return p.conns[p.index].conn, func(info balancer.DoneInfo) { + }, nil +} + +type subConn struct { + addr resolver.Address + conn balancer.SubConn } diff --git a/rpcx/internal/resolver/resolver.go b/rpcx/internal/resolver/resolver.go index 2f1a14a4..4f2e4a7e 100644 --- a/rpcx/internal/resolver/resolver.go +++ b/rpcx/internal/resolver/resolver.go @@ -3,6 +3,7 @@ package resolver import "google.golang.org/grpc/resolver" type discovResolver struct { + cc resolver.ClientConn } func (r discovResolver) ResolveNow(options resolver.ResolveNowOptions) {