func CatfileObject()

in internal/git/gitpipe/catfile_object.go [37:194]


// CatfileObject requests every object yielded by the given ObjectIterator from a queue obtained
// via objectReader and returns an iterator over the resulting objects.
//
// The work is split across two goroutines which share the queue:
//
//   - The requesting goroutine walks the input iterator, issues one RequestObject call per item
//     and announces each request on a buffered channel. The queue is flushed every time a full
//     channel's worth of requests has been issued, and once more after the input is exhausted.
//   - The reading goroutine consumes those announcements, reads one object per announcement from
//     the queue and publishes it on the result channel backing the returned iterator.
//
// An error is returned directly only if the queue cannot be created. Any later error is
// delivered through the returned iterator instead.
func CatfileObject(
	ctx context.Context,
	objectReader catfile.ObjectContentReader,
	it ObjectIterator,
) (CatfileObjectIterator, error) {
	queue, queueCleanup, err := objectReader.Queue(ctx)
	if err != nil {
		return nil, err
	}

	// Both goroutines below hold one reference to the queue. Each drops its reference when it
	// exits, and whichever drops the last one is responsible for cleaning the queue up.
	var queueRefcount int32 = 2
	releaseQueue := func() {
		if atomic.AddInt32(&queueRefcount, -1) == 0 {
			queueCleanup()
		}
	}

	requests := make(chan catfileObjectRequest, 32)
	go func() {
		defer func() {
			releaseQueue()
			close(requests)
		}()

		// enqueue hands a request over to the reading goroutine. It reports true if the
		// context was cancelled, in which case the request has not been handed over. The
		// up-front context check mirrors the one in `publish()` of the reading goroutine;
		// see there for the reasoning.
		enqueue := func(req catfileObjectRequest) (cancelled bool) {
			select {
			case <-ctx.Done():
				return true
			default:
			}

			select {
			case <-ctx.Done():
				return true
			case requests <- req:
				return false
			}
		}

		// fail forwards an error to the reading goroutine on a best-effort basis.
		fail := func(err error) {
			enqueue(catfileObjectRequest{err: err})
		}

		// We flush whenever we have issued as many requests as the channel can buffer.
		flushEvery := int64(cap(requests))

		var requested int64
		for it.Next() {
			if err := queue.RequestObject(ctx, it.ObjectID().Revision()); err != nil {
				fail(err)
				return
			}

			cancelled := enqueue(catfileObjectRequest{objectName: it.ObjectName()})
			if cancelled {
				// The context got cancelled. Flush out all outstanding requests so
				// that the downstream consumer is unblocked, then report either the
				// flush error or, if flushing worked, the context's error.
				//
				// NOTE(review): enqueue returns early whenever ctx is done, so this
				// error send looks like it never reaches the channel; the flush is
				// the part that matters here.
				reportErr := queue.Flush(ctx)
				if reportErr == nil {
					reportErr = ctx.Err()
				}
				fail(reportErr)
				return
			}

			requested++
			if requested%flushEvery != 0 {
				continue
			}

			if err := queue.Flush(ctx); err != nil {
				fail(err)
				return
			}
		}

		if err := it.Err(); err != nil {
			fail(err)
			return
		}

		// The input is exhausted: push out whatever has been requested since the last
		// periodic flush.
		if err := queue.Flush(ctx); err != nil {
			fail(err)
		}
	}()

	results := make(chan CatfileObjectResult)
	go func() {
		defer func() {
			releaseQueue()
			close(results)
		}()

		// publish sends a result to the consumer of the returned iterator and reports
		// true if the context was cancelled instead.
		//
		// When the context gets cancelled there is a race between observing an error
		// from the killed Git process and observing the cancellation itself. We do not
		// want to pass an error caused by the cancellation down the pipeline, but rather
		// stop the pipeline gracefully. The context is thus checked up front so that
		// such error messages from the Git process are dropped.
		publish := func(res CatfileObjectResult) (cancelled bool) {
			select {
			case <-ctx.Done():
				return true
			default:
			}

			select {
			case <-ctx.Done():
				return true
			case results <- res:
				return false
			}
		}

		var inFlight *synchronizingObject

		// Ranging over the request channel without watching the context is fine: the
		// requesting goroutine closes the channel when it stops, which includes the case
		// where the context got cancelled.
		for req := range requests {
			if req.err != nil {
				publish(CatfileObjectResult{err: req.err})
				return
			}

			// Reading the next object is only allowed once reading of the previous
			// one has concluded. That is driven by the caller and not by us, so we
			// have to wait until the previous object signals that its reader has
			// reached EOF.
			if inFlight != nil {
				select {
				case <-ctx.Done():
					return
				case <-inFlight.doneCh:
				}
			}

			obj, err := queue.ReadObject(ctx)
			if err != nil {
				publish(CatfileObjectResult{
					err: fmt.Errorf("requesting object: %w", err),
				})
				return
			}

			inFlight = &synchronizingObject{
				Object: obj,
				doneCh: make(chan interface{}),
			}

			if publish(CatfileObjectResult{
				ObjectName: req.objectName,
				Object:     inFlight,
			}) {
				return
			}
		}
	}()

	return &catfileObjectIterator{
		ctx: ctx,
		ch:  results,
	}, nil
}