in internal/gitaly/gitaly.go [216:299]
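// EachFileChange streams the paths that changed between FromHash and ToHash from
// Gitaly, calls del for deleted and renamed-away paths, and bulk-indexes the blobs
// now present at added, modified, copied, and renamed paths.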
func (gc *GitalyClient) EachFileChange(put PutFunc, del DelFunc) error {
	var err error
	gc.FromHash, err = determineFromHash(gc)
	if err != nil {
		return fmt.Errorf("determine from hash: %w", err)
	}
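	// Ask Gitaly's DiffService which paths changed between the two tree revisions.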
	request := &pb.FindChangedPathsRequest{
		Repository: gc.repository,
		Requests: []*pb.FindChangedPathsRequest_Request{{
			Type: &pb.FindChangedPathsRequest_Request_TreeRequest_{
				TreeRequest: &pb.FindChangedPathsRequest_Request_TreeRequest{
					LeftTreeRevision:  gc.FromHash,
					RightTreeRevision: gc.ToHash,
				},
			},
		}},
	}
	ctx, cancel := context.WithCancel(gc.ctx)
	defer cancel()
	stream, err := gc.diffServiceClient.FindChangedPaths(ctx, request)
	if err != nil {
		return fmt.Errorf("find changed paths: %w", err)
	}
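	// Group changed paths by the blob ID now at each path; bulkIndex later receives
	// each blob ID once together with all of its paths.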
	pathsByBlobID := map[string][]string{}
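	// Drain the stream; Recv returns io.EOF once every changed path has been delivered.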
	for {
		c, errFindChangedPathsResp := stream.Recv()
		if errFindChangedPathsResp == io.EOF { //nolint:errorlint
			break
		}
		if errFindChangedPathsResp != nil {
			return fmt.Errorf("recv: %w", errFindChangedPathsResp)
		}
		for _, change := range c.Paths {
			// Skip submodules for now to mirror the go-git implementation. Indexing them
			// through Gitaly may not be expensive, but that still needs investigation.
			if change.OldMode == SubmoduleFileMode || change.NewMode == SubmoduleFileMode {
				continue
			}
			switch change.GetStatus() {
			case pb.ChangedPaths_DELETED:
				if err = del(string(change.Path)); err != nil {
					return fmt.Errorf("del: %w", err)
				}
			case pb.ChangedPaths_RENAMED:
				if err = del(string(change.OldPath)); err != nil {
					return fmt.Errorf("del: %w", err)
				}
				// Fallthrough to index the blob at its new path.
				fallthrough
			case pb.ChangedPaths_ADDED, pb.ChangedPaths_MODIFIED, pb.ChangedPaths_COPIED:
				pathsByBlobID[change.NewBlobId] = append(pathsByBlobID[change.NewBlobId], string(change.Path))
			case pb.ChangedPaths_TYPE_CHANGE:
				slog.Warn("status is not supported for indexing", "status", change.GetStatus(), "repoId", gc.repository.GlRepository)
			default:
				slog.Warn("status is not supported for indexing", "status", change.GetStatus(), "repoId", gc.repository.GlRepository)
			}
		}
	}
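	// Index the collected blobs in batches of IndexBatchSize blob IDs.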
	revisions := make([]string, 0, IndexBatchSize)
	for blobID := range pathsByBlobID {
		revisions = append(revisions, blobID)
		if len(revisions) == IndexBatchSize {
			err = gc.bulkIndex(ctx, revisions, pathsByBlobID, put)
			if err != nil {
				return fmt.Errorf("bulkIndex: %w", err)
			}
			revisions = revisions[:0]
		}
	}
	err = gc.bulkIndex(ctx, revisions, pathsByBlobID, put) // index the last remaining batch
	if err != nil {
		return fmt.Errorf("bulkIndex: %w", err)
	}
	return nil
}