pkg/api/internal/jobs/jobs.go

// Package jobs implements creation, execution, and lifecycle management of
// jobs that run steps.
package jobs

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"

	"gitlab.com/gitlab-org/step-runner/pkg/api/internal/streamer/file"
	"gitlab.com/gitlab-org/step-runner/pkg/runner"
	"gitlab.com/gitlab-org/step-runner/proto"
)

type Job struct {
	TmpDir  string
	WorkDir string
	Ctx     context.Context // The context used to manage the Job's entire lifetime.
	ID      string          // The ID of the job to run/being run. Must be unique. Typically this will be the CI job ID.

	cancel          func()    // Used to cancel the Ctx.
	err             error     // Captures any error returned when executing steps.
	startTime       time.Time // Time when the job started execution.
	finishTime      time.Time // Time when the job finished execution.
	mux             sync.RWMutex
	finishC         chan struct{}
	runOnce         sync.Once
	closeOnce       sync.Once
	logs            *file.Streamer
	status          proto.StepResult_Status
	runExitWaitTime time.Duration
}

func New(jobID, workDir string, options ...func(*Job)) (*Job, error) {
	if workDir == "" {
		osWorkDir, err := os.Getwd()
		if err != nil {
			return nil, fmt.Errorf("could not determine working directory: %w", err)
		}
		workDir = osWorkDir
	}

	if err := os.MkdirAll(workDir, 0700); err != nil {
		return nil, fmt.Errorf("creating workdir %q: %w", workDir, err)
	}

	// TODO: add job timeout to RunRequest and hook it up here
	ctx, cancel := context.WithCancel(context.Background())

	job := &Job{
		TmpDir:          "",
		WorkDir:         workDir,
		ID:              jobID,
		Ctx:             ctx,
		cancel:          cancel,
		logs:            nil,
		status:          proto.StepResult_unspecified,
		finishC:         make(chan struct{}, 1),
		runExitWaitTime: time.Second * 2,
	}

	for _, opt := range options {
		opt(job)
	}

	if job.logs == nil {
		job.TmpDir = path.Join(os.TempDir(), "step-runner-output-"+jobID)
		if err := os.MkdirAll(job.TmpDir, 0700); err != nil {
			return nil, fmt.Errorf("creating tmpdir %q: %w", job.TmpDir, err)
		}

		logs, err := file.New(path.Join(job.TmpDir, "logs"))
		if err != nil {
			_ = os.RemoveAll(job.TmpDir)
			return nil, fmt.Errorf("creating log file: %w", err)
		}
		job.logs = logs
	}

	return job, nil
}

// Logs returns a pair of io.Writers corresponding to the Job's stdout and stderr (in that order).
func (j *Job) Logs() (io.Writer, io.Writer) {
	return j.logs, j.logs
}

// Run actually starts execution of the steps request and captures the result. It is intended to be run in a
// goroutine.
func (j *Job) Run(stepsCtx *runner.StepsContext, step runner.Step) {
	j.runOnce.Do(func() {
		defer func() {
			stepsCtx.Cleanup()
			_ = j.logs.Close()
			j.finishC <- struct{}{}
		}()

		j.mux.Lock()
		j.startTime = time.Now()
		j.status = proto.StepResult_running
		j.mux.Unlock()

		result, err := step.Run(j.Ctx, stepsCtx)

		j.onRunCompletion(result, err)
		if j.err != nil {
			log.Printf("an error occurred executing job %q: %s", j.ID, j.err)
		}
	})
}

func (j *Job) onRunCompletion(stepResult *proto.StepResult, err error) {
	j.mux.Lock()
	defer j.mux.Unlock()

	j.finishTime = time.Now()

	switch stepResult.Status {
	case proto.StepResult_unspecified:
		j.err = fmt.Errorf("job %q did not start running: %w", j.ID, err)
		j.status = proto.StepResult_failure
	case proto.StepResult_running:
		j.err = fmt.Errorf("job %q did not finish running: %w", j.ID, err)
		j.status = proto.StepResult_failure
	case proto.StepResult_failure:
		// When a job is cancelled (by calling `Job.Close()`) or times out (both of
		// which cancel the context passed to `exec.CommandContext()`), the returned
		// error can:
		//   * be one of context.Canceled or context.DeadlineExceeded.
		//   * be another error type whose message ends with the string "signal: killed".
		//
		// In both cases the `StepResult_Status` returned by `Step.Run()` is
		// `failure`, but we want it to be `cancelled`. Since the latter can also
		// happen when the process is otherwise killed (e.g. by the OOM killer), we
		// also have to check that the context was actually cancelled.
		if errors.Is(err, context.Canceled) ||
			errors.Is(err, context.DeadlineExceeded) ||
			(j.Ctx.Err() != nil && strings.HasSuffix(err.Error(), "signal: killed")) {
			j.err = fmt.Errorf("job %q cancelled: %w", j.ID, err)
			j.status = proto.StepResult_cancelled
			return
		}
		fallthrough
	default:
		j.err = err
		j.status = stepResult.Status
	}
}

// Close cancels the job (if still running) and cleans up all resources associated with managing it.
func (j *Job) Close() {
	j.closeOnce.Do(func() {
		j.cancel()

		// Block until Run has exited.
		select {
		case <-j.finishC:
		case <-time.NewTimer(j.runExitWaitTime).C:
			// A caller called Close without first calling Run... 2 seconds ought to be enough for exec.Cmd.Run() to
			// return...
			j.mux.Lock()
			defer j.mux.Unlock()
			if j.status == proto.StepResult_unspecified {
				j.status = proto.StepResult_cancelled
			}
			if j.err == nil {
				j.err = context.Canceled
			}
		}

		_ = os.RemoveAll(j.TmpDir)
	})
}

// FollowLogs writes the stdout/stderr captured from running steps to the supplied writer.
// The function blocks until the steps are run and logs are written to the writer.
// Errors returned are indicative of failure to write to the writer, not failure of running steps.
func (j *Job) FollowLogs(ctx context.Context, offset int64, writer io.Writer) error {
	if err := j.logs.Follow(ctx, offset, writer); err != nil {
		return fmt.Errorf("following logs for job %q: %w", j.ID, err)
	}
	return nil
}

// Status returns a proto.Status object representing the current status of the job. If the job has not finished, some
// fields may be empty/nil.
func (j *Job) Status() *proto.Status {
	j.mux.RLock()
	defer j.mux.RUnlock()

	st := proto.Status{
		Id:     j.ID,
		Status: j.status,
	}
	if !j.startTime.IsZero() {
		st.StartTime = timestamppb.New(j.startTime)
	}
	if !j.finishTime.IsZero() {
		st.EndTime = timestamppb.New(j.finishTime)
	}
	if j.err != nil {
		st.Message = j.err.Error()
	}

	return &st
}

// WithRunExitWaitTime configures how long Close waits for Run to exit before giving up.
func WithRunExitWaitTime(waitTime time.Duration) func(*Job) {
	return func(j *Job) {
		j.runExitWaitTime = waitTime
	}
}

// WithLogs configures the Job to write step logs to the supplied streamer instead of a file in a temporary directory.
func WithLogs(logs *file.Streamer) func(*Job) {
	return func(j *Job) {
		j.logs = logs
	}
}
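// The sketch below is not part of the original file; it is a hypothetical
// illustration of how a caller might drive the Job lifecycle defined above:
// create the job, start Run in a goroutine, follow its logs until the steps
// finish, then read the final status. The jobID/workDir values are
// placeholders, and stepsCtx/step are assumed to be constructed elsewhere
// from the runner package. WithRunExitWaitTime is optional; 5s is an
// arbitrary value chosen for illustration.
func runJobExample(ctx context.Context, jobID, workDir string, stepsCtx *runner.StepsContext, step runner.Step) (*proto.Status, error) {
	job, err := New(jobID, workDir, WithRunExitWaitTime(5*time.Second))
	if err != nil {
		return nil, err
	}
	// Close cancels the job if it is still running and removes its temp dir.
	defer job.Close()

	// Run blocks until the steps complete, so start it in a goroutine.
	go job.Run(stepsCtx, step)

	// FollowLogs streams stdout/stderr from offset 0 and returns once the
	// steps have run and the logs have been written (or ctx is cancelled).
	if err := job.FollowLogs(ctx, 0, os.Stdout); err != nil {
		return nil, fmt.Errorf("streaming logs: %w", err)
	}

	// Run has finished by now, so Status reflects the final result.
	return job.Status(), nil
}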