internal/mode/advanced/git/gitaly.go (327 lines of code) (raw):

package git

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"

	gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
	gitalyclient "gitlab.com/gitlab-org/gitaly/v16/client"
	pb "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
	"gitlab.com/gitlab-org/labkit/correlation"
	grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
	logkit "gitlab.com/gitlab-org/labkit/log"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// SubmoduleFileMode is the file mode that EachFileChange compares against a
	// raw change's old and new modes in order to skip submodule entries.
	SubmoduleFileMode = 0160000

	// NullTreeSHA is the ID of Git's empty tree object in SHA-1 repositories.
	// See https://stackoverflow.com/questions/9765453/is-gits-semi-secret-empty-tree-object-reliable-and-why-is-there-not-a-symbolic
	NullTreeSHA = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
	// ZeroSHA is the all-zero SHA-1 ID; NewGitalyClient maps it to NullTreeSHA.
	ZeroSHA = "0000000000000000000000000000000000000000"

	// NullTreeSHA256 is the counterpart of NullTreeSHA used for repositories
	// whose object format is reported as FormatSha256.
	NullTreeSHA256 = "6ef19b41225c5369f1c104d45d8d85efa9b057b53b14b4b9b939dd74decc5321"
	// ZeroSHA256 is the all-zero SHA-256 ID; NewGitalyClient maps it to
	// NullTreeSHA256.
	ZeroSHA256 = "0000000000000000000000000000000000000000000000000000000000000000"

	// FormatSha256 is the string form of the object format that getNullSHA
	// checks for when choosing between NullTreeSHA and NullTreeSHA256.
	FormatSha256 = "OBJECT_FORMAT_SHA256"

	// clientName is reported to Gitaly through the correlation interceptors.
	clientName = "gitlab-elasticsearch-indexer"

	// defaultLimitFileSize is the LimitFileSize that ReadConfig applies when the
	// connection info does not override it (presumably bytes, i.e. 1 MiB).
	defaultLimitFileSize int64 = 1024 * 1024
)

// StorageConfig describes how to reach a repository through Gitaly. ReadConfig
// fills it from the JSON stored in the GITALY_CONNECTION_INFO environment
// variable.
type StorageConfig struct {
	Address       string `json:"address"`
	Token         string `json:"token"`
	StorageName   string `json:"storage"`
	RelativePath  string `json:"relative_path"`
	ProjectPath   string `json:"project_path"`
	LimitFileSize int64  `json:"limit_file_size"`
	TokenVersion  int    `json:"token_version"`
}

// gitalyClient bundles one gRPC connection to Gitaly with the service clients
// created on it and the revision range that is being processed.
type gitalyClient struct {
	conn                    *grpc.ClientConn
	repository              *pb.Repository
	blobServiceClient       pb.BlobServiceClient
	repositoryServiceClient pb.RepositoryServiceClient
	refServiceClient        pb.RefServiceClient
	commitServiceClient     pb.CommitServiceClient
	ctx                     context.Context
	// FromHash and ToHash delimit the range used by EachFileChange and
	// EachCommit.
	FromHash string
	ToHash   string
	// limitFileSize is sent as the GetBlob limit and is the threshold above
	// which gitalyBuildFile does not fetch a blob at all.
	limitFileSize int64
}

