in internal/gitaly/storage/storagemgr/partition/transaction_manager.go [1674:1826]
func (mgr *TransactionManager) processTransaction(ctx context.Context) (returnedErr error) {
var transaction *Transaction
select {
case transaction = <-mgr.admissionQueue:
defer trace.StartRegion(ctx, "processTransaction").End()
defer prometheus.NewTimer(mgr.metrics.transactionProcessingDurationSeconds).ObserveDuration()
// The transaction does not finish itself anymore once it has been admitted for
// processing. This avoids the client concurrently removing the staged state
// while the manager is still operating on it. We thus need to defer its finishing.
//
// The error is always empty here as we run the clean up in background. If a background
// task fails, cleanupWorkerFailed channel is closed prompting the manager to exit and
// return the error from the errgroup.
defer func() { _ = transaction.finish(true) }()
case <-mgr.cleanupWorkerFailed:
return errors.New("cleanup worker failed")
case <-mgr.completedQueue:
return nil
case logErr := <-mgr.logManager.GetNotificationQueue():
if logErr != nil {
return fmt.Errorf("log manager failed: %w", logErr)
}
return nil
case <-ctx.Done():
}
// Return if the manager was stopped. The select is indeterministic so this guarantees
// the manager stops the processing even if there are transactions in the queue.
if err := ctx.Err(); err != nil {
return err
}
span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.processTransaction", nil)
defer span.Finish()
transaction.result <- func() (commitErr error) {
var zeroOID git.ObjectID
if transaction.repositoryTarget() {
repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath)
if err != nil {
return fmt.Errorf("does repository exist: %w", err)
}
if transaction.repositoryCreation != nil && repositoryExists {
return ErrRepositoryAlreadyExists
} else if transaction.repositoryCreation == nil && !repositoryExists {
return storage.ErrRepositoryNotFound
}
if repositoryExists {
targetRepository := mgr.repositoryFactory.Build(transaction.relativePath)
objectHash, err := targetRepository.ObjectHash(ctx)
if err != nil {
return fmt.Errorf("object hash: %w", err)
}
zeroOID = objectHash.ZeroOID
// Verify that all objects this transaction depends on are present in the repository. The dependency
// objects are the reference tips set in the transaction and the objects the transaction's packfile
// is based on. If an object dependency is missing, the transaction is aborted as applying it would
// result in repository corruption.
if err := mgr.verifyObjectsExist(ctx, targetRepository, transaction.objectDependencies); err != nil {
return fmt.Errorf("verify object dependencies: %w", err)
}
refBackend, err := targetRepository.ReferenceBackend(ctx)
if err != nil {
return fmt.Errorf("reference backend: %w", err)
}
if refBackend == git.ReferenceBackendReftables || transaction.runHousekeeping != nil {
if refBackend == git.ReferenceBackendReftables {
if err := mgr.verifyReferences(ctx, transaction); err != nil {
return fmt.Errorf("verify references: %w", err)
}
}
if transaction.runHousekeeping != nil {
housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction, refBackend, objectHash.ZeroOID)
if err != nil {
return fmt.Errorf("verifying pack refs: %w", err)
}
transaction.manifest.Housekeeping = housekeepingEntry
}
transaction.manifest.Operations = transaction.walEntry.Operations()
// The transaction has already written the manifest to the disk as a read-only file
// before queuing for commit. Remove the old file so we can replace it below.
if err := wal.RemoveManifest(ctx, transaction.walEntry.Directory()); err != nil {
return fmt.Errorf("remove outdated manifest")
}
// Operations working on the staging snapshot add more files into the log entry,
// and modify the manifest.
if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
return fmt.Errorf("writing manifest file: %w", err)
}
// Fsync only the file itself and the parent directory.
syncer := safe.NewSyncer()
if err := syncer.Sync(ctx, wal.ManifestPath(transaction.walEntry.Directory())); err != nil {
return fmt.Errorf("flush updated maninest file: %w", err)
}
if err := syncer.Sync(ctx, transaction.walEntry.Directory()); err != nil {
return fmt.Errorf("flush parent dir of updated manifest file: %w", err)
}
}
}
}
// Prepare the transaction to conflict check it. We'll commit it later if we
// succeed logging the transaction.
mgr.mutex.Lock()
preparedTX, err := mgr.conflictMgr.Prepare(ctx, &conflict.Transaction{
ReadLSN: transaction.SnapshotLSN(),
TargetRelativePath: transaction.relativePath,
DeleteRepository: transaction.deleteRepository,
ZeroOID: zeroOID,
ReferenceUpdates: transaction.referenceUpdates,
})
mgr.mutex.Unlock()
if err != nil {
return fmt.Errorf("prepare: %w", err)
}
if err := mgr.verifyKeyValueOperations(ctx, transaction); err != nil {
return fmt.Errorf("verify key-value operations: %w", err)
}
commitFS, err := mgr.verifyFileSystemOperations(ctx, transaction)
if err != nil {
return fmt.Errorf("verify file system operations: %w", err)
}
mgr.testHooks.beforeAppendLogEntry(mgr.logManager.AppendedLSN() + 1)
if err := mgr.appendLogEntry(ctx, transaction.objectDependencies, transaction.manifest, transaction.walFilesPath()); err != nil {
return fmt.Errorf("append log entry: %w", err)
}
// Commit the prepared transaction now that we've managed to commit the log entry.
mgr.mutex.Lock()
preparedTX.Commit(ctx, mgr.logManager.AppendedLSN())
commitFS(mgr.logManager.AppendedLSN())
mgr.mutex.Unlock()
return nil
}()
return nil
}