You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
83 lines
1.8 KiB
Go
83 lines
1.8 KiB
Go
4 years ago
|
package internal
|
||
4 years ago
|
|
||
|
import (
|
||
|
"net"
|
||
|
|
||
|
"zero/core/proc"
|
||
|
"zero/core/stat"
|
||
4 years ago
|
"zero/rpcx/internal/serverinterceptors"
|
||
4 years ago
|
|
||
|
"google.golang.org/grpc"
|
||
|
)
|
||
|
|
||
|
type (
|
||
|
ServerOption func(options *rpcServerOptions)
|
||
|
|
||
|
rpcServerOptions struct {
|
||
|
metrics *stat.Metrics
|
||
|
}
|
||
|
|
||
|
rpcServer struct {
|
||
|
*baseRpcServer
|
||
|
}
|
||
|
)
|
||
|
|
||
|
func init() {
|
||
|
InitLogger()
|
||
|
}
|
||
|
|
||
|
func NewRpcServer(address string, opts ...ServerOption) Server {
|
||
|
var options rpcServerOptions
|
||
|
for _, opt := range opts {
|
||
|
opt(&options)
|
||
|
}
|
||
|
if options.metrics == nil {
|
||
|
options.metrics = stat.NewMetrics(address)
|
||
|
}
|
||
|
|
||
|
return &rpcServer{
|
||
|
baseRpcServer: newBaseRpcServer(address, options.metrics),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *rpcServer) SetName(name string) {
|
||
|
s.baseRpcServer.SetName(name)
|
||
|
}
|
||
|
|
||
|
func (s *rpcServer) Start(register RegisterFn) error {
|
||
|
lis, err := net.Listen("tcp", s.address)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
unaryInterceptors := []grpc.UnaryServerInterceptor{
|
||
|
serverinterceptors.UnaryCrashInterceptor(),
|
||
|
serverinterceptors.UnaryStatInterceptor(s.metrics),
|
||
|
serverinterceptors.UnaryPromMetricInterceptor(),
|
||
|
}
|
||
|
unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
|
||
|
streamInterceptors := []grpc.StreamServerInterceptor{
|
||
|
serverinterceptors.StreamCrashInterceptor,
|
||
|
}
|
||
|
streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
|
||
|
options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
|
||
|
WithStreamServerInterceptors(streamInterceptors...))
|
||
|
server := grpc.NewServer(options...)
|
||
|
register(server)
|
||
|
// we need to make sure all others are wrapped up
|
||
|
// so we do graceful stop at shutdown phase instead of wrap up phase
|
||
|
shutdownCalled := proc.AddShutdownListener(func() {
|
||
|
server.GracefulStop()
|
||
|
})
|
||
|
err = server.Serve(lis)
|
||
|
shutdownCalled()
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func WithMetrics(metrics *stat.Metrics) ServerOption {
|
||
|
return func(options *rpcServerOptions) {
|
||
|
options.metrics = metrics
|
||
|
}
|
||
|
}
|