func()

in pkg/cli/job_actions.go [332:501]


func (c *Client) JobStopAction(
	jobID string,
	showProgress bool,
	owner string,
	labels string,
	isForceStop bool,
	jobStopLimit uint32,
	jobStopMaxLimit uint32,
) error {
	var (
		errs             error
		jobsToStop       []*peloton.JobID
		jobStopLabels    []*peloton.Label
		stopJobIDs       []string
		stopFailedJobIDs []string
	)

	jobStopLabelSet := stringset.New()
	if len(labels) > 0 {
		var err error
		jobStopLabels, err = parsePelotonLabels(labels)
		if err != nil {
			return err
		}
		for _, label := range jobStopLabels {
			jobStopLabelSet.Add(label.GetKey() + ":" + label.GetValue())
		}
	}

	if jobID == "" && owner == "" {
		fmt.Printf("Either jobID or owner needs to be provided.\n")
		tabWriter.Flush()
		return nil
	}

	if jobID != "" {
		// check the job status
		jobGetResponse, err := c.jobGet(jobID)
		if err != nil {
			return err
		}

		jobConfig := jobGetResponse.GetJobInfo().GetConfig()
		// If the job doesn't satisfy owner or label constraints, return
		if owner != "" && jobConfig.GetOwningTeam() != owner {
			fmt.Printf("No matching job found\n")
			tabWriter.Flush()
			return nil
		}
		if len(labels) > 0 {
			jobHasStopLabels := false
			for _, label := range jobConfig.Labels {
				if jobStopLabelSet.Contains(strings.TrimSpace(label.GetKey()) +
					":" + strings.TrimSpace(label.GetValue())) {
					jobHasStopLabels = true
					break
				}
			}
			if !jobHasStopLabels {
				fmt.Fprintf(
					tabWriter,
					"No matching job found\n",
				)
				tabWriter.Flush()
				return nil
			}
		}

		if util.IsPelotonJobStateTerminal(
			jobGetResponse.GetJobInfo().GetRuntime().GetState()) {
			fmt.Fprintf(
				tabWriter,
				"Job is in terminal state: %s\n", jobGetResponse.GetJobInfo().
					GetRuntime().GetState().String(),
			)
			tabWriter.Flush()
			return nil
		}

		jobsToStop = append(jobsToStop, &peloton.JobID{Value: jobID})
	} else {
		jobStates := []job.JobState{
			job.JobState_INITIALIZED,
			job.JobState_PENDING,
			job.JobState_RUNNING,
		}
		spec := &job.QuerySpec{
			JobStates: jobStates,
			Owner:     owner,
			Labels:    jobStopLabels,
			Pagination: &query.PaginationSpec{
				Limit:    jobStopLimit,
				MaxLimit: jobStopMaxLimit,
			},
		}
		request := &job.QueryRequest{
			Spec:        spec,
			SummaryOnly: true,
		}

		response, err := c.jobClient.Query(c.ctx, request)
		if err != nil {
			return err
		}

		for _, jobSummary := range response.GetResults() {
			printResponseJSON(jobSummary)
			jobsToStop = append(jobsToStop, jobSummary.GetId())
		}

		if !isForceStop && !askForConfirmation(jobStopConfirmationMessage) {
			return nil
		}
	}

	if len(jobsToStop) == 0 {
		fmt.Fprintf(tabWriter, "No matching job(s) found\n")
		tabWriter.Flush()
		return nil
	}

	for _, jobID := range jobsToStop {
		request := &task.StopRequest{
			JobId: jobID,
		}
		response, err := c.taskClient.Stop(c.ctx, request)

		if err != nil {
			stopFailedJobIDs = append(stopFailedJobIDs,
				jobID.GetValue())
			errs = multierr.Append(errs, err)
		} else {
			stopJobIDs = append(stopJobIDs, jobID.GetValue())
		}

		if showProgress {
			continue
		}

		printTaskStopResponse(response, c.Debug)
		// Retry one more time in case failedInstanceList is non zero
		if len(response.GetInvalidInstanceIds()) > 0 {
			fmt.Fprint(
				tabWriter,
				"Retrying failed tasks",
			)
			response, err = c.taskClient.Stop(c.ctx, request)
			if err != nil {
				errs = multierr.Append(errs, err)
				continue
			}
			printTaskStopResponse(response, c.Debug)
		}
	}

	if showProgress {
		for _, jobID := range jobsToStop {
			if err := c.pollStatusWithTimeout(jobID); err != nil {
				errs = multierr.Append(errs, err)
			}
		}
	}

	fmt.Fprintf(tabWriter, "Stopping jobs: %v\n", stopJobIDs)
	if len(stopFailedJobIDs) != 0 {
		fmt.Fprintf(tabWriter, "Error stopping jobs: %v\n", stopFailedJobIDs)
	}
	tabWriter.Flush()
	return errs
}