func()

in internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go [809:964]


// prepareOffloading prepares the offloading housekeeping task of the given transaction.
//
// It returns nil immediately when the transaction has no offloading task configured, and an error
// when the manager has no offloading sink. Otherwise it:
//
//  1. Repacks the transaction's snapshot repository with the "blob:none" filter, directing the
//     packs that are to be offloaded into a temporary directory inside the snapshot repository.
//  2. Uploads those packs to the offloading sink under cfg.Prefix.
//  3. Writes the offloading git config (promisor remote URL and filter) and adds a cache entry to
//     the alternates file of the snapshot repository.
//  4. Records the pack-file, config and alternates changes in the transaction's WAL entry.
//
// If an error is returned once uploading has started, deletion of the packs uploaded so far is
// attempted; deletion errors other than NotFound are joined into the returned error.
func (mgr *TransactionManager) prepareOffloading(ctx context.Context, transaction *Transaction) (returnedErr error) {
	if transaction.runHousekeeping.runOffloading == nil {
		return nil
	}
	if mgr.offloadingSink == nil {
		return errors.New("absent offloading sink")
	}

	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareOffloading", nil)
	defer span.Finish()

	// Loading configurations for offloading
	cfg := transaction.runHousekeeping.runOffloading.config

	workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath))
	// workingRepoPath is the current repository path which we are performing operations on.
	// In the context of transaction, workingRepoPath is a snapshot repository.
	workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath())
	// Find the original repository's absolute path. In the context of transaction, originalRepo is the repo
	// which we are taking a snapshot of.
	originalRepo := &gitalypb.Repository{
		StorageName:  workingRepository.GetStorageName(),
		RelativePath: workingRepository.GetRelativePath(),
	}
	originalRepo = transaction.OriginalRepository(originalRepo)
	originalRepoAbsPath := mgr.getAbsolutePath(originalRepo.GetRelativePath())

	// cfg.Prefix should be empty in production, which triggers automatic UUID generation.
	// Non-empty prefix values are only used in test environments.
	if cfg.Prefix == "" {
		offloadingID := uuid.New().String()
		// When uploading to offloading storage, use [original repo's relative path + UUID] as prefix
		cfg.Prefix = filepath.Join(originalRepo.GetRelativePath(), offloadingID)
	}

	// Capture the list of pack-files before repacking.
	oldPackFiles, err := mgr.collectPackFiles(ctx, workingRepoPath)
	if err != nil {
		return fmt.Errorf("collecting existing packfiles: %w", err)
	}

	// Creating a temporary directory where we will put the pack files (the ones to be offloaded).
	// os.MkdirTemp returns an empty name on failure, so the parent directory is reported instead.
	// NOTE(review): this directory is not removed here; presumably it is discarded together with
	// the snapshot — confirm.
	filterToBase, err := os.MkdirTemp(workingRepoPath, "gitaly-offloading-*")
	if err != nil {
		return fmt.Errorf("create temporary directory in %s: %w", workingRepoPath, err)
	}
	filterToDir := filepath.Join(filterToBase, objectsDir, packFileDir)
	if err := os.MkdirAll(filterToDir, mode.Directory); err != nil {
		return fmt.Errorf("create directory %s: %w", filterToDir, err)
	}

	// Repack the repository. Current offloading implementation only offloads blobs, so we hard code filter here.
	// This can be relaxed to a parameter if more offloading types are supported.
	repackingFilter := "blob:none"
	if err := housekeeping.PerformRepackingForOffloading(ctx, workingRepository, repackingFilter, filterToDir); err != nil {
		return errors.Join(errOffloadingOnRepacking, err)
	}
	packFilesToUpload, err := mgr.collectPackFiles(ctx, filterToBase)
	if err != nil {
		return fmt.Errorf("collect pack files to upload: %w", err)
	}
	if len(packFilesToUpload) == 0 {
		return errors.New("no pack files to upload")
	}
	newPackFilesToStay, err := mgr.collectPackFiles(ctx, workingRepoPath)
	if err != nil {
		return fmt.Errorf("collect new pack files: %w", err)
	}

	// Map keys come back in an indeterminate order, so both name lists are sorted before they are
	// compared; otherwise identical pack sets could compare as different and slip past this guard.
	oldPackNames := maps.Keys(oldPackFiles)
	slices.Sort(oldPackNames)
	newPackNames := maps.Keys(newPackFilesToStay)
	slices.Sort(newPackNames)
	if slices.Equal(oldPackNames, newPackNames) {
		return errors.New("same packs after offloading repacking")
	}

	uploadedPackFiles := make([]string, 0, len(packFilesToUpload))
	defer func() {
		// If returnedErr is non-nil, attempt to remove the uploaded files.
		// This is a best-effort cleanup; we can't guarantee successful deletion.
		// If there is an error, the error is returned to the caller together with the returnedErr.
		// Any undeleted files will eventually be removed by a garbage collection job.
		// NOTE(review): ctx may already be cancelled when this runs, in which case the deletion is
		// likely to fail as well — confirm whether a detached context is wanted here.
		if returnedErr != nil {
			deletionErrors := mgr.offloadingSink.DeleteObjects(ctx, cfg.Prefix, uploadedPackFiles)
			for _, err := range deletionErrors {
				if gcerrors.Code(err) != gcerrors.NotFound {
					returnedErr = errors.Join(returnedErr, err)
				}
			}
		}
	}()
	for file := range packFilesToUpload {
		if err := mgr.offloadingSink.Upload(ctx, filepath.Join(filterToDir, file), cfg.Prefix); err != nil {
			return errors.Join(errOffloadingObjectUpload, err)
		}
		// Only successfully uploaded files are tracked for cleanup.
		uploadedPackFiles = append(uploadedPackFiles, file)
	}

	// Update git config file.
	promisorRemoteURL, err := url.JoinPath(cfg.SinkBaseURL, cfg.Prefix)
	if err != nil {
		return fmt.Errorf("constructing promisor remote URL: %w", err)
	}
	if err := housekeeping.SetOffloadingGitConfig(ctx, workingRepository, promisorRemoteURL, repackingFilter, nil); err != nil {
		return fmt.Errorf("setting offloading git config: %w", err)
	}

	alternatesInWorkingRepo := stats.AlternatesFilePath(workingRepoPath)
	// If the alternates file already exists in the snapshot repository, log its removal so that the
	// creation recorded below replaces it. If it does not exist, no removal entry is logged. Any
	// other stat failure is returned instead of being treated as "file exists".
	if _, err := os.Stat(alternatesInWorkingRepo); err == nil {
		transaction.walEntry.RemoveDirectoryEntry(stats.AlternatesFilePath(transaction.relativePath))
	} else if !errors.Is(err, os.ErrNotExist) {
		return fmt.Errorf("stat alternates file: %w", err)
	}

	// We are calculating the relative path of the cache entry based on the original repo path instead of the
	// snapshot repo path here. This is because the alternates file will eventually be copied back to the
	// original repo after offloading is completed. If we calculate relative to the snapshot repo path,
	// it will not work.
	cacheAbsPath := filepath.Join(cfg.CacheRoot, originalRepo.GetRelativePath(), objectsDir) // Must have objectsDir
	relativeCacheEntry, err := filepath.Rel(filepath.Join(originalRepoAbsPath, objectsDir), cacheAbsPath)
	if err != nil {
		return fmt.Errorf("find relative cache entry: %w", err)
	}
	if err := housekeeping.AddCacheAlternateEntry(alternatesInWorkingRepo, relativeCacheEntry); err != nil {
		return fmt.Errorf("adding cache alternate entry: %w", err)
	}

	// Record WAL entry: drop every pack that existed before repacking...
	for file := range oldPackFiles {
		transaction.walEntry.RemoveDirectoryEntry(filepath.Join(transaction.relativePath, objectsDir, packFileDir, file))
	}

	// ...and create every pack that is present in the repository after repacking.
	for file := range newPackFilesToStay {
		fileRelativePath := filepath.Join(transaction.relativePath, objectsDir, packFileDir, file)
		if err := transaction.walEntry.CreateFile(
			filepath.Join(transaction.snapshot.Root(), fileRelativePath),
			fileRelativePath,
		); err != nil {
			return fmt.Errorf("record pack-file creation: %w", err)
		}
	}

	// Replace the config file with the one updated in the snapshot repository.
	transaction.walEntry.RemoveDirectoryEntry(filepath.Join(transaction.relativePath, configFile))
	if err := transaction.walEntry.CreateFile(
		filepath.Join(workingRepoPath, configFile),
		filepath.Join(transaction.relativePath, configFile),
	); err != nil {
		return fmt.Errorf("record config file replacement: %w", err)
	}

	// Record the alternates file that now contains the cache entry.
	if err := transaction.walEntry.CreateFile(
		alternatesInWorkingRepo,
		stats.AlternatesFilePath(transaction.relativePath),
	); err != nil {
		return fmt.Errorf("record alternates file replacement: %w", err)
	}

	return nil
}