internal/gitaly/service/blob/lfs_pointers.go (204 lines of code) (raw):

package blob import ( "bytes" "context" "fmt" "io" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitpipe" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/protobuf/proto" ) const ( // lfsPointerMaxSize is the maximum size for an lfs pointer text blob. This limit is used // as a heuristic to filter blobs which can't be LFS pointers. The format of these pointers // is described in https://github.com/git-lfs/git-lfs/blob/master/docs/spec.md#the-pointer. lfsPointerMaxSize = 200 ) // ListLFSPointers finds all LFS pointers which are transitively reachable via a graph walk of the // given set of revisions. func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream gitalypb.BlobService_ListLFSPointersServer) error { ctx := stream.Context() repository := in.GetRepository() if err := s.locator.ValidateRepository(stream.Context(), repository); err != nil { return err } if len(in.GetRevisions()) == 0 { return structerr.NewInvalidArgument("missing revisions") } for _, revision := range in.GetRevisions() { if err := git.ValidateRevision([]byte(revision), git.AllowPathScopedRevision(), git.AllowPseudoRevision()); err != nil { return structerr.NewInvalidArgument("invalid revision: %w", err).WithMetadata("revision", revision) } } chunker := chunk.New(&lfsPointerSender{ send: func(pointers []*gitalypb.LFSPointer) error { return stream.Send(&gitalypb.ListLFSPointersResponse{ LfsPointers: pointers, }) }, }) repo := s.localRepoFactory.Build(repository) objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return structerr.NewInternal("creating object reader: %w", err) } defer cancel() revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), gitpipe.WithObjects(), gitpipe.WithBlobLimit(lfsPointerMaxSize), gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob), ) catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter) if err != nil { return structerr.NewInternal("creating object iterator: %w", err) } if err := sendLFSPointers(chunker, catfileObjectIter, int(in.GetLimit())); err != nil { return err } return nil } // ListAllLFSPointers finds all LFS pointers which exist in the repository, including those which // are not reachable via graph walks. func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stream gitalypb.BlobService_ListAllLFSPointersServer) error { ctx := stream.Context() repository := in.GetRepository() if err := s.locator.ValidateRepository(stream.Context(), repository); err != nil { return structerr.NewInvalidArgument("%w", err) } repo := s.localRepoFactory.Build(repository) gitVersion, err := repo.GitVersion(ctx) if err != nil { return structerr.NewInternal("detecting availability of object type filter: %w", err) } chunker := chunk.New(&lfsPointerSender{ send: func(pointers []*gitalypb.LFSPointer) error { return stream.Send(&gitalypb.ListAllLFSPointersResponse{ LfsPointers: pointers, }) }, }) objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return structerr.NewInternal("creating object reader: %w", err) } defer cancel() catfileInfoOptions := []gitpipe.CatfileInfoOption{ gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), } if featureflag.CatfileObjectTypeFilter.IsEnabled(ctx) && gitVersion.IsCatfileObjectTypeFilterSupported() { catfileInfoOptions = append(catfileInfoOptions, gitpipe.WithCatfileObjectTypeFilter(gitpipe.ObjectTypeBlob)) } catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo, catfileInfoOptions...) catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) if err != nil { return structerr.NewInternal("creating object iterator: %w", err) } if err := sendLFSPointers(chunker, catfileObjectIter, int(in.GetLimit())); err != nil { return err } return nil } // GetLFSPointers takes the list of requested blob IDs and filters them down to blobs which are // valid LFS pointers. It is fine to pass blob IDs which do not point to a valid LFS pointer, but // passing blob IDs which do not exist results in an error. func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gitalypb.BlobService_GetLFSPointersServer) error { ctx := stream.Context() if err := validateGetLFSPointersRequest(stream.Context(), s.locator, req); err != nil { return structerr.NewInvalidArgument("%w", err) } repo := s.localRepoFactory.Build(req.GetRepository()) chunker := chunk.New(&lfsPointerSender{ send: func(pointers []*gitalypb.LFSPointer) error { return stream.Send(&gitalypb.GetLFSPointersResponse{ LfsPointers: pointers, }) }, }) objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo) if err != nil { return structerr.NewInternal("creating object info reader: %w", err) } defer cancel() objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return structerr.NewInternal("creating object reader: %w", err) } defer cancel() blobs := make([]gitpipe.RevisionResult, len(req.GetBlobIds())) for i, blobID := range req.GetBlobIds() { blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)} } catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(ctx, blobs), gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), ) if err != nil { return structerr.NewInternal("creating object info iterator: %w", err) } catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) if err != nil { return structerr.NewInternal("creating object iterator: %w", err) } if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil { return err } return nil } func validateGetLFSPointersRequest(ctx context.Context, locator storage.Locator, req *gitalypb.GetLFSPointersRequest) error { if err := locator.ValidateRepository(ctx, req.GetRepository()); err != nil { return err } if len(req.GetBlobIds()) == 0 { return fmt.Errorf("empty BlobIds") } return nil } type lfsPointerSender struct { pointers []*gitalypb.LFSPointer send func([]*gitalypb.LFSPointer) error } func (t *lfsPointerSender) Reset() { t.pointers = t.pointers[:0] } func (t *lfsPointerSender) Append(m proto.Message) { t.pointers = append(t.pointers, m.(*gitalypb.LFSPointer)) } func (t *lfsPointerSender) Send() error { return t.send(t.pointers) } func sendLFSPointers(chunker *chunk.Chunker, iter gitpipe.CatfileObjectIterator, limit int) error { buffer := bytes.NewBuffer(make([]byte, 0, lfsPointerMaxSize)) var i int for iter.Next() { lfsPointer := iter.Result() // Avoid allocating bytes for an LFS pointer until we know the current blob really // is an LFS pointer. buffer.Reset() // Given that we filter pipeline objects by size, the biggest object we may see here // is 200 bytes in size. So it's not much of a problem to read this into memory // completely. if _, err := io.Copy(buffer, lfsPointer); err != nil { return structerr.NewInternal("reading LFS pointer data: %w", err) } pointer, fileOid, fileSize := git.IsLFSPointer(buffer.Bytes()) if !pointer { continue } objectData := make([]byte, buffer.Len()) copy(objectData, buffer.Bytes()) if err := chunker.Send(&gitalypb.LFSPointer{ Data: objectData, Size: int64(len(objectData)), Oid: lfsPointer.ObjectID().String(), FileOid: fileOid, FileSize: fileSize, }); err != nil { return structerr.NewInternal("sending LFS pointer chunk: %w", err) } i++ if limit > 0 && i >= limit { break } } if err := iter.Err(); err != nil { return structerr.NewInternal("%w", err) } if err := chunker.Flush(); err != nil { return structerr.NewInternal("%w", err) } return nil }