func()

in internal/gitaly/storage/storagemgr/partition/transaction_manager.go [1041:1148]


// commit stages the transaction's changes into its write-ahead log entry, persists
// the entry to disk, submits the transaction to the manager's admission queue, and
// then blocks until a result is delivered or ctx is done.
//
// The steps are performed in this order:
//  1. If the transaction targets a repository that did not exist when it began,
//     stage the repository creation.
//  2. If no repository creation was staged, pack objects, prepare housekeeping,
//     record the packfile creation, and stage packed-refs where applicable.
//  3. Build the log entry manifest, stage key-value operations, write the manifest
//     and sync the log entry directory.
//  4. Push the transaction to the admission queue and wait for its result.
//
// It returns nil without queuing anything if staging the repository creation
// reports storage.ErrRepositoryNotFound. Failures of the staging steps are returned
// wrapped with a description of the failed step. While waiting for admission it
// returns ctx.Err() if ctx is done, or storage.ErrTransactionProcessingStopped if
// the manager is closing. After admission it returns either the transaction's
// result passed through unwrapExpectedError, or ctx.Err() if ctx is done first.
func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error {
	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil)
	defer span.Finish()

	// The channel has a buffer of one, so a single result can be sent without a
	// receiver being ready. Presumably this keeps the sender from blocking when this
	// method has already returned due to ctx being done — TODO confirm at the send site.
	transaction.result = make(resultChannel, 1)

	if transaction.repositoryTarget() && !transaction.repositoryExists {
		// Determine if the repository was created in this transaction and stage its state
		// for committing if so.
		if err := mgr.stageRepositoryCreation(ctx, transaction); err != nil {
			if errors.Is(err, storage.ErrRepositoryNotFound) {
				// The repository wasn't created as part of this transaction.
				// The commit is treated as a success and nothing is written or queued.
				return nil
			}

			return fmt.Errorf("stage repository creation: %w", err)
		}
	}

	// The steps in this branch are skipped when a repository creation has been staged.
	if transaction.repositoryCreation == nil {
		if err := mgr.packObjects(ctx, transaction); err != nil {
			return fmt.Errorf("pack objects: %w", err)
		}

		if err := mgr.prepareHousekeeping(ctx, transaction); err != nil {
			return fmt.Errorf("preparing housekeeping: %w", err)
		}

		// If there were objects packed that should be committed, record the packfile's creation.
		if transaction.packPrefix != "" {
			packDir := filepath.Join(transaction.relativePath, "objects", "pack")
			// Each file is recorded with "objects<ext>" in the staging directory as its source
			// and "<packPrefix><ext>" in the repository's pack directory as its destination.
			for _, fileExtension := range []string{".pack", ".idx", ".rev"} {
				if err := transaction.walEntry.CreateFile(
					filepath.Join(transaction.stagingDirectory, "objects"+fileExtension),
					filepath.Join(packDir, transaction.packPrefix+fileExtension),
				); err != nil {
					return fmt.Errorf("record file creation: %w", err)
				}
			}
		}

		// Reference changes are only recorded if the repository exists when the transaction
		// began. Repository creations record the entire state of the repository at the end
		// of the transaction so ReferenceRecorder is not used. ReferenceRecorder is not used
		// with reftables.
		//
		// We only stage the packed-refs file if reference transactions were recorded or
		// this was a housekeeping run. This prevents a duplicate removal being staged
		// after a repository removal operation as the removal would look like a modification
		// to the recorder.
		if transaction.referenceRecorder != nil && (len(transaction.referenceUpdates) > 0 || transaction.runHousekeeping != nil) {
			if err := transaction.referenceRecorder.StagePackedRefs(); err != nil {
				return fmt.Errorf("stage packed refs: %w", err)
			}
		}
	}

	// Build the manifest describing this log entry from the state staged so far.
	transaction.manifest = &gitalypb.LogEntry{
		RelativePath:          transaction.relativePath,
		Operations:            transaction.walEntry.Operations(),
		ReferenceTransactions: transaction.referenceUpdatesToProto(),
	}

	if transaction.deleteRepository {
		transaction.manifest.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{}
	}

	if err := transaction.stageKeyValueOperations(); err != nil {
		return fmt.Errorf("stage key-value operations: %w", err)
	}
	// Re-read the operations after staging the key-value operations. Presumably that step
	// adds further operations to the WAL entry that the earlier snapshot lacks — TODO confirm.
	transaction.manifest.Operations = transaction.walEntry.Operations()

	if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
		return fmt.Errorf("writing manifest file: %w", err)
	}

	// Sync the log entry completely.
	if err := safe.NewSyncer().SyncRecursive(ctx, transaction.walEntry.Directory()); err != nil {
		return fmt.Errorf("flush log entry: %w", err)
	}

	// Submit the transaction to the admission queue. The closure scopes the deferred calls
	// so that the trace region, the queue depth decrement and the wait time observation
	// all complete when the admission attempt ends rather than when commit returns.
	if err := func() error {
		defer trace.StartRegion(ctx, "commit queue").End()
		// NOTE(review): the queue depth gauge is reached through transaction.metrics while
		// the wait timer below uses mgr.metrics — confirm both are the intended metric sets.
		transaction.metrics.commitQueueDepth.Inc()
		defer transaction.metrics.commitQueueDepth.Dec()
		defer prometheus.NewTimer(mgr.metrics.commitQueueWaitSeconds).ObserveDuration()

		// Block until the transaction is accepted into the queue, the caller's context
		// is done, or the manager signals that it is closing.
		select {
		case mgr.admissionQueue <- transaction:
			transaction.admitted = true
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-mgr.closing:
			return storage.ErrTransactionProcessingStopped
		}
	}(); err != nil {
		return err
	}

	// The region starts here and ends when commit returns.
	defer trace.StartRegion(ctx, "result wait").End()
	// NOTE(review): the transaction has already been admitted at this point, so returning
	// ctx.Err() presumably does not by itself stop it from being processed — confirm.
	select {
	case err := <-transaction.result:
		return unwrapExpectedError(err)
	case <-ctx.Done():
		return ctx.Err()
	}
}