in internal/gitaly/storage/storagemgr/partition/transaction_manager.go [207:464]
func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOptions) (_ storage.Transaction, returnedErr error) {
defer trace.StartRegion(ctx, "begin").End()
defer prometheus.NewTimer(mgr.metrics.beginDuration(opts.Write)).ObserveDuration()
transactionDurationTimer := prometheus.NewTimer(mgr.metrics.transactionDuration(opts.Write))
trace.Log(ctx, "correlation_id", correlation.ExtractFromContext(ctx))
trace.Log(ctx, "storage_name", mgr.storageName)
trace.Log(ctx, "partition_id", mgr.partitionID.String())
trace.Log(ctx, "write", strconv.FormatBool(opts.Write))
trace.Log(ctx, "relative_path_filter_set", strconv.FormatBool(opts.RelativePaths != nil))
trace.Log(ctx, "relative_path_filter", strings.Join(opts.RelativePaths, ";"))
trace.Log(ctx, "force_exclusive_snapshot", strconv.FormatBool(opts.ForceExclusiveSnapshot))
// Wait until the manager has been initialized so the notification channels
// and the LSNs are loaded.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-mgr.initialized:
if !mgr.initializationSuccessful {
return nil, errInitializationFailed
}
}
var relativePath string
if len(opts.RelativePaths) > 0 {
// Set the first repository as the tracked repository
relativePath = opts.RelativePaths[0]
}
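// Write transactions must explicitly name the repositories they target. A
// transaction over all of the partition's relative paths (a nil filter) can
// only be begun for reads.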
if opts.RelativePaths == nil && opts.Write {
return nil, errWritableAllRepository
}
span, _ := tracing.StartSpanIfHasParent(ctx, "transaction.Begin", nil)
span.SetTag("write", opts.Write)
span.SetTag("relativePath", relativePath)
defer span.Finish()
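// The manager's mutex makes recording the snapshot LSN, registering on the snapshot
// lock, and updating the committed entry atomic with respect to concurrently
// committing transactions.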
mgr.mutex.Lock()
txn := &Transaction{
write: opts.Write,
commit: mgr.commit,
snapshotLSN: mgr.logManager.AppendedLSN(),
finished: make(chan struct{}),
relativePath: relativePath,
metrics: mgr.metrics,
}
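// Register as an active snapshotter on the snapshot LSN's lock. Its applied channel
// is closed once all log entries up to and including the snapshot LSN have been
// applied, at which point the partition's on-disk state is safe to snapshot.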
mgr.createSnapshotLockIfNeeded(txn.snapshotLSN)
mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Add(1)
defer mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Done()
readReady := mgr.snapshotLocks[txn.snapshotLSN].applied
var entry *committedEntry
if txn.write {
entry = mgr.updateCommittedEntry(txn.snapshotLSN)
}
mgr.mutex.Unlock()
span.SetTag("snapshotLSN", txn.snapshotLSN)
txn.finish = func(admitted bool) error {
defer trace.StartRegion(ctx, "finish transaction").End()
defer close(txn.finished)
defer transactionDurationTimer.ObserveDuration()
defer func() {
if txn.db != nil {
txn.db.Discard()
}
if txn.write {
var removedAnyEntry bool
mgr.mutex.Lock()
removedAnyEntry = mgr.cleanCommittedEntry(entry)
mgr.mutex.Unlock()
// Signal the manager that this transaction has finished. The signal wakes it up
// so it can clean up stale entries in the database; the manager scans for and
// removes leading empty entries. We signal only if the transaction removed an
// in-memory committed entry. The signal queue is buffered: if it is full, the
// manager simply hasn't woken up yet, and its next scan will cover the work of
// the prior one, so there's no need to make the transaction wait.
//
// ┌─ 1st signal ┌── The manager scans til here
// ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐
// └─┘ └─┘ └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
// └─ 2nd signal
//
if removedAnyEntry {
select {
case mgr.completedQueue <- struct{}{}:
default:
}
}
}
}()
cleanTemporaryState := func() error {
defer trace.StartRegion(ctx, "cleanTemporaryState").End()
var cleanupErr error
if txn.snapshot != nil {
if err := txn.snapshot.Close(); err != nil {
cleanupErr = errors.Join(cleanupErr, fmt.Errorf("close snapshot: %w", err))
}
}
if txn.stagingSnapshot != nil {
if err := txn.stagingSnapshot.Close(); err != nil {
cleanupErr = errors.Join(cleanupErr, fmt.Errorf("close staging snapshot: %w", err))
}
}
if txn.stagingDirectory != "" {
if err := os.RemoveAll(txn.stagingDirectory); err != nil {
cleanupErr = errors.Join(cleanupErr, fmt.Errorf("remove staging directory: %w", err))
}
}
return cleanupErr
}
if admitted {
// If the transaction was admitted, `.Run()` is responsible for cleaning the transaction up.
// Cleaning up the snapshots can take a relatively long time if the snapshots are large, or if
// the file system is busy. To avoid blocking transaction processing, we use a pool of background
// workers to clean up the transaction snapshots.
//
// The number of background workers is limited to exert backpressure on write transactions if
// we can't clean up after them fast enough.
mgr.cleanupWorkers.Go(func() error {
if err := cleanTemporaryState(); err != nil {
mgr.cleanupWorkerFailedOnce.Do(func() { close(mgr.cleanupWorkerFailed) })
return fmt.Errorf("clean temporary state async: %w", err)
}
return nil
})
return nil
}
if err := cleanTemporaryState(); err != nil {
return fmt.Errorf("clean temporary state sync: %w", err)
}
return nil
}
defer func() {
if returnedErr != nil {
if err := txn.finish(false); err != nil {
mgr.logger.WithError(err).ErrorContext(ctx, "failed finishing unsuccessful transaction begin")
}
}
}()
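// Wait until all log entries up to the snapshot LSN have been applied before taking
// the snapshot. Bail out if the caller's context is canceled or the manager stops.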
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-mgr.ctx.Done():
return nil, storage.ErrTransactionProcessingStopped
case <-readReady:
txn.db = mgr.db.NewTransaction(txn.write)
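// Wrap the database transaction in a recording read-writer that tracks which keys
// are read and written; the recorded key sets can then be consulted when the
// transaction commits, for example to detect conflicting writes.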
txn.recordingReadWriter = keyvalue.NewRecordingReadWriter(txn.db)
relativePaths := opts.RelativePaths
if relativePaths == nil {
relativePaths = txn.PartitionRelativePaths()
}
var err error
txn.stagingDirectory, err = os.MkdirTemp(mgr.stagingDirectory, "")
if err != nil {
return nil, fmt.Errorf("mkdir temp: %w", err)
}
if txn.snapshot, err = mgr.snapshotManager.GetSnapshot(ctx,
relativePaths,
txn.write || opts.ForceExclusiveSnapshot,
); err != nil {
return nil, fmt.Errorf("get snapshot: %w", err)
}
if txn.write {
// Create a directory to store the files staged for the transaction's write-ahead log entry.
if err := os.Mkdir(txn.walFilesPath(), mode.Directory); err != nil {
return nil, fmt.Errorf("create wal files directory: %w", err)
}
txn.walEntry = wal.NewEntry(txn.walFilesPath())
}
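// Wrap the snapshot root so that file system operations performed through txn.fs
// can be captured in the transaction's WAL entry.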
txn.fs = fsrecorder.NewFS(txn.snapshot.Root(), txn.walEntry)
if txn.repositoryTarget() {
txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath))
if err != nil {
return nil, fmt.Errorf("does repository exist: %w", err)
}
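// Build a repository handle that operates on the snapshot's copy of the repository
// rather than on the live repository in the storage.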
txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath))
if txn.write {
if txn.repositoryExists {
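// Quarantine the repository so that objects written by the transaction land in a
// quarantine directory under the staging directory, keeping them out of the
// repository's object database until the transaction commits.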
txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine")
if err := os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), mode.Directory); err != nil {
return nil, fmt.Errorf("create quarantine directory: %w", err)
}
txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctx, txn.quarantineDirectory)
if err != nil {
return nil, fmt.Errorf("quarantine: %w", err)
}
refRecorderTmpDir := filepath.Join(txn.stagingDirectory, "ref-recorder")
if err := os.Mkdir(refRecorderTmpDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("create reference recorder tmp dir: %w", err)
}
refBackend, err := txn.snapshotRepository.ReferenceBackend(ctx)
if err != nil {
return nil, fmt.Errorf("reference backend: %w", err)
}
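// Only the files reference backend needs a reference recorder to capture the
// transaction's reference updates in the WAL entry.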
if refBackend == git.ReferenceBackendFiles {
objectHash, err := txn.snapshotRepository.ObjectHash(ctx)
if err != nil {
return nil, fmt.Errorf("object hash: %w", err)
}
if txn.referenceRecorder, err = wal.NewReferenceRecorder(refRecorderTmpDir, txn.walEntry, txn.snapshot.Root(), txn.relativePath, objectHash.ZeroOID); err != nil {
return nil, fmt.Errorf("new reference recorder: %w", err)
}
}
} else {
// The repository does not exist and this is a write, so the transaction should create the repository. The
// repository's final state is only logged later by TransactionManager, but we already log the creation of any
// missing parent directories of the repository here: at commit time we no longer know whether they existed
// beforehand, so we couldn't record their creation later.
//
// If the repository is at the root of the storage, there are no parent directories to create.
if parentDir := filepath.Dir(txn.relativePath); parentDir != "." {
if err := storage.MkdirAll(txn.fs, parentDir); err != nil {
return nil, fmt.Errorf("create parent directories: %w", err)
}
}
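// A repository created by this transaction has no pre-existing objects, so the
// snapshot repository's objects directory itself can serve as the quarantine.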
txn.quarantineDirectory = filepath.Join(mgr.storagePath, txn.snapshot.RelativePath(txn.relativePath), "objects")
}
}
}
return txn, nil
}
}
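// Example usage (a minimal sketch, not part of this file): beginning a write
// transaction scoped to a single, hypothetical repository path. It assumes the
// returned storage.Transaction is completed with Commit or Rollback, as with
// transactions elsewhere in this package.
//
//	tx, err := mgr.Begin(ctx, storage.BeginOptions{
//		Write:         true,
//		RelativePaths: []string{"example/repo.git"},
//	})
//	if err != nil {
//		return fmt.Errorf("begin: %w", err)
//	}
//	defer func() { _ = tx.Rollback(ctx) }()
//
//	// ... stage changes through the transaction's snapshot ...
//
//	if err := tx.Commit(ctx); err != nil {
//		return fmt.Errorf("commit: %w", err)
//	}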