From d828c3f37e60b2fdbab557bedfb41686ea463b90 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sun, 28 Nov 2021 20:08:18 +0800 Subject: [PATCH] feat: add etcd resolver scheme, fix discov minor issue (#1281) --- core/discov/internal/registry.go | 39 ++++++++++++++++++++----- core/discov/internal/registry_test.go | 6 ++-- zrpc/internal/resolver/discovbuilder.go | 2 +- zrpc/internal/resolver/etcdbuilder.go | 9 ++++++ zrpc/internal/resolver/resolver.go | 16 ++++++---- 5 files changed, 54 insertions(+), 18 deletions(-) create mode 100644 zrpc/internal/resolver/etcdbuilder.go diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index 5ea83bde..99d2115a 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -37,25 +37,35 @@ func GetRegistry() *Registry { // GetConn returns an etcd client connection associated with given endpoints. func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) { - return r.getCluster(endpoints).getClient() + c, _ := r.getCluster(endpoints) + return c.getClient() } // Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener. func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error { - return r.getCluster(endpoints).monitor(key, l) + c, exists := r.getCluster(endpoints) + // if exists, the existing values should be updated to the listener. + if exists { + kvs := c.getCurrent(key) + for _, kv := range kvs { + l.OnAdd(kv) + } + } + + return c.monitor(key, l) } -func (r *Registry) getCluster(endpoints []string) *cluster { +func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) { clusterKey := getClusterKey(endpoints) r.lock.Lock() defer r.lock.Unlock() - c, ok := r.clusters[clusterKey] - if !ok { + c, exists = r.clusters[clusterKey] + if !exists { c = newCluster(endpoints) r.clusters[clusterKey] = c } - return c + return } type cluster struct { @@ -94,6 +104,21 @@ func (c *cluster) getClient() (EtcdClient, error) { return val.(EtcdClient), nil } +func (c *cluster) getCurrent(key string) []KV { + c.lock.Lock() + defer c.lock.Unlock() + + var kvs []KV + for k, v := range c.values[key] { + kvs = append(kvs, KV{ + Key: k, + Val: v, + }) + } + + return kvs +} + func (c *cluster) handleChanges(key string, kvs []KV) { var add []KV var remove []KV @@ -197,14 +222,12 @@ func (c *cluster) load(cli EtcdClient, key string) { } var kvs []KV - c.lock.Lock() for _, ev := range resp.Kvs { kvs = append(kvs, KV{ Key: string(ev.Key), Val: string(ev.Value), }) } - c.lock.Unlock() c.handleChanges(key, kvs) } diff --git a/core/discov/internal/registry_test.go b/core/discov/internal/registry_test.go index 8340a75f..21def4d6 100644 --- a/core/discov/internal/registry_test.go +++ b/core/discov/internal/registry_test.go @@ -34,9 +34,9 @@ func setMockClient(cli EtcdClient) func() { func TestGetCluster(t *testing.T) { AddAccount([]string{"first"}, "foo", "bar") - c1 := GetRegistry().getCluster([]string{"first"}) - c2 := GetRegistry().getCluster([]string{"second"}) - c3 := GetRegistry().getCluster([]string{"first"}) + c1, _ := GetRegistry().getCluster([]string{"first"}) + c2, _ := GetRegistry().getCluster([]string{"second"}) + c3, _ := GetRegistry().getCluster([]string{"first"}) assert.Equal(t, c1, c3) assert.NotEqual(t, c1, c2) } diff --git a/zrpc/internal/resolver/discovbuilder.go b/zrpc/internal/resolver/discovbuilder.go index 8b153659..2f362e94 100644 --- a/zrpc/internal/resolver/discovbuilder.go +++ b/zrpc/internal/resolver/discovbuilder.go @@ -10,7 +10,7 @@ import ( type discovBuilder struct{} -func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) ( +func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) ( resolver.Resolver, error) { hosts := strings.FieldsFunc(target.Authority, func(r rune) bool { return r == EndpointSepChar diff --git a/zrpc/internal/resolver/etcdbuilder.go b/zrpc/internal/resolver/etcdbuilder.go new file mode 100644 index 00000000..954e9d07 --- /dev/null +++ b/zrpc/internal/resolver/etcdbuilder.go @@ -0,0 +1,9 @@ +package resolver + +type etcdBuilder struct { + discovBuilder +} + +func (b *etcdBuilder) Scheme() string { + return EtcdScheme +} diff --git a/zrpc/internal/resolver/resolver.go b/zrpc/internal/resolver/resolver.go index fd7611be..162e841d 100644 --- a/zrpc/internal/resolver/resolver.go +++ b/zrpc/internal/resolver/resolver.go @@ -11,6 +11,8 @@ const ( DirectScheme = "direct" // DiscovScheme stands for discov scheme. DiscovScheme = "discov" + // EtcdScheme stands for etcd scheme. + EtcdScheme = "etcd" // KubernetesScheme stands for k8s scheme. KubernetesScheme = "k8s" // EndpointSepChar is the separator cha in endpoints. @@ -23,16 +25,18 @@ var ( // EndpointSep is the separator string in endpoints. EndpointSep = fmt.Sprintf("%c", EndpointSepChar) - dirBuilder directBuilder - disBuilder discovBuilder - k8sBuilder kubeBuilder + directResolverBuilder directBuilder + discovResolverBuilder discovBuilder + etcdResolverBuilder etcdBuilder + k8sResolverBuilder kubeBuilder ) // RegisterResolver registers the direct and discov schemes to the resolver. func RegisterResolver() { - resolver.Register(&dirBuilder) - resolver.Register(&disBuilder) - resolver.Register(&k8sBuilder) + resolver.Register(&directResolverBuilder) + resolver.Register(&discovResolverBuilder) + resolver.Register(&etcdResolverBuilder) + resolver.Register(&k8sResolverBuilder) } type nopResolver struct {