in runtime/emserver/grpc.go [44:102]
func NewGRPCServer(opt *config.GRPCOption) (GracefulServer, error) {
var (
srv *grpc.Server
err error
)
//msgReqPerSeconds := config.GlobalConfig().Server.GRPCOption.MsgReqNumPerSecond
//limiter := rate.NewLimiter(rate.Limit(msgReqPerSeconds), 10)
consumerMgr, err := consumer.NewConsumerManager()
if err != nil {
return nil, err
}
producerMgr, err := producer.NewProducerManager()
if err != nil {
return nil, err
}
//registryName := config.GlobalConfig().Server.GRPCOption.RegistryName
//regis := registry.Get(registryName)
consumerSVC, err := consumer.NewConsumerServiceServer(consumerMgr, producerMgr)
if err != nil {
return nil, err
}
producerSVC, err := producer.NewProducerServiceServer(producerMgr)
if err != nil {
return nil, err
}
hbSVC, err := heartbeat.NewHeartbeatServiceServer(consumerMgr)
if err != nil {
return nil, err
}
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%v", opt.Port))
if err != nil {
return nil, err
}
if opt.TLSOption.EnableInsecure {
creds, err := credentials.NewServerTLSFromFile(opt.Certfile, opt.Keyfile)
if err != nil {
return nil, err
}
srv = grpc.NewServer(grpc.Creds(creds))
} else {
srv = grpc.NewServer()
}
pb.RegisterConsumerServiceServer(srv, consumerSVC)
pb.RegisterHeartbeatServiceServer(srv, hbSVC)
pb.RegisterPublisherServiceServer(srv, producerSVC)
// Register reflection service on gRPC server.
reflection.Register(srv)
return &GRPCServer{
grpcOption: opt,
grpcServer: srv,
lis: lis,
}, nil
}