diff --git a/zrpc/client.go b/zrpc/client.go index 81d370f3..f3fb199a 100644 --- a/zrpc/client.go +++ b/zrpc/client.go @@ -11,23 +11,27 @@ import ( ) var ( - WithDialOption = internal.WithDialOption - WithTimeout = internal.WithTimeout + // WithDialOption is an alias of internal.WithDialOption. + WithDialOption = internal.WithDialOption + // WithTimeout is an alias of internal.WithTimeout. + WithTimeout = internal.WithTimeout + // WithUnaryClientInterceptor is an alias of internal.WithUnaryClientInterceptor. WithUnaryClientInterceptor = internal.WithUnaryClientInterceptor ) type ( + // Client is an alias of internal.Client. + Client = internal.Client + // ClientOption is an alias of internal.ClientOption. ClientOption = internal.ClientOption - Client interface { - Conn() *grpc.ClientConn - } - + // A RpcClient is a rpc client. RpcClient struct { client Client } ) +// MustNewClient returns a Client, exits on any error. func MustNewClient(c RpcClientConf, options ...ClientOption) Client { cli, err := NewClient(c, options...) if err != nil { @@ -37,6 +41,7 @@ func MustNewClient(c RpcClientConf, options ...ClientOption) Client { return cli } +// NewClient returns a Client. func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) { var opts []ClientOption if c.HasCredential() { @@ -66,6 +71,7 @@ func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) { }, nil } +// NewClientNoAuth returns a Client without authentication. func NewClientNoAuth(c discov.EtcdConf, opts ...ClientOption) (Client, error) { client, err := internal.NewClient(internal.BuildDiscovTarget(c.Hosts, c.Key), opts...) if err != nil { @@ -77,10 +83,12 @@ func NewClientNoAuth(c discov.EtcdConf, opts ...ClientOption) (Client, error) { }, nil } +// NewClientWithTarget returns a Client with connecting to given target. func NewClientWithTarget(target string, opts ...ClientOption) (Client, error) { return internal.NewClient(target, opts...) } +// Conn returns the underlying grpc.ClientConn. func (rc *RpcClient) Conn() *grpc.ClientConn { return rc.client.Conn() } diff --git a/zrpc/config.go b/zrpc/config.go index 31c3675c..7188ae98 100644 --- a/zrpc/config.go +++ b/zrpc/config.go @@ -7,6 +7,7 @@ import ( ) type ( + // A RpcServerConf is a rpc server config. RpcServerConf struct { service.ServiceConf ListenOn string @@ -19,6 +20,7 @@ type ( CpuThreshold int64 `json:",default=900,range=[0:1000]"` } + // A RpcClientConf is a rpc client config. RpcClientConf struct { Etcd discov.EtcdConf `json:",optional"` Endpoints []string `json:",optional=!Etcd"` @@ -28,6 +30,7 @@ type ( } ) +// NewDirectClientConf returns a RpcClientConf. func NewDirectClientConf(endpoints []string, app, token string) RpcClientConf { return RpcClientConf{ Endpoints: endpoints, @@ -36,6 +39,7 @@ func NewDirectClientConf(endpoints []string, app, token string) RpcClientConf { } } +// NewEtcdClientConf returns a RpcClientConf. func NewEtcdClientConf(hosts []string, key, app, token string) RpcClientConf { return RpcClientConf{ Etcd: discov.EtcdConf{ @@ -47,10 +51,12 @@ func NewEtcdClientConf(hosts []string, key, app, token string) RpcClientConf { } } +// HasEtcd checks if there is etcd settings in config. func (sc RpcServerConf) HasEtcd() bool { return len(sc.Etcd.Hosts) > 0 && len(sc.Etcd.Key) > 0 } +// Validate validates the config. func (sc RpcServerConf) Validate() error { if sc.Auth { if err := sc.Redis.Validate(); err != nil { @@ -61,6 +67,7 @@ func (sc RpcServerConf) Validate() error { return nil } +// HasCredential checks if there is a credential in config. func (cc RpcClientConf) HasCredential() bool { return len(cc.App) > 0 && len(cc.Token) > 0 } diff --git a/zrpc/internal/auth/auth.go b/zrpc/internal/auth/auth.go index 23761409..fe02e6fa 100644 --- a/zrpc/internal/auth/auth.go +++ b/zrpc/internal/auth/auth.go @@ -13,6 +13,7 @@ import ( const defaultExpiration = 5 * time.Minute +// An Authenticator is used to authenticate the rpc requests. type Authenticator struct { store *redis.Redis key string @@ -20,6 +21,7 @@ type Authenticator struct { strict bool } +// NewAuthenticator returns an Authenticator. func NewAuthenticator(store *redis.Redis, key string, strict bool) (*Authenticator, error) { cache, err := collection.NewCache(defaultExpiration) if err != nil { @@ -34,6 +36,7 @@ func NewAuthenticator(store *redis.Redis, key string, strict bool) (*Authenticat }, nil } +// Authenticate authenticates the given ctx. func (a *Authenticator) Authenticate(ctx context.Context) error { md, ok := metadata.FromIncomingContext(ctx) if !ok { diff --git a/zrpc/internal/auth/credential.go b/zrpc/internal/auth/credential.go index 5855113f..72ba57eb 100644 --- a/zrpc/internal/auth/credential.go +++ b/zrpc/internal/auth/credential.go @@ -6,11 +6,13 @@ import ( "google.golang.org/grpc/metadata" ) +// A Credential is used to authenticate. type Credential struct { App string Token string } +// GetRequestMetadata gets the request metadata. func (c *Credential) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { return map[string]string{ appKey: c.App, @@ -18,10 +20,12 @@ func (c *Credential) GetRequestMetadata(context.Context, ...string) (map[string] }, nil } +// RequireTransportSecurity always returns false. func (c *Credential) RequireTransportSecurity() bool { return false } +// ParseCredential parses credential from given ctx. func ParseCredential(ctx context.Context) Credential { var credential Credential diff --git a/zrpc/internal/balancer/p2c/p2c.go b/zrpc/internal/balancer/p2c/p2c.go index 8125b605..abaee516 100644 --- a/zrpc/internal/balancer/p2c/p2c.go +++ b/zrpc/internal/balancer/p2c/p2c.go @@ -20,7 +20,9 @@ import ( ) const ( - Name = "p2c_ewma" + // Name is the name of p2c balancer. + Name = "p2c_ewma" + decayTime = int64(time.Second * 10) // default value from finagle forcePick = int64(time.Second) initSuccess = 1000 diff --git a/zrpc/internal/chainclientinterceptors.go b/zrpc/internal/chainclientinterceptors.go index 3c4fba0c..9465e102 100644 --- a/zrpc/internal/chainclientinterceptors.go +++ b/zrpc/internal/chainclientinterceptors.go @@ -2,10 +2,12 @@ package internal import "google.golang.org/grpc" +// WithStreamClientInterceptors uses given client stream interceptors. func WithStreamClientInterceptors(interceptors ...grpc.StreamClientInterceptor) grpc.DialOption { return grpc.WithChainStreamInterceptor(interceptors...) } +// WithUnaryClientInterceptors uses given client unary interceptors. func WithUnaryClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) grpc.DialOption { return grpc.WithChainUnaryInterceptor(interceptors...) } diff --git a/zrpc/internal/chainserverinterceptors.go b/zrpc/internal/chainserverinterceptors.go index e6e3b7ae..4d67632e 100644 --- a/zrpc/internal/chainserverinterceptors.go +++ b/zrpc/internal/chainserverinterceptors.go @@ -2,10 +2,12 @@ package internal import "google.golang.org/grpc" +// WithStreamServerInterceptors uses given server stream interceptors. func WithStreamServerInterceptors(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption { return grpc.ChainStreamInterceptor(interceptors...) } +// WithUnaryServerInterceptors uses given server unary interceptors. func WithUnaryServerInterceptors(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption { return grpc.ChainUnaryInterceptor(interceptors...) } diff --git a/zrpc/internal/client.go b/zrpc/internal/client.go index 08019566..26272fba 100644 --- a/zrpc/internal/client.go +++ b/zrpc/internal/client.go @@ -23,11 +23,18 @@ func init() { } type ( + // Client interface wraps the Conn method. + Client interface { + Conn() *grpc.ClientConn + } + + // A ClientOptions is a client options. ClientOptions struct { Timeout time.Duration DialOptions []grpc.DialOption } + // ClientOption defines the method to customize a ClientOptions. ClientOption func(options *ClientOptions) client struct { @@ -35,7 +42,8 @@ type ( } ) -func NewClient(target string, opts ...ClientOption) (*client, error) { +// NewClient returns a Client. +func NewClient(target string, opts ...ClientOption) (Client, error) { var cli client opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...) if err := cli.dial(target, opts...); err != nil { @@ -92,18 +100,21 @@ func (c *client) dial(server string, opts ...ClientOption) error { return nil } +// WithDialOption returns a func to customize a ClientOptions with given dial option. func WithDialOption(opt grpc.DialOption) ClientOption { return func(options *ClientOptions) { options.DialOptions = append(options.DialOptions, opt) } } +// WithTimeout returns a func to customize a ClientOptions with given timeout. func WithTimeout(timeout time.Duration) ClientOption { return func(options *ClientOptions) { options.Timeout = timeout } } +// WithUnaryClientInterceptor returns a func to customize a ClientOptions with given interceptor. func WithUnaryClientInterceptor(interceptor grpc.UnaryClientInterceptor) ClientOption { return func(options *ClientOptions) { options.DialOptions = append(options.DialOptions, WithUnaryClientInterceptors(interceptor)) diff --git a/zrpc/internal/clientinterceptors/breakerinterceptor.go b/zrpc/internal/clientinterceptors/breakerinterceptor.go index 2163a497..163aa62b 100644 --- a/zrpc/internal/clientinterceptors/breakerinterceptor.go +++ b/zrpc/internal/clientinterceptors/breakerinterceptor.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc" ) +// BreakerInterceptor is an interceptor that acts as a circuit breaker. func BreakerInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { breakerName := path.Join(cc.Target(), method) diff --git a/zrpc/internal/clientinterceptors/durationinterceptor.go b/zrpc/internal/clientinterceptors/durationinterceptor.go index b5ace685..5eda2ebf 100644 --- a/zrpc/internal/clientinterceptors/durationinterceptor.go +++ b/zrpc/internal/clientinterceptors/durationinterceptor.go @@ -12,6 +12,7 @@ import ( const slowThreshold = time.Millisecond * 500 +// DurationInterceptor is an interceptor that logs the processing time. func DurationInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { serverName := path.Join(cc.Target(), method) diff --git a/zrpc/internal/clientinterceptors/prometheusinterceptor.go b/zrpc/internal/clientinterceptors/prometheusinterceptor.go index a330410c..4efb3385 100644 --- a/zrpc/internal/clientinterceptors/prometheusinterceptor.go +++ b/zrpc/internal/clientinterceptors/prometheusinterceptor.go @@ -32,6 +32,7 @@ var ( }) ) +// PrometheusInterceptor is an interceptor that reports to prometheus server. func PrometheusInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { startTime := timex.Now() diff --git a/zrpc/internal/clientinterceptors/timeoutinterceptor.go b/zrpc/internal/clientinterceptors/timeoutinterceptor.go index 86150e02..5d9a380a 100644 --- a/zrpc/internal/clientinterceptors/timeoutinterceptor.go +++ b/zrpc/internal/clientinterceptors/timeoutinterceptor.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc" ) +// TimeoutInterceptor is an interceptor that controls timeout. func TimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { diff --git a/zrpc/internal/clientinterceptors/tracinginterceptor.go b/zrpc/internal/clientinterceptors/tracinginterceptor.go index d05a23a0..afd5312f 100644 --- a/zrpc/internal/clientinterceptors/tracinginterceptor.go +++ b/zrpc/internal/clientinterceptors/tracinginterceptor.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc/metadata" ) +// TracingInterceptor is an interceptor that handles tracing. func TracingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx, span := trace.StartClientSpan(ctx, cc.Target(), method) diff --git a/zrpc/internal/codes/accept.go b/zrpc/internal/codes/accept.go index 91191083..0ecb1275 100644 --- a/zrpc/internal/codes/accept.go +++ b/zrpc/internal/codes/accept.go @@ -5,6 +5,7 @@ import ( "google.golang.org/grpc/status" ) +// Acceptable checks if given error is acceptable. func Acceptable(err error) bool { switch status.Code(err) { case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss: diff --git a/zrpc/internal/resolver/resolver.go b/zrpc/internal/resolver/resolver.go index f86009c6..76d56be2 100644 --- a/zrpc/internal/resolver/resolver.go +++ b/zrpc/internal/resolver/resolver.go @@ -7,18 +7,25 @@ import ( ) const ( - DirectScheme = "direct" - DiscovScheme = "discov" + // DirectScheme stands for direct schema. + DirectScheme = "direct" + // DiscovSchema stands for discov schema. + DiscovScheme = "discov" + // EnpointSepChar is the separator cha in endpoints. EndpointSepChar = ',' - subsetSize = 32 + + subsetSize = 32 ) var ( + // EnpointSep is the separator string in endpoints. EndpointSep = fmt.Sprintf("%c", EndpointSepChar) - dirBuilder directBuilder - disBuilder discovBuilder + + dirBuilder directBuilder + disBuilder discovBuilder ) +// RegisterResolver registers the direct and discov schemas to the resolver. func RegisterResolver() { resolver.Register(&dirBuilder) resolver.Register(&disBuilder) diff --git a/zrpc/internal/rpclogger.go b/zrpc/internal/rpclogger.go index 8761fe98..8cefd149 100644 --- a/zrpc/internal/rpclogger.go +++ b/zrpc/internal/rpclogger.go @@ -12,62 +12,77 @@ const errorLevel = 2 var once sync.Once +// A Logger is a rpc logger. type Logger struct{} +// InitLogger initializes the rpc logger. func InitLogger() { once.Do(func() { grpclog.SetLoggerV2(new(Logger)) }) } +// Error logs the given args into error log. func (l *Logger) Error(args ...interface{}) { logx.Error(args...) } +// Errorf logs the given args with format into error log. func (l *Logger) Errorf(format string, args ...interface{}) { logx.Errorf(format, args...) } +// Errorln logs the given args into error log with newline. func (l *Logger) Errorln(args ...interface{}) { logx.Error(args...) } +// Fatal logs the given args into error log. func (l *Logger) Fatal(args ...interface{}) { logx.Error(args...) } +// Fatalf logs the given args with format into error log. func (l *Logger) Fatalf(format string, args ...interface{}) { logx.Errorf(format, args...) } +// Fatalln logs args into error log with newline. func (l *Logger) Fatalln(args ...interface{}) { logx.Error(args...) } +// Info ignores the grpc info logs. func (l *Logger) Info(args ...interface{}) { // ignore builtin grpc info } +// Infoln ignores the grpc info logs. func (l *Logger) Infoln(args ...interface{}) { // ignore builtin grpc info } +// Infof ignores the grpc info logs. func (l *Logger) Infof(format string, args ...interface{}) { // ignore builtin grpc info } +// V checks if meet required log level. func (l *Logger) V(v int) bool { return v >= errorLevel } +// Warning ignores the grpc warning logs. func (l *Logger) Warning(args ...interface{}) { // ignore builtin grpc warning } +// Warningf ignores the grpc warning logs. func (l *Logger) Warningf(format string, args ...interface{}) { // ignore builtin grpc warning } +// Warningln ignores the grpc warning logs. func (l *Logger) Warningln(args ...interface{}) { // ignore builtin grpc warning } diff --git a/zrpc/internal/rpcpubserver.go b/zrpc/internal/rpcpubserver.go index 334853b5..10ec8014 100644 --- a/zrpc/internal/rpcpubserver.go +++ b/zrpc/internal/rpcpubserver.go @@ -13,6 +13,7 @@ const ( envPodIp = "POD_IP" ) +// NewRpcPubServer returns a Server. func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string, opts ...ServerOption) (Server, error) { registerEtcd := func() error { pubListenOn := figureOutListenOn(listenOn) diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index 2f4a71e9..9d16901d 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -10,6 +10,7 @@ import ( ) type ( + // ServerOption defines the method to customize a rpcServerOptions. ServerOption func(options *rpcServerOptions) rpcServerOptions struct { @@ -26,6 +27,7 @@ func init() { InitLogger() } +// NewRpcServer returns a Server. func NewRpcServer(address string, opts ...ServerOption) Server { var options rpcServerOptions for _, opt := range opts { @@ -76,6 +78,7 @@ func (s *rpcServer) Start(register RegisterFn) error { return server.Serve(lis) } +// WithMetrics returns a func that sets metrics to a Server. func WithMetrics(metrics *stat.Metrics) ServerOption { return func(options *rpcServerOptions) { options.metrics = metrics diff --git a/zrpc/internal/server.go b/zrpc/internal/server.go index 98aea89c..10b0c818 100644 --- a/zrpc/internal/server.go +++ b/zrpc/internal/server.go @@ -6,8 +6,10 @@ import ( ) type ( + // RegisterFn defines the method to register a server. RegisterFn func(*grpc.Server) + // Server interface represents a rpc server. Server interface { AddOptions(options ...grpc.ServerOption) AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) diff --git a/zrpc/internal/serverinterceptors/authinterceptor.go b/zrpc/internal/serverinterceptors/authinterceptor.go index 102cac34..06193e99 100644 --- a/zrpc/internal/serverinterceptors/authinterceptor.go +++ b/zrpc/internal/serverinterceptors/authinterceptor.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" ) +// StreamAuthorizeInterceptor returns a func that uses given authenticator in processing stream requests. func StreamAuthorizeInterceptor(authenticator *auth.Authenticator) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { @@ -18,6 +19,7 @@ func StreamAuthorizeInterceptor(authenticator *auth.Authenticator) grpc.StreamSe } } +// UnaryAuthorizeInterceptor returns a func that uses given authenticator in processing unary requests. func UnaryAuthorizeInterceptor(authenticator *auth.Authenticator) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { diff --git a/zrpc/internal/serverinterceptors/crashinterceptor.go b/zrpc/internal/serverinterceptors/crashinterceptor.go index 8fe32d37..e3126860 100644 --- a/zrpc/internal/serverinterceptors/crashinterceptor.go +++ b/zrpc/internal/serverinterceptors/crashinterceptor.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/status" ) +// StreamCrashInterceptor catches panics in processing stream requests and recovers. func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { defer handleCrash(func(r interface{}) { @@ -19,6 +20,7 @@ func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grp return handler(srv, stream) } +// UnaryCrashInterceptor catches panics in processing unary requests and recovers. func UnaryCrashInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { diff --git a/zrpc/internal/serverinterceptors/prometheusinterceptor.go b/zrpc/internal/serverinterceptors/prometheusinterceptor.go index 7925fcba..b0120daf 100644 --- a/zrpc/internal/serverinterceptors/prometheusinterceptor.go +++ b/zrpc/internal/serverinterceptors/prometheusinterceptor.go @@ -32,6 +32,7 @@ var ( }) ) +// UnaryPrometheusInterceptor returns a func that reports to the prometheus server. func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) ( interface{}, error) { diff --git a/zrpc/internal/serverinterceptors/sheddinginterceptor.go b/zrpc/internal/serverinterceptors/sheddinginterceptor.go index a76f1e73..c1386e6b 100644 --- a/zrpc/internal/serverinterceptors/sheddinginterceptor.go +++ b/zrpc/internal/serverinterceptors/sheddinginterceptor.go @@ -16,6 +16,7 @@ var ( lock sync.Mutex ) +// UnarySheddingInterceptor returns a func that does load shedding on processing unary requests. func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor { ensureSheddingStat() diff --git a/zrpc/internal/serverinterceptors/statinterceptor.go b/zrpc/internal/serverinterceptors/statinterceptor.go index 3c653c62..8eacd4a0 100644 --- a/zrpc/internal/serverinterceptors/statinterceptor.go +++ b/zrpc/internal/serverinterceptors/statinterceptor.go @@ -14,6 +14,7 @@ import ( const serverSlowThreshold = time.Millisecond * 500 +// UnaryStatInterceptor returns a func that uses given metrics to report stats. func UnaryStatInterceptor(metrics *stat.Metrics) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { diff --git a/zrpc/internal/serverinterceptors/timeoutinterceptor.go b/zrpc/internal/serverinterceptors/timeoutinterceptor.go index 26990456..fbd04c53 100644 --- a/zrpc/internal/serverinterceptors/timeoutinterceptor.go +++ b/zrpc/internal/serverinterceptors/timeoutinterceptor.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc" ) +// UnaryTimeoutInterceptor returns a func that sets timeout to incoming unary requests. func UnaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { diff --git a/zrpc/internal/serverinterceptors/tracinginterceptor.go b/zrpc/internal/serverinterceptors/tracinginterceptor.go index 50023e5c..3ec81a27 100644 --- a/zrpc/internal/serverinterceptors/tracinginterceptor.go +++ b/zrpc/internal/serverinterceptors/tracinginterceptor.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc/metadata" ) +// UnaryTracingInterceptor returns a func that handles tracing with given service name. func UnaryTracingInterceptor(serviceName string) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { diff --git a/zrpc/internal/target.go b/zrpc/internal/target.go index ff979fa8..25830667 100644 --- a/zrpc/internal/target.go +++ b/zrpc/internal/target.go @@ -7,11 +7,13 @@ import ( "github.com/tal-tech/go-zero/zrpc/internal/resolver" ) +// BuildDirectTarget returns a string that represents the given endpoints with direct schema. func BuildDirectTarget(endpoints []string) string { return fmt.Sprintf("%s:///%s", resolver.DirectScheme, strings.Join(endpoints, resolver.EndpointSep)) } +// BuildDiscovTarget returns a string that represents the given endpoints with discov schema. func BuildDiscovTarget(endpoints []string, key string) string { return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme, strings.Join(endpoints, resolver.EndpointSep), key) diff --git a/zrpc/proxy.go b/zrpc/proxy.go index f7bc6f79..6bcb7dab 100644 --- a/zrpc/proxy.go +++ b/zrpc/proxy.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc" ) +// A RpcProxy is a rpc proxy. type RpcProxy struct { backend string clients map[string]Client @@ -18,6 +19,7 @@ type RpcProxy struct { lock sync.Mutex } +// NewProxy returns a RpcProxy. func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy { return &RpcProxy{ backend: backend, @@ -27,6 +29,7 @@ func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy { } } +// TakeConn returns a grpc.ClientConn. func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) { cred := auth.ParseCredential(ctx) key := cred.App + "/" + cred.Token diff --git a/zrpc/server.go b/zrpc/server.go index f1557929..9b367667 100644 --- a/zrpc/server.go +++ b/zrpc/server.go @@ -13,11 +13,13 @@ import ( "google.golang.org/grpc" ) +// A RpcServer is a rpc server. type RpcServer struct { server internal.Server register internal.RegisterFn } +// MustNewServer returns a RpcSever, exits on any error. func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer { server, err := NewServer(c, register) if err != nil { @@ -27,6 +29,7 @@ func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer { return server } +// NewServer returns a RpcServer. func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) { var err error if err = c.Validate(); err != nil { @@ -60,18 +63,22 @@ func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error return rpcServer, nil } +// AddOptions adds given options. func (rs *RpcServer) AddOptions(options ...grpc.ServerOption) { rs.server.AddOptions(options...) } +// AddStreamInterceptors adds given stream interceptors. func (rs *RpcServer) AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) { rs.server.AddStreamInterceptors(interceptors...) } +// AddUnaryInterceptors adds given unary interceptors. func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) { rs.server.AddUnaryInterceptors(interceptors...) } +// Start starts the RpcServer. func (rs *RpcServer) Start() { if err := rs.server.Start(rs.register); err != nil { logx.Error(err) @@ -79,6 +86,7 @@ func (rs *RpcServer) Start() { } } +// Stop stops the RpcServer. func (rs *RpcServer) Stop() { logx.Close() }