diff --git a/core/discov/internal/balancer.go b/core/discov/internal/balancer.go deleted file mode 100644 index d55133e5..00000000 --- a/core/discov/internal/balancer.go +++ /dev/null @@ -1,103 +0,0 @@ -package internal - -import "sync" - -type ( - DialFn func(server string) (interface{}, error) - CloseFn func(server string, conn interface{}) error - - Balancer interface { - AddConn(kv KV) error - IsEmpty() bool - Next(key ...string) (interface{}, bool) - RemoveKey(key string) - initialize() - setListener(listener Listener) - } - - serverConn struct { - key string - conn interface{} - } - - baseBalancer struct { - exclusive bool - servers map[string][]string - mapping map[string]string - lock sync.Mutex - dialFn DialFn - closeFn CloseFn - listener Listener - } -) - -func newBaseBalancer(dialFn DialFn, closeFn CloseFn, exclusive bool) *baseBalancer { - return &baseBalancer{ - exclusive: exclusive, - servers: make(map[string][]string), - mapping: make(map[string]string), - dialFn: dialFn, - closeFn: closeFn, - } -} - -// addKv adds the kv, returns if there are already other keys associate with the server -func (b *baseBalancer) addKv(key, value string) ([]string, bool) { - b.lock.Lock() - defer b.lock.Unlock() - - keys := b.servers[value] - previous := append([]string(nil), keys...) - early := len(keys) > 0 - if b.exclusive && early { - for _, each := range keys { - b.doRemoveKv(each) - } - } - b.servers[value] = append(b.servers[value], key) - b.mapping[key] = value - - if early { - return previous, true - } else { - return nil, false - } -} - -func (b *baseBalancer) doRemoveKv(key string) (server string, keepConn bool) { - server, ok := b.mapping[key] - if !ok { - return "", true - } - - delete(b.mapping, key) - keys := b.servers[server] - remain := keys[:0] - - for _, k := range keys { - if k != key { - remain = append(remain, k) - } - } - - if len(remain) > 0 { - b.servers[server] = remain - return server, true - } else { - delete(b.servers, server) - return server, false - } -} - -func (b *baseBalancer) removeKv(key string) (server string, keepConn bool) { - b.lock.Lock() - defer b.lock.Unlock() - - return b.doRemoveKv(key) -} - -func (b *baseBalancer) setListener(listener Listener) { - b.lock.Lock() - b.listener = listener - b.lock.Unlock() -} diff --git a/core/discov/internal/balancer_test.go b/core/discov/internal/balancer_test.go deleted file mode 100644 index 466efc46..00000000 --- a/core/discov/internal/balancer_test.go +++ /dev/null @@ -1,5 +0,0 @@ -package internal - -type mockConn struct { - server string -} diff --git a/core/discov/internal/consistentbalancer.go b/core/discov/internal/consistentbalancer.go deleted file mode 100644 index 9359ed2b..00000000 --- a/core/discov/internal/consistentbalancer.go +++ /dev/null @@ -1,152 +0,0 @@ -package internal - -import ( - "zero/core/hash" - "zero/core/logx" -) - -type consistentBalancer struct { - *baseBalancer - conns map[string]interface{} - buckets *hash.ConsistentHash - bucketKey func(KV) string -} - -func NewConsistentBalancer(dialFn DialFn, closeFn CloseFn, keyer func(kv KV) string) *consistentBalancer { - // we don't support exclusive mode for consistent Balancer, to avoid complexity, - // because there are few scenarios, use it on your own risks. - balancer := &consistentBalancer{ - conns: make(map[string]interface{}), - buckets: hash.NewConsistentHash(), - bucketKey: keyer, - } - balancer.baseBalancer = newBaseBalancer(dialFn, closeFn, false) - return balancer -} - -func (b *consistentBalancer) AddConn(kv KV) error { - // not adding kv and conn within a transaction, but it doesn't matter - // we just rollback the kv addition if dial failed - var conn interface{} - prev, found := b.addKv(kv.Key, kv.Val) - if found { - conn = b.handlePrevious(prev) - } - - if conn == nil { - var err error - conn, err = b.dialFn(kv.Val) - if err != nil { - b.removeKv(kv.Key) - return err - } - } - - bucketKey := b.bucketKey(kv) - b.lock.Lock() - defer b.lock.Unlock() - b.conns[bucketKey] = conn - b.buckets.Add(bucketKey) - b.notify(bucketKey) - - logx.Infof("added server, key: %s, server: %s", bucketKey, kv.Val) - - return nil -} - -func (b *consistentBalancer) getConn(key string) (interface{}, bool) { - b.lock.Lock() - conn, ok := b.conns[key] - b.lock.Unlock() - - return conn, ok -} - -func (b *consistentBalancer) handlePrevious(prev []string) interface{} { - if len(prev) == 0 { - return nil - } - - b.lock.Lock() - defer b.lock.Unlock() - - // if not exclusive, only need to randomly find one connection - for key, conn := range b.conns { - if key == prev[0] { - return conn - } - } - - return nil -} - -func (b *consistentBalancer) initialize() { -} - -func (b *consistentBalancer) notify(key string) { - if b.listener == nil { - return - } - - var keys []string - var values []string - for k := range b.conns { - keys = append(keys, k) - } - for _, v := range b.mapping { - values = append(values, v) - } - - b.listener.OnUpdate(keys, values, key) -} - -func (b *consistentBalancer) RemoveKey(key string) { - kv := KV{Key: key} - server, keep := b.removeKv(key) - kv.Val = server - bucketKey := b.bucketKey(kv) - b.buckets.Remove(b.bucketKey(kv)) - - // wrap the query & removal in a function to make sure the quick lock/unlock - conn, ok := func() (interface{}, bool) { - b.lock.Lock() - defer b.lock.Unlock() - - conn, ok := b.conns[bucketKey] - if ok { - delete(b.conns, bucketKey) - } - - return conn, ok - }() - if ok && !keep { - logx.Infof("removing server, key: %s", kv.Key) - if err := b.closeFn(server, conn); err != nil { - logx.Error(err) - } - } - - // notify without new key - b.notify("") -} - -func (b *consistentBalancer) IsEmpty() bool { - b.lock.Lock() - empty := len(b.conns) == 0 - b.lock.Unlock() - - return empty -} - -func (b *consistentBalancer) Next(keys ...string) (interface{}, bool) { - if len(keys) != 1 { - return nil, false - } - - key := keys[0] - if node, ok := b.buckets.Get(key); !ok { - return nil, false - } else { - return b.getConn(node.(string)) - } -} diff --git a/core/discov/internal/consistentbalancer_test.go b/core/discov/internal/consistentbalancer_test.go deleted file mode 100644 index 1f71d72d..00000000 --- a/core/discov/internal/consistentbalancer_test.go +++ /dev/null @@ -1,178 +0,0 @@ -package internal - -import ( - "errors" - "sort" - "strconv" - "testing" - - "zero/core/mathx" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" -) - -func TestConsistent_addConn(t *testing.T) { - b := NewConsistentBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return errors.New("error") - }, func(kv KV) string { - return kv.Key - }) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey1", - Val: "thevalue", - })) - assert.EqualValues(t, map[string]interface{}{ - "thekey1": mockConn{server: "thevalue"}, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey1"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey1": "thevalue", - }, b.mapping) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey2", - Val: "thevalue", - })) - assert.EqualValues(t, map[string]interface{}{ - "thekey1": mockConn{server: "thevalue"}, - "thekey2": mockConn{server: "thevalue"}, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey1", "thekey2"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey1": "thevalue", - "thekey2": "thevalue", - }, b.mapping) - assert.False(t, b.IsEmpty()) - - b.RemoveKey("thekey1") - assert.EqualValues(t, map[string]interface{}{ - "thekey2": mockConn{server: "thevalue"}, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey2"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey2": "thevalue", - }, b.mapping) - assert.False(t, b.IsEmpty()) - - b.RemoveKey("thekey2") - assert.Equal(t, 0, len(b.conns)) - assert.EqualValues(t, map[string][]string{}, b.servers) - assert.EqualValues(t, map[string]string{}, b.mapping) - assert.True(t, b.IsEmpty()) -} - -func TestConsistent_addConnError(t *testing.T) { - b := NewConsistentBalancer(func(server string) (interface{}, error) { - return nil, errors.New("error") - }, func(server string, conn interface{}) error { - return nil - }, func(kv KV) string { - return kv.Key - }) - assert.NotNil(t, b.AddConn(KV{ - Key: "thekey1", - Val: "thevalue", - })) - assert.Equal(t, 0, len(b.conns)) - assert.EqualValues(t, map[string][]string{}, b.servers) - assert.EqualValues(t, map[string]string{}, b.mapping) -} - -func TestConsistent_next(t *testing.T) { - b := NewConsistentBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return errors.New("error") - }, func(kv KV) string { - return kv.Key - }) - b.initialize() - - _, ok := b.Next("any") - assert.False(t, ok) - - const size = 100 - for i := 0; i < size; i++ { - assert.Nil(t, b.AddConn(KV{ - Key: "thekey/" + strconv.Itoa(i), - Val: "thevalue/" + strconv.Itoa(i), - })) - } - - m := make(map[interface{}]int) - const total = 10000 - for i := 0; i < total; i++ { - val, ok := b.Next(strconv.Itoa(i)) - assert.True(t, ok) - m[val]++ - } - - entropy := mathx.CalcEntropy(m) - assert.Equal(t, size, len(m)) - assert.True(t, entropy > .95) - - for i := 0; i < size; i++ { - b.RemoveKey("thekey/" + strconv.Itoa(i)) - } - _, ok = b.Next() - assert.False(t, ok) -} - -func TestConsistentBalancer_Listener(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - b := NewConsistentBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return nil - }, func(kv KV) string { - return kv.Key - }) - assert.Nil(t, b.AddConn(KV{ - Key: "key1", - Val: "val1", - })) - assert.Nil(t, b.AddConn(KV{ - Key: "key2", - Val: "val2", - })) - - listener := NewMockListener(ctrl) - listener.EXPECT().OnUpdate(gomock.Any(), gomock.Any(), "key2").Do(func(keys, vals, _ interface{}) { - sort.Strings(keys.([]string)) - sort.Strings(vals.([]string)) - assert.EqualValues(t, []string{"key1", "key2"}, keys) - assert.EqualValues(t, []string{"val1", "val2"}, vals) - }) - b.setListener(listener) - b.notify("key2") -} - -func TestConsistentBalancer_remove(t *testing.T) { - b := NewConsistentBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return nil - }, func(kv KV) string { - return kv.Key - }) - - assert.Nil(t, b.handlePrevious(nil)) - assert.Nil(t, b.handlePrevious([]string{"any"})) -} diff --git a/core/discov/internal/registry_test.go b/core/discov/internal/registry_test.go index 3a7c4d2d..95526e71 100644 --- a/core/discov/internal/registry_test.go +++ b/core/discov/internal/registry_test.go @@ -5,14 +5,13 @@ import ( "sync" "testing" - "go.etcd.io/etcd/mvcc/mvccpb" - "zero/core/contextx" "zero/core/stringx" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" ) var mockLock sync.Mutex diff --git a/core/discov/internal/roundrobinbalancer.go b/core/discov/internal/roundrobinbalancer.go deleted file mode 100644 index c908efae..00000000 --- a/core/discov/internal/roundrobinbalancer.go +++ /dev/null @@ -1,148 +0,0 @@ -package internal - -import ( - "math/rand" - "time" - - "zero/core/logx" -) - -type roundRobinBalancer struct { - *baseBalancer - conns []serverConn - index int -} - -func NewRoundRobinBalancer(dialFn DialFn, closeFn CloseFn, exclusive bool) *roundRobinBalancer { - balancer := new(roundRobinBalancer) - balancer.baseBalancer = newBaseBalancer(dialFn, closeFn, exclusive) - return balancer -} - -func (b *roundRobinBalancer) AddConn(kv KV) error { - var conn interface{} - prev, found := b.addKv(kv.Key, kv.Val) - if found { - conn = b.handlePrevious(prev, kv.Val) - } - - if conn == nil { - var err error - conn, err = b.dialFn(kv.Val) - if err != nil { - b.removeKv(kv.Key) - return err - } - } - - b.lock.Lock() - defer b.lock.Unlock() - b.conns = append(b.conns, serverConn{ - key: kv.Key, - conn: conn, - }) - b.notify(kv.Key) - - return nil -} - -func (b *roundRobinBalancer) handlePrevious(prev []string, server string) interface{} { - if len(prev) == 0 { - return nil - } - - b.lock.Lock() - defer b.lock.Unlock() - - if b.exclusive { - for _, item := range prev { - conns := b.conns[:0] - for _, each := range b.conns { - if each.key == item { - if err := b.closeFn(server, each.conn); err != nil { - logx.Error(err) - } - } else { - conns = append(conns, each) - } - } - b.conns = conns - } - } else { - for _, each := range b.conns { - if each.key == prev[0] { - return each.conn - } - } - } - - return nil -} - -func (b *roundRobinBalancer) initialize() { - rand.Seed(time.Now().UnixNano()) - if len(b.conns) > 0 { - b.index = rand.Intn(len(b.conns)) - } -} - -func (b *roundRobinBalancer) IsEmpty() bool { - b.lock.Lock() - empty := len(b.conns) == 0 - b.lock.Unlock() - - return empty -} - -func (b *roundRobinBalancer) Next(...string) (interface{}, bool) { - b.lock.Lock() - defer b.lock.Unlock() - - if len(b.conns) == 0 { - return nil, false - } - - b.index = (b.index + 1) % len(b.conns) - return b.conns[b.index].conn, true -} - -func (b *roundRobinBalancer) notify(key string) { - if b.listener == nil { - return - } - - // b.servers has the format of map[conn][]key - var keys []string - var values []string - for k, v := range b.servers { - values = append(values, k) - keys = append(keys, v...) - } - - b.listener.OnUpdate(keys, values, key) -} - -func (b *roundRobinBalancer) RemoveKey(key string) { - server, keep := b.removeKv(key) - - b.lock.Lock() - defer b.lock.Unlock() - - conns := b.conns[:0] - for _, conn := range b.conns { - if conn.key == key { - // there are other keys assocated with the conn, don't close the conn. - if keep { - continue - } - if err := b.closeFn(server, conn.conn); err != nil { - logx.Error(err) - } - } else { - conns = append(conns, conn) - } - } - b.conns = conns - // notify without new key - b.notify("") -} diff --git a/core/discov/internal/roundrobinbalancer_test.go b/core/discov/internal/roundrobinbalancer_test.go deleted file mode 100644 index 80a1b896..00000000 --- a/core/discov/internal/roundrobinbalancer_test.go +++ /dev/null @@ -1,321 +0,0 @@ -package internal - -import ( - "errors" - "sort" - "strconv" - "testing" - - "zero/core/logx" - "zero/core/mathx" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" -) - -func init() { - logx.Disable() -} - -func TestRoundRobin_addConn(t *testing.T) { - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return errors.New("error") - }, false) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey1", - Val: "thevalue", - })) - assert.EqualValues(t, []serverConn{ - { - key: "thekey1", - conn: mockConn{server: "thevalue"}, - }, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey1"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey1": "thevalue", - }, b.mapping) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey2", - Val: "thevalue", - })) - assert.EqualValues(t, []serverConn{ - { - key: "thekey1", - conn: mockConn{server: "thevalue"}, - }, - { - key: "thekey2", - conn: mockConn{server: "thevalue"}, - }, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey1", "thekey2"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey1": "thevalue", - "thekey2": "thevalue", - }, b.mapping) - assert.False(t, b.IsEmpty()) - - b.RemoveKey("thekey1") - assert.EqualValues(t, []serverConn{ - { - key: "thekey2", - conn: mockConn{server: "thevalue"}, - }, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey2"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey2": "thevalue", - }, b.mapping) - assert.False(t, b.IsEmpty()) - - b.RemoveKey("thekey2") - assert.EqualValues(t, []serverConn{}, b.conns) - assert.EqualValues(t, map[string][]string{}, b.servers) - assert.EqualValues(t, map[string]string{}, b.mapping) - assert.True(t, b.IsEmpty()) -} - -func TestRoundRobin_addConnExclusive(t *testing.T) { - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return nil - }, true) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey1", - Val: "thevalue", - })) - assert.EqualValues(t, []serverConn{ - { - key: "thekey1", - conn: mockConn{server: "thevalue"}, - }, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey1"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey1": "thevalue", - }, b.mapping) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey2", - Val: "thevalue", - })) - assert.EqualValues(t, []serverConn{ - { - key: "thekey2", - conn: mockConn{server: "thevalue"}, - }, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey2"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey2": "thevalue", - }, b.mapping) - assert.False(t, b.IsEmpty()) - - b.RemoveKey("thekey1") - b.RemoveKey("thekey2") - assert.EqualValues(t, []serverConn{}, b.conns) - assert.EqualValues(t, map[string][]string{}, b.servers) - assert.EqualValues(t, map[string]string{}, b.mapping) - assert.True(t, b.IsEmpty()) -} - -func TestRoundRobin_addConnDupExclusive(t *testing.T) { - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return errors.New("error") - }, true) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey1", - Val: "thevalue", - })) - assert.EqualValues(t, []serverConn{ - { - key: "thekey1", - conn: mockConn{server: "thevalue"}, - }, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "thevalue": {"thekey1"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey1": "thevalue", - }, b.mapping) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey", - Val: "anothervalue", - })) - assert.Nil(t, b.AddConn(KV{ - Key: "thekey1", - Val: "thevalue", - })) - assert.EqualValues(t, []serverConn{ - { - key: "thekey", - conn: mockConn{server: "anothervalue"}, - }, - { - key: "thekey1", - conn: mockConn{server: "thevalue"}, - }, - }, b.conns) - assert.EqualValues(t, map[string][]string{ - "anothervalue": {"thekey"}, - "thevalue": {"thekey1"}, - }, b.servers) - assert.EqualValues(t, map[string]string{ - "thekey": "anothervalue", - "thekey1": "thevalue", - }, b.mapping) - assert.False(t, b.IsEmpty()) - - b.RemoveKey("thekey") - b.RemoveKey("thekey1") - assert.EqualValues(t, []serverConn{}, b.conns) - assert.EqualValues(t, map[string][]string{}, b.servers) - assert.EqualValues(t, map[string]string{}, b.mapping) - assert.True(t, b.IsEmpty()) -} - -func TestRoundRobin_addConnError(t *testing.T) { - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return nil, errors.New("error") - }, func(server string, conn interface{}) error { - return nil - }, true) - assert.NotNil(t, b.AddConn(KV{ - Key: "thekey1", - Val: "thevalue", - })) - assert.Nil(t, b.conns) - assert.EqualValues(t, map[string][]string{}, b.servers) - assert.EqualValues(t, map[string]string{}, b.mapping) -} - -func TestRoundRobin_initialize(t *testing.T) { - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return nil - }, true) - for i := 0; i < 100; i++ { - assert.Nil(t, b.AddConn(KV{ - Key: "thekey/" + strconv.Itoa(i), - Val: "thevalue/" + strconv.Itoa(i), - })) - } - - m := make(map[int]int) - const total = 1000 - for i := 0; i < total; i++ { - b.initialize() - m[b.index]++ - } - - mi := make(map[interface{}]int, len(m)) - for k, v := range m { - mi[k] = v - } - entropy := mathx.CalcEntropy(mi) - assert.True(t, entropy > .95) -} - -func TestRoundRobin_next(t *testing.T) { - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return errors.New("error") - }, true) - const size = 100 - for i := 0; i < size; i++ { - assert.Nil(t, b.AddConn(KV{ - Key: "thekey/" + strconv.Itoa(i), - Val: "thevalue/" + strconv.Itoa(i), - })) - } - - m := make(map[interface{}]int) - const total = 10000 - for i := 0; i < total; i++ { - val, ok := b.Next() - assert.True(t, ok) - m[val]++ - } - - entropy := mathx.CalcEntropy(m) - assert.Equal(t, size, len(m)) - assert.True(t, entropy > .95) - - for i := 0; i < size; i++ { - b.RemoveKey("thekey/" + strconv.Itoa(i)) - } - _, ok := b.Next() - assert.False(t, ok) -} - -func TestRoundRobinBalancer_Listener(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return nil - }, true) - assert.Nil(t, b.AddConn(KV{ - Key: "key1", - Val: "val1", - })) - assert.Nil(t, b.AddConn(KV{ - Key: "key2", - Val: "val2", - })) - - listener := NewMockListener(ctrl) - listener.EXPECT().OnUpdate(gomock.Any(), gomock.Any(), "key2").Do(func(keys, vals, _ interface{}) { - sort.Strings(vals.([]string)) - sort.Strings(keys.([]string)) - assert.EqualValues(t, []string{"key1", "key2"}, keys) - assert.EqualValues(t, []string{"val1", "val2"}, vals) - }) - b.setListener(listener) - b.notify("key2") -} - -func TestRoundRobinBalancer_remove(t *testing.T) { - b := NewRoundRobinBalancer(func(server string) (interface{}, error) { - return mockConn{ - server: server, - }, nil - }, func(server string, conn interface{}) error { - return nil - }, true) - - assert.Nil(t, b.handlePrevious(nil, "any")) - _, ok := b.doRemoveKv("any") - assert.True(t, ok) -} diff --git a/core/discov/renewer.go b/core/discov/renewer.go deleted file mode 100644 index c0d56ba5..00000000 --- a/core/discov/renewer.go +++ /dev/null @@ -1,35 +0,0 @@ -package discov - -import "zero/core/logx" - -type ( - Renewer interface { - Start() - Stop() - Pause() - Resume() - } - - etcdRenewer struct { - *Publisher - } -) - -func NewRenewer(endpoints []string, key, value string, renewId int64) Renewer { - var publisher *Publisher - if renewId > 0 { - publisher = NewPublisher(endpoints, key, value, WithId(renewId)) - } else { - publisher = NewPublisher(endpoints, key, value) - } - - return &etcdRenewer{ - Publisher: publisher, - } -} - -func (sr *etcdRenewer) Start() { - if err := sr.KeepAlive(); err != nil { - logx.Error(err) - } -} diff --git a/core/discov/subclient.go b/core/discov/subclient.go deleted file mode 100644 index c3290d30..00000000 --- a/core/discov/subclient.go +++ /dev/null @@ -1,186 +0,0 @@ -package discov - -import ( - "sync" - - "zero/core/discov/internal" - "zero/core/logx" -) - -const ( - _ = iota // keyBasedBalance, default - idBasedBalance -) - -type ( - Listener internal.Listener - - subClient struct { - balancer internal.Balancer - lock sync.Mutex - cond *sync.Cond - listeners []internal.Listener - } - - balanceOptions struct { - balanceType int - } - - BalanceOption func(*balanceOptions) - - RoundRobinSubClient struct { - *subClient - } - - ConsistentSubClient struct { - *subClient - } - - BatchConsistentSubClient struct { - *ConsistentSubClient - } -) - -func NewRoundRobinSubClient(endpoints []string, key string, dialFn internal.DialFn, closeFn internal.CloseFn, - opts ...SubOption) (*RoundRobinSubClient, error) { - var subOpts subOptions - for _, opt := range opts { - opt(&subOpts) - } - - cli, err := newSubClient(endpoints, key, internal.NewRoundRobinBalancer(dialFn, closeFn, subOpts.exclusive)) - if err != nil { - return nil, err - } - - return &RoundRobinSubClient{ - subClient: cli, - }, nil -} - -func NewConsistentSubClient(endpoints []string, key string, dialFn internal.DialFn, - closeFn internal.CloseFn, opts ...BalanceOption) (*ConsistentSubClient, error) { - var balanceOpts balanceOptions - for _, opt := range opts { - opt(&balanceOpts) - } - - var keyer func(internal.KV) string - switch balanceOpts.balanceType { - case idBasedBalance: - keyer = func(kv internal.KV) string { - if id, ok := extractId(kv.Key); ok { - return id - } else { - return kv.Key - } - } - default: - keyer = func(kv internal.KV) string { - return kv.Val - } - } - - cli, err := newSubClient(endpoints, key, internal.NewConsistentBalancer(dialFn, closeFn, keyer)) - if err != nil { - return nil, err - } - - return &ConsistentSubClient{ - subClient: cli, - }, nil -} - -func NewBatchConsistentSubClient(endpoints []string, key string, dialFn internal.DialFn, closeFn internal.CloseFn, - opts ...BalanceOption) (*BatchConsistentSubClient, error) { - cli, err := NewConsistentSubClient(endpoints, key, dialFn, closeFn, opts...) - if err != nil { - return nil, err - } - - return &BatchConsistentSubClient{ - ConsistentSubClient: cli, - }, nil -} - -func newSubClient(endpoints []string, key string, balancer internal.Balancer) (*subClient, error) { - client := &subClient{ - balancer: balancer, - } - client.cond = sync.NewCond(&client.lock) - if err := internal.GetRegistry().Monitor(endpoints, key, client); err != nil { - return nil, err - } - - return client, nil -} - -func (c *subClient) AddListener(listener internal.Listener) { - c.lock.Lock() - c.listeners = append(c.listeners, listener) - c.lock.Unlock() -} - -func (c *subClient) OnAdd(kv internal.KV) { - c.lock.Lock() - defer c.lock.Unlock() - - if err := c.balancer.AddConn(kv); err != nil { - logx.Error(err) - } else { - c.cond.Broadcast() - } -} - -func (c *subClient) OnDelete(kv internal.KV) { - c.balancer.RemoveKey(kv.Key) -} - -func (c *subClient) WaitForServers() { - logx.Error("Waiting for alive servers") - c.lock.Lock() - defer c.lock.Unlock() - - if c.balancer.IsEmpty() { - c.cond.Wait() - } -} - -func (c *subClient) onAdd(keys []string, servers []string, newKey string) { - // guarded by locked outside - for _, listener := range c.listeners { - listener.OnUpdate(keys, servers, newKey) - } -} - -func (c *RoundRobinSubClient) Next() (interface{}, bool) { - return c.balancer.Next() -} - -func (c *ConsistentSubClient) Next(key string) (interface{}, bool) { - return c.balancer.Next(key) -} - -func (bc *BatchConsistentSubClient) Next(keys []string) (map[interface{}][]string, bool) { - if len(keys) == 0 { - return nil, false - } - - result := make(map[interface{}][]string) - for _, key := range keys { - dest, ok := bc.ConsistentSubClient.Next(key) - if !ok { - return nil, false - } - - result[dest] = append(result[dest], key) - } - - return result, true -} - -func BalanceWithId() BalanceOption { - return func(opts *balanceOptions) { - opts.balanceType = idBasedBalance - } -}