in internal/git/gitpipe/catfile_object.go [37:194]
// CatfileObject reads the objects yielded by the given iterator through a request queue obtained
// from objectReader and returns an iterator over the results. Two goroutines are spawned: one
// requests an object for every item of the input iterator, the other reads the requested objects
// back and hands them to the consumer of the returned iterator. The queue is cleaned up once both
// goroutines have finished. An error is returned directly only if the queue cannot be created;
// errors that occur later are passed down the pipeline as results unless the context has been
// cancelled.
func CatfileObject(
	ctx context.Context,
	objectReader catfile.ObjectContentReader,
	it ObjectIterator,
) (CatfileObjectIterator, error) {
	queue, queueCleanup, err := objectReader.Queue(ctx)
	if err != nil {
		return nil, err
	}

	// The queue is used by both the requesting and the reading goroutine, so it must only be
	// cleaned up by whichever of the two finishes last.
	var queueRefcount int32 = 2
	releaseQueue := func() {
		if atomic.AddInt32(&queueRefcount, -1) == 0 {
			queueCleanup()
		}
	}

	requestChan := make(chan catfileObjectRequest, 32)
	go func() {
		defer func() {
			releaseQueue()
			close(requestChan)
		}()

		// enqueue hands a request over to the reading goroutine. It reports true if the
		// context has been cancelled, in which case the request was not delivered. The
		// context is checked up front for the same reason as in `deliver()` in the reading
		// goroutine.
		enqueue := func(req catfileObjectRequest) bool {
			select {
			case <-ctx.Done():
				return true
			default:
			}

			select {
			case requestChan <- req:
				return false
			case <-ctx.Done():
				return true
			}
		}

		// reportErr forwards an error to the reading goroutine on a best-effort basis: the
		// error is dropped if the context has been cancelled.
		reportErr := func(err error) {
			enqueue(catfileObjectRequest{err: err})
		}

		// Requests are flushed whenever a full channel's worth of them has been issued.
		// NOTE(review): presumably this keeps the reader from waiting on requests that are
		// still buffered in the queue -- confirm against the queue implementation.
		batchSize := int64(cap(requestChan))

		var requested int64
		for it.Next() {
			if err := queue.RequestObject(ctx, it.ObjectID().Revision()); err != nil {
				reportErr(err)
				return
			}

			if enqueue(catfileObjectRequest{objectName: it.ObjectName()}) {
				// The context got cancelled, so all outstanding requests need to be
				// flushed out in order to unblock the downstream consumer.
				if err := queue.Flush(ctx); err != nil {
					reportErr(err)
					return
				}

				reportErr(ctx.Err())
				return
			}

			requested++
			if requested%batchSize != 0 {
				continue
			}

			if err := queue.Flush(ctx); err != nil {
				reportErr(err)
				return
			}
		}

		// The input iterator is exhausted: surface its error if it has one, otherwise flush
		// the requests that have not yet been covered by a batch flush.
		finalErr := it.Err()
		if finalErr == nil {
			finalErr = queue.Flush(ctx)
		}
		if finalErr != nil {
			reportErr(finalErr)
		}
	}()

	resultChan := make(chan CatfileObjectResult)
	go func() {
		defer func() {
			releaseQueue()
			close(resultChan)
		}()

		// deliver passes a result on to the consumer and reports true if the context has
		// been cancelled, in which case the result was not delivered.
		//
		// When the context gets cancelled there is a race between observing an error from
		// the killed Git process and observing the cancellation itself. An error that is
		// merely a consequence of the cancellation must not be passed down the pipeline;
		// the pipeline should just stop gracefully instead. The context is thus checked up
		// front so that such error messages from the Git process are dropped.
		deliver := func(res CatfileObjectResult) bool {
			select {
			case <-ctx.Done():
				return true
			default:
			}

			select {
			case resultChan <- res:
				return false
			case <-ctx.Done():
				return true
			}
		}

		var pending *synchronizingObject

		// Ranging over the request channel without watching the context is fine: the
		// requesting goroutine closes the channel when the context is cancelled.
		for req := range requestChan {
			if req.err != nil {
				deliver(CatfileObjectResult{err: req.err})
				return
			}

			// The next object must not be read before reading of the previous one has
			// concluded. As that is under the control of the caller and not ours, wait
			// until the previously handed-out object signals that it is done.
			if pending != nil {
				select {
				case <-pending.doneCh:
				case <-ctx.Done():
					return
				}
			}

			obj, err := queue.ReadObject(ctx)
			if err != nil {
				deliver(CatfileObjectResult{
					err: fmt.Errorf("requesting object: %w", err),
				})
				return
			}

			pending = &synchronizingObject{
				Object: obj,
				doneCh: make(chan interface{}),
			}

			if deliver(CatfileObjectResult{
				ObjectName: req.objectName,
				Object:     pending,
			}) {
				return
			}
		}
	}()

	return &catfileObjectIterator{
		ctx: ctx,
		ch:  resultChan,
	}, nil
}