|
|
@ -2,6 +2,7 @@ package internal
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"context"
|
|
|
|
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io"
|
|
|
|
"sort"
|
|
|
|
"sort"
|
|
|
@ -14,6 +15,7 @@ import (
|
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
|
"github.com/zeromicro/go-zero/core/syncx"
|
|
|
|
"github.com/zeromicro/go-zero/core/syncx"
|
|
|
|
"github.com/zeromicro/go-zero/core/threading"
|
|
|
|
"github.com/zeromicro/go-zero/core/threading"
|
|
|
|
|
|
|
|
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
@ -288,13 +290,18 @@ func (c *cluster) reload(cli EtcdClient) {
|
|
|
|
|
|
|
|
|
|
|
|
func (c *cluster) watch(cli EtcdClient, key string, rev int64) {
|
|
|
|
func (c *cluster) watch(cli EtcdClient, key string, rev int64) {
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
if c.watchStream(cli, key, rev) {
|
|
|
|
err := c.watchStream(cli, key, rev)
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if rev != 0 && errors.Is(err, v3rpc.ErrCompacted) {
|
|
|
|
|
|
|
|
logx.Errorf("etcd watch stream has been compacted, try to reload, rev %v", rev)
|
|
|
|
|
|
|
|
rev = c.load(cli, key)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool {
|
|
|
|
func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error {
|
|
|
|
var rch clientv3.WatchChan
|
|
|
|
var rch clientv3.WatchChan
|
|
|
|
if rev != 0 {
|
|
|
|
if rev != 0 {
|
|
|
|
rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix(),
|
|
|
|
rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix(),
|
|
|
@ -308,20 +315,20 @@ func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool {
|
|
|
|
case wresp, ok := <-rch:
|
|
|
|
case wresp, ok := <-rch:
|
|
|
|
if !ok {
|
|
|
|
if !ok {
|
|
|
|
logx.Error("etcd monitor chan has been closed")
|
|
|
|
logx.Error("etcd monitor chan has been closed")
|
|
|
|
return false
|
|
|
|
return errors.New("etcd monitor chan has been closed")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if wresp.Canceled {
|
|
|
|
if wresp.Canceled {
|
|
|
|
logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
|
|
|
|
logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
|
|
|
|
return false
|
|
|
|
return wresp.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if wresp.Err() != nil {
|
|
|
|
if wresp.Err() != nil {
|
|
|
|
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
|
|
|
|
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
|
|
|
|
return false
|
|
|
|
return wresp.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
c.handleWatchEvents(key, wresp.Events)
|
|
|
|
c.handleWatchEvents(key, wresp.Events)
|
|
|
|
case <-c.done:
|
|
|
|
case <-c.done:
|
|
|
|
return true
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|