in internal/gitaly/service/blob/blobs.go [85:219]
// processBlobs iterates over blobs produced by the given object iterator and
// invokes callback once per blob (or once per content chunk when contents are
// requested).
//
// Limits:
//   - blobsLimit: when nonzero, iteration stops after that many blobs.
//   - bytesLimit: 0 means "metadata only" (contents are never read and the
//     callback receives nil contents); a negative value means "send the full
//     blob contents"; a positive value caps the number of content bytes sent
//     per blob.
//
// The callback receives the blob's OID and size only on the first invocation
// for a given blob (subsequent content chunks carry empty OID and zero size).
func (s *server) processBlobs(
	ctx context.Context,
	repo *localrepo.Repo,
	objectIter gitpipe.ObjectIterator,
	catfileInfoIter gitpipe.CatfileInfoIterator,
	blobsLimit uint32,
	bytesLimit int64,
	callback func(oid string, size int64, contents []byte, path []byte) error,
) error {
	// If we have a zero bytes limit, then the caller didn't request any blob
	// contents at all. We can thus skip reading blob contents completely.
	if bytesLimit == 0 {
		return s.processBlobsInfo(ctx, repo, objectIter, catfileInfoIter, blobsLimit, callback)
	}
	return s.processBlobsContents(ctx, repo, objectIter, blobsLimit, bytesLimit, callback)
}

// processBlobsInfo sends blob metadata only: callback is invoked exactly once
// per blob with nil contents.
func (s *server) processBlobsInfo(
	ctx context.Context,
	repo *localrepo.Repo,
	objectIter gitpipe.ObjectIterator,
	catfileInfoIter gitpipe.CatfileInfoIterator,
	blobsLimit uint32,
	callback func(oid string, size int64, contents []byte, path []byte) error,
) error {
	// This is a bit untidy, but some callers may already use an object info
	// iterator to enumerate objects, where it thus wouldn't make sense to
	// recreate it via the object iterator. We thus support an optional
	// `catfileInfoIter` parameter: if set, we just use that one and ignore the
	// object iterator.
	if catfileInfoIter == nil {
		objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
		if err != nil {
			return structerr.NewInternal("creating object info reader: %w", err)
		}
		defer cancel()

		catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter)
		if err != nil {
			return structerr.NewInternal("creating object info iterator: %w", err)
		}
	}

	var i uint32
	for catfileInfoIter.Next() {
		blob := catfileInfoIter.Result()

		if err := callback(
			blob.ObjectID().String(),
			blob.ObjectSize(),
			nil,
			blob.ObjectName,
		); err != nil {
			return structerr.NewInternal("sending blob chunk: %w", err)
		}

		i++
		if blobsLimit > 0 && i >= blobsLimit {
			break
		}
	}

	if err := catfileInfoIter.Err(); err != nil {
		return structerr.NewInternal("%w", err)
	}

	return nil
}

// processBlobsContents streams blob contents in chunks via callback. The first
// chunk for each blob carries the blob's OID and size; later chunks carry only
// data. A negative bytesLimit streams the whole blob, a positive one caps the
// bytes sent per blob.
func (s *server) processBlobsContents(
	ctx context.Context,
	repo *localrepo.Repo,
	objectIter gitpipe.ObjectIterator,
	blobsLimit uint32,
	bytesLimit int64,
	callback func(oid string, size int64, contents []byte, path []byte) error,
) error {
	objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
	if err != nil {
		return structerr.NewInternal("creating object reader: %w", err)
	}
	defer cancel()

	catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter)
	if err != nil {
		return structerr.NewInternal("creating object iterator: %w", err)
	}

	var i uint32
	for catfileObjectIter.Next() {
		blob := catfileObjectIter.Result()

		// OID and size are only attached to the first chunk of each blob.
		headerSent := false
		dataChunker := streamio.NewWriter(func(p []byte) error {
			var oid string
			var size int64

			if !headerSent {
				oid = blob.ObjectID().String()
				size = blob.ObjectSize()
				headerSent = true
			}

			// The writer may reuse `p` after this function returns, so hand a
			// private copy to the callback.
			pCopy := make([]byte, len(p))
			copy(pCopy, p)

			if err := callback(oid, size, pCopy, blob.ObjectName); err != nil {
				return structerr.NewInternal("sending blob chunk: %w", err)
			}

			return nil
		})

		readLimit := bytesLimit
		if readLimit < 0 {
			readLimit = blob.ObjectSize()
		}

		_, err := io.CopyN(dataChunker, blob, readLimit)
		if err != nil && !errors.Is(err, io.EOF) {
			return structerr.NewInternal("sending blob data: %w", err)
		}

		// Discard trailing blob data in case the blob is bigger than the read
		// limit. We only do so in case we haven't yet seen `io.EOF`: if we did,
		// then the object may be closed already.
		if !errors.Is(err, io.EOF) {
			_, err = io.Copy(io.Discard, blob)
			if err != nil {
				return structerr.NewInternal("discarding blob data: %w", err)
			}
		}

		// If we still didn't send any header, then it probably means that the
		// blob itself didn't contain any data. Let's be prepared and send out
		// the blob header manually in that case.
		if !headerSent {
			if err := callback(
				blob.ObjectID().String(),
				blob.ObjectSize(),
				nil,
				blob.ObjectName,
			); err != nil {
				return structerr.NewInternal("sending blob chunk: %w", err)
			}
		}

		i++
		if blobsLimit > 0 && i >= blobsLimit {
			break
		}
	}

	if err := catfileObjectIter.Err(); err != nil {
		return structerr.NewInternal("%w", err)
	}

	return nil
}