in internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go [809:964]
// prepareOffloading repacks the snapshot repository so that offloadable objects
// (currently blobs only) end up in a separate set of pack-files, uploads those
// pack-files to the offloading sink, rewires the repository's git config and
// alternates file for the promisor/cache setup, and records every resulting
// file change in the transaction's WAL entry. If any step after the upload
// fails, already-uploaded objects are deleted again on a best-effort basis.
func (mgr *TransactionManager) prepareOffloading(ctx context.Context, transaction *Transaction) (returnedErr error) {
	if transaction.runHousekeeping.runOffloading == nil {
		return nil
	}
	if mgr.offloadingSink == nil {
		return fmt.Errorf("absent offloading sink")
	}

	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareOffloading", nil)
	defer span.Finish()

	// Loading configurations for offloading.
	cfg := transaction.runHousekeeping.runOffloading.config

	workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath))
	// workingRepoPath is the current repository path which we are performing operations on.
	// In the context of transaction, workingRepoPath is a snapshot repository.
	workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath())

	// Find the original repository's absolute path. In the context of transaction, originalRepo is the repo
	// which we are taking a snapshot of.
	originalRepo := &gitalypb.Repository{
		StorageName:  workingRepository.GetStorageName(),
		RelativePath: workingRepository.GetRelativePath(),
	}
	originalRepo = transaction.OriginalRepository(originalRepo)
	originalRepoAbsPath := mgr.getAbsolutePath(originalRepo.GetRelativePath())

	// cfg.Prefix should be empty in production, which triggers automatic UUID generation.
	// Non-empty prefix values are only used in test environments.
	if cfg.Prefix == "" {
		offloadingID := uuid.New().String()
		// When uploading to offloading storage, use [original repo's relative path + UUID] as prefix.
		cfg.Prefix = filepath.Join(originalRepo.GetRelativePath(), offloadingID)
	}

	// Capture the list of pack-files before repacking.
	oldPackFiles, err := mgr.collectPackFiles(ctx, workingRepoPath)
	if err != nil {
		return fmt.Errorf("collecting existing packfiles: %w", err)
	}

	// Creating a temporary directory where we will put the pack files (the ones to be offloaded).
	filterToBase, err := os.MkdirTemp(workingRepoPath, "gitaly-offloading-*")
	if err != nil {
		// filterToBase is empty when MkdirTemp fails, so report the parent directory instead.
		return fmt.Errorf("create temporary directory in %s: %w", workingRepoPath, err)
	}
	filterToDir := filepath.Join(filterToBase, objectsDir, packFileDir)
	if err := os.MkdirAll(filterToDir, mode.Directory); err != nil {
		return fmt.Errorf("create directory %s: %w", filterToDir, err)
	}

	// Repack the repository. Current offloading implementation only offloads blobs, so we hard code filter here.
	// This can be relaxed to a parameter if more offloading type is supported.
	repackingFilter := "blob:none"
	if err := housekeeping.PerformRepackingForOffloading(ctx, workingRepository, repackingFilter, filterToDir); err != nil {
		return errors.Join(errOffloadingOnRepacking, err)
	}

	packFilesToUpload, err := mgr.collectPackFiles(ctx, filterToBase)
	if err != nil {
		return fmt.Errorf("collect pack files to upload: %w", err)
	}
	if len(packFilesToUpload) == 0 {
		return fmt.Errorf("no pack files to upload")
	}

	newPackFilesToStay, err := mgr.collectPackFiles(ctx, workingRepoPath)
	if err != nil {
		return fmt.Errorf("collect new pack files: %w", err)
	}

	// Sanity check that the repack actually rewrote the pack-files. Map iteration
	// order is randomized in Go, so the key slices must be sorted before comparing;
	// comparing them unsorted would make identical key sets compare unequal almost
	// every time, silently disabling this check.
	oldPackNames := maps.Keys(oldPackFiles)
	newPackNames := maps.Keys(newPackFilesToStay)
	slices.Sort(oldPackNames)
	slices.Sort(newPackNames)
	if slices.Equal(oldPackNames, newPackNames) {
		return fmt.Errorf("same packs after offloading repacking")
	}

	uploadedPackFiles := make([]string, 0, len(packFilesToUpload))
	defer func() {
		// If returnedErr is non-nil, attempt to remove the uploaded file.
		// This is a best-effort cleanup; we can't guarantee successful deletion.
		// If there is an error, the error is returned to the caller together with the returnedErr.
		// Any undeleted files will eventually be removed by a garbage collection job.
		if returnedErr != nil {
			deletionErrors := mgr.offloadingSink.DeleteObjects(ctx, cfg.Prefix, uploadedPackFiles)
			for _, err := range deletionErrors {
				// Objects that are already gone are fine; only surface real failures.
				if gcerrors.Code(err) != gcerrors.NotFound {
					returnedErr = errors.Join(returnedErr, err)
				}
			}
		}
	}()

	for file := range packFilesToUpload {
		if err := mgr.offloadingSink.Upload(ctx, filepath.Join(filterToDir, file), cfg.Prefix); err != nil {
			return errors.Join(errOffloadingObjectUpload, err)
		}
		// Track successful uploads so the deferred cleanup knows what to delete.
		uploadedPackFiles = append(uploadedPackFiles, file)
	}

	// Update git config file so the repository knows where to fetch offloaded objects from.
	promisorRemoteURL, err := url.JoinPath(cfg.SinkBaseURL, cfg.Prefix)
	if err != nil {
		return fmt.Errorf("constructing promisor remote URL: %w", err)
	}
	if err := housekeeping.SetOffloadingGitConfig(ctx, workingRepository, promisorRemoteURL, repackingFilter, nil); err != nil {
		return fmt.Errorf("setting offloading git config: %w", err)
	}

	// If the alternates file does not exist in the original repo, we will not log a removal entry.
	// Otherwise, if it does exist, we will log a removal.
	if _, err := os.Stat(stats.AlternatesFilePath(workingRepoPath)); !os.IsNotExist(err) {
		transaction.walEntry.RemoveDirectoryEntry(stats.AlternatesFilePath(transaction.relativePath))
	}

	alternatesInWorkingRepo := stats.AlternatesFilePath(workingRepoPath)
	// We are calculating the relative path of the cache entry based on the original repo path instead of the
	// snapshot repo path here. This is because the alternate file will eventually be copied back to the
	// original repo after offloading is completed. If we calculate relative to the snapshot repo path,
	// it will not work.
	cacheAbsPath := filepath.Join(cfg.CacheRoot, originalRepo.GetRelativePath(), objectsDir) // Must have objectsDir
	relativeCacheEntry, err := filepath.Rel(filepath.Join(originalRepoAbsPath, objectsDir), cacheAbsPath)
	if err != nil {
		return fmt.Errorf("find relative cache entry: %w", err)
	}
	if err := housekeeping.AddCacheAlternateEntry(alternatesInWorkingRepo, relativeCacheEntry); err != nil {
		return fmt.Errorf("adding cache alternate entry: %w", err)
	}

	// Record WAL entry: drop the pre-repack pack-files and log creation of the ones that stay.
	for file := range oldPackFiles {
		transaction.walEntry.RemoveDirectoryEntry(filepath.Join(transaction.relativePath, objectsDir, packFileDir, file))
	}
	for file := range newPackFilesToStay {
		fileRelativePath := filepath.Join(transaction.relativePath, objectsDir, packFileDir, file)
		if err := transaction.walEntry.CreateFile(
			filepath.Join(transaction.snapshot.Root(), fileRelativePath),
			fileRelativePath,
		); err != nil {
			return fmt.Errorf("record pack-file creation: %w", err)
		}
	}

	// Replace the git config file with the promisor-configured one from the snapshot.
	transaction.walEntry.RemoveDirectoryEntry(filepath.Join(transaction.relativePath, configFile))
	if err := transaction.walEntry.CreateFile(
		filepath.Join(workingRepoPath, configFile),
		filepath.Join(transaction.relativePath, configFile),
	); err != nil {
		return fmt.Errorf("record config file replacement: %w", err)
	}

	// Log the (possibly newly created) alternates file pointing at the object cache.
	if err := transaction.walEntry.CreateFile(
		stats.AlternatesFilePath(workingRepoPath),
		stats.AlternatesFilePath(transaction.relativePath),
	); err != nil {
		return fmt.Errorf("record alternates file replacement: %w", err)
	}

	return nil
}