// NewGitalyClient dials the Gitaly server described by config and returns a
// client for the repository it points at.
//
// An empty fromSHA is replaced by the empty-tree ID matching the repository's
// object format; ZeroSHA and ZeroSHA256 are replaced by NullTreeSHA and
// NullTreeSHA256 respectively. An empty toSHA is replaced by the tip of the
// default branch.
//
// It returns an error when the token version is neither 0 nor 2, when dialing
// fails, or when the default branch tip cannot be resolved. The caller owns
// the returned client and must call Close on it.
func NewGitalyClient(config *StorageConfig, fromSHA, toSHA, correlationID string, projectID int64) (*gitalyClient, error) {
	var rpcCred credentials.PerRPCCredentials
	if config.TokenVersion == 0 || config.TokenVersion == 2 {
		rpcCred = gitalyauth.RPCCredentialsV2(config.Token)
	} else {
		return nil, errors.New("Unknown token version")
	}

	// Copy the defaults into a fresh slice: appending directly to the
	// package-level slice could write into its shared backing array.
	connOpts := make([]grpc.DialOption, 0, len(gitalyclient.DefaultDialOpts)+3)
	connOpts = append(connOpts, gitalyclient.DefaultDialOpts...)
	connOpts = append(
		connOpts,
		grpc.WithPerRPCCredentials(rpcCred),
		grpc.WithStreamInterceptor(
			grpccorrelation.StreamClientCorrelationInterceptor(
				grpccorrelation.WithClientName(clientName),
			),
		),
		grpc.WithUnaryInterceptor(
			grpccorrelation.UnaryClientCorrelationInterceptor(
				grpccorrelation.WithClientName(clientName),
			),
		),
	)

	ctx := newContext(correlationID)

	conn, err := gitalyclient.Dial(config.Address, connOpts)
	if err != nil {
		return nil, fmt.Errorf("did not connect: %w", err)
	}

	repository := &pb.Repository{
		StorageName:   config.StorageName,
		RelativePath:  config.RelativePath,
		GlProjectPath: config.ProjectPath,
		GlRepository:  strconv.FormatInt(projectID, 10),
	}

	client := &gitalyClient{
		conn:                    conn,
		repository:              repository,
		blobServiceClient:       pb.NewBlobServiceClient(conn),
		repositoryServiceClient: pb.NewRepositoryServiceClient(conn),
		refServiceClient:        pb.NewRefServiceClient(conn),
		commitServiceClient:     pb.NewCommitServiceClient(conn),
		ctx:                     ctx,
		limitFileSize:           config.LimitFileSize,
	}

	switch fromSHA {
	case "":
		client.FromHash = client.getNullSHA()
	case ZeroSHA:
		client.FromHash = NullTreeSHA
	case ZeroSHA256:
		client.FromHash = NullTreeSHA256
	default:
		client.FromHash = fromSHA
	}

	if toSHA != "" {
		client.ToHash = toSHA
		return client, nil
	}

	head, err := client.lookUpHEAD()
	if err != nil {
		// The client is not handed to the caller on this path, so the
		// connection must be released here. The close error is dropped
		// because the lookup failure is the error worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("lookUpHEAD: %w", err)
	}
	client.ToHash = head

	return client, nil
}

// ReadConfig builds a StorageConfig from the JSON held in the
// GITALY_CONNECTION_INFO environment variable. RelativePath, ProjectPath and
// LimitFileSize are pre-populated from repoPath, projectPath and
// defaultLimitFileSize before decoding, so the JSON may override them.
//
// The config pointer is returned even when decoding fails; callers must check
// the error before using it.
func ReadConfig(repoPath, projectPath string) (*StorageConfig, error) {
	data := strings.NewReader(os.Getenv("GITALY_CONNECTION_INFO"))

	config := StorageConfig{
		RelativePath:  repoPath,
		ProjectPath:   projectPath,
		LimitFileSize: defaultLimitFileSize,
	}

	err := json.NewDecoder(data).Decode(&config)

	return &config, err
}

// NewGitalyClientFromEnv reads the connection settings with ReadConfig and
// passes them to NewGitalyClient.
func NewGitalyClientFromEnv(repoPath, fromSHA, toSHA, correlationID string, projectID int64, projectPath string) (*gitalyClient, error) {
	config, err := ReadConfig(repoPath, projectPath)
	if err != nil {
		return nil, err
	}

	client, err := NewGitalyClient(config, fromSHA, toSHA, correlationID, projectID)
	if err != nil {
		return nil, fmt.Errorf("Failed to open %s: %w", config.RelativePath, err)
	}

	return client, nil
}

// Close closes the underlying gRPC connection, ignoring any error from it.
func (gc *gitalyClient) Close() {
	gc.conn.Close() //nolint
}

