func()

in internal/gitaly/storage/storagemgr/partition/transaction_manager.go [207:464]


func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOptions) (_ storage.Transaction, returnedErr error) {
	defer trace.StartRegion(ctx, "begin").End()
	defer prometheus.NewTimer(mgr.metrics.beginDuration(opts.Write)).ObserveDuration()
	transactionDurationTimer := prometheus.NewTimer(mgr.metrics.transactionDuration(opts.Write))

	trace.Log(ctx, "correlation_id", correlation.ExtractFromContext(ctx))
	trace.Log(ctx, "storage_name", mgr.storageName)
	trace.Log(ctx, "partition_id", mgr.partitionID.String())
	trace.Log(ctx, "write", strconv.FormatBool(opts.Write))
	trace.Log(ctx, "relative_path_filter_set", strconv.FormatBool(opts.RelativePaths != nil))
	trace.Log(ctx, "relative_path_filter", strings.Join(opts.RelativePaths, ";"))
	trace.Log(ctx, "force_exclusive_snapshot", strconv.FormatBool(opts.ForceExclusiveSnapshot))

	// Wait until the manager has been initialized so the notification channels
	// and the LSNs are loaded.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-mgr.initialized:
		if !mgr.initializationSuccessful {
			return nil, errInitializationFailed
		}
	}

	var relativePath string
	if len(opts.RelativePaths) > 0 {
		// Set the first repository as the tracked repository
		relativePath = opts.RelativePaths[0]
	}

	if opts.RelativePaths == nil && opts.Write {
		return nil, errWritableAllRepository
	}

	span, _ := tracing.StartSpanIfHasParent(ctx, "transaction.Begin", nil)
	span.SetTag("write", opts.Write)
	span.SetTag("relativePath", relativePath)
	defer span.Finish()

	mgr.mutex.Lock()

	txn := &Transaction{
		write:        opts.Write,
		commit:       mgr.commit,
		snapshotLSN:  mgr.logManager.AppendedLSN(),
		finished:     make(chan struct{}),
		relativePath: relativePath,
		metrics:      mgr.metrics,
	}

	mgr.createSnapshotLockIfNeeded(txn.snapshotLSN)
	mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Add(1)
	defer mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Done()
	readReady := mgr.snapshotLocks[txn.snapshotLSN].applied

	var entry *committedEntry
	if txn.write {
		entry = mgr.updateCommittedEntry(txn.snapshotLSN)
	}

	mgr.mutex.Unlock()

	span.SetTag("snapshotLSN", txn.snapshotLSN)

	txn.finish = func(admitted bool) error {
		defer trace.StartRegion(ctx, "finish transaction").End()
		defer close(txn.finished)
		defer transactionDurationTimer.ObserveDuration()

		defer func() {
			if txn.db != nil {
				txn.db.Discard()
			}

			if txn.write {
				var removedAnyEntry bool

				mgr.mutex.Lock()
				removedAnyEntry = mgr.cleanCommittedEntry(entry)
				mgr.mutex.Unlock()

				// Signal the manager this transaction finishes. The purpose of this signaling is to wake it up
				// and clean up stale entries in the database. The manager scans and removes leading empty
				// entries. We signal only if the transaction modifies the in-memory committed entry.
				// This signal queue is buffered. If the queue is full, the manager hasn't woken up. The
				// next scan will cover the work of the prior one. So, no need to let the transaction wait.
				//
				//  ┌─ 1st signal   ┌── The manager scans til here
				// ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐
				// └─┘ └─┘ └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
				//          └─ 2nd signal
				//
				if removedAnyEntry {
					select {
					case mgr.completedQueue <- struct{}{}:
					default:
					}
				}
			}
		}()

		cleanTemporaryState := func() error {
			defer trace.StartRegion(ctx, "cleanTemporaryState").End()

			var cleanupErr error
			if txn.snapshot != nil {
				if err := txn.snapshot.Close(); err != nil {
					cleanupErr = errors.Join(cleanupErr, fmt.Errorf("close snapshot: %w", err))
				}
			}

			if txn.stagingSnapshot != nil {
				if err := txn.stagingSnapshot.Close(); err != nil {
					cleanupErr = errors.Join(cleanupErr, fmt.Errorf("close staging snapshot: %w", err))
				}
			}

			if txn.stagingDirectory != "" {
				if err := os.RemoveAll(txn.stagingDirectory); err != nil {
					cleanupErr = errors.Join(cleanupErr, fmt.Errorf("remove staging directory: %w", err))
				}
			}

			return cleanupErr
		}

		if admitted {
			// If the transaction was admitted, `.Run()` is responsible for cleaning the transaction up.
			// Cleaning up the snapshots can take a relatively long time if the snapshots are large, or if
			// the file system is busy. To avoid blocking transaction processing, we us a pool of background
			// workers to clean up the transaction snapshots.
			//
			// The number of background workers is limited to exert backpressure on write transactions if
			// we can't clean up after them fast enough.
			mgr.cleanupWorkers.Go(func() error {
				if err := cleanTemporaryState(); err != nil {
					mgr.cleanupWorkerFailedOnce.Do(func() { close(mgr.cleanupWorkerFailed) })
					return fmt.Errorf("clean temporary state async: %w", err)
				}

				return nil
			})

			return nil
		}

		if err := cleanTemporaryState(); err != nil {
			return fmt.Errorf("clean temporary state sync: %w", err)
		}

		return nil
	}

	defer func() {
		if returnedErr != nil {
			if err := txn.finish(false); err != nil {
				mgr.logger.WithError(err).ErrorContext(ctx, "failed finishing unsuccessful transaction begin")
			}
		}
	}()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-mgr.ctx.Done():
		return nil, storage.ErrTransactionProcessingStopped
	case <-readReady:
		txn.db = mgr.db.NewTransaction(txn.write)
		txn.recordingReadWriter = keyvalue.NewRecordingReadWriter(txn.db)

		relativePaths := opts.RelativePaths
		if relativePaths == nil {
			relativePaths = txn.PartitionRelativePaths()
		}

		var err error
		txn.stagingDirectory, err = os.MkdirTemp(mgr.stagingDirectory, "")
		if err != nil {
			return nil, fmt.Errorf("mkdir temp: %w", err)
		}

		if txn.snapshot, err = mgr.snapshotManager.GetSnapshot(ctx,
			relativePaths,
			txn.write || opts.ForceExclusiveSnapshot,
		); err != nil {
			return nil, fmt.Errorf("get snapshot: %w", err)
		}

		if txn.write {
			// Create a directory to store all staging files.
			if err := os.Mkdir(txn.walFilesPath(), mode.Directory); err != nil {
				return nil, fmt.Errorf("create wal files directory: %w", err)
			}

			txn.walEntry = wal.NewEntry(txn.walFilesPath())
		}

		txn.fs = fsrecorder.NewFS(txn.snapshot.Root(), txn.walEntry)

		if txn.repositoryTarget() {
			txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath))
			if err != nil {
				return nil, fmt.Errorf("does repository exist: %w", err)
			}

			txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath))
			if txn.write {
				if txn.repositoryExists {
					txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine")
					if err := os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), mode.Directory); err != nil {
						return nil, fmt.Errorf("create quarantine directory: %w", err)
					}

					txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctx, txn.quarantineDirectory)
					if err != nil {
						return nil, fmt.Errorf("quarantine: %w", err)
					}

					refRecorderTmpDir := filepath.Join(txn.stagingDirectory, "ref-recorder")
					if err := os.Mkdir(refRecorderTmpDir, os.ModePerm); err != nil {
						return nil, fmt.Errorf("create reference recorder tmp dir: %w", err)
					}

					refBackend, err := txn.snapshotRepository.ReferenceBackend(ctx)
					if err != nil {
						return nil, fmt.Errorf("reference backend: %w", err)
					}

					if refBackend == git.ReferenceBackendFiles {
						objectHash, err := txn.snapshotRepository.ObjectHash(ctx)
						if err != nil {
							return nil, fmt.Errorf("object hash: %w", err)
						}

						if txn.referenceRecorder, err = wal.NewReferenceRecorder(refRecorderTmpDir, txn.walEntry, txn.snapshot.Root(), txn.relativePath, objectHash.ZeroOID); err != nil {
							return nil, fmt.Errorf("new reference recorder: %w", err)
						}
					}
				} else {
					// The repository does not exist, and this is a write. This should thus create the repository. As the repository's final state
					// is still being logged in TransactionManager, we already log here the creation of any missing parent directories of
					// the repository. When the transaction commits, we don't know if they existed or not, so we can't record this later.
					//
					// If the repository is at the root of the storage, there's no parent directories to create.
					if parentDir := filepath.Dir(txn.relativePath); parentDir != "." {
						if err := storage.MkdirAll(txn.fs, parentDir); err != nil {
							return nil, fmt.Errorf("create parent directories: %w", err)
						}
					}

					txn.quarantineDirectory = filepath.Join(mgr.storagePath, txn.snapshot.RelativePath(txn.relativePath), "objects")
				}
			}
		}

		return txn, nil
	}
}