fix bug that etcd stream cancelled without re-watch (#770)

master
Kevin Wan 3 years ago committed by GitHub
parent ea4f680052
commit 74ee163761
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -260,26 +260,34 @@ func (c *cluster) reload(cli EtcdClient) {
} }
func (c *cluster) watch(cli EtcdClient, key string) { func (c *cluster) watch(cli EtcdClient, key string) {
for {
if c.watchStream(cli, key) {
return
}
}
}
func (c *cluster) watchStream(cli EtcdClient, key string) bool {
rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix()) rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
for { for {
select { select {
case wresp, ok := <-rch: case wresp, ok := <-rch:
if !ok { if !ok {
logx.Error("etcd monitor chan has been closed") logx.Error("etcd monitor chan has been closed")
return return false
} }
if wresp.Canceled { if wresp.Canceled {
logx.Error("etcd monitor chan has been canceled") logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
return return false
} }
if wresp.Err() != nil { if wresp.Err() != nil {
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err())) logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
return return false
} }
c.handleWatchEvents(key, wresp.Events) c.handleWatchEvents(key, wresp.Events)
case <-c.done: case <-c.done:
return return true
} }
} }
} }

@ -8,6 +8,7 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/contextx" "github.com/tal-tech/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
@ -202,11 +203,13 @@ func TestClusterWatch_RespFailures(t *testing.T) {
restore := setMockClient(cli) restore := setMockClient(cli)
defer restore() defer restore()
ch := make(chan clientv3.WatchResponse) ch := make(chan clientv3.WatchResponse)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch) cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
cli.EXPECT().Ctx().Return(context.Background()).AnyTimes() cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
c := new(cluster) c := new(cluster)
c.done = make(chan lang.PlaceholderType)
go func() { go func() {
ch <- resp ch <- resp
close(c.done)
}() }()
c.watch(cli, "any") c.watch(cli, "any")
}) })
@ -220,11 +223,13 @@ func TestClusterWatch_CloseChan(t *testing.T) {
restore := setMockClient(cli) restore := setMockClient(cli)
defer restore() defer restore()
ch := make(chan clientv3.WatchResponse) ch := make(chan clientv3.WatchResponse)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch) cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
cli.EXPECT().Ctx().Return(context.Background()).AnyTimes() cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
c := new(cluster) c := new(cluster)
c.done = make(chan lang.PlaceholderType)
go func() { go func() {
close(ch) close(ch)
close(c.done)
}() }()
c.watch(cli, "any") c.watch(cli, "any")
} }

Loading…
Cancel
Save