fix golint issues in zrpc (#531)

master
Kevin Wan 4 years ago committed by GitHub
parent dd393351cc
commit 51de0d0620
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -11,23 +11,27 @@ import (
)
var (
// WithDialOption is an alias of internal.WithDialOption.
WithDialOption = internal.WithDialOption
// WithTimeout is an alias of internal.WithTimeout.
WithTimeout = internal.WithTimeout
// WithUnaryClientInterceptor is an alias of internal.WithUnaryClientInterceptor.
WithUnaryClientInterceptor = internal.WithUnaryClientInterceptor
)
type (
// Client is an alias of internal.Client.
Client = internal.Client
// ClientOption is an alias of internal.ClientOption.
ClientOption = internal.ClientOption
Client interface {
Conn() *grpc.ClientConn
}
// A RpcClient is a rpc client.
RpcClient struct {
client Client
}
)
// MustNewClient returns a Client, exits on any error.
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
cli, err := NewClient(c, options...)
if err != nil {
@ -37,6 +41,7 @@ func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
return cli
}
// NewClient returns a Client.
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
var opts []ClientOption
if c.HasCredential() {
@ -66,6 +71,7 @@ func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
}, nil
}
// NewClientNoAuth returns a Client without authentication.
func NewClientNoAuth(c discov.EtcdConf, opts ...ClientOption) (Client, error) {
client, err := internal.NewClient(internal.BuildDiscovTarget(c.Hosts, c.Key), opts...)
if err != nil {
@ -77,10 +83,12 @@ func NewClientNoAuth(c discov.EtcdConf, opts ...ClientOption) (Client, error) {
}, nil
}
// NewClientWithTarget returns a Client with connecting to given target.
func NewClientWithTarget(target string, opts ...ClientOption) (Client, error) {
return internal.NewClient(target, opts...)
}
// Conn returns the underlying grpc.ClientConn.
func (rc *RpcClient) Conn() *grpc.ClientConn {
return rc.client.Conn()
}

@ -7,6 +7,7 @@ import (
)
type (
// A RpcServerConf is a rpc server config.
RpcServerConf struct {
service.ServiceConf
ListenOn string
@ -19,6 +20,7 @@ type (
CpuThreshold int64 `json:",default=900,range=[0:1000]"`
}
// A RpcClientConf is a rpc client config.
RpcClientConf struct {
Etcd discov.EtcdConf `json:",optional"`
Endpoints []string `json:",optional=!Etcd"`
@ -28,6 +30,7 @@ type (
}
)
// NewDirectClientConf returns a RpcClientConf.
func NewDirectClientConf(endpoints []string, app, token string) RpcClientConf {
return RpcClientConf{
Endpoints: endpoints,
@ -36,6 +39,7 @@ func NewDirectClientConf(endpoints []string, app, token string) RpcClientConf {
}
}
// NewEtcdClientConf returns a RpcClientConf.
func NewEtcdClientConf(hosts []string, key, app, token string) RpcClientConf {
return RpcClientConf{
Etcd: discov.EtcdConf{
@ -47,10 +51,12 @@ func NewEtcdClientConf(hosts []string, key, app, token string) RpcClientConf {
}
}
// HasEtcd checks if there is etcd settings in config.
func (sc RpcServerConf) HasEtcd() bool {
return len(sc.Etcd.Hosts) > 0 && len(sc.Etcd.Key) > 0
}
// Validate validates the config.
func (sc RpcServerConf) Validate() error {
if sc.Auth {
if err := sc.Redis.Validate(); err != nil {
@ -61,6 +67,7 @@ func (sc RpcServerConf) Validate() error {
return nil
}
// HasCredential checks if there is a credential in config.
func (cc RpcClientConf) HasCredential() bool {
return len(cc.App) > 0 && len(cc.Token) > 0
}

@ -13,6 +13,7 @@ import (
const defaultExpiration = 5 * time.Minute
// An Authenticator is used to authenticate the rpc requests.
type Authenticator struct {
store *redis.Redis
key string
@ -20,6 +21,7 @@ type Authenticator struct {
strict bool
}
// NewAuthenticator returns an Authenticator.
func NewAuthenticator(store *redis.Redis, key string, strict bool) (*Authenticator, error) {
cache, err := collection.NewCache(defaultExpiration)
if err != nil {
@ -34,6 +36,7 @@ func NewAuthenticator(store *redis.Redis, key string, strict bool) (*Authenticat
}, nil
}
// Authenticate authenticates the given ctx.
func (a *Authenticator) Authenticate(ctx context.Context) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {

@ -6,11 +6,13 @@ import (
"google.golang.org/grpc/metadata"
)
// A Credential is used to authenticate.
type Credential struct {
App string
Token string
}
// GetRequestMetadata gets the request metadata.
func (c *Credential) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{
appKey: c.App,
@ -18,10 +20,12 @@ func (c *Credential) GetRequestMetadata(context.Context, ...string) (map[string]
}, nil
}
// RequireTransportSecurity always returns false.
func (c *Credential) RequireTransportSecurity() bool {
return false
}
// ParseCredential parses credential from given ctx.
func ParseCredential(ctx context.Context) Credential {
var credential Credential

@ -20,7 +20,9 @@ import (
)
const (
// Name is the name of p2c balancer.
Name = "p2c_ewma"
decayTime = int64(time.Second * 10) // default value from finagle
forcePick = int64(time.Second)
initSuccess = 1000

@ -2,10 +2,12 @@ package internal
import "google.golang.org/grpc"
// WithStreamClientInterceptors uses given client stream interceptors.
func WithStreamClientInterceptors(interceptors ...grpc.StreamClientInterceptor) grpc.DialOption {
return grpc.WithChainStreamInterceptor(interceptors...)
}
// WithUnaryClientInterceptors uses given client unary interceptors.
func WithUnaryClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) grpc.DialOption {
return grpc.WithChainUnaryInterceptor(interceptors...)
}

@ -2,10 +2,12 @@ package internal
import "google.golang.org/grpc"
// WithStreamServerInterceptors uses given server stream interceptors.
func WithStreamServerInterceptors(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption {
return grpc.ChainStreamInterceptor(interceptors...)
}
// WithUnaryServerInterceptors uses given server unary interceptors.
func WithUnaryServerInterceptors(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption {
return grpc.ChainUnaryInterceptor(interceptors...)
}

@ -23,11 +23,18 @@ func init() {
}
type (
// Client interface wraps the Conn method.
Client interface {
Conn() *grpc.ClientConn
}
// A ClientOptions is a client options.
ClientOptions struct {
Timeout time.Duration
DialOptions []grpc.DialOption
}
// ClientOption defines the method to customize a ClientOptions.
ClientOption func(options *ClientOptions)
client struct {
@ -35,7 +42,8 @@ type (
}
)
func NewClient(target string, opts ...ClientOption) (*client, error) {
// NewClient returns a Client.
func NewClient(target string, opts ...ClientOption) (Client, error) {
var cli client
opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...)
if err := cli.dial(target, opts...); err != nil {
@ -92,18 +100,21 @@ func (c *client) dial(server string, opts ...ClientOption) error {
return nil
}
// WithDialOption returns a func to customize a ClientOptions with given dial option.
func WithDialOption(opt grpc.DialOption) ClientOption {
return func(options *ClientOptions) {
options.DialOptions = append(options.DialOptions, opt)
}
}
// WithTimeout returns a func to customize a ClientOptions with given timeout.
func WithTimeout(timeout time.Duration) ClientOption {
return func(options *ClientOptions) {
options.Timeout = timeout
}
}
// WithUnaryClientInterceptor returns a func to customize a ClientOptions with given interceptor.
func WithUnaryClientInterceptor(interceptor grpc.UnaryClientInterceptor) ClientOption {
return func(options *ClientOptions) {
options.DialOptions = append(options.DialOptions, WithUnaryClientInterceptors(interceptor))

@ -9,6 +9,7 @@ import (
"google.golang.org/grpc"
)
// BreakerInterceptor is an interceptor that acts as a circuit breaker.
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
breakerName := path.Join(cc.Target(), method)

@ -12,6 +12,7 @@ import (
const slowThreshold = time.Millisecond * 500
// DurationInterceptor is an interceptor that logs the processing time.
func DurationInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
serverName := path.Join(cc.Target(), method)

@ -32,6 +32,7 @@ var (
})
)
// PrometheusInterceptor is an interceptor that reports to prometheus server.
func PrometheusInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
startTime := timex.Now()

@ -8,6 +8,7 @@ import (
"google.golang.org/grpc"
)
// TimeoutInterceptor is an interceptor that controls timeout.
func TimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {

@ -8,6 +8,7 @@ import (
"google.golang.org/grpc/metadata"
)
// TracingInterceptor is an interceptor that handles tracing.
func TracingInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, span := trace.StartClientSpan(ctx, cc.Target(), method)

@ -5,6 +5,7 @@ import (
"google.golang.org/grpc/status"
)
// Acceptable checks if given error is acceptable.
func Acceptable(err error) bool {
switch status.Code(err) {
case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss:

@ -7,18 +7,25 @@ import (
)
const (
// DirectScheme stands for direct schema.
DirectScheme = "direct"
// DiscovSchema stands for discov schema.
DiscovScheme = "discov"
// EnpointSepChar is the separator cha in endpoints.
EndpointSepChar = ','
subsetSize = 32
)
var (
// EnpointSep is the separator string in endpoints.
EndpointSep = fmt.Sprintf("%c", EndpointSepChar)
dirBuilder directBuilder
disBuilder discovBuilder
)
// RegisterResolver registers the direct and discov schemas to the resolver.
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)

@ -12,62 +12,77 @@ const errorLevel = 2
var once sync.Once
// A Logger is a rpc logger.
type Logger struct{}
// InitLogger initializes the rpc logger.
func InitLogger() {
once.Do(func() {
grpclog.SetLoggerV2(new(Logger))
})
}
// Error logs the given args into error log.
func (l *Logger) Error(args ...interface{}) {
logx.Error(args...)
}
// Errorf logs the given args with format into error log.
func (l *Logger) Errorf(format string, args ...interface{}) {
logx.Errorf(format, args...)
}
// Errorln logs the given args into error log with newline.
func (l *Logger) Errorln(args ...interface{}) {
logx.Error(args...)
}
// Fatal logs the given args into error log.
func (l *Logger) Fatal(args ...interface{}) {
logx.Error(args...)
}
// Fatalf logs the given args with format into error log.
func (l *Logger) Fatalf(format string, args ...interface{}) {
logx.Errorf(format, args...)
}
// Fatalln logs args into error log with newline.
func (l *Logger) Fatalln(args ...interface{}) {
logx.Error(args...)
}
// Info ignores the grpc info logs.
func (l *Logger) Info(args ...interface{}) {
// ignore builtin grpc info
}
// Infoln ignores the grpc info logs.
func (l *Logger) Infoln(args ...interface{}) {
// ignore builtin grpc info
}
// Infof ignores the grpc info logs.
func (l *Logger) Infof(format string, args ...interface{}) {
// ignore builtin grpc info
}
// V checks if meet required log level.
func (l *Logger) V(v int) bool {
return v >= errorLevel
}
// Warning ignores the grpc warning logs.
func (l *Logger) Warning(args ...interface{}) {
// ignore builtin grpc warning
}
// Warningf ignores the grpc warning logs.
func (l *Logger) Warningf(format string, args ...interface{}) {
// ignore builtin grpc warning
}
// Warningln ignores the grpc warning logs.
func (l *Logger) Warningln(args ...interface{}) {
// ignore builtin grpc warning
}

@ -13,6 +13,7 @@ const (
envPodIp = "POD_IP"
)
// NewRpcPubServer returns a Server.
func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string, opts ...ServerOption) (Server, error) {
registerEtcd := func() error {
pubListenOn := figureOutListenOn(listenOn)

@ -10,6 +10,7 @@ import (
)
type (
// ServerOption defines the method to customize a rpcServerOptions.
ServerOption func(options *rpcServerOptions)
rpcServerOptions struct {
@ -26,6 +27,7 @@ func init() {
InitLogger()
}
// NewRpcServer returns a Server.
func NewRpcServer(address string, opts ...ServerOption) Server {
var options rpcServerOptions
for _, opt := range opts {
@ -76,6 +78,7 @@ func (s *rpcServer) Start(register RegisterFn) error {
return server.Serve(lis)
}
// WithMetrics returns a func that sets metrics to a Server.
func WithMetrics(metrics *stat.Metrics) ServerOption {
return func(options *rpcServerOptions) {
options.metrics = metrics

@ -6,8 +6,10 @@ import (
)
type (
// RegisterFn defines the method to register a server.
RegisterFn func(*grpc.Server)
// Server interface represents a rpc server.
Server interface {
AddOptions(options ...grpc.ServerOption)
AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor)

@ -7,6 +7,7 @@ import (
"google.golang.org/grpc"
)
// StreamAuthorizeInterceptor returns a func that uses given authenticator in processing stream requests.
func StreamAuthorizeInterceptor(authenticator *auth.Authenticator) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
@ -18,6 +19,7 @@ func StreamAuthorizeInterceptor(authenticator *auth.Authenticator) grpc.StreamSe
}
}
// UnaryAuthorizeInterceptor returns a func that uses given authenticator in processing unary requests.
func UnaryAuthorizeInterceptor(authenticator *auth.Authenticator) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {

@ -10,6 +10,7 @@ import (
"google.golang.org/grpc/status"
)
// StreamCrashInterceptor catches panics in processing stream requests and recovers.
func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) (err error) {
defer handleCrash(func(r interface{}) {
@ -19,6 +20,7 @@ func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grp
return handler(srv, stream)
}
// UnaryCrashInterceptor catches panics in processing unary requests and recovers.
func UnaryCrashInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {

@ -32,6 +32,7 @@ var (
})
)
// UnaryPrometheusInterceptor returns a func that reports to the prometheus server.
func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
interface{}, error) {

@ -16,6 +16,7 @@ var (
lock sync.Mutex
)
// UnarySheddingInterceptor returns a func that does load shedding on processing unary requests.
func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
ensureSheddingStat()

@ -14,6 +14,7 @@ import (
const serverSlowThreshold = time.Millisecond * 500
// UnaryStatInterceptor returns a func that uses given metrics to report stats.
func UnaryStatInterceptor(metrics *stat.Metrics) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {

@ -8,6 +8,7 @@ import (
"google.golang.org/grpc"
)
// UnaryTimeoutInterceptor returns a func that sets timeout to incoming unary requests.
func UnaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {

@ -8,6 +8,7 @@ import (
"google.golang.org/grpc/metadata"
)
// UnaryTracingInterceptor returns a func that handles tracing with given service name.
func UnaryTracingInterceptor(serviceName string) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {

@ -7,11 +7,13 @@ import (
"github.com/tal-tech/go-zero/zrpc/internal/resolver"
)
// BuildDirectTarget returns a string that represents the given endpoints with direct schema.
func BuildDirectTarget(endpoints []string) string {
return fmt.Sprintf("%s:///%s", resolver.DirectScheme,
strings.Join(endpoints, resolver.EndpointSep))
}
// BuildDiscovTarget returns a string that represents the given endpoints with discov schema.
func BuildDiscovTarget(endpoints []string, key string) string {
return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
strings.Join(endpoints, resolver.EndpointSep), key)

@ -10,6 +10,7 @@ import (
"google.golang.org/grpc"
)
// A RpcProxy is a rpc proxy.
type RpcProxy struct {
backend string
clients map[string]Client
@ -18,6 +19,7 @@ type RpcProxy struct {
lock sync.Mutex
}
// NewProxy returns a RpcProxy.
func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
return &RpcProxy{
backend: backend,
@ -27,6 +29,7 @@ func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
}
}
// TakeConn returns a grpc.ClientConn.
func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) {
cred := auth.ParseCredential(ctx)
key := cred.App + "/" + cred.Token

@ -13,11 +13,13 @@ import (
"google.golang.org/grpc"
)
// A RpcServer is a rpc server.
type RpcServer struct {
server internal.Server
register internal.RegisterFn
}
// MustNewServer returns a RpcSever, exits on any error.
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
server, err := NewServer(c, register)
if err != nil {
@ -27,6 +29,7 @@ func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
return server
}
// NewServer returns a RpcServer.
func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) {
var err error
if err = c.Validate(); err != nil {
@ -60,18 +63,22 @@ func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error
return rpcServer, nil
}
// AddOptions adds given options.
func (rs *RpcServer) AddOptions(options ...grpc.ServerOption) {
rs.server.AddOptions(options...)
}
// AddStreamInterceptors adds given stream interceptors.
func (rs *RpcServer) AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) {
rs.server.AddStreamInterceptors(interceptors...)
}
// AddUnaryInterceptors adds given unary interceptors.
func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
rs.server.AddUnaryInterceptors(interceptors...)
}
// Start starts the RpcServer.
func (rs *RpcServer) Start() {
if err := rs.server.Start(rs.register); err != nil {
logx.Error(err)
@ -79,6 +86,7 @@ func (rs *RpcServer) Start() {
}
}
// Stop stops the RpcServer.
func (rs *RpcServer) Stop() {
logx.Close()
}

Loading…
Cancel
Save