pkg/api/service/service.go (149 lines of code) (raw):
// Package service implements the gRPC API declared in ../../../proto/step.proto
package service
import (
"context"
"fmt"
"time"
"gitlab.com/gitlab-org/step-runner/pkg/api/internal/jobs"
"gitlab.com/gitlab-org/step-runner/pkg/api/internal/variables"
"gitlab.com/gitlab-org/step-runner/pkg/internal/syncmap"
"gitlab.com/gitlab-org/step-runner/pkg/runner"
"gitlab.com/gitlab-org/step-runner/proto"
"gitlab.com/gitlab-org/step-runner/schema/v1"
)
// errBadJobID is the error value produced when a request names a job ID that
// the service has no entry for.
type errBadJobID struct {
	id string // the ID that was looked up and not found
}

// Error implements the error interface. The unknown ID is rendered quoted.
func (e *errBadJobID) Error() string {
	const format = "no job with id %q"
	return fmt.Sprintf(format, e.id)
}
// executeAsync is the executor installed by New: it starts delegate on a new
// goroutine and returns without waiting for it to complete.
var executeAsync = func(delegate func()) {
	go delegate()
}
// StepRunnerService holds the state shared by the Run, Close, FollowLogs and
// Status handlers. Construct it with New.
type StepRunnerService struct {
// NOTE(review): embedded interface, presumably so the type satisfies the
// generated gRPC server interface — confirm against the proto package.
proto.StepRunnerServer
// stepParser is used by Run to parse the requested step.
stepParser runner.StepParser
// env is the base environment Run passes to runner.GlobalEnvironment.
env *runner.Environment
// jobs holds the registered jobs, keyed by the ID of the request that
// created them.
jobs *syncmap.SyncMap[string, *jobs.Job]
// execute is called by Run to launch job execution. New sets it to
// executeAsync; WithExecutor replaces it.
execute func(delegate func())
// runExitWaitTime, when non-zero, is forwarded to every new job through
// jobs.WithRunExitWaitTime. It is set by WithJobRunExitWaitTime.
runExitWaitTime time.Duration
}
// New builds a StepRunnerService that parses steps with stepParser and uses
// env as its base environment. The service starts with an empty job map and
// executeAsync as its executor; each entry in options is then applied, in
// order, to the freshly built service before it is returned.
func New(stepParser runner.StepParser, env *runner.Environment, options ...func(*StepRunnerService)) *StepRunnerService {
	svc := new(StepRunnerService)
	svc.stepParser = stepParser
	svc.env = env
	svc.jobs = syncmap.New[string, *jobs.Job]()
	svc.execute = executeAsync

	for i := range options {
		options[i](svc)
	}

	return svc
}
// Run parses, prepares, and initiates execution of a RunRequest.
//
// If a job is already registered under request.Id, Run does nothing and
// returns an empty response. Otherwise it loads the step definition, creates
// the job, builds its environment and contexts, registers the job under
// request.Id and hands job.Run to the service's executor. With the default
// executor (executeAsync) Run returns without waiting for the steps to finish.
//
// Any failure after the job has been created closes the job before the error
// is returned. The ctx argument is not used by this method.
func (s *StepRunnerService) Run(ctx context.Context, request *proto.RunRequest) (response *proto.RunResponse, err error) {
	// NOTE(review): this Get and the Put further down are separate calls, so
	// two concurrent Run calls with the same ID could both pass this check —
	// confirm whether callers guarantee that cannot happen.
	if _, ok := s.jobs.Get(request.Id); ok {
		return &proto.RunResponse{}, nil
	}

	specDef, err := s.loadSteps(request.Steps, request)
	if err != nil {
		return nil, fmt.Errorf("loading step: %w", err)
	}

	jobOpts := []func(*jobs.Job){}
	if s.runExitWaitTime != 0 {
		jobOpts = append(jobOpts, jobs.WithRunExitWaitTime(s.runExitWaitTime))
	}

	job, err := jobs.New(request.Id, specDef.Dir(), jobOpts...)
	if err != nil {
		return nil, fmt.Errorf("initializing request: %w", err)
	}

	// err is the named result, so this sees the error of whichever return
	// statement fires below and closes the job if setup did not complete.
	defer func() {
		if err != nil {
			job.Close()
		}
	}()

	jobVars, err := variables.Prepare(request.Job, job.TmpDir)
	if err != nil {
		return nil, fmt.Errorf("preparing environment: %w", err)
	}

	env, err := runner.GlobalEnvironment(s.env, jobVars)
	if err != nil {
		return nil, fmt.Errorf("initializing global environment: %w", err)
	}

	globalCtxEnv := env.AddLexicalScope(request.Env)
	stdout, stderr := job.Logs()
	globCtx := runner.NewGlobalContext(job.WorkDir, jobVars, globalCtxEnv, stdout, stderr)
	params := &runner.Params{}

	step, err := s.stepParser.Parse(globCtx, specDef, params, runner.StepDefinedInGitLabJob)
	if err != nil {
		return nil, fmt.Errorf("failed to start step runner service: %w", err)
	}

	inputs := params.NewInputsWithDefault(specDef.SpecInputs())

	stepsCtx, err := runner.NewStepsContext(globCtx, specDef.Dir(), inputs, globCtx.Env())
	if err != nil {
		// Wrapped like every other failure path so the caller can tell which
		// stage failed; %w keeps the underlying error inspectable.
		return nil, fmt.Errorf("creating steps context: %w", err)
	}

	// actually execute the steps request
	s.jobs.Put(request.Id, job)
	s.execute(func() { job.Run(stepsCtx, step) })

	return &proto.RunResponse{}, nil
}
// loadSteps reads the step document in stepsStr, compiles both the spec and
// the step it contains, and combines them into a runner.SpecDefinition.
//
// The definition's directory is request.WorkDir, unless request.Job is set
// and has a non-empty BuildDir, in which case BuildDir is used instead.
func (s *StepRunnerService) loadSteps(stepsStr string, request *proto.RunRequest) (*runner.SpecDefinition, error) {
	parsedSpec, parsedStep, err := schema.ReadSteps(stepsStr)
	if err != nil {
		return nil, fmt.Errorf("reading steps %q: %w", stepsStr, err)
	}

	compiledSpec, err := parsedSpec.Compile()
	if err != nil {
		return nil, fmt.Errorf("compiling steps: %w", err)
	}

	compiledStep, err := parsedStep.Compile()
	if err != nil {
		return nil, fmt.Errorf("compiling steps: %w", err)
	}

	// The job's build directory, when provided, takes precedence over the
	// request's working directory.
	workDir := request.WorkDir
	if jobInfo := request.Job; jobInfo != nil && jobInfo.BuildDir != "" {
		workDir = jobInfo.BuildDir
	}

	return runner.NewSpecDefinition(compiledSpec, compiledStep, workDir), nil
}
// Close closes the job registered under request.Id and removes it from the
// service. An unknown ID is not an error: the call simply returns an empty
// response.
func (s *StepRunnerService) Close(ctx context.Context, request *proto.CloseRequest) (*proto.CloseResponse, error) {
	if job, found := s.jobs.Get(request.Id); found {
		job.Close()
		s.jobs.Remove(request.Id)
	}
	return &proto.CloseResponse{}, nil
}
// toIOWriter adapts a plain func([]byte) (int, error) so that it can be passed
// wherever an io.Writer is expected.
type toIOWriter func([]byte) (int, error)

