internal/gitaly/service/blob/blobs.go (242 lines of code) (raw):

package blob import ( "context" "errors" "io" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitpipe" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" "google.golang.org/protobuf/proto" ) func verifyListBlobsRequest(ctx context.Context, locator storage.Locator, req *gitalypb.ListBlobsRequest) error { if err := locator.ValidateRepository(ctx, req.GetRepository()); err != nil { return err } if len(req.GetRevisions()) == 0 { return errors.New("missing revisions") } for _, revision := range req.GetRevisions() { if err := git.ValidateRevision([]byte(revision), git.AllowPathScopedRevision(), git.AllowPseudoRevision()); err != nil { return structerr.NewInvalidArgument("invalid revision: %w", err).WithMetadata("revision", revision) } } return nil } // ListBlobs finds all blobs which are transitively reachable via a graph walk of the given set of // revisions. func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobService_ListBlobsServer) error { if err := verifyListBlobsRequest(stream.Context(), s.locator, req); err != nil { return structerr.NewInvalidArgument("%w", err) } ctx := stream.Context() repo := s.localRepoFactory.Build(req.GetRepository()) chunker := chunk.New(&blobSender{ send: func(blobs []*gitalypb.ListBlobsResponse_Blob) error { return stream.Send(&gitalypb.ListBlobsResponse{ Blobs: blobs, }) }, }) revlistOptions := []gitpipe.RevlistOption{ gitpipe.WithObjects(), gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob), } revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...) if err := s.processBlobs(ctx, repo, revlistIter, nil, req.GetLimit(), req.GetBytesLimit(), func(oid string, size int64, contents []byte, path []byte) error { if !req.GetWithPaths() { path = nil } return chunker.Send(&gitalypb.ListBlobsResponse_Blob{ Oid: oid, Size: size, Data: contents, Path: path, }) }, ); err != nil { return structerr.NewInternal("processing blobs: %w", err) } if err := chunker.Flush(); err != nil { return structerr.NewInternal("%w", err) } return nil } func (s *server) processBlobs( ctx context.Context, repo *localrepo.Repo, objectIter gitpipe.ObjectIterator, catfileInfoIter gitpipe.CatfileInfoIterator, blobsLimit uint32, bytesLimit int64, callback func(oid string, size int64, contents []byte, path []byte) error, ) error { // If we have a zero bytes limit, then the caller didn't request any blob contents at all. // We can thus skip reading blob contents completely. if bytesLimit == 0 { // This is a bit untidy, but some callers may already use an object info iterator to // enumerate objects, where it thus wouldn't make sense to recreate it via the // object iterator. We thus support an optional `catfileInfoIter` parameter: if set, // we just use that one and ignore the object iterator. if catfileInfoIter == nil { objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo) if err != nil { return structerr.NewInternal("creating object info reader: %w", err) } defer cancel() catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter) if err != nil { return structerr.NewInternal("creating object info iterator: %w", err) } } var i uint32 for catfileInfoIter.Next() { blob := catfileInfoIter.Result() if err := callback( blob.ObjectID().String(), blob.ObjectSize(), nil, blob.ObjectName, ); err != nil { return structerr.NewInternal("sending blob chunk: %w", err) } i++ if blobsLimit > 0 && i >= blobsLimit { break } } if err := catfileInfoIter.Err(); err != nil { return structerr.NewInternal("%w", err) } } else { objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return structerr.NewInternal("creating object reader: %w", err) } defer cancel() catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter) if err != nil { return structerr.NewInternal("creating object iterator: %w", err) } var i uint32 for catfileObjectIter.Next() { blob := catfileObjectIter.Result() headerSent := false dataChunker := streamio.NewWriter(func(p []byte) error { var oid string var size int64 if !headerSent { oid = blob.ObjectID().String() size = blob.ObjectSize() headerSent = true } pCopy := make([]byte, len(p)) copy(pCopy, p) if err := callback(oid, size, pCopy, blob.ObjectName); err != nil { return structerr.NewInternal("sending blob chunk: %w", err) } return nil }) readLimit := bytesLimit if readLimit < 0 { readLimit = blob.ObjectSize() } _, err := io.CopyN(dataChunker, blob, readLimit) if err != nil && !errors.Is(err, io.EOF) { return structerr.NewInternal("sending blob data: %w", err) } // Discard trailing blob data in case the blob is bigger than the read // limit. We only do so in case we haven't yet seen `io.EOF`: if we did, // then the object may be closed already. if !errors.Is(err, io.EOF) { _, err = io.Copy(io.Discard, blob) if err != nil { return structerr.NewInternal("discarding blob data: %w", err) } } // If we still didn't send any header, then it probably means that the blob // itself didn't contain any data. Let's be prepared and send out the blob // header manually in that case. if !headerSent { if err := callback( blob.ObjectID().String(), blob.ObjectSize(), nil, blob.ObjectName, ); err != nil { return structerr.NewInternal("sending blob chunk: %w", err) } } i++ if blobsLimit > 0 && i >= blobsLimit { break } } if err := catfileObjectIter.Err(); err != nil { return structerr.NewInternal("%w", err) } } return nil } type blobSender struct { blobs []*gitalypb.ListBlobsResponse_Blob send func([]*gitalypb.ListBlobsResponse_Blob) error } func (t *blobSender) Reset() { t.blobs = t.blobs[:0] } func (t *blobSender) Append(m proto.Message) { t.blobs = append(t.blobs, m.(*gitalypb.ListBlobsResponse_Blob)) } func (t *blobSender) Send() error { return t.send(t.blobs) } // ListAllBlobs finds all blobs which exist in the repository, including those which are not // reachable via graph walks. func (s *server) ListAllBlobs(req *gitalypb.ListAllBlobsRequest, stream gitalypb.BlobService_ListAllBlobsServer) error { ctx := stream.Context() repository := req.GetRepository() if err := s.locator.ValidateRepository(stream.Context(), repository); err != nil { return err } repo := s.localRepoFactory.Build(repository) gitVersion, err := repo.GitVersion(ctx) if err != nil { return structerr.NewInternal("detecting availability of object type filter: %w", err) } chunker := chunk.New(&allBlobsSender{ send: func(blobs []*gitalypb.ListAllBlobsResponse_Blob) error { return stream.Send(&gitalypb.ListAllBlobsResponse{ Blobs: blobs, }) }, }) catfileInfoOptions := []gitpipe.CatfileInfoOption{ gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { return objectInfo.Type != "blob" }), } if featureflag.CatfileObjectTypeFilter.IsEnabled(ctx) && gitVersion.IsCatfileObjectTypeFilterSupported() { catfileInfoOptions = append(catfileInfoOptions, gitpipe.WithCatfileObjectTypeFilter(gitpipe.ObjectTypeBlob)) } catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo, catfileInfoOptions...) if err := s.processBlobs(ctx, repo, catfileInfoIter, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(), func(oid string, size int64, contents []byte, path []byte) error { return chunker.Send(&gitalypb.ListAllBlobsResponse_Blob{ Oid: oid, Size: size, Data: contents, }) }, ); err != nil { return structerr.NewInternal("processing blobs: %w", err) } if err := chunker.Flush(); err != nil { return structerr.NewInternal("flushing blobs: %w", err) } return nil } type allBlobsSender struct { blobs []*gitalypb.ListAllBlobsResponse_Blob send func([]*gitalypb.ListAllBlobsResponse_Blob) error } func (t *allBlobsSender) Reset() { t.blobs = t.blobs[:0] } func (t *allBlobsSender) Append(m proto.Message) { t.blobs = append(t.blobs, m.(*gitalypb.ListAllBlobsResponse_Blob)) } func (t *allBlobsSender) Send() error { return t.send(t.blobs) }