in internal/gitaly/storage/storagemgr/partition/transaction_manager.go [1041:1148]
// commit stages the transaction's changes into its log entry, writes and syncs the
// entry's manifest, hands the transaction over to the admission queue and then waits
// for the outcome to arrive on transaction.result.
//
// It returns nil without queueing anything when the transaction targets a repository,
// transaction.repositoryExists is false, and stageRepositoryCreation reports
// storage.ErrRepositoryNotFound. It returns ctx.Err() if the context is done while
// waiting for admission or for the result, and storage.ErrTransactionProcessingStopped
// if a receive from mgr.closing succeeds before the transaction has been admitted.
// All other failures are returned wrapped with the name of the step that failed.
func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error {
	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil)
	defer span.Finish()

	// NOTE(review): capacity of one presumably lets the sender deliver the result
	// without blocking even if this function has already returned — confirm against
	// the code that writes to transaction.result.
	transaction.result = make(resultChannel, 1)

	if transaction.repositoryTarget() && !transaction.repositoryExists {
		// Find out whether this transaction created the repository and, if it did,
		// stage the repository's state for committing.
		err := mgr.stageRepositoryCreation(ctx, transaction)
		switch {
		case err == nil:
			// Staging succeeded, carry on with the commit.
		case errors.Is(err, storage.ErrRepositoryNotFound):
			// The repository wasn't created as part of this transaction.
			return nil
		default:
			return fmt.Errorf("stage repository creation: %w", err)
		}
	}

	if transaction.repositoryCreation == nil {
		if err := mgr.packObjects(ctx, transaction); err != nil {
			return fmt.Errorf("pack objects: %w", err)
		}

		if err := mgr.prepareHousekeeping(ctx, transaction); err != nil {
			return fmt.Errorf("preparing housekeeping: %w", err)
		}

		// A non-empty pack prefix means objects were packed that should be committed,
		// so record the creation of each file that makes up the pack.
		if prefix := transaction.packPrefix; prefix != "" {
			packDir := filepath.Join(transaction.relativePath, "objects", "pack")
			for _, ext := range [...]string{".pack", ".idx", ".rev"} {
				stagedFile := filepath.Join(transaction.stagingDirectory, "objects"+ext)
				packFile := filepath.Join(packDir, prefix+ext)
				if err := transaction.walEntry.CreateFile(stagedFile, packFile); err != nil {
					return fmt.Errorf("record file creation: %w", err)
				}
			}
		}

		// Reference changes are recorded only for repositories that already existed
		// when the transaction began. A repository creation records the repository's
		// whole state at the end of the transaction instead, so it does not use the
		// ReferenceRecorder. The ReferenceRecorder is not used with reftables either.
		//
		// The packed-refs file is staged only when reference transactions were
		// recorded or this was a housekeeping run. Otherwise a repository removal
		// would look like a modification to the recorder and a duplicate removal
		// would be staged.
		hasRefWork := len(transaction.referenceUpdates) > 0 || transaction.runHousekeeping != nil
		if transaction.referenceRecorder != nil && hasRefWork {
			if err := transaction.referenceRecorder.StagePackedRefs(); err != nil {
				return fmt.Errorf("stage packed refs: %w", err)
			}
		}
	}

	manifest := &gitalypb.LogEntry{
		RelativePath:          transaction.relativePath,
		Operations:            transaction.walEntry.Operations(),
		ReferenceTransactions: transaction.referenceUpdatesToProto(),
	}
	if transaction.deleteRepository {
		manifest.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{}
	}
	transaction.manifest = manifest

	if err := transaction.stageKeyValueOperations(); err != nil {
		return fmt.Errorf("stage key-value operations: %w", err)
	}

	// Re-read the operations after the key-value staging step so the manifest
	// carries whatever the log entry holds at this point.
	transaction.manifest.Operations = transaction.walEntry.Operations()

	if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
		return fmt.Errorf("writing manifest file: %w", err)
	}

	// Sync the log entry completely.
	if err := safe.NewSyncer().SyncRecursive(ctx, transaction.walEntry.Directory()); err != nil {
		return fmt.Errorf("flush log entry: %w", err)
	}

	// admit offers the transaction to the admission queue. The trace region, the
	// queue depth gauge and the wait timer all cover exactly the time spent in here.
	admit := func() error {
		defer trace.StartRegion(ctx, "commit queue").End()

		transaction.metrics.commitQueueDepth.Inc()
		defer transaction.metrics.commitQueueDepth.Dec()

		defer prometheus.NewTimer(mgr.metrics.commitQueueWaitSeconds).ObserveDuration()

		select {
		case mgr.admissionQueue <- transaction:
			transaction.admitted = true
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-mgr.closing:
			return storage.ErrTransactionProcessingStopped
		}
	}
	if err := admit(); err != nil {
		return err
	}

	// The transaction is queued; block until its result is delivered or the
	// caller's context is done.
	defer trace.StartRegion(ctx, "result wait").End()

	select {
	case resultErr := <-transaction.result:
		return unwrapExpectedError(resultErr)
	case <-ctx.Done():
		return ctx.Err()
	}
}