func()

in internal/gitaly/gitaly.go [216:299]


// EachFileChange walks every path that differs between gc.FromHash and
// gc.ToHash, as reported by the FindChangedPaths RPC of the diff service.
//
// del is invoked, while the stream is being read, with the path of each
// deleted file and with the old path of each renamed file. Added, modified,
// copied and renamed paths are grouped by their new blob ID; once the stream
// has been fully consumed they are handed to bulkIndex, together with put, in
// batches of at most IndexBatchSize blob IDs. Submodule entries are skipped,
// and changes with any other status (type changes included) are logged and
// ignored.
//
// gc.FromHash is replaced by the value determineFromHash resolves; it is left
// untouched when that resolution fails. The first error returned by
// determineFromHash, the RPC, del or bulkIndex is wrapped and returned.
func (gc *GitalyClient) EachFileChange(put PutFunc, del DelFunc) error {
	// Resolve into a local first so that a failed lookup does not overwrite
	// gc.FromHash with whatever value accompanies the error.
	fromHash, err := determineFromHash(gc)
	if err != nil {
		return fmt.Errorf("determine from hash: %w", err)
	}
	gc.FromHash = fromHash

	request := &pb.FindChangedPathsRequest{
		Repository: gc.repository,
		Requests: []*pb.FindChangedPathsRequest_Request{{
			Type: &pb.FindChangedPathsRequest_Request_TreeRequest_{
				TreeRequest: &pb.FindChangedPathsRequest_Request_TreeRequest{
					LeftTreeRevision:  gc.FromHash,
					RightTreeRevision: gc.ToHash,
				},
			},
		}},
	}

	// The derived context is cancelled on every return path, which also
	// releases the stream if we bail out before reading it to the end.
	ctx, cancel := context.WithCancel(gc.ctx)
	defer cancel()

	stream, err := gc.diffServiceClient.FindChangedPaths(ctx, request)
	if err != nil {
		return fmt.Errorf("find changed paths: %w", err)
	}

	// Several paths may share one blob; every one of them must be indexed.
	pathsByBlobID := map[string][]string{}

	for {
		resp, recvErr := stream.Recv()
		if recvErr == io.EOF { //nolint:errorlint
			break // the stream has been fully consumed
		}
		if recvErr != nil {
			return fmt.Errorf("recv: %w", recvErr)
		}

		for _, change := range resp.Paths {
			// Submodules are skipped for now to mirror the go-git
			// implementation. Supporting them through Gitaly may not be
			// expensive, but that still needs investigation.
			if change.OldMode == SubmoduleFileMode || change.NewMode == SubmoduleFileMode {
				continue
			}

			switch status := change.GetStatus(); status {
			case pb.ChangedPaths_DELETED:
				if err := del(string(change.Path)); err != nil {
					return fmt.Errorf("del: %w", err)
				}
			case pb.ChangedPaths_RENAMED:
				// A rename is a delete of the old path ...
				if err := del(string(change.OldPath)); err != nil {
					return fmt.Errorf("del: %w", err)
				}

				// ... followed by indexing the blob at its new path.
				fallthrough
			case pb.ChangedPaths_ADDED, pb.ChangedPaths_MODIFIED, pb.ChangedPaths_COPIED:
				pathsByBlobID[change.NewBlobId] = append(pathsByBlobID[change.NewBlobId], string(change.Path))
			default:
				// Covers pb.ChangedPaths_TYPE_CHANGE as well as any status
				// this code does not know about.
				slog.Warn("status is not supported to perform indexing", "status", status, "repoId", gc.repository.GlRepository)
			}
		}
	}

	// Index the collected blobs in batches of IndexBatchSize, reusing the
	// same backing array for every batch.
	revisions := make([]string, 0, IndexBatchSize)
	for blobID := range pathsByBlobID {
		revisions = append(revisions, blobID)
		if len(revisions) < IndexBatchSize {
			continue
		}
		if err := gc.bulkIndex(ctx, revisions, pathsByBlobID, put); err != nil {
			return fmt.Errorf("bulkIndex: %w", err)
		}
		revisions = revisions[:0]
	}

	// Flush the last, partially filled batch. Nothing is left when there were
	// no changes at all or when the blob count is an exact multiple of
	// IndexBatchSize, so bulkIndex is never called with an empty batch.
	if len(revisions) > 0 {
		if err := gc.bulkIndex(ctx, revisions, pathsByBlobID, put); err != nil {
			return fmt.Errorf("bulkIndex: %w", err)
		}
	}
	return nil
}