plugins/server/grpc/server.go (92 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package grpc
import (
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
)
const (
Name = "grpc-server"
ShowName = "GRPC Server"
)
type Server struct {
config.CommonFields
Address string `mapstructure:"address"` // The address of grpc server.
Network string `mapstructure:"network"` // The network of grpc.
MaxRecvMsgSize int `mapstructure:"max_recv_msg_size"` // The max size of the received log.
MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams"` // The max concurrent stream channels.
TLSCertFile string `mapstructure:"tls_cert_file"` // The TLS cert file path.
TLSKeyFile string `mapstructure:"tls_key_file"` // The TLS key file path.
AcceptLimit AcceptConnectionConfig `mapstructure:"accept_limit"` // To Accept Connection Limiter when reach the resource
// components
server *grpc.Server
listener net.Listener
}
func (s *Server) Name() string {
return Name
}
func (s *Server) ShowName() string {
return ShowName
}
func (s *Server) Description() string {
return "This is a sharing plugin, which would start a gRPC server."
}
func (s *Server) DefaultConfig() string {
return `
# The address of grpc server. Default value is :11800
address: :11800
# The network of grpc. Default value is :tcp
network: tcp
# The max size of receiving log. Default value is 2M. The unit is Byte.
max_recv_msg_size: 2097152
# The max concurrent stream channels.
max_concurrent_streams: 32
# The TLS cert file path.
tls_cert_file: ""
# The TLS key file path.
tls_key_file: ""
# To Accept Connection Limiter when reach the resource
accept_limit:
# The max CPU utilization limit
cpu_utilization: 75
# The max connection count
connection_count: 4000
`
}
func (s *Server) Prepare() error {
var options []grpc.ServerOption
if s.TLSCertFile != "" && s.TLSKeyFile != "" {
if c, err := credentials.NewServerTLSFromFile(s.TLSCertFile, s.TLSKeyFile); err == nil {
options = append(options, grpc.Creds(c))
} else {
log.Logger.Errorf("error in checking TLS files: %v", err)
return err
}
}
options = append(options, grpc.MaxRecvMsgSize(s.MaxRecvMsgSize), grpc.MaxConcurrentStreams(s.MaxConcurrentStreams))
s.server = grpc.NewServer(options...)
listener, err := NewConnectionManager(s.Network, s.Address, s.AcceptLimit)
if err != nil {
log.Logger.Errorf("grpc server cannot be created: %v", err)
return err
}
s.listener = listener
return nil
}
func (s *Server) Start() error {
go func() {
log.Logger.WithField("address", s.Address).Info("grpc server is starting...")
if err := s.server.Serve(s.listener); err != nil {
log.Logger.WithField("address", s.Address).Infof("grpc server has failure when starting: %v", err)
}
}()
return nil
}
func (s *Server) Close() error {
s.server.Stop()
log.Logger.Info("grpc server is closed")
return nil
}
func (s *Server) GetServer() interface{} {
return s.server
}