internal/git/gitpipe/catfile_info.go (245 lines of code) (raw):

package gitpipe import ( "bufio" "bytes" "context" "errors" "fmt" "io" "sync/atomic" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" ) // CatfileInfoResult is a result for the CatfileInfo pipeline step. type CatfileInfoResult struct { // err is an error which occurred during execution of the pipeline. err error // ObjectName is the object name as received from the revlistResultChan. ObjectName []byte // ObjectInfo provides information about the object. git.ObjectInfo } type catfileInfoConfig struct { skipResult func(*catfile.ObjectInfo) bool diskUsage bool objectType ObjectType } // CatfileInfoOption is an option for the CatfileInfo and CatfileInfoAllObjects pipeline steps. type CatfileInfoOption func(cfg *catfileInfoConfig) // WithSkipCatfileInfoResult will execute the given function for each ObjectInfo processed by the // pipeline. If the callback returns `true`, then the object will be skipped and not passed down the // pipeline. func WithSkipCatfileInfoResult(skipResult func(*catfile.ObjectInfo) bool) CatfileInfoOption { return func(cfg *catfileInfoConfig) { cfg.skipResult = skipResult } } // WithDiskUsageSize will cause the size of the object to be returned to be the // size it takes up on disk. This value will override the existing size field. func WithDiskUsageSize() CatfileInfoOption { return func(cfg *catfileInfoConfig) { cfg.diskUsage = true } } // WithCatfileObjectTypeFilter will set up a `--filter=object:type=` filter for git-cat-file(1). // This will cause it to filter out any objects which do not match the given type. This mode should // only be used with `CatfileInfoAllObjects()`. func WithCatfileObjectTypeFilter(t ObjectType) CatfileInfoOption { return func(cfg *catfileInfoConfig) { cfg.objectType = t } } type catfileInfoRequest struct { objectID git.ObjectID objectName []byte err error } // CatfileInfo processes revlistResults from the given channel and extracts object information via // `git cat-file --batch-check`. The returned channel will contain all processed catfile info // results. Any error received via the channel or encountered in this step will cause the pipeline // to fail. Context cancellation will gracefully halt the pipeline. func CatfileInfo( ctx context.Context, objectInfoReader catfile.ObjectInfoReader, it ObjectIterator, opts ...CatfileInfoOption, ) (CatfileInfoIterator, error) { var cfg catfileInfoConfig for _, opt := range opts { opt(&cfg) } queue, queueCleanup, err := objectInfoReader.Queue(ctx) if err != nil { return nil, err } var queueRefcount int32 = 2 requestChan := make(chan catfileInfoRequest, 32) go func() { defer func() { if atomic.AddInt32(&queueRefcount, -1) == 0 { queueCleanup() } close(requestChan) }() var i int64 for it.Next() { if err := queue.RequestInfo(ctx, it.ObjectID().Revision()); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } if isDone := sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{ objectID: it.ObjectID(), objectName: it.ObjectName(), }); isDone { // If the context got cancelled, then we need to flush out all // outstanding requests so that the downstream consumer is // unblocked. if err := queue.Flush(ctx); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: ctx.Err()}) return } i++ if i%int64(cap(requestChan)) == 0 { if err := queue.Flush(ctx); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } } } if err := it.Err(); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } if err := queue.Flush(ctx); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } }() resultChan := make(chan CatfileInfoResult) go func() { defer func() { if atomic.AddInt32(&queueRefcount, -1) == 0 { queueCleanup() } close(resultChan) }() // It's fine to iterate over the request channel without paying attention to // context cancellation because the request channel itself would be closed if the // context was cancelled. for request := range requestChan { if request.err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: request.err}) break } objectInfo, err := queue.ReadInfo(ctx) if err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("retrieving object info for %q: %w", request.objectID, err), }) return } if cfg.skipResult != nil && cfg.skipResult(objectInfo) { continue } if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ ObjectName: request.objectName, ObjectInfo: objectInfo, }); isDone { return } } }() return &catfileInfoIterator{ ctx: ctx, ch: resultChan, }, nil } // CatfileInfoAllObjects enumerates all Git objects part of the repository's object directory and // extracts their object info via `git cat-file --batch-check`. The returned channel will contain // all processed results. Any error encountered during execution of this pipeline step will cause // the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this // pipeline step, the resulting catfileInfoResults will never have an object name. func CatfileInfoAllObjects( ctx context.Context, repo *localrepo.Repo, opts ...CatfileInfoOption, ) CatfileInfoIterator { var cfg catfileInfoConfig for _, opt := range opts { opt(&cfg) } resultChan := make(chan CatfileInfoResult) go func() { defer close(resultChan) objectHash, err := repo.ObjectHash(ctx) if err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("detecting object hash: %w", err), }) return } batchCheckOption := gitcmd.Flag{Name: "--batch-check"} if cfg.diskUsage { batchCheckOption.Name = batchCheckOption.Name + fmt.Sprintf("=%%(objectname) %%(objecttype) %%(objectsize:disk)") } options := []gitcmd.Option{ batchCheckOption, gitcmd.Flag{Name: "--batch-all-objects"}, gitcmd.Flag{Name: "--buffer"}, gitcmd.Flag{Name: "--unordered"}, } if featureflag.MailmapOptions.IsEnabled(ctx) { options = append([]gitcmd.Option{gitcmd.Flag{Name: "--use-mailmap"}}, options...) } if cfg.objectType != "" { options = append(options, gitcmd.Flag{Name: fmt.Sprintf("--filter=object:type=%s", cfg.objectType)}) } var stderr bytes.Buffer cmd, err := repo.Exec(ctx, gitcmd.Command{ Name: "cat-file", Flags: options, }, gitcmd.WithStderr(&stderr), gitcmd.WithSetupStdout()) if err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("spawning cat-file failed: %w", err), }) return } reader := bufio.NewReader(cmd) for { objectInfo, err := catfile.ParseObjectInfo(objectHash, reader, false) if err != nil { if errors.Is(err, io.EOF) { break } sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("parsing object info: %w", err), }) return } if cfg.skipResult != nil && cfg.skipResult(objectInfo) { continue } if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ ObjectInfo: objectInfo, }); isDone { return } } if err := cmd.Wait(); err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("cat-file failed: %w, stderr: %q", err, stderr), }) return } }() return &catfileInfoIterator{ ctx: ctx, ch: resultChan, } } func sendCatfileInfoResult(ctx context.Context, ch chan<- CatfileInfoResult, result CatfileInfoResult) bool { // In case the context has been cancelled, we have a race between observing an error from // the killed Git process and observing the context cancellation itself. But if we end up // here because of cancellation of the Git process, we don't want to pass that one down the // pipeline but instead just stop the pipeline gracefully. We thus have this check here up // front to error messages from the Git process. select { case <-ctx.Done(): return true default: } select { case ch <- result: return false case <-ctx.Done(): return true } } func sendCatfileInfoRequest(ctx context.Context, ch chan<- catfileInfoRequest, request catfileInfoRequest) bool { // Please refer to `sendCatfileInfoResult()` for why we treat the context specially. select { case <-ctx.Done(): return true default: } select { case ch <- request: return false case <-ctx.Done(): return true } }