From c3756a8f1ca43aa28a079ead5e930ffab7c86ed6 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Tue, 27 Dec 2022 20:03:03 +0800 Subject: [PATCH] fix: etcd publisher reconnecting problem (#2710) * fix: etcd publisher reconnecting problem * chore: fix wrong call --- core/discov/publisher.go | 56 ++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/core/discov/publisher.go b/core/discov/publisher.go index f6ccfa21..6d842012 100644 --- a/core/discov/publisher.go +++ b/core/discov/publisher.go @@ -1,6 +1,8 @@ package discov import ( + "time" + "github.com/zeromicro/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/lang" "github.com/zeromicro/go-zero/core/logx" @@ -51,12 +53,7 @@ func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Pub // KeepAlive keeps key:value alive. func (p *Publisher) KeepAlive() error { - cli, err := internal.GetRegistry().GetConn(p.endpoints) - if err != nil { - return err - } - - p.lease, err = p.register(cli) + cli, err := p.doRegister() if err != nil { return err } @@ -83,6 +80,43 @@ func (p *Publisher) Stop() { p.quit.Close() } +func (p *Publisher) doKeepAlive() error { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for range ticker.C { + select { + case <-p.quit.Done(): + return nil + default: + cli, err := p.doRegister() + if err != nil { + logx.Errorf("etcd publisher doRegister: %s", err.Error()) + break + } + + if err := p.keepAliveAsync(cli); err != nil { + logx.Errorf("etcd publisher keepAliveAsync: %s", err.Error()) + break + } + + return nil + } + } + + return nil +} + +func (p *Publisher) doRegister() (internal.EtcdClient, error) { + cli, err := internal.GetRegistry().GetConn(p.endpoints) + if err != nil { + return nil, err + } + + p.lease, err = p.register(cli) + return cli, err +} + func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error { ch, err := cli.KeepAlive(cli.Ctx(), p.lease) if err != nil { @@ -95,8 +129,8 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error { case _, ok := <-ch: if !ok { p.revoke(cli) - if err := p.KeepAlive(); err != nil { - logx.Errorf("KeepAlive: %s", err.Error()) + if err := p.doKeepAlive(); err != nil { + logx.Errorf("etcd publisher KeepAlive: %s", err.Error()) } return } @@ -105,8 +139,8 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error { p.revoke(cli) select { case <-p.resumeChan: - if err := p.KeepAlive(); err != nil { - logx.Errorf("KeepAlive: %s", err.Error()) + if err := p.doKeepAlive(); err != nil { + logx.Errorf("etcd publisher KeepAlive: %s", err.Error()) } return case <-p.quit.Done(): @@ -141,7 +175,7 @@ func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, erro func (p *Publisher) revoke(cli internal.EtcdClient) { if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil { - logx.Error(err) + logx.Errorf("etcd publisher revoke: %s", err.Error()) } }