internal/testhelper/testserver/gitaly.go (578 lines of code) (raw):
package testserver
import (
"context"
"net"
"os"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
housekeepingmgr "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/manager"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
praefectconfig "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// RunGitalyServer starts gitaly server based on the provided cfg and returns a connection address.
// It accepts an additional registrar to register all required services instead of
// calling service.RegisterAll explicitly, because that would create a circular dependency
// when the function is used in one of the internal/gitaly/service/... packages.
func RunGitalyServer(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) string {
	return StartGitalyServer(tb, cfg, registrar, opts...).Address()
}
// StartGitalyServer creates and runs gitaly (and praefect as a proxy) server.
func StartGitalyServer(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) GitalyServer {
	srv, addr, praefectDisabled := runGitaly(tb, cfg, registrar, opts...)

	if testhelper.IsPraefectEnabled() && !praefectDisabled {
		// Route all traffic through a praefect proxy sitting in front of gitaly.
		praefect := runPraefectProxy(tb, cfg, addr)
		return GitalyServer{
			Server:  srv,
			address: praefect.Address(),
			shutdown: func() {
				praefect.Shutdown()
				srv.Stop()
			},
		}
	}

	// Praefect is not in play: expose gitaly directly.
	return GitalyServer{
		Server:   srv,
		shutdown: srv.Stop,
		address:  addr,
	}
}
// runPraefectProxy boots a praefect instance configured with one virtual
// storage per gitaly storage, each backed by the single gitaly node at
// gitalyAddr, and returns the running proxy.
func runPraefectProxy(tb testing.TB, gitalyCfg config.Cfg, gitalyAddr string) PraefectServer {
	virtualStorages := make([]*praefectconfig.VirtualStorage, 0, len(gitalyCfg.Storages))
	for _, s := range gitalyCfg.Storages {
		node := &praefectconfig.Node{
			Storage: s.Name,
			Address: gitalyAddr,
			Token:   gitalyCfg.Auth.Token,
		}
		virtualStorages = append(virtualStorages, &praefectconfig.VirtualStorage{
			Name:  s.Name,
			Nodes: []*praefectconfig.Node{node},
		})
	}

	praefectCfg := praefectconfig.Config{
		SocketPath: testhelper.GetTemporaryGitalySocketFileName(tb),
		Auth: auth.Config{
			Token: gitalyCfg.Auth.Token,
		},
		DB: testdb.GetConfig(tb, testdb.New(tb).Name),
		Failover: praefectconfig.Failover{
			Enabled:          true,
			ElectionStrategy: praefectconfig.ElectionStrategyLocal,
		},
		BackgroundVerification: praefectconfig.BackgroundVerification{
			// Some tests cases purposefully create bad metadata by deleting a repository off
			// the disk. If the background verifier is running, it could find these repositories
			// and remove the invalid metadata related to them. This can cause the test assertions
			// to fail. As the background verifier runs asynchronously and possibly changes state
			// during a test and issues unexpected RPCs, it's disabled generally for all tests.
			VerificationInterval: -1,
		},
		Replication: praefectconfig.DefaultReplicationConfig(),
		Logging: log.Config{
			Format: "json",
			Level:  "info",
		},
		VirtualStorages: virtualStorages,
	}

	return StartPraefect(tb, praefectCfg)
}
// GitalyServer is a helper that carries additional info and
// functionality about gitaly (+praefect) server.
type GitalyServer struct {
	// Server is the external gRPC server gitaly is serving on.
	Server *grpc.Server
	// shutdown stops the gitaly server and, when praefect is proxying, the
	// praefect server as well.
	shutdown func()
	// address is the dial address of the outermost server: praefect when it
	// is enabled, gitaly otherwise.
	address string
}
// Shutdown terminates the running gitaly (+praefect) server.
func (gs GitalyServer) Shutdown() {
	gs.shutdown()
}
// Address returns the address of the running gitaly (or praefect) service.
func (gs GitalyServer) Address() string {
	return gs.address
}
// waitHealthy blocks until the gRPC server hosted at addr reports a SERVING
// health status, failing the test on any connection or RPC error.
func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken string) {
	dialOpts := []grpc.DialOption{
		client.UnaryInterceptor(),
		client.StreamInterceptor(),
	}
	if len(authToken) > 0 {
		dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(authToken)))
	}

	conn, err := client.New(ctx, addr, client.WithGrpcOptions(dialOpts))
	require.NoError(tb, err)
	defer testhelper.MustClose(tb, conn)

	// WaitForReady makes the health check block until the connection has been
	// established instead of failing fast.
	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{}, grpc.WaitForReady(true))
	require.NoError(tb, err)
	require.Equal(tb, healthpb.HealthCheckResponse_SERVING, resp.GetStatus(), "server not yet ready to serve")
}
// runGitaly assembles the dependency set, creates the internal (when a runtime
// directory is configured) and external gRPC servers, starts serving, and
// waits until the external server reports healthy. It returns the external
// server, its dial address, and whether praefect proxying was disabled via
// the options.
func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) {
	tb.Helper()

	ctx := testhelper.Context(tb)

	// Fold all functional options into the zero-valued dependency descriptor.
	var gsd gitalyServerDeps
	for _, opt := range opts {
		gsd = opt(gsd)
	}

	// We set up the structerr interceptors so that any error metadata that gets set via
	// `structerr.WithMetadata()` is not only logged, but also present in the error details.
	serverOpts := []server.Option{
		server.WithUnaryInterceptor(StructErrUnaryInterceptor),
		server.WithStreamInterceptor(StructErrStreamInterceptor),
	}

	deps := gsd.createDependencies(tb, ctx, cfg)
	tb.Cleanup(func() { testhelper.MustClose(tb, gsd.conns) })

	// When a storage node is present (WAL-backed storage is enabled), install
	// the storagemgr interceptors so that RPCs run inside storage transactions.
	var txMiddleware server.TransactionMiddleware
	if deps.GetNode() != nil {
		unaryInterceptors := []grpc.UnaryServerInterceptor{
			storagemgr.NewUnaryInterceptor(
				deps.Logger, protoregistry.GitalyProtoPreregistered, deps.GetTransactionRegistry(), deps.GetNode(), deps.GetLocator(),
			),
		}
		streamInterceptors := []grpc.StreamServerInterceptor{
			storagemgr.NewStreamInterceptor(
				deps.Logger, protoregistry.GitalyProtoPreregistered, deps.GetTransactionRegistry(), deps.GetNode(), deps.GetLocator(),
			),
		}

		// Test-provided transaction interceptors are appended after the
		// default ones so they run later in the chain.
		if gsd.transactionInterceptorsFn != nil {
			unary, stream := gsd.transactionInterceptorsFn(deps.GetLogger(), deps.GetNode(), deps.GetRepositoryFactory())
			unaryInterceptors = append(unaryInterceptors, unary...)
			streamInterceptors = append(streamInterceptors, stream...)
		}

		txMiddleware = server.TransactionMiddleware{
			UnaryInterceptors:  unaryInterceptors,
			StreamInterceptors: streamInterceptors,
		}
	}

	serverFactory := server.NewGitalyServerFactory(
		cfg,
		gsd.logger.WithField("test", tb.Name()),
		deps.GetBackchannelRegistry(),
		deps.GetDiskCache(),
		[]*limithandler.LimiterMiddleware{deps.GetLimitHandler()},
		txMiddleware,
	)

	// The internal server listens on a Unix socket below the runtime directory
	// and is only started when that directory is configured.
	if cfg.RuntimeDir != "" {
		internalServer, err := serverFactory.CreateInternal(serverOpts...)
		require.NoError(tb, err)

		registrar(internalServer, deps)
		registerHealthServerIfNotRegistered(internalServer)

		require.NoError(tb, os.MkdirAll(cfg.InternalSocketDir(), mode.Directory))
		tb.Cleanup(func() { require.NoError(tb, os.RemoveAll(cfg.InternalSocketDir())) })

		internalListener, err := net.Listen("unix", cfg.InternalSocketPath())
		require.NoError(tb, err)

		go func() {
			// assert (not require) because require must not be called from a
			// goroutine other than the test's.
			assert.NoError(tb, internalServer.Serve(internalListener), "failure to serve internal gRPC")
		}()

		waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token)
	}

	secure := cfg.TLS.CertPath != "" && cfg.TLS.KeyPath != ""

	externalServer, err := serverFactory.CreateExternal(secure, serverOpts...)
	require.NoError(tb, err)
	tb.Cleanup(serverFactory.GracefulStop)

	registrar(externalServer, deps)
	registerHealthServerIfNotRegistered(externalServer)

	// Choose the listener from configuration: TLS and plain TCP bind the
	// configured addresses; otherwise fall back to a temporary Unix socket.
	var listener net.Listener
	var addr string
	switch {
	case cfg.TLSListenAddr != "":
		listener, err = net.Listen("tcp", cfg.TLSListenAddr)
		require.NoError(tb, err)
		_, port, err := net.SplitHostPort(listener.Addr().String())
		require.NoError(tb, err)
		addr = "tls://localhost:" + port
	case cfg.ListenAddr != "":
		listener, err = net.Listen("tcp", cfg.ListenAddr)
		require.NoError(tb, err)
		addr = "tcp://" + listener.Addr().String()
	default:
		serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(tb)
		listener, err = net.Listen("unix", serverSocketPath)
		require.NoError(tb, err)
		addr = "unix://" + serverSocketPath
	}

	go func() {
		assert.NoError(tb, externalServer.Serve(listener), "failure to serve external gRPC")
	}()

	waitHealthy(tb, ctx, addr, cfg.Auth.Token)

	return externalServer, addr, gsd.disablePraefect
}
// registerHealthServerIfNotRegistered adds the standard gRPC health service to
// srv unless the caller already registered one. The health service is needed
// because praefect performs periodic health checks (and one on bootstrap).
func registerHealthServerIfNotRegistered(srv *grpc.Server) {
	_, alreadyRegistered := srv.GetServiceInfo()["grpc.health.v1.Health"]
	if alreadyRegistered {
		return
	}
	healthpb.RegisterHealthServer(srv, health.NewServer())
}
// gitalyServerDeps holds every dependency a test gitaly server may use. Fields
// left at their zero value are filled with sensible defaults by
// createDependencies; the With* server options below override single fields.
type gitalyServerDeps struct {
	// disablePraefect turns off the praefect proxy even when the test suite
	// runs with praefect enabled.
	disablePraefect bool
	logger          log.Logger
	conns           *client.Pool
	locator         storage.Locator
	txMgr           transaction.Manager
	hookMgr         hook.Manager
	gitlabClient    gitlab.Client
	gitCmdFactory   gitcmd.CommandFactory
	backchannelReg  *backchannel.Registry
	catfileCache    catfile.Cache
	diskCache       cache.Cache
	packObjectsCache    streamcache.Cache
	packObjectsLimiter  limiter.Limiter
	limitHandler        *limithandler.LimiterMiddleware
	repositoryCounter   *counter.RepositoryCounter
	updaterWithHooks    *updateref.UpdaterWithHooks
	housekeepingManager housekeepingmgr.Manager
	backupSink          *backup.Sink
	backupLocator       backup.Locator
	// signingKey, when set, is copied into cfg.Git.SigningKey.
	signingKey          string
	transactionRegistry *storagemgr.TransactionRegistry
	procReceiveRegistry *hook.ProcReceiveRegistry
	bundleURIManager    *bundleuri.GenerationManager
	bundleURISink       *bundleuri.Sink
	bundleURIStrategy   bundleuri.GenerationStrategy
	localRepoFactory    localrepo.Factory
	// NOTE(review): exported name is inconsistent with the other (unexported)
	// fields — presumably unintentional, but renaming could break other files
	// in this package; confirm before changing.
	MigrationStateManager migration.StateManager
	// transactionInterceptorsFn supplies extra transaction interceptors that
	// are appended after the defaults in runGitaly.
	transactionInterceptorsFn func(log.Logger, storage.Node, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor)
}
// createDependencies fills in defaults for every dependency that was not
// explicitly configured via server options and assembles the
// service.Dependencies handed to the registrar. It mutates gsd in place so
// the defaults are also visible to the caller (e.g. for cleanup).
func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Context, cfg config.Cfg) *service.Dependencies {
	if gsd.logger == nil {
		gsd.logger = testhelper.NewLogger(tb, testhelper.WithLoggerName("gitaly"))
	}

	if gsd.conns == nil {
		gsd.conns = client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor()))
	}

	if gsd.locator == nil {
		gsd.locator = config.NewLocator(cfg)
	}

	if gsd.gitlabClient == nil {
		// Default to a mock GitLab client with the stock mock hook handlers.
		gsd.gitlabClient = gitlab.NewMockClient(
			tb, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
		)
	}

	if gsd.backchannelReg == nil {
		gsd.backchannelReg = backchannel.NewRegistry()
	}

	if gsd.txMgr == nil {
		gsd.txMgr = transaction.NewManager(cfg, gsd.logger, gsd.backchannelReg)
	}

	if gsd.gitCmdFactory == nil {
		gsd.gitCmdFactory = gittest.NewCommandFactory(tb, cfg)
	}

	if gsd.transactionRegistry == nil {
		gsd.transactionRegistry = storagemgr.NewTransactionRegistry()
	}

	if gsd.procReceiveRegistry == nil {
		gsd.procReceiveRegistry = hook.NewProcReceiveRegistry()
	}

	// No partition migrations are configured for tests.
	migrations := []migration.Migration{}
	if gsd.MigrationStateManager == nil {
		gsd.MigrationStateManager = migration.NewStateManager(migrations)
	}

	// With write-ahead logging enabled, set up the key-value database manager,
	// an optional Raft node, and the storage node manager.
	var node storage.Node
	if testhelper.IsWALEnabled() {
		dbMgr, err := databasemgr.NewDBManager(
			ctx,
			cfg.Storages,
			keyvalue.NewBadgerStore,
			helper.NewNullTickerFactory(),
			gsd.logger,
		)
		require.NoError(tb, err)
		tb.Cleanup(dbMgr.Close)

		var raftFactory raftmgr.RaftReplicaFactory
		var raftNode *raftmgr.Node
		if testhelper.IsRaftEnabled() && !testhelper.IsPraefectEnabled() {
			cfg.Raft = config.DefaultRaftConfig(uuid.New().String())
			// Speed up initial election overhead in the test setup
			cfg.Raft.ElectionTicks = 5
			cfg.Raft.RTTMilliseconds = 100
			cfg.Raft.SnapshotDir = testhelper.TempDir(tb)
			raftNode, err = raftmgr.NewNode(cfg, gsd.logger, dbMgr, gsd.conns)
			require.NoError(tb, err)
			raftFactory = raftmgr.DefaultFactoryWithNode(cfg.Raft, raftNode)
		}

		// NOTE(review): gsd.catfileCache may still be nil here — it is only
		// defaulted further below — so this repo factory is built with
		// whatever was set at this point. Confirm the ordering is intentional.
		partitionFactoryOptions := []partition.FactoryOption{
			partition.WithCmdFactory(gsd.gitCmdFactory),
			partition.WithRepoFactory(localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache)),
			partition.WithMetrics(partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus))),
			partition.WithRaftConfig(cfg.Raft),
			partition.WithRaftFactory(raftFactory),
		}

		nodeMgr, err := nodeimpl.NewManager(
			cfg.Storages,
			storagemgr.NewFactory(
				gsd.logger,
				dbMgr,
				migration.NewFactory(
					partition.NewFactory(partitionFactoryOptions...),
					migration.NewMetrics(),
					migrations,
				),
				storagemgr.DefaultMaxInactivePartitions,
				storagemgr.NewMetrics(cfg.Prometheus),
			),
		)
		require.NoError(tb, err)
		tb.Cleanup(nodeMgr.Close)

		if testhelper.IsRaftEnabled() && !testhelper.IsPraefectEnabled() {
			// Wire every storage into the Raft node, which then fronts the
			// plain node manager.
			for _, storageCfg := range cfg.Storages {
				baseStorage, err := nodeMgr.GetStorage(storageCfg.Name)
				require.NoError(tb, err)
				require.NoError(tb, raftNode.SetBaseStorage(storageCfg.Name, baseStorage))
			}
			node = raftNode
		} else {
			node = nodeMgr
		}
	}

	// This is to allow building a bundle generation from a Sink
	// without having to create one beforehand.
	// If bundleURIManager is defined though, we use that one.
	if gsd.bundleURIManager == nil && gsd.bundleURISink != nil {
		var strategy bundleuri.GenerationStrategy
		if gsd.bundleURIStrategy != nil {
			strategy = gsd.bundleURIStrategy
		} else {
			strategy = bundleuri.NewSimpleStrategy(true)
		}

		manager, err := bundleuri.NewGenerationManager(ctx, gsd.bundleURISink, gsd.logger, node, strategy)
		require.NoError(tb, err)
		gsd.bundleURIManager = manager
	}

	if gsd.hookMgr == nil {
		gsd.hookMgr = hook.NewManager(
			cfg, gsd.locator,
			gsd.logger,
			gsd.gitCmdFactory,
			gsd.txMgr,
			gsd.gitlabClient,
			hook.NewTransactionRegistry(gsd.transactionRegistry),
			gsd.procReceiveRegistry,
			node,
		)
	}

	if gsd.catfileCache == nil {
		cache := catfile.NewCache(cfg)
		gsd.catfileCache = cache
		tb.Cleanup(cache.Stop)
	}

	if gsd.diskCache == nil {
		gsd.diskCache = cache.New(cfg, gsd.locator, gsd.logger)
	}

	if gsd.packObjectsCache == nil {
		gsd.packObjectsCache = streamcache.New(cfg.PackObjectsCache, gsd.logger)
		tb.Cleanup(gsd.packObjectsCache.Stop)
	}

	if gsd.packObjectsLimiter == nil {
		gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
			limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}),
			0,
			0,
			limiter.NewNoopConcurrencyMonitor(),
		)
	}

	if gsd.limitHandler == nil {
		_, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
		gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
	}

	if gsd.repositoryCounter == nil {
		gsd.repositoryCounter = counter.NewRepositoryCounter(cfg.Storages)
	}

	if gsd.updaterWithHooks == nil {
		gsd.updaterWithHooks = updateref.NewUpdaterWithHooks(cfg, gsd.logger, gsd.locator, gsd.hookMgr, gsd.gitCmdFactory, gsd.catfileCache)
	}

	if gsd.housekeepingManager == nil {
		gsd.housekeepingManager = housekeepingmgr.New(cfg.Prometheus, gsd.logger, gsd.txMgr, node)
	}

	if gsd.signingKey != "" {
		cfg.Git.SigningKey = gsd.signingKey
	}

	// NOTE(review): this unconditionally overwrites any factory provided via
	// WithRepositoryFactory — confirm whether it should be guarded by a nil
	// check like the other dependencies.
	gsd.localRepoFactory = localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache)

	return &service.Dependencies{
		Logger:                 gsd.logger,
		Cfg:                    cfg,
		ClientPool:             gsd.conns,
		StorageLocator:         gsd.locator,
		TransactionManager:     gsd.txMgr,
		GitalyHookManager:      gsd.hookMgr,
		GitCmdFactory:          gsd.gitCmdFactory,
		BackchannelRegistry:    gsd.backchannelReg,
		GitlabClient:           gsd.gitlabClient,
		CatfileCache:           gsd.catfileCache,
		DiskCache:              gsd.diskCache,
		PackObjectsCache:       gsd.packObjectsCache,
		PackObjectsLimiter:     gsd.packObjectsLimiter,
		LimitHandler:           gsd.limitHandler,
		RepositoryCounter:      gsd.repositoryCounter,
		UpdaterWithHooks:       gsd.updaterWithHooks,
		HousekeepingManager:    gsd.housekeepingManager,
		TransactionRegistry:    gsd.transactionRegistry,
		Node:                   node,
		BackupSink:             gsd.backupSink,
		BackupLocator:          gsd.backupLocator,
		ProcReceiveRegistry:    gsd.procReceiveRegistry,
		BundleURIManager:       gsd.bundleURIManager,
		LocalRepositoryFactory: gsd.localRepoFactory,
		MigrationStateManager:  gsd.MigrationStateManager,
	}
}
// GitalyServerOpt is a helper type to shorten declarations. It is a
// functional option that transforms the dependency set used to start a
// test gitaly server.
type GitalyServerOpt func(gitalyServerDeps) gitalyServerDeps
// WithLogger sets the log.Logger instance used when initialising gitaly services.
func WithLogger(logger log.Logger) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.logger = logger
		return d
	}
}
// WithLocator sets the storage.Locator instance used when initialising gitaly services.
func WithLocator(locator storage.Locator) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.locator = locator
		return d
	}
}
// WithGitCommandFactory sets the gitcmd.CommandFactory instance used when
// initialising gitaly services.
func WithGitCommandFactory(gitCmdFactory gitcmd.CommandFactory) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.gitCmdFactory = gitCmdFactory
		return d
	}
}
// WithGitLabClient sets the gitlab.Client instance used when initialising gitaly services.
func WithGitLabClient(gitlabClient gitlab.Client) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.gitlabClient = gitlabClient
		return d
	}
}
// WithHookManager sets the hook.Manager instance used when initialising gitaly services.
func WithHookManager(hookMgr hook.Manager) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.hookMgr = hookMgr
		return d
	}
}
// WithTransactionManager sets the transaction.Manager instance used when
// initialising gitaly services.
func WithTransactionManager(txMgr transaction.Manager) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.txMgr = txMgr
		return d
	}
}
// WithDisablePraefect disables setup and usage of praefect as a proxy in
// front of the gitaly service.
func WithDisablePraefect() GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.disablePraefect = true
		return d
	}
}
// WithBackchannelRegistry sets the backchannel.Registry instance used when
// initialising gitaly services.
func WithBackchannelRegistry(backchannelReg *backchannel.Registry) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.backchannelReg = backchannelReg
		return d
	}
}
// WithDiskCache sets the cache.Cache instance used when initialising gitaly services.
func WithDiskCache(diskCache cache.Cache) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.diskCache = diskCache
		return d
	}
}
// WithRepositoryCounter sets the counter.RepositoryCounter instance used when
// initialising gitaly services.
func WithRepositoryCounter(repositoryCounter *counter.RepositoryCounter) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.repositoryCounter = repositoryCounter
		return d
	}
}
// WithPackObjectsLimiter sets the pack-objects limiter that will be used for
// gitaly services initialization.
//
// The parameter is named concurrencyLimiter rather than limiter so it does
// not shadow the imported limiter package inside this function.
func WithPackObjectsLimiter(concurrencyLimiter *limiter.ConcurrencyLimiter) GitalyServerOpt {
	return func(deps gitalyServerDeps) gitalyServerDeps {
		deps.packObjectsLimiter = concurrencyLimiter
		return deps
	}
}
// WithPackObjectsCache sets the pack-objects streamcache.Cache used when
// initialising gitaly services.
func WithPackObjectsCache(cache streamcache.Cache) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.packObjectsCache = cache
		return d
	}
}
// WithHousekeepingManager sets the housekeeping manager used when
// initialising gitaly services.
func WithHousekeepingManager(manager housekeepingmgr.Manager) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.housekeepingManager = manager
		return d
	}
}
// WithBackupSink sets the backup.Sink used when initialising gitaly services.
func WithBackupSink(backupSink *backup.Sink) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.backupSink = backupSink
		return d
	}
}
// WithBackupLocator sets the backup.Locator used when initialising gitaly services.
func WithBackupLocator(backupLocator backup.Locator) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.backupLocator = backupLocator
		return d
	}
}
// WithBundleURIStrategy sets the bundleuri.GenerationStrategy used by the
// bundleuri.GenerationManager. When unset, bundleuri.NewSimpleStrategy(true)
// is used as the default.
func WithBundleURIStrategy(strategy bundleuri.GenerationStrategy) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.bundleURIStrategy = strategy
		return d
	}
}
// WithBundleURISink sets the *bundleuri.Sink used by the bundleuri.GenerationManager.
func WithBundleURISink(sink *bundleuri.Sink) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.bundleURISink = sink
		return d
	}
}
// WithBundleURIManager sets the *bundleuri.GenerationManager that gitaly
// services will use, instead of constructing one from a sink.
func WithBundleURIManager(manager *bundleuri.GenerationManager) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.bundleURIManager = manager
		return d
	}
}
// WithSigningKey sets the signing key path used by gitaly services.
func WithSigningKey(signingKey string) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.signingKey = signingKey
		return d
	}
}
// WithTransactionRegistry sets the transaction registry used by gitaly services.
func WithTransactionRegistry(registry *storagemgr.TransactionRegistry) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.transactionRegistry = registry
		return d
	}
}
// WithProcReceiveRegistry sets the proc-receive registry used by gitaly services.
func WithProcReceiveRegistry(registry *hook.ProcReceiveRegistry) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.procReceiveRegistry = registry
		return d
	}
}
// WithRepositoryFactory sets the localrepo.Factory used when initialising
// gitaly services.
func WithRepositoryFactory(repoFactory localrepo.Factory) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.localRepoFactory = repoFactory
		return d
	}
}
// WithTransactionInterceptors registers a callback that supplies additional
// transaction middleware (unary and stream interceptors) to the server.
func WithTransactionInterceptors(
	fn func(log.Logger, storage.Node, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor),
) GitalyServerOpt {
	return func(d gitalyServerDeps) gitalyServerDeps {
		d.transactionInterceptorsFn = fn
		return d
	}
}