internal/gitaly/storage/storagemgr/partition/transaction_manager.go (1,584 lines of code) (raw):
package partition
import (
"bufio"
"bytes"
"container/list"
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"runtime/trace"
"slices"
"sort"
"strconv"
"strings"
"sync"
"github.com/dgraph-io/badger/v4"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
housekeepingcfg "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/fshistory"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal"
logging "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/offloading"
"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/tracing"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
var (
// ErrRepositoryAlreadyExists is returned when attempting to create a repository that already exists.
ErrRepositoryAlreadyExists = structerr.NewAlreadyExists("repository already exists")
// errInitializationFailed is returned when the TransactionManager failed to initialize successfully.
errInitializationFailed = errors.New("initializing transaction processing failed")
// errCommittedEntryGone is returned when the log entry of an LSN is gone from the database while it's
// still accessed by other transactions.
errCommittedEntryGone = errors.New("in-used committed entry is gone")
// errNotDirectory is returned when the repository's path doesn't point to a directory.
errNotDirectory = errors.New("repository's path didn't point to a directory")
// The errors below are used to error out when updates have been staged in a read-only transaction.
errReadOnlyHousekeeping = errors.New("housekeeping in a read-only transaction")
errReadOnlyKeyValue = errors.New("key-value writes in a read-only transaction")
// errHousekeepingConflictOtherUpdates is returned when the transaction includes housekeeping alongside
// other updates.
errHousekeepingConflictOtherUpdates = errors.New("housekeeping in the same transaction with other updates")
// errWritableAllRepository is returned when a transaction is started with
// no relative path filter specified and is not read-only. Transactions do
// not currently support writing to multiple repositories, so a writable
// transaction without a specified target relative path would be ambiguous.
errWritableAllRepository = errors.New("cannot start writable all repository transaction")
// keyAppliedLSN is the database key storing the LSN of a partition's last applied log entry.
keyAppliedLSN = []byte("applied_lsn")
)
// InvalidReferenceFormatError reports a reference whose name is not in an
// acceptable format.
type InvalidReferenceFormatError struct {
	// ReferenceName is the offending reference name.
	ReferenceName git.ReferenceName
}

// Error implements the error interface. The reference name is included in
// the message in quoted form.
func (err InvalidReferenceFormatError) Error() string {
	const format = "invalid reference format: %q"

	return fmt.Sprintf(format, err.ReferenceName)
}
// newConflictingKeyValueOperationError builds the error raised when a transaction tries to
// commit a key-value operation that conflicts with another concurrently committed
// transaction. The conflicting key is attached to the error as metadata under "key".
func newConflictingKeyValueOperationError(key string) error {
	conflictErr := structerr.NewAborted("conflicting key-value operations")

	return conflictErr.WithMetadata("key", key)
}
// repositoryCreation models a repository creation in a transaction. A Transaction
// references it through its repositoryCreation field.
type repositoryCreation struct {
// objectHash defines the object format the repository is created with.
objectHash git.ObjectHash
}
// transactionState enumerates the lifecycle states of a Transaction. The zero
// value is transactionStateOpen, so a freshly constructed Transaction is open.
type transactionState int

const (
	// transactionStateOpen marks a transaction that has been neither committed nor rolled back.
	transactionStateOpen transactionState = iota
	// transactionStateRollback marks a transaction that has been rolled back.
	transactionStateRollback
	// transactionStateCommit marks a transaction that has already been committed.
	transactionStateCommit
)
// Transaction is a unit-of-work that contains reference changes to perform on the repository.
// Besides reference updates it can also carry a repository deletion, explicitly included objects,
// key-value writes and housekeeping tasks, all staged through its methods.
type Transaction struct {
// write denotes whether or not this transaction is a write transaction.
write bool
// repositoryExists indicates whether the target repository existed when this transaction began.
repositoryExists bool
// metrics stores metric reporters inherited from the manager.
metrics ManagerMetrics
// state records whether the transaction is still open. Transaction is open until either Commit()
// or Rollback() is called on it.
state transactionState
// stateLatch guards the transaction against concurrent commit and rollback operations. Transactions
// are not generally safe for concurrent use. As the transaction may need to be committed in the
// post-receive hook, there's potential for a race. If the RPC times out, it could be that the
// PostReceiveHook RPC's goroutine attempts to commit a transaction at the same time as the parent
// RPC's goroutine attempts to abort it. stateLatch guards against this race.
stateLatch sync.Mutex
// commit commits the Transaction through the TransactionManager.
commit func(context.Context, *Transaction) error
// result is where the outcome of the transaction is sent to by TransactionManager once it
// has been determined.
result chan error
// admitted is set when the transaction was admitted for processing in the TransactionManager.
// Transaction queues in admissionQueue to be committed, and is considered admitted once it has
// been dequeued by TransactionManager.Run(). Once the transaction is admitted, its ownership moves
// from the client goroutine to the TransactionManager.Run() goroutine, and the client goroutine must
// not do any modifications to the state of the transaction anymore to avoid races.
admitted bool
// finish cleans up the transaction releasing the resources associated with it. It must be called
// once the transaction is done with.
finish func(admitted bool) error
// finished is closed when the transaction has been finished. This enables waiting on transactions
// to finish where needed.
finished chan struct{}
// relativePath is the relative path of the repository this transaction is targeting. It is empty
// if the transaction does not target a repository.
relativePath string
// stagingDirectory is the directory where the transaction stages its files prior
// to them being logged. It is cleaned up when the transaction finishes.
stagingDirectory string
// quarantineDirectory is the directory within the stagingDirectory where the new objects of the
// transaction are quarantined.
quarantineDirectory string
// packPrefix contains the prefix (`pack-<digest>`) of the transaction's pack if the transaction
// had objects to log.
packPrefix string
// snapshotRepository is a snapshot of the target repository with a possible quarantine applied
// if this is a read-write transaction.
snapshotRepository *localrepo.Repo
// snapshotLSN is the log sequence number which this transaction is reading the repository's
// state at.
snapshotLSN storage.LSN
// snapshot is the transaction's snapshot of the partition file system state. It's used to rewrite
// relative paths to point to the snapshot instead of the actual repositories.
snapshot snapshot.FileSystem
// db is the transaction's snapshot of the partition's key-value state. The keyvalue.Transaction is
// discarded when the transaction finishes. The recorded writes are write-ahead logged and applied
// to the partition from the WAL.
db keyvalue.Transaction
// fs is the transaction's file system handle. Operations through it are recorded in the transaction.
fs fsrecorder.FS
// referenceRecorder records the file system operations performed by reference transactions. It is
// nil when no recorder was set up, in which case reference updates are not recorded through it.
referenceRecorder *wal.ReferenceRecorder
// recordingReadWriter is a ReadWriter operating on db that also records operations performed. This
// is used to record the operations performed so they can be conflict checked and write-ahead logged.
recordingReadWriter keyvalue.RecordingReadWriter
// stagingSnapshot is the snapshot used for staging the transaction, and where the staging repository
// exists.
stagingSnapshot snapshot.FileSystem
// manifest is the manifest of the log entry. It's kept on the transaction as some of the operations
// may still need to modify it after admission.
manifest *gitalypb.LogEntry
// walEntry is the log entry where the transaction stages its state for committing.
walEntry *wal.Entry
// initialReferenceValues holds the reference values recorded by RecordInitialReferenceValues. They are
// consumed as the old values by the next UpdateReferences call, which resets the field to nil.
initialReferenceValues map[git.ReferenceName]git.Reference
// referenceUpdates contains the staged reference updates. Each UpdateReferences call that stages at
// least one effective update appends one element.
referenceUpdates []git.ReferenceUpdates
// repositoryCreation describes the repository creation staged in this transaction.
// NOTE(review): presumably nil when no creation is staged; the code setting it is not in view here.
repositoryCreation *repositoryCreation
// deleteRepository is set by DeleteRepository to request deleting the repository on commit.
deleteRepository bool
// includedObjects is the set of object IDs registered through IncludeObject.
includedObjects map[git.ObjectID]struct{}
// runHousekeeping holds the housekeeping tasks staged through PackRefs, Repack, WriteCommitGraphs
// and SetOffloadingConfig. It is nil if no housekeeping task has been staged.
runHousekeeping *runHousekeeping
// objectDependencies are the object IDs this transaction depends on in
// the repository. The dependencies are used to guard against invalid packs
// being committed which don't contain all necessary objects. The write could
// either be missing objects, or a concurrent prune could have removed the
// dependencies.
objectDependencies map[git.ObjectID]struct{}
}
// Begin opens a new transaction. The caller must call either Commit or Rollback to release
// the resources tied to the transaction. The returned Transaction is not safe for concurrent use.
//
// The returned Transaction's read snapshot includes all writes that were committed prior to the
// Begin call. Begin blocks until the committed writes have been applied to the repository.
//
// Begin returns errInitializationFailed if the manager failed to initialize, errWritableAllRepository
// if a write transaction is requested without a relative path filter, the context's error if ctx is
// done while waiting, and storage.ErrTransactionProcessingStopped if the manager's context is done
// before the snapshot becomes readable. If Begin fails after the transaction has been set up, the
// partially initialized transaction is finished before returning.
func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOptions) (_ storage.Transaction, returnedErr error) {
defer trace.StartRegion(ctx, "begin").End()
defer prometheus.NewTimer(mgr.metrics.beginDuration(opts.Write)).ObserveDuration()
// The transaction duration timer is observed in txn.finish, not when Begin returns.
transactionDurationTimer := prometheus.NewTimer(mgr.metrics.transactionDuration(opts.Write))
trace.Log(ctx, "correlation_id", correlation.ExtractFromContext(ctx))
trace.Log(ctx, "storage_name", mgr.storageName)
trace.Log(ctx, "partition_id", mgr.partitionID.String())
trace.Log(ctx, "write", strconv.FormatBool(opts.Write))
trace.Log(ctx, "relative_path_filter_set", strconv.FormatBool(opts.RelativePaths != nil))
trace.Log(ctx, "relative_path_filter", strings.Join(opts.RelativePaths, ";"))
trace.Log(ctx, "force_exclusive_snapshot", strconv.FormatBool(opts.ForceExclusiveSnapshot))
// Wait until the manager has been initialized so the notification channels
// and the LSNs are loaded.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-mgr.initialized:
if !mgr.initializationSuccessful {
return nil, errInitializationFailed
}
}
var relativePath string
if len(opts.RelativePaths) > 0 {
// Set the first repository as the tracked repository
relativePath = opts.RelativePaths[0]
}
// A nil relative path filter means the transaction covers all repositories, which is
// only supported for read-only transactions.
if opts.RelativePaths == nil && opts.Write {
return nil, errWritableAllRepository
}
span, _ := tracing.StartSpanIfHasParent(ctx, "transaction.Begin", nil)
span.SetTag("write", opts.Write)
span.SetTag("relativePath", relativePath)
defer span.Finish()
// The snapshot LSN is determined and the snapshot lock is registered under the manager's mutex.
// There are no return statements between Lock and Unlock.
mgr.mutex.Lock()
txn := &Transaction{
write: opts.Write,
commit: mgr.commit,
snapshotLSN: mgr.logManager.AppendedLSN(),
finished: make(chan struct{}),
relativePath: relativePath,
metrics: mgr.metrics,
}
mgr.createSnapshotLockIfNeeded(txn.snapshotLSN)
// Register as an active snapshotter for this LSN. The registration is released when Begin returns.
mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Add(1)
defer mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Done()
// readReady is waited on below before the snapshot is taken.
readReady := mgr.snapshotLocks[txn.snapshotLSN].applied
// Only write transactions obtain a committed entry. It is handed back to cleanCommittedEntry
// when the transaction finishes.
var entry *committedEntry
if txn.write {
entry = mgr.updateCommittedEntry(txn.snapshotLSN)
}
mgr.mutex.Unlock()
span.SetTag("snapshotLSN", txn.snapshotLSN)
// finish releases everything the transaction holds. The deferred calls below run in reverse
// order once the temporary state has been cleaned up or scheduled for clean up.
txn.finish = func(admitted bool) error {
defer trace.StartRegion(ctx, "finish transaction").End()
defer close(txn.finished)
defer transactionDurationTimer.ObserveDuration()
defer func() {
if txn.db != nil {
txn.db.Discard()
}
if txn.write {
var removedAnyEntry bool
mgr.mutex.Lock()
removedAnyEntry = mgr.cleanCommittedEntry(entry)
mgr.mutex.Unlock()
// Signal the manager this transaction finishes. The purpose of this signaling is to wake it up
// and clean up stale entries in the database. The manager scans and removes leading empty
// entries. We signal only if the transaction modifies the in-memory committed entry.
// This signal queue is buffered. If the queue is full, the manager hasn't woken up. The
// next scan will cover the work of the prior one. So, no need to let the transaction wait.
//
// ┌─ 1st signal ┌── The manager scans til here
// ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐
// └─┘ └─┘ └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
// └─ 2nd signal
//
if removedAnyEntry {
select {
case mgr.completedQueue <- struct{}{}:
default:
}
}
}
}()
// cleanTemporaryState closes the snapshots and removes the staging directory. All steps are
// attempted even if an earlier one fails; the failures are joined into a single error.
cleanTemporaryState := func() error {
defer trace.StartRegion(ctx, "cleanTemporaryState").End()
var cleanupErr error
if txn.snapshot != nil {
if err := txn.snapshot.Close(); err != nil {
cleanupErr = errors.Join(cleanupErr, fmt.Errorf("close snapshot: %w", err))
}
}
if txn.stagingSnapshot != nil {
if err := txn.stagingSnapshot.Close(); err != nil {
cleanupErr = errors.Join(cleanupErr, fmt.Errorf("close staging snapshot: %w", err))
}
}
if txn.stagingDirectory != "" {
if err := os.RemoveAll(txn.stagingDirectory); err != nil {
cleanupErr = errors.Join(cleanupErr, fmt.Errorf("remove staging directory: %w", err))
}
}
return cleanupErr
}
if admitted {
// If the transaction was admitted, `.Run()` is responsible for cleaning the transaction up.
// Cleaning up the snapshots can take a relatively long time if the snapshots are large, or if
// the file system is busy. To avoid blocking transaction processing, we use a pool of background
// workers to clean up the transaction snapshots.
//
// The number of background workers is limited to exert backpressure on write transactions if
// we can't clean up after them fast enough.
mgr.cleanupWorkers.Go(func() error {
if err := cleanTemporaryState(); err != nil {
// The failure channel is closed at most once, on the first failing cleanup.
mgr.cleanupWorkerFailedOnce.Do(func() { close(mgr.cleanupWorkerFailed) })
return fmt.Errorf("clean temporary state async: %w", err)
}
return nil
})
return nil
}
// Transactions that were not admitted are cleaned up synchronously.
if err := cleanTemporaryState(); err != nil {
return fmt.Errorf("clean temporary state sync: %w", err)
}
return nil
}
// If Begin fails from here on, finish the transaction so the state set up so far is released.
// A failure to finish is only logged as the original error is what gets returned.
defer func() {
if returnedErr != nil {
if err := txn.finish(false); err != nil {
mgr.logger.WithError(err).ErrorContext(ctx, "failed finishing unsuccessful transaction begin")
}
}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-mgr.ctx.Done():
return nil, storage.ErrTransactionProcessingStopped
case <-readReady:
// The snapshot's log entry has been applied. Set up the key-value and file system snapshots.
txn.db = mgr.db.NewTransaction(txn.write)
txn.recordingReadWriter = keyvalue.NewRecordingReadWriter(txn.db)
relativePaths := opts.RelativePaths
if relativePaths == nil {
// Without a filter, snapshot every repository recorded in the partition's key-value state.
relativePaths = txn.PartitionRelativePaths()
}
var err error
txn.stagingDirectory, err = os.MkdirTemp(mgr.stagingDirectory, "")
if err != nil {
return nil, fmt.Errorf("mkdir temp: %w", err)
}
// The third argument is true for write transactions and when an exclusive snapshot is forced.
if txn.snapshot, err = mgr.snapshotManager.GetSnapshot(ctx,
relativePaths,
txn.write || opts.ForceExclusiveSnapshot,
); err != nil {
return nil, fmt.Errorf("get snapshot: %w", err)
}
if txn.write {
// Create a directory to store all staging files.
if err := os.Mkdir(txn.walFilesPath(), mode.Directory); err != nil {
return nil, fmt.Errorf("create wal files directory: %w", err)
}
txn.walEntry = wal.NewEntry(txn.walFilesPath())
}
// For read-only transactions txn.walEntry is nil at this point.
txn.fs = fsrecorder.NewFS(txn.snapshot.Root(), txn.walEntry)
if txn.repositoryTarget() {
txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath))
if err != nil {
return nil, fmt.Errorf("does repository exist: %w", err)
}
txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath))
if txn.write {
if txn.repositoryExists {
// Writes into an existing repository go through a quarantine directory in the staging directory.
txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine")
if err := os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), mode.Directory); err != nil {
return nil, fmt.Errorf("create quarantine directory: %w", err)
}
txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctx, txn.quarantineDirectory)
if err != nil {
return nil, fmt.Errorf("quarantine: %w", err)
}
refRecorderTmpDir := filepath.Join(txn.stagingDirectory, "ref-recorder")
// NOTE(review): this uses os.ModePerm whereas the other directories created in Begin use
// mode.Directory. Confirm whether the difference is intentional.
if err := os.Mkdir(refRecorderTmpDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("create reference recorder tmp dir: %w", err)
}
refBackend, err := txn.snapshotRepository.ReferenceBackend(ctx)
if err != nil {
return nil, fmt.Errorf("reference backend: %w", err)
}
// The reference recorder is only set up for the files reference backend.
if refBackend == git.ReferenceBackendFiles {
objectHash, err := txn.snapshotRepository.ObjectHash(ctx)
if err != nil {
return nil, fmt.Errorf("object hash: %w", err)
}
if txn.referenceRecorder, err = wal.NewReferenceRecorder(refRecorderTmpDir, txn.walEntry, txn.snapshot.Root(), txn.relativePath, objectHash.ZeroOID); err != nil {
return nil, fmt.Errorf("new reference recorder: %w", err)
}
}
} else {
// The repository does not exist, and this is a write. This should thus create the repository. As the repository's final state
// is still being logged in TransactionManager, we already log here the creation of any missing parent directories of
// the repository. When the transaction commits, we don't know if they existed or not, so we can't record this later.
//
// If the repository is at the root of the storage, there's no parent directories to create.
if parentDir := filepath.Dir(txn.relativePath); parentDir != "." {
if err := storage.MkdirAll(txn.fs, parentDir); err != nil {
return nil, fmt.Errorf("create parent directories: %w", err)
}
}
// As there is no existing repository to quarantine against, the quarantine directory points
// at the objects directory of the repository's path in the snapshot.
txn.quarantineDirectory = filepath.Join(mgr.storagePath, txn.snapshot.RelativePath(txn.relativePath), "objects")
}
}
}
return txn, nil
}
}
// repositoryTarget reports whether the transaction targets a repository, which is the
// case when a relative path was set for it.
func (txn *Transaction) repositoryTarget() bool {
	return len(txn.relativePath) > 0
}
// PartitionRelativePaths lists the relative paths of all repositories known to the
// transaction's partition. The paths are derived from the keys stored under
// storage.RepositoryKeyPrefix in the transaction's key-value snapshot. The result is
// nil if there are no such keys.
func (txn *Transaction) PartitionRelativePaths() []string {
	prefix := []byte(storage.RepositoryKeyPrefix)

	iterator := txn.KV().NewIterator(keyvalue.IteratorOptions{Prefix: prefix})
	defer iterator.Close()

	var paths []string
	iterator.Rewind()
	for iterator.Valid() {
		// Converting to a string copies the key, so it stays usable after advancing the iterator.
		trimmed := bytes.TrimPrefix(iterator.Item().Key(), prefix)
		paths = append(paths, string(trimmed))

		iterator.Next()
	}

	return paths
}
// RewriteRepository clones the given repository and points the clone's relative path at the
// transaction's snapshot. If the repository is the one the transaction targets, the clone
// also takes over the object directory and alternate object directories of the
// transaction's snapshot repository. The input message is not modified.
func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.Repository {
	snapshotRepo := proto.Clone(repo).(*gitalypb.Repository)

	originalPath := repo.GetRelativePath()
	snapshotRepo.RelativePath = txn.snapshot.RelativePath(originalPath)

	if originalPath != txn.relativePath {
		return snapshotRepo
	}

	snapshotRepo.GitObjectDirectory = txn.snapshotRepository.GetGitObjectDirectory()
	snapshotRepo.GitAlternateObjectDirectories = txn.snapshotRepository.GetGitAlternateObjectDirectories()

	return snapshotRepo
}
// OriginalRepository undoes RewriteRepository on a clone of the given repository: the
// snapshot prefix is stripped from the relative path, and the object directory and
// alternate object directories are cleared. The input message is not modified.
func (txn *Transaction) OriginalRepository(repo *gitalypb.Repository) *gitalypb.Repository {
	snapshotPrefix := txn.snapshot.Prefix() + string(os.PathSeparator)

	restored := proto.Clone(repo).(*gitalypb.Repository)
	restored.RelativePath = strings.TrimPrefix(repo.GetRelativePath(), snapshotPrefix)
	restored.GitObjectDirectory = ""
	restored.GitAlternateObjectDirectories = nil

	return restored
}
// updateState moves the transaction from the open state to newState. The check and the
// update happen under stateLatch so that concurrent Commit and Rollback calls cannot both
// succeed.
//
// It returns storage.ErrTransactionAlreadyRollbacked or storage.ErrTransactionAlreadyCommitted
// if the transaction has already left the open state, and a descriptive error if the current
// state is not one of the known states.
func (txn *Transaction) updateState(newState transactionState) error {
	txn.stateLatch.Lock()
	defer txn.stateLatch.Unlock()

	switch txn.state {
	case transactionStateOpen:
		txn.state = newState
		return nil
	case transactionStateRollback:
		return storage.ErrTransactionAlreadyRollbacked
	case transactionStateCommit:
		return storage.ErrTransactionAlreadyCommitted
	default:
		// transactionState is an integer type. Format it as a number: %q would render it
		// as a quoted character literal such as '\x03'.
		return fmt.Errorf("unknown transaction state: %d", txn.state)
	}
}
// Commit performs the changes staged in the transaction. A nil error means the transaction
// was successful and the changes have been performed. If an error is returned, the
// transaction may or may not have been persisted.
//
// Committing a transaction that has already been committed or rolled back fails with the
// error from updateState. Read-only transactions are not committed through the manager;
// for them Commit only reports updates that were staged by mistake. Unless the transaction
// was admitted, it is finished before Commit returns, and a failure to finish is returned
// if there was no other error.
func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) {
	defer trace.StartRegion(ctx, "commit").End()

	if err := txn.updateState(transactionStateCommit); err != nil {
		return err
	}

	defer prometheus.NewTimer(txn.metrics.commitDuration(txn.write)).ObserveDuration()

	defer func() {
		finishErr := txn.finishUnadmitted()
		if returnedErr == nil && finishErr != nil {
			returnedErr = finishErr
		}
	}()

	if !txn.write {
		// These errors are only for reporting programming mistakes where updates have been
		// accidentally staged in a read-only transaction. The changes would not be performed
		// anyway as read-only transactions are not committed through the manager.
		if txn.runHousekeeping != nil {
			return errReadOnlyHousekeeping
		}

		if len(txn.recordingReadWriter.WriteSet()) > 0 {
			return errReadOnlyKeyValue
		}

		return nil
	}

	// Housekeeping must not be combined with other updates in the same transaction.
	hasOtherUpdates := txn.referenceUpdates != nil || txn.deleteRepository || txn.includedObjects != nil
	if txn.runHousekeeping != nil && hasOtherUpdates {
		return errHousekeepingConflictOtherUpdates
	}

	return txn.commit(ctx, txn)
}
// Rollback releases the resources associated with the transaction without performing any
// changes. Rolling back a transaction that has already been committed or rolled back fails
// with the error from updateState.
func (txn *Transaction) Rollback(ctx context.Context) error {
	region := trace.StartRegion(ctx, "rollback")
	defer region.End()

	if err := txn.updateState(transactionStateRollback); err != nil {
		return err
	}

	timer := prometheus.NewTimer(txn.metrics.rollbackDuration(txn.write))
	defer timer.ObserveDuration()

	return txn.finishUnadmitted()
}
// finishUnadmitted finishes the transaction unless it has been admitted. An admitted
// Transaction is being processed by TransactionManager, and the clean up responsibility
// moves there as well to avoid races. In that case this is a no-op.
func (txn *Transaction) finishUnadmitted() error {
	if !txn.admitted {
		return txn.finish(false)
	}

	return nil
}
// SnapshotLSN returns the LSN of the Transaction's read snapshot, i.e. the log sequence
// number at which the transaction reads the repository's state.
func (txn *Transaction) SnapshotLSN() storage.LSN {
return txn.snapshotLSN
}
// Root returns the path to the read snapshot, as reported by the transaction's
// file system snapshot.
func (txn *Transaction) Root() string {
return txn.snapshot.Root()
}
// RecordInitialReferenceValues records the initial values of the references for the next
// UpdateReferences call, replacing any values recorded earlier. Symbolic references are
// recorded as given. For other references, a non-zero OID is used as the initial value,
// and a zero OID is replaced by the reference's actual value in the snapshot repository.
// If the reference does not exist, the zero OID is kept.
//
// The reference's first recorded value is used as its old OID in the update. This method
// can be used to record the value without staging an update in the transaction, for
// example in the 'prepare' phase of the reference transaction hook before any changes are
// made and before the 'committed' phase is reached. The recorded initial values are only
// used for the next UpdateReferences call.
func (txn *Transaction) RecordInitialReferenceValues(ctx context.Context, initialValues map[git.ReferenceName]git.Reference) error {
	txn.initialReferenceValues = make(map[git.ReferenceName]git.Reference, len(initialValues))

	objectHash, err := txn.snapshotRepository.ObjectHash(ctx)
	if err != nil {
		return fmt.Errorf("object hash: %w", err)
	}

	for name, reference := range initialValues {
		if reference.IsSymbolic {
			txn.initialReferenceValues[name] = reference
			continue
		}

		oid := git.ObjectID(reference.Target)
		if objectHash.IsZeroOID(oid) {
			// A zero OID is either a force update or a reference that doesn't exist.
			// Resolve the reference to find out which.
			resolved, err := txn.snapshotRepository.ResolveRevision(ctx, name.Revision())
			switch {
			case err == nil:
				oid = resolved
			case !errors.Is(err, git.ErrReferenceNotFound):
				return fmt.Errorf("resolve revision: %w", err)
			}
			// Otherwise the reference doesn't exist and the value stays as the zero OID.
		}

		txn.initialReferenceValues[name] = git.NewReference(name, oid)
	}

	return nil
}
// UpdateReferences stages the given reference updates in the transaction. Each call is
// treated as a different reference transaction. This allows for performing directory-file
// conflict inducing changes in a transaction. For example:
//
//   - First call - delete 'refs/heads/parent'
//   - Second call - create 'refs/heads/parent/child'
//
// If a reference is updated multiple times during a transaction, its first recorded old OID
// is used as the old OID when verifying the reference update, and the last recorded new OID
// is used as the new OID in the final commit. This means updates like
// 'oid-1 -> oid-2 -> oid-3' will ultimately be committed as 'oid-1 -> oid-3'. The old OIDs
// of the intermediate states are not verified when committing the write to the actual
// repository and are discarded from the final committed log entry.
//
// An InvalidReferenceFormatError is returned if a reference is neither 'HEAD' nor within
// 'refs/'. Values recorded through RecordInitialReferenceValues are consumed by this call.
func (txn *Transaction) UpdateReferences(ctx context.Context, updates git.ReferenceUpdates) error {
	staged := git.ReferenceUpdates{}

	for name, requested := range updates {
		// Transactions should only stage references with valid names as otherwise Git would
		// already fail when they try to stage them against their snapshot. `update-ref`
		// happily accepts references outside of `refs` directory so such references could
		// theoretically arrive here. We thus sanity check that all references modified are
		// within the refs directory.
		//
		// HEAD is a special case and refers to a default branch update.
		if name != "HEAD" && !strings.HasPrefix(name.String(), "refs/") {
			return InvalidReferenceFormatError{ReferenceName: name}
		}

		// A recorded initial value takes precedence over the old value in the update.
		oldOID, oldTarget := requested.OldOID, requested.OldTarget
		if initial, recorded := txn.initialReferenceValues[name]; recorded {
			if initial.IsSymbolic {
				oldTarget = git.ReferenceName(initial.Target)
			} else {
				oldOID = git.ObjectID(initial.Target)
			}
		}

		if oldOID == requested.NewOID && oldTarget == requested.NewTarget {
			// This was a no-op.
			continue
		}

		// If the reference was already updated earlier in this transaction, the most
		// recently staged new value becomes the old value of this update.
		for _, earlier := range txn.referenceUpdates {
			previous, found := earlier[name]
			if !found {
				continue
			}

			if previous.NewOID != "" {
				oldOID = previous.NewOID
			}

			if previous.NewTarget != "" {
				oldTarget = previous.NewTarget
			}
		}

		staged[name] = git.ReferenceUpdate{
			OldOID:    oldOID,
			NewOID:    requested.NewOID,
			OldTarget: oldTarget,
			NewTarget: requested.NewTarget,
		}
	}

	txn.initialReferenceValues = nil

	if len(staged) == 0 {
		return nil
	}

	// ReferenceRecorder is not used with reftables.
	if txn.referenceRecorder != nil {
		if err := txn.referenceRecorder.RecordReferenceUpdates(ctx, updates); err != nil {
			return fmt.Errorf("record reference updates: %w", err)
		}
	}

	txn.referenceUpdates = append(txn.referenceUpdates, staged)

	return nil
}
// DeleteRepository deletes the repository when the transaction is committed. The call itself
// only flags the transaction; nothing is deleted until the commit is processed.
func (txn *Transaction) DeleteRepository() {
txn.deleteRepository = true
}
// PackRefs stages the pack-refs housekeeping task in the transaction. A transaction that
// runs housekeeping may only run other housekeeping tasks; no other updates are allowed.
// Calling PackRefs again replaces the previously staged pack-refs task.
func (txn *Transaction) PackRefs() {
	if txn.runHousekeeping == nil {
		txn.runHousekeeping = new(runHousekeeping)
	}

	task := &runPackRefs{
		PrunedRefs: make(map[git.ReferenceName]struct{}),
	}
	txn.runHousekeeping.packRefs = task
}
// Repack stages the repacking housekeeping task with the given configuration in the
// transaction. Calling Repack again replaces the previously staged repacking task.
func (txn *Transaction) Repack(config housekeepingcfg.RepackObjectsConfig) {
	if txn.runHousekeeping == nil {
		txn.runHousekeeping = new(runHousekeeping)
	}

	task := &runRepack{config: config}
	txn.runHousekeeping.repack = task
}
// WriteCommitGraphs stages a rewrite of the commit graph with the given configuration as a
// housekeeping task of the transaction. Calling it again replaces the previously staged task.
func (txn *Transaction) WriteCommitGraphs(config housekeepingcfg.WriteCommitGraphConfig) {
	if txn.runHousekeeping == nil {
		txn.runHousekeeping = new(runHousekeeping)
	}

	task := &writeCommitGraphs{config: config}
	txn.runHousekeeping.writeCommitGraphs = task
}
// SetOffloadingConfig stages an offloading task in the transaction by storing the given
// configuration in the runOffloading struct. This configuration will be picked up later
// by the prepareOffloading function to execute an offloading task when the transaction
// commits. Calling it again replaces the previously staged configuration.
func (txn *Transaction) SetOffloadingConfig(cfg housekeepingcfg.OffloadingConfig) {
	if txn.runHousekeeping == nil {
		txn.runHousekeeping = new(runHousekeeping)
	}

	task := &runOffloading{config: cfg}
	txn.runHousekeeping.runOffloading = task
}
// IncludeObject registers the given object to be included, together with its dependencies,
// in the transaction's logged pack file even if the object is unreachable from the
// references. Registering the same object more than once has no further effect.
func (txn *Transaction) IncludeObject(oid git.ObjectID) {
	// The set is allocated lazily on first use.
	if txn.includedObjects == nil {
		txn.includedObjects = make(map[git.ObjectID]struct{})
	}

	txn.includedObjects[oid] = struct{}{}
}
// KV returns a handle to the key-value store snapshot of the transaction. The handle wraps
// the transaction's recording read-writer with the "kv/" prefix.
func (txn *Transaction) KV() keyvalue.ReadWriter {
	kvPrefix := []byte("kv/")

	return keyvalue.NewPrefixedReadWriter(txn.recordingReadWriter, kvPrefix)
}
// FS returns a handle to the transaction's file system snapshot. Operations performed
// through the handle are recorded in the transaction.
func (txn *Transaction) FS() storage.FS {
return txn.fs
}
// walFilesPath returns the path of the directory, inside the transaction's staging
// directory, where the transaction stages the files that will be logged alongside its
// log entry.
func (txn *Transaction) walFilesPath() string {
	const walFilesDirectory = "wal-files"

	return filepath.Join(txn.stagingDirectory, walFilesDirectory)
}
// snapshotLock contains state used to synchronize snapshotters and the log application with each other.
// Snapshotters wait on the applied channel until all of the committed writes in the read snapshot have
// been applied on the repository. The log application waits until all activeSnapshotters have managed to
// snapshot their state prior to applying the next log entry to the repository.
//
// The manager keeps one snapshotLock per LSN in its snapshotLocks map.
type snapshotLock struct {
// applied is closed when the transaction the snapshotters are waiting for has been applied to the
// repository and is ready for reading.
applied chan struct{}
// activeSnapshotters tracks snapshotters who are either taking a snapshot or waiting for the
// log entry to be applied. Log application waits for active snapshotters to finish before applying
// the next entry.
activeSnapshotters sync.WaitGroup
}
// committedEntry is a wrapper for a log entry. It is used to keep track of entries whose snapshots are
// still accessed by other transactions.
type committedEntry struct {
// entry is the in-memory reflection of the referenced log entry.
entry *gitalypb.LogEntry
// lsn is the LSN associated with the entry.
lsn storage.LSN
// snapshotReaders accounts for the number of transaction readers of the snapshot.
snapshotReaders int
// objectDependencies are the objects this transaction depends upon.
objectDependencies map[git.ObjectID]struct{}
}
// GetLogReader provides controlled access to the underlying log management system for log consumption
// purposes. It allows consumers to access the on-disk location of an LSN and to acknowledge the consumed
// position. The returned reader is the manager's log manager.
func (mgr *TransactionManager) GetLogReader() storage.LogReader {
return mgr.logManager
}
// GetLogWriter provides controlled access to the underlying log management system for log appending
// purposes. The returned writer is the manager's log manager.
func (mgr *TransactionManager) GetLogWriter() storage.LogWriter {
return mgr.logManager
}
// TransactionManager is responsible for transaction management of a single repository. Each repository has
// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from
// the admissionQueue. Each admitted write is processed in three steps:
//
//  1. The references being updated are verified by ensuring the expected old tips match what the references
//     actually point to prior to update. The entire transaction is by default aborted if a single reference
//     fails the verification step. The reference verification behavior can be controlled on a per-transaction
//     level by setting:
//     - The reference verification failures can be ignored instead of aborting the entire transaction.
//     If done, the references that failed verification are dropped from the transaction but the updates
//     that passed verification are still performed.
//  2. The transaction is appended to the write-ahead log. Once the write has been logged, it is effectively
//     committed and will be applied to the repository even after restarting.
//  3. The transaction is applied from the write-ahead log to the repository by actually performing the reference
//     changes.
//
// The goroutine that issued the transaction is waiting for the result while these steps are being performed. As
// there is no transaction control for readers yet, the issuer is only notified of a successful write after the
// write has been applied to the repository.
//
// TransactionManager recovers transactions after interruptions by applying the write-ahead logged transactions to
// the repository on start up.
//
// TransactionManager maintains the write-ahead log in a key-value store. It maintains the following key spaces:
//   - `partition/<partition_id>/applied_lsn`
//   - This key stores the LSN of the log entry that has been applied to the repository. This allows for
//     determining how far a partition is in processing the log and which log entries need to be applied
//     after starting up. Partition starts from LSN 0 if there are no log entries recorded to have
//     been applied.
//
//   - `partition/<partition_id:string>/log/entry/<log_index:uint64>`
//   - These keys hold the actual write-ahead log entries. A partition's first log entry starts at LSN 1
//     and the LSN keeps monotonically increasing from there on without gaps. The write-ahead log
//     entries are processed in ascending order.
//
// The values in the database are marshaled protocol buffer messages. Numbers in the keys are encoded as big
// endian to preserve the sort order of the numbers also in lexicographical order.
type TransactionManager struct {
// ctx is the context used for all operations.
ctx context.Context
// close cancels ctx and stops the transaction processing.
close context.CancelFunc
// logger is the logger to use to write log messages.
logger logging.Logger
// closing is closed when close is called. It unblocks transactions that are waiting to be admitted.
closing <-chan struct{}
// closed is closed when Run returns. It unblocks transactions that are waiting for a result after
// being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly
// releases awaiters when the transaction processing is stopped.
closed chan struct{}
// stagingDirectory is a path to a directory where this TransactionManager should stage the files of the transactions
// before it logs them. The TransactionManager cleans up the files during runtime but stale files may be
// left around after crashes. The files are temporary and any leftover files are expected to be cleaned up when
// Gitaly starts.
stagingDirectory string
// commandFactory is used to spawn git commands without a repository.
commandFactory gitcmd.CommandFactory
// repositoryFactory is used to build localrepo.Repo instances.
repositoryFactory localrepo.StorageScopedFactory
// storageName is the name of the storage the TransactionManager's partition is a member of.
storageName string
// storagePath is an absolute path to the root of the storage this TransactionManager
// is operating in.
storagePath string
// partitionID is the ID of the partition this manager is operating on. This is used to determine the database keys.
partitionID storage.PartitionID
// db is the handle to the key-value store used for storing the write-ahead log related state.
db keyvalue.Transactioner
// logManager manages the underlying Write-Ahead Log entries.
logManager storage.LogManager
// admissionQueue is where the incoming writes are waiting to be admitted to the transaction
// manager.
admissionQueue chan *Transaction
// completedQueue is a queue notifying when a transaction finishes.
completedQueue chan struct{}
// initialized is closed when the manager has been initialized. It's used to block new transactions
// from beginning prior to the manager having initialized its runtime state on start up.
initialized chan struct{}
// initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't,
// transactions will fail to begin.
initializationSuccessful bool
// mutex guards access to snapshotLocks and appendedLSN. These fields are accessed by both
// Run and Begin, which run in different goroutines.
mutex sync.Mutex
// cleanupWorkers is a worker pool that TransactionManager uses to run transaction clean up in the
// background. This way transaction processing is not blocked on the clean up.
cleanupWorkers *errgroup.Group
// cleanupWorkerFailed is closed if one of the clean up workers failed. This signals to the manager
// to stop processing and exit.
cleanupWorkerFailed chan struct{}
// cleanupWorkerFailedOnce ensures cleanupWorkerFailed is closed only once.
cleanupWorkerFailedOnce sync.Once
// snapshotLocks contains state used for synchronizing snapshotters with the log application. The
// lock is released after the corresponding log entry is applied.
snapshotLocks map[storage.LSN]*snapshotLock
// snapshotManager is responsible for creation and management of file system snapshots.
snapshotManager *snapshot.Manager
// conflictMgr is responsible for checking concurrent transactions against each other for conflicts.
conflictMgr *conflict.Manager
// fsHistory stores the history of file system operations for conflict checking purposes.
fsHistory *fshistory.History
// appliedLSN holds the LSN of the last log entry applied to the partition.
appliedLSN storage.LSN
// committedEntries keeps some of the latest appended log entries around. Some types of transactions, such as
// housekeeping, operate on a snapshot repository. There is a gap between a transaction doing its work and the
// time when it is committed. They need to verify if concurrent operations can cause conflicts. These log
// entries are still kept around even after they are applied. They are removed when there are no active
// readers accessing the corresponding snapshots.
committedEntries *list.List
// testHooks are used in the tests to trigger logic at certain points in the execution.
// They are used to synchronize more complex test scenarios. Not used in production.
testHooks testHooks
// metrics stores reporters which facilitate metric recording of transactional operations.
metrics ManagerMetrics
// offloadingSink points to the offloading storage used during offloading tasks.
offloadingSink *offloading.Sink
}
// testHooks defines hooks for testing various stages of WAL log operations. NewTransactionManager
// installs a no-op function for every hook, so the manager can invoke them unconditionally; tests
// replace individual hooks to observe or block execution at the corresponding point.
type testHooks struct {
// beforeInitialization is triggered before initialization starts.
beforeInitialization func()
// beforeAppendLogEntry is triggered before appending a log entry at the target LSN.
beforeAppendLogEntry func(targetLSN storage.LSN)
// beforeApplyLogEntry is triggered before applying a log entry at the target LSN.
beforeApplyLogEntry func(targetLSN storage.LSN)
// beforeStoreAppliedLSN is triggered before storing the target applied LSN.
beforeStoreAppliedLSN func(targetLSN storage.LSN)
// beforeRunExiting is triggered before the run loop exits.
beforeRunExiting func()
}
// transactionManagerParameters bundles the dependencies and settings NewTransactionManager needs to
// construct a TransactionManager.
type transactionManagerParameters struct {
// PtnID is the ID of the partition the manager operates on.
PtnID storage.PartitionID
// Logger is the logger the manager writes its log messages to.
Logger logging.Logger
// DB is the handle to the key-value store backing the manager.
DB keyvalue.Transactioner
// StorageName is the name of the storage and StoragePath the path to its root.
StorageName, StoragePath string
// StagingDir is the directory where the manager stages transaction files before logging them.
// NOTE(review): StateDir is not read by NewTransactionManager in the code visible here — presumably
// it is consumed elsewhere or is left over; confirm before relying on it.
StateDir, StagingDir string
// OffloadingSink is the offloading storage used during offloading tasks.
OffloadingSink *offloading.Sink
// CmdFactory is used to spawn git commands.
CmdFactory gitcmd.CommandFactory
// RepositoryFactory is used to build localrepo.Repo instances scoped to the storage.
RepositoryFactory localrepo.StorageScopedFactory
// Metrics holds the reporters used for recording metrics of transactional operations.
Metrics ManagerMetrics
// LogManager manages the underlying write-ahead log entries.
LogManager storage.LogManager
}
// NewTransactionManager constructs a TransactionManager from the supplied parameters. The manager
// owns a cancellable context derived from context.Background(); cancelling it (through the stored
// close function) also closes the `closing` channel. Background clean up is bounded to 25 concurrent
// workers, and every test hook starts out as a no-op.
func NewTransactionManager(parameters *transactionManagerParameters) *TransactionManager {
	managerCtx, stop := context.WithCancel(context.Background())

	// Bound the number of clean up tasks that may run concurrently in the background.
	workers := new(errgroup.Group)
	workers.SetLimit(25)

	// Hooks default to no-ops so the manager never has to nil-check them.
	hooks := testHooks{
		beforeInitialization:  func() {},
		beforeAppendLogEntry:  func(storage.LSN) {},
		beforeApplyLogEntry:   func(storage.LSN) {},
		beforeStoreAppliedLSN: func(storage.LSN) {},
		beforeRunExiting:      func() {},
	}

	mgr := &TransactionManager{
		// Lifecycle.
		ctx:         managerCtx,
		close:       stop,
		closing:     managerCtx.Done(),
		closed:      make(chan struct{}),
		initialized: make(chan struct{}),

		// Injected dependencies and settings.
		logger:            parameters.Logger,
		commandFactory:    parameters.CmdFactory,
		repositoryFactory: parameters.RepositoryFactory,
		storageName:       parameters.StorageName,
		storagePath:       parameters.StoragePath,
		partitionID:       parameters.PtnID,
		db:                parameters.DB,
		logManager:        parameters.LogManager,
		stagingDirectory:  parameters.StagingDir,
		metrics:           parameters.Metrics,
		offloadingSink:    parameters.OffloadingSink,

		// Queues and runtime state.
		admissionQueue:      make(chan *Transaction),
		completedQueue:      make(chan struct{}, 1),
		snapshotLocks:       make(map[storage.LSN]*snapshotLock),
		conflictMgr:         conflict.NewManager(),
		fsHistory:           fshistory.New(),
		cleanupWorkers:      workers,
		cleanupWorkerFailed: make(chan struct{}),
		committedEntries:    list.New(),

		testHooks: hooks,
	}

	return mgr
}
// resultChannel represents a future that will yield the result of a transaction once its
// outcome has been decided. A nil error signals success. The commit path creates it with a
// buffer of one so the manager can deliver the result without waiting for the committer.
type resultChannel chan error
// commit queues the transaction for processing and returns once the result has been determined.
//
// Before the transaction is handed to the manager, commit prepares everything that can be done
// outside of the single-writer loop:
//
//   - If the transaction targets a repository that did not exist when it began, the repository's
//     state is staged as a repository creation. If the repository still doesn't exist there is
//     nothing to commit and commit returns nil.
//   - Otherwise new objects are packed, housekeeping is prepared, the resulting packfile (if any)
//     is recorded in the WAL entry and the packed-refs file is staged when needed.
//   - The manifest is built and written, and the log entry directory is synced to disk.
//
// The transaction is then pushed to the admission queue and commit waits for its result.
//
// commit returns:
//   - the transaction's result, with context cancellations originating from the manager mapped
//     to storage.ErrTransactionProcessingStopped,
//   - ctx.Err() if the caller's context ends while waiting, or
//   - storage.ErrTransactionProcessingStopped if the manager is closing before admission or has
//     stopped running before a result was produced.
func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error {
	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil)
	defer span.Finish()

	// The channel is buffered so the manager never blocks delivering the result, even if the
	// committer has already given up waiting.
	transaction.result = make(resultChannel, 1)

	if transaction.repositoryTarget() && !transaction.repositoryExists {
		// Determine if the repository was created in this transaction and stage its state
		// for committing if so.
		if err := mgr.stageRepositoryCreation(ctx, transaction); err != nil {
			if errors.Is(err, storage.ErrRepositoryNotFound) {
				// The repository wasn't created as part of this transaction.
				return nil
			}

			return fmt.Errorf("stage repository creation: %w", err)
		}
	}

	if transaction.repositoryCreation == nil {
		if err := mgr.packObjects(ctx, transaction); err != nil {
			return fmt.Errorf("pack objects: %w", err)
		}

		if err := mgr.prepareHousekeeping(ctx, transaction); err != nil {
			return fmt.Errorf("preparing housekeeping: %w", err)
		}

		// If there were objects packed that should be committed, record the packfile's creation.
		if transaction.packPrefix != "" {
			packDir := filepath.Join(transaction.relativePath, "objects", "pack")
			for _, fileExtension := range []string{".pack", ".idx", ".rev"} {
				if err := transaction.walEntry.CreateFile(
					filepath.Join(transaction.stagingDirectory, "objects"+fileExtension),
					filepath.Join(packDir, transaction.packPrefix+fileExtension),
				); err != nil {
					return fmt.Errorf("record file creation: %w", err)
				}
			}
		}

		// Reference changes are only recorded if the repository exists when the transaction
		// began. Repository creations record the entire state of the repository at the end
		// of the transaction so ReferenceRecorder is not used. ReferenceRecorder is not used
		// with reftables.
		//
		// We only stage the packed-refs file if reference transactions were recorded or
		// this was a housekeeping run. This prevents a duplicate removal being staged
		// after a repository removal operation as the removal would look like a modification
		// to the recorder.
		if transaction.referenceRecorder != nil && (len(transaction.referenceUpdates) > 0 || transaction.runHousekeeping != nil) {
			if err := transaction.referenceRecorder.StagePackedRefs(); err != nil {
				return fmt.Errorf("stage packed refs: %w", err)
			}
		}
	}

	transaction.manifest = &gitalypb.LogEntry{
		RelativePath:          transaction.relativePath,
		Operations:            transaction.walEntry.Operations(),
		ReferenceTransactions: transaction.referenceUpdatesToProto(),
	}

	if transaction.deleteRepository {
		transaction.manifest.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{}
	}

	if err := transaction.stageKeyValueOperations(); err != nil {
		return fmt.Errorf("stage key-value operations: %w", err)
	}

	// Staging the key-value operations added further operations to the WAL entry, so refresh
	// the manifest's operations before writing it out.
	transaction.manifest.Operations = transaction.walEntry.Operations()

	if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
		return fmt.Errorf("writing manifest file: %w", err)
	}

	// Sync the log entry completely.
	if err := safe.NewSyncer().SyncRecursive(ctx, transaction.walEntry.Directory()); err != nil {
		return fmt.Errorf("flush log entry: %w", err)
	}

	if err := func() error {
		defer trace.StartRegion(ctx, "commit queue").End()
		transaction.metrics.commitQueueDepth.Inc()
		defer transaction.metrics.commitQueueDepth.Dec()
		defer prometheus.NewTimer(mgr.metrics.commitQueueWaitSeconds).ObserveDuration()

		select {
		case mgr.admissionQueue <- transaction:
			transaction.admitted = true
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-mgr.closing:
			return storage.ErrTransactionProcessingStopped
		}
	}(); err != nil {
		return err
	}

	defer trace.StartRegion(ctx, "result wait").End()
	select {
	case err := <-transaction.result:
		return unwrapExpectedError(err)
	case <-ctx.Done():
		return ctx.Err()
	case <-mgr.closed:
		// Run has returned. The manager may have stopped after admitting the transaction but
		// before deciding its outcome, in which case no result will ever arrive. Without this
		// case the committer would stay blocked until its own context ends.
		//
		// A result may still have been delivered right before the manager stopped. As select
		// picks randomly among ready cases, prefer the buffered result so a decided transaction
		// is not misreported as stopped.
		select {
		case err := <-transaction.result:
			return unwrapExpectedError(err)
		default:
			return storage.ErrTransactionProcessingStopped
		}
	}
}
// stageKeyValueOperations copies the transaction's key-value writes into its WAL entry. For every
// key in the recorded write set the current value is looked up in the transaction's database view:
// a key that can no longer be found is staged as a deletion, any other key is staged as a set with
// a copy of its value.
func (txn *Transaction) stageKeyValueOperations() error {
	for writtenKey := range txn.recordingReadWriter.WriteSet() {
		rawKey := []byte(writtenKey)

		item, err := txn.db.Get(rawKey)
		switch {
		case errors.Is(err, badger.ErrKeyNotFound):
			// The key was written during the transaction but doesn't exist anymore, so the
			// net effect to log is a deletion.
			txn.walEntry.DeleteKey(rawKey)
			continue
		case err != nil:
			return fmt.Errorf("get: %w", err)
		}

		rawValue, err := item.ValueCopy(nil)
		if err != nil {
			return fmt.Errorf("value copy: %w", err)
		}

		txn.walEntry.SetKey(rawKey, rawValue)
	}

	return nil
}
// referenceUpdatesToProto converts the transaction's reference updates into their protobuf form,
// producing one LogEntry_ReferenceTransaction per recorded set of updates. Within each reference
// transaction the changes are ordered by reference name so the logged output is deterministic
// regardless of map iteration order. It returns nil if there are no reference updates.
func (txn *Transaction) referenceUpdatesToProto() []*gitalypb.LogEntry_ReferenceTransaction {
	var result []*gitalypb.LogEntry_ReferenceTransaction

	for _, updateSet := range txn.referenceUpdates {
		protoChanges := make([]*gitalypb.LogEntry_ReferenceTransaction_Change, 0, len(updateSet))
		for name, refUpdate := range updateSet {
			change := &gitalypb.LogEntry_ReferenceTransaction_Change{
				ReferenceName: []byte(name),
				NewOid:        []byte(refUpdate.NewOID),
				NewTarget:     []byte(refUpdate.NewTarget),
			}
			protoChanges = append(protoChanges, change)
		}

		// Order the changes by reference name so they are always logged deterministically.
		byReferenceName := func(i, j int) bool {
			left, right := protoChanges[i].GetReferenceName(), protoChanges[j].GetReferenceName()
			return bytes.Compare(left, right) < 0
		}
		sort.Slice(protoChanges, byReferenceName)

		result = append(result, &gitalypb.LogEntry_ReferenceTransaction{Changes: protoChanges})
	}

	return result
}
// stageRepositoryCreation captures the state of a repository created by the transaction. It
// resolves the object hash of the snapshot repository, marks the transaction as a repository
// creation and records every reference of the repository as an update from the zero OID to its
// current target. Symbolic references are not expected in the listing and result in an error.
func (mgr *TransactionManager) stageRepositoryCreation(ctx context.Context, transaction *Transaction) error {
	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.stageRepositoryCreation", nil)
	defer span.Finish()

	repo := transaction.snapshotRepository

	hash, err := repo.ObjectHash(ctx)
	if err != nil {
		return fmt.Errorf("object hash: %w", err)
	}

	transaction.repositoryCreation = &repositoryCreation{objectHash: hash}

	existingRefs, err := repo.GetReferences(ctx)
	if err != nil {
		return fmt.Errorf("get references: %w", err)
	}

	// Every reference of the new repository is staged as a creation, i.e. an update from the zero OID.
	creations := make(git.ReferenceUpdates, len(existingRefs))
	for _, existing := range existingRefs {
		if existing.IsSymbolic {
			return fmt.Errorf("unexpected symbolic ref: %v", existing)
		}

		creations[existing.Name] = git.ReferenceUpdate{
			OldOID: hash.ZeroOID,
			NewOID: git.ObjectID(existing.Target),
		}
	}

	transaction.referenceUpdates = []git.ReferenceUpdates{creations}

	return nil
}
// setupStagingRepository creates the snapshot used for verifying and staging the transaction's
// changes and returns a repository handle pointing into it. The snapshot covers the transaction's
// relative path and is stored on the transaction. It does not have the quarantine configured.
// Calling this for a transaction that already has a staging snapshot is an error.
func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, transaction *Transaction) (*localrepo.Repo, error) {
	defer trace.StartRegion(ctx, "setupStagingRepository").End()

	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.setupStagingRepository", nil)
	defer span.Finish()

	if transaction.stagingSnapshot != nil {
		return nil, errors.New("staging snapshot already setup")
	}

	snapshottedPaths := []string{transaction.relativePath}

	var err error
	if transaction.stagingSnapshot, err = mgr.snapshotManager.GetSnapshot(ctx, snapshottedPaths, true); err != nil {
		return nil, fmt.Errorf("new snapshot: %w", err)
	}

	// Resolve the repository's location inside of the snapshot and build a handle for it.
	stagingRelativePath := transaction.stagingSnapshot.RelativePath(transaction.relativePath)

	return mgr.repositoryFactory.Build(stagingRelativePath), nil
}
// packPrefixRegexp matches the output of `git index-pack` where it
// prints the pack's prefix in the format `pack <digest>`. The pattern is
// anchored to the whole output: the literal `pack`, a tab, a lowercase
// hexadecimal digest captured as the first group, and a trailing newline.
var packPrefixRegexp = regexp.MustCompile(`^pack\t([0-9a-f]+)\n$`)
// packObjects walks the objects in the quarantine directory starting from the new
// reference tips introduced by the transaction and the explicitly included objects. All
// objects in the quarantine directory that are encountered during the walk are included in
// a packfile that gets committed with the transaction. All encountered objects that are missing
// from the quarantine directory are considered the transaction's dependencies. The dependencies
// are later verified to exist in the repository before committing the transaction, and they will
// be guarded against concurrent pruning operations. The final pack is staged in the WAL directory
// of the transaction ready for committing. The pack's index and reverse index are also included.
//
// Objects that were not reachable from the walk are not committed with the transaction. Objects
// that already exist in the repository are included in the packfile if the client wrote them into
// the quarantine directory.
//
// The packed objects are not yet checked for validity. See the following issue for more
// details on this: https://gitlab.com/gitlab-org/gitaly/-/issues/5779
//
// The work is a pipeline of goroutines connected by pipes:
//
//	walk objects -> classify lines -> pack-objects -> index-pack
//
// Each stage closes its ends of the pipes with the error it finished with, so a failure in
// one stage is observed by its neighbours instead of looking like a clean end of stream.
// The first error of any stage is returned. On success, transaction.objectDependencies holds
// the missing objects and transaction.packPrefix is set if a pack was produced.
func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) (returnedErr error) {
	defer trace.StartRegion(ctx, "packObjects").End()

	if !transaction.repositoryTarget() {
		return nil
	}

	if _, err := os.Stat(mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath())); err != nil {
		if !errors.Is(err, fs.ErrNotExist) {
			return fmt.Errorf("stat: %w", err)
		}

		// The repository does not exist. Exit early as the Git commands below would fail. There's
		// nothing to pack and no dependencies if the repository doesn't exist.
		return nil
	}

	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.packObjects", nil)
	defer span.Finish()

	// We want to only pack the objects that are present in the quarantine as they are potentially
	// new. Disable the alternate, which is the repository's original object directory, so that we'll
	// only walk the objects in the quarantine directory below.
	quarantineOnlySnapshotRepository, err := transaction.snapshotRepository.QuarantineOnly()
	if err != nil {
		return fmt.Errorf("quarantine only: %w", err)
	}

	objectHash, err := quarantineOnlySnapshotRepository.ObjectHash(ctx)
	if err != nil {
		return fmt.Errorf("object hash: %w", err)
	}

	heads := make([]string, 0)
	for _, referenceUpdates := range transaction.referenceUpdates {
		for _, update := range referenceUpdates {
			if !update.IsRegularUpdate() {
				// We don't have to worry about symrefs here.
				continue
			}

			if update.NewOID == objectHash.ZeroOID {
				// Reference deletions can't introduce new objects so ignore them.
				continue
			}

			heads = append(heads, update.NewOID.String())
		}
	}

	for objectID := range transaction.includedObjects {
		heads = append(heads, objectID.String())
	}

	if len(heads) == 0 {
		// No need to pack objects if there are no changes that can introduce new objects.
		return nil
	}

	objectWalkReader, objectWalkWriter := io.Pipe()

	group, ctx := errgroup.WithContext(ctx)
	group.Go(func() (returnedErr error) {
		// The close is wrapped in a closure so returnedErr is read when the goroutine exits.
		// Arguments of a directly deferred call are evaluated at the defer statement, where
		// returnedErr is still nil.
		defer func() { objectWalkWriter.CloseWithError(returnedErr) }()

		// Walk the new reference tips and included objects in the quarantine directory. The reachable
		// objects will be included in the transaction's logged packfile and the unreachable ones
		// discarded, and missing objects are regarded as the transaction's dependencies.
		if err := quarantineOnlySnapshotRepository.WalkObjects(ctx,
			strings.NewReader(strings.Join(heads, "\n")),
			objectWalkWriter,
		); err != nil {
			return fmt.Errorf("walk objects: %w", err)
		}

		return nil
	})

	objectsToPackReader, objectsToPackWriter := io.Pipe()
	// We'll only start the commands needed for object packing if the walk above produces objects
	// we need to pack.
	startObjectPacking := func() {
		packReader, packWriter := io.Pipe()
		group.Go(func() (returnedErr error) {
			defer func() {
				objectsToPackReader.CloseWithError(returnedErr)
				packWriter.CloseWithError(returnedErr)
			}()

			if err := quarantineOnlySnapshotRepository.PackObjects(ctx, objectsToPackReader, packWriter); err != nil {
				return fmt.Errorf("pack objects: %w", err)
			}

			return nil
		})

		group.Go(func() (returnedErr error) {
			defer func() { packReader.CloseWithError(returnedErr) }()

			// index-pack places the pack, index, and reverse index into the transaction's staging directory.
			var stdout, stderr bytes.Buffer
			if err := quarantineOnlySnapshotRepository.ExecAndWait(ctx, gitcmd.Command{
				Name:  "index-pack",
				Flags: []gitcmd.Option{gitcmd.Flag{Name: "--stdin"}, gitcmd.Flag{Name: "--rev-index"}},
				Args:  []string{filepath.Join(transaction.stagingDirectory, "objects.pack")},
			}, gitcmd.WithStdin(packReader), gitcmd.WithStdout(&stdout), gitcmd.WithStderr(&stderr)); err != nil {
				return structerr.New("index pack: %w", err).WithMetadata("stderr", stderr.String())
			}

			matches := packPrefixRegexp.FindStringSubmatch(stdout.String())
			if len(matches) != 2 {
				return structerr.New("unexpected index-pack output").WithMetadata("stdout", stdout.String())
			}

			transaction.packPrefix = fmt.Sprintf("pack-%s", matches[1])

			return nil
		})
	}

	transaction.objectDependencies = map[git.ObjectID]struct{}{}
	group.Go(func() (returnedErr error) {
		defer func() { objectWalkReader.CloseWithError(returnedErr) }()

		// objectLine comes in two formats from the walk:
		//   1. '<oid> <path>\n' in case the object is found. <path> may or may not be set.
		//   2. '?<oid>\n' in case the object is not found.
		//
		// Objects that are found are included in the transaction's packfile.
		//
		// Objects that are not found are recorded as the transaction's
		// dependencies since they should exist in the repository.
		scanner := bufio.NewScanner(objectWalkReader)

		// Registered after the reader's close so it runs first on exit: the packing commands
		// learn about the outcome of this stage before the walk is released.
		defer func() { objectsToPackWriter.CloseWithError(returnedErr) }()

		packObjectsStarted := false
		for scanner.Scan() {
			objectLine := scanner.Text()

			// CutPrefix is used rather than indexing the first byte so an unexpectedly empty
			// line cannot cause an out-of-range panic. What remains after the '?' prefix is
			// just the object ID.
			if missingObjectID, isMissing := strings.CutPrefix(objectLine, "?"); isMissing {
				transaction.objectDependencies[git.ObjectID(missingObjectID)] = struct{}{}
				continue
			}

			// At this point we have an object that we need to pack. If `pack-objects` and `index-pack`
			// haven't yet been launched, launch them.
			if !packObjectsStarted {
				packObjectsStarted = true
				startObjectPacking()
			}

			// Write the objects to `git pack-objects`. Restore the new line that was
			// trimmed by the scanner.
			if _, err := objectsToPackWriter.Write([]byte(objectLine + "\n")); err != nil {
				return fmt.Errorf("write object id for packing: %w", err)
			}
		}

		if err := scanner.Err(); err != nil {
			return fmt.Errorf("scanning rev-list output: %w", err)
		}

		return nil
	})

	return group.Wait()
}
// preparePackRefsReftable prepares the compaction of reftables for logging.
//
// It reads `tables.list` of the snapshot repository before and after running
// `git pack-refs --auto` and computes the delta: tables that disappeared are
// recorded as removals and tables that appeared are recorded as file creations
// in the WAL entry. The two listings are stored on the pack-refs task so the
// verification stage can build the final `tables.list`, which is why the
// `tables.list` file itself is not recorded here. Nothing is recorded if the
// compaction left the list unchanged.
func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, transaction *Transaction) error {
	packRefsTask := transaction.runHousekeeping.packRefs
	snapshotRepoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath())

	tablesBefore, err := git.ReadReftablesList(snapshotRepoPath)
	if err != nil {
		return fmt.Errorf("reading tables.list pre-compaction: %w", err)
	}

	// The command runs in the scope of the snapshot repository, so it may rewrite the tables
	// without affecting other concurrent transactions.
	//
	// With '--auto' git picks its own compaction heuristic. For reftables that is currently
	// a geometric progression, which avoids needlessly compacting everything into a single file.
	compaction := gitcmd.Command{
		Name:  "pack-refs",
		Flags: []gitcmd.Option{gitcmd.Flag{Name: "--auto"}},
	}

	var stderr bytes.Buffer
	if err := transaction.snapshotRepository.ExecAndWait(ctx, compaction, gitcmd.WithStderr(&stderr)); err != nil {
		return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String())
	}

	tablesAfter, err := git.ReadReftablesList(snapshotRepoPath)
	if err != nil {
		return fmt.Errorf("reading tables.list post-compaction: %w", err)
	}

	// An unchanged list means there is nothing to log.
	if slices.Equal(tablesBefore, tablesAfter) {
		return nil
	}

	// Start by assuming every table present after the compaction is new. Tables that already
	// existed before are struck from the set below.
	addedTables := make(map[string]struct{})
	for _, name := range tablesAfter {
		addedTables[name] = struct{}{}
	}

	for _, name := range tablesBefore {
		if _, survived := addedTables[name]; survived {
			// The table exists after the compaction too, so it is not a new table.
			delete(addedTables, name)
			continue
		}

		// The table no longer exists, record its removal.
		transaction.walEntry.RemoveDirectoryEntry(
			filepath.Join(transaction.relativePath, "reftable", name),
		)
	}

	// Whatever is left was created by the compaction and needs to be recorded.
	for name := range addedTables {
		sourcePath := filepath.Join(snapshotRepoPath, "reftable", name)
		targetPath := filepath.Join(transaction.relativePath, "reftable", name)

		if err := transaction.walEntry.CreateFile(sourcePath, targetPath); err != nil {
			return fmt.Errorf("creating new table: %w", err)
		}
	}

	packRefsTask.reftablesAfter = tablesAfter
	packRefsTask.reftablesBefore = tablesBefore

	return nil
}
// preparePackRefsFiles runs `git pack-refs --all` against the snapshot repository and collects the
// references that were pruned by it.
//
// git-pack-refs doesn't report which loose references it pruned, so the set is derived by
// comparison: the loose references are listed before the command runs and each of them is checked
// for existence afterwards. Those that disappeared are recorded as pruned. This is not performant
// on large repositories with a huge amount of loose references; smaller repositories or ones that
// run housekeeping frequently won't have this issue. The work of making `git-pack-refs` dump the
// pruned refs is tracked here: https://gitlab.com/gitlab-org/git/-/issues/222
//
// Stale `packed-refs.new` and `packed-refs.lock` files are removed up front and their removal is
// logged.
func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transaction *Transaction) error {
	packRefsTask := transaction.runHousekeeping.packRefs

	for _, suffix := range []string{".new", ".lock"} {
		relativeLockPath := filepath.Join(transaction.relativePath, "packed-refs"+suffix)
		absoluteLockPath := filepath.Join(transaction.snapshot.Root(), relativeLockPath)

		err := os.Remove(absoluteLockPath)
		switch {
		case err == nil:
			// The lock file existed. Log its deletion.
			transaction.walEntry.RemoveDirectoryEntry(relativeLockPath)
		case errors.Is(err, fs.ErrNotExist):
			// There was no such lock file, nothing to log.
		default:
			return fmt.Errorf("remove %v: %w", absoluteLockPath, err)
		}
	}

	// First pass: collect the loose references present before packing.
	snapshotRepoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath())
	looseBefore := make(map[git.ReferenceName]struct{})

	collectLooseRef := func(path string, entry fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}

		if entry.IsDir() {
			return nil
		}

		// The path relative to the repository is the fully qualified reference name.
		qualifiedName, err := filepath.Rel(snapshotRepoPath, path)
		if err != nil {
			return fmt.Errorf("extracting ref name: %w", err)
		}

		looseBefore[git.ReferenceName(qualifiedName)] = struct{}{}

		return nil
	}

	if err := filepath.WalkDir(filepath.Join(snapshotRepoPath, "refs"), collectLooseRef); err != nil {
		return fmt.Errorf("initial walking refs directory: %w", err)
	}

	// The command runs in the scope of the snapshot repository, so it may prune the loose
	// references without affecting other concurrent transactions.
	packAll := gitcmd.Command{
		Name:  "pack-refs",
		Flags: []gitcmd.Option{gitcmd.Flag{Name: "--all"}},
	}

	var stderr bytes.Buffer
	if err := transaction.snapshotRepository.ExecAndWait(ctx, packAll, gitcmd.WithStderr(&stderr)); err != nil {
		return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String())
	}

	// Second pass: every loose reference that has disappeared was pruned.
	//
	// The transaction's reference recorder handles staging the modified packed-refs file.
	for looseRef := range looseBefore {
		_, err := os.Stat(filepath.Join(snapshotRepoPath, looseRef.String()))
		switch {
		case err == nil:
			// The reference is still loose.
		case errors.Is(err, os.ErrNotExist):
			packRefsTask.PrunedRefs[looseRef] = struct{}{}
		default:
			return fmt.Errorf("second walk refs directory: %w", err)
		}
	}

	return nil
}
// packfileExtensions contains the packfile extension and the extensions of the files that accompany a
// packfile. They will be collected after running the repacking command. The map is used as a set; note
// that `multi-pack-index` is a complete file name rather than an extension with a leading dot.
var packfileExtensions = map[string]struct{}{
"multi-pack-index": {},
".pack": {},
".idx": {},
".rev": {},
".mtimes": {},
".bitmap": {},
".promisor": {},
}
// unwrapExpectedError translates errors that are expected during normal operation into the error
// callers should see. Errors wrapping context.Canceled become
// storage.ErrTransactionProcessingStopped; every other error, including nil, is returned unchanged.
func unwrapExpectedError(err error) error {
	if !errors.Is(err, context.Canceled) {
		return err
	}

	// The manager controls its own execution context, which is canceled only when the manager is
	// stopped. A cancellation surfacing here is thus caused by shutting down, so report that.
	return storage.ErrTransactionProcessingStopped
}
// Run starts the transaction processing. On start up Run loads the indexes of the last appended and applied
// log entries from the database. It will then apply any transactions that have been logged but not applied
// to the repository. Once the recovery is completed, Run starts processing new transactions by verifying the
// references, logging the transaction and finally applying it to the repository. The transactions are acknowledged
// once they've been applied to the repository.
//
// Run keeps running until Stop is called or it encounters a fatal error. All transactions will error with
// storage.ErrTransactionProcessingStopped when Run returns.
//
// Run is a thin entry point that executes the processing loop with the manager's own context.
func (mgr *TransactionManager) Run() error {
return mgr.run(mgr.ctx)
}
// run initializes the manager and then loops until an error occurs: while there are appended log
// entries that have not been applied yet, it applies the next one; otherwise it waits for and
// processes the next event through processTransaction.
//
// The deferred calls are registered in the reverse of the order they execute in. When run returns,
// the following happens in order:
//  1. the beforeRunExiting test hook is invoked,
//  2. the manager is closed via mgr.Close(),
//  3. the closed channel is closed to release committers waiting for a result,
//  4. the log manager is closed,
//  5. the clean up workers are awaited,
//  6. a returned error matching context.Canceled is replaced with nil.
//
// Errors from steps 4 and 5 are joined into the returned error before step 6 inspects it.
func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) {
defer func() {
// On-going operations may fail with a context canceled error if the manager is stopped. This is
// not a real error though given the manager will recover from this on restart. Swallow the error.
if errors.Is(returnedErr, context.Canceled) {
returnedErr = nil
}
}()
defer func() {
// Wait for the background clean up tasks so none of them outlives the manager.
if err := mgr.cleanupWorkers.Wait(); err != nil {
returnedErr = errors.Join(returnedErr, fmt.Errorf("clean up worker: %w", err))
}
}()
defer func() {
if err := mgr.logManager.Close(); err != nil {
returnedErr = errors.Join(returnedErr, fmt.Errorf("stopping log manager: %w", err))
}
}()
// Defer closing the manager and the closed channel in order to release all on-going Commit calls
// in case of error.
defer close(mgr.closed)
defer mgr.Close()
// The hook's function value is evaluated here, when the defer statement executes, so the hook
// that runs on exit is the one installed at this point.
defer mgr.testHooks.beforeRunExiting()
if err := mgr.initialize(ctx); err != nil {
return fmt.Errorf("initialize: %w", err)
}
for {
// Catch up with the log first: apply entries one LSN at a time until the applied LSN
// matches the appended LSN.
if mgr.appliedLSN < mgr.logManager.AppendedLSN() {
lsn := mgr.appliedLSN + 1
if err := mgr.applyLogEntry(ctx, lsn); err != nil {
return fmt.Errorf("apply log entry: %w", err)
}
continue
}
// Nothing is left to apply, so wait for the next transaction or notification.
if err := mgr.processTransaction(ctx); err != nil {
return fmt.Errorf("process transaction: %w", err)
}
}
}
// processTransaction waits for a transaction and processes it by verifying and
// logging it.
//
// The call blocks until one of the following events occurs:
//   - a transaction is admitted for processing,
//   - the clean up workers signal a failure, in which case an error is returned,
//   - a completed transaction is signalled, in which case nil is returned,
//   - the log manager sends a notification, whose error, if any, is returned,
//   - the context is done, in which case the context's error is returned.
//
// Errors encountered while verifying or logging an admitted transaction are not returned from
// this method. They are delivered to the committer through transaction.result instead.
func (mgr *TransactionManager) processTransaction(ctx context.Context) (returnedErr error) {
	var transaction *Transaction
	select {
	case transaction = <-mgr.admissionQueue:
		defer trace.StartRegion(ctx, "processTransaction").End()
		defer prometheus.NewTimer(mgr.metrics.transactionProcessingDurationSeconds).ObserveDuration()

		// The transaction does not finish itself anymore once it has been admitted for
		// processing. This avoids the client concurrently removing the staged state
		// while the manager is still operating on it. We thus need to defer its finishing.
		//
		// The error is always empty here as we run the clean up in background. If a background
		// task fails, cleanupWorkerFailed channel is closed prompting the manager to exit and
		// return the error from the errgroup.
		defer func() { _ = transaction.finish(true) }()
	case <-mgr.cleanupWorkerFailed:
		return errors.New("cleanup worker failed")
	case <-mgr.completedQueue:
		return nil
	case logErr := <-mgr.logManager.GetNotificationQueue():
		if logErr != nil {
			return fmt.Errorf("log manager failed: %w", logErr)
		}

		return nil
	case <-ctx.Done():
	}

	// Return if the manager was stopped. The select is indeterministic so this guarantees
	// the manager stops the processing even if there are transactions in the queue.
	if err := ctx.Err(); err != nil {
		return err
	}

	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.processTransaction", nil)
	defer span.Finish()

	transaction.result <- func() (commitErr error) {
		var zeroOID git.ObjectID

		if transaction.repositoryTarget() {
			repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath)
			if err != nil {
				return fmt.Errorf("does repository exist: %w", err)
			}

			if transaction.repositoryCreation != nil && repositoryExists {
				return ErrRepositoryAlreadyExists
			} else if transaction.repositoryCreation == nil && !repositoryExists {
				return storage.ErrRepositoryNotFound
			}

			if repositoryExists {
				targetRepository := mgr.repositoryFactory.Build(transaction.relativePath)

				objectHash, err := targetRepository.ObjectHash(ctx)
				if err != nil {
					return fmt.Errorf("object hash: %w", err)
				}

				zeroOID = objectHash.ZeroOID

				// Verify that all objects this transaction depends on are present in the repository. The dependency
				// objects are the reference tips set in the transaction and the objects the transaction's packfile
				// is based on. If an object dependency is missing, the transaction is aborted as applying it would
				// result in repository corruption.
				if err := mgr.verifyObjectsExist(ctx, targetRepository, transaction.objectDependencies); err != nil {
					return fmt.Errorf("verify object dependencies: %w", err)
				}

				refBackend, err := targetRepository.ReferenceBackend(ctx)
				if err != nil {
					return fmt.Errorf("reference backend: %w", err)
				}

				if refBackend == git.ReferenceBackendReftables || transaction.runHousekeeping != nil {
					if refBackend == git.ReferenceBackendReftables {
						if err := mgr.verifyReferences(ctx, transaction); err != nil {
							return fmt.Errorf("verify references: %w", err)
						}
					}

					if transaction.runHousekeeping != nil {
						housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction, refBackend, objectHash.ZeroOID)
						if err != nil {
							return fmt.Errorf("verifying pack refs: %w", err)
						}
						transaction.manifest.Housekeeping = housekeepingEntry
					}

					transaction.manifest.Operations = transaction.walEntry.Operations()

					// The transaction has already written the manifest to the disk as a read-only file
					// before queuing for commit. Remove the old file so we can replace it below.
					//
					// The underlying error is wrapped so the cause of the failure is not lost.
					if err := wal.RemoveManifest(ctx, transaction.walEntry.Directory()); err != nil {
						return fmt.Errorf("remove outdated manifest: %w", err)
					}

					// Operations working on the staging snapshot add more files into the log entry,
					// and modify the manifest.
					if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
						return fmt.Errorf("writing manifest file: %w", err)
					}

					// Fsync only the file itself and the parent directory.
					syncer := safe.NewSyncer()
					if err := syncer.Sync(ctx, wal.ManifestPath(transaction.walEntry.Directory())); err != nil {
						return fmt.Errorf("flush updated manifest file: %w", err)
					}

					if err := syncer.Sync(ctx, transaction.walEntry.Directory()); err != nil {
						return fmt.Errorf("flush parent dir of updated manifest file: %w", err)
					}
				}
			}
		}

		// Prepare the transaction to conflict check it. We'll commit it later if we
		// succeed logging the transaction.
		mgr.mutex.Lock()
		preparedTX, err := mgr.conflictMgr.Prepare(ctx, &conflict.Transaction{
			ReadLSN:            transaction.SnapshotLSN(),
			TargetRelativePath: transaction.relativePath,
			DeleteRepository:   transaction.deleteRepository,
			ZeroOID:            zeroOID,
			ReferenceUpdates:   transaction.referenceUpdates,
		})
		mgr.mutex.Unlock()
		if err != nil {
			return fmt.Errorf("prepare: %w", err)
		}

		if err := mgr.verifyKeyValueOperations(ctx, transaction); err != nil {
			return fmt.Errorf("verify key-value operations: %w", err)
		}

		commitFS, err := mgr.verifyFileSystemOperations(ctx, transaction)
		if err != nil {
			return fmt.Errorf("verify file system operations: %w", err)
		}

		mgr.testHooks.beforeAppendLogEntry(mgr.logManager.AppendedLSN() + 1)
		if err := mgr.appendLogEntry(ctx, transaction.objectDependencies, transaction.manifest, transaction.walFilesPath()); err != nil {
			return fmt.Errorf("append log entry: %w", err)
		}

		// Commit the prepared transaction now that we've managed to commit the log entry.
		mgr.mutex.Lock()
		preparedTX.Commit(ctx, mgr.logManager.AppendedLSN())
		commitFS(mgr.logManager.AppendedLSN())
		mgr.mutex.Unlock()

		return nil
	}()

	return nil
}
// verifyFileSystemOperations verifies the file system operations logged by a transaction still apply and don't conflict
// with other concurrently committed operations.
//
// On success it returns a function that commits the verified operations into the file system
// history at the given LSN. If the transaction logged no operations, the returned function is a
// no-op. On failure the returned function is nil.
func (mgr *TransactionManager) verifyFileSystemOperations(ctx context.Context, tx *Transaction) (func(lsn storage.LSN), error) {
	defer trace.StartRegion(ctx, "verifyFileSystemOperations").End()

	operations := tx.walEntry.Operations()
	if len(operations) == 0 {
		return func(storage.LSN) {}, nil
	}

	mgr.mutex.Lock()
	fsTX := mgr.fsHistory.Begin(tx.SnapshotLSN())
	mgr.mutex.Unlock()

	// Both paths only depend on the transaction, so they are computed once instead of per operation.
	refsPrefix := filepath.Join(tx.relativePath, "refs")
	tablesListPath := filepath.Join(tx.relativePath, "reftable", "tables.list")

	// isLooseReference returns whether this path is inside the `refs`
	// directory of the repository.
	isLooseReference := func(path string) bool {
		return strings.HasPrefix(path, refsPrefix)
	}

	// isTablesList returns true if this is the table.list file used with reftables.
	isTablesList := func(path string) bool {
		return path == tablesListPath
	}

	for _, op := range operations {
		switch op.GetOperation().(type) {
		case *gitalypb.LogEntry_Operation_CreateDirectory_:
			path := string(op.GetCreateDirectory().GetPath())
			if err := fsTX.Read(path); err != nil {
				return nil, fmt.Errorf("read: %w", err)
			}

			if err := fsTX.CreateDirectory(path); err != nil {
				return nil, fmt.Errorf("create directory: %w", err)
			}
		case *gitalypb.LogEntry_Operation_CreateHardLink_:
			op := op.GetCreateHardLink()
			if op.GetSourceInStorage() {
				// The link source is only conflict checked when it is located in the storage.
				if err := fsTX.Read(string(op.GetSourcePath())); err != nil {
					return nil, fmt.Errorf("source read: %w", err)
				}
			}

			destinationPath := string(op.GetDestinationPath())
			// The reference changes have already gone through logical conflict
			// checks at this point. We skip a conflict check as the loose reference
			// we're about to create has already been conflict checked.
			//
			// CreateFile call below will only succeed if the loose reference file
			// does not exist. This is mostly to handle conflicts with reference packing
			// and loose reference creation. Other conflicts are not currently resolved.
			if !isLooseReference(destinationPath) {
				if err := fsTX.Read(destinationPath); err != nil {
					return nil, fmt.Errorf("destination read: %w", err)
				}
			}

			if err := fsTX.CreateFile(destinationPath); err != nil {
				return nil, fmt.Errorf("create file: %w", err)
			}
		case *gitalypb.LogEntry_Operation_RemoveDirectoryEntry_:
			path := string(op.GetRemoveDirectoryEntry().GetPath())

			// reftable/tables.list file conflicts on every single reference write
			// as all reference updates need to modify it. The conflicts have already
			// been resolved at this point. Don't conflict check it.
			if !isTablesList(path) {
				if err := fsTX.Read(path); err != nil {
					return nil, fmt.Errorf("read: %w", err)
				}
			}

			if err := fsTX.Remove(path); err != nil {
				return nil, fmt.Errorf("remove: %w", err)
			}
		}
	}

	return fsTX.Commit, nil
}
// verifyKeyValueOperations checks the key-value operations of the transaction for conflicts.
// The conflict checking ensures serializability. Transaction is considered to conflict if it
// read a key a concurrently committed transaction set or deleted. Iterated key prefixes are
// predicate locked: a concurrently set or deleted key under a prefix the transaction iterated
// is a conflict as well.
//
// Only SetKey and DeleteKey operations of the committed entries are considered. Other operation
// types carry no key and are skipped. Without the skip they would be checked with an empty key
// and would be reported as conflicts for any transaction that read the empty prefix.
//
// NOTE(review): the prefixes are only checked when the read set is non-empty. Confirm that a
// prefix iteration that yielded no keys does not need to be conflict checked.
func (mgr *TransactionManager) verifyKeyValueOperations(ctx context.Context, tx *Transaction) error {
	defer trace.StartRegion(ctx, "verifyKeyValueOperations").End()

	readSet := tx.recordingReadWriter.ReadSet()
	if len(readSet) == 0 {
		return nil
	}

	// The prefixes do not depend on the committed entries, so they are fetched once.
	prefixesRead := tx.recordingReadWriter.PrefixesRead()

	if err := mgr.walkCommittedEntries(tx, func(entry *gitalypb.LogEntry, _ map[git.ObjectID]struct{}) error {
		for _, op := range entry.GetOperations() {
			var key []byte
			switch op := op.GetOperation().(type) {
			case *gitalypb.LogEntry_Operation_SetKey_:
				key = op.SetKey.GetKey()
			case *gitalypb.LogEntry_Operation_DeleteKey_:
				key = op.DeleteKey.GetKey()
			default:
				// Not a key-value operation, nothing to conflict check.
				continue
			}

			stringKey := string(key)
			if _, ok := readSet[stringKey]; ok {
				return newConflictingKeyValueOperationError(stringKey)
			}

			for prefix := range prefixesRead {
				if bytes.HasPrefix(key, []byte(prefix)) {
					return newConflictingKeyValueOperationError(stringKey)
				}
			}
		}

		return nil
	}); err != nil {
		return fmt.Errorf("walking committed entries: %w", err)
	}

	return nil
}
// verifyObjectsExist checks that every object ID in oids resolves to an object in the given
// repository. The IDs are handed to checkObjects as revisions; a revision for which the zero OID
// is reported is treated as missing and results in an InvalidObjectError. An empty set is
// trivially valid.
func (mgr *TransactionManager) verifyObjectsExist(ctx context.Context, repository *localrepo.Repo, oids map[git.ObjectID]struct{}) error {
	defer trace.StartRegion(ctx, "verifyObjectsExist").End()

	if len(oids) == 0 {
		return nil
	}

	pending := make([]git.Revision, 0, len(oids))
	for id := range oids {
		pending = append(pending, id.Revision())
	}

	hash, err := repository.ObjectHash(ctx)
	if err != nil {
		return fmt.Errorf("object hash: %w", err)
	}

	// rejectMissing fails the check for revisions that resolved to the zero OID.
	rejectMissing := func(revision git.Revision, resolved git.ObjectID) error {
		if !hash.IsZeroOID(resolved) {
			return nil
		}

		return localrepo.InvalidObjectError(revision)
	}

	if err := checkObjects(ctx, repository, pending, rejectMissing); err != nil {
		return fmt.Errorf("check objects: %w", err)
	}

	return nil
}
// Close stops the transaction processing, which causes Run to return. It delegates to the
// manager's close function.
func (mgr *TransactionManager) Close() {
	mgr.close()
}
// CloseSnapshots closes the snapshot manager and with it any snapshots remaining in its cache.
// The caller of Run() should invoke it once there are no more active transactions and no new
// transactions will be started. It is a no-op if the snapshot manager was never set.
func (mgr *TransactionManager) CloseSnapshots() error {
	// The snapshot manager may be unset if initializing it failed.
	if mgr.snapshotManager != nil {
		return mgr.snapshotManager.Close()
	}

	return nil
}
// snapshotsDir returns the path of the `snapshots` directory inside the manager's staging
// directory. The transactions' snapshots are stored there.
func (mgr *TransactionManager) snapshotsDir() string {
	const snapshotsDirName = "snapshots"

	return filepath.Join(mgr.stagingDirectory, snapshotsDirName)
}
// initialize restores the TransactionManager's state. It reads the applied LSN from the database,
// initializes the WAL log manager with it, creates the snapshots directory and the snapshot
// manager, and sets up the snapshot locks that synchronize transaction beginning with log entry
// application.
//
// mgr.initialized is closed when the function returns, regardless of the outcome.
// mgr.initializationSuccessful is only set when all steps succeeded.
func (mgr *TransactionManager) initialize(ctx context.Context) error {
	defer trace.StartRegion(ctx, "initialize").End()
	defer close(mgr.initialized)

	// A missing key is not an error: the zero value of the message is used in that case.
	var storedLSN gitalypb.LSN
	err := mgr.readKey(keyAppliedLSN, &storedLSN)
	if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
		return fmt.Errorf("read applied LSN: %w", err)
	}
	mgr.appliedLSN = storage.LSN(storedLSN.GetValue())

	err = mgr.logManager.Initialize(ctx, mgr.appliedLSN)
	if err != nil {
		return fmt.Errorf("initialize log management: %w", err)
	}

	err = os.Mkdir(mgr.snapshotsDir(), mode.Directory)
	if err != nil {
		return fmt.Errorf("create snapshot manager directory: %w", err)
	}

	mgr.snapshotManager, err = snapshot.NewManager(mgr.logger, mgr.storagePath, mgr.snapshotsDir(), mgr.metrics.snapshot)
	if err != nil {
		return fmt.Errorf("new snapshot manager: %w", err)
	}

	// The applied LSN needs a snapshot lock as it is used for synchronizing the snapshotters
	// with the log application. The entry is already applied, so mark the lock accordingly.
	mgr.createSnapshotLockIfNeeded(mgr.appliedLSN)
	close(mgr.snapshotLocks[mgr.appliedLSN].applied)

	// In normal operation a snapshot lock is created for each log entry when it is committed.
	// Recreate the locks of the entries that were appended but not yet applied.
	for lsn := mgr.appliedLSN + 1; lsn <= mgr.logManager.AppendedLSN(); lsn++ {
		mgr.createSnapshotLockIfNeeded(lsn)
	}

	mgr.testHooks.beforeInitialization()
	mgr.initializationSuccessful = true

	return nil
}
// doesRepositoryExist reports whether a directory exists at the repository's absolute path in
// the storage. A missing path yields false without an error. A path that exists but is not a
// directory yields errNotDirectory. Any other stat failure is returned wrapped.
func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relativePath string) (bool, error) {
	defer trace.StartRegion(ctx, "doesRepositoryExist").End()

	info, err := os.Stat(mgr.getAbsolutePath(relativePath))
	switch {
	case err == nil:
		// The path exists, its type is checked below.
	case errors.Is(err, fs.ErrNotExist):
		return false, nil
	default:
		return false, fmt.Errorf("stat repository directory: %w", err)
	}

	if info.IsDir() {
		return true, nil
	}

	return false, errNotDirectory
}
// getAbsolutePath joins the given path elements onto the manager's storage path and returns the
// result.
func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string {
	elements := make([]string, 0, len(relativePath)+1)
	elements = append(elements, mgr.storagePath)
	elements = append(elements, relativePath...)

	return filepath.Join(elements...)
}
// packFilePath returns the path of the `transaction.pack` file located in the given WAL files
// directory of a log entry.
func packFilePath(walFiles string) string {
	const packFileName = "transaction.pack"

	return filepath.Join(walFiles, packFileName)
}
// verifyReferences verifies the transaction's reference updates by applying the reference
// transactions recorded in its manifest to a staging repository. The staging repository is
// quarantined with the transaction's quarantine directory for the verification. Transactions
// without reference updates need no verification and succeed immediately.
func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction *Transaction) error {
	defer trace.StartRegion(ctx, "verifyReferences").End()

	if len(transaction.referenceUpdates) == 0 {
		return nil
	}

	span, _ := tracing.StartSpanIfHasParent(ctx, "transaction.verifyReferences", nil)
	defer span.Finish()

	staging, err := mgr.setupStagingRepository(ctx, transaction)
	if err != nil {
		return fmt.Errorf("setup staging snapshot: %w", err)
	}

	// The new objects of the transaction are not in the repository yet. Quarantining the staging
	// repository makes them available, as verifying the references would otherwise fail with
	// errors about missing objects.
	quarantined, err := staging.Quarantine(ctx, transaction.quarantineDirectory)
	if err != nil {
		return fmt.Errorf("quarantine: %w", err)
	}

	referenceTransactions := transaction.manifest.GetReferenceTransactions()
	if err := mgr.verifyReferencesWithGitForReftables(ctx, referenceTransactions, transaction, quarantined); err != nil {
		return fmt.Errorf("verify references with git: %w", err)
	}

	return nil
}
// verifyReferencesWithGitForReftables applies the logical reference updates to the given
// repository and records the resulting reftable files as operations on the transaction's
// write-ahead log entry.
//
// The tables that exist before the updates are locked by creating a `.lock` file next to each of
// them. This is done so that the existing tables are not modified or autocompacted and the
// reftable backend only creates new tables. Every file found after the updates that is neither a
// pre-existing table nor a lock file is recorded as a file creation. `tables.list` is
// additionally recorded as removed before being recreated. The lock files are removed at the
// end.
func (mgr *TransactionManager) verifyReferencesWithGitForReftables(
	ctx context.Context,
	referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction,
	tx *Transaction,
	repo *localrepo.Repo,
) error {
	reftableDir := mgr.getAbsolutePath(repo.GetRelativePath(), "reftable/")

	// walkReftableFiles invokes visit for every file in the reftable directory. The only
	// directory expected during the walk is the reftable directory itself.
	walkReftableFiles := func(visit func(path string) error) error {
		return filepath.WalkDir(reftableDir, func(path string, entry fs.DirEntry, walkErr error) error {
			if walkErr != nil {
				return walkErr
			}

			if !entry.IsDir() {
				return visit(path)
			}

			if name := filepath.Base(path); name != "reftable" {
				return fmt.Errorf("unexpected directory: %s", name)
			}

			return nil
		})
	}

	// Record the tables that exist before the reference updates are applied.
	preexistingTables := make(map[string]struct{})
	if err := walkReftableFiles(func(path string) error {
		if filepath.Base(path) != "tables.list" {
			preexistingTables[path] = struct{}{}
		}

		return nil
	}); err != nil {
		return fmt.Errorf("finding reftables: %w", err)
	}

	// Lock the existing tables in order to disable the autocompaction.
	lockFiles := make(map[string]struct{})
	for table := range preexistingTables {
		lockPath := table + ".lock"

		lock, err := os.Create(lockPath)
		if err != nil {
			return fmt.Errorf("creating reftable lock: %w", err)
		}

		if err := lock.Close(); err != nil {
			return fmt.Errorf("closing reftable lock: %w", err)
		}

		lockFiles[lockPath] = struct{}{}
	}

	// With the autocompaction disabled, applying the references adds new tables but does not
	// compact them.
	for _, referenceTransaction := range referenceTransactions {
		if err := mgr.applyReferenceTransaction(ctx, referenceTransaction.GetChanges(), repo); err != nil {
			return fmt.Errorf("applying reference: %w", err)
		}
	}

	// Record the new tables and the 'tables.list' as operations on the transaction.
	if err := walkReftableFiles(func(path string) error {
		_, isLock := lockFiles[path]
		_, isPreexisting := preexistingTables[path]
		if isLock || isPreexisting {
			return nil
		}

		name := filepath.Base(path)
		target := filepath.Join(tx.relativePath, "reftable", name)
		if name == "tables.list" {
			tx.walEntry.RemoveDirectoryEntry(target)
		}

		return tx.walEntry.CreateFile(path, target)
	}); err != nil {
		return fmt.Errorf("finding reftables: %w", err)
	}

	// Release the locks taken on the existing tables.
	for lockPath := range lockFiles {
		if err := os.Remove(lockPath); err != nil {
			return fmt.Errorf("deleting locked file: %w", err)
		}
	}

	return nil
}
// applyReferenceTransaction applies the given reference changes to the repository through a
// single updateref updater. Changes with a new target are applied as symbolic reference updates,
// all others as regular updates to the new object ID. The updater is always closed on return and
// a failure to close it is joined into the returned error.
func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, changes []*gitalypb.LogEntry_ReferenceTransaction_Change, repository *localrepo.Repo) (returnedErr error) {
	defer trace.StartRegion(ctx, "applyReferenceTransaction").End()

	updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions(), updateref.WithNoDeref())
	if err != nil {
		return fmt.Errorf("new: %w", err)
	}
	defer func() {
		if closeErr := updater.Close(); closeErr != nil {
			returnedErr = errors.Join(returnedErr, fmt.Errorf("close updater: %w", closeErr))
		}
	}()

	if err := updater.Start(); err != nil {
		return fmt.Errorf("start: %w", err)
	}

	version, err := repository.GitVersion(ctx)
	if err != nil {
		return fmt.Errorf("git version: %w", err)
	}

	for _, change := range changes {
		referenceName := git.ReferenceName(change.GetReferenceName())

		if symrefTarget := change.GetNewTarget(); len(symrefTarget) > 0 {
			if err := updater.UpdateSymbolicReference(version, referenceName, git.ReferenceName(symrefTarget)); err != nil {
				return fmt.Errorf("update symref %q: %w", change.GetReferenceName(), err)
			}

			continue
		}

		if err := updater.Update(referenceName, git.ObjectID(change.GetNewOid()), ""); err != nil {
			return fmt.Errorf("update %q: %w", change.GetReferenceName(), err)
		}
	}

	if err := updater.Prepare(); err != nil {
		return fmt.Errorf("prepare: %w", err)
	}

	if err := updater.Commit(); err != nil {
		return fmt.Errorf("commit: %w", err)
	}

	return nil
}
// appendLogEntry appends the log entry stored at logEntryPath to the write-ahead log through the
// log manager. On success the entry is recorded, together with its LSN and object dependencies,
// at the back of the in-memory list of committed entries.
func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error {
	defer trace.StartRegion(ctx, "appendLogEntry").End()

	// Once the entry has been appended, the transaction is committed and all subsequent
	// transactions are guaranteed to read it.
	lsn, err := mgr.logManager.AppendLogEntry(logEntryPath)
	if err != nil {
		return fmt.Errorf("append log entry: %w", err)
	}

	committed := &committedEntry{
		lsn:                lsn,
		entry:              logEntry,
		objectDependencies: objectDependencies,
	}

	mgr.mutex.Lock()
	mgr.committedEntries.PushBack(committed)
	mgr.mutex.Unlock()

	return nil
}
// applyLogEntry reads a log entry at the given LSN and applies it to the repository.
//
// The steps are, in order: read the entry's manifest, wait for the snapshotters of the previous
// LSN to finish and drop that LSN's snapshot lock, apply the manifest's operations within a
// database update, store the applied LSN, update the snapshot manager's LSN, and finally close
// the `applied` channel of this LSN's snapshot lock.
func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LSN) error {
defer trace.StartRegion(ctx, "applyLogEntry").End()
defer prometheus.NewTimer(mgr.metrics.transactionApplicationDurationSeconds).ObserveDuration()
manifest, err := wal.ReadManifest(mgr.logManager.GetEntryPath(lsn))
if err != nil {
return fmt.Errorf("read log entry: %w", err)
}
// Ensure all snapshotters have finished snapshotting the previous state before we apply
// the new state to the repository. No new snapshotters can arrive at this point. All
// new transactions would be waiting for the committed log entry we are about to apply.
//
// NOTE(review): this assumes a snapshot lock exists for the previous LSN. A missing map
// entry would yield a nil lock and panic below — confirm the invariant holds for all callers.
previousLSN := lsn - 1
mgr.mutex.Lock()
previousLock := mgr.snapshotLocks[previousLSN]
mgr.mutex.Unlock()
// This might take a while, so the wait is done outside of the mutex lock.
previousLock.activeSnapshotters.Wait()
// The previous LSN's lock has been waited on above, remove it from the map.
mgr.mutex.Lock()
delete(mgr.snapshotLocks, previousLSN)
mgr.mutex.Unlock()
mgr.testHooks.beforeApplyLogEntry(lsn)
// Apply the operations recorded in the manifest within a database update.
if err := mgr.db.Update(func(tx keyvalue.ReadWriter) error {
if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, mgr.logManager.GetEntryPath(lsn), manifest.GetOperations(), tx); err != nil {
return fmt.Errorf("apply operations: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("update: %w", err)
}
if err := mgr.storeAppliedLSN(lsn); err != nil {
return fmt.Errorf("set applied LSN: %w", err)
}
mgr.snapshotManager.SetLSN(lsn)
// Notify the transactions waiting for this log entry to be applied prior to take their
// snapshot.
mgr.mutex.Lock()
mgr.createSnapshotLockIfNeeded(lsn)
close(mgr.snapshotLocks[lsn].applied)
mgr.mutex.Unlock()
return nil
}
// storeAppliedLSN persists the given LSN as the partition's applied LSN in the database,
// acknowledges it as the applied position to the log manager and then updates the manager's
// in-memory applied LSN. The in-memory value is left untouched if either step fails.
func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error {
	mgr.testHooks.beforeStoreAppliedLSN(lsn)

	err := mgr.setKey(keyAppliedLSN, lsn.ToProto())
	if err != nil {
		return err
	}

	err = mgr.logManager.AcknowledgePosition(log.AppliedPosition, lsn)
	if err != nil {
		return fmt.Errorf("acknowledge applied LSN: %w", err)
	}

	mgr.appliedLSN = lsn

	return nil
}
// setKey serializes the given protocol buffer message and writes it into the database under the
// given key using a write batch that is flushed before returning.
func (mgr *TransactionManager) setKey(key []byte, value proto.Message) error {
	payload, err := proto.Marshal(value)
	if err != nil {
		return fmt.Errorf("marshal value: %w", err)
	}

	batch := mgr.db.NewWriteBatch()
	defer batch.Cancel()

	err = batch.Set(key, payload)
	if err != nil {
		return fmt.Errorf("set: %w", err)
	}

	return batch.Flush()
}
// readKey looks up the given key in the database within a read-only view and unmarshals the
// stored value into the destination protocol buffer message. A failed lookup is returned wrapped.
func (mgr *TransactionManager) readKey(key []byte, destination proto.Message) error {
	// decode unmarshals the raw value into the destination message.
	decode := func(raw []byte) error {
		return proto.Unmarshal(raw, destination)
	}

	return mgr.db.View(func(txn keyvalue.ReadWriter) error {
		item, err := txn.Get(key)
		if err != nil {
			return fmt.Errorf("get: %w", err)
		}

		return item.Value(decode)
	})
}
// updateCommittedEntry registers a snapshot reader on the committed entry that a transaction
// with the given snapshot LSN depends on and returns that entry. If the list of committed
// entries is empty, a new entry for the snapshot LSN with a single reader is appended.
func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN storage.LSN) *committedEntry {
	latest := mgr.committedEntries.Back()
	if latest == nil {
		created := &committedEntry{
			lsn:             snapshotLSN,
			snapshotReaders: 1,
		}
		mgr.committedEntries.PushBack(created)

		return created
	}

	// Since the goroutine doing this is holding the lock, the snapshotLSN shouldn't change and no
	// new transactions can be committed or added. That should guarantee the last element is always
	// the latest transaction and the one the snapshot is based on.
	existing := latest.Value.(*committedEntry)
	existing.snapshotReaders++

	return existing
}
// walkCommittedEntries invokes the callback for each in-memory committed entry that has an LSN
// greater than the transaction's snapshot LSN and targets the transaction's relative path. The
// callback receives the log entry and its object dependencies. The walk is performed while
// holding the manager's mutex and stops at the first error.
//
// errCommittedEntryGone is returned if an entry after the snapshot LSN has no log entry attached.
func (mgr *TransactionManager) walkCommittedEntries(transaction *Transaction, callback func(*gitalypb.LogEntry, map[git.ObjectID]struct{}) error) error {
	mgr.mutex.Lock()
	defer mgr.mutex.Unlock()

	for node := mgr.committedEntries.Front(); node != nil; node = node.Next() {
		candidate := node.Value.(*committedEntry)

		switch {
		case candidate.lsn <= transaction.snapshotLSN:
			// Only entries committed after the transaction's snapshot are of interest.
			continue
		case candidate.entry == nil:
			return errCommittedEntryGone
		case candidate.entry.GetRelativePath() != transaction.relativePath:
			// Transaction manager works on the partition level, including a repository and all
			// of its pool member repositories (if any). Entries of repositories other than the
			// one this transaction targets are filtered out.
			continue
		}

		if err := callback(candidate.entry, candidate.objectDependencies); err != nil {
			return fmt.Errorf("callback: %w", err)
		}
	}

	return nil
}
// cleanCommittedEntry decrements the snapshot reader counter of the given committed entry and
// then removes entries without readers from the front of the committed entries list. Removed
// entries are also evicted from the conflict manager and the file system history. The removal
// stops at the first entry that still has readers. It reports whether any entry was removed.
func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) bool {
	entry.snapshotReaders--

	removed := false
	for head := mgr.committedEntries.Front(); head != nil; head = mgr.committedEntries.Front() {
		oldest := head.Value.(*committedEntry)
		if oldest.snapshotReaders > 0 {
			// The oldest entry still has snapshot readers, so our transaction was not the
			// oldest reader. Nothing further can be removed as the entries are still needed
			// for conflict checking the older transactions.
			break
		}

		mgr.committedEntries.Remove(head)

		// It's safe to drop the transaction from the conflict detection history as there are no
		// transactions reading at an older snapshot. Since the changes are already in the
		// transaction's snapshot, it would already base its changes on them.
		mgr.conflictMgr.EvictLSN(mgr.ctx, oldest.lsn)
		mgr.fsHistory.EvictLSN(oldest.lsn)

		removed = true
	}

	return removed
}
// createSnapshotLockIfNeeded ensures a snapshot lock is registered for the given LSN. An existing
// lock is left untouched; otherwise a new lock with an open `applied` channel is stored.
func (mgr *TransactionManager) createSnapshotLockIfNeeded(lsn storage.LSN) {
	if _, registered := mgr.snapshotLocks[lsn]; registered {
		return
	}

	mgr.snapshotLocks[lsn] = &snapshotLock{applied: make(chan struct{})}
}