func()

in internal/gitaly/service/blob/blobs.go [85:219]


// processBlobs enumerates blobs from the given object iterator and invokes the
// callback for each of them. If bytesLimit is zero, only blob metadata is
// reported and contents are never read. Otherwise, blob contents are streamed
// to the callback in chunks, truncated at bytesLimit bytes per blob; a
// negative bytesLimit sends complete blobs. If blobsLimit is nonzero, at most
// that many blobs are processed.
//
// The optional catfileInfoIter is only consulted in the metadata-only case:
// some callers already enumerate objects via an object info iterator, in which
// case it would be wasteful to recreate one from objectIter.
func (s *server) processBlobs(
	ctx context.Context,
	repo *localrepo.Repo,
	objectIter gitpipe.ObjectIterator,
	catfileInfoIter gitpipe.CatfileInfoIterator,
	blobsLimit uint32,
	bytesLimit int64,
	callback func(oid string, size int64, contents []byte, path []byte) error,
) error {
	// If we have a zero bytes limit, then the caller didn't request any blob
	// contents at all. We can thus skip reading blob contents completely.
	if bytesLimit == 0 {
		return s.processBlobInfos(ctx, repo, objectIter, catfileInfoIter, blobsLimit, callback)
	}
	return s.processBlobContents(ctx, repo, objectIter, blobsLimit, bytesLimit, callback)
}

// processBlobInfos sends metadata-only callbacks (contents always nil) for
// each blob, honoring blobsLimit.
func (s *server) processBlobInfos(
	ctx context.Context,
	repo *localrepo.Repo,
	objectIter gitpipe.ObjectIterator,
	catfileInfoIter gitpipe.CatfileInfoIterator,
	blobsLimit uint32,
	callback func(oid string, size int64, contents []byte, path []byte) error,
) error {
	// This is a bit untidy, but some callers may already use an object info
	// iterator to enumerate objects, where it thus wouldn't make sense to
	// recreate it via the object iterator. We thus support an optional
	// `catfileInfoIter` parameter: if set, we just use that one and ignore the
	// object iterator.
	if catfileInfoIter == nil {
		objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
		if err != nil {
			return structerr.NewInternal("creating object info reader: %w", err)
		}
		defer cancel()

		catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter)
		if err != nil {
			return structerr.NewInternal("creating object info iterator: %w", err)
		}
	}

	var processedBlobs uint32
	for catfileInfoIter.Next() {
		blob := catfileInfoIter.Result()

		if err := callback(
			blob.ObjectID().String(),
			blob.ObjectSize(),
			nil,
			blob.ObjectName,
		); err != nil {
			return structerr.NewInternal("sending blob chunk: %w", err)
		}

		processedBlobs++
		if blobsLimit > 0 && processedBlobs >= blobsLimit {
			break
		}
	}

	if err := catfileInfoIter.Err(); err != nil {
		return structerr.NewInternal("%w", err)
	}

	return nil
}

// processBlobContents streams each blob's contents to the callback in chunks.
// The first chunk of a blob carries the OID and size header; per-blob reads
// are capped at bytesLimit bytes (negative means unlimited), and blobsLimit
// caps the number of blobs processed.
func (s *server) processBlobContents(
	ctx context.Context,
	repo *localrepo.Repo,
	objectIter gitpipe.ObjectIterator,
	blobsLimit uint32,
	bytesLimit int64,
	callback func(oid string, size int64, contents []byte, path []byte) error,
) error {
	objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
	if err != nil {
		return structerr.NewInternal("creating object reader: %w", err)
	}
	defer cancel()

	catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter)
	if err != nil {
		return structerr.NewInternal("creating object iterator: %w", err)
	}

	var processedBlobs uint32
	for catfileObjectIter.Next() {
		blob := catfileObjectIter.Result()

		headerSent := false
		dataChunker := streamio.NewWriter(func(p []byte) error {
			var oid string
			var size int64

			// Only the very first chunk of a blob carries the OID/size
			// header; subsequent chunks leave them at their zero values.
			if !headerSent {
				oid = blob.ObjectID().String()
				size = blob.ObjectSize()
				headerSent = true
			}

			// Copy the chunk before handing it to the callback: the writer
			// may reuse `p` for the next chunk while the callback still
			// retains the data.
			pCopy := make([]byte, len(p))
			copy(pCopy, p)

			if err := callback(oid, size, pCopy, blob.ObjectName); err != nil {
				return structerr.NewInternal("sending blob chunk: %w", err)
			}

			return nil
		})

		readLimit := bytesLimit
		if readLimit < 0 {
			// A negative limit means "send the complete blob".
			readLimit = blob.ObjectSize()
		}

		_, err := io.CopyN(dataChunker, blob, readLimit)
		if err != nil && !errors.Is(err, io.EOF) {
			return structerr.NewInternal("sending blob data: %w", err)
		}

		// Discard trailing blob data in case the blob is bigger than the read
		// limit. We only do so in case we haven't yet seen `io.EOF`: if we
		// did, then the object may be closed already.
		if !errors.Is(err, io.EOF) {
			if _, err := io.Copy(io.Discard, blob); err != nil {
				return structerr.NewInternal("discarding blob data: %w", err)
			}
		}

		// If we still didn't send any header, then it probably means that the
		// blob itself didn't contain any data. Let's be prepared and send out
		// the blob header manually in that case.
		if !headerSent {
			if err := callback(
				blob.ObjectID().String(),
				blob.ObjectSize(),
				nil,
				blob.ObjectName,
			); err != nil {
				return structerr.NewInternal("sending blob chunk: %w", err)
			}
		}

		processedBlobs++
		if blobsLimit > 0 && processedBlobs >= blobsLimit {
			break
		}
	}

	if err := catfileObjectIter.Err(); err != nil {
		return structerr.NewInternal("%w", err)
	}

	return nil
}