internal/gitaly/server/server.go (154 lines of code) (raw):
package server
import (
"crypto/tls"
"fmt"
"time"
grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/grpcstats"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/listenmux"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/customfieldshandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/loghandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/requestinfohandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
expcredentials "google.golang.org/grpc/experimental/credentials"
"google.golang.org/grpc/keepalive"
)
type serverConfig struct {
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
}
// Option is an option that can be passed to `New()`.
type Option func(*serverConfig)
// WithUnaryInterceptor adds another interceptor that shall be executed for unary RPC calls.
func WithUnaryInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
return func(cfg *serverConfig) {
cfg.unaryInterceptors = append(cfg.unaryInterceptors, interceptor)
}
}
// WithStreamInterceptor adds another interceptor that shall be executed for streaming RPC calls.
func WithStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option {
return func(cfg *serverConfig) {
cfg.streamInterceptors = append(cfg.streamInterceptors, interceptor)
}
}
// New returns a GRPC server instance with a set of interceptors configured.
func (s *GitalyServerFactory) New(external, secure bool, opts ...Option) (*grpc.Server, error) {
var cfg serverConfig
for _, opt := range opts {
opt(&cfg)
}
transportCredentials := insecure.NewCredentials()
// If tls config is specified attempt to extract tls options and use it
// as a grpc.ServerOption
if secure {
cert, err := s.cfg.TLS.Certificate()
if err != nil {
return nil, fmt.Errorf("error reading certificate and key paths: %w", err)
}
// The Go language maintains a list of cipher suites that do not have known security issues.
// This list of cipher suites should be used instead of the default list.
var secureCiphers []uint16
for _, cipher := range tls.CipherSuites() {
secureCiphers = append(secureCiphers, cipher.ID)
}
transportCredentials = expcredentials.NewTLSWithALPNDisabled(&tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: s.cfg.TLS.MinVersion.ProtocolVersion(),
CipherSuites: secureCiphers,
})
}
lm := listenmux.New(transportCredentials)
lm.Register(backchannel.NewServerHandshaker(
s.logger,
s.registry,
[]grpc.DialOption{client.UnaryInterceptor()},
))
logMsgProducer := grpcmwlogrus.WithMessageProducer(
loghandler.MessageProducer(
loghandler.PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
customfieldshandler.FieldsProducer,
grpcstats.FieldsProducer,
featureflag.FieldsProducer,
structerr.FieldsProducer,
),
)
logMatcher := gitalylog.NewLogMatcher()
streamServerInterceptors := []grpc.StreamServerInterceptor{
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
requestinfohandler.StreamInterceptor,
grpcprometheus.StreamServerInterceptor,
customfieldshandler.StreamInterceptor,
selector.StreamServerInterceptor(s.logger.WithField("component", "gitaly.StreamServerInterceptor").StreamServerInterceptor(
grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
logMsgProducer,
), logMatcher),
loghandler.StreamLogDataCatcherServerInterceptor(),
sentryhandler.StreamLogHandler(),
statushandler.Stream, // Should be below LogHandler
auth.StreamServerInterceptor(s.cfg.Auth),
}
unaryServerInterceptors := []grpc.UnaryServerInterceptor{
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
requestinfohandler.UnaryInterceptor,
grpcprometheus.UnaryServerInterceptor,
customfieldshandler.UnaryInterceptor,
selector.UnaryServerInterceptor(s.logger.WithField("component", "gitaly.UnaryServerInterceptor").UnaryServerInterceptor(
grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
logMsgProducer,
), logMatcher),
loghandler.UnaryLogDataCatcherServerInterceptor(),
sentryhandler.UnaryLogHandler(),
statushandler.Unary, // Should be below LogHandler
auth.UnaryServerInterceptor(s.cfg.Auth),
}
// Should be below auth handler to prevent v2 hmac tokens from timing out while queued
for _, limitHandler := range s.limitHandlers {
streamServerInterceptors = append(streamServerInterceptors, limitHandler.StreamInterceptor())
unaryServerInterceptors = append(unaryServerInterceptors, limitHandler.UnaryInterceptor())
}
streamServerInterceptors = append(streamServerInterceptors,
grpctracing.StreamServerTracingInterceptor(),
cache.StreamInvalidator(s.cacheInvalidator, protoregistry.GitalyProtoPreregistered, s.logger),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
panichandler.StreamPanicHandler(s.logger),
)
unaryServerInterceptors = append(unaryServerInterceptors,
grpctracing.UnaryServerTracingInterceptor(),
cache.UnaryInvalidator(s.cacheInvalidator, protoregistry.GitalyProtoPreregistered, s.logger),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
panichandler.UnaryPanicHandler(s.logger),
)
streamServerInterceptors = append(streamServerInterceptors, cfg.streamInterceptors...)
unaryServerInterceptors = append(unaryServerInterceptors, cfg.unaryInterceptors...)
// Only requests coming through the external API need to be ran transactionalized. Only the HookService calls
// should arrive through the internal socket. Requests coming from there would already be running in a
// transaction as the external request that led to the internal socket call would have been transactionalized
// already.
if external {
if len(s.txMiddleware.UnaryInterceptors) > 0 {
unaryServerInterceptors = append(unaryServerInterceptors, s.txMiddleware.UnaryInterceptors...)
}
if len(s.txMiddleware.StreamInterceptors) > 0 {
streamServerInterceptors = append(streamServerInterceptors, s.txMiddleware.StreamInterceptors...)
}
}
serverOptions := []grpc.ServerOption{
grpc.StatsHandler(loghandler.PerRPCLogHandler{
Underlying: &grpcstats.PayloadBytes{},
FieldProducers: []loghandler.FieldsProducer{grpcstats.FieldsProducer},
}),
grpc.Creds(lm),
grpc.ChainStreamInterceptor(streamServerInterceptors...),
grpc.ChainUnaryInterceptor(unaryServerInterceptors...),
// We deliberately set the server MinTime to significantly less than the client interval of 20
// seconds to allow for network jitter. We can afford to be forgiving as the maximum number of
// concurrent clients for a Gitaly server is typically in the hundreds and this volume of
// keepalives won't add significant load.
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 5 * time.Minute,
}),
grpc.WaitForHandlers(true),
}
return grpc.NewServer(serverOptions...), nil
}