func()

in internal/gitaly/storage/storagemgr/partition/transaction_manager.go [1674:1826]


// processTransaction blocks until the next event the manager has to react to arrives and handles it.
//
// The handled events are:
//   - A transaction is received from the admission queue. The transaction is verified, conflict
//     checked, appended to the log and committed. The outcome is sent to the transaction's
//     result channel.
//   - cleanupWorkerFailed becomes readable, in which case an error is returned.
//   - A value is received from completedQueue, in which case nil is returned.
//   - The log manager's notification queue yields a value. A non-nil value is returned as a
//     wrapped error, nil results in nil being returned.
//   - The context is done, in which case the context's error is returned.
//
// Errors encountered while processing an admitted transaction are delivered through the
// transaction's result channel. They are not returned from this method.
func (mgr *TransactionManager) processTransaction(ctx context.Context) (returnedErr error) {
	var transaction *Transaction
	select {
	case transaction = <-mgr.admissionQueue:
		defer trace.StartRegion(ctx, "processTransaction").End()
		defer prometheus.NewTimer(mgr.metrics.transactionProcessingDurationSeconds).ObserveDuration()

		// The transaction does not finish itself anymore once it has been admitted for
		// processing. This avoids the client concurrently removing the staged state
		// while the manager is still operating on it. We thus need to defer its finishing.
		//
		// The error is always empty here as we run the clean up in background. If a background
		// task fails, cleanupWorkerFailed channel is closed prompting the manager to exit and
		// return the error from the errgroup.
		defer func() { _ = transaction.finish(true) }()
	case <-mgr.cleanupWorkerFailed:
		return errors.New("cleanup worker failed")
	case <-mgr.completedQueue:
		return nil
	case logErr := <-mgr.logManager.GetNotificationQueue():
		if logErr != nil {
			return fmt.Errorf("log manager failed: %w", logErr)
		}
		return nil
	case <-ctx.Done():
		// Fall through to the context check below, which returns the context's error.
	}

	// Return if the manager was stopped. The select is non-deterministic so this guarantees
	// the manager stops the processing even if there are transactions in the queue. This
	// check also ensures the transaction is never nil past this point.
	if err := ctx.Err(); err != nil {
		return err
	}

	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.processTransaction", nil)
	defer span.Finish()

	// The closure is invoked immediately. Its return value is the commit result that gets
	// sent to the transaction's result channel.
	transaction.result <- func() (commitErr error) {
		// zeroOID remains unset unless the transaction targets a repository that already exists.
		var zeroOID git.ObjectID
		if transaction.repositoryTarget() {
			repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath)
			if err != nil {
				return fmt.Errorf("does repository exist: %w", err)
			}

			// A repository creation must target a repository that does not exist yet. Every
			// other transaction must target a repository that exists.
			switch {
			case transaction.repositoryCreation != nil && repositoryExists:
				return ErrRepositoryAlreadyExists
			case transaction.repositoryCreation == nil && !repositoryExists:
				return storage.ErrRepositoryNotFound
			}

			if repositoryExists {
				targetRepository := mgr.repositoryFactory.Build(transaction.relativePath)

				objectHash, err := targetRepository.ObjectHash(ctx)
				if err != nil {
					return fmt.Errorf("object hash: %w", err)
				}

				zeroOID = objectHash.ZeroOID

				// Verify that all objects this transaction depends on are present in the repository. The dependency
				// objects are the reference tips set in the transaction and the objects the transaction's packfile
				// is based on. If an object dependency is missing, the transaction is aborted as applying it would
				// result in repository corruption.
				if err := mgr.verifyObjectsExist(ctx, targetRepository, transaction.objectDependencies); err != nil {
					return fmt.Errorf("verify object dependencies: %w", err)
				}

				refBackend, err := targetRepository.ReferenceBackend(ctx)
				if err != nil {
					return fmt.Errorf("reference backend: %w", err)
				}

				// The manifest is rewritten only if the reftable backend is in use or the
				// transaction runs housekeeping.
				if refBackend == git.ReferenceBackendReftables || transaction.runHousekeeping != nil {
					if refBackend == git.ReferenceBackendReftables {
						if err := mgr.verifyReferences(ctx, transaction); err != nil {
							return fmt.Errorf("verify references: %w", err)
						}
					}

					if transaction.runHousekeeping != nil {
						housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction, refBackend, objectHash.ZeroOID)
						if err != nil {
							return fmt.Errorf("verifying pack refs: %w", err)
						}
						transaction.manifest.Housekeeping = housekeepingEntry
					}
					transaction.manifest.Operations = transaction.walEntry.Operations()

					// The transaction has already written the manifest to the disk as a read-only file
					// before queuing for commit. Remove the old file so we can replace it below.
					if err := wal.RemoveManifest(ctx, transaction.walEntry.Directory()); err != nil {
						// Wrap the cause so it is not lost and remains matchable by callers.
						return fmt.Errorf("remove outdated manifest: %w", err)
					}

					// Operations working on the staging snapshot add more files into the log entry,
					// and modify the manifest.
					if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
						return fmt.Errorf("writing manifest file: %w", err)
					}

					// Fsync only the file itself and the parent directory.
					syncer := safe.NewSyncer()
					if err := syncer.Sync(ctx, wal.ManifestPath(transaction.walEntry.Directory())); err != nil {
						return fmt.Errorf("flush updated manifest file: %w", err)
					}
					if err := syncer.Sync(ctx, transaction.walEntry.Directory()); err != nil {
						return fmt.Errorf("flush parent dir of updated manifest file: %w", err)
					}
				}
			}
		}

		// Prepare the transaction to conflict check it. We'll commit it later if we
		// succeed logging the transaction.
		mgr.mutex.Lock()
		preparedTX, err := mgr.conflictMgr.Prepare(ctx, &conflict.Transaction{
			ReadLSN:            transaction.SnapshotLSN(),
			TargetRelativePath: transaction.relativePath,
			DeleteRepository:   transaction.deleteRepository,
			ZeroOID:            zeroOID,
			ReferenceUpdates:   transaction.referenceUpdates,
		})
		mgr.mutex.Unlock()
		if err != nil {
			return fmt.Errorf("prepare: %w", err)
		}

		if err := mgr.verifyKeyValueOperations(ctx, transaction); err != nil {
			return fmt.Errorf("verify key-value operations: %w", err)
		}

		commitFS, err := mgr.verifyFileSystemOperations(ctx, transaction)
		if err != nil {
			return fmt.Errorf("verify file system operations: %w", err)
		}

		mgr.testHooks.beforeAppendLogEntry(mgr.logManager.AppendedLSN() + 1)
		if err := mgr.appendLogEntry(ctx, transaction.objectDependencies, transaction.manifest, transaction.walFilesPath()); err != nil {
			return fmt.Errorf("append log entry: %w", err)
		}

		// Commit the prepared transaction now that we've managed to commit the log entry.
		mgr.mutex.Lock()
		preparedTX.Commit(ctx, mgr.logManager.AppendedLSN())
		commitFS(mgr.logManager.AppendedLSN())
		mgr.mutex.Unlock()

		return nil
	}()

	// Failures of the transaction itself were reported through its result channel above.
	return nil
}