internal/gitaly/service/commit/find_commits.go (327 lines of code) (raw):

package commit import ( "bufio" "bytes" "context" "errors" "fmt" "io" "regexp" "strconv" "strings" "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/trailerparser" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) var statsPattern = regexp.MustCompile(`\s(\d+)\sfiles? changed(,\s(\d+)\sinsertions?\(\+\))?(,\s(\d+)\sdeletions?\(-\))?`) func validateFindCommitsRequest(ctx context.Context, locator storage.Locator, in *gitalypb.FindCommitsRequest) error { if err := locator.ValidateRepository(ctx, in.GetRepository()); err != nil { return err } if err := git.ValidateRevision(in.GetRevision(), git.AllowEmptyRevision()); err != nil { return err } return nil } func (s *server) FindCommits(req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) error { ctx := stream.Context() if err := validateFindCommitsRequest(ctx, s.locator, req); err != nil { return structerr.NewInvalidArgument("%w", err) } repo := s.localRepoFactory.Build(req.GetRepository()) // Use Gitaly's default branch lookup function because that is already // migrated. if revision := req.GetRevision(); len(revision) == 0 && !req.GetAll() { defaultBranch, err := repo.GetDefaultBranch(ctx) if err != nil { return structerr.NewInternal("defaultBranchName: %w", err) } req.Revision = []byte(defaultBranch) } // Clients might send empty paths. That is an error for _, path := range req.GetPaths() { if len(path) == 0 { return structerr.NewInvalidArgument("path is empty string") } } if err := s.findCommits(ctx, req, stream); err != nil { return structerr.NewInternal("%w", err) } return nil } func (s *server) findCommits(ctx context.Context, req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) (err error) { opts := gitcmd.ConvertGlobalOptions(req.GetGlobalOptions()) repo := s.localRepoFactory.Build(req.GetRepository()) var stderr bytes.Buffer gitLogCmd := getLogCommandSubCmd(req) logCmd, err := repo.Exec(ctx, gitLogCmd, append(opts, gitcmd.WithSetupStdout(), gitcmd.WithStderr(&stderr))...) if err != nil { return fmt.Errorf("error when creating git log command: %w", err) } defer func() { if err = logCmd.Wait(); err != nil { err = wrapGitLogCmdError(req.GetRevision(), err, stderr.String()) } }() objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return fmt.Errorf("creating catfile: %w", err) } defer cancel() getCommits := NewGetCommits(logCmd, objectReader, req.GetIncludeShortstat()) if calculateOffsetManually(req) { if err := getCommits.Offset(int(req.GetOffset())); err != nil { // If we're at EOF, then it means that the offset has been greater than the // number of available commits. We do not treat this as an error, but // instead just return EOF ourselves. if errors.Is(err, io.EOF) { return nil } return fmt.Errorf("skipping to offset %d: %w", req.GetOffset(), err) } } if err := streamCommits(getCommits, stream, req.GetTrailers(), req.GetIncludeShortstat(), len(req.GetIncludeReferencedBy()) > 0); err != nil { return fmt.Errorf("error streaming commits: %w", err) } return nil } func calculateOffsetManually(req *gitalypb.FindCommitsRequest) bool { return req.GetFollow() && req.GetOffset() > 0 } // GetCommits wraps a git log command that can be iterated on to get individual commit objects type GetCommits struct { scanner *bufio.Scanner objectReader catfile.ObjectContentReader } // NewGetCommits returns a new GetCommits object func NewGetCommits(cmd *command.Command, objectReader catfile.ObjectContentReader, shortStat bool) *GetCommits { getCommits := &GetCommits{ scanner: bufio.NewScanner(cmd), objectReader: objectReader, } // If include shortstat, the scanner splits commits by special token, // so that the commit hash and stats are combined. if shortStat { getCommits.scanner.Split(splitStat) } return getCommits } // Scan indicates whether or not there are more commits to return func (g *GetCommits) Scan() bool { return g.scanner.Scan() } // Err returns the first non EOF error func (g *GetCommits) Err() error { return g.scanner.Err() } // Offset skips over a number of commits func (g *GetCommits) Offset(offset int) error { for i := 0; i < offset; i++ { if !g.Scan() { err := g.Err() if err == nil { err = io.EOF } return fmt.Errorf("skipping commit: %w", err) } } return nil } // Commit returns the current commit func (g *GetCommits) Commit(ctx context.Context, trailers, shortStat, refs bool) (*gitalypb.GitCommit, error) { logOutput := strings.TrimSpace(g.scanner.Text()) var revAndTrailers []string var revAndStats []string var revAndRefs []string var revision string if shortStat { revAndStats = strings.SplitN(logOutput, "\n", 2) logOutput = revAndStats[0] } if refs { revAndRefs = strings.SplitN(logOutput, "\002", 2) logOutput = revAndRefs[0] } if trailers { revAndTrailers = strings.SplitN(logOutput, "\000", 2) revision = revAndTrailers[0] } else { revision = logOutput } commit, err := catfile.GetCommit(ctx, g.objectReader, git.Revision(revision)) if err != nil { return nil, fmt.Errorf("cat-file get commit %q: %w", revision, err) } if refs && len(revAndRefs) == 2 { commit.ReferencedBy = parseRefs(revAndRefs[1]) } if trailers && len(revAndTrailers) == 2 { commit.Trailers = trailerparser.Parse([]byte(revAndTrailers[1])) } if shortStat && len(revAndStats) == 2 { commit.ShortStats, err = parseStat(revAndStats[1]) if err != nil { return nil, fmt.Errorf("get stats: %w", err) } } return commit.GitCommit, nil } func streamCommits(getCommits *GetCommits, stream gitalypb.CommitService_FindCommitsServer, trailers, shortStat bool, refs bool) error { ctx := stream.Context() chunker := chunk.New(&commitsSender{ send: func(commits []*gitalypb.GitCommit) error { return stream.Send(&gitalypb.FindCommitsResponse{ Commits: commits, }) }, }) for getCommits.Scan() { commit, err := getCommits.Commit(ctx, trailers, shortStat, refs) if err != nil { return err } if err := chunker.Send(commit); err != nil { return err } } if getCommits.Err() != nil { return fmt.Errorf("get commits: %w", getCommits.Err()) } return chunker.Flush() } func getLogCommandSubCmd(req *gitalypb.FindCommitsRequest) gitcmd.Command { logFormatOption := "--format=%H" // To split the commits by '\x01' instead of '\n' if req.GetIncludeShortstat() { logFormatOption = "--format=%x01%H" } if req.GetTrailers() { logFormatOption += "%x00%(trailers:unfold,separator=%x00)" } if len(req.GetIncludeReferencedBy()) > 0 { // Delimit ref names with '\x02' to avoid confusing with trailers logFormatOption += "%x02%D" } subCmd := gitcmd.Command{Name: "log", Flags: []gitcmd.Option{gitcmd.Flag{Name: logFormatOption}}} // We will perform the offset in Go because --follow doesn't play well with --skip. // See: https://gitlab.com/gitlab-org/gitlab-ce/issues/3574#note_3040520 if req.GetOffset() > 0 && !calculateOffsetManually(req) { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: fmt.Sprintf("--skip=%d", req.GetOffset())}) } limit := req.GetLimit() if calculateOffsetManually(req) { limit += req.GetOffset() } subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: fmt.Sprintf("--max-count=%d", limit)}) if req.GetFollow() && len(req.GetPaths()) > 0 { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: "--follow"}) } if req.GetAuthor() != nil { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: fmt.Sprintf("--author=%s", string(req.GetAuthor()))}) } if req.GetSkipMerges() { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: "--no-merges"}) } if req.GetBefore() != nil { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: fmt.Sprintf("--before=%s", req.GetBefore().String())}) } if req.GetAfter() != nil { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: fmt.Sprintf("--after=%s", req.GetAfter().String())}) } if req.GetAll() { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: "--all"}, gitcmd.Flag{Name: "--reverse"}) } if req.GetRevision() != nil { subCmd.Args = []string{string(req.GetRevision())} } if req.GetFirstParent() { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: "--first-parent"}) } if len(req.GetPaths()) > 0 { for _, path := range req.GetPaths() { subCmd.PostSepArgs = append(subCmd.PostSepArgs, string(path)) } } if req.GetOrder() == gitalypb.FindCommitsRequest_TOPO { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: "--topo-order"}) } if req.GetIncludeShortstat() { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: "--shortstat"}) } if len(req.GetIncludeReferencedBy()) > 0 { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: "--decorate=full"}) for _, pattern := range req.GetIncludeReferencedBy() { subCmd.Flags = append(subCmd.Flags, gitcmd.Flag{Name: fmt.Sprintf("--decorate-refs=%s", pattern)}) } } return subCmd } func parseRefs(refsLine string) [][]byte { var refs [][]byte for _, ref := range strings.Split(refsLine, ", ") { if ref == "" { continue } // Tags are output as `tag: refs/tags/<name>`. Trim the tag prefix in case // this is a tag. ref = strings.TrimPrefix(ref, "tag: ") // By itself, HEAD is printed as HEAD. If HEAD points to another branch // that is output, for example refs/heads/master, then HEAD printed as // `HEAD -> refs/heads/master`. We need to separate the refs and include // both in the response if this is the case. if leftRef, rightRef, ok := strings.Cut(ref, " -> "); ok { refs = append(refs, []byte(leftRef), []byte(rightRef)) continue } refs = append(refs, []byte(ref)) } return refs } func parseStat(line string) (*gitalypb.CommitStatInfo, error) { statInfo := &gitalypb.CommitStatInfo{} matched := statsPattern.FindStringSubmatch(line) if len(matched) != 6 { return nil, fmt.Errorf("unexpected stats format: %q", line) } fileStr, addStr, delStr := matched[1], matched[3], matched[5] file64, err := strconv.ParseInt(fileStr, 10, 32) if err != nil { return nil, fmt.Errorf("parsing file count: %w", err) } statInfo.ChangedFiles = int32(file64) if len(addStr) > 0 { add64, err := strconv.ParseInt(addStr, 10, 32) if err != nil { return nil, fmt.Errorf("parsing additions: %w", err) } statInfo.Additions = int32(add64) } if len(delStr) > 0 { del64, err := strconv.ParseInt(delStr, 10, 32) if err != nil { return nil, fmt.Errorf("parsing deletions: %w", err) } statInfo.Deletions = int32(del64) } return statInfo, nil } func splitStat(data []byte, atEOF bool) (int, []byte, error) { // If there is no more data to be read then we are fine. if atEOF && len(data) == 0 { return 0, nil, io.EOF } // Commits are separated by `\x01` bytes, so we require each commit to start with it. If // that is not the case we return an error. if !bytes.HasPrefix(data, []byte{'\x01'}) { return 0, nil, fmt.Errorf("expected \\x01 prefix: %q", string(data)) } // Skip the prefix. We only want to return the actual commit's data to the caller. data = data[1:] // We scan until the next `\x01` byte. If there is none, we're either at EOF (in which case // we just return remaining bytes) or we don't have sufficient data. index := bytes.IndexByte(data, '\x01') if index < 0 { if atEOF { return len(data) + 1, data, nil } return 0, nil, nil } return index + 1, data[:index], nil } var ( ambiguousArgRegex = regexp.MustCompile(`fatal: ambiguous argument '.*': unknown revision or path not in the working tree.`) badObjectRegex = regexp.MustCompile(`fatal: bad object [0-9a-g]+`) invalidRevisionRangeRegex = regexp.MustCompile(`fatal: Invalid revision range [0-9a-g]+\.\.[0-9a-g]+`) badRevisionRegex = regexp.MustCompile(`^fatal: bad revision '.*'\n$`) ) // wrapGitError wraps git log error with a structError. func wrapGitLogCmdError(revision []byte, err error, stderr string) structerr.Error { switch { case ambiguousArgRegex.MatchString(stderr): fallthrough case badObjectRegex.MatchString(stderr): fallthrough case badRevisionRegex.MatchString(stderr): fallthrough case invalidRevisionRangeRegex.MatchString(stderr): // for example git log 37811987837aacbd3b1d8ceb8de669b33f7c7c0a..37811987837aacbd3b1d8ceb8de669b33f7c7c0b will cause // fatal: Invalid revision range 37811987837aacbd3b1d8ceb8de669b33f7c7c0a..37811987837aacbd3b1d8ceb8de669b33f7c7c0b return structerr.NewNotFound("commits not found"). WithDetail(&gitalypb.FindCommitsError{}) default: return structerr.NewInternal("listing commits failed").WithMetadata("stderr", stderr) } }