You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/rpcx/internal/rpcsubclient.go

103 lines
2.4 KiB
Go

package internal
import (
"time"
"zero/core/discov"
"zero/core/logx"
"zero/core/threading"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
const (
	// coolOffTime is how long closeConn waits before actually closing a
	// connection.
	coolOffTime = 5 * time.Second
	// retryTimes is the maximum number of candidates next asks for before
	// giving up.
	retryTimes = 3
)
// RoundRobinSubClient embeds *discov.RoundRobinSubClient; its Next method
// hands out the underlying values as *grpc.ClientConn.
type RoundRobinSubClient struct {
	*discov.RoundRobinSubClient
}

// ConsistentSubClient embeds *discov.ConsistentSubClient; its Next method
// hands out the underlying values as *grpc.ClientConn, selected by key.
type ConsistentSubClient struct {
	*discov.ConsistentSubClient
}
// NewRoundRobinRpcClient builds a RoundRobinSubClient on top of
// discov.NewRoundRobinSubClient.
//
// endpoints and key are passed through to discov unchanged. opts are
// forwarded to dial for every server address discov asks to connect to.
// The subscription is created with the discov.Exclusive() option.
//
// It returns the error from discov.NewRoundRobinSubClient, if any; in that
// case the returned client is nil.
func NewRoundRobinRpcClient(endpoints []string, key string, opts ...ClientOption) (*RoundRobinSubClient, error) {
	// Called by discov with a server address; produces the connection value
	// that discov will later hand back through Next.
	dialServer := func(server string) (interface{}, error) {
		return dial(server, opts...)
	}
	// Called by discov with a value produced by dialServer; closeConn
	// performs the actual close after a cool-off delay.
	closeServer := func(server string, conn interface{}) error {
		return closeConn(conn.(*grpc.ClientConn))
	}

	subClient, err := discov.NewRoundRobinSubClient(endpoints, key, dialServer, closeServer, discov.Exclusive())
	if err != nil {
		return nil, err
	}

	return &RoundRobinSubClient{subClient}, nil
}
// NewConsistentRpcClient builds a ConsistentSubClient on top of
// discov.NewConsistentSubClient.
//
// endpoints and key are passed through to discov unchanged. opts are
// forwarded to dial for every server address discov asks to connect to.
//
// It returns the error from discov.NewConsistentSubClient, if any; in that
// case the returned client is nil.
func NewConsistentRpcClient(endpoints []string, key string, opts ...ClientOption) (*ConsistentSubClient, error) {
	// Called by discov with a server address; produces the connection value
	// that discov will later hand back through Next.
	dialServer := func(server string) (interface{}, error) {
		return dial(server, opts...)
	}
	// Called by discov with a value produced by dialServer; closeConn
	// performs the actual close after a cool-off delay.
	closeServer := func(server string, conn interface{}) error {
		return closeConn(conn.(*grpc.ClientConn))
	}

	subClient, err := discov.NewConsistentSubClient(endpoints, key, dialServer, closeServer)
	if err != nil {
		return nil, err
	}

	return &ConsistentSubClient{subClient}, nil
}
// Next returns a connection obtained from the embedded round-robin client.
// The boolean is false when next could not find a usable connection.
func (c *RoundRobinSubClient) Next() (*grpc.ClientConn, bool) {
	pick := func() (interface{}, bool) {
		return c.RoundRobinSubClient.Next()
	}

	return next(pick)
}
// Next returns a connection obtained from the embedded consistent client
// for the given key. The boolean is false when next could not find a usable
// connection.
func (c *ConsistentSubClient) Next(key string) (*grpc.ClientConn, bool) {
	pick := func() (interface{}, bool) {
		return c.ConsistentSubClient.Next(key)
	}

	return next(pick)
}
// closeConn schedules conn to be closed after coolOffTime and returns nil
// immediately; it never reports a close failure to the caller.
//
// The close happens asynchronously (via threading.GoSafe) because another
// goroutine may still be using the same conn: once the conn is unregistered
// the balancer no longer assigns it, but tasks that were assigned it earlier
// may not have finished. Waiting coolOffTime gives them time to complete.
// An error from conn.Close is logged with logx.Error.
func closeConn(conn *grpc.ClientConn) error {
	delayedClose := func() {
		time.Sleep(coolOffTime)

		err := conn.Close()
		if err != nil {
			logx.Error(err)
		}
	}
	threading.GoSafe(delayedClose)

	return nil
}
// next asks nextFn for a candidate up to retryTimes times and returns the
// first one that is a *grpc.ClientConn in the connectivity.Ready state.
//
// It gives up immediately, returning (nil, false), when nextFn reports no
// candidate or the candidate is not a *grpc.ClientConn. A candidate that is
// not Ready is skipped and nextFn is asked again. If every attempt is used
// up without finding a Ready connection, the result is (nil, false).
func next(nextFn func() (interface{}, bool)) (*grpc.ClientConn, bool) {
	for attempt := retryTimes; attempt > 0; attempt-- {
		candidate, found := nextFn()
		if !found {
			return nil, false
		}

		conn, isConn := candidate.(*grpc.ClientConn)
		if !isConn {
			return nil, false
		}

		if conn.GetState() == connectivity.Ready {
			return conn, true
		}
	}

	return nil, false
}