// Write satisfies io.Writer by calling the underlying function with data and
// returning its results unchanged.
func (fn toIOWriter) Write(data []byte) (n int, err error) {
	n, err = fn(data)
	return n, err
}
// FollowLogs streams the logs of the job registered under request.Id to
// writer, starting at request.Offset. Every chunk the job writes is sent as
// one FollowLogsResponse. An unknown ID yields *errBadJobID; otherwise the
// result of job.FollowLogs is returned.
func (s *StepRunnerService) FollowLogs(request *proto.FollowLogsRequest, writer proto.StepRunner_FollowLogsServer) error {
	job, found := s.jobs.Get(request.Id)
	if !found {
		return &errBadJobID{id: request.Id}
	}

	// sendChunk forwards one chunk of log data over the stream and reports
	// the whole chunk as written when the send succeeds.
	// NOTE(review): chunk is handed to Send without copying — confirm the
	// buffer is not reused by job.FollowLogs in a way the stream could observe.
	sendChunk := func(chunk []byte) (int, error) {
		if err := writer.Send(&proto.FollowLogsResponse{Data: chunk}); err != nil {
			return 0, err
		}
		return len(chunk), nil
	}

	return job.FollowLogs(writer.Context(), request.Offset, toIOWriter(sendChunk))
}
// Status reports job statuses. With an empty request.Id it returns the status
// of every registered job; otherwise it returns only the status of the named
// job, or *errBadJobID if no such job is registered.
func (s *StepRunnerService) Status(ctx context.Context, request *proto.StatusRequest) (*proto.StatusResponse, error) {
	if request.Id == "" {
		// No ID given: collect the status of every job.
		all := []*proto.Status{}
		s.jobs.ForEach(func(_ string, job *jobs.Job) {
			all = append(all, job.Status())
		})
		return &proto.StatusResponse{Jobs: all}, nil
	}

	job, found := s.jobs.Get(request.Id)
	if !found {
		return nil, &errBadJobID{id: request.Id}
	}
	return &proto.StatusResponse{Jobs: []*proto.Status{job.Status()}}, nil
}
// WithExecutor returns an option for New that replaces the function the
// service uses to launch job execution.
func WithExecutor(executor func(delegate func())) func(service *StepRunnerService) {
	apply := func(svc *StepRunnerService) {
		svc.execute = executor
	}
	return apply
}
// WithJobRunExitWaitTime returns an option for New that sets the service's
// run-exit wait time. Run forwards a non-zero value to each job it creates.
func WithJobRunExitWaitTime(waitTime time.Duration) func(service *StepRunnerService) {
	apply := func(svc *StepRunnerService) {
		svc.runExitWaitTime = waitTime
	}
	return apply
}