diff --git a/core/discov/config.go b/core/discov/config.go index 1557b4f8..66c088b6 100644 --- a/core/discov/config.go +++ b/core/discov/config.go @@ -13,6 +13,7 @@ var ( type EtcdConf struct { Hosts []string Key string + ServerID int64 `json:",optional"` User string `json:",optional"` Pass string `json:",optional"` CertFile string `json:",optional"` @@ -26,6 +27,11 @@ func (c EtcdConf) HasAccount() bool { return len(c.User) > 0 && len(c.Pass) > 0 } +// HasServerID returns if ServerID provided. +func (c EtcdConf) HasServerID() bool { + return c.ServerID > 0 +} + // HasTLS returns if TLS CertFile/CertKeyFile/CACertFile are provided. func (c EtcdConf) HasTLS() bool { return len(c.CertFile) > 0 && len(c.CertKeyFile) > 0 && len(c.CACertFile) > 0 diff --git a/core/discov/config_test.go b/core/discov/config_test.go index 63502713..732f7b55 100644 --- a/core/discov/config_test.go +++ b/core/discov/config_test.go @@ -80,3 +80,36 @@ func TestEtcdConf_HasAccount(t *testing.T) { assert.Equal(t, test.hasAccount, test.EtcdConf.HasAccount()) } } + +func TestEtcdConf_HasServerID(t *testing.T) { + tests := []struct { + EtcdConf + hasServerID bool + }{ + { + EtcdConf: EtcdConf{ + Hosts: []string{"any"}, + ServerID: -1, + }, + hasServerID: false, + }, + { + EtcdConf: EtcdConf{ + Hosts: []string{"any"}, + ServerID: 0, + }, + hasServerID: false, + }, + { + EtcdConf: EtcdConf{ + Hosts: []string{"any"}, + ServerID: 10000, + }, + hasServerID: true, + }, + } + + for _, test := range tests { + assert.Equal(t, test.hasServerID, test.EtcdConf.HasServerID()) + } +} diff --git a/zrpc/internal/clientinterceptors/tracinginterceptor.go b/zrpc/internal/clientinterceptors/tracinginterceptor.go index 006c4691..adf814b3 100644 --- a/zrpc/internal/clientinterceptors/tracinginterceptor.go +++ b/zrpc/internal/clientinterceptors/tracinginterceptor.go @@ -2,6 +2,7 @@ package clientinterceptors import ( "context" + "github.com/zeromicro/go-zero/core/lang" "io" ztrace "github.com/zeromicro/go-zero/core/trace" @@ -94,7 +95,7 @@ type ( Finished chan error desc *grpc.StreamDesc events chan streamEvent - eventsDone chan struct{} + eventsDone chan lang.PlaceholderType receivedMessageID int sentMessageID int } diff --git a/zrpc/internal/rpcpubserver.go b/zrpc/internal/rpcpubserver.go index 61b79a7d..58f6a9b3 100644 --- a/zrpc/internal/rpcpubserver.go +++ b/zrpc/internal/rpcpubserver.go @@ -26,6 +26,9 @@ func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMi pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile, etcd.CACertFile, etcd.InsecureSkipVerify)) } + if etcd.HasServerID() { + pubOpts = append(pubOpts, discov.WithId(etcd.ServerID)) + } pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...) return pubClient.KeepAlive() }