// EachFileChange streams the raw changes between FromHash and ToHash and
// reports them through the callbacks: del receives the old path of deleted and
// renamed entries, put receives a File for added, renamed, modified and copied
// entries. A rename therefore triggers del followed by put.
//
// Entries whose old or new mode is SubmoduleFileMode are skipped. Iteration
// stops at the first error from the stream, from building a File, or from a
// callback, and that error is returned.
func (gc *gitalyClient) EachFileChange(put PutFunc, del DelFunc) error {
	request := &pb.GetRawChangesRequest{
		Repository:   gc.repository,
		FromRevision: gc.FromHash,
		ToRevision:   gc.ToHash,
	}

	stream, err := gc.repositoryServiceClient.GetRawChanges(gc.ctx, request)
	if err != nil {
		return fmt.Errorf("could not call rpc.GetRawChanges: %w", err)
	}

	for {
		c, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return fmt.Errorf("%v.GetRawChanges, %w", c, err)
		}

		for _, change := range c.RawChanges {
			// TODO: We just skip submodules from indexing now just to mirror the go-git
			// implementation but it can be not that expensive to implement with gitaly actually so some
			// investigation is required here
			if change.OldMode == SubmoduleFileMode || change.NewMode == SubmoduleFileMode {
				continue
			}

			operation := change.Operation.String()

			// Removal of the old path comes first so that a rename is seen
			// as delete-then-put.
			switch operation {
			case "DELETED", "RENAMED":
				path := string(change.OldPathBytes)
				logkit.WithFields(
					logkit.Fields{
						"operation": "DELETE",
						"path":      path,
					},
				).Debug("Indexing blob change")

				if err := del(path); err != nil {
					return err
				}
			}

			switch operation {
			case "ADDED", "RENAMED", "MODIFIED", "COPIED":
				file, err := gc.gitalyBuildFile(change, string(change.NewPathBytes))
				if err != nil {
					return err
				}
				logkit.WithFields(
					logkit.Fields{
						"operation": "PUT",
						"path":      file.Path,
					},
				).Debug("Indexing blob change")

				if err := put(file, gc.FromHash, gc.ToHash); err != nil {
					return err
				}
			}
		}
	}

	return nil
}

// getNullSHA returns the empty-tree ID matching the repository's object
// format. The lookup is best-effort: when the ObjectFormat call fails, the
// SHA-1 value is returned instead of an error.
func (gc *gitalyClient) getNullSHA() string {
	request := &pb.ObjectFormatRequest{Repository: gc.repository}

	response, err := gc.repositoryServiceClient.ObjectFormat(gc.ctx, request)
	if err != nil {
		return NullTreeSHA
	}

	if response.Format.String() == FormatSha256 {
		return NullTreeSHA256
	}

	return NullTreeSHA
}

// lookUpHEAD returns the ID of the commit at the tip of the default branch.
//
// HEAD is not always set in some cases, so we find the last commit in
// a default branch instead. An error is returned when the default branch
// cannot be determined, when the FindCommit call fails, or when the response
// carries no commit.
func (gc *gitalyClient) lookUpHEAD() (string, error) {
	defaultBranchName, err := gc.findDefaultBranchName()
	if err != nil {
		return "", err
	}

	request := &pb.FindCommitRequest{
		Repository: gc.repository,
		Revision:   defaultBranchName,
	}

	response, err := gc.commitServiceClient.FindCommit(gc.ctx, request)
	if err != nil {
		return "", fmt.Errorf("Cannot look up HEAD: %w", err)
	}

	// Guard against a response without a commit; dereferencing it directly
	// would panic.
	commit := response.GetCommit()
	if commit == nil {
		return "", fmt.Errorf("Cannot look up HEAD: no commit found for revision %q", defaultBranchName)
	}

	return commit.Id, nil
}

// findDefaultBranchName returns the repository's default branch name as
// reported by Gitaly.
func (gc *gitalyClient) findDefaultBranchName() ([]byte, error) {
	request := &pb.FindDefaultBranchNameRequest{
		Repository: gc.repository,
	}

	response, err := gc.refServiceClient.FindDefaultBranchName(gc.ctx, request)
	if err != nil {
		return nil, fmt.Errorf("Cannot find a default branch: %w", err)
	}

	return response.Name, nil
}

