in pkg/cli/job_actions.go [332:501]
func (c *Client) JobStopAction(
jobID string,
showProgress bool,
owner string,
labels string,
isForceStop bool,
jobStopLimit uint32,
jobStopMaxLimit uint32,
) error {
var (
errs error
jobsToStop []*peloton.JobID
jobStopLabels []*peloton.Label
stopJobIDs []string
stopFailedJobIDs []string
)
jobStopLabelSet := stringset.New()
if len(labels) > 0 {
var err error
jobStopLabels, err = parsePelotonLabels(labels)
if err != nil {
return err
}
for _, label := range jobStopLabels {
jobStopLabelSet.Add(label.GetKey() + ":" + label.GetValue())
}
}
if jobID == "" && owner == "" {
fmt.Printf("Either jobID or owner needs to be provided.\n")
tabWriter.Flush()
return nil
}
if jobID != "" {
// check the job status
jobGetResponse, err := c.jobGet(jobID)
if err != nil {
return err
}
jobConfig := jobGetResponse.GetJobInfo().GetConfig()
// If the job doesn't satisfy owner or label constraints, return
if owner != "" && jobConfig.GetOwningTeam() != owner {
fmt.Printf("No matching job found\n")
tabWriter.Flush()
return nil
}
if len(labels) > 0 {
jobHasStopLabels := false
for _, label := range jobConfig.Labels {
if jobStopLabelSet.Contains(strings.TrimSpace(label.GetKey()) +
":" + strings.TrimSpace(label.GetValue())) {
jobHasStopLabels = true
break
}
}
if !jobHasStopLabels {
fmt.Fprintf(
tabWriter,
"No matching job found\n",
)
tabWriter.Flush()
return nil
}
}
if util.IsPelotonJobStateTerminal(
jobGetResponse.GetJobInfo().GetRuntime().GetState()) {
fmt.Fprintf(
tabWriter,
"Job is in terminal state: %s\n", jobGetResponse.GetJobInfo().
GetRuntime().GetState().String(),
)
tabWriter.Flush()
return nil
}
jobsToStop = append(jobsToStop, &peloton.JobID{Value: jobID})
} else {
jobStates := []job.JobState{
job.JobState_INITIALIZED,
job.JobState_PENDING,
job.JobState_RUNNING,
}
spec := &job.QuerySpec{
JobStates: jobStates,
Owner: owner,
Labels: jobStopLabels,
Pagination: &query.PaginationSpec{
Limit: jobStopLimit,
MaxLimit: jobStopMaxLimit,
},
}
request := &job.QueryRequest{
Spec: spec,
SummaryOnly: true,
}
response, err := c.jobClient.Query(c.ctx, request)
if err != nil {
return err
}
for _, jobSummary := range response.GetResults() {
printResponseJSON(jobSummary)
jobsToStop = append(jobsToStop, jobSummary.GetId())
}
if !isForceStop && !askForConfirmation(jobStopConfirmationMessage) {
return nil
}
}
if len(jobsToStop) == 0 {
fmt.Fprintf(tabWriter, "No matching job(s) found\n")
tabWriter.Flush()
return nil
}
for _, jobID := range jobsToStop {
request := &task.StopRequest{
JobId: jobID,
}
response, err := c.taskClient.Stop(c.ctx, request)
if err != nil {
stopFailedJobIDs = append(stopFailedJobIDs,
jobID.GetValue())
errs = multierr.Append(errs, err)
} else {
stopJobIDs = append(stopJobIDs, jobID.GetValue())
}
if showProgress {
continue
}
printTaskStopResponse(response, c.Debug)
// Retry one more time in case failedInstanceList is non zero
if len(response.GetInvalidInstanceIds()) > 0 {
fmt.Fprint(
tabWriter,
"Retrying failed tasks",
)
response, err = c.taskClient.Stop(c.ctx, request)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
printTaskStopResponse(response, c.Debug)
}
}
if showProgress {
for _, jobID := range jobsToStop {
if err := c.pollStatusWithTimeout(jobID); err != nil {
errs = multierr.Append(errs, err)
}
}
}
fmt.Fprintf(tabWriter, "Stopping jobs: %v\n", stopJobIDs)
if len(stopFailedJobIDs) != 0 {
fmt.Fprintf(tabWriter, "Error stopping jobs: %v\n", stopFailedJobIDs)
}
tabWriter.Flush()
return errs
}