in internal/gitaly/storage/storagemgr/partition/transaction_manager.go [1280:1447]
// packObjects packs the objects the transaction introduced into its quarantine directory into the
// transaction's staging directory.
//
// The object walk starts from the new tips of the transaction's regular (non-symbolic,
// non-deletion) reference updates and from the transaction's included objects. Objects found in
// the quarantine are streamed into `git pack-objects`, whose output is indexed by `git index-pack`
// into <stagingDirectory>/objects.pack; the resulting "pack-<id>" prefix is stored in
// transaction.packPrefix. Objects the walk reports as missing from the quarantine are recorded in
// transaction.objectDependencies, as they are expected to already exist in the repository.
//
// It returns nil without doing anything when the transaction does not target a repository, when
// the repository does not exist on disk, or when there are no reference tips or included objects
// that could introduce new objects.
func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) (returnedErr error) {
	defer trace.StartRegion(ctx, "packObjects").End()

	if !transaction.repositoryTarget() {
		return nil
	}

	if _, err := os.Stat(mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath())); err != nil {
		if !errors.Is(err, fs.ErrNotExist) {
			return fmt.Errorf("stat: %w", err)
		}

		// The repository does not exist. Exit early as the Git commands below would fail. There's
		// nothing to pack and no dependencies if the repository doesn't exist.
		return nil
	}

	span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.packObjects", nil)
	defer span.Finish()

	// We want to only pack the objects that are present in the quarantine as they are potentially
	// new. Disable the alternate, which is the repository's original object directory, so that we'll
	// only walk the objects in the quarantine directory below.
	quarantineOnlySnapshotRepository, err := transaction.snapshotRepository.QuarantineOnly()
	if err != nil {
		return fmt.Errorf("quarantine only: %w", err)
	}

	objectHash, err := quarantineOnlySnapshotRepository.ObjectHash(ctx)
	if err != nil {
		return fmt.Errorf("object hash: %w", err)
	}

	heads := make([]string, 0)
	for _, referenceUpdates := range transaction.referenceUpdates {
		for _, update := range referenceUpdates {
			if !update.IsRegularUpdate() {
				// We don't have to worry about symrefs here.
				continue
			}

			if update.NewOID == objectHash.ZeroOID {
				// Reference deletions can't introduce new objects so ignore them.
				continue
			}

			heads = append(heads, update.NewOID.String())
		}
	}

	for objectID := range transaction.includedObjects {
		heads = append(heads, objectID.String())
	}

	if len(heads) == 0 {
		// No need to pack objects if there are no changes that can introduce new objects.
		return nil
	}

	// The work below is a pipeline of goroutines connected by io.Pipes:
	//
	//	object walk -> scanner -> git pack-objects -> git index-pack
	//
	// Each stage closes the pipe ends it owns with the error it returns, so that a failure in one
	// stage is reported to its neighbours as that error rather than as a clean EOF. The closes are
	// wrapped in deferred closures on purpose: `defer w.CloseWithError(returnedErr)` would evaluate
	// returnedErr when the defer statement runs (always nil), not when the goroutine returns.
	objectWalkReader, objectWalkWriter := io.Pipe()

	group, ctx := errgroup.WithContext(ctx)
	group.Go(func() (returnedErr error) {
		defer func() { objectWalkWriter.CloseWithError(returnedErr) }()

		// Walk the new reference tips and included objects in the quarantine directory. The reachable
		// objects will be included in the transaction's logged packfile and the unreachable ones
		// discarded. Missing objects are recorded as the transaction's dependencies.
		if err := quarantineOnlySnapshotRepository.WalkObjects(ctx,
			strings.NewReader(strings.Join(heads, "\n")),
			objectWalkWriter,
		); err != nil {
			return fmt.Errorf("walk objects: %w", err)
		}

		return nil
	})

	objectsToPackReader, objectsToPackWriter := io.Pipe()

	// We'll only start the commands needed for object packing if the walk above produces objects
	// we need to pack.
	startObjectPacking := func() {
		packReader, packWriter := io.Pipe()
		group.Go(func() (returnedErr error) {
			defer func() {
				objectsToPackReader.CloseWithError(returnedErr)
				packWriter.CloseWithError(returnedErr)
			}()

			if err := quarantineOnlySnapshotRepository.PackObjects(ctx, objectsToPackReader, packWriter); err != nil {
				return fmt.Errorf("pack objects: %w", err)
			}

			return nil
		})

		group.Go(func() (returnedErr error) {
			defer func() { packReader.CloseWithError(returnedErr) }()

			// index-pack places the pack, index, and reverse index into the transaction's staging directory.
			var stdout, stderr bytes.Buffer
			if err := quarantineOnlySnapshotRepository.ExecAndWait(ctx, gitcmd.Command{
				Name:  "index-pack",
				Flags: []gitcmd.Option{gitcmd.Flag{Name: "--stdin"}, gitcmd.Flag{Name: "--rev-index"}},
				Args:  []string{filepath.Join(transaction.stagingDirectory, "objects.pack")},
			}, gitcmd.WithStdin(packReader), gitcmd.WithStdout(&stdout), gitcmd.WithStderr(&stderr)); err != nil {
				return structerr.New("index pack: %w", err).WithMetadata("stderr", stderr.String())
			}

			matches := packPrefixRegexp.FindStringSubmatch(stdout.String())
			if len(matches) != 2 {
				return structerr.New("unexpected index-pack output").WithMetadata("stdout", stdout.String())
			}

			transaction.packPrefix = fmt.Sprintf("pack-%s", matches[1])

			return nil
		})
	}

	transaction.objectDependencies = map[git.ObjectID]struct{}{}
	group.Go(func() (returnedErr error) {
		// Closing objectsToPackWriter (with nil on success) is what signals end of input to
		// `git pack-objects`; closing objectWalkReader unblocks the walk if we stop reading early.
		defer func() {
			objectsToPackWriter.CloseWithError(returnedErr)
			objectWalkReader.CloseWithError(returnedErr)
		}()

		// objectLine comes in two formats from the walk:
		//   1. '<oid> <path>\n' in case the object is found. <path> may or may not be set.
		//   2. '?<oid>\n' in case the object is not found.
		//
		// Objects that are found are included in the transaction's packfile.
		//
		// Objects that are not found are recorded as the transaction's
		// dependencies since they should exist in the repository.
		scanner := bufio.NewScanner(objectWalkReader)

		packObjectsStarted := false
		for scanner.Scan() {
			objectLine := scanner.Text()
			if objectLine == "" {
				// Not expected from the walk; skip it rather than panicking on the index below.
				continue
			}

			if objectLine[0] == '?' {
				// Remove the '?' prefix so we're left with just the object ID.
				transaction.objectDependencies[git.ObjectID(objectLine[1:])] = struct{}{}
				continue
			}

			// At this point we have an object that we need to pack. If `pack-objects` and `index-pack`
			// haven't yet been launched, launch them.
			if !packObjectsStarted {
				packObjectsStarted = true
				startObjectPacking()
			}

			// Write the objects to `git pack-objects`. Restore the new line that was
			// trimmed by the scanner.
			if _, err := objectsToPackWriter.Write([]byte(objectLine + "\n")); err != nil {
				return fmt.Errorf("write object id for packing: %w", err)
			}
		}

		if err := scanner.Err(); err != nil {
			return fmt.Errorf("scanning rev-list output: %w", err)
		}

		return nil
	})

	return group.Wait()
}