You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/zrpc/internal/client.go

114 lines
2.7 KiB
Go

4 years ago
package internal
4 years ago
import (
"context"
"errors"
4 years ago
"fmt"
"strings"
4 years ago
"time"
"github.com/tal-tech/go-zero/zrpc/internal/balancer/p2c"
"github.com/tal-tech/go-zero/zrpc/internal/clientinterceptors"
"github.com/tal-tech/go-zero/zrpc/internal/resolver"
4 years ago
"google.golang.org/grpc"
)
const (
	// separator is the byte dial looks for (last occurrence) when it extracts
	// the service name from a target for its error message.
	separator = '/'
	// dialTimeout bounds how long dial waits for grpc.DialContext to finish.
	dialTimeout = 3 * time.Second
)
4 years ago
// init runs when the package is loaded and calls resolver.RegisterResolver.
// NOTE(review): presumably this installs the project's custom gRPC name
// resolvers so dial targets can use their schemes — confirm in the resolver
// package.
func init() {
	resolver.RegisterResolver()
}
4 years ago
type (
ClientOptions struct {
Timeout time.Duration
DialOptions []grpc.DialOption
}
ClientOption func(options *ClientOptions)
client struct {
conn *grpc.ClientConn
interceptors []grpc.UnaryClientInterceptor
}
4 years ago
)
// NewClient dials target and returns a client wrapping the resulting
// connection.
//
// A dial option built from grpc.WithBalancerName(p2c.Name) is appended after
// the caller's opts. If dialing fails, the error from dial is returned
// unchanged and the returned client is nil.
func NewClient(target string, opts ...ClientOption) (*client, error) {
	// Build a fresh slice instead of appending to opts directly: when the
	// caller spreads an existing slice with spare capacity
	// (NewClient(target, s...)), append would write into the caller's backing
	// array.
	allOpts := make([]ClientOption, 0, len(opts)+1)
	allOpts = append(allOpts, opts...)
	allOpts = append(allOpts, WithDialOption(grpc.WithBalancerName(p2c.Name)))

	var cli client
	if err := cli.dial(target, allOpts...); err != nil {
		return nil, err
	}

	return &cli, nil
}
func (c *client) AddInterceptor(interceptor grpc.UnaryClientInterceptor) {
c.interceptors = append(c.interceptors, interceptor)
4 years ago
}
func (c *client) Conn() *grpc.ClientConn {
return c.conn
4 years ago
}
func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
4 years ago
var clientOptions ClientOptions
for _, opt := range opts {
opt(&clientOptions)
}
options := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBlock(),
WithUnaryClientInterceptors(
clientinterceptors.TracingInterceptor,
4 years ago
clientinterceptors.DurationInterceptor,
clientinterceptors.BreakerInterceptor,
clientinterceptors.PrometheusInterceptor,
4 years ago
clientinterceptors.TimeoutInterceptor(clientOptions.Timeout),
),
}
for _, interceptor := range c.interceptors {
options = append(options, WithUnaryClientInterceptors(interceptor))
}
4 years ago
return append(options, clientOptions.DialOptions...)
}
func (c *client) dial(server string, opts ...ClientOption) error {
options := c.buildDialOptions(opts...)
4 years ago
timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
conn, err := grpc.DialContext(timeCtx, server, options...)
if err != nil {
service := server
if errors.Is(err, context.DeadlineExceeded) {
pos := strings.LastIndexByte(server, separator)
// len(server) - 1 is the index of last char
if 0 < pos && pos < len(server)-1 {
service = server[pos+1:]
}
}
return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is alread started",
server, err.Error(), service)
4 years ago
}
c.conn = conn
return nil
}
// WithDialOption returns a ClientOption that appends opt to
// ClientOptions.DialOptions.
func WithDialOption(opt grpc.DialOption) ClientOption {
	return func(o *ClientOptions) {
		o.DialOptions = append(o.DialOptions, opt)
	}
}
func WithTimeout(timeout time.Duration) ClientOption {
return func(options *ClientOptions) {
options.Timeout = timeout
}
4 years ago
}