// getBlob fetches the blob identified by oid, requesting at most
// limitFileSize of data, and returns the received content as an in-memory
// reader. The whole stream is consumed before the function returns.
func (gc *gitalyClient) getBlob(oid string) (io.ReadCloser, error) {
	data := new(bytes.Buffer)

	request := &pb.GetBlobRequest{
		Repository: gc.repository,
		Oid:        oid,
		Limit:      gc.limitFileSize,
	}

	stream, err := gc.blobServiceClient.GetBlob(gc.ctx, request)
	if err != nil {
		// Keep the cause in the chain so callers can inspect it.
		return nil, fmt.Errorf("Cannot get blob: %s: %w", oid, err)
	}

	for {
		c, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("%v.GetBlob: %w", c, err)
		}
		if c.Data != nil {
			data.Write(c.Data)
		}
	}

	return io.NopCloser(data), nil
}

// gitalyBuildFile creates the File for a raw change at the given path.
//
// When the change's size exceeds limitFileSize the blob is not fetched: the
// File gets an empty reader and SkipTooLarge is set. Otherwise the blob
// content is loaded through getBlob.
func (gc *gitalyClient) gitalyBuildFile(change *pb.GetRawChangesResponse_RawChange, path string) (*File, error) {
	var data io.ReadCloser
	var skipTooLarge bool

	// We limit the size to avoid loading too big blobs into memory
	// as they will be rejected on the indexer side anyway
	// Ideally, we need to create a lazy blob reader here.
	if change.Size > gc.limitFileSize {
		data = io.NopCloser(new(bytes.Buffer))
		skipTooLarge = true
	} else {
		var err error
		data, err = gc.getBlob(change.BlobId)
		if err != nil {
			return nil, fmt.Errorf("getBlob returns error: %w", err)
		}
	}

	return &File{
		Path:         path,
		Oid:          change.BlobId,
		Blob:         getBlobReader(data),
		SkipTooLarge: skipTooLarge,
	}, nil
}

// getBlobReader wraps an already-loaded reader in the accessor function shape
// stored in File.Blob. Every call of the accessor returns the same reader.
func getBlobReader(data io.ReadCloser) func() (io.ReadCloser, error) {
	return func() (io.ReadCloser, error) { return data, nil }
}

// EachCommit lists the commits reachable from ToHash but not from FromHash,
// requesting them in reverse order, and calls f for each one. Iteration stops
// at the first error from the stream or from f, and that error is returned.
func (gc *gitalyClient) EachCommit(f CommitFunc) error {
	request := &pb.ListCommitsRequest{
		Repository: gc.repository,
		Revisions: []string{
			"^" + gc.FromHash, // exclude everything reachable from FromHash
			gc.ToHash,
		},
		Reverse: true,
	}

	stream, err := gc.commitServiceClient.ListCommits(gc.ctx, request)
	if err != nil {
		return fmt.Errorf("could not call rpc.ListCommits: %w", err)
	}

	for {
		c, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return fmt.Errorf("error calling rpc.ListCommits: %w", err)
		}

		for _, cmt := range c.Commits {
			commit := &Commit{
				Message:   string(cmt.Body),
				Hash:      string(cmt.Id),
				Author:    gitalyBuildSignature(cmt.Author),
				Committer: gitalyBuildSignature(cmt.Committer),
			}

			logkit.WithField("commitID", cmt.Id).Debug("Indexing commit")

			if err := f(commit); err != nil {
				return err
			}
		}
	}

	return nil
}

// GetLimitFileSize returns the blob size limit this client was configured
// with.
func (gc *gitalyClient) GetLimitFileSize() int64 {
	return gc.limitFileSize
}

// gitalyBuildSignature converts a Gitaly commit author into a Signature. The
// timestamp is truncated to whole seconds. The generated getters are used so
// that a nil author yields an empty Signature at the Unix epoch instead of a
// panic.
func gitalyBuildSignature(ca *pb.CommitAuthor) Signature {
	return Signature{
		Name:  string(ca.GetName()),
		Email: string(ca.GetEmail()),
		When:  time.Unix(ca.GetDate().GetSeconds(), 0), // another option is ptypes.Timestamp(ca.Date)
	}
}

// newContext returns a background context carrying the given correlation ID.
func newContext(correlationID string) context.Context {
	return correlation.ContextWithCorrelation(context.Background(), correlationID)
}