From 48625fa3810aa254c0e88c8e3b8f8e1554d41baa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=99=BB=E5=AF=8C?= <757431363@qq.com> Date: Sun, 17 Dec 2023 13:28:19 +0800 Subject: [PATCH] fix endless loop caused by ErrCompacted (#3774) Co-authored-by: lidengfu --- core/discov/internal/registry.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index 97dd97aa..3d53ce3b 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "fmt" "io" "sort" @@ -14,6 +15,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/threading" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -288,13 +290,18 @@ func (c *cluster) reload(cli EtcdClient) { func (c *cluster) watch(cli EtcdClient, key string, rev int64) { for { - if c.watchStream(cli, key, rev) { + err := c.watchStream(cli, key, rev) + if err == nil { return } + if rev != 0 && errors.Is(err, v3rpc.ErrCompacted) { + logx.Errorf("etcd watch stream has been compacted, try to reload, rev %v", rev) + rev = c.load(cli, key) + } } } -func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool { +func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error { var rch clientv3.WatchChan if rev != 0 { rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix(), @@ -308,20 +315,20 @@ func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool { case wresp, ok := <-rch: if !ok { logx.Error("etcd monitor chan has been closed") - return false + return errors.New("etcd monitor chan has been closed") } if wresp.Canceled { logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err()) - return false + return wresp.Err() } if wresp.Err() != nil { logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err())) - return false + return wresp.Err() } c.handleWatchEvents(key, wresp.Events) case <-c.done: - return true + return nil } } }