pkg/api/service/service.go (149 lines of code) (raw):

// package service implements the gRPC API declared in ../../../proto/step.proto package service import ( "context" "fmt" "time" "gitlab.com/gitlab-org/step-runner/pkg/api/internal/jobs" "gitlab.com/gitlab-org/step-runner/pkg/api/internal/variables" "gitlab.com/gitlab-org/step-runner/pkg/internal/syncmap" "gitlab.com/gitlab-org/step-runner/pkg/runner" "gitlab.com/gitlab-org/step-runner/proto" "gitlab.com/gitlab-org/step-runner/schema/v1" ) type errBadJobID struct{ id string } func (e *errBadJobID) Error() string { return fmt.Sprintf("no job with id %q", e.id) } var executeAsync = func(delegate func()) { go delegate() } type StepRunnerService struct { proto.StepRunnerServer stepParser runner.StepParser env *runner.Environment jobs *syncmap.SyncMap[string, *jobs.Job] execute func(delegate func()) runExitWaitTime time.Duration } func New(stepParser runner.StepParser, env *runner.Environment, options ...func(*StepRunnerService)) *StepRunnerService { svc := &StepRunnerService{ stepParser: stepParser, env: env, jobs: syncmap.New[string, *jobs.Job](), execute: executeAsync, } for _, option := range options { option(svc) } return svc } // Run parses, prepares, and initiates execution of a RunRequest. func (s *StepRunnerService) Run(ctx context.Context, request *proto.RunRequest) (response *proto.RunResponse, err error) { if _, ok := s.jobs.Get(request.Id); ok { return &proto.RunResponse{}, nil } specDef, err := s.loadSteps(request.Steps, request) if err != nil { return nil, fmt.Errorf("loading step: %w", err) } jobOpts := []func(*jobs.Job){} if s.runExitWaitTime != 0 { jobOpts = append(jobOpts, jobs.WithRunExitWaitTime(s.runExitWaitTime)) } job, err := jobs.New(request.Id, specDef.Dir(), jobOpts...) if err != nil { return nil, fmt.Errorf("initializing request: %w", err) } defer func() { if err != nil { job.Close() } }() jobVars, err := variables.Prepare(request.Job, job.TmpDir) if err != nil { return nil, fmt.Errorf("preparing environment: %w", err) } env, err := runner.GlobalEnvironment(s.env, jobVars) if err != nil { return nil, fmt.Errorf("initializing global environment: %w", err) } globalCtxEnv := env.AddLexicalScope(request.Env) stdout, stderr := job.Logs() globCtx := runner.NewGlobalContext(job.WorkDir, jobVars, globalCtxEnv, stdout, stderr) params := &runner.Params{} step, err := s.stepParser.Parse(globCtx, specDef, params, runner.StepDefinedInGitLabJob) if err != nil { return nil, fmt.Errorf("failed to start step runner service: %w", err) } inputs := params.NewInputsWithDefault(specDef.SpecInputs()) stepsCtx, err := runner.NewStepsContext(globCtx, specDef.Dir(), inputs, globCtx.Env()) if err != nil { return nil, err } // actually execute the steps request s.jobs.Put(request.Id, job) s.execute(func() { job.Run(stepsCtx, step) }) return &proto.RunResponse{}, nil } func (s *StepRunnerService) loadSteps(stepsStr string, request *proto.RunRequest) (*runner.SpecDefinition, error) { spec, step, err := schema.ReadSteps(stepsStr) if err != nil { return nil, fmt.Errorf("reading steps %q: %w", stepsStr, err) } protoSpec, err := spec.Compile() if err != nil { return nil, fmt.Errorf("compiling steps: %w", err) } protoDef, err := step.Compile() if err != nil { return nil, fmt.Errorf("compiling steps: %w", err) } dir := request.WorkDir if request.Job != nil && request.Job.BuildDir != "" { dir = request.Job.BuildDir } return runner.NewSpecDefinition(protoSpec, protoDef, dir), nil } func (s *StepRunnerService) Close(ctx context.Context, request *proto.CloseRequest) (*proto.CloseResponse, error) { job, ok := s.jobs.Get(request.Id) if !ok { return &proto.CloseResponse{}, nil } job.Close() s.jobs.Remove(request.Id) return &proto.CloseResponse{}, nil } // toIOWriter can be used to "cast" a func([]byte)(int, error) to an io.Writer. type toIOWriter func([]byte) (int, error) func (w toIOWriter) Write(p []byte) (int, error) { return w(p) } func (s *StepRunnerService) FollowLogs(request *proto.FollowLogsRequest, writer proto.StepRunner_FollowLogsServer) error { job, ok := s.jobs.Get(request.Id) if !ok { return &errBadJobID{id: request.Id} } return job.FollowLogs(writer.Context(), request.Offset, toIOWriter(func(p []byte) (int, error) { err := writer.Send(&proto.FollowLogsResponse{Data: p}) if err != nil { return 0, err } return len(p), nil })) } func (s *StepRunnerService) Status(ctx context.Context, request *proto.StatusRequest) (*proto.StatusResponse, error) { stats := []*proto.Status{} if request.Id != "" { job, ok := s.jobs.Get(request.Id) if !ok { return nil, &errBadJobID{id: request.Id} } stats = append(stats, job.Status()) } else { s.jobs.ForEach(func(_ string, j *jobs.Job) { stats = append(stats, j.Status()) }) } return &proto.StatusResponse{Jobs: stats}, nil } func WithExecutor(executor func(delegate func())) func(service *StepRunnerService) { return func(service *StepRunnerService) { service.execute = executor } } func WithJobRunExitWaitTime(waitTime time.Duration) func(service *StepRunnerService) { return func(service *StepRunnerService) { service.runExitWaitTime = waitTime } }